本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

532 lines
12KB

  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "fmt"
  9. "sort"
  10. "sync/atomic"
  11. "github.com/syndtr/goleveldb/leveldb/cache"
  12. "github.com/syndtr/goleveldb/leveldb/iterator"
  13. "github.com/syndtr/goleveldb/leveldb/opt"
  14. "github.com/syndtr/goleveldb/leveldb/storage"
  15. "github.com/syndtr/goleveldb/leveldb/table"
  16. "github.com/syndtr/goleveldb/leveldb/util"
  17. )
  18. // tFile holds basic information about a table.
  19. type tFile struct {
  20. fd storage.FileDesc
  21. seekLeft int32
  22. size int64
  23. imin, imax internalKey
  24. }
  25. // Returns true if given key is after largest key of this table.
  26. func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
  27. return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
  28. }
  29. // Returns true if given key is before smallest key of this table.
  30. func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
  31. return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
  32. }
  33. // Returns true if given key range overlaps with this table key range.
  34. func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
  35. return !t.after(icmp, umin) && !t.before(icmp, umax)
  36. }
  37. // Cosumes one seek and return current seeks left.
  38. func (t *tFile) consumeSeek() int32 {
  39. return atomic.AddInt32(&t.seekLeft, -1)
  40. }
  41. // Creates new tFile.
  42. func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
  43. f := &tFile{
  44. fd: fd,
  45. size: size,
  46. imin: imin,
  47. imax: imax,
  48. }
  49. // We arrange to automatically compact this file after
  50. // a certain number of seeks. Let's assume:
  51. // (1) One seek costs 10ms
  52. // (2) Writing or reading 1MB costs 10ms (100MB/s)
  53. // (3) A compaction of 1MB does 25MB of IO:
  54. // 1MB read from this level
  55. // 10-12MB read from next level (boundaries may be misaligned)
  56. // 10-12MB written to next level
  57. // This implies that 25 seeks cost the same as the compaction
  58. // of 1MB of data. I.e., one seek costs approximately the
  59. // same as the compaction of 40KB of data. We are a little
  60. // conservative and allow approximately one seek for every 16KB
  61. // of data before triggering a compaction.
  62. f.seekLeft = int32(size / 16384)
  63. if f.seekLeft < 100 {
  64. f.seekLeft = 100
  65. }
  66. return f
  67. }
  68. func tableFileFromRecord(r atRecord) *tFile {
  69. return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
  70. }
  71. // tFiles hold multiple tFile.
  72. type tFiles []*tFile
  73. func (tf tFiles) Len() int { return len(tf) }
  74. func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
  75. func (tf tFiles) nums() string {
  76. x := "[ "
  77. for i, f := range tf {
  78. if i != 0 {
  79. x += ", "
  80. }
  81. x += fmt.Sprint(f.fd.Num)
  82. }
  83. x += " ]"
  84. return x
  85. }
  86. // Returns true if i smallest key is less than j.
  87. // This used for sort by key in ascending order.
  88. func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
  89. a, b := tf[i], tf[j]
  90. n := icmp.Compare(a.imin, b.imin)
  91. if n == 0 {
  92. return a.fd.Num < b.fd.Num
  93. }
  94. return n < 0
  95. }
  96. // Returns true if i file number is greater than j.
  97. // This used for sort by file number in descending order.
  98. func (tf tFiles) lessByNum(i, j int) bool {
  99. return tf[i].fd.Num > tf[j].fd.Num
  100. }
  101. // Sorts tables by key in ascending order.
  102. func (tf tFiles) sortByKey(icmp *iComparer) {
  103. sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
  104. }
  105. // Sorts tables by file number in descending order.
  106. func (tf tFiles) sortByNum() {
  107. sort.Sort(&tFilesSortByNum{tFiles: tf})
  108. }
  109. // Returns sum of all tables size.
  110. func (tf tFiles) size() (sum int64) {
  111. for _, t := range tf {
  112. sum += t.size
  113. }
  114. return sum
  115. }
  116. // Searches smallest index of tables whose its smallest
  117. // key is after or equal with given key.
  118. func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
  119. return sort.Search(len(tf), func(i int) bool {
  120. return icmp.Compare(tf[i].imin, ikey) >= 0
  121. })
  122. }
  123. // Searches smallest index of tables whose its largest
  124. // key is after or equal with given key.
  125. func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
  126. return sort.Search(len(tf), func(i int) bool {
  127. return icmp.Compare(tf[i].imax, ikey) >= 0
  128. })
  129. }
  130. // Returns true if given key range overlaps with one or more
  131. // tables key range. If unsorted is true then binary search will not be used.
  132. func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
  133. if unsorted {
  134. // Check against all files.
  135. for _, t := range tf {
  136. if t.overlaps(icmp, umin, umax) {
  137. return true
  138. }
  139. }
  140. return false
  141. }
  142. i := 0
  143. if len(umin) > 0 {
  144. // Find the earliest possible internal key for min.
  145. i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
  146. }
  147. if i >= len(tf) {
  148. // Beginning of range is after all files, so no overlap.
  149. return false
  150. }
  151. return !tf[i].before(icmp, umax)
  152. }
  153. // Returns tables whose its key range overlaps with given key range.
  154. // Range will be expanded if ukey found hop across tables.
  155. // If overlapped is true then the search will be restarted if umax
  156. // expanded.
  157. // The dst content will be overwritten.
  158. func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
  159. dst = dst[:0]
  160. for i := 0; i < len(tf); {
  161. t := tf[i]
  162. if t.overlaps(icmp, umin, umax) {
  163. if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
  164. umin = t.imin.ukey()
  165. dst = dst[:0]
  166. i = 0
  167. continue
  168. } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
  169. umax = t.imax.ukey()
  170. // Restart search if it is overlapped.
  171. if overlapped {
  172. dst = dst[:0]
  173. i = 0
  174. continue
  175. }
  176. }
  177. dst = append(dst, t)
  178. }
  179. i++
  180. }
  181. return dst
  182. }
  183. // Returns tables key range.
  184. func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
  185. for i, t := range tf {
  186. if i == 0 {
  187. imin, imax = t.imin, t.imax
  188. continue
  189. }
  190. if icmp.Compare(t.imin, imin) < 0 {
  191. imin = t.imin
  192. }
  193. if icmp.Compare(t.imax, imax) > 0 {
  194. imax = t.imax
  195. }
  196. }
  197. return
  198. }
  199. // Creates iterator index from tables.
  200. func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
  201. if slice != nil {
  202. var start, limit int
  203. if slice.Start != nil {
  204. start = tf.searchMax(icmp, internalKey(slice.Start))
  205. }
  206. if slice.Limit != nil {
  207. limit = tf.searchMin(icmp, internalKey(slice.Limit))
  208. } else {
  209. limit = tf.Len()
  210. }
  211. tf = tf[start:limit]
  212. }
  213. return iterator.NewArrayIndexer(&tFilesArrayIndexer{
  214. tFiles: tf,
  215. tops: tops,
  216. icmp: icmp,
  217. slice: slice,
  218. ro: ro,
  219. })
  220. }
  221. // Tables iterator index.
  222. type tFilesArrayIndexer struct {
  223. tFiles
  224. tops *tOps
  225. icmp *iComparer
  226. slice *util.Range
  227. ro *opt.ReadOptions
  228. }
  229. func (a *tFilesArrayIndexer) Search(key []byte) int {
  230. return a.searchMax(a.icmp, internalKey(key))
  231. }
  232. func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
  233. if i == 0 || i == a.Len()-1 {
  234. return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
  235. }
  236. return a.tops.newIterator(a.tFiles[i], nil, a.ro)
  237. }
  238. // Helper type for sortByKey.
  239. type tFilesSortByKey struct {
  240. tFiles
  241. icmp *iComparer
  242. }
  243. func (x *tFilesSortByKey) Less(i, j int) bool {
  244. return x.lessByKey(x.icmp, i, j)
  245. }
  246. // Helper type for sortByNum.
  247. type tFilesSortByNum struct {
  248. tFiles
  249. }
  250. func (x *tFilesSortByNum) Less(i, j int) bool {
  251. return x.lessByNum(i, j)
  252. }
  253. // Table operations.
  254. type tOps struct {
  255. s *session
  256. noSync bool
  257. evictRemoved bool
  258. cache *cache.Cache
  259. bcache *cache.Cache
  260. bpool *util.BufferPool
  261. }
  262. // Creates an empty table and returns table writer.
  263. func (t *tOps) create() (*tWriter, error) {
  264. fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
  265. fw, err := t.s.stor.Create(fd)
  266. if err != nil {
  267. return nil, err
  268. }
  269. return &tWriter{
  270. t: t,
  271. fd: fd,
  272. w: fw,
  273. tw: table.NewWriter(fw, t.s.o.Options),
  274. }, nil
  275. }
  276. // Builds table from src iterator.
  277. func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
  278. w, err := t.create()
  279. if err != nil {
  280. return
  281. }
  282. defer func() {
  283. if err != nil {
  284. w.drop()
  285. }
  286. }()
  287. for src.Next() {
  288. err = w.append(src.Key(), src.Value())
  289. if err != nil {
  290. return
  291. }
  292. }
  293. err = src.Error()
  294. if err != nil {
  295. return
  296. }
  297. n = w.tw.EntriesLen()
  298. f, err = w.finish()
  299. return
  300. }
  301. // Opens table. It returns a cache handle, which should
  302. // be released after use.
  303. func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
  304. ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
  305. var r storage.Reader
  306. r, err = t.s.stor.Open(f.fd)
  307. if err != nil {
  308. return 0, nil
  309. }
  310. var bcache *cache.NamespaceGetter
  311. if t.bcache != nil {
  312. bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
  313. }
  314. var tr *table.Reader
  315. tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
  316. if err != nil {
  317. r.Close()
  318. return 0, nil
  319. }
  320. return 1, tr
  321. })
  322. if ch == nil && err == nil {
  323. err = ErrClosed
  324. }
  325. return
  326. }
  327. // Finds key/value pair whose key is greater than or equal to the
  328. // given key.
  329. func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
  330. ch, err := t.open(f)
  331. if err != nil {
  332. return nil, nil, err
  333. }
  334. defer ch.Release()
  335. return ch.Value().(*table.Reader).Find(key, true, ro)
  336. }
  337. // Finds key that is greater than or equal to the given key.
  338. func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
  339. ch, err := t.open(f)
  340. if err != nil {
  341. return nil, err
  342. }
  343. defer ch.Release()
  344. return ch.Value().(*table.Reader).FindKey(key, true, ro)
  345. }
  346. // Returns approximate offset of the given key.
  347. func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
  348. ch, err := t.open(f)
  349. if err != nil {
  350. return
  351. }
  352. defer ch.Release()
  353. return ch.Value().(*table.Reader).OffsetOf(key)
  354. }
  355. // Creates an iterator from the given table.
  356. func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  357. ch, err := t.open(f)
  358. if err != nil {
  359. return iterator.NewEmptyIterator(err)
  360. }
  361. iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
  362. iter.SetReleaser(ch)
  363. return iter
  364. }
  365. // Removes table from persistent storage. It waits until
  366. // no one use the the table.
  367. func (t *tOps) remove(f *tFile) {
  368. t.cache.Delete(0, uint64(f.fd.Num), func() {
  369. if err := t.s.stor.Remove(f.fd); err != nil {
  370. t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
  371. } else {
  372. t.s.logf("table@remove removed @%d", f.fd.Num)
  373. }
  374. if t.evictRemoved && t.bcache != nil {
  375. t.bcache.EvictNS(uint64(f.fd.Num))
  376. }
  377. })
  378. }
  379. // Closes the table ops instance. It will close all tables,
  380. // regadless still used or not.
  381. func (t *tOps) close() {
  382. t.bpool.Close()
  383. t.cache.Close()
  384. if t.bcache != nil {
  385. t.bcache.CloseWeak()
  386. }
  387. }
  388. // Creates new initialized table ops instance.
  389. func newTableOps(s *session) *tOps {
  390. var (
  391. cacher cache.Cacher
  392. bcache *cache.Cache
  393. bpool *util.BufferPool
  394. )
  395. if s.o.GetOpenFilesCacheCapacity() > 0 {
  396. cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
  397. }
  398. if !s.o.GetDisableBlockCache() {
  399. var bcacher cache.Cacher
  400. if s.o.GetBlockCacheCapacity() > 0 {
  401. bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
  402. }
  403. bcache = cache.NewCache(bcacher)
  404. }
  405. if !s.o.GetDisableBufferPool() {
  406. bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
  407. }
  408. return &tOps{
  409. s: s,
  410. noSync: s.o.GetNoSync(),
  411. evictRemoved: s.o.GetBlockCacheEvictRemoved(),
  412. cache: cache.NewCache(cacher),
  413. bcache: bcache,
  414. bpool: bpool,
  415. }
  416. }
  417. // tWriter wraps the table writer. It keep track of file descriptor
  418. // and added key range.
  419. type tWriter struct {
  420. t *tOps
  421. fd storage.FileDesc
  422. w storage.Writer
  423. tw *table.Writer
  424. first, last []byte
  425. }
  426. // Append key/value pair to the table.
  427. func (w *tWriter) append(key, value []byte) error {
  428. if w.first == nil {
  429. w.first = append([]byte{}, key...)
  430. }
  431. w.last = append(w.last[:0], key...)
  432. return w.tw.Append(key, value)
  433. }
  434. // Returns true if the table is empty.
  435. func (w *tWriter) empty() bool {
  436. return w.first == nil
  437. }
  438. // Closes the storage.Writer.
  439. func (w *tWriter) close() {
  440. if w.w != nil {
  441. w.w.Close()
  442. w.w = nil
  443. }
  444. }
  445. // Finalizes the table and returns table file.
  446. func (w *tWriter) finish() (f *tFile, err error) {
  447. defer w.close()
  448. err = w.tw.Close()
  449. if err != nil {
  450. return
  451. }
  452. if !w.t.noSync {
  453. err = w.w.Sync()
  454. if err != nil {
  455. return
  456. }
  457. }
  458. f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
  459. return
  460. }
  461. // Drops the table.
  462. func (w *tWriter) drop() {
  463. w.close()
  464. w.t.s.stor.Remove(w.fd)
  465. w.t.s.reuseFileNum(w.fd.Num)
  466. w.tw = nil
  467. w.first = nil
  468. w.last = nil
  469. }
上海开阖软件有限公司 沪ICP备12045867号-1