本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

334 lines
8.2KB

  1. package memcached
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "math"
  8. "github.com/couchbase/gomemcached"
  9. "github.com/couchbase/goutils/logging"
  10. )
  11. // TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
  12. // TapOpcode is the tap operation type (found in TapEvent)
  13. type TapOpcode uint8
  14. // Tap opcode values.
  15. const (
  16. TapBeginBackfill = TapOpcode(iota)
  17. TapEndBackfill
  18. TapMutation
  19. TapDeletion
  20. TapCheckpointStart
  21. TapCheckpointEnd
  22. tapEndStream
  23. )
  24. const tapMutationExtraLen = 16
  25. var tapOpcodeNames map[TapOpcode]string
  26. func init() {
  27. tapOpcodeNames = map[TapOpcode]string{
  28. TapBeginBackfill: "BeginBackfill",
  29. TapEndBackfill: "EndBackfill",
  30. TapMutation: "Mutation",
  31. TapDeletion: "Deletion",
  32. TapCheckpointStart: "TapCheckpointStart",
  33. TapCheckpointEnd: "TapCheckpointEnd",
  34. tapEndStream: "EndStream",
  35. }
  36. }
  37. func (opcode TapOpcode) String() string {
  38. name := tapOpcodeNames[opcode]
  39. if name == "" {
  40. name = fmt.Sprintf("#%d", opcode)
  41. }
  42. return name
  43. }
  44. // TapEvent is a TAP notification of an operation on the server.
  45. type TapEvent struct {
  46. Opcode TapOpcode // Type of event
  47. VBucket uint16 // VBucket this event applies to
  48. Flags uint32 // Item flags
  49. Expiry uint32 // Item expiration time
  50. Key, Value []byte // Item key/value
  51. Cas uint64
  52. }
  53. func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
  54. event := TapEvent{
  55. VBucket: req.VBucket,
  56. }
  57. switch req.Opcode {
  58. case gomemcached.TAP_MUTATION:
  59. event.Opcode = TapMutation
  60. event.Key = req.Key
  61. event.Value = req.Body
  62. event.Cas = req.Cas
  63. case gomemcached.TAP_DELETE:
  64. event.Opcode = TapDeletion
  65. event.Key = req.Key
  66. event.Cas = req.Cas
  67. case gomemcached.TAP_CHECKPOINT_START:
  68. event.Opcode = TapCheckpointStart
  69. case gomemcached.TAP_CHECKPOINT_END:
  70. event.Opcode = TapCheckpointEnd
  71. case gomemcached.TAP_OPAQUE:
  72. if len(req.Extras) < 8+4 {
  73. return nil
  74. }
  75. switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
  76. case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
  77. event.Opcode = TapBeginBackfill
  78. case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
  79. event.Opcode = TapEndBackfill
  80. case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
  81. event.Opcode = tapEndStream
  82. case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
  83. return nil
  84. case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
  85. return nil
  86. default:
  87. logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
  88. return nil // unknown opaque event
  89. }
  90. case gomemcached.NOOP:
  91. return nil // ignore
  92. default:
  93. logging.Infof("TapFeed: Ignoring %s", req.Opcode)
  94. return nil // unknown event
  95. }
  96. if len(req.Extras) >= tapMutationExtraLen &&
  97. (event.Opcode == TapMutation || event.Opcode == TapDeletion) {
  98. event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
  99. event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
  100. }
  101. return &event
  102. }
  103. func (event TapEvent) String() string {
  104. switch event.Opcode {
  105. case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
  106. return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
  107. event.Opcode, event.VBucket)
  108. default:
  109. return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
  110. event.Opcode, event.Key, len(event.Value),
  111. event.Flags, event.Expiry)
  112. }
  113. }
  114. // TapArguments are parameters for requesting a TAP feed.
  115. //
  116. // Call DefaultTapArguments to get a default one.
  117. type TapArguments struct {
  118. // Timestamp of oldest item to send.
  119. //
  120. // Use TapNoBackfill to suppress all past items.
  121. Backfill uint64
  122. // If set, server will disconnect after sending existing items.
  123. Dump bool
  124. // The indices of the vbuckets to watch; empty/nil to watch all.
  125. VBuckets []uint16
  126. // Transfers ownership of vbuckets during cluster rebalance.
  127. Takeover bool
  128. // If true, server will wait for client ACK after every notification.
  129. SupportAck bool
  130. // If true, client doesn't want values so server shouldn't send them.
  131. KeysOnly bool
  132. // If true, client wants the server to send checkpoint events.
  133. Checkpoint bool
  134. // Optional identifier to use for this client, to allow reconnects
  135. ClientName string
  136. // Registers this client (by name) till explicitly deregistered.
  137. RegisteredClient bool
  138. }
  139. // Value for TapArguments.Backfill denoting that no past events at all
  140. // should be sent.
  141. const TapNoBackfill = math.MaxUint64
  142. // DefaultTapArguments returns a default set of parameter values to
  143. // pass to StartTapFeed.
  144. func DefaultTapArguments() TapArguments {
  145. return TapArguments{
  146. Backfill: TapNoBackfill,
  147. }
  148. }
  149. func (args *TapArguments) flags() []byte {
  150. var flags gomemcached.TapConnectFlag
  151. if args.Backfill != 0 {
  152. flags |= gomemcached.BACKFILL
  153. }
  154. if args.Dump {
  155. flags |= gomemcached.DUMP
  156. }
  157. if len(args.VBuckets) > 0 {
  158. flags |= gomemcached.LIST_VBUCKETS
  159. }
  160. if args.Takeover {
  161. flags |= gomemcached.TAKEOVER_VBUCKETS
  162. }
  163. if args.SupportAck {
  164. flags |= gomemcached.SUPPORT_ACK
  165. }
  166. if args.KeysOnly {
  167. flags |= gomemcached.REQUEST_KEYS_ONLY
  168. }
  169. if args.Checkpoint {
  170. flags |= gomemcached.CHECKPOINT
  171. }
  172. if args.RegisteredClient {
  173. flags |= gomemcached.REGISTERED_CLIENT
  174. }
  175. encoded := make([]byte, 4)
  176. binary.BigEndian.PutUint32(encoded, uint32(flags))
  177. return encoded
  178. }
  179. func must(err error) {
  180. if err != nil {
  181. panic(err)
  182. }
  183. }
  184. func (args *TapArguments) bytes() (rv []byte) {
  185. buf := bytes.NewBuffer([]byte{})
  186. if args.Backfill > 0 {
  187. must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
  188. }
  189. if len(args.VBuckets) > 0 {
  190. must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
  191. for i := 0; i < len(args.VBuckets); i++ {
  192. must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
  193. }
  194. }
  195. return buf.Bytes()
  196. }
  197. // TapFeed represents a stream of events from a server.
  198. type TapFeed struct {
  199. C <-chan TapEvent
  200. Error error
  201. closer chan bool
  202. }
  203. // StartTapFeed starts a TAP feed on a client connection.
  204. //
  205. // The events can be read from the returned channel. The connection
  206. // can no longer be used for other purposes; it's now reserved for
  207. // receiving the TAP messages. To stop receiving events, close the
  208. // client connection.
  209. func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
  210. rq := &gomemcached.MCRequest{
  211. Opcode: gomemcached.TAP_CONNECT,
  212. Key: []byte(args.ClientName),
  213. Extras: args.flags(),
  214. Body: args.bytes()}
  215. err := mc.Transmit(rq)
  216. if err != nil {
  217. return nil, err
  218. }
  219. ch := make(chan TapEvent)
  220. feed := &TapFeed{
  221. C: ch,
  222. closer: make(chan bool),
  223. }
  224. go mc.runFeed(ch, feed)
  225. return feed, nil
  226. }
  227. // TapRecvHook is called after every incoming tap packet is received.
  228. var TapRecvHook func(*gomemcached.MCRequest, int, error)
  229. // Internal goroutine that reads from the socket and writes events to
  230. // the channel
  231. func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
  232. defer close(ch)
  233. var headerBuf [gomemcached.HDR_LEN]byte
  234. loop:
  235. for {
  236. // Read the next request from the server.
  237. //
  238. // (Can't call mc.Receive() because it reads a
  239. // _response_ not a request.)
  240. var pkt gomemcached.MCRequest
  241. n, err := pkt.Receive(mc.conn, headerBuf[:])
  242. if TapRecvHook != nil {
  243. TapRecvHook(&pkt, n, err)
  244. }
  245. if err != nil {
  246. if err != io.EOF {
  247. feed.Error = err
  248. }
  249. break loop
  250. }
  251. //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
  252. if pkt.Opcode == gomemcached.TAP_CONNECT {
  253. // This is not an event from the server; it's
  254. // an error response to my connect request.
  255. feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
  256. break loop
  257. }
  258. event := makeTapEvent(pkt)
  259. if event != nil {
  260. if event.Opcode == tapEndStream {
  261. break loop
  262. }
  263. select {
  264. case ch <- *event:
  265. case <-feed.closer:
  266. break loop
  267. }
  268. }
  269. if len(pkt.Extras) >= 4 {
  270. reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
  271. if reqFlags&gomemcached.TAP_ACK != 0 {
  272. if _, err := mc.sendAck(&pkt); err != nil {
  273. feed.Error = err
  274. break loop
  275. }
  276. }
  277. }
  278. }
  279. if err := mc.Close(); err != nil {
  280. logging.Errorf("Error closing memcached client: %v", err)
  281. }
  282. }
  283. func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
  284. res := gomemcached.MCResponse{
  285. Opcode: pkt.Opcode,
  286. Opaque: pkt.Opaque,
  287. Status: gomemcached.SUCCESS,
  288. }
  289. return res.Transmit(mc.conn)
  290. }
  291. // Close terminates a TapFeed.
  292. //
  293. // Call this if you stop using a TapFeed before its channel ends.
  294. func (feed *TapFeed) Close() {
  295. close(feed.closer)
  296. }
上海开阖软件有限公司 沪ICP备12045867号-1