#-----------------------------------------------------------
# Threaded, Gevent and Prefork Servers
#-----------------------------------------------------------
import datetime
import errno
import logging
import os
import os.path
import platform
import random
import select
import signal
import socket
import subprocess
import sys
import threading
import time
from io import BytesIO

import psutil
import werkzeug.serving

if os.name == 'posix':
    # Unix only for workers
    import fcntl
    import resource
    try:
        import inotify
        from inotify.adapters import InotifyTrees
        from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO
        INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO
    except ImportError:
        inotify = None
else:
    # Windows shim
    signal.SIGHUP = -1
    inotify = None

if not inotify:
    try:
        import watchdog
        from watchdog.observers import Observer
        from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent
    except ImportError:
        watchdog = None

# Optional process names for workers
try:
    from setproctitle import setproctitle
except ImportError:
    setproctitle = lambda x: None

import odoo
from odoo.modules import get_modules
from odoo.modules.registry import Registry
from odoo.release import nt_service_name
from odoo.tools import config
from odoo.tools.cache import log_ormcache_stats
from odoo.tools.misc import stripped_sys_argv, dumpstacks

_logger = logging.getLogger(__name__)

SLEEP_INTERVAL = 60     # 1 min

def memory_info(process):
    """
    :return: the relevant memory usage according to the OS in bytes.
    """
    # psutil < 2.0 does not have memory_info, >= 3.0 does not have get_memory_info
    pmem = (getattr(process, 'memory_info', None) or process.get_memory_info)()
    # MacOSX allocates very large vms to all processes so we only monitor the rss usage.
    if platform.system() == 'Darwin':
        return pmem.rss
    return pmem.vms

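# Illustrative usage sketch (not part of the original module): the helper can
# be pointed at the current process to log its footprint, e.g.
#
#     usage = memory_info(psutil.Process(os.getpid()))
#     _logger.info("using %.1f MiB", usage / 1024 ** 2)
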
def set_limit_memory_hard():
    if platform.system() != 'Linux':
        return
    limit_memory_hard = config['limit_memory_hard']
    if odoo.evented and config['limit_memory_hard_gevent']:
        limit_memory_hard = config['limit_memory_hard_gevent']
    if limit_memory_hard:
        rlimit = resource.RLIMIT_AS
        soft, hard = resource.getrlimit(rlimit)
        resource.setrlimit(rlimit, (limit_memory_hard, hard))

def empty_pipe(fd):
    try:
        while os.read(fd, 1):
            pass
    except OSError as e:
        if e.errno not in [errno.EAGAIN]:
            raise

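# empty_pipe() is the draining half of the self-pipe trick used throughout
# this file: a signal handler writes one byte to a non-blocking pipe so that a
# select() on the read end wakes up immediately. A minimal standalone sketch
# (illustrative, independent of the Odoo classes below; SIGUSR2 is an
# arbitrary example signal):
#
#     r, w = os.pipe()
#     for fd in (r, w):
#         fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
#     signal.signal(signal.SIGUSR2, lambda sig, frame: os.write(w, b'.'))
#     select.select([r], [], [], 60)   # returns early when SIGUSR2 arrives
#     empty_pipe(r)                    # drain so the next select() blocks again
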
#----------------------------------------------------------
# Werkzeug WSGI servers patched
#----------------------------------------------------------
class LoggingBaseWSGIServerMixIn(object):
    def handle_error(self, request, client_address):
        t, e, _ = sys.exc_info()
        if t == socket.error and e.errno == errno.EPIPE:
            # broken pipe, ignore error
            return
        _logger.exception('Exception happened during processing of request from %s', client_address)

class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
    """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
    uses this class: it sets the socket itself and calls process_request()
    manually.
    """
    def __init__(self, app):
        werkzeug.serving.BaseWSGIServer.__init__(self, "127.0.0.1", 0, app)
        # Directly close the socket. It will be replaced by WorkerHTTP when processing requests
        if self.socket:
            self.socket.close()

    def server_activate(self):
        # don't listen as we use PreforkServer#socket
        pass

class RequestHandler(werkzeug.serving.WSGIRequestHandler):
    def setup(self):
        # timeout to avoid chrome headless preconnect during tests
        if config['test_enable'] or config['test_file']:
            self.timeout = 5
        # flag the current thread as handling a http request
        super(RequestHandler, self).setup()
        me = threading.current_thread()
        me.name = 'odoo.service.http.request.%s' % (me.ident,)

    def make_environ(self):
        environ = super().make_environ()
        # Add the TCP socket to environ in order for the websocket
        # connections to use it.
        environ['socket'] = self.connection
        if self.headers.get('Upgrade') == 'websocket':
            # Since the upgrade header is introduced in version 1.1, Firefox
            # won't accept a websocket connection if the version is set to
            # 1.0.
            self.protocol_version = "HTTP/1.1"
        return environ

    def send_header(self, keyword, value):
        # Prevent `WSGIRequestHandler` from sending the connection close header (compatibility with werkzeug >= 2.1.1)
        # since it is incompatible with websocket.
        if self.headers.get('Upgrade') == 'websocket' and keyword == 'Connection' and value == 'close':
            # Do not keep processing requests.
            self.close_connection = True
            return
        super().send_header(keyword, value)

    def end_headers(self, *a, **kw):
        super().end_headers(*a, **kw)
        # At this point, Werkzeug assumes the connection is closed and will discard any incoming
        # data. In the case of WebSocket connections, data should not be discarded. Replace the
        # rfile/wfile of this handler to prevent any further action (compatibility with werkzeug >= 2.3.x).
        # See: https://github.com/pallets/werkzeug/blob/2.3.x/src/werkzeug/serving.py#L334
        if self.headers.get('Upgrade') == 'websocket':
            self.rfile = BytesIO()
            self.wfile = BytesIO()

class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
    """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
    given by the environment, this is used by autoreload to keep the listen
    socket open when a reload happens.
    """
    def __init__(self, host, port, app):
        # The ODOO_MAX_HTTP_THREADS environment variable allows limiting the amount of concurrent
        # socket connections accepted by a threaded server, implicitly limiting the amount of
        # concurrent threads running for http requests handling.
        self.max_http_threads = os.environ.get("ODOO_MAX_HTTP_THREADS")
        if self.max_http_threads:
            try:
                self.max_http_threads = int(self.max_http_threads)
            except ValueError:
                # If the value can't be parsed to an integer then it's computed in an automated way to
                # half the size of db_maxconn because while most requests won't borrow cursors concurrently
                # there are some exceptions where some controllers might allocate two or more cursors.
                self.max_http_threads = max((config['db_maxconn'] - config['max_cron_threads']) // 2, 1)
            self.http_threads_sem = threading.Semaphore(self.max_http_threads)
        super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
                                                           handler=RequestHandler)

        # See https://github.com/pallets/werkzeug/pull/770
        # This allows the request threads to not be set as daemon
        # so the server waits for them when shutting down gracefully.
        self.daemon_threads = False

    def server_bind(self):
        SD_LISTEN_FDS_START = 3
        if os.environ.get('LISTEN_FDS') == '1' and os.environ.get('LISTEN_PID') == str(os.getpid()):
            self.reload_socket = True
            self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
            _logger.info('HTTP service (werkzeug) running through socket activation')
        else:
            self.reload_socket = False
            super(ThreadedWSGIServerReloadable, self).server_bind()
            _logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port)

    def server_activate(self):
        if not self.reload_socket:
            super(ThreadedWSGIServerReloadable, self).server_activate()

    def process_request(self, request, client_address):
        """
        Start a new thread to process the request.
        Override the default method of class socketserver.ThreadingMixIn
        to be able to get the thread object which is instantiated
        and set its start time as an attribute
        """
        t = threading.Thread(target=self.process_request_thread,
                             args=(request, client_address))
        t.daemon = self.daemon_threads
        t.type = 'http'
        t.start_time = time.time()
        t.start()

    def _handle_request_noblock(self):
        if self.max_http_threads and not self.http_threads_sem.acquire(timeout=0.1):
            # If the semaphore is full we will return immediately to the upstream (most probably
            # socketserver.BaseServer's serve_forever loop which will retry immediately as the
            # selector will find a pending connection to accept on the socket. There is a 100 ms
            # penalty in such case in order to avoid cpu bound loop while waiting for the semaphore.
            return
        # upstream _handle_request_noblock will handle errors and call shutdown_request in any cases
        super(ThreadedWSGIServerReloadable, self)._handle_request_noblock()

    def shutdown_request(self, request):
        if self.max_http_threads:
            # upstream is supposed to call this function no matter what happens during processing
            self.http_threads_sem.release()
        super().shutdown_request(request)

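# The LISTEN_FDS / LISTEN_PID check in server_bind() above follows the systemd
# socket-activation convention (inherited listen sockets start at fd 3). A
# unit-file sketch of how such a socket could be handed to the server
# (illustrative; unit names and paths are assumptions, not shipped with this
# module):
#
#     # odoo.socket
#     [Socket]
#     ListenStream=8069
#
#     # odoo.service
#     [Service]
#     ExecStart=/usr/bin/odoo
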
#----------------------------------------------------------
# FileSystem Watcher for autoreload and cache invalidation
#----------------------------------------------------------
class FSWatcherBase(object):
    def handle_file(self, path):
        if path.endswith('.py') and not os.path.basename(path).startswith('.~'):
            try:
                source = open(path, 'rb').read() + b'\n'
                compile(source, path, 'exec')
            except IOError:
                _logger.error('autoreload: python code change detected, IOError for %s', path)
            except SyntaxError:
                _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
            else:
                if not server_phoenix:
                    _logger.info('autoreload: python code updated, autoreload activated')
                    restart()
                    return True

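# handle_file() uses compile() as a cheap syntax gate: the server only
# restarts once the changed file actually parses, so a half-saved file cannot
# kill the reloader. The same check in isolation (illustrative sketch;
# `is_valid_python` is a hypothetical helper, not part of this module):
#
#     def is_valid_python(path):
#         try:
#             compile(open(path, 'rb').read() + b'\n', path, 'exec')
#             return True
#         except SyntaxError:
#             return False
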
class FSWatcherWatchdog(FSWatcherBase):
    def __init__(self):
        self.observer = Observer()
        for path in odoo.addons.__path__:
            _logger.info('Watching addons folder %s', path)
            self.observer.schedule(self, path, recursive=True)

    def dispatch(self, event):
        if isinstance(event, (FileCreatedEvent, FileModifiedEvent, FileMovedEvent)):
            if not event.is_directory:
                path = getattr(event, 'dest_path', '') or event.src_path
                self.handle_file(path)

    def start(self):
        self.observer.start()
        _logger.info('AutoReload watcher running with watchdog')

    def stop(self):
        self.observer.stop()
        self.observer.join()

class FSWatcherInotify(FSWatcherBase):
    def __init__(self):
        self.started = False
        # ignore warnings from inotify in case we have duplicate addons paths.
        inotify.adapters._LOGGER.setLevel(logging.ERROR)
        # recreate a list as InotifyTrees' __init__ deletes the list's items
        paths_to_watch = []
        for path in odoo.addons.__path__:
            paths_to_watch.append(path)
            _logger.info('Watching addons folder %s', path)
        self.watcher = InotifyTrees(paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5)

    def run(self):
        _logger.info('AutoReload watcher running with inotify')
        dir_creation_events = set(('IN_MOVED_TO', 'IN_CREATE'))
        while self.started:
            for event in self.watcher.event_gen(timeout_s=0, yield_nones=False):
                (_, type_names, path, filename) = event
                if 'IN_ISDIR' not in type_names:
                    # despite not having IN_DELETE in the watcher's mask, the
                    # watcher sends these events when a directory is deleted.
                    if 'IN_DELETE' not in type_names:
                        full_path = os.path.join(path, filename)
                        if self.handle_file(full_path):
                            return
                elif dir_creation_events.intersection(type_names):
                    full_path = os.path.join(path, filename)
                    for root, _, files in os.walk(full_path):
                        for file in files:
                            if self.handle_file(os.path.join(root, file)):
                                return

    def start(self):
        self.started = True
        self.thread = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher")
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.started = False
        self.thread.join()
        del self.watcher  # ensures inotify watches are freed up before reexec

#----------------------------------------------------------
# Servers: Threaded, Gevented and Prefork
#----------------------------------------------------------

class CommonServer(object):
    _on_stop_funcs = []

    def __init__(self, app):
        self.app = app
        # config
        self.interface = config['http_interface'] or '0.0.0.0'
        self.port = config['http_port']
        # runtime
        self.pid = os.getpid()

    def close_socket(self, sock):
        """ Closes a socket instance cleanly
        :param sock: the network socket to close
        :type sock: socket.socket
        """
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error as e:
            if e.errno == errno.EBADF:
                # Werkzeug > 0.9.6 closes the socket itself (see commit
                # https://github.com/mitsuhiko/werkzeug/commit/4d8ca089)
                return
            # On OSX, shutting down one side of a socket also shuts down the
            # other side, causing an error 57 'Socket is not connected' when
            # the other side is shut down in turn, see
            # http://bugs.python.org/issue4397
            # note: stdlib fixed test, not behavior
            if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
                raise
        sock.close()

    @classmethod
    def on_stop(cls, func):
        """ Register a cleanup function to be executed when the server stops """
        cls._on_stop_funcs.append(func)

    def stop(self):
        for func in self._on_stop_funcs:
            try:
                _logger.debug("on_close call %s", func)
                func()
            except Exception:
                _logger.warning("Exception in %s", func.__name__, exc_info=True)

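# Usage sketch for the on_stop() hook (illustrative; `close_connections` is a
# hypothetical callback): registered functions run once when any server
# flavour shuts down, and exceptions they raise are logged, not propagated.
#
#     def close_connections():
#         odoo.sql_db.close_all()
#     CommonServer.on_stop(close_connections)
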
class ThreadedServer(CommonServer):
    def __init__(self, app):
        super(ThreadedServer, self).__init__(app)
        self.main_thread_id = threading.current_thread().ident
        # Variable keeping track of the number of calls to the signal handler defined
        # below. This variable is monitored by ``quit_on_signals()``.
        self.quit_signals_received = 0

        #self.socket = None
        self.httpd = None
        self.limits_reached_threads = set()
        self.limit_reached_time = None

    def signal_handler(self, sig, frame):
        if sig in [signal.SIGINT, signal.SIGTERM]:
            # shutdown on kill -INT or -TERM
            self.quit_signals_received += 1
            if self.quit_signals_received > 1:
                # logging.shutdown was already called at this point.
                sys.stderr.write("Forced shutdown.\n")
                os._exit(0)
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()
        elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU:
            sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n")
            sys.stderr.flush()
            os._exit(0)
        elif sig == signal.SIGHUP:
            # restart on kill -HUP
            global server_phoenix  # noqa: PLW0603
            server_phoenix = True
            self.quit_signals_received += 1
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()

    def process_limit(self):
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.warning('Server memory limit (%s) reached.', memory)
            self.limits_reached_threads.add(threading.current_thread())

        for thread in threading.enumerate():
            thread_type = getattr(thread, 'type', None)
            if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron':
                # We apply the limits on cron threads and HTTP requests,
                # websocket requests excluded.
                if getattr(thread, 'start_time', None):
                    thread_execution_time = time.time() - thread.start_time
                    thread_limit_time_real = config['limit_time_real']
                    if (getattr(thread, 'type', None) == 'cron' and
                            config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0):
                        thread_limit_time_real = config['limit_time_real_cron']
                    if thread_limit_time_real and thread_execution_time > thread_limit_time_real:
                        _logger.warning(
                            'Thread %s virtual real time limit (%d/%ds) reached.',
                            thread, thread_execution_time, thread_limit_time_real)
                        self.limits_reached_threads.add(thread)
        # Clean-up threads that are no longer alive
        # e.g. threads that exceeded their real time,
        # but which finished before the server could restart.
        for thread in list(self.limits_reached_threads):
            if not thread.is_alive():
                self.limits_reached_threads.remove(thread)
        if self.limits_reached_threads:
            self.limit_reached_time = self.limit_reached_time or time.time()
        else:
            self.limit_reached_time = None

    def cron_thread(self, number):
        # Steve Reich timing style with thundering herd mitigation.
        #
        # On startup, all workers bind on a notification channel in
        # postgres so they can be woken up at will. At worst they wake
        # up every SLEEP_INTERVAL with a jitter. The jitter creates a
        # chorus effect that helps spread, over the timeline, the moments
        # when individual workers wake up.
        #
        # On NOTIFY, all workers are woken at the same time; sleeping
        # just a bit prevents them all polling the database at the exact
        # same time. This is known as the thundering herd effect.
        from odoo.addons.base.models.ir_cron import ir_cron

        def _run_cron(cr):
            pg_conn = cr._cnx
            # LISTEN / NOTIFY doesn't work in recovery mode
            cr.execute("SELECT pg_is_in_recovery()")
            in_recovery = cr.fetchone()[0]
            if not in_recovery:
                cr.execute("LISTEN cron_trigger")
            else:
                _logger.warning("PG cluster in recovery mode, cron trigger not activated")
            cr.commit()

            alive_time = time.monotonic()
            while config['limit_time_worker_cron'] <= 0 or (time.monotonic() - alive_time) <= config['limit_time_worker_cron']:
                select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
                time.sleep(number / 100)
                pg_conn.poll()

                registries = odoo.modules.registry.Registry.registries
                _logger.debug('cron%d polling for jobs', number)
                for db_name, registry in registries.d.items():
                    if registry.ready:
                        thread = threading.current_thread()
                        thread.start_time = time.time()
                        try:
                            ir_cron._process_jobs(db_name)
                        except Exception:
                            _logger.warning('cron%d encountered an Exception:', number, exc_info=True)
                        thread.start_time = None

        while True:
            conn = odoo.sql_db.db_connect('postgres')
            with conn.cursor() as cr:
                _run_cron(cr)
            _logger.info('cron%d max age (%ss) reached, releasing connection.', number, config['limit_time_worker_cron'])

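    # The wakeup side of the LISTEN above is a plain PostgreSQL NOTIFY on the
    # same channel; a cron thread blocked in select() can be woken manually
    # from any session on the `postgres` database with (illustrative):
    #
    #     NOTIFY cron_trigger;
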
    def cron_spawn(self):
        """ Start the above runner function in a daemon thread.

        The thread is a typical daemon thread: it will never quit and must be
        terminated when the main process exits - with no consequence (the processing
        threads it spawns are not marked daemon).
        """
        # Force call to strptime just before starting the cron thread
        # to prevent time.strptime AttributeError within the thread.
        # See: http://bugs.python.org/issue7980
        datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
        for i in range(odoo.tools.config['max_cron_threads']):
            def target():
                self.cron_thread(i)
            t = threading.Thread(target=target, name="odoo.service.cron.cron%d" % i)
            t.daemon = True
            t.type = 'cron'
            t.start()
            _logger.debug("cron%d started!" % i)

    def http_spawn(self):
        self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, self.app)
        threading.Thread(
            target=self.httpd.serve_forever,
            name="odoo.service.httpd",
            daemon=True,
        ).start()

    def start(self, stop=False):
        _logger.debug("Setting signal handlers")
        set_limit_memory_hard()
        if os.name == 'posix':
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
            signal.signal(signal.SIGCHLD, self.signal_handler)
            signal.signal(signal.SIGHUP, self.signal_handler)
            signal.signal(signal.SIGXCPU, self.signal_handler)
            signal.signal(signal.SIGQUIT, dumpstacks)
            signal.signal(signal.SIGUSR1, log_ormcache_stats)
        elif os.name == 'nt':
            import win32api
            win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)

        test_mode = config['test_enable'] or config['test_file']
        if test_mode or (config['http_enable'] and not stop):
            # some tests need the http daemon to be available...
            self.http_spawn()

    def stop(self):
        """ Shutdown the WSGI server. Wait for non daemon threads.
        """
        if server_phoenix:
            _logger.info("Initiating server reload")
        else:
            _logger.info("Initiating shutdown")
            _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")

        stop_time = time.time()

        if self.httpd:
            self.httpd.shutdown()

        super().stop()

        # Manually join() all threads before calling sys.exit() to allow a second signal
        # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
        # threading.Thread.join() should not mask signals (at least in python 2.5).
        me = threading.current_thread()
        _logger.debug('current thread: %r', me)
        for thread in threading.enumerate():
            _logger.debug('process %r (%r)', thread, thread.daemon)
            if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and
                    thread not in self.limits_reached_threads):
                while thread.is_alive() and (time.time() - stop_time) < 1:
                    # We wait for requests to finish, up to 1 second.
                    _logger.debug('join and sleep')
                    # Need a busyloop here as thread.join() masks signals
                    # and would prevent the forced shutdown.
                    thread.join(0.05)
                    time.sleep(0.05)

        odoo.sql_db.close_all()

        _logger.debug('--')
        logging.shutdown()

    def run(self, preload=None, stop=False):
        """ Start the http server and the cron thread then wait for a signal.

        The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
        a second one if any will force an immediate exit.
        """
        with Registry._lock:
            self.start(stop=stop)
            rc = preload_registries(preload)

        if stop:
            if config['test_enable']:
                from odoo.tests.result import _logger as logger  # noqa: PLC0415
                with Registry.registries._lock:
                    for db, registry in Registry.registries.d.items():
                        report = registry._assertion_report
                        log = logger.error if not report.wasSuccessful() \
                            else logger.warning if not report.testsRun \
                            else logger.info
                        log("%s when loading database %r", report, db)
            self.stop()
            return rc

        self.cron_spawn()

        # Wait for a first signal to be handled. (time.sleep will be interrupted
        # by the signal handler)
        try:
            while self.quit_signals_received == 0:
                self.process_limit()
                if self.limit_reached_time:
                    has_other_valid_requests = any(
                        not t.daemon and
                        t not in self.limits_reached_threads
                        for t in threading.enumerate()
                        if getattr(t, 'type', None) == 'http')
                    if (not has_other_valid_requests or
                            (time.time() - self.limit_reached_time) > SLEEP_INTERVAL):
                        # We wait until no requests other than the ones
                        # exceeding the limits are being processed, up to
                        # 1 min, before asking for a reload.
                        _logger.info('Dumping stacktrace of limit exceeding threads before reloading')
                        dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads])
                        self.reload()
                        # `reload` increments `self.quit_signals_received`
                        # and the loop will end after this iteration,
                        # therefore leading to the server stop.
                        # `reload` also sets the `server_phoenix` flag
                        # to tell the server to restart the server after shutting down.
                    else:
                        time.sleep(1)
                else:
                    time.sleep(SLEEP_INTERVAL)
        except KeyboardInterrupt:
            pass

        self.stop()

    def reload(self):
        os.kill(self.pid, signal.SIGHUP)

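# Signal cheat-sheet for the threaded server, as wired in ThreadedServer.start()
# above (POSIX only; pid discovery is left to the operator, illustrative usage):
#
#     kill -TERM <pid>   # graceful shutdown; a second signal forces exit
#     kill -HUP  <pid>   # graceful restart (sets the server_phoenix flag)
#     kill -QUIT <pid>   # dump the stacktraces of all threads
#     kill -USR1 <pid>   # log ormcache statistics
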
class GeventServer(CommonServer):
    def __init__(self, app):
        super(GeventServer, self).__init__(app)
        self.port = config['gevent_port']
        self.httpd = None

    def process_limits(self):
        restart = False
        if self.ppid != os.getppid():
            _logger.warning("Gevent Parent changed: %s", self.pid)
            restart = True
        memory = memory_info(psutil.Process(self.pid))
        limit_memory_soft = config['limit_memory_soft_gevent'] or config['limit_memory_soft']
        if limit_memory_soft and memory > limit_memory_soft:
            _logger.warning('Gevent virtual memory limit reached: %s', memory)
            restart = True
        if restart:
            # suicide !!
            os.kill(self.pid, signal.SIGTERM)

    def watchdog(self, beat=4):
        import gevent
        self.ppid = os.getppid()
        while True:
            self.process_limits()
            gevent.sleep(beat)

    def start(self):
        import gevent
        try:
            from gevent.pywsgi import WSGIServer, WSGIHandler
        except ImportError:
            from gevent.wsgi import WSGIServer, WSGIHandler

        class ProxyHandler(WSGIHandler):
            """ When logging requests, try to get the client address from
            the environment so we get proxyfix's modifications (if any).

            Derived from werkzeug.serving.WSGIRequestHandler.log
            / werkzeug.serving.WSGIRequestHandler.address_string
            """
            def _connection_upgrade_requested(self):
                if self.headers.get('Connection', '').lower() == 'upgrade':
                    return True
                if self.headers.get('Upgrade', '').lower() == 'websocket':
                    return True
                return False

            def format_request(self):
                old_address = self.client_address
                if getattr(self, 'environ', None):
                    self.client_address = self.environ['REMOTE_ADDR']
                elif not self.client_address:
                    self.client_address = '<local>'
                # other cases are handled inside WSGIHandler
                try:
                    return super().format_request()
                finally:
                    self.client_address = old_address

            def finalize_headers(self):
                # We need to make gevent.pywsgi stop dealing with chunks when the connection
                # is being upgraded. see https://github.com/gevent/gevent/issues/1712
                super().finalize_headers()
                if self.code == 101:
                    # Switching Protocols. Disable chunked writes.
                    self.response_use_chunked = False

            def get_environ(self):
                # Add the TCP socket to environ in order for the websocket
                # connections to use it.
                environ = super().get_environ()
                environ['socket'] = self.socket
                # Disable support for HTTP chunking on reads which cause
                # an issue when the connection is being upgraded, see
                # https://github.com/gevent/gevent/issues/1712
                if self._connection_upgrade_requested():
                    environ['wsgi.input'] = self.rfile
                    environ['wsgi.input_terminated'] = False
                return environ

        set_limit_memory_hard()
        if os.name == 'posix':
            # Set process memory limit as an extra safeguard
            signal.signal(signal.SIGQUIT, dumpstacks)
            signal.signal(signal.SIGUSR1, log_ormcache_stats)
            gevent.spawn(self.watchdog)

        self.httpd = WSGIServer(
            (self.interface, self.port), self.app,
            log=logging.getLogger('longpolling'),
            error_log=logging.getLogger('longpolling'),
            handler_class=ProxyHandler,
        )
        _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
        try:
            self.httpd.serve_forever()
        except:
            _logger.exception("Evented Service (longpolling): uncaught error during main loop")
            raise

    def stop(self):
        import gevent
        self.httpd.stop()
        super().stop()
        gevent.shutdown()

    def run(self, preload, stop):
        self.start()
        self.stop()

class PreforkServer(CommonServer):
    """ Multiprocessing inspired by (g)unicorn.
    PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
    method between workers but we plan to replace it by a more intelligent
    dispatcher that will parse the first HTTP request line.
    """
    def __init__(self, app):
        super().__init__(app)
        # config
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        self.cron_timeout = config['limit_time_real_cron'] or None
        if self.cron_timeout == -1:
            self.cron_timeout = self.timeout
        # working vars
        self.beat = 4
        self.socket = None
        self.workers_http = {}
        self.workers_cron = {}
        self.workers = {}
        self.generation = 0
        self.queue = []
        self.long_polling_pid = None

    def pipe_new(self):
        pipe = os.pipe()
        for fd in pipe:
            # non_blocking
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            # close_on_exec
            flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return pipe

    def pipe_ping(self, pipe):
        try:
            os.write(pipe[1], b'.')
        except IOError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

    def signal_handler(self, sig, frame):
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
        else:
            _logger.warning("Dropping signal: %s", sig)

    def worker_spawn(self, klass, workers_registry):
        self.generation += 1
        worker = klass(self)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.workers[pid] = worker
            workers_registry[pid] = worker
            return worker
        else:
            worker.run()
            sys.exit(0)

    def long_polling_spawn(self):
        nargs = stripped_sys_argv()
        cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
        popen = subprocess.Popen(cmd)
        self.long_polling_pid = popen.pid

    def worker_pop(self, pid):
        if pid == self.long_polling_pid:
            self.long_polling_pid = None
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_http.pop(pid, None)
                self.workers_cron.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return

    def worker_kill(self, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError as e:
            if e.errno == errno.ESRCH:
                self.worker_pop(pid)

    def process_signals(self):
        while len(self.queue):
            sig = self.queue.pop(0)
            if sig in [signal.SIGINT, signal.SIGTERM]:
                raise KeyboardInterrupt
            elif sig == signal.SIGHUP:
                # restart on kill -HUP
                global server_phoenix  # noqa: PLW0603
                server_phoenix = True
                raise KeyboardInterrupt
            elif sig == signal.SIGQUIT:
                # dump stacks on kill -3
                dumpstacks()
            elif sig == signal.SIGUSR1:
                # log ormcache stats on kill -SIGUSR1
                log_ormcache_stats()
            elif sig == signal.SIGTTIN:
                # increase number of workers
                self.population += 1
            elif sig == signal.SIGTTOU:
                # decrease number of workers
                self.population -= 1

    def process_zombie(self):
        # reap dead workers
        while 1:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if (status >> 8) == 3:
                    msg = "Critical worker error (%s)"
                    _logger.critical(msg, wpid)
                    raise Exception(msg % wpid)
                self.worker_pop(wpid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    break
                raise

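    # The worker pool can be resized at runtime through the signals handled in
    # process_signals() above (illustrative usage; master pid discovery is
    # left to the operator):
    #
    #     kill -TTIN <master_pid>   # add one HTTP worker
    #     kill -TTOU <master_pid>   # remove one HTTP worker
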
    def process_timeout(self):
        now = time.time()
        for (pid, worker) in self.workers.items():
            if worker.watchdog_timeout is not None and \
                    (now - worker.watchdog_time) >= worker.watchdog_timeout:
                _logger.error("%s (%s) timeout after %ss",
                              worker.__class__.__name__,
                              pid,
                              worker.watchdog_timeout)
                self.worker_kill(pid, signal.SIGKILL)

    def process_spawn(self):
        if config['http_enable']:
            while len(self.workers_http) < self.population:
                self.worker_spawn(WorkerHTTP, self.workers_http)
            if not self.long_polling_pid:
                self.long_polling_spawn()
        while len(self.workers_cron) < config['max_cron_threads']:
            self.worker_spawn(WorkerCron, self.workers_cron)

    def sleep(self):
        try:
            # map of fd -> worker
            fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
            fd_in = list(fds) + [self.pipe[0]]
            # check for ping or internal wakeups
            ready = select.select(fd_in, [], [], self.beat)
            # update worker watchdogs
            for fd in ready[0]:
                if fd in fds:
                    fds[fd].watchdog_time = time.time()
                empty_pipe(fd)
        except select.error as e:
            if e.args[0] not in [errno.EINTR]:
                raise

    def start(self):
        # wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
        # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
        # signal handler to overcome this behaviour
        self.pipe = self.pipe_new()
        # set signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGHUP, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)
        signal.signal(signal.SIGTTIN, self.signal_handler)
        signal.signal(signal.SIGTTOU, self.signal_handler)
        signal.signal(signal.SIGQUIT, dumpstacks)
        signal.signal(signal.SIGUSR1, log_ormcache_stats)

        if config['http_enable']:
            # listen to socket
            _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
            family = socket.AF_INET
            if ':' in self.interface:
                family = socket.AF_INET6
            self.socket = socket.socket(family, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.setblocking(0)
            self.socket.bind((self.interface, self.port))
            self.socket.listen(8 * self.population)

    def stop(self, graceful=True):
        if self.long_polling_pid is not None:
            # FIXME make longpolling process handle SIGTERM correctly
            self.worker_kill(self.long_polling_pid, signal.SIGKILL)
            self.long_polling_pid = None
        if self.socket:
            self.socket.close()
        if graceful:
            _logger.info("Stopping gracefully")
            super().stop()
            limit = time.time() + self.timeout
            for pid in self.workers:
                self.worker_kill(pid, signal.SIGINT)
            while self.workers and time.time() < limit:
                try:
                    self.process_signals()
                except KeyboardInterrupt:
                    _logger.info("Forced shutdown.")
                    break
                self.process_zombie()
                time.sleep(0.1)
        else:
            _logger.info("Stopping forcefully")
        for pid in self.workers:
            self.worker_kill(pid, signal.SIGTERM)

    def run(self, preload, stop):
        self.start()

        rc = preload_registries(preload)

        if stop:
            self.stop()
            return rc

        # Empty the cursor pool, we don't want them to be shared among forked workers.
        odoo.sql_db.close_all()

        _logger.debug("Multiprocess starting")
        while 1:
            try:
                #_logger.debug("Multiprocess beat (%s)", time.time())
                self.process_signals()
                self.process_zombie()
                self.process_timeout()
                self.process_spawn()
                self.sleep()
            except KeyboardInterrupt:
                _logger.debug("Multiprocess clean stop")
                self.stop()
                break
            except Exception as e:
                _logger.exception(e)
                self.stop(False)
                return -1

class Worker(object):
    """ Workers """
    def __init__(self, multi):
        self.multi = multi
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        self.eintr_pipe = multi.pipe_new()
        self.wakeup_fd_r, self.wakeup_fd_w = self.eintr_pipe
        # Can be set to None if no watchdog is desired.
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()
        self.pid = None
        self.alive = True
        # should we rename into lifetime ?
        self.request_max = multi.limit_request
        self.request_count = 0

    def setproctitle(self, title=""):
        setproctitle('odoo: %s %s %s' % (self.__class__.__name__, self.pid, title))

    def close(self):
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])
        os.close(self.eintr_pipe[0])
        os.close(self.eintr_pipe[1])

    def signal_handler(self, sig, frame):
        self.alive = False

    def signal_time_expired_handler(self, n, stack):
        # TODO: print actual RUSAGE_SELF (since last check_limits) instead of
        # just repeating the config setting
        _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
        # We don't commit suicide in that case
        raise Exception('CPU time limit exceeded.')

    def sleep(self):
        try:
            select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat)
            # clear wakeup pipe if we were interrupted
            empty_pipe(self.wakeup_fd_r)
        except select.error as e:
            if e.args[0] not in [errno.EINTR]:
                raise

    def check_limits(self):
        # If our parent changed, commit suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, memory)
            self.alive = False  # Commit suicide after the request.

        set_limit_memory_hard()

        # update RLIMIT_CPU so limit_time_cpu applies per unit of work
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (int(cpu_time + config['limit_time_cpu']), hard))

    def process_work(self):
        pass

    def start(self):
        self.pid = os.getpid()
        self.setproctitle()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        if self.multi.socket:
            # Prevent fd inheritance: close_on_exec
            flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
            # reset blocking status
            self.multi.socket.setblocking(0)

        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGXCPU, self.signal_time_expired_handler)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        signal.signal(signal.SIGTTIN, signal.SIG_DFL)
        signal.signal(signal.SIGTTOU, signal.SIG_DFL)
        signal.set_wakeup_fd(self.wakeup_fd_w)

    def stop(self):
        pass

    def run(self):
        try:
            self.start()
            t = threading.Thread(name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), target=self._runloop)
            t.daemon = True
            t.start()
            t.join()
            _logger.info("Worker (%s) exiting. request_count: %s, registry count: %s.",
                         self.pid, self.request_count,
                         len(odoo.modules.registry.Registry.registries))
            self.stop()
        except Exception:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

    def _runloop(self):
        signal.pthread_sigmask(signal.SIG_BLOCK, {
            signal.SIGXCPU,
            signal.SIGINT, signal.SIGQUIT, signal.SIGUSR1,
        })
        try:
            while self.alive:
                self.check_limits()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                if not self.alive:
                    break
                self.process_work()
        except:
            _logger.exception("Worker %s (%s) Exception occurred, exiting...", self.__class__.__name__, self.pid)
            sys.exit(1)

class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def __init__(self, multi):
        super(WorkerHTTP, self).__init__(multi)

        # The ODOO_HTTP_SOCKET_TIMEOUT environment variable allows controlling socket timeout for
        # extreme latency situations. It's generally better to use a good buffering reverse proxy
        # to quickly free workers rather than increasing this timeout to accommodate high network
        # latencies & b/w saturation. This timeout is also essential to protect against accidental
        # DoS due to idle HTTP connections.
        sock_timeout = os.environ.get("ODOO_HTTP_SOCKET_TIMEOUT")
        self.sock_timeout = float(sock_timeout) if sock_timeout else 2

    def process_request(self, client, addr):
        client.setblocking(1)
        client.settimeout(self.sock_timeout)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # do request using BaseWSGIServerNoBind monkey patched with socket
        self.server.socket = client
        # tolerate broken pipe when the http client closes the socket before
        # receiving the full reply
        try:
            self.server.process_request(client, addr)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise
        self.request_count += 1

    def process_work(self):
        try:
            client, addr = self.multi.socket.accept()
            self.process_request(client, addr)
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED):
                raise

    def start(self):
        Worker.start(self)
        self.server = BaseWSGIServerNoBind(self.multi.app)

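# Deployment sketch for the timeout above (illustrative invocation; the exact
# executable name depends on your install, and the variable is read once at
# worker startup):
#
#     ODOO_HTTP_SOCKET_TIMEOUT=10 odoo --workers=4
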
class WorkerCron(Worker):
    """ Cron workers """

    def __init__(self, multi):
        super(WorkerCron, self).__init__(multi)
        self.alive_time = time.monotonic()
        # process_work() below processes a single database per call.
        # The variable db_index is keeping track of the next database to
        # process.
        self.db_index = 0
        self.watchdog_timeout = multi.cron_timeout  # Use a distinct value for CRON Worker

    def sleep(self):
        # Really sleep once all the databases have been processed.
        if self.db_index == 0:
            interval = SLEEP_INTERVAL + self.pid % 10  # chorus effect
            # simulate interruptible sleep with select(wakeup_fd, timeout)
            try:
                select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
                # clear pg_conn/wakeup pipe if we were interrupted
                time.sleep(self.pid / 100 % .1)
                self.dbcursor._cnx.poll()
                empty_pipe(self.wakeup_fd_r)
            except select.error as e:
                if e.args[0] != errno.EINTR:
                    raise

    def check_limits(self):
        super().check_limits()
        if config['limit_time_worker_cron'] > 0 and (time.monotonic() - self.alive_time) > config['limit_time_worker_cron']:
            _logger.info('WorkerCron (%s) max age (%ss) reached.', self.pid, config['limit_time_worker_cron'])
            self.alive = False

    def _db_list(self):
        if config['db_name']:
            db_names = config['db_name'].split(',')
        else:
            db_names = odoo.service.db.list_dbs(True)
        return db_names

    def process_work(self):
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
        db_names = self._db_list()
        if len(db_names):
            self.db_index = (self.db_index + 1) % len(db_names)
            db_name = db_names[self.db_index]
            self.setproctitle(db_name)

            from odoo.addons import base
            base.models.ir_cron.ir_cron._process_jobs(db_name)

            # don't keep cursors in multi database mode
            if len(db_names) > 1:
                odoo.sql_db.close_db(db_name)

            self.request_count += 1
            if self.request_count >= self.request_max and self.request_max < len(db_names):
                _logger.error("There are more databases to process than allowed "
                              "by the `limit_request` configuration variable: %s more.",
                              len(db_names) - self.request_max)
        else:
            self.db_index = 0

    def start(self):
        os.nice(10)     # mommy always told me to be nice with others...
        Worker.start(self)
        if self.multi.socket:
            self.multi.socket.close()

        dbconn = odoo.sql_db.db_connect('postgres')
        self.dbcursor = dbconn.cursor()
        # LISTEN / NOTIFY doesn't work in recovery mode
        self.dbcursor.execute("SELECT pg_is_in_recovery()")
        in_recovery = self.dbcursor.fetchone()[0]
        if not in_recovery:
            self.dbcursor.execute("LISTEN cron_trigger")
        else:
            _logger.warning("PG cluster in recovery mode, cron trigger not activated")
        self.dbcursor.commit()

    def stop(self):
        super().stop()
        self.dbcursor.close()

#----------------------------------------------------------
# start/stop public api
#----------------------------------------------------------

server = None
server_phoenix = False


def load_server_wide_modules():
    server_wide_modules = {'base', 'web'} | set(odoo.conf.server_wide_modules)
    for m in server_wide_modules:
        try:
            odoo.modules.module.load_openerp_module(m)
        except Exception:
            msg = ''
            if m == 'web':
                msg = """
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
            _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)

def _reexec(updated_modules=None):
    """reexecute openerp-server process with (nearly) the same arguments"""
    if odoo.tools.osutil.is_running_as_nt_service():
        subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
    exe = os.path.basename(sys.executable)
    args = stripped_sys_argv()
    if updated_modules:
        args += ["-u", ','.join(updated_modules)]
    if not args or args[0] != exe:
        args.insert(0, exe)
    # We should keep the LISTEN_* environment variables in order to support socket activation on reexec
    os.execve(sys.executable, args, os.environ)

def load_test_file_py(registry, test_file):
    # pylint: disable=import-outside-toplevel
    from odoo.tests import loader  # noqa: PLC0415
    from odoo.tests.suite import OdooSuite  # noqa: PLC0415
    threading.current_thread().testing = True
    try:
        test_path, _ = os.path.splitext(os.path.abspath(test_file))
        for mod in [m for m in get_modules() if '%s%s%s' % (os.path.sep, m, os.path.sep) in test_file]:
            for mod_mod in loader.get_test_modules(mod):
                mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
                if test_path == config._normalize(mod_path):
                    tests = loader.get_module_test_cases(mod_mod)
                    suite = OdooSuite(tests)
                    _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
                    suite(registry._assertion_report)
                    if not registry._assertion_report.wasSuccessful():
                        _logger.error('%s: at least one error occurred in a test', test_file)
                    return
    finally:
        threading.current_thread().testing = False

def preload_registries(dbnames):
    """ Preload registries, possibly run a test file."""
    # TODO: move all config checks to args, don't check tools.config here
    dbnames = dbnames or []
    rc = 0
    for dbname in dbnames:
        try:
            update_module = config['init'] or config['update']
            threading.current_thread().dbname = dbname
            registry = Registry.new(dbname, update_module=update_module)

            # run test_file if provided
            if config['test_file']:
                test_file = config['test_file']
                if not os.path.isfile(test_file):
                    _logger.warning('test file %s cannot be found', test_file)
                elif not test_file.endswith('py'):
                    _logger.warning('test file %s is not a python file', test_file)
                else:
                    _logger.info('loading test file %s', test_file)
                    load_test_file_py(registry, test_file)

            # run post-install tests
            if config['test_enable']:
                from odoo.tests import loader  # noqa: PLC0415
                t0 = time.time()
                t0_sql = odoo.sql_db.sql_counter
                module_names = (registry.updated_modules if update_module else
                                sorted(registry._init_modules))
                _logger.info("Starting post tests")
                tests_before = registry._assertion_report.testsRun
                post_install_suite = loader.make_suite(module_names, 'post_install')
                if post_install_suite.has_http_case():
                    with registry.cursor() as cr:
                        env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})
                        env['ir.qweb']._pregenerate_assets_bundles()
                result = loader.run_suite(post_install_suite)
                registry._assertion_report.update(result)
                _logger.info("%d post-tests in %.2fs, %s queries",
                             registry._assertion_report.testsRun - tests_before,
                             time.time() - t0,
                             odoo.sql_db.sql_counter - t0_sql)
                registry._assertion_report.log_stats()
            if registry._assertion_report and not registry._assertion_report.wasSuccessful():
                rc += 1
        except Exception:
            _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
            return -1
    return rc

def start(preload=None, stop=False):
    """ Start the odoo http server and cron processor.
    """
    global server

    load_server_wide_modules()

    if odoo.evented:
        server = GeventServer(odoo.http.root)
    elif config['workers']:
        if config['test_enable'] or config['test_file']:
            _logger.warning("Unit testing in workers mode could fail; use --workers 0.")
        server = PreforkServer(odoo.http.root)
    else:
        if platform.system() == "Linux" and sys.maxsize > 2**32 and "MALLOC_ARENA_MAX" not in os.environ:
            # glibc's malloc() uses arenas [1] in order to efficiently handle memory allocation of multi-threaded
            # applications. This allows better memory allocation handling in case of multiple threads that
            # would be using malloc() concurrently [2].
            # Due to python's GIL, this optimization has no effect on multithreaded python programs.
            # Unfortunately, a downside of creating one arena per cpu core is the increase of virtual memory,
            # which is the metric Odoo relies upon to limit the memory usage of threaded workers.
            # On 32bit systems the default size of an arena is 512K while on 64bit systems it's 64M [3],
            # hence a threaded worker will quickly reach its default memory soft limit upon concurrent requests.
            # We therefore set the maximum arenas allowed to 2 unless the MALLOC_ARENA_MAX env variable is set.
            # Note: Setting MALLOC_ARENA_MAX=0 explicitly restores the default glibc malloc() behaviour.
            #
            # [1] https://sourceware.org/glibc/wiki/MallocInternals#Arenas_and_Heaps
            # [2] https://www.gnu.org/software/libc/manual/html_node/The-GNU-Allocator.html
            # [3] https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c;h=00ce48c;hb=0a8262a#l862
            try:
                import ctypes
                libc = ctypes.CDLL("libc.so.6")
                M_ARENA_MAX = -8
                assert libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(2))
            except Exception:
                _logger.warning("Could not set ARENA_MAX through mallopt()")
        server = ThreadedServer(odoo.http.root)

    watcher = None
    if 'reload' in config['dev_mode'] and not odoo.evented:
        if inotify:
            watcher = FSWatcherInotify()
            watcher.start()
        elif watchdog:
            watcher = FSWatcherWatchdog()
            watcher.start()
        else:
            if os.name == 'posix' and platform.system() != 'Darwin':
                module = 'inotify'
            else:
                module = 'watchdog'
            _logger.warning("'%s' module not installed. Code autoreload feature is disabled", module)

    rc = server.run(preload, stop)

    if watcher:
        watcher.stop()
    # like the legend of the phoenix, all ends with beginnings
    if server_phoenix:
        _reexec()

    return rc if rc else 0

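# The mallopt() call above can also be expressed from the environment, which
# the guard `"MALLOC_ARENA_MAX" not in os.environ` deliberately respects
# (illustrative invocation; executable name depends on your install):
#
#     MALLOC_ARENA_MAX=2 odoo --workers=0
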
def restart():
    """ Restart the server
    """
    if os.name == 'nt':
        # run in a thread to let the current thread return response to the caller.
        threading.Thread(target=_reexec).start()
    else:
        os.kill(server.pid, signal.SIGHUP)