本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1141 lines
31KB

  1. // Package memcached provides a memcached binary protocol client.
  2. package memcached
  3. import (
  4. "crypto/tls"
  5. "encoding/binary"
  6. "fmt"
  7. "github.com/couchbase/gomemcached"
  8. "github.com/couchbase/goutils/logging"
  9. "github.com/couchbase/goutils/scramsha"
  10. "github.com/pkg/errors"
  11. "io"
  12. "math"
  13. "net"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. )
// ClientIface declares the method set of a memcached binary protocol client.
// The *Client methods visible in this file implement the key/value,
// authentication, observe, CAS and raw transmit/receive entries; the
// remaining entries (Hijack, stats and UPR feeds) are implemented outside
// this section of the file.
type ClientIface interface {
	Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
	Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
	Auth(user, pass string) (*gomemcached.MCResponse, error)
	AuthList() (*gomemcached.MCResponse, error)
	AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
	AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
	CASNext(vb uint16, k string, exp int, state *CASState) bool
	CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
	CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
	Close() error
	Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
	Del(vb uint16, key string) (*gomemcached.MCResponse, error)
	EnableMutationToken() (*gomemcached.MCResponse, error)
	Get(vb uint16, key string) (*gomemcached.MCResponse, error)
	GetCollectionsManifest() (*gomemcached.MCResponse, error)
	GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error)
	GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
	GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
	GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error
	GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
	GetRandomDoc() (*gomemcached.MCResponse, error)
	Hijack() io.ReadWriteCloser
	Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
	Observe(vb uint16, key string) (result ObserveResult, err error)
	ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
	Receive() (*gomemcached.MCResponse, error)
	ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
	Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
	Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
	SetKeepAliveOptions(interval time.Duration)
	SetReadDeadline(t time.Time)
	SetDeadline(t time.Time)
	SelectBucket(bucket string) (*gomemcached.MCResponse, error)
	SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
	Stats(key string) ([]StatValue, error)
	StatsMap(key string) (map[string]string, error)
	StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
	Transmit(req *gomemcached.MCRequest) error
	TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
	TransmitResponse(res *gomemcached.MCResponse) error

	// UprFeed Related
	NewUprFeed() (*UprFeed, error)
	NewUprFeedIface() (UprFeedIface, error)
	NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
	NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
	UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
}
  67. const bufsize = 1024
  68. var UnHealthy uint32 = 0
  69. var Healthy uint32 = 1
  70. type Features []Feature
  71. type Feature uint16
  72. const FeatureMutationToken = Feature(0x04)
  73. const FeatureXattr = Feature(0x06)
  74. const FeatureCollections = Feature(0x12)
  75. const FeatureDataType = Feature(0x0b)
  76. type memcachedConnection interface {
  77. io.ReadWriteCloser
  78. SetReadDeadline(time.Time) error
  79. SetDeadline(time.Time) error
  80. }
// Client is a memcached binary protocol client bound to one connection.
type Client struct {
	// conn is the underlying transport handed to Wrap.
	conn memcachedConnection
	// use uint32 type so that it can be accessed through atomic APIs
	// (holds Healthy or UnHealthy; see IsHealthy).
	healthy uint32
	// opaque is the next opaque value GetBulk stamps on a pipelined request;
	// Wrap initialises it to 1.
	opaque uint32
	// hdrBuf is a scratch buffer of gomemcached.HDR_LEN bytes that is passed
	// to getResponse on every receive.
	hdrBuf []byte
}
  89. var (
  90. DefaultDialTimeout = time.Duration(0) // No timeout
  91. DefaultWriteTimeout = time.Duration(0) // No timeout
  92. dialFun = func(prot, dest string) (net.Conn, error) {
  93. return net.DialTimeout(prot, dest, DefaultDialTimeout)
  94. }
  95. )
  96. // Connect to a memcached server.
  97. func Connect(prot, dest string) (rv *Client, err error) {
  98. conn, err := dialFun(prot, dest)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return Wrap(conn)
  103. }
  104. // Connect to a memcached server using TLS.
  105. func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error) {
  106. conn, err := tls.Dial(prot, dest, config)
  107. if err != nil {
  108. return nil, err
  109. }
  110. return Wrap(conn)
  111. }
  112. func SetDefaultTimeouts(dial, read, write time.Duration) {
  113. DefaultDialTimeout = dial
  114. DefaultWriteTimeout = write
  115. }
  116. func SetDefaultDialTimeout(dial time.Duration) {
  117. DefaultDialTimeout = dial
  118. }
  119. func (c *Client) SetKeepAliveOptions(interval time.Duration) {
  120. tcpConn, ok := c.conn.(*net.TCPConn)
  121. if ok {
  122. tcpConn.SetKeepAlive(true)
  123. tcpConn.SetKeepAlivePeriod(interval)
  124. }
  125. }
  126. func (c *Client) SetReadDeadline(t time.Time) {
  127. c.conn.SetReadDeadline(t)
  128. }
  129. func (c *Client) SetDeadline(t time.Time) {
  130. c.conn.SetDeadline(t)
  131. }
  132. // Wrap an existing transport.
  133. func Wrap(conn memcachedConnection) (rv *Client, err error) {
  134. client := &Client{
  135. conn: conn,
  136. hdrBuf: make([]byte, gomemcached.HDR_LEN),
  137. opaque: uint32(1),
  138. }
  139. client.setHealthy(true)
  140. return client, nil
  141. }
  142. // Close the connection when you're done.
  143. func (c *Client) Close() error {
  144. return c.conn.Close()
  145. }
  146. // IsHealthy returns true unless the client is belived to have
  147. // difficulty communicating to its server.
  148. //
  149. // This is useful for connection pools where we want to
  150. // non-destructively determine that a connection may be reused.
  151. func (c Client) IsHealthy() bool {
  152. healthyState := atomic.LoadUint32(&c.healthy)
  153. return healthyState == Healthy
  154. }
  155. // Send a custom request and get the response.
  156. func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) {
  157. err = c.Transmit(req)
  158. if err != nil {
  159. return
  160. }
  161. resp, _, err := getResponse(c.conn, c.hdrBuf)
  162. c.setHealthy(!gomemcached.IsFatal(err))
  163. return resp, err
  164. }
  165. // Transmit send a request, but does not wait for a response.
  166. func (c *Client) Transmit(req *gomemcached.MCRequest) error {
  167. if DefaultWriteTimeout > 0 {
  168. c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
  169. }
  170. _, err := transmitRequest(c.conn, req)
  171. // clear write deadline to avoid interference with future write operations
  172. if DefaultWriteTimeout > 0 {
  173. c.conn.(net.Conn).SetWriteDeadline(time.Time{})
  174. }
  175. if err != nil {
  176. c.setHealthy(false)
  177. }
  178. return err
  179. }
  180. func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error {
  181. c.conn.(net.Conn).SetWriteDeadline(deadline)
  182. _, err := transmitRequest(c.conn, req)
  183. // clear write deadline to avoid interference with future write operations
  184. c.conn.(net.Conn).SetWriteDeadline(time.Time{})
  185. if err != nil {
  186. c.setHealthy(false)
  187. }
  188. return err
  189. }
  190. // TransmitResponse send a response, does not wait.
  191. func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error {
  192. if DefaultWriteTimeout > 0 {
  193. c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
  194. }
  195. _, err := transmitResponse(c.conn, res)
  196. // clear write deadline to avoid interference with future write operations
  197. if DefaultWriteTimeout > 0 {
  198. c.conn.(net.Conn).SetWriteDeadline(time.Time{})
  199. }
  200. if err != nil {
  201. c.setHealthy(false)
  202. }
  203. return err
  204. }
  205. // Receive a response
  206. func (c *Client) Receive() (*gomemcached.MCResponse, error) {
  207. resp, _, err := getResponse(c.conn, c.hdrBuf)
  208. if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
  209. c.setHealthy(false)
  210. }
  211. return resp, err
  212. }
  213. func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) {
  214. c.conn.(net.Conn).SetReadDeadline(deadline)
  215. resp, _, err := getResponse(c.conn, c.hdrBuf)
  216. // Clear read deadline to avoid interference with future read operations.
  217. c.conn.(net.Conn).SetReadDeadline(time.Time{})
  218. if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
  219. c.setHealthy(false)
  220. }
  221. return resp, err
  222. }
  223. func appendMutationToken(bytes []byte) []byte {
  224. bytes = append(bytes, 0, 0)
  225. binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04))
  226. return bytes
  227. }
  228. //Send a hello command to enable MutationTokens
  229. func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
  230. var payload []byte
  231. payload = appendMutationToken(payload)
  232. return c.Send(&gomemcached.MCRequest{
  233. Opcode: gomemcached.HELLO,
  234. Key: []byte("GoMemcached"),
  235. Body: payload,
  236. })
  237. }
  238. //Send a hello command to enable specific features
  239. func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
  240. var payload []byte
  241. for _, feature := range features {
  242. payload = append(payload, 0, 0)
  243. binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
  244. }
  245. return c.Send(&gomemcached.MCRequest{
  246. Opcode: gomemcached.HELLO,
  247. Key: []byte("GoMemcached"),
  248. Body: payload,
  249. })
  250. }
  251. // Get the value for a key.
  252. func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error) {
  253. return c.Send(&gomemcached.MCRequest{
  254. Opcode: gomemcached.GET,
  255. VBucket: vb,
  256. Key: []byte(key),
  257. })
  258. }
  259. // Get the value for a key from a collection, identified by collection id.
  260. func (c *Client) GetFromCollection(vb uint16, cid uint32, key string) (*gomemcached.MCResponse, error) {
  261. keyBytes := []byte(key)
  262. encodedCid := make([]byte, binary.MaxVarintLen32)
  263. lenEncodedCid := binary.PutUvarint(encodedCid, uint64(cid))
  264. encodedKey := make([]byte, 0, lenEncodedCid+len(keyBytes))
  265. encodedKey = append(encodedKey, encodedCid[0:lenEncodedCid]...)
  266. encodedKey = append(encodedKey, keyBytes...)
  267. return c.Send(&gomemcached.MCRequest{
  268. Opcode: gomemcached.GET,
  269. VBucket: vb,
  270. Key: encodedKey,
  271. })
  272. }
  273. // Get the xattrs, doc value for the input key
  274. func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) {
  275. extraBuf, valueBuf := GetSubDocVal(subPaths)
  276. res, err := c.Send(&gomemcached.MCRequest{
  277. Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
  278. VBucket: vb,
  279. Key: []byte(key),
  280. Extras: extraBuf,
  281. Body: valueBuf,
  282. })
  283. if err != nil && IfResStatusError(res) {
  284. return res, err
  285. }
  286. return res, nil
  287. }
  288. // Retrieve the collections manifest.
  289. func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error) {
  290. res, err := c.Send(&gomemcached.MCRequest{
  291. Opcode: gomemcached.GET_COLLECTIONS_MANIFEST,
  292. })
  293. if err != nil && IfResStatusError(res) {
  294. return res, err
  295. }
  296. return res, nil
  297. }
  298. // Retrieve the collections manifest.
  299. func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) {
  300. res, err := c.Send(&gomemcached.MCRequest{
  301. Opcode: gomemcached.COLLECTIONS_GET_CID,
  302. Key: []byte(scope + "." + collection),
  303. })
  304. if err != nil && IfResStatusError(res) {
  305. return res, err
  306. }
  307. return res, nil
  308. }
  309. // Get the value for a key, and update expiry
  310. func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) {
  311. extraBuf := make([]byte, 4)
  312. binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
  313. return c.Send(&gomemcached.MCRequest{
  314. Opcode: gomemcached.GAT,
  315. VBucket: vb,
  316. Key: []byte(key),
  317. Extras: extraBuf,
  318. })
  319. }
  320. // Get metadata for a key
  321. func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) {
  322. return c.Send(&gomemcached.MCRequest{
  323. Opcode: gomemcached.GET_META,
  324. VBucket: vb,
  325. Key: []byte(key),
  326. })
  327. }
  328. // Del deletes a key.
  329. func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error) {
  330. return c.Send(&gomemcached.MCRequest{
  331. Opcode: gomemcached.DELETE,
  332. VBucket: vb,
  333. Key: []byte(key)})
  334. }
  335. // Get a random document
  336. func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error) {
  337. return c.Send(&gomemcached.MCRequest{
  338. Opcode: 0xB6,
  339. })
  340. }
  341. // AuthList lists SASL auth mechanisms.
  342. func (c *Client) AuthList() (*gomemcached.MCResponse, error) {
  343. return c.Send(&gomemcached.MCRequest{
  344. Opcode: gomemcached.SASL_LIST_MECHS})
  345. }
  346. // Auth performs SASL PLAIN authentication against the server.
  347. func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) {
  348. res, err := c.AuthList()
  349. if err != nil {
  350. return res, err
  351. }
  352. authMech := string(res.Body)
  353. if strings.Index(authMech, "PLAIN") != -1 {
  354. return c.AuthPlain(user, pass)
  355. }
  356. return nil, fmt.Errorf("auth mechanism PLAIN not supported")
  357. }
  358. // AuthScramSha performs SCRAM-SHA authentication against the server.
  359. func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) {
  360. res, err := c.AuthList()
  361. if err != nil {
  362. return nil, errors.Wrap(err, "Unable to obtain list of methods.")
  363. }
  364. methods := string(res.Body)
  365. method, err := scramsha.BestMethod(methods)
  366. if err != nil {
  367. return nil, errors.Wrap(err,
  368. "Unable to select SCRAM-SHA method.")
  369. }
  370. s, err := scramsha.NewScramSha(method)
  371. if err != nil {
  372. return nil, errors.Wrap(err, "Unable to initialize scramsha.")
  373. }
  374. logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
  375. message, err := s.GetStartRequest(user)
  376. if err != nil {
  377. return nil, errors.Wrapf(err,
  378. "Error building start request for user %s.", user)
  379. }
  380. startRequest := &gomemcached.MCRequest{
  381. Opcode: 0x21,
  382. Key: []byte(method),
  383. Body: []byte(message)}
  384. startResponse, err := c.Send(startRequest)
  385. if err != nil {
  386. return nil, errors.Wrap(err, "Error sending start request.")
  387. }
  388. err = s.HandleStartResponse(string(startResponse.Body))
  389. if err != nil {
  390. return nil, errors.Wrap(err, "Error handling start response.")
  391. }
  392. message = s.GetFinalRequest(pass)
  393. // send step request
  394. finalRequest := &gomemcached.MCRequest{
  395. Opcode: 0x22,
  396. Key: []byte(method),
  397. Body: []byte(message)}
  398. finalResponse, err := c.Send(finalRequest)
  399. if err != nil {
  400. return nil, errors.Wrap(err, "Error sending final request.")
  401. }
  402. err = s.HandleFinalResponse(string(finalResponse.Body))
  403. if err != nil {
  404. return nil, errors.Wrap(err, "Error handling final response.")
  405. }
  406. return finalResponse, nil
  407. }
  408. func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) {
  409. logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
  410. return c.Send(&gomemcached.MCRequest{
  411. Opcode: gomemcached.SASL_AUTH,
  412. Key: []byte("PLAIN"),
  413. Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))})
  414. }
  415. // select bucket
  416. func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
  417. return c.Send(&gomemcached.MCRequest{
  418. Opcode: gomemcached.SELECT_BUCKET,
  419. Key: []byte(bucket)})
  420. }
  421. func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
  422. key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) {
  423. req := &gomemcached.MCRequest{
  424. Opcode: opcode,
  425. VBucket: vb,
  426. Key: []byte(key),
  427. Cas: 0,
  428. Opaque: 0,
  429. Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
  430. Body: body}
  431. binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
  432. return c.Send(req)
  433. }
  434. func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
  435. key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) {
  436. req := &gomemcached.MCRequest{
  437. Opcode: opcode,
  438. VBucket: vb,
  439. Key: []byte(key),
  440. Cas: cas,
  441. Opaque: 0,
  442. Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
  443. Body: body}
  444. binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
  445. return c.Send(req)
  446. }
  447. // Incr increments the value at the given key.
  448. func (c *Client) Incr(vb uint16, key string,
  449. amt, def uint64, exp int) (uint64, error) {
  450. req := &gomemcached.MCRequest{
  451. Opcode: gomemcached.INCREMENT,
  452. VBucket: vb,
  453. Key: []byte(key),
  454. Extras: make([]byte, 8+8+4),
  455. }
  456. binary.BigEndian.PutUint64(req.Extras[:8], amt)
  457. binary.BigEndian.PutUint64(req.Extras[8:16], def)
  458. binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
  459. resp, err := c.Send(req)
  460. if err != nil {
  461. return 0, err
  462. }
  463. return binary.BigEndian.Uint64(resp.Body), nil
  464. }
  465. // Decr decrements the value at the given key.
  466. func (c *Client) Decr(vb uint16, key string,
  467. amt, def uint64, exp int) (uint64, error) {
  468. req := &gomemcached.MCRequest{
  469. Opcode: gomemcached.DECREMENT,
  470. VBucket: vb,
  471. Key: []byte(key),
  472. Extras: make([]byte, 8+8+4),
  473. }
  474. binary.BigEndian.PutUint64(req.Extras[:8], amt)
  475. binary.BigEndian.PutUint64(req.Extras[8:16], def)
  476. binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
  477. resp, err := c.Send(req)
  478. if err != nil {
  479. return 0, err
  480. }
  481. return binary.BigEndian.Uint64(resp.Body), nil
  482. }
  483. // Add a value for a key (store if not exists).
  484. func (c *Client) Add(vb uint16, key string, flags int, exp int,
  485. body []byte) (*gomemcached.MCResponse, error) {
  486. return c.store(gomemcached.ADD, vb, key, flags, exp, body)
  487. }
  488. // Set the value for a key.
  489. func (c *Client) Set(vb uint16, key string, flags int, exp int,
  490. body []byte) (*gomemcached.MCResponse, error) {
  491. return c.store(gomemcached.SET, vb, key, flags, exp, body)
  492. }
  493. // SetCas set the value for a key with cas
  494. func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
  495. body []byte) (*gomemcached.MCResponse, error) {
  496. return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body)
  497. }
  498. // Append data to the value of a key.
  499. func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) {
  500. req := &gomemcached.MCRequest{
  501. Opcode: gomemcached.APPEND,
  502. VBucket: vb,
  503. Key: []byte(key),
  504. Cas: 0,
  505. Opaque: 0,
  506. Body: data}
  507. return c.Send(req)
  508. }
  509. // GetBulk gets keys in bulk
  510. func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error {
  511. stopch := make(chan bool)
  512. var wg sync.WaitGroup
  513. defer func() {
  514. close(stopch)
  515. wg.Wait()
  516. }()
  517. if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) {
  518. c.opaque = uint32(1)
  519. }
  520. opStart := c.opaque
  521. errch := make(chan error, 2)
  522. wg.Add(1)
  523. go func() {
  524. defer func() {
  525. if r := recover(); r != nil {
  526. logging.Infof("Recovered in f %v", r)
  527. }
  528. errch <- nil
  529. wg.Done()
  530. }()
  531. ok := true
  532. for ok {
  533. select {
  534. case <-stopch:
  535. return
  536. default:
  537. res, err := c.Receive()
  538. if err != nil && IfResStatusError(res) {
  539. if res == nil || res.Status != gomemcached.KEY_ENOENT {
  540. errch <- err
  541. return
  542. }
  543. // continue receiving in case of KEY_ENOENT
  544. } else if res.Opcode == gomemcached.GET ||
  545. res.Opcode == gomemcached.SUBDOC_GET ||
  546. res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP {
  547. opaque := res.Opaque - opStart
  548. if opaque < 0 || opaque >= uint32(len(keys)) {
  549. // Every now and then we seem to be seeing an invalid opaque
  550. // value returned from the server. When this happens log the error
  551. // and the calling function will retry the bulkGet. MB-15140
  552. logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body))
  553. errch <- fmt.Errorf("Out of Bounds error")
  554. return
  555. }
  556. rv[keys[opaque]] = res
  557. }
  558. if res.Opcode == gomemcached.NOOP {
  559. ok = false
  560. }
  561. }
  562. }
  563. }()
  564. memcachedReqPkt := &gomemcached.MCRequest{
  565. Opcode: gomemcached.GET,
  566. VBucket: vb,
  567. }
  568. if len(subPaths) > 0 {
  569. extraBuf, valueBuf := GetSubDocVal(subPaths)
  570. memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP
  571. memcachedReqPkt.Extras = extraBuf
  572. memcachedReqPkt.Body = valueBuf
  573. }
  574. for _, k := range keys { // Start of Get request
  575. memcachedReqPkt.Key = []byte(k)
  576. memcachedReqPkt.Opaque = c.opaque
  577. err := c.Transmit(memcachedReqPkt)
  578. if err != nil {
  579. logging.Errorf(" Transmit failed in GetBulkAll %v", err)
  580. return err
  581. }
  582. c.opaque++
  583. } // End of Get request
  584. // finally transmit a NOOP
  585. err := c.Transmit(&gomemcached.MCRequest{
  586. Opcode: gomemcached.NOOP,
  587. VBucket: vb,
  588. Opaque: c.opaque,
  589. })
  590. if err != nil {
  591. logging.Errorf(" Transmit of NOOP failed %v", err)
  592. return err
  593. }
  594. c.opaque++
  595. return <-errch
  596. }
  597. func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
  598. var ops []string
  599. totalBytesLen := 0
  600. num := 1
  601. for _, v := range subPaths {
  602. totalBytesLen = totalBytesLen + len([]byte(v))
  603. ops = append(ops, v)
  604. num = num + 1
  605. }
  606. // Xattr retrieval - subdoc multi get
  607. extraBuf = append(extraBuf, uint8(0x04))
  608. valueBuf = make([]byte, num*4+totalBytesLen)
  609. //opcode for subdoc get
  610. op := gomemcached.SUBDOC_GET
  611. // Calculate path total bytes
  612. // There are 2 ops - get xattrs - both input and $document and get whole doc
  613. valIter := 0
  614. for _, v := range ops {
  615. pathBytes := []byte(v)
  616. valueBuf[valIter+0] = uint8(op)
  617. // SubdocFlagXattrPath indicates that the path refers to
  618. // an Xattr rather than the document body.
  619. valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR)
  620. // 2 byte key
  621. binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes)))
  622. // Then n bytes path
  623. copy(valueBuf[valIter+4:], pathBytes)
  624. valIter = valIter + 4 + len(pathBytes)
  625. }
  626. return
  627. }
  628. // ObservedStatus is the type reported by the Observe method
  629. type ObservedStatus uint8
  630. // Observation status values.
  631. const (
  632. ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted
  633. ObservedPersisted = ObservedStatus(0x01) // found, persisted
  634. ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete)
  635. ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet)
  636. )
  637. // ObserveResult represents the data obtained by an Observe call
  638. type ObserveResult struct {
  639. Status ObservedStatus // Whether the value has been persisted/deleted
  640. Cas uint64 // Current value's CAS
  641. PersistenceTime time.Duration // Node's average time to persist a value
  642. ReplicationTime time.Duration // Node's average time to replicate a value
  643. }
  644. // Observe gets the persistence/replication/CAS state of a key
  645. func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) {
  646. // http://www.couchbase.com/wiki/display/couchbase/Observe
  647. body := make([]byte, 4+len(key))
  648. binary.BigEndian.PutUint16(body[0:2], vb)
  649. binary.BigEndian.PutUint16(body[2:4], uint16(len(key)))
  650. copy(body[4:4+len(key)], key)
  651. res, err := c.Send(&gomemcached.MCRequest{
  652. Opcode: gomemcached.OBSERVE,
  653. VBucket: vb,
  654. Body: body,
  655. })
  656. if err != nil {
  657. return
  658. }
  659. // Parse the response data from the body:
  660. if len(res.Body) < 2+2+1 {
  661. err = io.ErrUnexpectedEOF
  662. return
  663. }
  664. outVb := binary.BigEndian.Uint16(res.Body[0:2])
  665. keyLen := binary.BigEndian.Uint16(res.Body[2:4])
  666. if len(res.Body) < 2+2+int(keyLen)+1+8 {
  667. err = io.ErrUnexpectedEOF
  668. return
  669. }
  670. outKey := string(res.Body[4 : 4+keyLen])
  671. if outVb != vb || outKey != key {
  672. err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey)
  673. return
  674. }
  675. result.Status = ObservedStatus(res.Body[4+keyLen])
  676. result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:])
  677. // The response reuses the Cas field to store time statistics:
  678. result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond
  679. result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond
  680. return
  681. }
  682. // CheckPersistence checks whether a stored value has been persisted to disk yet.
  683. func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) {
  684. switch {
  685. case result.Status == ObservedNotFound && deletion:
  686. persisted = true
  687. case result.Cas != cas:
  688. overwritten = true
  689. case result.Status == ObservedPersisted:
  690. persisted = true
  691. }
  692. return
  693. }
// ObserveSeqResult is the result of the sequence-number based observe
// (ObserveSeq). OldVbuuid and LastSeqNo are only filled in when Failover is
// 1 and the response body is long enough to contain them.
type ObserveSeqResult struct {
	Failover           uint8  // Set to 1 if a failover took place
	VbId               uint16 // vbucket id
	Vbuuid             uint64 // vbucket uuid
	LastPersistedSeqNo uint64 // last persisted sequence number
	CurrentSeqNo       uint64 // current sequence number
	OldVbuuid          uint64 // Old bucket vbuuid
	LastSeqNo          uint64 // last sequence number received before failover
}
  704. func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) {
  705. // http://www.couchbase.com/wiki/display/couchbase/Observe
  706. body := make([]byte, 8)
  707. binary.BigEndian.PutUint64(body[0:8], vbuuid)
  708. res, err := c.Send(&gomemcached.MCRequest{
  709. Opcode: gomemcached.OBSERVE_SEQNO,
  710. VBucket: vb,
  711. Body: body,
  712. Opaque: 0x01,
  713. })
  714. if err != nil {
  715. return
  716. }
  717. if res.Status != gomemcached.SUCCESS {
  718. return nil, fmt.Errorf(" Observe returned error %v", res.Status)
  719. }
  720. // Parse the response data from the body:
  721. if len(res.Body) < (1 + 2 + 8 + 8 + 8) {
  722. err = io.ErrUnexpectedEOF
  723. return
  724. }
  725. result = &ObserveSeqResult{}
  726. result.Failover = res.Body[0]
  727. result.VbId = binary.BigEndian.Uint16(res.Body[1:3])
  728. result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11])
  729. result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19])
  730. result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27])
  731. // in case of failover processing we can have old vbuuid and the last persisted seq number
  732. if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) {
  733. result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35])
  734. result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43])
  735. }
  736. return
  737. }
  738. // CasOp is the type of operation to perform on this CAS loop.
  739. type CasOp uint8
  740. const (
  741. // CASStore instructs the server to store the new value normally
  742. CASStore = CasOp(iota)
  743. // CASQuit instructs the client to stop attempting to CAS, leaving value untouched
  744. CASQuit
  745. // CASDelete instructs the server to delete the current value
  746. CASDelete
  747. )
  748. // User specified termination is returned as an error.
  749. func (c CasOp) Error() string {
  750. switch c {
  751. case CASStore:
  752. return "CAS store"
  753. case CASQuit:
  754. return "CAS quit"
  755. case CASDelete:
  756. return "CAS delete"
  757. }
  758. panic("Unhandled value")
  759. }
//////// CAS TRANSFORM

// CASState tracks the state of CAS over several operations.
//
// This is used directly by CASNext and indirectly by CAS.
// The zero value is the correct starting state.
type CASState struct {
	initialized bool   // false on the first call to CASNext, then true
	Value       []byte // Current value of key; update in place to new value
	Cas         uint64 // Current CAS value of key
	Exists      bool   // Does a value exist for the key? (If not, Value will be nil)
	Err         error  // Error, if any, after CASNext returns false
	// resp is the response of the most recent request issued by CASNext;
	// CAS returns it to its caller.
	resp *gomemcached.MCResponse
}
  772. // CASNext is a non-callback, loop-based version of CAS method.
  773. //
  774. // Usage is like this:
  775. //
  776. // var state memcached.CASState
  777. // for client.CASNext(vb, key, exp, &state) {
  778. // state.Value = some_mutation(state.Value)
  779. // }
  780. // if state.Err != nil { ... }
  781. func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool {
  782. if state.initialized {
  783. if !state.Exists {
  784. // Adding a new key:
  785. if state.Value == nil {
  786. state.Cas = 0
  787. return false // no-op (delete of non-existent value)
  788. }
  789. state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value)
  790. } else {
  791. // Updating / deleting a key:
  792. req := &gomemcached.MCRequest{
  793. Opcode: gomemcached.DELETE,
  794. VBucket: vb,
  795. Key: []byte(k),
  796. Cas: state.Cas}
  797. if state.Value != nil {
  798. req.Opcode = gomemcached.SET
  799. req.Opaque = 0
  800. req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0}
  801. req.Body = state.Value
  802. flags := 0
  803. binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
  804. }
  805. state.resp, state.Err = c.Send(req)
  806. }
  807. // If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to
  808. // get the new value (below). Otherwise, we're done (either success or failure) so return:
  809. if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS ||
  810. state.resp.Status == gomemcached.NOT_STORED)) {
  811. state.Cas = state.resp.Cas
  812. return false // either success or fatal error
  813. }
  814. }
  815. // Initial call, or after a conflict: GET the current value and CAS and return them:
  816. state.initialized = true
  817. if state.resp, state.Err = c.Get(vb, k); state.Err == nil {
  818. state.Exists = true
  819. state.Value = state.resp.Body
  820. state.Cas = state.resp.Cas
  821. } else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT {
  822. state.Err = nil
  823. state.Exists = false
  824. state.Value = nil
  825. state.Cas = 0
  826. } else {
  827. return false // fatal error
  828. }
  829. return true // keep going...
  830. }
// CasFunc is the type of function used to perform a CAS transform.
//
// Input is the current value, or nil if no value exists.
// The function should return the new value (if any) to set, and the store/quit/delete operation.
type CasFunc func(current []byte) ([]byte, CasOp)
  836. // CAS performs a CAS transform with the given function.
  837. //
  838. // If the value does not exist, a nil current value will be sent to f.
  839. func (c *Client) CAS(vb uint16, k string, f CasFunc,
  840. initexp int) (*gomemcached.MCResponse, error) {
  841. var state CASState
  842. for c.CASNext(vb, k, initexp, &state) {
  843. newValue, operation := f(state.Value)
  844. if operation == CASQuit || (operation == CASDelete && state.Value == nil) {
  845. return nil, operation
  846. }
  847. state.Value = newValue
  848. }
  849. return state.resp, state.Err
  850. }
// StatValue is one of the stats returned from the Stats method.
type StatValue struct {
	// Key is the stat key, taken from the key of a STAT response.
	Key string
	// Val is the stat value, taken from the body of the same response.
	Val string
}
  858. // Stats requests server-side stats.
  859. //
  860. // Use "" as the stat key for toplevel stats.
  861. func (c *Client) Stats(key string) ([]StatValue, error) {
  862. rv := make([]StatValue, 0, 128)
  863. req := &gomemcached.MCRequest{
  864. Opcode: gomemcached.STAT,
  865. Key: []byte(key),
  866. Opaque: 918494,
  867. }
  868. err := c.Transmit(req)
  869. if err != nil {
  870. return rv, err
  871. }
  872. for {
  873. res, _, err := getResponse(c.conn, c.hdrBuf)
  874. if err != nil {
  875. return rv, err
  876. }
  877. k := string(res.Key)
  878. if k == "" {
  879. break
  880. }
  881. rv = append(rv, StatValue{
  882. Key: k,
  883. Val: string(res.Body),
  884. })
  885. }
  886. return rv, nil
  887. }
  888. // StatsMap requests server-side stats similarly to Stats, but returns
  889. // them as a map.
  890. //
  891. // Use "" as the stat key for toplevel stats.
  892. func (c *Client) StatsMap(key string) (map[string]string, error) {
  893. rv := make(map[string]string)
  894. req := &gomemcached.MCRequest{
  895. Opcode: gomemcached.STAT,
  896. Key: []byte(key),
  897. Opaque: 918494,
  898. }
  899. err := c.Transmit(req)
  900. if err != nil {
  901. return rv, err
  902. }
  903. for {
  904. res, _, err := getResponse(c.conn, c.hdrBuf)
  905. if err != nil {
  906. return rv, err
  907. }
  908. k := string(res.Key)
  909. if k == "" {
  910. break
  911. }
  912. rv[k] = string(res.Body)
  913. }
  914. return rv, nil
  915. }
  916. // instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys
  917. // for which stats needs to be retrieved
  918. func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error {
  919. // clear statsMap
  920. for key, _ := range statsMap {
  921. statsMap[key] = ""
  922. }
  923. req := &gomemcached.MCRequest{
  924. Opcode: gomemcached.STAT,
  925. Key: []byte(key),
  926. Opaque: 918494,
  927. }
  928. err := c.Transmit(req)
  929. if err != nil {
  930. return err
  931. }
  932. for {
  933. res, _, err := getResponse(c.conn, c.hdrBuf)
  934. if err != nil {
  935. return err
  936. }
  937. k := string(res.Key)
  938. if k == "" {
  939. break
  940. }
  941. if _, ok := statsMap[k]; ok {
  942. statsMap[k] = string(res.Body)
  943. }
  944. }
  945. return nil
  946. }
  947. // Hijack exposes the underlying connection from this client.
  948. //
  949. // It also marks the connection as unhealthy since the client will
  950. // have lost control over the connection and can't otherwise verify
  951. // things are in good shape for connection pools.
  952. func (c *Client) Hijack() io.ReadWriteCloser {
  953. c.setHealthy(false)
  954. return c.conn
  955. }
  956. func (c *Client) setHealthy(healthy bool) {
  957. healthyState := UnHealthy
  958. if healthy {
  959. healthyState = Healthy
  960. }
  961. atomic.StoreUint32(&c.healthy, healthyState)
  962. }
  963. func IfResStatusError(response *gomemcached.MCResponse) bool {
  964. return response == nil ||
  965. (response.Status != gomemcached.SUBDOC_BAD_MULTI &&
  966. response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
  967. response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
  968. }
上海开阖软件有限公司 沪ICP备12045867号-1