Source code for location

# Part of NHClinical. See LICENSE file for full copyright and licensing details
# -*- coding: utf-8 -*-
import logging

from openerp.osv import orm, fields
from openerp import SUPERUSER_ID


_logger = logging.getLogger(__name__)


[docs]class nh_clinical_location(orm.Model): """ Represents a location where a patient may be located or an activity may take place. There are different types of locations. The most common usage is to have a `hospital` as a parent location to a group of `wards` where each ward is a parent to several beds. The bed location is where the patient can be placed. """ _name = 'nh.clinical.location' _types = [('poc', 'Point of Care'), ('structural', 'Structural'), ('virtual', 'Virtual'), ('pos', 'POS')] _usages = [('bed', 'Bed'), ('bay', 'Bay'), ('ward', 'Ward'), ('room', 'Room'), ('department', 'Department'), ('hospital', 'Hospital')] def _get_pos_id(self, cr, uid, ids, field, args, context=None): res = {} pos_pool = self.pool['nh.clinical.pos'] for location in self.browse(cr, uid, ids, context): pos_location_id = self.search( cr, uid, [['parent_id', '=', False], ['child_ids', 'child_of', location.id]]) pos_location_id = pos_location_id[0] if pos_location_id else False pos_id = pos_pool.search(cr, uid, [['location_id', '=', pos_location_id]]) res[location.id] = pos_id[0] if pos_id else False if not pos_id: _logger.debug("pos_id not found for location '%s', id=%s" % (location.code, location.id)) return res def _pos2location_id(self, cr, uid, ids, context=None): res = [] for pos in self.browse(cr, uid, ids, context): res.extend(self.pool['nh.clinical.location'].search( cr, uid, [['id', 'child_of', pos.location_id.id]])) return res def _is_available(self, cr, uid, ids, field, args, context=None): usages = [usage[0] for usage in self._usages] available_location_ids = self.get_available_location_ids( cr, uid, usages=usages, context=context) res = {} for i in ids: res[i] = i in available_location_ids return res def _get_patient_ids(self, cr, uid, ids, field, args, context=None): res = {} patient_pool = self.pool['nh.clinical.patient'] for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = patient_pool.search( cr, uid, [('current_location_id', 'child_of', loc.id)], context=context ) return res def _get_nurse_follower_ids(self, cr, uid, ids, field, args, context=None): res = {} user_pool = self.pool['res.users'] for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = user_pool.search( cr, uid, [['following_ids', 'in', [p.id for p in loc.patient_ids]], ['groups_id.name', 'in', ['NH Clinical Nurse Group']]], context=context ) return res def _get_hca_follower_ids(self, cr, uid, ids, field, args, context=None): res = {} user_pool = self.pool['res.users'] for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = user_pool.search( cr, uid, [['following_ids', 'in', [p.id for p in loc.patient_ids]], ['groups_id.name', 'in', ['NH Clinical HCA Group']]], context=context ) return res def _get_user_ids(self, cr, uid, location_id, group_names=None, recursive=True, context=None): loc = self.browse(cr, uid, location_id, context=context) if not group_names: group_names = [] res = [] if recursive: if loc.child_ids: for child in loc.child_ids: res += self._get_user_ids(cr, uid, child.id, group_names, context=context) for user in loc.user_ids: if not group_names: res += [user.id] elif any([g.name in group_names for g in user.groups_id]): res += [user.id] return list(set(res)) def _get_hca_ids(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical HCA Group'], context=context ) return res def _get_nurse_ids(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical Nurse Group'], context=context ) return res def _get_wm_ids(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): if loc.usage == 'ward': res[loc.id] = self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical Shift Coordinator Group'], recursive=False, context=context ) else: res[loc.id] = self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical Shift Coordinator Group'], context=context ) return res def _get_doctor_ids(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = self._get_user_ids( cr, uid, loc.id, group_names=[ 'NH Clinical Doctor Group', 'NH Clinical Junior Doctor Group', 'NH Clinical Consultant Group', 'NH Clinical Registrar Group' ], context=context ) return res def _get_hcas(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = len(self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical HCA Group'], context=context)) return res def _get_nurses(self, cr, uid, ids, field, args, context=None): res = {} for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = len(self._get_user_ids( cr, uid, loc.id, group_names=['NH Clinical Nurse Group'], context=context)) return res def _get_waiting_patients(self, cr, uid, ids, field, args, context=None): """ Returns the number of patients waiting to be allocated into a location within the selected location. Which means patients that have open patient placement activities related to this location. """ res = {} placement_pool = self.pool['nh.clinical.patient.placement'] for loc in self.browse(cr, uid, ids, context=context): res[loc.id] = len(placement_pool.search( cr, uid, [('suggested_location_id', '=', loc.id), ('state', 'not in', ['completed', 'cancelled'])])) return res def _get_child_patients(self, cr, uid, ids, field, args, context=None): """ Returns the number of patients related to the child locations of this location. Number of patients related to this location are not included. """ res = {} for loc in self.browse(cr, uid, ids, context=context): sum = 0 for child in loc.child_ids: sum += len(child.patient_ids) res[loc.id] = sum return res
[docs] def get_closest_parent_id(self, cr, uid, location_id, usage, context=None): """ Gets a location's closest ancestor (parent) location id of a particular usage. Returns ``False`` if no such location exists. :param location_id: location id :type location_id: int :param usage: usage of location. See :class:`nh_clinical_location` :returns: location id of the ancestor. Otherwise ``False`` :rtype: int or bool """ location = self.read(cr, uid, location_id, ['parent_id'], context=context) if not location or not location['parent_id']: return False else: parent = self.read(cr, uid, location['parent_id'][0], ['usage'], context=context) if parent['usage'] == usage: return parent['id'] else: return self.get_closest_parent_id(cr, uid, parent['id'], usage,
context=context)
[docs] def is_child_of(self, cr, uid, location_id, code, context=None): """ Checks if a location is a child of another location. :param location_id: location id :type location_id: int :param code: location code :type code: str :returns: the dictionary ``location_id`` (key) and a string containing location name and parent location name (value) Otherwise ``False`` is returned. :rtype: dict or bool """ code_location_id = self.search(cr, uid, [['code', '=', code]], context=context) child_location_ids = self.search( cr, uid, [['id', 'child_of', code_location_id[0]]], context=context )
return location_id in child_location_ids def _get_name(self, cr, uid, ids, field, args, context=None): result = {} for location in self.browse(cr, uid, ids, context=context): if location.usage == 'ward': result[location.id] = location.name else: parent_id = self.get_closest_parent_id(cr, uid, location.id, 'ward', context=context) if parent_id: parent = self.read(cr, uid, parent_id, ['name'], context=context) else: parent = False result[location.id] = '{0} [{1}]'.format( location.name, parent['name']) if parent else location.name return result def _is_available_search(self, cr, uid, obj, name, args, domain=None, context=None): """ Permits searching :meth:`_is_available` method so is_available field is searchable, ignoring any operand not '=' or '!=' because is_available is a boolean and thus nonsensical. """ location_ids = [] for cond in args: available_value = bool(cond[2]) if cond[1] not in ['=', '!=']: continue all_ids = self.search(cr, uid, [['usage', '=', 'bed']], context=context) available_locations_map = self._is_available( cr, uid, all_ids, 'is_available', None, context=context) if cond[1] == '=': location_ids += [k for k, v in available_locations_map.items() if v == available_value] else: location_ids += [k for k, v in available_locations_map.items() if v != available_value] return [('id', 'in', location_ids)] _columns = { 'name': fields.char('Location', size=100, required=True, select=True), 'full_name': fields.function(_get_name, type='char', size=150, string='Full Name'), 'code': fields.char('Code', size=256), 'parent_id': fields.many2one('nh.clinical.location', 'Parent Location'), 'child_ids': fields.one2many('nh.clinical.location', 'parent_id', 'Child Locations'), 'type': fields.selection(_types, 'Location Type'), 'usage': fields.selection(_usages, 'Location Usage'), 'active': fields.boolean('Active'), 'pos_id': fields.function( _get_pos_id, type='many2one', relation='nh.clinical.pos', string='POS', store={ 'nh.clinical.location': ( lambda s, cr, uid, ids, c: s.search(cr, uid, [['id', 'child_of', ids]]), ['parent_id'], 10), 'nh.clinical.pos': (_pos2location_id, ['location_id'], 5), }), 'company_id': fields.related('pos_id', 'company_id', type='many2one', relation='res.company', string='Company'), 'is_available': fields.function(_is_available, type='boolean', string='Is Available?', fnct_search=_is_available_search), 'patient_capacity': fields.integer('Patient Capacity'), 'patient_ids': fields.function(_get_patient_ids, type='many2many', relation='nh.clinical.patient', string="Patients"), 'user_ids': fields.many2many('res.users', 'user_location_rel', 'location_id', 'user_id', 'Responsible Users'), # aux fields for the view, worth having a SQL model instead? 'nurse_follower_ids': fields.function(_get_nurse_follower_ids, type='many2many', relation='res.users', string="Nurse Stand-Ins"), 'hca_follower_ids': fields.function(_get_hca_follower_ids, type='many2many', relation='res.users', string="HCA Stand-Ins"), 'assigned_hca_ids': fields.function(_get_hca_ids, type='many2many', relation='res.users', string="Assigned HCAs"), 'assigned_nurse_ids': fields.function(_get_nurse_ids, type='many2many', relation='res.users', string="Assigned Nurses"), 'assigned_wm_ids': fields.function( _get_wm_ids, type='many2many', relation='res.users', string="Assigned Shift Coordinator" ), 'assigned_doctor_ids': fields.function(_get_doctor_ids, type='many2many', relation='res.users', string="Assigned Doctors"), 'related_hcas': fields.function(_get_hcas, type='integer', string="Number of related HCAs"), 'related_nurses': fields.function(_get_nurses, type='integer', string="Number of related Nurses"), 'waiting_patients': fields.function( _get_waiting_patients, type='integer', string="Number of Waiting Patients" ), 'child_patients': fields.function( _get_child_patients, type='integer', string="Number of Patients from child locations"), 'context_ids': fields.many2many('nh.clinical.context', 'nh_location_context_rel', 'location_id', 'context_id', string='Related Clinical Contexts') } def _get_default_context_ids(self, cr, uid, context=None): context_pool = self.pool['nh.clinical.context'] context_ids = context_pool.search(cr, uid, [('name', '=', 'eobs')]) return [(6, 0, context_ids)] _defaults = { 'active': True, 'patient_capacity': 1, 'context_ids': _get_default_context_ids } _sql_constraints = [ ('location_code_uniq', 'unique(code)', 'The code for a location must be unique!') ]
[docs] def onchange_usage(self, cr, uid, ids, usage, context=None): """ Hospital locations don't have parent locations and they are always Point of Service type. """ if usage != 'hospital': return {} return { 'value': {'parent_id': False, 'type': 'pos'}
}
[docs] def onchange_type(self, cr, uid, ids, usage, type, context=None): """ Hospital locations can only be Point of Service type """ if usage != 'hospital' or type == 'pos': return {} return { 'warning': { 'title': 'Warning', 'message': 'Hospital locations can only be Point of Service' }, 'value': { 'type': 'pos' }
}
[docs] def onchange_parent_id(self, cr, uid, ids, usage, parent_id, context=None): """ Hospital locations can not have a parent location """ if usage != 'hospital' or not parent_id: return {} return { 'warning': { 'title': 'Warning', 'message': 'Hospital locations can not have a parent location' }, 'value': { 'parent_id': False }
}
[docs] def get_available_location_ids(self, cr, uid, usages=None, context=None): """ Gets a list of available locations, only returning beds unless specified otherwise. :param usages: location type (``ward``, ``bed``, etc.) of available locations :type usage: list :returns: location ids of available locations (default usage is ``bed``)i :rtype: list """ if not usages: usages = ['bed'] activity_pool = self.pool['nh.activity'] open_spell_ids = activity_pool.search( cr, uid, [['data_model', '=', 'nh.clinical.spell'], ['state', '=', 'started']], context=context) busy_location_ids = [a.location_id.id if a.location_id.usage == 'bed' else False for a in activity_pool.browse(cr, uid, open_spell_ids, context=context)] busy_location_ids = list(set(busy_location_ids)) return self.search(cr, uid, [['usage', 'in', usages], ['id', 'not in', busy_location_ids]],
context=context)
[docs] def switch_active_status(self, cr, uid, location_id, context=None): """ Activates the location if inactive and deactivates it if active. :param location_id: location id of location to be switched :type location_id: int :returns: ``True`` :rtype: bool """ if isinstance(location_id, list): location_id = location_id[0] location = self.browse(cr, uid, location_id, context=context) activity_pool = self.pool['nh.activity'] activate_pool = self.pool['nh.clinical.location.activate'] deactivate_pool = self.pool['nh.clinical.location.deactivate'] if location.active: activity_id = deactivate_pool.create_activity( cr, SUPERUSER_ID, {}, {'location_id': location.id}, context=context) else: activity_id = activate_pool.create_activity( cr, SUPERUSER_ID, {}, {'location_id': location.id}, context=context)
return activity_pool.complete(cr, uid, activity_id, context=context)
[docs] def check_context_ids(self, cr, uid, context_ids, context=None): """ :param cr: :param uid: :param context_ids: :param context: :return: """ if isinstance(context_ids[0], list): if context_ids[0][0] == 4: context_ids = [c[1] for c in context_ids] elif context_ids[0][0] == 6: context_ids = context_ids[0][2] else: return True self.pool['nh.clinical.context'].check_model(cr, uid, context_ids, self._name, context=context)
return True
[docs] def get_by_code(self, cr, uid, code, auto_create=False, context=None): """ Gets the location's id by the location's code. Creates a location if ``auto_create`` is ``True`` and the location doesn't exist. :param code: location's code :type code: str :param auto_create: ``False`` [default]. :type auto_create: bool :returns: location id of the location. ``False`` if ``auto_create`` is ``True`` and location doesn't exist, the location id of new ward location created. Otherwise ``False`` :rtype: int or bool """ location_ids = self.search(cr, uid, [['code', '=', code]], context=context) if not location_ids: if not auto_create: return False else: _logger.warn("Location '%s' not found! " "Automatically creating one with this code.", code) user_pool = self.pool['res.users'] user = user_pool.browse(cr, uid, uid, context=context) location_id = self.create(cr, uid, { 'name': code, 'code': code, 'pos_id': user.pos_id.id if user.pos_id else False, 'parent_id': user.pos_ids[0].location_id.id if user.pos_ids[0].location_id else False, 'type': 'poc', 'usage': 'ward' }, context=context) else: location_id = location_ids[0]
return location_id
[docs] def create(self, cr, uid, vals, context=None): """ Extends Odoo's :meth:`create()<openerp.models.Model.create>` method. Updates :class:`nh_clinical_location` to write `context_ids` field. :param vals: values to update the records with :type vals: dict :returns: ``True`` :rtype: bool """ if vals.get('context_ids'): self.check_context_ids(cr, uid, vals.get('context_ids'), context=context) res = super(nh_clinical_location, self).create( cr, uid, vals, context=context) if vals.get('type') == 'pos' and vals.get('usage') == 'hospital': user_pool = self.pool['res.users'] user = user_pool.browse(cr, uid, uid, context=context) if 'NH Clinical Admin Group' in [g.name for g in user.groups_id]: pos_pool = self.pool['nh.clinical.pos'] pos_id = pos_pool.create(cr, uid, { 'name': vals.get('name'), 'location_id': res}) user_pool.write(cr, uid, user.id, {'pos_ids': [[4, pos_id]]})
return res
[docs] def write(self, cr, uid, ids, vals, context=None): """ Extends Odoo's :meth:`write()<openerp.models.Model.write>` method. Updates :class:`nh_clinical_location` to write `context_ids` field. :param ids: ids of the records to update :type ids: list :param vals: values to update the records with :type vals: dict :returns: ``True`` :rtype: bool """ if vals.get('context_ids'): self.check_context_ids(cr, uid, vals.get('context_ids'), context=context) return super(nh_clinical_location, self).write(cr, uid, ids, vals,
context=context)