gooderp18绿色标准版
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 line
12KB

  1. # Part of Odoo. See LICENSE file for full copyright and licensing details.
  2. import odoo
  3. import sys
  4. import optparse
  5. import logging
  6. from collections import defaultdict
  7. from . import Command
  8. from odoo.modules.registry import Registry
  9. from odoo.tools import SQL
  10. _logger = logging.getLogger(__name__)
  11. class Obfuscate(Command):
  12. """Obfuscate data in a given odoo database"""
  13. def __init__(self):
  14. super().__init__()
  15. self.cr = None
  16. def _ensure_cr(func):
  17. def check_cr(self, *args, **kwargs):
  18. if not self.cr:
  19. raise Exception("No database connection")
  20. return func(self, *args, **kwargs)
  21. return check_cr
  22. @_ensure_cr
  23. def begin(self):
  24. self.cr.execute("begin work")
  25. self.cr.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
  26. @_ensure_cr
  27. def commit(self):
  28. self.cr.commit()
  29. @_ensure_cr
  30. def rollback(self):
  31. self.cr.rollback()
  32. @_ensure_cr
  33. def set_pwd(self, pwd):
  34. """Set password to cypher/uncypher datas"""
  35. self.cr.execute("INSERT INTO ir_config_parameter (key, value) VALUES ('odoo_cyph_pwd', 'odoo_cyph_'||encode(pgp_sym_encrypt(%s, %s), 'base64')) ON CONFLICT(key) DO NOTHING", [pwd, pwd])
  36. @_ensure_cr
  37. def check_pwd(self, pwd):
  38. """If password is set, check if it's valid"""
  39. uncypher_pwd = self.uncypher_string(SQL.identifier('value'), pwd)
  40. try:
  41. query = SQL("SELECT %s FROM ir_config_parameter WHERE key='odoo_cyph_pwd'", uncypher_pwd)
  42. self.cr.execute(query)
  43. if self.cr.rowcount == 0 or (self.cr.rowcount == 1 and self.cr.fetchone()[0] == pwd):
  44. return True
  45. except Exception as e: # noqa: BLE001
  46. _logger.error("Error checking password: %s", e)
  47. return False
  48. @_ensure_cr
  49. def clear_pwd(self):
  50. """Unset password to cypher/uncypher datas"""
  51. self.cr.execute("DELETE FROM ir_config_parameter WHERE key='odoo_cyph_pwd' ")
  52. def cypher_string(self, sql_field: SQL, password):
  53. # don't double cypher fields
  54. return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN %(field_name)s ELSE 'odoo_cyph_'||encode(pgp_sym_encrypt(%(field_name)s, %(pwd)s), 'base64') END""", field_name=sql_field, pwd=password)
  55. def uncypher_string(self, sql_field: SQL, password):
  56. return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN pgp_sym_decrypt(decode(substring(%(field_name)s, 11)::text, 'base64'), %(pwd)s) ELSE %(field_name)s END""", field_name=sql_field, pwd=password)
  57. def check_field(self, table, field):
  58. qry = "SELECT udt_name FROM information_schema.columns WHERE table_name=%s AND column_name=%s"
  59. self.cr.execute(qry, [table, field])
  60. if self.cr.rowcount == 1:
  61. res = self.cr.fetchone()
  62. if res[0] in ['text', 'varchar']:
  63. # Doesn t work for selection fields ...
  64. return 'string'
  65. if res[0] == 'jsonb':
  66. return 'json'
  67. return False
  68. def get_all_fields(self):
  69. qry = "SELECT table_name, column_name FROM information_schema.columns WHERE table_schema='public' AND udt_name IN ['text', 'varchar', 'jsonb'] AND NOT table_name LIKE 'ir_%' ORDER BY 1,2"
  70. self.cr.execute(qry)
  71. return self.cr.fetchall()
  72. def convert_table(self, table, fields, pwd, with_commit=False, unobfuscate=False):
  73. cypherings = []
  74. cyph_fct = self.uncypher_string if unobfuscate else self.cypher_string
  75. for field in fields:
  76. field_type = self.check_field(table, field)
  77. sql_field = SQL.identifier(field)
  78. if field_type == 'string':
  79. cypher_query = cyph_fct(sql_field, pwd)
  80. cypherings.append(SQL('%s=%s', SQL.identifier(field), cypher_query))
  81. elif field_type == 'json':
  82. # List every key
  83. # Loop on keys
  84. # Nest the jsonb_set calls to update all values at once
  85. # Do not create the key in json if doesn't esist
  86. new_field_value = sql_field
  87. self.cr.execute(SQL('select distinct jsonb_object_keys(%s) as key from %s', sql_field, SQL.identifier(table)))
  88. keys = [k[0] for k in self.cr.fetchall()]
  89. for key in keys:
  90. cypher_query = cyph_fct(SQL("%s->>%s", sql_field, key), pwd)
  91. new_field_value = SQL(
  92. """jsonb_set(%s, array[%s], to_jsonb(%s)::jsonb, FALSE)""",
  93. new_field_value, key, cypher_query
  94. )
  95. cypherings.append(SQL('%s=%s', sql_field, new_field_value))
  96. if cypherings:
  97. query = SQL("UPDATE %s SET %s", SQL.identifier(table), SQL(',').join(cypherings))
  98. self.cr.execute(query)
  99. if with_commit:
  100. self.commit()
  101. self.begin()
  102. def confirm_not_secure(self):
  103. _logger.info("The obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
  104. conf_y = input(f"This will alter data in the database {self.dbname} and can lead to a data loss. Would you like to proceed [y/N]? ")
  105. if conf_y.upper() != 'Y':
  106. self.rollback()
  107. sys.exit(0)
  108. conf_db = input(f"Please type your database name ({self.dbname}) in UPPERCASE to confirm you understand this operation is not considered secure : ")
  109. if self.dbname.upper() != conf_db:
  110. self.rollback()
  111. sys.exit(0)
  112. return True
  113. def run(self, cmdargs):
  114. parser = odoo.tools.config.parser
  115. group = optparse.OptionGroup(parser, "Obfuscate Configuration")
  116. group.add_option('--pwd', dest="pwd", default=False, help="Cypher password")
  117. group.add_option('--fields', dest="fields", default=False, help="List of table.columns to obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
  118. group.add_option('--exclude', dest="exclude", default=False, help="List of table.columns to exclude from obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
  119. group.add_option('--file', dest="file", default=False, help="File containing the list of table.columns to obfuscate/unobfuscate")
  120. group.add_option('--unobfuscate', action='store_true', default=False)
  121. group.add_option('--allfields', action='store_true', default=False, help="Used in unobfuscate mode, try to unobfuscate all fields. Cannot be used in obfuscate mode. Slower than specifying fields.")
  122. group.add_option('--vacuum', action='store_true', default=False, help="Vacuum database after unobfuscating")
  123. group.add_option('--pertablecommit', action='store_true', default=False, help="Commit after each table instead of a big transaction")
  124. group.add_option(
  125. '-y', '--yes', dest="yes", action='store_true', default=False,
  126. help="Don't ask for manual confirmation. Use it carefully as the obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
  127. parser.add_option_group(group)
  128. if not cmdargs:
  129. sys.exit(parser.print_help())
  130. try:
  131. opt = odoo.tools.config.parse_config(cmdargs, setup_logging=True)
  132. if not opt.pwd:
  133. _logger.error("--pwd is required")
  134. sys.exit("ERROR: --pwd is required")
  135. if opt.allfields and not opt.unobfuscate:
  136. _logger.error("--allfields can only be used in unobfuscate mode")
  137. sys.exit("ERROR: --allfields can only be used in unobfuscate mode")
  138. self.dbname = odoo.tools.config['db_name']
  139. self.registry = Registry(self.dbname)
  140. with self.registry.cursor() as cr:
  141. self.cr = cr
  142. self.begin()
  143. if self.check_pwd(opt.pwd):
  144. fields = [
  145. ('mail_tracking_value', 'old_value_char'),
  146. ('mail_tracking_value', 'old_value_text'),
  147. ('mail_tracking_value', 'new_value_char'),
  148. ('mail_tracking_value', 'new_value_text'),
  149. ('res_partner', 'name'),
  150. ('res_partner', 'complete_name'),
  151. ('res_partner', 'email'),
  152. ('res_partner', 'phone'),
  153. ('res_partner', 'mobile'),
  154. ('res_partner', 'street'),
  155. ('res_partner', 'street2'),
  156. ('res_partner', 'city'),
  157. ('res_partner', 'zip'),
  158. ('res_partner', 'vat'),
  159. ('res_partner', 'website'),
  160. ('res_country', 'name'),
  161. ('mail_message', 'subject'),
  162. ('mail_message', 'email_from'),
  163. ('mail_message', 'reply_to'),
  164. ('mail_message', 'body'),
  165. ('crm_lead', 'name'),
  166. ('crm_lead', 'contact_name'),
  167. ('crm_lead', 'partner_name'),
  168. ('crm_lead', 'email_from'),
  169. ('crm_lead', 'phone'),
  170. ('crm_lead', 'mobile'),
  171. ('crm_lead', 'website'),
  172. ('crm_lead', 'description'),
  173. ]
  174. if opt.fields:
  175. if not opt.allfields:
  176. fields += [tuple(f.split('.')) for f in opt.fields.split(',')]
  177. else:
  178. _logger.error("--allfields option is set, ignoring --fields option")
  179. if opt.file:
  180. with open(opt.file, encoding='utf-8') as f:
  181. fields += [tuple(l.strip().split('.')) for l in f]
  182. if opt.exclude:
  183. if not opt.allfields:
  184. fields = [f for f in fields if f not in [tuple(f.split('.')) for f in opt.exclude.split(',')]]
  185. else:
  186. _logger.error("--allfields option is set, ignoring --exclude option")
  187. if opt.allfields:
  188. fields = self.get_all_fields()
  189. else:
  190. invalid_fields = [f for f in fields if not self.check_field(f[0], f[1])]
  191. if invalid_fields:
  192. _logger.error("Invalid fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in invalid_fields]))
  193. fields = [f for f in fields if f not in invalid_fields]
  194. if not opt.unobfuscate and not opt.yes:
  195. self.confirm_not_secure()
  196. _logger.info("Processing fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in fields]))
  197. tables = defaultdict(set)
  198. for t, f in fields:
  199. if t[0:3] != 'ir_' and '.' not in t:
  200. tables[t].add(f)
  201. if opt.unobfuscate:
  202. _logger.info("Unobfuscating datas")
  203. for table in tables:
  204. _logger.info("Unobfuscating table %s", table)
  205. self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit, True)
  206. if opt.vacuum:
  207. _logger.info("Vacuuming obfuscated tables")
  208. for table in tables:
  209. _logger.debug("Vacuuming table %s", table)
  210. self.cr.execute(SQL("VACUUM FULL %s", SQL.identifier(table)))
  211. self.clear_pwd()
  212. else:
  213. _logger.info("Obfuscating datas")
  214. self.set_pwd(opt.pwd)
  215. for table in tables:
  216. _logger.info("Obfuscating table %s", table)
  217. self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit)
  218. self.commit()
  219. else:
  220. self.rollback()
  221. except Exception as e: # noqa: BLE001
  222. sys.exit("ERROR: %s" % e)
上海开阖软件有限公司 沪ICP备12045867号-1