  1. """
  2. Database Population via Duplication
  3. This tool provides utilities to duplicate records across models in Odoo, while maintaining referential integrity,
  4. handling field variations, and optimizing insertion performance. The duplication is controlled by a `factors` argument
  5. that specifies how many times each record should be duplicated. The duplication process takes into account fields
  6. that require unique constraints, distributed values (e.g., date fields), and relational fields (e.g., Many2one, Many2many).
  7. Key Features:
  8. -------------
  9. 1. **Field Variations**: Handles variations for certain fields to ensure uniqueness or to distribute values, such as:
  10. - Char/Text fields: Appends a postfix or variation to existing data.
  11. - Date/Datetime fields: Distributes dates within a specified range.
  12. 2. **Efficient Duplication**: Optimizes the duplication process by:
  13. - Dropping and restoring indexes to speed up bulk inserts.
  14. - Disabling foreign key constraint checks during duplication to avoid integrity errors.
  15. - Dynamically adjusting sequences to maintain consistency in auto-increment fields like `id`.
  16. 3. **Field-Specific Logic**:
  17. - Unique fields are identified and modified to avoid constraint violations.
  18. - Many2one fields are remapped to newly duplicated records.
  19. - One2many and Many2many relationships are handled by duplicating both sides of the relationship.
  20. 4. **Foreign Key and Index Management**:
  21. - Indexes are temporarily dropped during record creation and restored afterward to improve performance.
  22. - Foreign key checks are disabled temporarily to prevent constraint violations during record insertion.
  23. 5. **Dependency Management**: Ensures proper population order of models with dependencies (e.g., `_inherits` fields)
  24. by resolving dependencies before duplicating records.
  25. 6. **Dynamic SQL Generation**: Uses SQL queries to manipulate and duplicate data directly at the database level,
  26. ensuring performance and flexibility in handling large datasets.
  27. """
import logging
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime

from dateutil.relativedelta import relativedelta
from psycopg2.errors import InsufficientPrivilege

from odoo.api import Environment
from odoo.fields import Field, Many2one
from odoo.models import Model
from odoo.tools.sql import SQL

_logger = logging.getLogger(__name__)

# Min/Max value for a date/datetime field
MIN_DATETIME = datetime((datetime.now() - relativedelta(years=4)).year, 1, 1)
MAX_DATETIME = datetime.now()


def get_field_variation_date(model: Model, field: Field, factor: int, series_alias: str) -> SQL:
    """
    Distribute the duplication series evenly within [field - total_days, field].
    The spread is capped at (MAX_DATETIME - MIN_DATETIME) days to avoid setting
    duplicated records too far back in the past.
    """
    total_days = min((MAX_DATETIME - MIN_DATETIME).days, factor)
    cast_type = SQL(field._column_type[1])

    def redistribute(value):
        return SQL(
            "(%(value)s - (%(factor)s - %(series_alias)s) * (%(total_days)s::float/%(factor)s) * interval '1 days')::%(cast_type)s",
            value=value,
            factor=factor,
            series_alias=SQL.identifier(series_alias),
            total_days=total_days,
            cast_type=cast_type,
        )

    if not field.company_dependent:
        return redistribute(SQL.identifier(field.name))
    # company_dependent -> jsonb
    return SQL(
        '(SELECT jsonb_object_agg(key, %(expr)s) FROM jsonb_each_text(%(field)s))',
        expr=redistribute(SQL('value::%s', cast_type)),
        field=SQL.identifier(field.name),
    )

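
# Worked example (for illustration only): with factor = 4, total_days = min((MAX_DATETIME -
# MIN_DATETIME).days, 4) = 4, so the four copies of a record are shifted back by
# (4 - s) * (4/4) days for s in 1..4, i.e. 3, 2, 1 and 0 days before the original date,
# spreading the duplicates evenly over the window.
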
def get_field_variation_char(field: Field, postfix: str | SQL | None = None) -> SQL:
    """
    Append the `postfix` string to a char|text field.
    If no postfix is provided, return the field without variation.
    """
    if postfix is None:
        return SQL.identifier(field.name)
    if not isinstance(postfix, SQL):
        postfix = SQL.identifier(postfix)
    # if the field is translatable, it's a JSONB column: vary the values of all its keys
    if field.translate:
        return SQL("""(
            SELECT jsonb_object_agg(key, value || %(postfix)s)
            FROM jsonb_each_text(%(field)s)
        )""", field=SQL.identifier(field.name), postfix=postfix)
    else:
        # no postfix for fields that are '' (nothing worth varying)
        # or '/' (the default/draft name of many models' records)
        return SQL("""
            CASE
                WHEN %(field)s IS NULL OR %(field)s IN ('/', '')
                THEN %(field)s
                ELSE %(field)s || %(postfix)s
            END
        """, field=SQL.identifier(field.name), postfix=postfix)

class PopulateContext:

    def __init__(self):
        self.has_session_replication_role = True

    @contextmanager
    def ignore_indexes(self, model: Model):
        """
        Temporarily drop the indexes on the model's table to speed up insertion.
        Primary key and unique indexes are kept to preserve constraints.
        """
        indexes = model.env.execute_query_dict(SQL("""
            SELECT indexname AS name, indexdef AS definition
            FROM pg_indexes
            WHERE tablename = %s
            AND indexname NOT LIKE %s
            AND indexdef NOT LIKE %s
        """, model._table, '%pkey', '%UNIQUE%'))
        if indexes:
            _logger.info('Dropping indexes on table %s...', model._table)
            for index in indexes:
                model.env.cr.execute(SQL('DROP INDEX %s CASCADE', SQL.identifier(index['name'])))
            yield
            _logger.info('Adding indexes back on table %s...', model._table)
            for index in indexes:
                model.env.cr.execute(index['definition'])
        else:
            yield

    @contextmanager
    def ignore_fkey_constraints(self, model: Model):
        """
        Disable foreign key constraint checks by setting the session replication role to 'replica'.
        """
        if self.has_session_replication_role:
            try:
                model.env.cr.execute('SET session_replication_role TO replica')
                yield
                model.env.cr.execute('RESET session_replication_role')
            except InsufficientPrivilege:
                _logger.warning("Cannot ignore Fkey constraints during insertion due to insufficient privileges for the current pg_role. "
                                "Resetting the transaction and retrying to populate without dropping the check on Fkey constraints. "
                                "The bulk insertion will be vastly slower than anticipated.")
                model.env.cr.rollback()
                self.has_session_replication_role = False
                yield
        else:
            yield

def field_needs_variation(model: Model, field: Field) -> bool:
    """
    Return True/False depending on whether the field needs to be varied.
    This might be necessary in the case of:
    - unique constraints
    - varying dates for a better distribution
    - the field is part of _rec_names_search, therefore variety is needed for effective searches
    - the field has a trigram index on it
    """
    def is_unique(model_, field_):
        """
        A unique constraint is enforced by Postgres as a unique index,
        whether it is defined as a constraint on the table or as a manual unique index.
        Both types of constraint are present in the index catalog.
        """
        query = SQL("""
            SELECT EXISTS(SELECT 1
                          FROM pg_index idx
                          JOIN pg_class t ON t.oid = idx.indrelid
                          JOIN pg_class i ON i.oid = idx.indexrelid
                          JOIN pg_attribute a ON a.attnum = ANY (idx.indkey) AND a.attrelid = t.oid
                          WHERE t.relname = %s -- tablename
                          AND a.attname = %s -- column
                          AND idx.indisunique = TRUE) AS is_unique;
        """, model_._table, field_.name)
        return model_.env.execute_query(query)[0][0]

    # Many2one fields are not considered, as a name_search would resolve them to the _rec_names_search of the comodel
    in_names_search = model._rec_names_search and field.name in model._rec_names_search
    in_name = model._rec_name and field.name == model._rec_name
    if (in_name or in_names_search) and field.type != 'many2one':
        return True
    if field.type in ('date', 'datetime'):
        return True
    if field.index == 'trigram':
        return True
    return is_unique(model, field)

def get_field_variation(model: Model, field: Field, factor: int, series_alias: str) -> SQL:
    """
    Return a variation of the source field, to avoid unique constraint violations
    or to better distribute the data.
    :return: a SQL(identifier|expression|subquery)
    """
    match field.type:
        case 'char' | 'text':
            return get_field_variation_char(field, postfix=series_alias)
        case 'date' | 'datetime':
            return get_field_variation_date(model, field, factor, series_alias)
        case 'html':
            # For the sake of simplicity we don't vary html fields
            return SQL.identifier(field.name)
        case _:
            _logger.warning("The field %s of type %s was marked to be varied, "
                            "but no variation branch was found! Defaulting to a raw copy.", field, field.type)
            # fall back on a raw copy
            return SQL.identifier(field.name)

def fetch_last_id(model: Model) -> int:
    query = SQL('SELECT id FROM %s ORDER BY id DESC LIMIT 1', SQL.identifier(model._table))
    return model.env.execute_query(query)[0][0]

def populate_field(model: Model, field: Field, populated: dict[Model, int], factors: dict[Model, int],
                   table_alias: str = 't', series_alias: str = 's') -> SQL | None:
    """
    Return the source expression for copying the field (SQL(identifier|expression|subquery) | None).
    `table_alias` and `series_alias` are the identifiers used to reference
    the table currently being populated and its series, respectively.
    """
    def copy_noop():
        return None

    def copy_raw(field_):
        return SQL.identifier(field_.name)

    def copy(field_):
        if field_needs_variation(model, field_):
            return get_field_variation(model, field_, factors[model], series_alias)
        else:
            return copy_raw(field_)

    def copy_id():
        last_id = fetch_last_id(model)
        populated[model] = last_id  # this adds the model to the populated dict
        return SQL('id + %(last_id)s * %(series_alias)s', last_id=last_id, series_alias=SQL.identifier(series_alias))

    def copy_many2one(field_):
        # if the comodel was populated beforehand, remap the many2one to the new copies
        if (comodel := model.env[field_.comodel_name]) in populated:
            comodel_max_id = populated[comodel]
            # we use MOD() instead of %, because % cannot be correctly escaped; it's a limitation of the SQL wrapper
            return SQL("%(table_alias)s.%(field_name)s + %(comodel_max_id)s * (MOD(%(series_alias)s - 1, %(factor)s) + 1)",
                       table_alias=SQL.identifier(table_alias),
                       field_name=SQL.identifier(field_.name),
                       comodel_max_id=comodel_max_id,
                       series_alias=SQL.identifier(series_alias),
                       factor=factors[comodel])
        return copy(field_)

    if field.name == 'id':
        return copy_id()
    match field.type:
        case 'one2many':
            # there is nothing to copy, as its value is implicitly read from the inverse Many2one
            return copy_noop()
        case 'many2many':
            # there is nothing to do, the copying of the m2m will be handled when copying the relation table
            return copy_noop()
        case 'many2one':
            return copy_many2one(field)
        case 'many2one_reference':
            # TODO: in the case of a reference field, there is no comodel,
            # but it's specified as the value of the field given by model_field.
            # Not really sure how to handle this, as it involves reading the content pointed to by model_field
            # to check on the fly (Python-side) whether it is populated or not, so for now we raw-copy it.
            # If we need to read on the fly, the populated structure needs to be in DB (via a new Model?)
            return copy(field)
        case 'binary':
            # copy only binary fields that are inlined in the table
            return copy(field) if not field.attachment else copy_noop()
        case _:
            return copy(field)

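
# Worked example (illustrative numbers only): if the table currently holds ids 1..100
# (last_id = 100) and factor = 3, copy_id() maps the row with id = 7 to 107, 207 and 307
# for the series values s = 1, 2, 3. If a many2one points to a comodel whose pre-population
# last id was 50 (comodel_max_id = 50) and whose factor was 2, copy_many2one() shifts the
# reference by 50 * (MOD(s - 1, 2) + 1), i.e. it alternates between the first and second
# batch of the comodel's copies, so every new record points to a duplicated comodel record
# rather than to the original one.
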
def populate_model(model: Model, populated: dict[Model, int], factors: dict[Model, int], separator_code: int) -> None:

    def update_sequence(model_):
        model_.env.execute_query(SQL("SELECT SETVAL(%(sequence)s, %(last_id)s, TRUE)",
                                     sequence=f"{model_._table}_id_seq", last_id=fetch_last_id(model_)))

    def has_column(field_):
        return field_.store and field_.column_type

    assert model not in populated, f"We do not populate a model ({model}) that has already been populated."
    _logger.info('Populating model %s %s times...', model._name, factors[model])
    dest_fields = []
    src_fields = []
    update_fields = []
    table_alias = 't'
    series_alias = 's'
    # process all stored fields (those that have a database column); if the model has an 'id', it is processed first
    for _, field in sorted(model._fields.items(), key=lambda pair: pair[0] != 'id'):
        if has_column(field):
            if field_needs_variation(model, field) and field.type in ('char', 'text'):
                update_fields.append(field)
            if src := populate_field(model, field, populated, factors, table_alias, series_alias):
                dest_fields.append(SQL.identifier(field.name))
                src_fields.append(src)
    # Update char/text fields of the existing rows, so the tool stays re-entrant
    if update_fields:
        query = SQL('UPDATE %(table)s SET (%(src_columns)s) = ROW(%(dest_columns)s)',
                    table=SQL.identifier(model._table),
                    src_columns=SQL(', ').join(SQL.identifier(field.name) for field in update_fields),
                    dest_columns=SQL(', ').join(
                        get_field_variation_char(field, postfix=SQL('CHR(%s)', separator_code))
                        for field in update_fields))
        model.env.cr.execute(query)
    query = SQL("""
        INSERT INTO %(table)s (%(dest_columns)s)
        SELECT %(src_columns)s FROM %(table)s %(table_alias)s,
        GENERATE_SERIES(1, %(factor)s) %(series_alias)s
    """, table=SQL.identifier(model._table), factor=factors[model],
        dest_columns=SQL(', ').join(dest_fields), src_columns=SQL(', ').join(src_fields),
        table_alias=SQL.identifier(table_alias), series_alias=SQL.identifier(series_alias))
    model.env.cr.execute(query)
    # normally copying the 'id' will set the model entry in the populated dict,
    # but for the case of a table with no 'id' (e.g. a Many2many relation table), we add it manually,
    # by reading the key and having the defaultdict do the insertion, with a default value of 0
    if populated[model]:
        # in case we populated a model with an 'id', we update the sequence
        update_sequence(model)

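
# For illustration only (hypothetical table and columns): for a model backed by the table
# "res_partner" with factor = 2, the INSERT built above takes roughly this shape:
#     INSERT INTO "res_partner" ("id", "name", ...)
#     SELECT id + <last_id> * "s",
#            CASE WHEN "name" IS NULL OR "name" IN ('/', '') THEN "name" ELSE "name" || "s" END,
#            ...
#     FROM "res_partner" "t", GENERATE_SERIES(1, 2) "s"
# i.e. every existing row is read once per series value, and each pass produces a shifted id
# and a varied name.
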
class Many2oneFieldWrapper(Many2one):
    def __init__(self, model, field_name, comodel_name):
        super().__init__(comodel_name)
        self._setup_attrs(model, field_name)  # setup most of the default attrs


class Many2manyModelWrapper:
    def __init__(self, env, field):
        self._name = field.relation  # a m2m relation table doesn't have a _name, so we use the table name
        self._table = field.relation
        self._inherits = {}
        self.env = env
        self._rec_name = None
        self._rec_names_search = []
        # if the field is inherited, the column attributes are defined on the base_field
        column1 = field.column1 or field.base_field.column1
        column2 = field.column2 or field.base_field.column2
        # column1 refers to the model, while column2 refers to the comodel
        self._fields = {
            column1: Many2oneFieldWrapper(self, column1, field.model_name),
            column2: Many2oneFieldWrapper(self, column2, field.comodel_name),
        }

    def __repr__(self):
        return f"<Many2manyModelWrapper({self._name!r})>"

    def __eq__(self, other):
        return self._name == other._name

    def __hash__(self):
        return hash(self._name)

def infer_many2many_model(env: Environment, field: Field) -> Model | Many2manyModelWrapper:
    """
    Return the relation model used for the m2m:
    - If it's a custom model, return that model.
    - If it's an implicit table generated by the ORM,
      return a wrapper that behaves like a duck-typed model for the population algorithm.
    """
    # check whether the relation is an existing model
    for model_name, model_class in env.registry.items():
        if model_class._table == field.relation:
            return env[model_name]
    # the relation is a plain relational table, return a wrapped version
    return Many2manyModelWrapper(env, field)

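
# Example (assumed names, for illustration): a many2many such as the partner tags on
# res.partner is stored in an ORM-generated relation table that no model owns, so
# infer_many2many_model() would return a Many2manyModelWrapper exposing its two columns
# as Many2one fields; a many2many backed by an explicit relation model would be returned
# as that model instead.
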
def populate_models(model_factors: dict[Model, int], separator_code: int) -> None:
    """
    Duplicate the existing records of each model `factor` times, using them as templates.
    If a dependency is found for a specific model but the user did not specify a factor for it,
    it inherits the factor of the dependent model.
    """
    def has_records(model_):
        query = SQL('SELECT EXISTS (SELECT 1 FROM %s)', SQL.identifier(model_._table))
        return model_.env.execute_query(query)[0][0]

    populated: dict[Model, int] = defaultdict(int)
    ctx: PopulateContext = PopulateContext()

    def process(model_):
        if model_ in populated:
            return
        if not has_records(model_):  # if there are no records, there is nothing to populate
            populated[model_] = 0
            return
        # if the model has _inherits, the delegated models need to have been populated before the current one
        for model_name in model_._inherits:
            process(model_.env[model_name])
        with ctx.ignore_fkey_constraints(model_), ctx.ignore_indexes(model_):
            populate_model(model_, populated, model_factors, separator_code)
        # models on the other end of a X2many relation should also be populated (e.g. to avoid SOs without SOLs)
        for field in model_._fields.values():
            if field.store and field.copy:
                match field.type:
                    case 'one2many':
                        comodel = model_.env[field.comodel_name]
                        if comodel != model_:
                            model_factors[comodel] = model_factors[model_]
                            process(comodel)
                    case 'many2many':
                        m2m_model = infer_many2many_model(model_.env, field)
                        model_factors[m2m_model] = model_factors[model_]
                        process(m2m_model)

    for model in list(model_factors):
        process(model)

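
# --- Illustrative usage sketch (not part of the tool) -------------------------------------
# The function below is only a sketch of how the entry point above might be invoked from a
# script that already has an Odoo environment at hand; the model name, factor and separator
# character are assumed values for demonstration, not defaults of this module.

def _example_populate_sketch(env: Environment) -> None:
    # Duplicate every existing res.partner record 10 times. Models reached through
    # one2many/many2many/_inherits dependencies inherit the same factor inside
    # populate_models().
    factors = {env['res.partner']: 10}
    # separator_code is the code point of the character appended (via CHR()) to varied
    # char/text fields of the pre-existing rows before duplication; 126 ('~') is an
    # assumed choice here.
    populate_models(factors, separator_code=126)
    # the duplication runs plain SQL on the cursor; commit to persist it
    env.cr.commit()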