// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package cache provides the interface and implementation of cache algorithms.
package cache

import (
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/syndtr/goleveldb/leveldb/util"
)
// Cacher provides an interface for implementing caching functionality.
// An implementation must be safe for concurrent use.
type Cacher interface {
	// Capacity returns cache capacity.
	Capacity() int

	// SetCapacity sets cache capacity.
	SetCapacity(capacity int)

	// Promote promotes the 'cache node'.
	Promote(n *Node)

	// Ban evicts the 'cache node' and prevents subsequent 'promote'.
	Ban(n *Node)

	// Evict evicts the 'cache node'.
	Evict(n *Node)

	// EvictNS evicts 'cache nodes' with the given namespace.
	EvictNS(ns uint64)

	// EvictAll evicts all 'cache nodes'.
	EvictAll()

	// Close closes the 'cache tree'.
	Close() error
}
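
// nopCacher is an illustrative sketch only (a hypothetical type, not part of
// the original API): a minimal Cacher that applies no eviction policy. A real
// policy, such as an LRU, would track nodes in Promote and release them once
// the cache goes over capacity.
type nopCacher struct{}

func (nopCacher) Capacity() int             { return 0 }
func (nopCacher) SetCapacity(capacity int)  {}
func (nopCacher) Promote(n *Node)           {}
func (nopCacher) Ban(n *Node)               {}
func (nopCacher) Evict(n *Node)             {}
func (nopCacher) EvictNS(ns uint64)         {}
func (nopCacher) EvictAll()                 {}
func (nopCacher) Close() error              { return nil }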

// Value is a 'cacheable object'. It may implement util.Releaser; if so,
// the Release method will be called once the object is released.
type Value interface{}

// NamespaceGetter provides a convenient wrapper for a namespace.
type NamespaceGetter struct {
	Cache *Cache
	NS    uint64
}

// Get simply calls the Cache.Get() method.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
	return g.Cache.Get(g.NS, key, setFunc)
}
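
// A minimal usage sketch for NamespaceGetter (illustrative only; the namespace,
// key and value below are arbitrary): it fixes the namespace so callers only
// supply the key.
//
//	c := NewCache(nil)
//	g := &NamespaceGetter{Cache: c, NS: 7}
//	h := g.Get(42, func() (size int, value Value) { return 1, "v" })
//	if h != nil {
//		_ = h.Value() // "v"
//		h.Release()
//	}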

// The hash tables implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.

const (
	mInitialSize           = 1 << 4
	mOverflowThreshold     = 1 << 5
	mOverflowGrowThreshold = 1 << 7
)
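
// Worked example of the thresholds above: the initial table has
// mInitialSize = 16 buckets and grows once the node count reaches
// 16 * mOverflowThreshold = 512, or once the overflow counter (incremented
// whenever a single bucket holds more than mOverflowThreshold = 32 nodes)
// reaches mOverflowGrowThreshold = 128; see mBucket.get below.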

type mBucket struct {
	mu     sync.Mutex
	node   []*Node
	frozen bool
}

func (b *mBucket) freeze() []*Node {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.frozen {
		b.frozen = true
	}
	return b.node
}

func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
	b.mu.Lock()

	if b.frozen {
		b.mu.Unlock()
		return
	}

	// Scan the node.
	for _, n := range b.node {
		if n.hash == hash && n.ns == ns && n.key == key {
			atomic.AddInt32(&n.ref, 1)
			b.mu.Unlock()
			return true, false, n
		}
	}

	// Get only.
	if noset {
		b.mu.Unlock()
		return true, false, nil
	}

	// Create node.
	n = &Node{
		r:    r,
		hash: hash,
		ns:   ns,
		key:  key,
		ref:  1,
	}
	// Add node to bucket.
	b.node = append(b.node, n)
	bLen := len(b.node)
	b.mu.Unlock()

	// Update counter.
	grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
	if bLen > mOverflowThreshold {
		grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
	}

	// Grow.
	if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
		nhLen := len(h.buckets) << 1
		nh := &mNode{
			buckets:         make([]unsafe.Pointer, nhLen),
			mask:            uint32(nhLen) - 1,
			pred:            unsafe.Pointer(h),
			growThreshold:   int32(nhLen * mOverflowThreshold),
			shrinkThreshold: int32(nhLen >> 1),
		}
		ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
		if !ok {
			panic("BUG: failed swapping head")
		}
		go nh.initBuckets()
	}

	return true, true, n
}

func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
	b.mu.Lock()

	if b.frozen {
		b.mu.Unlock()
		return
	}

	// Scan the node.
	var (
		n    *Node
		bLen int
	)
	for i := range b.node {
		n = b.node[i]
		if n.ns == ns && n.key == key {
			if atomic.LoadInt32(&n.ref) == 0 {
				deleted = true

				// Call releaser.
				if n.value != nil {
					if r, ok := n.value.(util.Releaser); ok {
						r.Release()
					}
					n.value = nil
				}

				// Remove node from bucket.
				b.node = append(b.node[:i], b.node[i+1:]...)
				bLen = len(b.node)
			}
			break
		}
	}
	b.mu.Unlock()

	if deleted {
		// Call OnDel.
		for _, f := range n.onDel {
			f()
		}

		// Update counter.
		atomic.AddInt32(&r.size, int32(n.size)*-1)
		shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
		if bLen >= mOverflowThreshold {
			atomic.AddInt32(&h.overflow, -1)
		}

		// Shrink.
		if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
			nhLen := len(h.buckets) >> 1
			nh := &mNode{
				buckets:         make([]unsafe.Pointer, nhLen),
				mask:            uint32(nhLen) - 1,
				pred:            unsafe.Pointer(h),
				growThreshold:   int32(nhLen * mOverflowThreshold),
				shrinkThreshold: int32(nhLen >> 1),
			}
			ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
			if !ok {
				panic("BUG: failed swapping head")
			}
			go nh.initBuckets()
		}
	}

	return true, deleted
}

type mNode struct {
	buckets         []unsafe.Pointer // []*mBucket
	mask            uint32
	pred            unsafe.Pointer // *mNode
	resizeInProgess int32

	overflow        int32
	growThreshold   int32
	shrinkThreshold int32
}

func (n *mNode) initBucket(i uint32) *mBucket {
	if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
		return b
	}

	p := (*mNode)(atomic.LoadPointer(&n.pred))
	if p != nil {
		var node []*Node
		if n.mask > p.mask {
			// Grow.
			pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
			if pb == nil {
				pb = p.initBucket(i & p.mask)
			}
			m := pb.freeze()
			// Split nodes.
			for _, x := range m {
				if x.hash&n.mask == i {
					node = append(node, x)
				}
			}
		} else {
			// Shrink.
			pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
			if pb0 == nil {
				pb0 = p.initBucket(i)
			}
			pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
			if pb1 == nil {
				pb1 = p.initBucket(i + uint32(len(n.buckets)))
			}
			m0 := pb0.freeze()
			m1 := pb1.freeze()
			// Merge nodes.
			node = make([]*Node, 0, len(m0)+len(m1))
			node = append(node, m0...)
			node = append(node, m1...)
		}
		b := &mBucket{node: node}
		if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
			if len(node) > mOverflowThreshold {
				atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
			}
			return b
		}
	}

	return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
}

func (n *mNode) initBuckets() {
	for i := range n.buckets {
		n.initBucket(uint32(i))
	}
	atomic.StorePointer(&n.pred, nil)
}

// Cache is a 'cache map'.
type Cache struct {
	mu     sync.RWMutex
	mHead  unsafe.Pointer // *mNode
	nodes  int32
	size   int32
	cacher Cacher
	closed bool
}

// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
func NewCache(cacher Cacher) *Cache {
	h := &mNode{
		buckets:         make([]unsafe.Pointer, mInitialSize),
		mask:            mInitialSize - 1,
		growThreshold:   int32(mInitialSize * mOverflowThreshold),
		shrinkThreshold: 0,
	}
	for i := range h.buckets {
		h.buckets[i] = unsafe.Pointer(&mBucket{})
	}
	r := &Cache{
		mHead:  unsafe.Pointer(h),
		cacher: cacher,
	}
	return r
}
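
// Construction sketch (illustrative only): a Cache may be built with no cacher
// at all, in which case nodes are never evicted by a policy, or with a Cacher
// implementation such as the hypothetical nopCacher above.
//
//	plain := NewCache(nil)
//	withPolicy := NewCache(nopCacher{})
//	defer plain.Close()
//	defer withPolicy.Close()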

func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
	h := (*mNode)(atomic.LoadPointer(&r.mHead))
	i := hash & h.mask
	b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
	if b == nil {
		b = h.initBucket(i)
	}
	return h, b
}

func (r *Cache) delete(n *Node) bool {
	for {
		h, b := r.getBucket(n.hash)
		done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
		if done {
			return deleted
		}
	}
}

// Nodes returns the number of 'cache nodes' in the map.
func (r *Cache) Nodes() int {
	return int(atomic.LoadInt32(&r.nodes))
}

// Size returns the sum of the 'cache node' sizes in the map.
func (r *Cache) Size() int {
	return int(atomic.LoadInt32(&r.size))
}

// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
	if r.cacher == nil {
		return 0
	}
	return r.cacher.Capacity()
}

// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
	if r.cacher != nil {
		r.cacher.SetCapacity(capacity)
	}
}

// Get gets the 'cache node' with the given namespace and key.
// If the 'cache node' is not found and setFunc is not nil, Get will atomically
// create the 'cache node' by calling setFunc. Otherwise Get returns nil.
//
// The returned 'cache handle' should be released after use by calling the
// Release method.
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return nil
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
		if done {
			if n != nil {
				n.mu.Lock()
				if n.value == nil {
					if setFunc == nil {
						n.mu.Unlock()
						n.unref()
						return nil
					}

					n.size, n.value = setFunc()
					if n.value == nil {
						n.size = 0
						n.mu.Unlock()
						n.unref()
						return nil
					}
					atomic.AddInt32(&r.size, int32(n.size))
				}
				n.mu.Unlock()
				if r.cacher != nil {
					r.cacher.Promote(n)
				}
				return &Handle{unsafe.Pointer(n)}
			}

			break
		}
	}
	return nil
}
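
// A minimal Get/Release sketch (illustrative only; namespace, key and value
// are arbitrary):
//
//	c := NewCache(nil)
//	h := c.Get(1, 100, func() (size int, value Value) {
//		return 5, "hello" // computed only on a cache miss
//	})
//	if h != nil {
//		_ = h.Value() // "hello"
//		h.Release()   // release the handle after use
//	}
//	// A lookup-only call passes a nil setFunc and returns nil on a miss.
//	if h := c.Get(1, 200, nil); h != nil {
//		h.Release()
//	}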

// Delete removes and bans the 'cache node' with the given namespace and key.
// A banned 'cache node' will never be inserted into the 'cache tree'. Ban
// only applies to that particular 'cache node', so when a 'cache node' is
// recreated it will not be banned.
//
// If onDel is not nil, then it will be executed if such a 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete returns true if such a 'cache node' exists.
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return false
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)
		if done {
			if n != nil {
				if onDel != nil {
					n.mu.Lock()
					n.onDel = append(n.onDel, onDel)
					n.mu.Unlock()
				}
				if r.cacher != nil {
					r.cacher.Ban(n)
				}
				n.unref()
				return true
			}

			break
		}
	}

	if onDel != nil {
		onDel()
	}

	return false
}
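
// A small sketch of the Delete semantics described above (illustrative only):
// onDel runs immediately when no such node exists, otherwise once the node is
// fully released.
//
//	c := NewCache(nil)
//	existed := c.Delete(1, 100, func() {
//		// node released, or it never existed
//	})
//	_ = existed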

// Evict evicts the 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict returns true if such a 'cache node' exists.
func (r *Cache) Evict(ns, key uint64) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return false
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)
		if done {
			if n != nil {
				if r.cacher != nil {
					r.cacher.Evict(n)
				}
				n.unref()
				return true
			}

			break
		}
	}

	return false
}

// EvictNS evicts 'cache nodes' with the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return
	}

	if r.cacher != nil {
		r.cacher.EvictNS(ns)
	}
}

// EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return
	}

	if r.cacher != nil {
		r.cacher.EvictAll()
	}
}

// Close closes the 'cache map' and forcefully releases all 'cache nodes'.
func (r *Cache) Close() error {
	r.mu.Lock()
	if !r.closed {
		r.closed = true

		h := (*mNode)(r.mHead)
		h.initBuckets()

		for i := range h.buckets {
			b := (*mBucket)(h.buckets[i])
			for _, n := range b.node {
				// Call releaser.
				if n.value != nil {
					if r, ok := n.value.(util.Releaser); ok {
						r.Release()
					}
					n.value = nil
				}

				// Call OnDel.
				for _, f := range n.onDel {
					f()
				}
				n.onDel = nil
			}
		}
	}
	r.mu.Unlock()

	// Avoid deadlock.
	if r.cacher != nil {
		if err := r.cacher.Close(); err != nil {
			return err
		}
	}
	return nil
}

// CloseWeak closes the 'cache map' and evicts all 'cache nodes' from the
// cacher, but unlike Close it doesn't forcefully release the 'cache nodes'.
func (r *Cache) CloseWeak() error {
	r.mu.Lock()
	if !r.closed {
		r.closed = true
	}
	r.mu.Unlock()

	// Avoid deadlock.
	if r.cacher != nil {
		r.cacher.EvictAll()
		if err := r.cacher.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Node is a 'cache node'.
type Node struct {
	r         *Cache
	hash      uint32
	ns, key   uint64
	mu        sync.Mutex
	size      int
	value     Value
	ref       int32
	onDel     []func()
	CacheData unsafe.Pointer
}

// NS returns this 'cache node' namespace.
func (n *Node) NS() uint64 {
	return n.ns
}

// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
	return n.key
}

// Size returns this 'cache node' size.
func (n *Node) Size() int {
	return n.size
}

// Value returns this 'cache node' value.
func (n *Node) Value() Value {
	return n.value
}

// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
	return atomic.LoadInt32(&n.ref)
}

// GetHandle returns a handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
	if atomic.AddInt32(&n.ref, 1) <= 1 {
		panic("BUG: Node.GetHandle on zero ref")
	}
	return &Handle{unsafe.Pointer(n)}
}

func (n *Node) unref() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.delete(n)
	}
}

func (n *Node) unrefLocked() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.mu.RLock()
		if !n.r.closed {
			n.r.delete(n)
		}
		n.r.mu.RUnlock()
	}
}

// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
	n unsafe.Pointer // *Node
}

// Value returns the value of the 'cache node'.
func (h *Handle) Value() Value {
	n := (*Node)(atomic.LoadPointer(&h.n))
	if n != nil {
		return n.value
	}
	return nil
}

// Release releases this 'cache handle'.
// It is safe to call Release multiple times.
func (h *Handle) Release() {
	nPtr := atomic.LoadPointer(&h.n)
	if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
		n := (*Node)(nPtr)
		n.unrefLocked()
	}
}

func murmur32(ns, key uint64, seed uint32) uint32 {
	const (
		m = uint32(0x5bd1e995)
		r = 24
	)

	k1 := uint32(ns >> 32)
	k2 := uint32(ns)
	k3 := uint32(key >> 32)
	k4 := uint32(key)

	k1 *= m
	k1 ^= k1 >> r
	k1 *= m

	k2 *= m
	k2 ^= k2 >> r
	k2 *= m

	k3 *= m
	k3 ^= k3 >> r
	k3 *= m

	k4 *= m
	k4 ^= k4 >> r
	k4 *= m

	h := seed

	h *= m
	h ^= k1
	h *= m
	h ^= k2
	h *= m
	h ^= k3
	h *= m
	h ^= k4

	h ^= h >> 13
	h *= m
	h ^= h >> 15

	return h
}