# Part of NHClinical. See LICENSE file for full copyright and licensing details
# -*- coding: utf-8 -*-
from openerp.osv.expression import (OR_OPERATOR, AND_OPERATOR, ExtendedLeaf,
create_substitution_leaf, MAGIC_COLUMNS,
normalize_domain, TRUE_LEAF, traceback,
FALSE_LEAF, select_from_where,
NEGATIVE_TERM_OPERATORS,
select_distinct_from_where_not_null)
from openerp.osv import fields
import logging
_logger = logging.getLogger()
# ----------------------------------------
# Parsing
# ----------------------------------------
[docs]def _quote(to_quote):
if '"' not in to_quote:
return '"%s"' % to_quote
return to_quote
[docs]def parse(self, cr, uid, context):
"""
Transform the leaves of the expression
The principle is to pop elements from a leaf stack one at a time.
Each leaf is processed. The processing is a if/elif list of various
cases that appear in the leafs (many2one, function fields, ...).
Two things can happen as a processing result:
- the leaf has been modified and/or new leafs have to be introduced
in the expression; they are pushed into the leaf stack, to be
processed right after
- the leaf is added to the result
Some internal var explanation:
:var obj working_model: model object, model containing the field
(the name provided in the left operand)
:var list field_path: left operand seen as a path
(foo.bar -> [foo, bar])
:var obj relational_model: relational model of a field (field._obj)
ex: res_partner.bank_ids -> res.partner.bank
"""
def to_ids(value, relational_model, context=None, limit=None):
"""
Normalize a single id or name, or a list of those,
into a list of ids
:param {int,long,basestring,list,tuple} value:
if int, long -> return [value]
if basestring, convert it into a list of basestrings, then
if list of basestring ->
perform a name_search on relational_model for each name
return the list of related ids
"""
names = []
if isinstance(value, basestring):
names = [value]
elif value and isinstance(value, (tuple, list)) and \
all(isinstance(item, basestring) for item in value):
names = value
elif isinstance(value, (int, long)):
return [value]
if names:
name_get_list = [name_get[0] for name in names for name_get in
relational_model.name_search(
cr, uid, name, [], 'ilike', context=context,
limit=limit)]
return list(set(name_get_list))
return list(value)
def child_of_domain(left, ids, left_model, parent=None, prefix='',
context=None):
"""
Return a domain implementing the child_of operator for
[(left,child_of,ids)], either as a range using the parent_left/right
tree lookup fields (when available),
or as an expanded [(left,in,child_ids)]
"""
if left_model._parent_store and (not left_model.pool._init):
# TODO: Improve joins implemented for many with '.', replace by:
# doms += ['&',(prefix+'.parent_left','<',o.parent_right),
# (prefix+'.parent_left','>=',o.parent_left)]
doms = []
for o in left_model.browse(cr, uid, ids, context=context):
if doms:
doms.insert(0, OR_OPERATOR)
doms += [AND_OPERATOR, ('parent_left', '<', o.parent_right),
('parent_left', '>=', o.parent_left)]
if prefix:
return [(left, 'in', left_model.search(
cr, uid, doms, context=context))]
return doms
else:
def recursive_children(ids, model, parent_field):
if not ids:
return []
ids2 = model.search(cr, uid, [(parent_field, 'in', ids)],
context=context)
return ids + recursive_children(ids2, model, parent_field)
return [(left, 'in', recursive_children(
ids, left_model, parent or left_model._parent_name))]
def pop():
""" Pop a leaf to process. """
return self.stack.pop()
def push(leaf):
""" Push a leaf to be processed right after. """
self.stack.append(leaf)
def push_result(leaf):
""" Push a leaf to the results. This leaf has been fully processed
and validated. """
self.result.append(leaf)
self.result = []
self.stack = [ExtendedLeaf(leaf, self.root_model)
for leaf in self.expression]
# process from right to left; expression is from left to right
self.stack.reverse()
while self.stack:
# Get the next leaf to process
leaf = pop()
# Get working variables
working_model = leaf.model
if leaf.is_operator():
left, operator, right = leaf.leaf, None, None
elif leaf.is_true_leaf() or leaf.is_false_leaf():
# because we consider left as a string
left, operator, right = ('%s' % leaf.leaf[0], leaf.leaf[1],
leaf.leaf[2])
else:
left, operator, right = leaf.leaf
field_path = left.split('.', 1)
field = working_model._columns.get(field_path[0])
# Neova Health BEGIN
if not working_model._columns.get(field_path[0]) and \
field_path[0] == 'id':
# field 'id' normally is not in the _columns
# the problem appeared with call
# search('t4clinical.task.base', [
# ('responsible_user_ids','in',uid)])
# -- returned [], due to this issue was looking for
# t4clinical.task.base ids in project.task ids
field = fields.integer('fake id field. quick fix')
field._obj = working_model._name
# Neova Health END
if field and field._obj:
relational_model = working_model.pool.get(field._obj)
else:
relational_model = None
# ----------------------------------------
# SIMPLE CASE
# 1. leaf is an operator
# 2. leaf is a true/false leaf
# -> add directly to result
# ----------------------------------------
if leaf.is_operator() or leaf.is_true_leaf() or leaf.is_false_leaf():
push_result(leaf)
# ----------------------------------------
# FIELD NOT FOUND
# -> from inherits'd fields -> work on the related model, and add
# a join condition
# -> ('id', 'child_of', '..') -> use a 'to_ids'
# -> but is one on the _log_access special fields, add directly to
# result
# TODO: make these fields explicitly available in self.columns instead!
# -> else: crash
# ----------------------------------------
elif not field and field_path[0] in working_model._inherit_fields:
# comments about inherits'd fields
# { 'field_name': ('parent_model', 'm2o_field_to_reach_parent',
# field_column_obj, origina_parent_model), ... }
next_model = working_model.pool.get(
working_model._inherit_fields[field_path[0]][0])
leaf.add_join_context(
next_model, working_model._inherits[next_model._name], 'id',
working_model._inherits[next_model._name])
push(leaf)
elif left == 'id' and operator == 'child_of':
ids2 = to_ids(right, working_model, context)
dom = child_of_domain(left, ids2, working_model)
for dom_leaf in reversed(dom):
new_leaf = create_substitution_leaf(leaf, dom_leaf,
working_model)
push(new_leaf)
elif not field and field_path[0] in MAGIC_COLUMNS:
push_result(leaf)
elif not field:
raise ValueError("Invalid field %r in leaf %r" % (left, str(leaf)))
# ----------------------------------------
# PATH SPOTTED
# -> many2one or one2many with _auto_join:
# - add a join, then jump into linked field: field.remaining on
# src_table is replaced by remaining on dst_table,
# and set for re-evaluation
# - if a domain is defined on the field, add it into evaluation
# on the relational table
# -> many2one, many2many, one2many: replace by an equivalent computed
# domain, given by recursively searching on the path's remaining
# -> note: hack about fields.property should not be necessary anymore
# as after transforming the field, it will go through this loop
# once again
# ----------------------------------------
elif len(field_path) > 1 and field._type == 'many2one' and \
field._auto_join:
# res_partner.state_id = res_partner__state_id.id
leaf.add_join_context(relational_model, field_path[0], 'id',
field_path[0])
push(create_substitution_leaf(
leaf, (field_path[1], operator, right), relational_model))
elif len(field_path) > 1 and field._type == 'one2many' and \
field._auto_join:
# res_partner.id = res_partner__bank_ids.partner_id
leaf.add_join_context(relational_model, 'id', field._fields_id,
field_path[0])
domain = field._domain(working_model) if callable(field._domain) \
else field._domain
push(create_substitution_leaf(
leaf, (field_path[1], operator, right), relational_model))
if domain:
domain = normalize_domain(domain)
for elem in reversed(domain):
push(create_substitution_leaf(leaf, elem,
relational_model))
push(create_substitution_leaf(leaf, AND_OPERATOR,
relational_model))
elif len(field_path) > 1 and field._auto_join:
raise NotImplementedError('_auto_join attribute not supported on '
'many2many field %s' % left)
elif len(field_path) > 1 and field._type == 'many2one':
right_ids = relational_model.search(
cr, uid, [(field_path[1], operator, right)], context=context)
leaf.leaf = (field_path[0], 'in', right_ids)
push(leaf)
# Making search easier when there is a left operand
# as field.o2m or field.m2m
elif len(field_path) > 1 and field._type in ['many2many', 'one2many']:
right_ids = relational_model.search(
cr, uid, [(field_path[1], operator, right)], context=context)
table_ids = working_model.search(
cr, uid, [(field_path[0], 'in', right_ids)],
context=dict(context, active_test=False))
leaf.leaf = ('id', 'in', table_ids)
push(leaf)
# -------------------------------------------------
# FUNCTION FIELD
# -> not stored: error if no _fnct_search,
# otherwise handle the result domain
# -> stored: management done in the remaining of parsing
# -------------------------------------------------
elif isinstance(field, fields.function) and not field.store \
and not field._fnct_search:
# this is a function field that is not stored
# the function field doesn't provide a search function and
# doesn't store values in the database, so we must ignore it:
# we generate a dummy leaf.
leaf.leaf = TRUE_LEAF
_logger.error(
"The field '%s' (%s) can not be searched: non-stored "
"function field without fnct_search",
field.string, left)
# avoid compiling stack trace if not needed
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(''.join(traceback.format_stack()))
push(leaf)
elif isinstance(field, fields.function) and not field.store:
# this is a function field that is not stored
fct_domain = field.search(cr, uid, working_model, left,
[leaf.leaf], context=context)
if not fct_domain:
leaf.leaf = TRUE_LEAF
push(leaf)
else:
# we assume that the expression is valid
# we create a dummy leaf for forcing the parsing of the
# resulting expression
for domain_element in reversed(fct_domain):
push(create_substitution_leaf(leaf, domain_element,
working_model))
# self.push(
# create_substitution_leaf(leaf, TRUE_LEAF, working_model))
# self.push(
# create_substitution_leaf(leaf, AND_OPERATOR, working_model))
# -------------------------------------------------
# RELATIONAL FIELDS
# -------------------------------------------------
# Applying recursivity on field(one2many)
elif field._type == 'one2many' and operator == 'child_of':
ids2 = to_ids(right, relational_model, context)
if field._obj != working_model._name:
dom = child_of_domain(left, ids2, relational_model,
prefix=field._obj)
else:
dom = child_of_domain('id', ids2, working_model, parent=left)
for dom_leaf in reversed(dom):
push(create_substitution_leaf(leaf, dom_leaf, working_model))
elif field._type == 'one2many':
call_null = True
if right is not False:
if isinstance(right, basestring):
ids2 = [x[0] for x in relational_model.name_search(
cr, uid, right, [], operator, context=context,
limit=None)
]
if ids2:
operator = 'in'
else:
if not isinstance(right, list):
ids2 = [right]
else:
ids2 = right
if not ids2:
if operator in ['like', 'ilike', 'in', '=']:
# no result found with given search criteria
call_null = False
push(create_substitution_leaf(leaf, FALSE_LEAF,
working_model))
else:
ids2 = select_from_where(cr, field._fields_id,
relational_model._table, 'id',
ids2, operator)
if ids2:
call_null = False
o2m_op = 'not in' if operator in \
NEGATIVE_TERM_OPERATORS else 'in'
push(create_substitution_leaf(
leaf, ('id', o2m_op, ids2), working_model))
if call_null:
o2m_op = 'in' if operator in \
NEGATIVE_TERM_OPERATORS else 'not in'
push(create_substitution_leaf(
leaf, ('id', o2m_op,
select_distinct_from_where_not_null(
cr, field._fields_id, relational_model._table)),
working_model))
elif field._type == 'many2many':
rel_table, rel_id1, rel_id2 = field._sql_names(working_model)
# FIXME
if operator == 'child_of':
def _rec_convert(ids):
if relational_model == working_model:
return ids
return select_from_where(cr, rel_id1, rel_table, rel_id2,
ids, operator)
ids2 = to_ids(right, relational_model, context)
dom = child_of_domain('id', ids2, relational_model)
ids2 = relational_model.search(cr, uid, dom, context=context)
push(create_substitution_leaf(leaf,
('id', 'in', _rec_convert(ids2)),
working_model))
else:
call_null_m2m = True
if right is not False:
if isinstance(right, basestring):
res_ids = [x[0] for x in relational_model.name_search(
cr, uid, right, [], operator, context=context)]
if res_ids:
operator = 'in'
else:
if not isinstance(right, list):
res_ids = [right]
else:
res_ids = right
if not res_ids:
if operator in ['like', 'ilike', 'in', '=']:
# no result found with given search criteria
call_null_m2m = False
push(create_substitution_leaf(leaf, FALSE_LEAF,
working_model))
else:
# operator changed because ids are directly related
# to main object
operator = 'in'
else:
call_null_m2m = False
m2m_op = 'not in' if operator in \
NEGATIVE_TERM_OPERATORS else 'in'
push(create_substitution_leaf(
leaf, ('id', m2m_op,
select_from_where(
cr, rel_id1, rel_table, rel_id2,
res_ids, operator) or [0]),
working_model))
if call_null_m2m:
m2m_op = 'in' if operator in \
NEGATIVE_TERM_OPERATORS else 'not in'
push(create_substitution_leaf(
leaf,
('id', m2m_op, select_distinct_from_where_not_null(
cr, rel_id1, rel_table)), working_model))
elif field._type == 'many2one':
if operator == 'child_of':
ids2 = to_ids(right, relational_model, context)
if field._obj != working_model._name:
dom = child_of_domain(left, ids2, relational_model,
prefix=field._obj)
else:
dom = child_of_domain('id', ids2, working_model,
parent=left)
for dom_leaf in reversed(dom):
push(create_substitution_leaf(leaf, dom_leaf,
working_model))
else:
def _get_expression(relational_model, cr, uid, left, right,
operator, context=None):
if context is None:
context = {}
c = context.copy()
c['active_test'] = False
# Special treatment to ill-formed domains
operator = (operator in ['<', '>', '<=', '>=']) and 'in' \
or operator
dict_op = {'not in': '!=', 'in': '=',
'=': 'in', '!=': 'not in'}
if isinstance(right, tuple):
right = list(right)
if (not isinstance(right, list)) and \
operator in ['not in', 'in']:
operator = dict_op[operator]
elif isinstance(right, list) and operator in ['!=', '=']:
# for domain (FIELD,'=',['value1','value2'])
operator = dict_op[operator]
res_ids = [x[0] for x in relational_model.name_search(
cr, uid, right, [], operator, limit=None, context=c)]
if operator in NEGATIVE_TERM_OPERATORS:
# TODO this should not be appended if False in 'right'
res_ids.append(False)
return left, 'in', res_ids
# resolve string-based m2o criterion into IDs
if isinstance(right, basestring) or right and \
isinstance(right, (tuple, list)) and \
all(isinstance(item, basestring) for item in right):
push(create_substitution_leaf(
leaf,
_get_expression(relational_model, cr, uid, left, right,
operator, context=context),
working_model))
else:
# right == [] or right == False and all other cases
# are handled by __leaf_to_sql()
push_result(leaf)
# -------------------------------------------------
# OTHER FIELDS
# -> datetime fields: manage time part of the datetime
# field when it is not there
# -> manage translatable fields
# -------------------------------------------------
else:
if field._type == 'datetime' and right and len(right) == 10:
if operator in ('>', '>='):
right += ' 00:00:00'
elif operator in ('<', '<='):
right += ' 23:59:59'
push(create_substitution_leaf(leaf, (left, operator, right),
working_model))
elif field.translate and right:
need_wildcard = operator in ('like', 'ilike', 'not like',
'not ilike')
sql_operator = {'=like': 'like', '=ilike': 'ilike'}.get(
operator, operator)
if need_wildcard:
right = '%%%s%%' % right
inselect_operator = 'inselect'
if sql_operator in NEGATIVE_TERM_OPERATORS:
# negate operator (fix lp:1071710)
if sql_operator[:3] == 'not':
sql_operator = sql_operator[4:]
else:
sql_operator = '='
inselect_operator = 'not inselect'
unaccent = self._unaccent if sql_operator.endswith('like') \
else lambda x: x
trans_left = unaccent('value')
quote_left = unaccent(_quote(left))
instr = unaccent('%s')
if sql_operator == 'in':
# params will be flatten by to_sql() =>
# expand the placeholders
instr = '(%s)' % ', '.join(['%s'] * len(right))
subselect = """(SELECT res_id
FROM ir_translation
WHERE name = %s
AND lang = %s
AND type = %s
AND {trans_left} {operator} {right}
) UNION (
SELECT id
FROM "{table}"
WHERE {left} {operator} {right}
)
""".format(trans_left=trans_left,
operator=sql_operator, right=instr,
table=working_model._table,
left=quote_left)
params = (
working_model._name + ',' + left,
context.get('lang') or 'en_US',
'model',
right,
right,
)
push(create_substitution_leaf(leaf, ('id', inselect_operator,
(subselect, params)
), working_model))
else:
push_result(leaf)
# ----------------------------------------
# END OF PARSING FULL DOMAIN
# -> generate joins
# ----------------------------------------
joins = set()
for leaf in self.result:
joins |= set(leaf.get_join_conditions())
self.joins = list(joins)
# openerp.osv.expression.expression.parse = parse