gooderp18绿色标准版
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

321 行
11KB

  1. /*-------------------------------------------------------------------------
  2. *
  3. * walreceiver.h
  4. * Exports from replication/walreceiverfuncs.c.
  5. *
  6. * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
  7. *
  8. * src/include/replication/walreceiver.h
  9. *
  10. *-------------------------------------------------------------------------
  11. */
  12. #ifndef _WALRECEIVER_H
  13. #define _WALRECEIVER_H
  14. #include "access/xlog.h"
  15. #include "access/xlogdefs.h"
  16. #include "fmgr.h"
  17. #include "getaddrinfo.h" /* for NI_MAXHOST */
  18. #include "replication/logicalproto.h"
  19. #include "replication/walsender.h"
  20. #include "storage/latch.h"
  21. #include "storage/spin.h"
  22. #include "pgtime.h"
  23. #include "utils/tuplestore.h"
  24. /* user-settable parameters */
  25. extern int wal_receiver_status_interval;
  26. extern int wal_receiver_timeout;
  27. extern bool hot_standby_feedback;
  28. /*
  29. * MAXCONNINFO: maximum size of a connection string.
  30. *
  31. * XXX: Should this move to pg_config_manual.h?
  32. */
  33. #define MAXCONNINFO 1024
  34. /* Can we allow the standby to accept replication connection from another standby? */
  35. #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
  36. /*
  37. * Values for WalRcv->walRcvState.
  38. */
  39. typedef enum
  40. {
  41. WALRCV_STOPPED, /* stopped and mustn't start up again */
  42. WALRCV_STARTING, /* launched, but the process hasn't
  43. * initialized yet */
  44. WALRCV_STREAMING, /* walreceiver is streaming */
  45. WALRCV_WAITING, /* stopped streaming, waiting for orders */
  46. WALRCV_RESTARTING, /* asked to restart streaming */
  47. WALRCV_STOPPING /* requested to stop, but still running */
  48. } WalRcvState;
  49. /* Shared memory area for management of walreceiver process */
  50. typedef struct
  51. {
  52. /*
  53. * PID of currently active walreceiver process, its current state and
  54. * start time (actually, the time at which it was requested to be
  55. * started).
  56. */
  57. pid_t pid;
  58. WalRcvState walRcvState;
  59. pg_time_t startTime;
  60. /*
  61. * receiveStart and receiveStartTLI indicate the first byte position and
  62. * timeline that will be received. When startup process starts the
  63. * walreceiver, it sets these to the point where it wants the streaming to
  64. * begin.
  65. */
  66. XLogRecPtr receiveStart;
  67. TimeLineID receiveStartTLI;
  68. /*
  69. * receivedUpto-1 is the last byte position that has already been
  70. * received, and receivedTLI is the timeline it came from. At the first
  71. * startup of walreceiver, these are set to receiveStart and
  72. * receiveStartTLI. After that, walreceiver updates these whenever it
  73. * flushes the received WAL to disk.
  74. */
  75. XLogRecPtr receivedUpto;
  76. TimeLineID receivedTLI;
  77. /*
  78. * latestChunkStart is the starting byte position of the current "batch"
  79. * of received WAL. It's actually the same as the previous value of
  80. * receivedUpto before the last flush to disk. Startup process can use
  81. * this to detect whether it's keeping up or not.
  82. */
  83. XLogRecPtr latestChunkStart;
  84. /*
  85. * Time of send and receive of any message received.
  86. */
  87. TimestampTz lastMsgSendTime;
  88. TimestampTz lastMsgReceiptTime;
  89. /*
  90. * Latest reported end of WAL on the sender
  91. */
  92. XLogRecPtr latestWalEnd;
  93. TimestampTz latestWalEndTime;
  94. /*
  95. * connection string; initially set to connect to the primary, and later
  96. * clobbered to hide security-sensitive fields.
  97. */
  98. char conninfo[MAXCONNINFO];
  99. /*
  100. * Host name (this can be a host name, an IP address, or a directory path)
  101. * and port number of the active replication connection.
  102. */
  103. char sender_host[NI_MAXHOST];
  104. int sender_port;
  105. /*
  106. * replication slot name; is also used for walreceiver to connect with the
  107. * primary
  108. */
  109. char slotname[NAMEDATALEN];
  110. /* set true once conninfo is ready to display (obfuscated pwds etc) */
  111. bool ready_to_display;
  112. /*
  113. * Latch used by startup process to wake up walreceiver after telling it
  114. * where to start streaming (after setting receiveStart and
  115. * receiveStartTLI), and also to tell it to send apply feedback to the
  116. * primary whenever specially marked commit records are applied. This is
  117. * normally mapped to procLatch when walreceiver is running.
  118. */
  119. Latch *latch;
  120. slock_t mutex; /* locks shared variables shown above */
  121. /*
  122. * force walreceiver reply? This doesn't need to be locked; memory
  123. * barriers for ordering are sufficient. But we do need atomic fetch and
  124. * store semantics, so use sig_atomic_t.
  125. */
  126. sig_atomic_t force_reply; /* used as a bool */
  127. } WalRcvData;
  128. extern WalRcvData *WalRcv;
  129. typedef struct
  130. {
  131. bool logical; /* True if this is logical replication stream,
  132. * false if physical stream. */
  133. char *slotname; /* Name of the replication slot or NULL. */
  134. XLogRecPtr startpoint; /* LSN of starting point. */
  135. union
  136. {
  137. struct
  138. {
  139. TimeLineID startpointTLI; /* Starting timeline */
  140. } physical;
  141. struct
  142. {
  143. uint32 proto_version; /* Logical protocol version */
  144. List *publication_names; /* String list of publications */
  145. } logical;
  146. } proto;
  147. } WalRcvStreamOptions;
  148. struct WalReceiverConn;
  149. typedef struct WalReceiverConn WalReceiverConn;
  150. /*
  151. * Status of walreceiver query execution.
  152. *
  153. * We only define statuses that are currently used.
  154. */
  155. typedef enum
  156. {
  157. WALRCV_ERROR, /* There was error when executing the query. */
  158. WALRCV_OK_COMMAND, /* Query executed utility or replication
  159. * command. */
  160. WALRCV_OK_TUPLES, /* Query returned tuples. */
  161. WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
  162. WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
  163. WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
  164. * protocol. */
  165. } WalRcvExecStatus;
  166. /*
  167. * Return value for walrcv_query, returns the status of the execution and
  168. * tuples if any.
  169. */
  170. typedef struct WalRcvExecResult
  171. {
  172. WalRcvExecStatus status;
  173. char *err;
  174. Tuplestorestate *tuplestore;
  175. TupleDesc tupledesc;
  176. } WalRcvExecResult;
  177. /* libpqwalreceiver hooks */
  178. typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
  179. const char *appname,
  180. char **err);
  181. typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
  182. typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
  183. typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
  184. char **sender_host,
  185. int *sender_port);
  186. typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
  187. TimeLineID *primary_tli);
  188. typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
  189. typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
  190. TimeLineID tli,
  191. char **filename,
  192. char **content, int *size);
  193. typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
  194. const WalRcvStreamOptions *options);
  195. typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
  196. TimeLineID *next_tli);
  197. typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
  198. pgsocket *wait_fd);
  199. typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
  200. int nbytes);
  201. typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
  202. const char *slotname, bool temporary,
  203. CRSSnapshotAction snapshot_action,
  204. XLogRecPtr *lsn);
  205. typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
  206. const char *query,
  207. const int nRetTypes,
  208. const Oid *retTypes);
  209. typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
  210. typedef struct WalReceiverFunctionsType
  211. {
  212. walrcv_connect_fn walrcv_connect;
  213. walrcv_check_conninfo_fn walrcv_check_conninfo;
  214. walrcv_get_conninfo_fn walrcv_get_conninfo;
  215. walrcv_get_senderinfo_fn walrcv_get_senderinfo;
  216. walrcv_identify_system_fn walrcv_identify_system;
  217. walrcv_server_version_fn walrcv_server_version;
  218. walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
  219. walrcv_startstreaming_fn walrcv_startstreaming;
  220. walrcv_endstreaming_fn walrcv_endstreaming;
  221. walrcv_receive_fn walrcv_receive;
  222. walrcv_send_fn walrcv_send;
  223. walrcv_create_slot_fn walrcv_create_slot;
  224. walrcv_exec_fn walrcv_exec;
  225. walrcv_disconnect_fn walrcv_disconnect;
  226. } WalReceiverFunctionsType;
  227. extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
  228. #define walrcv_connect(conninfo, logical, appname, err) \
  229. WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
  230. #define walrcv_check_conninfo(conninfo) \
  231. WalReceiverFunctions->walrcv_check_conninfo(conninfo)
  232. #define walrcv_get_conninfo(conn) \
  233. WalReceiverFunctions->walrcv_get_conninfo(conn)
  234. #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
  235. WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
  236. #define walrcv_identify_system(conn, primary_tli) \
  237. WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
  238. #define walrcv_server_version(conn) \
  239. WalReceiverFunctions->walrcv_server_version(conn)
  240. #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
  241. WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
  242. #define walrcv_startstreaming(conn, options) \
  243. WalReceiverFunctions->walrcv_startstreaming(conn, options)
  244. #define walrcv_endstreaming(conn, next_tli) \
  245. WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
  246. #define walrcv_receive(conn, buffer, wait_fd) \
  247. WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
  248. #define walrcv_send(conn, buffer, nbytes) \
  249. WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
  250. #define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
  251. WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
  252. #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
  253. WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
  254. #define walrcv_disconnect(conn) \
  255. WalReceiverFunctions->walrcv_disconnect(conn)
  256. static inline void
  257. walrcv_clear_result(WalRcvExecResult *walres)
  258. {
  259. if (!walres)
  260. return;
  261. if (walres->err)
  262. pfree(walres->err);
  263. if (walres->tuplestore)
  264. tuplestore_end(walres->tuplestore);
  265. if (walres->tupledesc)
  266. FreeTupleDesc(walres->tupledesc);
  267. pfree(walres);
  268. }
  269. /* prototypes for functions in walreceiver.c */
  270. extern void WalReceiverMain(void) pg_attribute_noreturn();
  271. extern void ProcessWalRcvInterrupts(void);
  272. /* prototypes for functions in walreceiverfuncs.c */
  273. extern Size WalRcvShmemSize(void);
  274. extern void WalRcvShmemInit(void);
  275. extern void ShutdownWalRcv(void);
  276. extern bool WalRcvStreaming(void);
  277. extern bool WalRcvRunning(void);
  278. extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
  279. const char *conninfo, const char *slotname);
  280. extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
  281. extern int GetReplicationApplyDelay(void);
  282. extern int GetReplicationTransferLatency(void);
  283. extern void WalRcvForceReply(void);
  284. #endif /* _WALRECEIVER_H */
上海开阖软件有限公司 沪ICP备12045867号-1