gooderp18绿色标准版
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

1050 lines
40KB

  1. import base64
  2. import bisect
  3. import functools
  4. import hashlib
  5. import json
  6. import logging
  7. import os
  8. import psycopg2
  9. import random
  10. import socket
  11. import struct
  12. import selectors
  13. import threading
  14. import time
  15. from collections import defaultdict, deque
  16. from contextlib import closing, suppress
  17. from enum import IntEnum
  18. from psycopg2.pool import PoolError
  19. from urllib.parse import urlparse
  20. from weakref import WeakSet
  21. from werkzeug.local import LocalStack
  22. from werkzeug.datastructures import ImmutableMultiDict, MultiDict
  23. from werkzeug.exceptions import BadRequest, HTTPException, ServiceUnavailable
  24. import odoo
  25. from odoo import api, modules
  26. from .models.bus import dispatch
  27. from odoo.http import root, Request, Response, SessionExpiredException, get_default_session
  28. from odoo.modules.registry import Registry
  29. from odoo.service import model as service_model
  30. from odoo.service.server import CommonServer
  31. from odoo.service.security import check_session
  32. from odoo.tools import config, lazy_property
  33. _logger = logging.getLogger(__name__)
  34. MAX_TRY_ON_POOL_ERROR = 10
  35. DELAY_ON_POOL_ERROR = 0.03
  36. def acquire_cursor(db):
  37. """ Try to acquire a cursor up to `MAX_TRY_ON_POOL_ERROR` """
  38. for tryno in range(1, MAX_TRY_ON_POOL_ERROR + 1):
  39. with suppress(PoolError):
  40. return Registry(db).cursor()
  41. time.sleep(random.uniform(DELAY_ON_POOL_ERROR, DELAY_ON_POOL_ERROR * tryno))
  42. raise PoolError('Failed to acquire cursor after %s retries' % MAX_TRY_ON_POOL_ERROR)
  43. # ------------------------------------------------------
  44. # EXCEPTIONS
  45. # ------------------------------------------------------
  46. class UpgradeRequired(HTTPException):
  47. code = 426
  48. description = "Wrong websocket version was given during the handshake"
  49. def get_headers(self, environ=None):
  50. headers = super().get_headers(environ)
  51. headers.append((
  52. 'Sec-WebSocket-Version',
  53. '; '.join(WebsocketConnectionHandler.SUPPORTED_VERSIONS)
  54. ))
  55. return headers
  56. class WebsocketException(Exception):
  57. """ Base class for all websockets exceptions """
  58. class ConnectionClosed(WebsocketException):
  59. """
  60. Raised when the other end closes the socket without performing
  61. the closing handshake.
  62. """
  63. class InvalidCloseCodeException(WebsocketException):
  64. def __init__(self, code):
  65. super().__init__(f"Invalid close code: {code}")
  66. class InvalidDatabaseException(WebsocketException):
  67. """
  68. When raised: the database probably does not exists anymore, the
  69. database is corrupted or the database version doesn't match the
  70. server version.
  71. """
  72. class InvalidStateException(WebsocketException):
  73. """
  74. Raised when an operation is forbidden in the current state.
  75. """
  76. class InvalidWebsocketRequest(WebsocketException):
  77. """
  78. Raised when a websocket request is invalid (format, wrong args).
  79. """
  80. class PayloadTooLargeException(WebsocketException):
  81. """
  82. Raised when a websocket message is too large.
  83. """
  84. class ProtocolError(WebsocketException):
  85. """
  86. Raised when a frame format doesn't match expectations.
  87. """
  88. class RateLimitExceededException(Exception):
  89. """
  90. Raised when a client exceeds the number of request in a given
  91. time.
  92. """
  93. # ------------------------------------------------------
  94. # WEBSOCKET LIFECYCLE
  95. # ------------------------------------------------------
  96. class LifecycleEvent(IntEnum):
  97. OPEN = 0
  98. CLOSE = 1
  99. # ------------------------------------------------------
  100. # WEBSOCKET
  101. # ------------------------------------------------------
  102. class Opcode(IntEnum):
  103. CONTINUE = 0x00
  104. TEXT = 0x01
  105. BINARY = 0x02
  106. CLOSE = 0x08
  107. PING = 0x09
  108. PONG = 0x0A
  109. class CloseCode(IntEnum):
  110. CLEAN = 1000
  111. GOING_AWAY = 1001
  112. PROTOCOL_ERROR = 1002
  113. INCORRECT_DATA = 1003
  114. ABNORMAL_CLOSURE = 1006
  115. INCONSISTENT_DATA = 1007
  116. MESSAGE_VIOLATING_POLICY = 1008
  117. MESSAGE_TOO_BIG = 1009
  118. EXTENSION_NEGOTIATION_FAILED = 1010
  119. SERVER_ERROR = 1011
  120. RESTART = 1012
  121. TRY_LATER = 1013
  122. BAD_GATEWAY = 1014
  123. SESSION_EXPIRED = 4001
  124. KEEP_ALIVE_TIMEOUT = 4002
  125. class ConnectionState(IntEnum):
  126. OPEN = 0
  127. CLOSING = 1
  128. CLOSED = 2
  129. DATA_OP = {Opcode.TEXT, Opcode.BINARY}
  130. CTRL_OP = {Opcode.CLOSE, Opcode.PING, Opcode.PONG}
  131. HEARTBEAT_OP = {Opcode.PING, Opcode.PONG}
  132. VALID_CLOSE_CODES = {
  133. code for code in CloseCode if code is not CloseCode.ABNORMAL_CLOSURE
  134. }
  135. CLEAN_CLOSE_CODES = {CloseCode.CLEAN, CloseCode.GOING_AWAY, CloseCode.RESTART}
  136. RESERVED_CLOSE_CODES = range(3000, 5000)
  137. _XOR_TABLE = [bytes(a ^ b for a in range(256)) for b in range(256)]
  138. class Frame:
  139. def __init__(
  140. self,
  141. opcode,
  142. payload=b'',
  143. fin=True,
  144. rsv1=False,
  145. rsv2=False,
  146. rsv3=False
  147. ):
  148. self.opcode = opcode
  149. self.payload = payload
  150. self.fin = fin
  151. self.rsv1 = rsv1
  152. self.rsv2 = rsv2
  153. self.rsv3 = rsv3
  154. class CloseFrame(Frame):
  155. def __init__(self, code, reason):
  156. if code not in VALID_CLOSE_CODES and code not in RESERVED_CLOSE_CODES:
  157. raise InvalidCloseCodeException(code)
  158. payload = struct.pack('!H', code)
  159. if reason:
  160. payload += reason.encode('utf-8')
  161. self.code = code
  162. self.reason = reason
  163. super().__init__(Opcode.CLOSE, payload)
  164. _websocket_instances = WeakSet()
  165. class Websocket:
  166. __event_callbacks = defaultdict(set)
  167. # Maximum size for a message in bytes, whether it is sent as one
  168. # frame or many fragmented ones.
  169. MESSAGE_MAX_SIZE = 2 ** 20
  170. # Proxies usually close a connection after 1 minute of inactivity.
  171. # Therefore, a PING frame have to be sent if no frame is either sent
  172. # or received within CONNECTION_TIMEOUT - 15 seconds.
  173. CONNECTION_TIMEOUT = 60
  174. INACTIVITY_TIMEOUT = CONNECTION_TIMEOUT - 15
  175. # How much time (in second) the history of last dispatched notifications is
  176. # kept in memory for each websocket.
  177. # To avoid duplicate notifications, we fetch them based on their ids.
  178. # However during parallel transactions, ids are assigned immediately (when
  179. # they are requested), but the notifications are dispatched at the time of
  180. # the commit. This means lower id notifications might be dispatched after
  181. # higher id notifications.
  182. # Simply incrementing the last id is sufficient to guarantee no duplicates,
  183. # but it is not sufficient to guarantee all notifications are dispatched,
  184. # and in particular not sufficient for those with a lower id coming after a
  185. # higher id was dispatched.
  186. # To solve the issue of missed notifications, the lowest id, stored in
  187. # ``_last_notif_sent_id``, is held back by a few seconds to give time for
  188. # concurrent transactions to finish. To avoid dispatching duplicate
  189. # notifications, the history of already dispatched notifications during this
  190. # period is kept in memory in ``_notif_history`` and the corresponding
  191. # notifications are discarded from subsequent dispatching even if their id
  192. # is higher than ``_last_notif_sent_id``.
  193. # In practice, what is important functionally is the time between the create
  194. # of the notification and the commit of the transaction in business code.
  195. # If this time exceeds this threshold, the notification will never be
  196. # dispatched if the target user receive any other notification in the
  197. # meantime.
  198. # Transactions known to be long should therefore create their notifications
  199. # at the end, as close as possible to their commit.
  200. MAX_NOTIFICATION_HISTORY_SEC = 10
  201. # How many requests can be made in excess of the given rate.
  202. RL_BURST = int(config['websocket_rate_limit_burst'])
  203. # How many seconds between each request.
  204. RL_DELAY = float(config['websocket_rate_limit_delay'])
  205. def __init__(self, sock, session, cookies):
  206. # Session linked to the current websocket connection.
  207. self._session = session
  208. # Cookies linked to the current websocket connection.
  209. self._cookies = cookies
  210. self._db = session.db
  211. self.__socket = sock
  212. self._close_sent = False
  213. self._close_received = False
  214. self._timeout_manager = TimeoutManager()
  215. # Used for rate limiting.
  216. self._incoming_frame_timestamps = deque(maxlen=self.RL_BURST)
  217. # Used to notify the websocket that bus notifications are
  218. # available.
  219. self.__notif_sock_w, self.__notif_sock_r = socket.socketpair()
  220. self._channels = set()
  221. # For ``_last_notif_sent_id and ``_notif_history``, see
  222. # ``MAX_NOTIFICATION_HISTORY_SEC`` for more details.
  223. # id of the last sent notification that is no longer in _notif_history
  224. self._last_notif_sent_id = 0
  225. # history of last sent notifications in the format (notif_id, send_time)
  226. # always sorted by notif_id ASC
  227. self._notif_history = []
  228. # Websocket start up
  229. self.__selector = (
  230. selectors.PollSelector()
  231. if odoo.evented and hasattr(selectors, 'PollSelector')
  232. else selectors.DefaultSelector()
  233. )
  234. self.__selector.register(self.__socket, selectors.EVENT_READ)
  235. self.__selector.register(self.__notif_sock_r, selectors.EVENT_READ)
  236. self.state = ConnectionState.OPEN
  237. _websocket_instances.add(self)
  238. self._trigger_lifecycle_event(LifecycleEvent.OPEN)
  239. # ------------------------------------------------------
  240. # PUBLIC METHODS
  241. # ------------------------------------------------------
  242. def get_messages(self):
  243. while self.state is not ConnectionState.CLOSED:
  244. try:
  245. readables = {
  246. selector_key[0].fileobj for selector_key in
  247. self.__selector.select(self.INACTIVITY_TIMEOUT)
  248. }
  249. if self._timeout_manager.has_timed_out() and self.state is ConnectionState.OPEN:
  250. self.disconnect(
  251. CloseCode.ABNORMAL_CLOSURE
  252. if self._timeout_manager.timeout_reason is TimeoutReason.NO_RESPONSE
  253. else CloseCode.KEEP_ALIVE_TIMEOUT
  254. )
  255. continue
  256. if not readables:
  257. self._send_ping_frame()
  258. continue
  259. if self.__notif_sock_r in readables:
  260. self._dispatch_bus_notifications()
  261. if self.__socket in readables:
  262. message = self._process_next_message()
  263. if message is not None:
  264. yield message
  265. except Exception as exc:
  266. self._handle_transport_error(exc)
  267. def disconnect(self, code, reason=None):
  268. """
  269. Initiate the closing handshake that is, send a close frame
  270. to the other end which will then send us back an
  271. acknowledgment. Upon the reception of this acknowledgment,
  272. the `_terminate` method will be called to perform an
  273. orderly shutdown. Note that we don't need to wait for the
  274. acknowledgment if the connection was failed beforewards.
  275. """
  276. if code is not CloseCode.ABNORMAL_CLOSURE:
  277. self._send_close_frame(code, reason)
  278. else:
  279. self._terminate()
  280. @classmethod
  281. def onopen(cls, func):
  282. cls.__event_callbacks[LifecycleEvent.OPEN].add(func)
  283. return func
  284. @classmethod
  285. def onclose(cls, func):
  286. cls.__event_callbacks[LifecycleEvent.CLOSE].add(func)
  287. return func
  288. def subscribe(self, channels, last):
  289. """ Subscribe to bus channels. """
  290. self._channels = channels
  291. # Only assign the last id according to the client once: the server is
  292. # more reliable later on, see ``MAX_NOTIFICATION_HISTORY_SEC``.
  293. if self._last_notif_sent_id == 0:
  294. self._last_notif_sent_id = last
  295. # Dispatch past notifications if there are any.
  296. self.trigger_notification_dispatching()
  297. def trigger_notification_dispatching(self):
  298. """
  299. Warn the socket that notifications are available. Ignore if a
  300. dispatch is already planned or if the socket is already in the
  301. closing state.
  302. """
  303. if self.state is not ConnectionState.OPEN:
  304. return
  305. readables = {
  306. selector_key[0].fileobj for selector_key in
  307. self.__selector.select(0)
  308. }
  309. if self.__notif_sock_r not in readables:
  310. # Send a random bit to mark the socket as readable.
  311. self.__notif_sock_w.send(b'x')
  312. # ------------------------------------------------------
  313. # PRIVATE METHODS
  314. # ------------------------------------------------------
  315. def _get_next_frame(self):
  316. # 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  317. # +-+-+-+-+-------+-+-------------+-------------------------------+
  318. # |F|R|R|R| opcode|M| Payload len | Extended payload length |
  319. # |I|S|S|S| (4) |A| (7) | (16/64) |
  320. # |N|V|V|V| |S| | (if payload len==126/127) |
  321. # | |1|2|3| |K| | |
  322. # +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
  323. # | Extended payload length continued, if payload len == 127 |
  324. # + - - - - - - - - - - - - - - - +-------------------------------+
  325. # | |Masking-key, if MASK set to 1 |
  326. # +-------------------------------+-------------------------------+
  327. # | Masking-key (continued) | Payload Data |
  328. # +-------------------------------- - - - - - - - - - - - - - - - +
  329. # : Payload Data continued ... :
  330. # + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
  331. # | Payload Data continued ... |
  332. # +---------------------------------------------------------------+
  333. def recv_bytes(n):
  334. """ Pull n bytes from the socket """
  335. data = bytearray()
  336. while len(data) < n:
  337. received_data = self.__socket.recv(n - len(data))
  338. if not received_data:
  339. raise ConnectionClosed()
  340. data.extend(received_data)
  341. return data
  342. def is_bit_set(byte, n):
  343. """
  344. Check whether nth bit of byte is set or not (from left
  345. to right).
  346. """
  347. return byte & (1 << (7 - n))
  348. def apply_mask(payload, mask):
  349. # see: https://www.willmcgugan.com/blog/tech/post/speeding-up-websockets-60x/
  350. a, b, c, d = (_XOR_TABLE[n] for n in mask)
  351. payload[::4] = payload[::4].translate(a)
  352. payload[1::4] = payload[1::4].translate(b)
  353. payload[2::4] = payload[2::4].translate(c)
  354. payload[3::4] = payload[3::4].translate(d)
  355. return payload
  356. self._limit_rate()
  357. first_byte, second_byte = recv_bytes(2)
  358. fin, rsv1, rsv2, rsv3 = (is_bit_set(first_byte, n) for n in range(4))
  359. try:
  360. opcode = Opcode(first_byte & 0b00001111)
  361. except ValueError as exc:
  362. raise ProtocolError(exc)
  363. payload_length = second_byte & 0b01111111
  364. if rsv1 or rsv2 or rsv3:
  365. raise ProtocolError("Reserved bits must be unset")
  366. if not is_bit_set(second_byte, 0):
  367. raise ProtocolError("Frame must be masked")
  368. if opcode in CTRL_OP:
  369. if not fin:
  370. raise ProtocolError("Control frames cannot be fragmented")
  371. if payload_length > 125:
  372. raise ProtocolError(
  373. "Control frames payload must be smaller than 126"
  374. )
  375. if payload_length == 126:
  376. payload_length = struct.unpack('!H', recv_bytes(2))[0]
  377. elif payload_length == 127:
  378. payload_length = struct.unpack('!Q', recv_bytes(8))[0]
  379. if payload_length > self.MESSAGE_MAX_SIZE:
  380. raise PayloadTooLargeException()
  381. mask = recv_bytes(4)
  382. payload = apply_mask(recv_bytes(payload_length), mask)
  383. frame = Frame(opcode, bytes(payload), fin, rsv1, rsv2, rsv3)
  384. self._timeout_manager.acknowledge_frame_receipt(frame)
  385. return frame
  386. def _process_next_message(self):
  387. """
  388. Process the next message coming throught the socket. If a
  389. data message can be extracted, return its decoded payload.
  390. As per the RFC, only control frames will be processed once
  391. the connection reaches the closing state.
  392. """
  393. frame = self._get_next_frame()
  394. if frame.opcode in CTRL_OP:
  395. self._handle_control_frame(frame)
  396. return
  397. if self.state is not ConnectionState.OPEN:
  398. # After receiving a control frame indicating the connection
  399. # should be closed, a peer discards any further data
  400. # received.
  401. return
  402. if frame.opcode is Opcode.CONTINUE:
  403. raise ProtocolError("Unexpected continuation frame")
  404. message = frame.payload
  405. if not frame.fin:
  406. message = self._recover_fragmented_message(frame)
  407. return (
  408. message.decode('utf-8')
  409. if message is not None and frame.opcode is Opcode.TEXT else message
  410. )
  411. def _recover_fragmented_message(self, initial_frame):
  412. message_fragments = bytearray(initial_frame.payload)
  413. while True:
  414. frame = self._get_next_frame()
  415. if frame.opcode in CTRL_OP:
  416. # Control frames can be received in the middle of a
  417. # fragmented message, process them as soon as possible.
  418. self._handle_control_frame(frame)
  419. if self.state is not ConnectionState.OPEN:
  420. return
  421. continue
  422. if frame.opcode is not Opcode.CONTINUE:
  423. raise ProtocolError("A continuation frame was expected")
  424. message_fragments.extend(frame.payload)
  425. if len(message_fragments) > self.MESSAGE_MAX_SIZE:
  426. raise PayloadTooLargeException()
  427. if frame.fin:
  428. return bytes(message_fragments)
  429. def _send(self, message):
  430. if self.state is not ConnectionState.OPEN:
  431. raise InvalidStateException(
  432. "Trying to send a frame on a closed socket"
  433. )
  434. opcode = Opcode.BINARY
  435. if not isinstance(message, (bytes, bytearray)):
  436. opcode = Opcode.TEXT
  437. self._send_frame(Frame(opcode, message))
  438. def _send_frame(self, frame):
  439. if frame.opcode in CTRL_OP and len(frame.payload) > 125:
  440. raise ProtocolError(
  441. "Control frames should have a payload length smaller than 126"
  442. )
  443. if isinstance(frame.payload, str):
  444. frame.payload = frame.payload.encode('utf-8')
  445. elif not isinstance(frame.payload, (bytes, bytearray)):
  446. frame.payload = json.dumps(frame.payload).encode('utf-8')
  447. output = bytearray()
  448. first_byte = (
  449. (0b10000000 if frame.fin else 0)
  450. | (0b01000000 if frame.rsv1 else 0)
  451. | (0b00100000 if frame.rsv2 else 0)
  452. | (0b00010000 if frame.rsv3 else 0)
  453. | frame.opcode
  454. )
  455. payload_length = len(frame.payload)
  456. if payload_length < 126:
  457. output.extend(
  458. struct.pack('!BB', first_byte, payload_length)
  459. )
  460. elif payload_length < 65536:
  461. output.extend(
  462. struct.pack('!BBH', first_byte, 126, payload_length)
  463. )
  464. else:
  465. output.extend(
  466. struct.pack('!BBQ', first_byte, 127, payload_length)
  467. )
  468. output.extend(frame.payload)
  469. self.__socket.sendall(output)
  470. self._timeout_manager.acknowledge_frame_sent(frame)
  471. if not isinstance(frame, CloseFrame):
  472. return
  473. self.state = ConnectionState.CLOSING
  474. self._close_sent = True
  475. if frame.code not in CLEAN_CLOSE_CODES or self._close_received:
  476. return self._terminate()
  477. # After sending a control frame indicating the connection
  478. # should be closed, a peer does not send any further data.
  479. self.__selector.unregister(self.__notif_sock_r)
  480. def _send_close_frame(self, code, reason=None):
  481. """ Send a close frame. """
  482. self._send_frame(CloseFrame(code, reason))
  483. def _send_ping_frame(self):
  484. """ Send a ping frame """
  485. self._send_frame(Frame(Opcode.PING))
  486. def _send_pong_frame(self, payload):
  487. """ Send a pong frame """
  488. self._send_frame(Frame(Opcode.PONG, payload))
  489. def _terminate(self):
  490. """ Close the underlying TCP socket. """
  491. with suppress(OSError, TimeoutError):
  492. self.__socket.shutdown(socket.SHUT_WR)
  493. # Call recv until obtaining a return value of 0 indicating
  494. # the other end has performed an orderly shutdown. A timeout
  495. # is set to ensure the connection will be closed even if
  496. # the other end does not close the socket properly.
  497. self.__socket.settimeout(1)
  498. while self.__socket.recv(4096):
  499. pass
  500. self.__selector.unregister(self.__socket)
  501. self.__selector.close()
  502. self.__socket.close()
  503. self.state = ConnectionState.CLOSED
  504. dispatch.unsubscribe(self)
  505. self._trigger_lifecycle_event(LifecycleEvent.CLOSE)
  506. with acquire_cursor(self._db) as cr:
  507. env = api.Environment(cr, self._session.uid, self._session.context)
  508. env["ir.websocket"]._on_websocket_closed(self._cookies)
  509. def _handle_control_frame(self, frame):
  510. if frame.opcode is Opcode.PING:
  511. self._send_pong_frame(frame.payload)
  512. elif frame.opcode is Opcode.CLOSE:
  513. self.state = ConnectionState.CLOSING
  514. self._close_received = True
  515. code, reason = CloseCode.CLEAN, None
  516. if len(frame.payload) >= 2:
  517. code = struct.unpack('!H', frame.payload[:2])[0]
  518. reason = frame.payload[2:].decode('utf-8')
  519. elif frame.payload:
  520. raise ProtocolError("Malformed closing frame")
  521. if not self._close_sent:
  522. self._send_close_frame(code, reason)
  523. else:
  524. self._terminate()
  525. def _handle_transport_error(self, exc):
  526. """
  527. Find out which close code should be sent according to given
  528. exception and call `self.disconnect` in order to close the
  529. connection cleanly.
  530. """
  531. code, reason = CloseCode.SERVER_ERROR, str(exc)
  532. if isinstance(exc, (ConnectionClosed, OSError)):
  533. code = CloseCode.ABNORMAL_CLOSURE
  534. elif isinstance(exc, (ProtocolError, InvalidCloseCodeException)):
  535. code = CloseCode.PROTOCOL_ERROR
  536. elif isinstance(exc, UnicodeDecodeError):
  537. code = CloseCode.INCONSISTENT_DATA
  538. elif isinstance(exc, PayloadTooLargeException):
  539. code = CloseCode.MESSAGE_TOO_BIG
  540. elif isinstance(exc, (PoolError, RateLimitExceededException)):
  541. code = CloseCode.TRY_LATER
  542. elif isinstance(exc, SessionExpiredException):
  543. code = CloseCode.SESSION_EXPIRED
  544. if code is CloseCode.SERVER_ERROR:
  545. reason = None
  546. registry = Registry(self._session.db)
  547. sequence = registry.registry_sequence
  548. registry = registry.check_signaling()
  549. if sequence != registry.registry_sequence:
  550. _logger.warning("Bus operation aborted; registry has been reloaded")
  551. else:
  552. _logger.error(exc, exc_info=True)
  553. self.disconnect(code, reason)
  554. def _limit_rate(self):
  555. """
  556. This method is a simple rate limiter designed not to allow
  557. more than one request by `RL_DELAY` seconds. `RL_BURST` specify
  558. how many requests can be made in excess of the given rate at the
  559. begining. When requests are received too fast, raises the
  560. `RateLimitExceededException`.
  561. """
  562. now = time.time()
  563. if len(self._incoming_frame_timestamps) >= self.RL_BURST:
  564. elapsed_time = now - self._incoming_frame_timestamps[0]
  565. if elapsed_time < self.RL_DELAY * self.RL_BURST:
  566. raise RateLimitExceededException()
  567. self._incoming_frame_timestamps.append(now)
  568. def _trigger_lifecycle_event(self, event_type):
  569. """
  570. Trigger a lifecycle event that is, call every function
  571. registered for this event type. Every callback is given both the
  572. environment and the related websocket.
  573. """
  574. if not self.__event_callbacks[event_type]:
  575. return
  576. with closing(acquire_cursor(self._db)) as cr:
  577. lang = api.Environment(cr, self._session.uid, {})['res.lang']._get_code(self._session.context.get('lang'))
  578. env = api.Environment(cr, self._session.uid, dict(self._session.context, lang=lang))
  579. for callback in self.__event_callbacks[event_type]:
  580. try:
  581. service_model.retrying(functools.partial(callback, env, self), env)
  582. except Exception:
  583. _logger.warning(
  584. 'Error during Websocket %s callback',
  585. LifecycleEvent(event_type).name,
  586. exc_info=True
  587. )
  588. def _dispatch_bus_notifications(self):
  589. """
  590. Dispatch notifications related to the registered channels. If
  591. the session is expired, close the connection with the
  592. `SESSION_EXPIRED` close code. If no cursor can be acquired,
  593. close the connection with the `TRY_LATER` close code.
  594. """
  595. session = root.session_store.get(self._session.sid)
  596. if not session:
  597. raise SessionExpiredException()
  598. with acquire_cursor(session.db) as cr:
  599. env = api.Environment(cr, session.uid, dict(session.context, lang=None))
  600. if session.uid is not None and not check_session(session, env):
  601. raise SessionExpiredException()
  602. # Mark the notification request as processed.
  603. self.__notif_sock_r.recv(1)
  604. notifications = env["bus.bus"]._poll(
  605. self._channels, self._last_notif_sent_id, [n[0] for n in self._notif_history]
  606. )
  607. if not notifications:
  608. return
  609. for notif in notifications:
  610. bisect.insort(self._notif_history, (notif["id"], time.time()), key=lambda x: x[0])
  611. # Discard all the smallest notification ids that have expired and
  612. # increment the last id accordingly. History can only be trimmed of ids
  613. # that are below the new last id otherwise some notifications might be
  614. # dispatched again.
  615. # For example, if the theshold is 10s, and the state is:
  616. # last id 2, history [(3, 8s), (6, 10s), (7, 7s)]
  617. # If 6 is removed because it is above the threshold, the next query will
  618. # be (id > 2 AND id NOT IN (3, 7)) which will fetch 6 again.
  619. # 6 can only be removed after 3 reaches the threshold and is removed as
  620. # well, and if 4 appears in the meantime, 3 can be removed but 6 will
  621. # have to wait for 4 to reach the threshold as well.
  622. last_index = -1
  623. for i, notif in enumerate(self._notif_history):
  624. if time.time() - notif[1] > self.MAX_NOTIFICATION_HISTORY_SEC:
  625. last_index = i
  626. else:
  627. break
  628. if last_index != -1:
  629. self._last_notif_sent_id = self._notif_history[last_index][0]
  630. self._notif_history = self._notif_history[last_index + 1 :]
  631. self._send(notifications)
  632. class TimeoutReason(IntEnum):
  633. KEEP_ALIVE = 0
  634. NO_RESPONSE = 1
  635. class TimeoutManager:
  636. """
  637. This class handles the Websocket timeouts. If no response to a
  638. PING/CLOSE frame is received after `TIMEOUT` seconds or if the
  639. connection is opened for more than `self._keep_alive_timeout` seconds,
  640. the connection is considered to have timed out. To determine if the
  641. connection has timed out, use the `has_timed_out` method.
  642. """
  643. TIMEOUT = 15
  644. # Timeout specifying how many seconds the connection should be kept
  645. # alive.
  646. KEEP_ALIVE_TIMEOUT = int(config['websocket_keep_alive_timeout'])
  647. def __init__(self):
  648. super().__init__()
  649. self._awaited_opcode = None
  650. # Time in which the connection was opened.
  651. self._opened_at = time.time()
  652. # Custom keep alive timeout for each TimeoutManager to avoid multiple
  653. # connections timing out at the same time.
  654. self._keep_alive_timeout = (
  655. self.KEEP_ALIVE_TIMEOUT + random.uniform(0, self.KEEP_ALIVE_TIMEOUT / 2)
  656. )
  657. self.timeout_reason = None
  658. # Start time recorded when we started awaiting an answer to a
  659. # PING/CLOSE frame.
  660. self._waiting_start_time = None
  661. def acknowledge_frame_receipt(self, frame):
  662. if self._awaited_opcode is frame.opcode:
  663. self._awaited_opcode = None
  664. self._waiting_start_time = None
  665. def acknowledge_frame_sent(self, frame):
  666. """
  667. Acknowledge a frame was sent. If this frame is a PING/CLOSE
  668. frame, start waiting for an answer.
  669. """
  670. if self.has_timed_out():
  671. return
  672. if frame.opcode is Opcode.PING:
  673. self._awaited_opcode = Opcode.PONG
  674. elif frame.opcode is Opcode.CLOSE:
  675. self._awaited_opcode = Opcode.CLOSE
  676. if self._awaited_opcode is not None:
  677. self._waiting_start_time = time.time()
  678. def has_timed_out(self):
  679. """
  680. Determine whether the connection has timed out or not. The
  681. connection times out when the answer to a CLOSE/PING frame
  682. is not received within `TIMEOUT` seconds or if the connection
  683. is opened for more than `self._keep_alive_timeout` seconds.
  684. """
  685. now = time.time()
  686. if now - self._opened_at >= self._keep_alive_timeout:
  687. self.timeout_reason = TimeoutReason.KEEP_ALIVE
  688. return True
  689. if self._awaited_opcode and now - self._waiting_start_time >= self.TIMEOUT:
  690. self.timeout_reason = TimeoutReason.NO_RESPONSE
  691. return True
  692. return False
  693. # ------------------------------------------------------
  694. # WEBSOCKET SERVING
  695. # ------------------------------------------------------
  696. _wsrequest_stack = LocalStack()
  697. wsrequest = _wsrequest_stack()
  698. class WebsocketRequest:
  699. def __init__(self, db, httprequest, websocket):
  700. self.db = db
  701. self.httprequest = httprequest
  702. self.session = None
  703. self.ws = websocket
  704. def __enter__(self):
  705. _wsrequest_stack.push(self)
  706. return self
  707. def __exit__(self, *args):
  708. _wsrequest_stack.pop()
  709. def serve_websocket_message(self, message):
  710. try:
  711. jsonrequest = json.loads(message)
  712. event_name = jsonrequest['event_name'] # mandatory
  713. except KeyError as exc:
  714. raise InvalidWebsocketRequest(
  715. f'Key {exc.args[0]!r} is missing from request'
  716. ) from exc
  717. except ValueError as exc:
  718. raise InvalidWebsocketRequest(
  719. f'Invalid JSON data, {exc.args[0]}'
  720. ) from exc
  721. data = jsonrequest.get('data')
  722. self.session = self._get_session()
  723. try:
  724. self.registry = Registry(self.db)
  725. threading.current_thread().dbname = self.registry.db_name
  726. self.registry.check_signaling()
  727. except (
  728. AttributeError, psycopg2.OperationalError, psycopg2.ProgrammingError
  729. ) as exc:
  730. raise InvalidDatabaseException() from exc
  731. with closing(acquire_cursor(self.db)) as cr:
  732. lang = api.Environment(cr, self.session.uid, {})['res.lang']._get_code(self.session.context.get('lang'))
  733. self.env = api.Environment(cr, self.session.uid, dict(self.session.context, lang=lang))
  734. threading.current_thread().uid = self.env.uid
  735. service_model.retrying(
  736. functools.partial(self._serve_ir_websocket, event_name, data),
  737. self.env,
  738. )
  739. def _serve_ir_websocket(self, event_name, data):
  740. """
  741. Delegate most of the processing to the ir.websocket model
  742. which is extensible by applications. Directly call the
  743. appropriate ir.websocket method since only two events are
  744. tolerated: `subscribe` and `update_presence`.
  745. """
  746. self.env['ir.websocket']._authenticate()
  747. if event_name == 'subscribe':
  748. self.env['ir.websocket']._subscribe(data)
  749. if event_name == 'update_presence':
  750. self.env['ir.websocket']._update_bus_presence(**data)
  751. def _get_session(self):
  752. session = root.session_store.get(self.ws._session.sid)
  753. if not session:
  754. raise SessionExpiredException()
  755. return session
  756. def update_env(self, user=None, context=None, su=None):
  757. """
  758. Update the environment of the current websocket request.
  759. """
  760. Request.update_env(self, user, context, su)
  761. def update_context(self, **overrides):
  762. """
  763. Override the environment context of the current request with the
  764. values of ``overrides``. To replace the entire context, please
  765. use :meth:`~update_env` instead.
  766. """
  767. self.update_env(context=dict(self.env.context, **overrides))
  768. @lazy_property
  769. def cookies(self):
  770. cookies = MultiDict(self.httprequest.cookies)
  771. if self.registry:
  772. self.registry['ir.http']._sanitize_cookies(cookies)
  773. return ImmutableMultiDict(cookies)
  774. class WebsocketConnectionHandler:
  775. SUPPORTED_VERSIONS = {'13'}
  776. # Given by the RFC in order to generate Sec-WebSocket-Accept from
  777. # Sec-WebSocket-Key value.
  778. _HANDSHAKE_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
  779. _REQUIRED_HANDSHAKE_HEADERS = {
  780. 'connection', 'host', 'sec-websocket-key',
  781. 'sec-websocket-version', 'upgrade', 'origin',
  782. }
  783. # Latest version of the websocket worker. This version should be incremented
  784. # every time `websocket_worker.js` is modified to force the browser to fetch
  785. # the new worker bundle.
  786. _VERSION = "18.0-3"
  787. @classmethod
  788. def websocket_allowed(cls, request):
  789. return not modules.module.current_test
  790. @classmethod
  791. def open_connection(cls, request, version):
  792. """
  793. Open a websocket connection if the handshake is successfull.
  794. :return: Response indicating the server performed a connection
  795. upgrade.
  796. :raise: UpgradeRequired if there is no intersection between the
  797. versions the client supports and those we support.
  798. :raise: BadRequest if the handshake data is incorrect.
  799. """
  800. if not cls.websocket_allowed(request):
  801. raise ServiceUnavailable("Websocket is disabled in test mode")
  802. public_session = cls._handle_public_configuration(request)
  803. try:
  804. response = cls._get_handshake_response(request.httprequest.headers)
  805. socket = request.httprequest._HTTPRequest__environ['socket']
  806. session, db, httprequest = (public_session or request.session), request.db, request.httprequest
  807. response.call_on_close(lambda: cls._serve_forever(
  808. Websocket(socket, session, httprequest.cookies),
  809. db,
  810. httprequest,
  811. version
  812. ))
  813. # Force save the session. Session must be persisted to handle
  814. # WebSocket authentication.
  815. request.session.is_dirty = True
  816. return response
  817. except KeyError as exc:
  818. raise RuntimeError(
  819. f"Couldn't bind the websocket. Is the connection opened on the evented port ({config['gevent_port']})?"
  820. ) from exc
  821. except HTTPException as exc:
  822. # The HTTP stack does not log exceptions derivated from the
  823. # HTTPException class since they are valid responses.
  824. _logger.error(exc)
  825. raise
  826. @classmethod
  827. def _get_handshake_response(cls, headers):
  828. """
  829. :return: Response indicating the server performed a connection
  830. upgrade.
  831. :raise: BadRequest
  832. :raise: UpgradeRequired
  833. """
  834. cls._assert_handshake_validity(headers)
  835. # sha-1 is used as it is required by
  836. # https://datatracker.ietf.org/doc/html/rfc6455#page-7
  837. accept_header = hashlib.sha1(
  838. (headers['sec-websocket-key'] + cls._HANDSHAKE_GUID).encode()).digest()
  839. accept_header = base64.b64encode(accept_header)
  840. return Response(status=101, headers={
  841. 'Upgrade': 'websocket',
  842. 'Connection': 'Upgrade',
  843. 'Sec-WebSocket-Accept': accept_header.decode(),
  844. })
  845. @classmethod
  846. def _handle_public_configuration(cls, request):
  847. if not os.getenv('ODOO_BUS_PUBLIC_SAMESITE_WS'):
  848. return
  849. headers = request.httprequest.headers
  850. origin_url = urlparse(headers.get('origin'))
  851. if origin_url.netloc != headers.get('host') or origin_url.scheme != request.httprequest.scheme:
  852. _logger.warning(
  853. 'Downgrading websocket session. Host=%(host)s, Origin=%(origin)s, Scheme=%(scheme)s.',
  854. {
  855. 'host': headers.get('host'),
  856. 'origin': headers.get('origin'),
  857. 'scheme': request.httprequest.scheme,
  858. },
  859. )
  860. session = root.session_store.new()
  861. session.update(get_default_session(), db=request.session.db)
  862. root.session_store.save(session)
  863. return session
  864. return None
  865. @classmethod
  866. def _assert_handshake_validity(cls, headers):
  867. """
  868. :raise: UpgradeRequired if there is no intersection between
  869. the version the client supports and those we support.
  870. :raise: BadRequest in case of invalid handshake.
  871. """
  872. missing_or_empty_headers = {
  873. header for header in cls._REQUIRED_HANDSHAKE_HEADERS
  874. if header not in headers
  875. }
  876. if missing_or_empty_headers:
  877. raise BadRequest(
  878. f"""Empty or missing header(s): {', '.join(missing_or_empty_headers)}"""
  879. )
  880. if headers['upgrade'].lower() != 'websocket':
  881. raise BadRequest('Invalid upgrade header')
  882. if 'upgrade' not in headers['connection'].lower():
  883. raise BadRequest('Invalid connection header')
  884. if headers['sec-websocket-version'] not in cls.SUPPORTED_VERSIONS:
  885. raise UpgradeRequired()
  886. key = headers['sec-websocket-key']
  887. try:
  888. decoded_key = base64.b64decode(key, validate=True)
  889. except ValueError:
  890. raise BadRequest("Sec-WebSocket-Key should be b64 encoded")
  891. if len(decoded_key) != 16:
  892. raise BadRequest(
  893. "Sec-WebSocket-Key should be of length 16 once decoded"
  894. )
  895. @classmethod
  896. def _serve_forever(cls, websocket, db, httprequest, version):
  897. """
  898. Process incoming messages and dispatch them to the application.
  899. """
  900. current_thread = threading.current_thread()
  901. current_thread.type = 'websocket'
  902. if httprequest.user_agent and version != cls._VERSION:
  903. # Close the connection from an outdated worker. We can't use a
  904. # custom close code because the connection is considered successful,
  905. # preventing exponential reconnect backoff. This would cause old
  906. # workers to reconnect frequently, putting pressure on the server.
  907. # Clean closes don't trigger reconnections, assuming they are
  908. # intentional. The reason indicates to the origin worker not to
  909. # reconnect, preventing old workers from lingering after updates.
  910. # Non browsers are ignored since IOT devices do not provide the
  911. # worker version.
  912. websocket.disconnect(CloseCode.CLEAN, "OUTDATED_VERSION")
  913. for message in websocket.get_messages():
  914. with WebsocketRequest(db, httprequest, websocket) as req:
  915. try:
  916. req.serve_websocket_message(message)
  917. except SessionExpiredException:
  918. websocket.disconnect(CloseCode.SESSION_EXPIRED)
  919. except PoolError:
  920. websocket.disconnect(CloseCode.TRY_LATER)
  921. except Exception:
  922. _logger.exception("Exception occurred during websocket request handling")
  923. def _kick_all():
  924. """ Disconnect all the websocket instances. """
  925. for websocket in _websocket_instances:
  926. if websocket.state is ConnectionState.OPEN:
  927. websocket.disconnect(CloseCode.GOING_AWAY)
  928. CommonServer.on_stop(_kick_all)
上海开阖软件有限公司 沪ICP备12045867号-1