本站源代码
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

1475 行
37KB

  1. package couchbase
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "crypto/x509"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "io/ioutil"
  13. "math/rand"
  14. "net/http"
  15. "net/url"
  16. "runtime"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. "unsafe"
  23. "github.com/couchbase/goutils/logging"
  24. "github.com/couchbase/gomemcached" // package name is 'gomemcached'
  25. "github.com/couchbase/gomemcached/client" // package name is 'memcached'
  26. )
  27. // HTTPClient to use for REST and view operations.
  28. var MaxIdleConnsPerHost = 256
  29. var ClientTimeOut = 10 * time.Second
  30. var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost}
  31. var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
  32. // PoolSize is the size of each connection pool (per host).
  33. var PoolSize = 64
  34. // PoolOverflow is the number of overflow connections allowed in a
  35. // pool.
  36. var PoolOverflow = 16
  37. // AsynchronousCloser turns on asynchronous closing for overflow connections
  38. var AsynchronousCloser = false
  39. // TCP KeepAlive enabled/disabled
  40. var TCPKeepalive = false
  41. // Enable MutationToken
  42. var EnableMutationToken = false
  43. // Enable Data Type response
  44. var EnableDataType = false
  45. // Enable Xattr
  46. var EnableXattr = false
  47. // Enable Collections
  48. var EnableCollections = false
  49. // TCP keepalive interval in seconds. Default 30 minutes
  50. var TCPKeepaliveInterval = 30 * 60
  51. // Used to decide whether to skip verification of certificates when
  52. // connecting to an ssl port.
  53. var skipVerify = true
  54. var certFile = ""
  55. var keyFile = ""
  56. var rootFile = ""
  57. func SetSkipVerify(skip bool) {
  58. skipVerify = skip
  59. }
  60. func SetCertFile(cert string) {
  61. certFile = cert
  62. }
  63. func SetKeyFile(cert string) {
  64. keyFile = cert
  65. }
  66. func SetRootFile(cert string) {
  67. rootFile = cert
  68. }
  69. // Allow applications to speciify the Poolsize and Overflow
  70. func SetConnectionPoolParams(size, overflow int) {
  71. if size > 0 {
  72. PoolSize = size
  73. }
  74. if overflow > 0 {
  75. PoolOverflow = overflow
  76. }
  77. }
  78. // Turn off overflow connections
  79. func DisableOverflowConnections() {
  80. PoolOverflow = 0
  81. }
  82. // Toggle asynchronous overflow closer
  83. func EnableAsynchronousCloser(closer bool) {
  84. AsynchronousCloser = closer
  85. }
  86. // Allow TCP keepalive parameters to be set by the application
  87. func SetTcpKeepalive(enabled bool, interval int) {
  88. TCPKeepalive = enabled
  89. if interval > 0 {
  90. TCPKeepaliveInterval = interval
  91. }
  92. }
  93. // AuthHandler is a callback that gets the auth username and password
  94. // for the given bucket.
  95. type AuthHandler interface {
  96. GetCredentials() (string, string, string)
  97. }
  98. // AuthHandler is a callback that gets the auth username and password
  99. // for the given bucket and sasl for memcached.
  100. type AuthWithSaslHandler interface {
  101. AuthHandler
  102. GetSaslCredentials() (string, string)
  103. }
  104. // MultiBucketAuthHandler is kind of AuthHandler that may perform
  105. // different auth for different buckets.
  106. type MultiBucketAuthHandler interface {
  107. AuthHandler
  108. ForBucket(bucket string) AuthHandler
  109. }
  110. // HTTPAuthHandler is kind of AuthHandler that performs more general
  111. // for outgoing http requests than is possible via simple
  112. // GetCredentials() call (i.e. digest auth or different auth per
  113. // different destinations).
  114. type HTTPAuthHandler interface {
  115. AuthHandler
  116. SetCredsForRequest(req *http.Request) error
  117. }
  118. // RestPool represents a single pool returned from the pools REST API.
  119. type RestPool struct {
  120. Name string `json:"name"`
  121. StreamingURI string `json:"streamingUri"`
  122. URI string `json:"uri"`
  123. }
  124. // Pools represents the collection of pools as returned from the REST API.
  125. type Pools struct {
  126. ComponentsVersion map[string]string `json:"componentsVersion,omitempty"`
  127. ImplementationVersion string `json:"implementationVersion"`
  128. IsAdmin bool `json:"isAdminCreds"`
  129. UUID string `json:"uuid"`
  130. Pools []RestPool `json:"pools"`
  131. }
  132. // A Node is a computer in a cluster running the couchbase software.
  133. type Node struct {
  134. ClusterCompatibility int `json:"clusterCompatibility"`
  135. ClusterMembership string `json:"clusterMembership"`
  136. CouchAPIBase string `json:"couchApiBase"`
  137. Hostname string `json:"hostname"`
  138. InterestingStats map[string]float64 `json:"interestingStats,omitempty"`
  139. MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"`
  140. MCDMemoryReserved float64 `json:"mcdMemoryReserved"`
  141. MemoryFree float64 `json:"memoryFree"`
  142. MemoryTotal float64 `json:"memoryTotal"`
  143. OS string `json:"os"`
  144. Ports map[string]int `json:"ports"`
  145. Services []string `json:"services"`
  146. Status string `json:"status"`
  147. Uptime int `json:"uptime,string"`
  148. Version string `json:"version"`
  149. ThisNode bool `json:"thisNode,omitempty"`
  150. }
  151. // A Pool of nodes and buckets.
  152. type Pool struct {
  153. BucketMap map[string]*Bucket
  154. Nodes []Node
  155. BucketURL map[string]string `json:"buckets"`
  156. client *Client
  157. }
  158. // VBucketServerMap is the a mapping of vbuckets to nodes.
  159. type VBucketServerMap struct {
  160. HashAlgorithm string `json:"hashAlgorithm"`
  161. NumReplicas int `json:"numReplicas"`
  162. ServerList []string `json:"serverList"`
  163. VBucketMap [][]int `json:"vBucketMap"`
  164. }
  165. type DurablitySettings struct {
  166. Persist PersistTo
  167. Observe ObserveTo
  168. }
  169. // Bucket is the primary entry point for most data operations.
  170. // Bucket is a locked data structure. All access to its fields should be done using read or write locking,
  171. // as appropriate.
  172. //
  173. // Some access methods require locking, but rely on the caller to do so. These are appropriate
  174. // for calls from methods that have already locked the structure. Methods like this
  175. // take a boolean parameter "bucketLocked".
  176. type Bucket struct {
  177. sync.RWMutex
  178. AuthType string `json:"authType"`
  179. Capabilities []string `json:"bucketCapabilities"`
  180. CapabilitiesVersion string `json:"bucketCapabilitiesVer"`
  181. Type string `json:"bucketType"`
  182. Name string `json:"name"`
  183. NodeLocator string `json:"nodeLocator"`
  184. Quota map[string]float64 `json:"quota,omitempty"`
  185. Replicas int `json:"replicaNumber"`
  186. Password string `json:"saslPassword"`
  187. URI string `json:"uri"`
  188. StreamingURI string `json:"streamingUri"`
  189. LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"`
  190. UUID string `json:"uuid"`
  191. ConflictResolutionType string `json:"conflictResolutionType,omitempty"`
  192. DDocs struct {
  193. URI string `json:"uri"`
  194. } `json:"ddocs,omitempty"`
  195. BasicStats map[string]interface{} `json:"basicStats,omitempty"`
  196. Controllers map[string]interface{} `json:"controllers,omitempty"`
  197. // These are used for JSON IO, but isn't used for processing
  198. // since it needs to be swapped out safely.
  199. VBSMJson VBucketServerMap `json:"vBucketServerMap"`
  200. NodesJSON []Node `json:"nodes"`
  201. pool *Pool
  202. connPools unsafe.Pointer // *[]*connectionPool
  203. vBucketServerMap unsafe.Pointer // *VBucketServerMap
  204. nodeList unsafe.Pointer // *[]Node
  205. commonSufix string
  206. ah AuthHandler // auth handler
  207. ds *DurablitySettings // Durablity Settings for this bucket
  208. closed bool
  209. }
  210. // PoolServices is all the bucket-independent services in a pool
  211. type PoolServices struct {
  212. Rev int `json:"rev"`
  213. NodesExt []NodeServices `json:"nodesExt"`
  214. Capabilities json.RawMessage `json:"clusterCapabilities"`
  215. }
  216. // NodeServices is all the bucket-independent services running on
  217. // a node (given by Hostname)
  218. type NodeServices struct {
  219. Services map[string]int `json:"services,omitempty"`
  220. Hostname string `json:"hostname"`
  221. ThisNode bool `json:"thisNode"`
  222. }
  223. type BucketNotFoundError struct {
  224. bucket string
  225. }
  226. func (e *BucketNotFoundError) Error() string {
  227. return fmt.Sprint("No bucket named " + e.bucket)
  228. }
  229. type BucketAuth struct {
  230. name string
  231. saslPwd string
  232. bucket string
  233. }
  234. func newBucketAuth(name string, pass string, bucket string) *BucketAuth {
  235. return &BucketAuth{name: name, saslPwd: pass, bucket: bucket}
  236. }
  237. func (ba *BucketAuth) GetCredentials() (string, string, string) {
  238. return ba.name, ba.saslPwd, ba.bucket
  239. }
  240. // VBServerMap returns the current VBucketServerMap.
  241. func (b *Bucket) VBServerMap() *VBucketServerMap {
  242. b.RLock()
  243. defer b.RUnlock()
  244. ret := (*VBucketServerMap)(b.vBucketServerMap)
  245. return ret
  246. }
  247. func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
  248. vbmap := b.VBServerMap()
  249. servers := vbmap.ServerList
  250. if addrs == nil {
  251. addrs = vbmap.ServerList
  252. }
  253. m := make(map[string][]uint16)
  254. for _, addr := range addrs {
  255. m[addr] = make([]uint16, 0)
  256. }
  257. for vbno, idxs := range vbmap.VBucketMap {
  258. if len(idxs) == 0 {
  259. return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno)
  260. } else if idxs[0] < 0 || idxs[0] >= len(servers) {
  261. return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno)
  262. }
  263. addr := servers[idxs[0]]
  264. if _, ok := m[addr]; ok {
  265. m[addr] = append(m[addr], uint16(vbno))
  266. }
  267. }
  268. return m, nil
  269. }
  270. // true if node is not on the bucket VBmap
  271. func (b *Bucket) checkVBmap(node string) bool {
  272. vbmap := b.VBServerMap()
  273. servers := vbmap.ServerList
  274. for _, idxs := range vbmap.VBucketMap {
  275. if len(idxs) == 0 {
  276. return true
  277. } else if idxs[0] < 0 || idxs[0] >= len(servers) {
  278. return true
  279. }
  280. if servers[idxs[0]] == node {
  281. return false
  282. }
  283. }
  284. return true
  285. }
  286. func (b *Bucket) GetName() string {
  287. b.RLock()
  288. defer b.RUnlock()
  289. ret := b.Name
  290. return ret
  291. }
  292. // Nodes returns the current list of nodes servicing this bucket.
  293. func (b *Bucket) Nodes() []Node {
  294. b.RLock()
  295. defer b.RUnlock()
  296. ret := *(*[]Node)(b.nodeList)
  297. return ret
  298. }
  299. // return the list of healthy nodes
  300. func (b *Bucket) HealthyNodes() []Node {
  301. nodes := []Node{}
  302. for _, n := range b.Nodes() {
  303. if n.Status == "healthy" && n.CouchAPIBase != "" {
  304. nodes = append(nodes, n)
  305. }
  306. if n.Status != "healthy" { // log non-healthy node
  307. logging.Infof("Non-healthy node; node details:")
  308. logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
  309. }
  310. }
  311. return nodes
  312. }
  313. func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool {
  314. if !bucketLocked {
  315. b.RLock()
  316. defer b.RUnlock()
  317. }
  318. if b.connPools != nil {
  319. return *(*[]*connectionPool)(b.connPools)
  320. } else {
  321. return nil
  322. }
  323. }
  324. func (b *Bucket) replaceConnPools(with []*connectionPool) {
  325. b.Lock()
  326. defer b.Unlock()
  327. old := b.connPools
  328. b.connPools = unsafe.Pointer(&with)
  329. if old != nil {
  330. for _, pool := range *(*[]*connectionPool)(old) {
  331. if pool != nil {
  332. pool.Close()
  333. }
  334. }
  335. }
  336. return
  337. }
  338. func (b *Bucket) getConnPool(i int) *connectionPool {
  339. if i < 0 {
  340. return nil
  341. }
  342. p := b.getConnPools(false /* not already locked */)
  343. if len(p) > i {
  344. return p[i]
  345. }
  346. return nil
  347. }
  348. func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool {
  349. pools := b.getConnPools(bucketLocked)
  350. for _, p := range pools {
  351. if p != nil && p.host == host {
  352. return p
  353. }
  354. }
  355. return nil
  356. }
  357. // Given a vbucket number, returns a memcached connection to it.
  358. // The connection must be returned to its pool after use.
  359. func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) {
  360. for {
  361. vbm := b.VBServerMap()
  362. if len(vbm.VBucketMap) < int(vb) {
  363. return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v",
  364. vb, vbm.VBucketMap)
  365. }
  366. masterId := vbm.VBucketMap[vb][0]
  367. if masterId < 0 {
  368. return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb)
  369. }
  370. pool := b.getConnPool(masterId)
  371. conn, err := pool.Get()
  372. if err != errClosedPool {
  373. return conn, pool, err
  374. }
  375. // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
  376. }
  377. }
  378. // To get random documents, we need to cover all the nodes, so select
  379. // a connection at random.
  380. func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) {
  381. for {
  382. var currentPool = 0
  383. pools := b.getConnPools(false /* not already locked */)
  384. if len(pools) == 0 {
  385. return nil, nil, fmt.Errorf("No connection pool found")
  386. } else if len(pools) > 1 { // choose a random connection
  387. currentPool = rand.Intn(len(pools))
  388. } // if only one pool, currentPool defaults to 0, i.e., the only pool
  389. // get the pool
  390. pool := pools[currentPool]
  391. conn, err := pool.Get()
  392. if err != errClosedPool {
  393. return conn, pool, err
  394. }
  395. // If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
  396. }
  397. }
  398. //
  399. // Get a random document from a bucket. Since the bucket may be distributed
  400. // across nodes, we must first select a random connection, and then use the
  401. // Client.GetRandomDoc() call to get a random document from that node.
  402. //
  403. func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) {
  404. // get a connection from the pool
  405. conn, pool, err := b.getRandomConnection()
  406. if err != nil {
  407. return nil, err
  408. }
  409. // We may need to select the bucket before GetRandomDoc()
  410. // will work. This is sometimes done at startup (see defaultMkConn())
  411. // but not always, depending on the auth type.
  412. _, err = conn.SelectBucket(b.Name)
  413. if err != nil {
  414. return nil, err
  415. }
  416. // get a randomm document from the connection
  417. doc, err := conn.GetRandomDoc()
  418. // need to return the connection to the pool
  419. pool.Return(conn)
  420. return doc, err
  421. }
  422. func (b *Bucket) getMasterNode(i int) string {
  423. p := b.getConnPools(false /* not already locked */)
  424. if len(p) > i {
  425. return p[i].host
  426. }
  427. return ""
  428. }
  429. func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) {
  430. if !bucketLocked {
  431. b.RLock()
  432. defer b.RUnlock()
  433. }
  434. pool := b.pool
  435. name := b.Name
  436. if pool != nil {
  437. ah = pool.client.ah
  438. }
  439. if mbah, ok := ah.(MultiBucketAuthHandler); ok {
  440. return mbah.ForBucket(name)
  441. }
  442. if ah == nil {
  443. ah = &basicAuth{name, ""}
  444. }
  445. return
  446. }
  447. // NodeAddresses gets the (sorted) list of memcached node addresses
  448. // (hostname:port).
  449. func (b *Bucket) NodeAddresses() []string {
  450. vsm := b.VBServerMap()
  451. rv := make([]string, len(vsm.ServerList))
  452. copy(rv, vsm.ServerList)
  453. sort.Strings(rv)
  454. return rv
  455. }
  456. // CommonAddressSuffix finds the longest common suffix of all
  457. // host:port strings in the node list.
  458. func (b *Bucket) CommonAddressSuffix() string {
  459. input := []string{}
  460. for _, n := range b.Nodes() {
  461. input = append(input, n.Hostname)
  462. }
  463. return FindCommonSuffix(input)
  464. }
  465. // A Client is the starting point for all services across all buckets
  466. // in a Couchbase cluster.
  467. type Client struct {
  468. BaseURL *url.URL
  469. ah AuthHandler
  470. Info Pools
  471. tlsConfig *tls.Config
  472. }
  473. func maybeAddAuth(req *http.Request, ah AuthHandler) error {
  474. if hah, ok := ah.(HTTPAuthHandler); ok {
  475. return hah.SetCredsForRequest(req)
  476. }
  477. if ah != nil {
  478. user, pass, _ := ah.GetCredentials()
  479. req.Header.Set("Authorization", "Basic "+
  480. base64.StdEncoding.EncodeToString([]byte(user+":"+pass)))
  481. }
  482. return nil
  483. }
  484. // arbitary number, may need to be tuned #FIXME
  485. const HTTP_MAX_RETRY = 5
  486. // Someday golang network packages will implement standard
  487. // error codes. Until then #sigh
  488. func isHttpConnError(err error) bool {
  489. estr := err.Error()
  490. return strings.Contains(estr, "broken pipe") ||
  491. strings.Contains(estr, "broken connection") ||
  492. strings.Contains(estr, "connection reset")
  493. }
  494. var client *http.Client
  495. func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) {
  496. cfg := &tls.Config{}
  497. if certFile != "" && keyFile != "" {
  498. tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
  499. if err != nil {
  500. return nil, err
  501. }
  502. cfg.Certificates = []tls.Certificate{tlsCert}
  503. } else {
  504. //error need to pass both certfile and keyfile
  505. return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile")
  506. }
  507. var caCert []byte
  508. var err1 error
  509. caCertPool := x509.NewCertPool()
  510. if rootFile != "" {
  511. // Read that value in
  512. caCert, err1 = ioutil.ReadFile(rootFile)
  513. if err1 != nil {
  514. return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1)
  515. }
  516. caCertPool.AppendCertsFromPEM(caCert)
  517. }
  518. cfg.RootCAs = caCertPool
  519. return cfg, nil
  520. }
  521. func doHTTPRequest(req *http.Request) (*http.Response, error) {
  522. var err error
  523. var res *http.Response
  524. // we need a client that ignores certificate errors, since we self-sign
  525. // our certs
  526. if client == nil && req.URL.Scheme == "https" {
  527. var tr *http.Transport
  528. if skipVerify {
  529. tr = &http.Transport{
  530. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  531. }
  532. } else {
  533. // Handle cases with cert
  534. cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
  535. if err != nil {
  536. return nil, err
  537. }
  538. tr = &http.Transport{
  539. TLSClientConfig: cfg,
  540. }
  541. }
  542. client = &http.Client{Transport: tr}
  543. } else if client == nil {
  544. client = HTTPClient
  545. }
  546. for i := 0; i < HTTP_MAX_RETRY; i++ {
  547. res, err = client.Do(req)
  548. if err != nil && isHttpConnError(err) {
  549. continue
  550. }
  551. break
  552. }
  553. if err != nil {
  554. return nil, err
  555. }
  556. return res, err
  557. }
  558. func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
  559. return doOutputAPI("PUT", baseURL, path, params, authHandler, out)
  560. }
  561. func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
  562. return doOutputAPI("POST", baseURL, path, params, authHandler, out)
  563. }
  564. func doOutputAPI(
  565. httpVerb string,
  566. baseURL *url.URL,
  567. path string,
  568. params map[string]interface{},
  569. authHandler AuthHandler,
  570. out interface{}) error {
  571. var requestUrl string
  572. if q := strings.Index(path, "?"); q > 0 {
  573. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
  574. } else {
  575. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
  576. }
  577. postData := url.Values{}
  578. for k, v := range params {
  579. postData.Set(k, fmt.Sprintf("%v", v))
  580. }
  581. req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode()))
  582. if err != nil {
  583. return err
  584. }
  585. req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
  586. err = maybeAddAuth(req, authHandler)
  587. if err != nil {
  588. return err
  589. }
  590. res, err := doHTTPRequest(req)
  591. if err != nil {
  592. return err
  593. }
  594. defer res.Body.Close()
  595. if res.StatusCode != 200 {
  596. bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
  597. return fmt.Errorf("HTTP error %v getting %q: %s",
  598. res.Status, requestUrl, bod)
  599. }
  600. d := json.NewDecoder(res.Body)
  601. if err = d.Decode(&out); err != nil {
  602. return err
  603. }
  604. return nil
  605. }
  606. func queryRestAPI(
  607. baseURL *url.URL,
  608. path string,
  609. authHandler AuthHandler,
  610. out interface{}) error {
  611. var requestUrl string
  612. if q := strings.Index(path, "?"); q > 0 {
  613. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
  614. } else {
  615. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
  616. }
  617. req, err := http.NewRequest("GET", requestUrl, nil)
  618. if err != nil {
  619. return err
  620. }
  621. err = maybeAddAuth(req, authHandler)
  622. if err != nil {
  623. return err
  624. }
  625. res, err := doHTTPRequest(req)
  626. if err != nil {
  627. return err
  628. }
  629. defer res.Body.Close()
  630. if res.StatusCode != 200 {
  631. bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
  632. return fmt.Errorf("HTTP error %v getting %q: %s",
  633. res.Status, requestUrl, bod)
  634. }
  635. d := json.NewDecoder(res.Body)
  636. if err = d.Decode(&out); err != nil {
  637. return err
  638. }
  639. return nil
  640. }
  641. func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error {
  642. return c.processStream(c.BaseURL, path, c.ah, callb, data)
  643. }
  644. // Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309
  645. func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error {
  646. var requestUrl string
  647. if q := strings.Index(path, "?"); q > 0 {
  648. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
  649. } else {
  650. requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
  651. }
  652. req, err := http.NewRequest("GET", requestUrl, nil)
  653. if err != nil {
  654. return err
  655. }
  656. err = maybeAddAuth(req, authHandler)
  657. if err != nil {
  658. return err
  659. }
  660. res, err := doHTTPRequest(req)
  661. if err != nil {
  662. return err
  663. }
  664. defer res.Body.Close()
  665. if res.StatusCode != 200 {
  666. bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
  667. return fmt.Errorf("HTTP error %v getting %q: %s",
  668. res.Status, requestUrl, bod)
  669. }
  670. reader := bufio.NewReader(res.Body)
  671. for {
  672. bs, err := reader.ReadBytes('\n')
  673. if err != nil {
  674. return err
  675. }
  676. if len(bs) == 1 && bs[0] == '\n' {
  677. continue
  678. }
  679. err = json.Unmarshal(bs, data)
  680. if err != nil {
  681. return err
  682. }
  683. err = callb(data)
  684. if err != nil {
  685. return err
  686. }
  687. }
  688. return nil
  689. }
  690. func (c *Client) parseURLResponse(path string, out interface{}) error {
  691. return queryRestAPI(c.BaseURL, path, c.ah, out)
  692. }
  693. func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error {
  694. return doPostAPI(c.BaseURL, path, params, c.ah, out)
  695. }
  696. func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error {
  697. return doPutAPI(c.BaseURL, path, params, c.ah, out)
  698. }
  699. func (b *Bucket) parseURLResponse(path string, out interface{}) error {
  700. nodes := b.Nodes()
  701. if len(nodes) == 0 {
  702. return errors.New("no couch rest URLs")
  703. }
  704. // Pick a random node to start querying.
  705. startNode := rand.Intn(len(nodes))
  706. maxRetries := len(nodes)
  707. for i := 0; i < maxRetries; i++ {
  708. node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
  709. // Skip non-healthy nodes.
  710. if node.Status != "healthy" || node.CouchAPIBase == "" {
  711. continue
  712. }
  713. url := &url.URL{
  714. Host: node.Hostname,
  715. Scheme: "http",
  716. }
  717. // Lock here to avoid having pool closed under us.
  718. b.RLock()
  719. err := queryRestAPI(url, path, b.pool.client.ah, out)
  720. b.RUnlock()
  721. if err == nil {
  722. return err
  723. }
  724. }
  725. return errors.New("All nodes failed to respond or no healthy nodes for bucket found")
  726. }
  727. func (b *Bucket) parseAPIResponse(path string, out interface{}) error {
  728. nodes := b.Nodes()
  729. if len(nodes) == 0 {
  730. return errors.New("no couch rest URLs")
  731. }
  732. var err error
  733. var u *url.URL
  734. // Pick a random node to start querying.
  735. startNode := rand.Intn(len(nodes))
  736. maxRetries := len(nodes)
  737. for i := 0; i < maxRetries; i++ {
  738. node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
  739. // Skip non-healthy nodes.
  740. if node.Status != "healthy" || node.CouchAPIBase == "" {
  741. continue
  742. }
  743. u, err = ParseURL(node.CouchAPIBase)
  744. // Lock here so pool does not get closed under us.
  745. b.RLock()
  746. if err != nil {
  747. b.RUnlock()
  748. return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
  749. b.Name, i, node.CouchAPIBase, err)
  750. } else if b.pool != nil {
  751. u.User = b.pool.client.BaseURL.User
  752. }
  753. u.Path = path
  754. // generate the path so that the strings are properly escaped
  755. // MB-13770
  756. requestPath := strings.Split(u.String(), u.Host)[1]
  757. err = queryRestAPI(u, requestPath, b.pool.client.ah, out)
  758. b.RUnlock()
  759. if err == nil {
  760. return err
  761. }
  762. }
  763. var errStr string
  764. if err != nil {
  765. errStr = "Error " + err.Error()
  766. }
  767. return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr)
  768. }
  769. type basicAuth struct {
  770. u, p string
  771. }
  772. func (b basicAuth) GetCredentials() (string, string, string) {
  773. return b.u, b.p, b.u
  774. }
  775. func basicAuthFromURL(us string) (ah AuthHandler) {
  776. u, err := ParseURL(us)
  777. if err != nil {
  778. return
  779. }
  780. if user := u.User; user != nil {
  781. pw, _ := user.Password()
  782. ah = basicAuth{user.Username(), pw}
  783. }
  784. return
  785. }
  786. // ConnectWithAuth connects to a couchbase cluster with the given
  787. // authentication handler.
  788. func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) {
  789. c.BaseURL, err = ParseURL(baseU)
  790. if err != nil {
  791. return
  792. }
  793. c.ah = ah
  794. return c, c.parseURLResponse("/pools", &c.Info)
  795. }
  796. // Call this method with a TLS certificate file name to make communication
  797. // with the KV engine encrypted.
  798. //
  799. // This method should be called immediately after a Connect*() method.
  800. func (c *Client) InitTLS(certFile string) error {
  801. serverCert, err := ioutil.ReadFile(certFile)
  802. if err != nil {
  803. return err
  804. }
  805. CA_Pool := x509.NewCertPool()
  806. CA_Pool.AppendCertsFromPEM(serverCert)
  807. c.tlsConfig = &tls.Config{RootCAs: CA_Pool}
  808. return nil
  809. }
  810. func (c *Client) ClearTLS() {
  811. c.tlsConfig = nil
  812. }
  813. // ConnectWithAuthCreds connects to a couchbase cluster with the give
  814. // authorization creds returned by cb_auth
  815. func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) {
  816. c.BaseURL, err = ParseURL(baseU)
  817. if err != nil {
  818. return
  819. }
  820. c.ah = newBucketAuth(username, password, "")
  821. return c, c.parseURLResponse("/pools", &c.Info)
  822. }
  823. // Connect to a couchbase cluster. An authentication handler will be
  824. // created from the userinfo in the URL if provided.
  825. func Connect(baseU string) (Client, error) {
  826. return ConnectWithAuth(baseU, basicAuthFromURL(baseU))
  827. }
  828. type BucketInfo struct {
  829. Name string // name of bucket
  830. Password string // SASL password of bucket
  831. }
  832. //Get SASL buckets
  833. func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
  834. c := &Client{}
  835. c.BaseURL, err = ParseURL(baseU)
  836. if err != nil {
  837. return
  838. }
  839. c.ah = basicAuthFromURL(baseU)
  840. var buckets []Bucket
  841. err = c.parseURLResponse("/pools/default/buckets", &buckets)
  842. if err != nil {
  843. return
  844. }
  845. bInfo = make([]BucketInfo, 0)
  846. for _, bucket := range buckets {
  847. bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password}
  848. bInfo = append(bInfo, bucketInfo)
  849. }
  850. return bInfo, err
  851. }
  852. //Set viewUpdateDaemonOptions
  853. func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) {
  854. c := &Client{}
  855. c.BaseURL, err = ParseURL(baseU)
  856. if err != nil {
  857. return
  858. }
  859. c.ah = basicAuthFromURL(baseU)
  860. if len(params) < 1 {
  861. return nil, fmt.Errorf("No params to set")
  862. }
  863. err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts)
  864. if err != nil {
  865. return
  866. }
  867. return viewOpts, err
  868. }
  869. // This API lets the caller know, if the list of nodes a bucket is
  870. // connected to has gone through an edit (a rebalance operation)
  871. // since the last update to the bucket, in which case a Refresh is
  872. // advised.
  873. func (b *Bucket) NodeListChanged() bool {
  874. b.RLock()
  875. pool := b.pool
  876. uri := b.URI
  877. b.RUnlock()
  878. tmpb := &Bucket{}
  879. err := pool.client.parseURLResponse(uri, tmpb)
  880. if err != nil {
  881. return true
  882. }
  883. bNodes := *(*[]Node)(b.nodeList)
  884. if len(bNodes) != len(tmpb.NodesJSON) {
  885. return true
  886. }
  887. bucketHostnames := map[string]bool{}
  888. for _, node := range bNodes {
  889. bucketHostnames[node.Hostname] = true
  890. }
  891. for _, node := range tmpb.NodesJSON {
  892. if _, found := bucketHostnames[node.Hostname]; !found {
  893. return true
  894. }
  895. }
  896. return false
  897. }
  898. // Sample data for scopes and collections as returned from the
  899. // /pooles/default/$BUCKET_NAME/collections API.
  900. // {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}}
  901. // Structures for parsing collections manifest.
  902. // The map key is the name of the scope.
  903. // Example data:
  904. // {"uid":"b","scopes":[
  905. // {"name":"_default","uid":"0","collections":[
  906. // {"name":"_default","uid":"0"}]},
  907. // {"name":"myScope1","uid":"8","collections":[
  908. // {"name":"myCollectionB","uid":"c"},
  909. // {"name":"myCollectionA","uid":"b"}]},
  910. // {"name":"myScope2","uid":"9","collections":[
  911. // {"name":"myCollectionC","uid":"d"}]}]}
  912. type InputManifest struct {
  913. Uid string
  914. Scopes []InputScope
  915. }
  916. type InputScope struct {
  917. Name string
  918. Uid string
  919. Collections []InputCollection
  920. }
  921. type InputCollection struct {
  922. Name string
  923. Uid string
  924. }
  925. // Structures for storing collections information.
  926. type Manifest struct {
  927. Uid uint64
  928. Scopes map[string]*Scope // map by name
  929. }
  930. type Scope struct {
  931. Name string
  932. Uid uint64
  933. Collections map[string]*Collection // map by name
  934. }
  935. type Collection struct {
  936. Name string
  937. Uid uint64
  938. }
  939. var _EMPTY_MANIFEST *Manifest = &Manifest{Uid: 0, Scopes: map[string]*Scope{}}
  940. func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) {
  941. if !EnableCollections {
  942. return _EMPTY_MANIFEST, nil
  943. }
  944. var im InputManifest
  945. err := json.Unmarshal(res.Body, &im)
  946. if err != nil {
  947. return nil, err
  948. }
  949. uid, err := strconv.ParseUint(im.Uid, 16, 64)
  950. if err != nil {
  951. return nil, err
  952. }
  953. mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))}
  954. for _, iscope := range im.Scopes {
  955. scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64)
  956. if err != nil {
  957. return nil, err
  958. }
  959. scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))}
  960. mani.Scopes[iscope.Name] = scope
  961. for _, icoll := range iscope.Collections {
  962. coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64)
  963. if err != nil {
  964. return nil, err
  965. }
  966. coll := &Collection{Uid: coll_uid, Name: icoll.Name}
  967. scope.Collections[icoll.Name] = coll
  968. }
  969. }
  970. return mani, nil
  971. }
  972. // This function assumes the bucket is locked.
  973. func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
  974. // Collections not used?
  975. if !EnableCollections {
  976. return nil, fmt.Errorf("Collections not enabled.")
  977. }
  978. b.RLock()
  979. pools := b.getConnPools(true /* already locked */)
  980. pool := pools[0] // Any pool will do, so use the first one.
  981. b.RUnlock()
  982. client, err := pool.Get()
  983. if err != nil {
  984. return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
  985. }
  986. // We need to select the bucket before GetCollectionsManifest()
  987. // will work. This is sometimes done at startup (see defaultMkConn())
  988. // but not always, depending on the auth type.
  989. // Doing this is safe because we collect the the connections
  990. // by bucket, so the bucket being selected will never change.
  991. _, err = client.SelectBucket(b.Name)
  992. if err != nil {
  993. pool.Return(client)
  994. return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
  995. }
  996. res, err := client.GetCollectionsManifest()
  997. if err != nil {
  998. pool.Return(client)
  999. return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
  1000. }
  1001. mani, err := parseCollectionsManifest(res)
  1002. if err != nil {
  1003. pool.Return(client)
  1004. return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
  1005. }
  1006. pool.Return(client)
  1007. return mani, nil
  1008. }
  1009. func (b *Bucket) RefreshFully() error {
  1010. return b.refresh(false)
  1011. }
  1012. func (b *Bucket) Refresh() error {
  1013. return b.refresh(true)
  1014. }
  1015. func (b *Bucket) refresh(preserveConnections bool) error {
  1016. b.RLock()
  1017. pool := b.pool
  1018. uri := b.URI
  1019. client := pool.client
  1020. b.RUnlock()
  1021. tlsConfig := client.tlsConfig
  1022. var poolServices PoolServices
  1023. var err error
  1024. if tlsConfig != nil {
  1025. poolServices, err = client.GetPoolServices("default")
  1026. if err != nil {
  1027. return err
  1028. }
  1029. }
  1030. tmpb := &Bucket{}
  1031. err = pool.client.parseURLResponse(uri, tmpb)
  1032. if err != nil {
  1033. return err
  1034. }
  1035. pools := b.getConnPools(false /* bucket not already locked */)
  1036. // We need this lock to ensure that bucket refreshes happening because
  1037. // of NMVb errors received during bulkGet do not end up over-writing
  1038. // pool.inUse.
  1039. b.Lock()
  1040. for _, pool := range pools {
  1041. if pool != nil {
  1042. pool.inUse = false
  1043. }
  1044. }
  1045. newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
  1046. for i := range newcps {
  1047. if preserveConnections {
  1048. pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
  1049. if pool != nil && pool.inUse == false {
  1050. // if the hostname and index is unchanged then reuse this pool
  1051. newcps[i] = pool
  1052. pool.inUse = true
  1053. continue
  1054. }
  1055. }
  1056. hostport := tmpb.VBSMJson.ServerList[i]
  1057. if tlsConfig != nil {
  1058. hostport, err = MapKVtoSSL(hostport, &poolServices)
  1059. if err != nil {
  1060. b.Unlock()
  1061. return err
  1062. }
  1063. }
  1064. if b.ah != nil {
  1065. newcps[i] = newConnectionPool(hostport,
  1066. b.ah, AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
  1067. } else {
  1068. newcps[i] = newConnectionPool(hostport,
  1069. b.authHandler(true /* bucket already locked */),
  1070. AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
  1071. }
  1072. }
  1073. b.replaceConnPools2(newcps, true /* bucket already locked */)
  1074. tmpb.ah = b.ah
  1075. b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
  1076. b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
  1077. b.Unlock()
  1078. return nil
  1079. }
  1080. func (p *Pool) refresh() (err error) {
  1081. p.BucketMap = make(map[string]*Bucket)
  1082. buckets := []Bucket{}
  1083. err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
  1084. if err != nil {
  1085. return err
  1086. }
  1087. for i, _ := range buckets {
  1088. b := new(Bucket)
  1089. *b = buckets[i]
  1090. b.pool = p
  1091. b.nodeList = unsafe.Pointer(&b.NodesJSON)
  1092. // MB-33185 this is merely defensive, just in case
  1093. // refresh() gets called on a perfectly node pool
  1094. ob, ok := p.BucketMap[b.Name]
  1095. if ok && ob.connPools != nil {
  1096. ob.Close()
  1097. }
  1098. b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
  1099. p.BucketMap[b.Name] = b
  1100. runtime.SetFinalizer(b, bucketFinalizer)
  1101. }
  1102. return nil
  1103. }
  1104. // GetPool gets a pool from within the couchbase cluster (usually
  1105. // "default").
  1106. func (c *Client) GetPool(name string) (p Pool, err error) {
  1107. var poolURI string
  1108. for _, p := range c.Info.Pools {
  1109. if p.Name == name {
  1110. poolURI = p.URI
  1111. break
  1112. }
  1113. }
  1114. if poolURI == "" {
  1115. return p, errors.New("No pool named " + name)
  1116. }
  1117. err = c.parseURLResponse(poolURI, &p)
  1118. p.client = c
  1119. err = p.refresh()
  1120. return
  1121. }
  1122. // GetPoolServices returns all the bucket-independent services in a pool.
  1123. // (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
  1124. func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
  1125. var poolName string
  1126. for _, p := range c.Info.Pools {
  1127. if p.Name == name {
  1128. poolName = p.Name
  1129. }
  1130. }
  1131. if poolName == "" {
  1132. return ps, errors.New("No pool named " + name)
  1133. }
  1134. poolURI := "/pools/" + poolName + "/nodeServices"
  1135. err = c.parseURLResponse(poolURI, &ps)
  1136. return
  1137. }
  1138. func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) {
  1139. b.RLock()
  1140. pool := b.pool
  1141. b.RUnlock()
  1142. ps, err := pool.client.GetPoolServices(name)
  1143. if err != nil {
  1144. return nil, err
  1145. }
  1146. return &ps, nil
  1147. }
  1148. // Close marks this bucket as no longer needed, closing connections it
  1149. // may have open.
  1150. func (b *Bucket) Close() {
  1151. b.Lock()
  1152. defer b.Unlock()
  1153. if b.connPools != nil {
  1154. for _, c := range b.getConnPools(true /* already locked */) {
  1155. if c != nil {
  1156. c.Close()
  1157. }
  1158. }
  1159. b.connPools = nil
  1160. }
  1161. }
  1162. func bucketFinalizer(b *Bucket) {
  1163. if b.connPools != nil {
  1164. if !b.closed {
  1165. logging.Warnf("Finalizing a bucket with active connections.")
  1166. }
  1167. // MB-33185 do not leak connection pools
  1168. b.Close()
  1169. }
  1170. }
  1171. // GetBucket gets a bucket from within this pool.
  1172. func (p *Pool) GetBucket(name string) (*Bucket, error) {
  1173. rv, ok := p.BucketMap[name]
  1174. if !ok {
  1175. return nil, &BucketNotFoundError{bucket: name}
  1176. }
  1177. err := rv.Refresh()
  1178. if err != nil {
  1179. return nil, err
  1180. }
  1181. return rv, nil
  1182. }
  1183. // GetBucket gets a bucket from within this pool.
  1184. func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
  1185. rv, ok := p.BucketMap[bucket]
  1186. if !ok {
  1187. return nil, &BucketNotFoundError{bucket: bucket}
  1188. }
  1189. rv.ah = newBucketAuth(username, password, bucket)
  1190. err := rv.Refresh()
  1191. if err != nil {
  1192. return nil, err
  1193. }
  1194. return rv, nil
  1195. }
  1196. // GetPool gets the pool to which this bucket belongs.
  1197. func (b *Bucket) GetPool() *Pool {
  1198. b.RLock()
  1199. defer b.RUnlock()
  1200. ret := b.pool
  1201. return ret
  1202. }
  1203. // GetClient gets the client from which we got this pool.
  1204. func (p *Pool) GetClient() *Client {
  1205. return p.client
  1206. }
  1207. // Release bucket connections when the pool is no longer in use
  1208. func (p *Pool) Close() {
  1209. // fine to loop through the buckets unlocked
  1210. // locking happens at the bucket level
  1211. for b, _ := range p.BucketMap {
  1212. // MB-33208 defer closing connection pools until the bucket is no longer used
  1213. bucket := p.BucketMap[b]
  1214. bucket.Lock()
  1215. bucket.closed = true
  1216. bucket.Unlock()
  1217. }
  1218. }
  1219. // GetBucket is a convenience function for getting a named bucket from
  1220. // a URL
  1221. func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
  1222. var err error
  1223. client, err := Connect(endpoint)
  1224. if err != nil {
  1225. return nil, err
  1226. }
  1227. pool, err := client.GetPool(poolname)
  1228. if err != nil {
  1229. return nil, err
  1230. }
  1231. return pool.GetBucket(bucketname)
  1232. }
  1233. // ConnectWithAuthAndGetBucket is a convenience function for
  1234. // getting a named bucket from a given URL and an auth callback
  1235. func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
  1236. ah AuthHandler) (*Bucket, error) {
  1237. client, err := ConnectWithAuth(endpoint, ah)
  1238. if err != nil {
  1239. return nil, err
  1240. }
  1241. pool, err := client.GetPool(poolname)
  1242. if err != nil {
  1243. return nil, err
  1244. }
  1245. return pool.GetBucket(bucketname)
  1246. }
上海开阖软件有限公司 沪ICP备12045867号-1