|
- from odoo.http import request
- import itertools
- import operator
- import time
- import pickle
- from odoo import models, api
- from odoo.exceptions import UserError
- from functools import reduce
-
-
- class ReportBase(models.Model):
- _name = 'report.base'
- _description = '使用search_read来直接生成数据的基本类,其他类可以直接异名继承当前类来重用搜索、过滤、分组等函数'
-
- _expired_time = 60
- _cache_record = False
- _cache_env = False
- _cache_time = False
-
- def select_sql(self, sql_type='out'):
- return ''
-
- def from_sql(self, sql_type='out'):
- return ''
-
- def where_sql(self, sql_type='out'):
- return ''
-
- def group_sql(self, sql_type='out'):
- return ''
-
- def order_sql(self, sql_type='out'):
- return ''
-
- def get_context(self, sql_type='out', context=None):
- return {}
-
- def execute_sql(self, sql_type='out'):
- context = self.get_context(sql_type, context=self.env.context)
- for key, value in list(context.items()):
- if key == "date_end":
- continue
- if key == "date_start":
- continue
- if isinstance(context[key], str):
- context[key] = value.encode('utf-8')
- search_sql = (self.select_sql(sql_type) + self.from_sql(sql_type) + self.where_sql(
- sql_type) + self.group_sql(sql_type) + self.order_sql(
- sql_type)).format(**context)
-
- self.env.cr.execute(search_sql)
-
- return self.env.cr.dictfetchall()
-
- def collect_data_by_sql(self, sql_type='out'):
- return []
-
- def check_valid_domain(self, domain):
- if not isinstance(domain, (list, tuple)):
- raise UserError('不可识别的domain条件,请检查domain"%s"是否正确' % str(domain))
-
- def _get_next_domain(self, domains, index):
- domain = domains[index]
- if domain == '|':
- _, index = self.get_next_or_domain(domains, index + 1)
- else:
- index += 1
- if domain != '&':
- self.check_valid_domain(domain)
-
- return index
-
- def get_next_or_domain(self, domains, index):
- index = self._get_next_domain(domains, index)
-
- return index, self._get_next_domain(domains, index)
-
- def _process_domain(self, result, domain):
- if domain and len(domain) == 3:
- field, opto, value = domain
-
- compute_operator = {
- 'ilike': lambda field, value: value.lower() in field.lower(),
- 'like': lambda field, value: value in field,
- 'not ilike': lambda field, value: value.lower() not in field.lower(),
- 'not like': lambda field, value: value not in field,
- 'in': lambda field, value: field in value,
- 'not in': lambda field, value: field not in value,
- '=': operator.eq,
- '!=': operator.ne,
- '>': operator.gt,
- '<': operator.lt,
- '>=': operator.ge,
- '<=': operator.le,
- }
-
- opto = opto.lower()
- if field in result:
- if opto in compute_operator.keys():
- return compute_operator.get(opto)(result.get(field), value)
-
- raise UserError('暂时无法解析的domain条件%s,请联系管理员' % str(domain))
-
- raise UserError('不可识别的domain条件,请检查domain"%s"是否正确' % str(domain))
-
- def _compute_domain_util(self, result, domains):
- index = 0
- while index < len(domains):
- domain = domains[index]
- index += 1
- if domain == '|':
- left_index, right_index = self.get_next_or_domain(
- domains, index)
-
- if not self._compute_domain_util(result, domains[index:left_index]) and not self._compute_domain_util(
- result, domains[left_index:right_index]):
- return False
-
- index = right_index
- else:
- if domain == '&':
- continue
- self.check_valid_domain(domain)
- if not self._process_domain(result, domain):
- return False
-
- return True
-
- def _compute_domain(self, result, domain):
- return list(filter(lambda res: self._compute_domain_util(res, domain), result))
-
- @api.model
- def read_group(self, domain, fields, groupby, offset=0, limit=65535, orderby=False, lazy=True):
-
- def dict_plus(collect, values):
- for key, value in values.items():
- if isinstance(value, (int, float)):
- if key not in collect:
- collect[key] = 0
- collect[key] += value
-
- collect[groupby[0] + '_count'] += 1
-
- return collect
-
- res = []
- values = self.search_read(
- domain=domain, fields=fields, offset=offset, limit=limit or 65535, order=orderby)
-
- if groupby:
- key = operator.itemgetter(groupby[0])
- for group, itervalue in itertools.groupby(sorted(values, key=key), key):
- collect = {'__domain': [
- (groupby[0], '=', group)], groupby[0]: group, groupby[0] + '_count': 0}
- collect = reduce(lambda collect, value: dict_plus(
- collect, value), itervalue, collect)
-
- if len(groupby) > 1:
- collect.update({
- '__context': {'group_by': groupby[1:]}
- })
-
- if domain:
- collect['__domain'].extend(domain)
-
- res.append(collect)
-
- return res
-
- def _compute_order(self, result, order):
- """暂时不支持多重排序"""
- if order:
- order = order.partition(',')[0].partition(' ')
- for line in result:
- if line.get(order[0]) is False:
- line.update({order[0]: ''})
- result.sort(key=lambda item: item.get(
- order[0]), reverse=order[2] == 'ASC')
-
- return result
-
- def _compute_limit_and_offset(self, result, limit, offset):
- return list(result)[offset:limit + offset]
-
- def update_result_none_to_false(self, result):
- for val in result:
- for key, value in val.items():
- if value is None:
- val[key] = False
-
- return result
-
- def get_data_from_cache(self, sql_type='out'):
- if self._cache_env != (self.env.uid, self.env.context) \
- or not self._cache_record or self._cache_time + self._expired_time < time.time():
- self.__class__._cache_record = self.update_result_none_to_false(
- self.collect_data_by_sql(sql_type))
- self.__class__._cache_time = time.time()
- self.__class__._cache_env = (self.env.uid, self.env.context)
-
- return self._cache_record
-
- @api.model
- def search_read(self, domain=None, fields=None, offset=0, limit=65535, order=None):
- result = self.get_data_from_cache(sql_type='out')
-
- result = self._compute_domain(result, domain)
- result = self._compute_order(result, order)
- result = self._compute_limit_and_offset(result, limit, offset)
-
- return result
-
- @api.model
- def search_count(self, domain):
- result = self.get_data_from_cache(sql_type='out')
- result = self._compute_domain(result, domain)
-
- return len(list(result))
-
- def read(self, fields=None, context=None, load='_classic_read'):
- res = []
- fields = fields or []
-
- fields.append('id')
- for record in self.get_data_from_cache():
- if record.get('id') in self.ids:
- res.append({field: record.get(field) for field in fields})
-
- return res
|