# Part of NHClinical. See LICENSE file for full copyright and licensing details
# -*- coding: utf-8 -*-
"""
Extends Odoo's res_users.
"""
import logging
import re
from openerp import SUPERUSER_ID, api
from openerp.osv import orm, fields, osv
_logger = logging.getLogger(__name__)
[docs]class res_users(orm.Model):
"""
Extends Odoo's
:class:`res_users<openerp.addons.base.res.res_users.res_users>`
to include point of service, parent locations of responsibility,
following patients and the related doctor for the user.
"""
_name = 'res.users'
_inherit = 'res.users'
_columns = {
'pos_id': fields.many2one('nh.clinical.pos', 'POS'),
'pos_ids': fields.many2many(
'nh.clinical.pos', 'user_pos_rel', 'user_id', 'pos_id',
'Points of Service'),
'location_ids': fields.many2many('nh.clinical.location',
'user_location_rel',
'user_id',
'location_id',
'Parent Locations of Responsibility'),
'following_ids': fields.many2many('nh.clinical.patient',
'user_patient_rel',
'user_id',
'patient_id',
'Following Patients'),
'doctor_id': fields.many2one('nh.clinical.doctor', 'Related Doctor')
}
[docs] def check_pos(self, cr, uid, user_id, exception=False, context=None):
"""
Checks if user has an assigned point of service (POS).
:param user_id: id of user to check
:type user_id: int
:param exception: ``False`` [default]
:type exception: bool
:returns: if exception parameter is ``False``, then returns
``True`` if POS is defined. Otherwise ``False``. If
is ``True``, them returns ``True`` if POS is defined. If POS
is undefined, then
:class:`osv.except_osv<openerp.osv.osv.except_osv>` is
raised
"""
user = self.browse(cr, uid, user_id, context=context)
result = bool(user.pos_ids)
if not exception:
return result
else:
if not result:
raise osv.except_osv('Point of Service Not Defined!',
'User %s has no POS defined.' % user.name)
else:
return result
[docs] def update_group_vals(self, cr, uid, user_id, vals, context=None):
"""
Updates a user's groups(see
:class:`res_partner_category_extension` and :class:`res_groups`)
for the user.
:param user_id: id of user
:type user_id: int
:param vals: expecting a dictionary with keys ``group_ids``
and/or ``category_id``. Values will be a list of ints
:type vals: dict
:returns: ``True``
:rtype: bool
"""
category_pool = self.pool['res.partner.category']
if not vals.get('category_id'):
return True
elif not isinstance(vals.get('category_id'), list):
raise osv.except_osv(
'Value Error!',
'category_id field expecting list value, %s received' %
type(vals.get('category_id')))
elif not isinstance(vals.get('category_id')[0], (list, tuple)):
raise osv.except_osv(
'Value Error!',
'many2many update expecting list or tuple value, %s received' %
type(vals.get('category_id')[0]))
add_groups_id = []
if not vals.get('groups_id'):
vals['groups_id'] = []
for cat_val in vals.get('category_id'):
if not isinstance(cat_val, (list, tuple)):
raise osv.except_osv(
'Value Error!',
'many2many update expecting list or tuple value, '
'%s received' % type(vals.get('category_id')[0]))
if cat_val[0] == 3: # Removing categories / roles
group_ids = category_pool.read(
cr, uid, cat_val[1], ['group_ids'],
context=context)['group_ids']
for gid in group_ids:
add_groups_id.append((3, gid))
elif cat_val[0] == 4: # Adding categories / roles
group_ids = category_pool.read(
cr, uid, cat_val[1], ['group_ids'],
context=context)['group_ids']
for gid in group_ids:
add_groups_id.append((4, gid))
elif cat_val[0] == 5: # Removing all categories / roles
add_groups_id = []
group_ids = []
parent_id = category_pool.search(
cr, uid, [['name', '=', 'System Administrator']])
category_ids = category_pool.get_child_of_ids(
cr, uid, parent_id[0], context=context)
for cid in category_ids:
group_ids += category_pool.read(
cr, uid, cid, ['group_ids'],
context=context)['group_ids']
for gid in group_ids:
add_groups_id.append((3, gid))
vals['groups_id'] += add_groups_id
return True
elif cat_val[0] == 6: # Replacing categories / roles
add_groups_id = []
category_ids = cat_val[2]
new_group_ids = []
for cid in category_ids:
new_group_ids += category_pool.read(
cr, uid, cid, ['group_ids'],
context=context)['group_ids']
old_group_ids = self.read(
cr, uid, user_id, ['groups_id'],
context=context)['groups_id'] if user_id else []
def filter_old_group_ids(group_id):
is_nhc_related = category_pool.search(
cr, uid, [['group_ids', 'in', [group_id]]],
context=context)
return is_nhc_related
old_group_ids = filter(filter_old_group_ids, old_group_ids)
group_ids = list(set(old_group_ids + new_group_ids))
for gid in group_ids:
if gid in old_group_ids and gid not in new_group_ids:
add_groups_id.append((3, gid))
elif gid not in old_group_ids and gid in new_group_ids:
add_groups_id.append((4, gid))
vals['groups_id'] += add_groups_id
return True
else:
raise osv.except_osv(
'Value Error!',
'Unexpected value for category_id field received: %s' %
cat_val)
add_groups_id = list(set(add_groups_id))
# Ensure no removal from group from a category the user still linked
if user_id:
old_cat_ids = set(
self.read(cr, uid, user_id, ['category_id'],
context=context)['category_id']
)
del_cat_ids = [
ctuple[1] for ctuple in vals.get('category_id')
if ctuple[0] == 3
]
new_cat_ids = [
cid for cid in old_cat_ids if cid not in del_cat_ids]
remaining_group_ids = []
for cid in new_cat_ids:
remaining_group_ids += category_pool.read(
cr, uid, cid, ['group_ids'], context=context)['group_ids']
remaining_group_ids = set(remaining_group_ids)
add_groups_id = \
filter(lambda e: e[0] != 3 or e[1] not in remaining_group_ids,
add_groups_id)
# Make sure we don't remove any group that is being added
add_groups_id = \
filter(lambda e: e[0] != 3 or (4, e[1]) not in add_groups_id,
add_groups_id)
vals['groups_id'] += add_groups_id
return True
@api.one
@api.constrains('location_ids')
def _check_location_ids(self):
activity_model = self.env['nh.activity']
locked_activities = activity_model.search([
['user_id', '=', self.id],
['location_id', 'not in', self.location_ids.ids]
])
if locked_activities:
locked_activities.write({'user_id': False})
return True
[docs] def create(self, cr, user, vals, context=None):
"""
Extends Odoo's :meth:`create()<openerp.models.Model.create>`
to update fields ``group_ids`` and ``doctor_id``.
:param vals: values to initialise the new record
:type vals: dict
:returns: id of created record
:rtype: int
"""
creator = self.browse(cr, user, user, context=context)
if creator.pos_ids and not vals.get('pos_ids'):
vals['pos_ids'] = [[6, 0, [p.id for p in creator.pos_ids]]]
self.update_group_vals(cr, user, False, vals, context=context)
res = super(res_users, self).create(
cr, user, vals,
context=dict(context or {}, mail_create_nosubscribe=True))
if 'doctor_id' in vals:
self.pool['nh.clinical.doctor'].write(
cr, user, vals['doctor_id'], {'user_id': res}, context=context)
if 'groups_id' in vals:
self.update_doctor_status(cr, user, res, context=context)
return res
[docs] def write(self, cr, uid, ids, values, context=None):
"""
Extends Odoo's :meth:`write()<openerp.models.Model.write>`
method.
:param ids: id of records to update
:type ids: list or int
:param values: values to update the records with
:type values: dict
:returns: ``True``
:rtype: bool
"""
if isinstance(ids, list) and len(ids) == 1:
self.update_group_vals(cr, uid, ids[0], values, context=context)
elif isinstance(ids, int):
self.update_group_vals(cr, uid, ids, values, context=context)
res = super(res_users, self).write(cr, uid, ids, values, context)
if values.get('location_ids') or values.get('groups_id'):
activity_pool = self.pool['nh.activity']
activity_pool.update_users(cr, uid, ids)
if 'groups_id' in values:
self.update_doctor_status(cr, uid, ids, context=context)
return res
[docs] def name_get(self, cr, uid, ids, context=None):
"""
Gets the names of users.
:param ids: user ids
:type ids: list or int
:returns: user names
:rtype: list
"""
partner_pool = self.pool['res.partner']
if context is None:
context = {}
if isinstance(ids, (int, long)):
ids = [ids]
res = []
for record in self.browse(cr, uid, ids, context=context):
doctor_groups = ['NH Clinical Doctor Group',
'NH Clinical Registrar Group',
'NH Clinical Consultant Group',
'NH Clinical Junior Doctor Group']
if set(doctor_groups).intersection(
[g.name for g in record.groups_id]):
if record.title:
name = record.title.name + ' ' + record.name
else:
name = record.name
else:
name = record.name
if record.parent_id and not record.is_company:
name = "%s, %s" % (record.parent_name, name)
if context.get('show_address_only'):
name = partner_pool._display_address(
cr, uid, record, without_company=True, context=context)
if context.get('show_address'):
name = name + "\n" + partner_pool._display_address(
cr, uid, record, without_company=True, context=context)
name = name.replace('\n\n', '\n')
name = name.replace('\n\n', '\n')
if context.get('show_email') and record.email:
name = "%s <%s>" % (name, record.email)
res.append((record.id, name))
return res
[docs] @api.model
def get_name(self, uid):
return self.browse(uid).name
[docs] def update_doctor_status(self, cr, uid, ids, context=None):
"""
Updates ``doctor`` field in :class:`res_partner` if user is a
doctor.
:param ids: user ids
:type ids: list
:returns: ``True``
:rtype: bool
"""
doctor_groups = ['NH Clinical Doctor Group',
'NH Clinical Registrar Group',
'NH Clinical Consultant Group',
'NH Clinical Junior Doctor Group']
partner_pool = self.pool['res.partner']
for record in self.browse(cr, uid, ids, context=context):
if set(doctor_groups).intersection(
[g.name for g in record.groups_id]):
partner_pool.write(
cr, uid, record.partner_id.id,
{'doctor': True}, context=context)
elif record.partner_id.doctor:
partner_pool.write(
cr, uid, record.partner_id.id, {'doctor': False},
context=context)
return True
[docs] def get_groups_string(self, cr, uid, context=None):
"""
:returns: list of NH Clinical user groups for UID in string format
"""
user = self.browse(cr, uid, uid, context=context)
return [re.sub(
r' Group', '', re.sub(r'NH Clinical ', '', g.name)
) for g in user.groups_id if 'NH Clinical' in g.name and g.name !=
'NH Clinical Base Group']
[docs] @api.multi
def filter_nurses(self, users):
nurse_group_name = 'NH Clinical Nurse Group'
return self.filter_users_by_group(users, nurse_group_name)
[docs] @api.multi
def filter_hcas(self, users):
hca_group_name = 'NH Clinical HCA Group'
return self.filter_users_by_group(users, hca_group_name)
[docs] @classmethod
def filter_users_by_group(cls, users, group):
return filter(lambda user: cls.user_in_group(user, group), users)
[docs] @staticmethod
def user_in_group(user, group):
user_groups = map(lambda group: group.name, user.groups_id)
return group in user_groups
[docs] @api.multi
def is_doctor(self):
"""
:return:
:rtype: bool
"""
return self._is_in_user_group('Doctor')
[docs] @api.multi
def is_shift_coordinator(self):
"""
:return:
:rtype: bool
"""
return self._is_in_user_group('Shift Coordinator')
[docs] @api.multi
def is_senior_manager(self):
"""
:return:
:rtype: bool
"""
return self._is_in_user_group('Senior Manager')
[docs] @api.multi
def is_system_admin(self):
"""
:return:
:rtype: bool
"""
return self._is_in_user_group('Admin')
@api.multi
def _is_in_user_group(self, user_role_name):
"""
:param user_role_name: The user's role in title case e.g.
'Shift Coordinator'.
:type user_role_name: str
:return:
:rtype: bool
"""
self.ensure_one()
for group in self.groups_id:
if group.name == 'NH Clinical {} Group'.format(user_role_name):
return True
return False
[docs]class nh_change_password_wizard(osv.TransientModel):
"""
Extension of Odoo's res.users change password wizard. Used to allow
users to change their password securely.
"""
_name = "change.password.wizard"
_inherit = "change.password.wizard"
def _default_user_ids(self, cr, uid, context=None):
if context is None:
context = {}
user_model = self.pool['res.users']
user_ids = context.get('active_ids') or []
return [
(0, 0, {'user_id': user.id, 'user_login': user.login})
for user in user_model.browse(cr, uid, user_ids, context=context)
]
_defaults = {
'user_ids': _default_user_ids,
}
[docs]class nh_clinical_specialty(orm.Model):
"""
Specialty represent a doctor clinical specialty
"""
_name = 'nh.clinical.specialty'
_description = 'A Clinical Specialty'
_specialty_groups = [
['surgical', 'Surgical Specialties'],
['medical', 'Medical Specialties'], ['psychiatry', 'Psychiatry'],
['radiology', 'Radiology'], ['pathology', 'Pathology'],
['other', 'Other']]
_columns = {
'name': fields.char('Main Specialty Title', size=150),
'code': fields.integer('Code'),
'group': fields.selection(_specialty_groups, 'Specialty Group')
}
[docs]class nh_clinical_doctor(orm.Model):
"""
Represents a doctor.
"""
_name = 'nh.clinical.doctor'
_inherits = {'res.partner': 'partner_id'}
_gender = [['BOTH', 'Both'], ['F', 'Female'], ['I', 'Intermediate'],
['M', 'Male'], ['NSP', 'Not Specified'], ['U', 'Unknown']]
_columns = {
'partner_id': fields.many2one('res.partner', 'Partner', required=1,
ondelete='restrict'),
'gender': fields.selection(_gender, 'Gender'),
'gmc': fields.char('GMC', size=10),
'specialty_id': fields.many2one('nh.clinical.specialty', 'Speciality'),
'code': fields.char('Regional Code', size=10),
'user_id': fields.many2one('res.users', 'User Account')
}
_defaults = {
'gender': 'U'
}
[docs] def create(self, cr, user, vals, context=None):
"""
Extends Odoo's :meth:`create()<openerp.models.Model.create>`
to update ``doctor_id`` field in :class:`res_users`.
:returns: ``True`` if created
:rtype: bool
"""
res = super(nh_clinical_doctor, self).create(
cr, user, vals, context=dict(context or {},
mail_create_nosubscribe=True))
if 'user_id' in vals:
self.pool['res.users'].write(
cr, user, vals['user_id'], {'doctor_id': res}, context=context)
return res
[docs] def write(self, cr, uid, ids, vals, context=None):
"""
Extends Odoo's :meth:`write()<openerp.models.Model.write>` to
update ``doctor_id`` field in :class:`res_users`.
:returns: ``True`` if created
:rtype: bool
"""
res = super(nh_clinical_doctor, self).write(
cr, uid, ids, vals, context=context)
if 'user_id' in vals:
self.pool['res.users'].write(
cr, uid, vals['user_id'],
{'doctor_id': ids[0] if isinstance(ids, list) else ids},
context=context)
return res
[docs] def evaluate_doctors_dict(self, cr, uid, data, context=None):
"""
Evaluates doctors, checking for a doctor before creating a new
doctor if it doesn't exist.
:param data: must contain ``doctors`` key. Its value will be a
list of dictionaries which must contain the keys ``code``,
``gender``, ``gmc`` and ``type``. It many contain ``title``.
:type data: dict
:returns: ``True`` if either it finds a doctor or creates a new
doctor. Otherwise ``False``
:rtype: bool
"""
if not data.get('doctors'):
_logger.warn("Trying to evaluate doctors dictionary without "
"doctors data!")
return False
else:
try:
doctors = eval(str(data['doctors']))
patient_pool = self.pool['nh.clinical.patient']
ref_doctor_ids = []
con_doctor_ids = []
for d in doctors:
doctor_id = self.search(
cr, uid, [['code', '=', d.get('code')]],
context=context)
if not doctor_id:
title_id = False
if 'title' in d.keys():
title_pool = self.pool['res.partner.title']
title_id = title_pool.get_title_by_name(
cr, uid, d['title'], context=context)
doctor = {
'name': patient_pool._get_fullname(d),
'title': title_id,
'code': d.get('code'),
'gender': d.get('gender'),
'gmc': d.get('gmc')
}
doctor_id = self.create(
cr, uid, doctor, context=context)
else:
if len(doctor_id) > 1:
_logger.warn("More than one doctor found with "
"code '%s' passed id=%s" %
(d.get('code'), doctor_id[0]))
doctor_id = doctor_id[0]
if d['type'] == 'r':
ref_doctor_ids.append(doctor_id)
else:
con_doctor_ids.append(doctor_id)
ref_doctor_ids and data.update(
{'ref_doctor_ids': [[6, False, ref_doctor_ids]]})
con_doctor_ids and data.update(
{'con_doctor_ids': [[6, False, con_doctor_ids]]})
except:
_logger.warn("Can't evaluate 'doctors': %s" %
(data['doctors']))
return False
return True
# FIXME: Here to prevent mail message from complaining when creating a user
[docs]class mail_message(osv.Model):
_name = 'mail.message'
_inherit = 'mail.message'
def _get_default_from(self, cr, uid, context=None):
this = self.pool.get('res.users').browse(
cr, SUPERUSER_ID, uid, context=context)
if this.alias_name and this.alias_domain:
return '%s <%s@%s>' % (
this.name, this.alias_name, this.alias_domain)
elif this.email:
return '%s <%s>' % (this.name, this.email)
else:
return '%s <%s>' % (this.name, 'No email')