本站源代码
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

529 lines
11KB

  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "fmt"
  9. "sync/atomic"
  10. "unsafe"
  11. "github.com/syndtr/goleveldb/leveldb/iterator"
  12. "github.com/syndtr/goleveldb/leveldb/opt"
  13. "github.com/syndtr/goleveldb/leveldb/util"
  14. )
  15. type tSet struct {
  16. level int
  17. table *tFile
  18. }
  19. type version struct {
  20. s *session
  21. levels []tFiles
  22. // Level that should be compacted next and its compaction score.
  23. // Score < 1 means compaction is not strictly needed. These fields
  24. // are initialized by computeCompaction()
  25. cLevel int
  26. cScore float64
  27. cSeek unsafe.Pointer
  28. closing bool
  29. ref int
  30. released bool
  31. }
  32. func newVersion(s *session) *version {
  33. return &version{s: s}
  34. }
  35. func (v *version) incref() {
  36. if v.released {
  37. panic("already released")
  38. }
  39. v.ref++
  40. if v.ref == 1 {
  41. // Incr file ref.
  42. for _, tt := range v.levels {
  43. for _, t := range tt {
  44. v.s.addFileRef(t.fd, 1)
  45. }
  46. }
  47. }
  48. }
  49. func (v *version) releaseNB() {
  50. v.ref--
  51. if v.ref > 0 {
  52. return
  53. } else if v.ref < 0 {
  54. panic("negative version ref")
  55. }
  56. for _, tt := range v.levels {
  57. for _, t := range tt {
  58. if v.s.addFileRef(t.fd, -1) == 0 {
  59. v.s.tops.remove(t)
  60. }
  61. }
  62. }
  63. v.released = true
  64. }
  65. func (v *version) release() {
  66. v.s.vmu.Lock()
  67. v.releaseNB()
  68. v.s.vmu.Unlock()
  69. }
  70. func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
  71. ukey := ikey.ukey()
  72. // Aux level.
  73. if aux != nil {
  74. for _, t := range aux {
  75. if t.overlaps(v.s.icmp, ukey, ukey) {
  76. if !f(-1, t) {
  77. return
  78. }
  79. }
  80. }
  81. if lf != nil && !lf(-1) {
  82. return
  83. }
  84. }
  85. // Walk tables level-by-level.
  86. for level, tables := range v.levels {
  87. if len(tables) == 0 {
  88. continue
  89. }
  90. if level == 0 {
  91. // Level-0 files may overlap each other. Find all files that
  92. // overlap ukey.
  93. for _, t := range tables {
  94. if t.overlaps(v.s.icmp, ukey, ukey) {
  95. if !f(level, t) {
  96. return
  97. }
  98. }
  99. }
  100. } else {
  101. if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
  102. t := tables[i]
  103. if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
  104. if !f(level, t) {
  105. return
  106. }
  107. }
  108. }
  109. }
  110. if lf != nil && !lf(level) {
  111. return
  112. }
  113. }
  114. }
  115. func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
  116. if v.closing {
  117. return nil, false, ErrClosed
  118. }
  119. ukey := ikey.ukey()
  120. var (
  121. tset *tSet
  122. tseek bool
  123. // Level-0.
  124. zfound bool
  125. zseq uint64
  126. zkt keyType
  127. zval []byte
  128. )
  129. err = ErrNotFound
  130. // Since entries never hop across level, finding key/value
  131. // in smaller level make later levels irrelevant.
  132. v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
  133. if level >= 0 && !tseek {
  134. if tset == nil {
  135. tset = &tSet{level, t}
  136. } else {
  137. tseek = true
  138. }
  139. }
  140. var (
  141. fikey, fval []byte
  142. ferr error
  143. )
  144. if noValue {
  145. fikey, ferr = v.s.tops.findKey(t, ikey, ro)
  146. } else {
  147. fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
  148. }
  149. switch ferr {
  150. case nil:
  151. case ErrNotFound:
  152. return true
  153. default:
  154. err = ferr
  155. return false
  156. }
  157. if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
  158. if v.s.icmp.uCompare(ukey, fukey) == 0 {
  159. // Level <= 0 may overlaps each-other.
  160. if level <= 0 {
  161. if fseq >= zseq {
  162. zfound = true
  163. zseq = fseq
  164. zkt = fkt
  165. zval = fval
  166. }
  167. } else {
  168. switch fkt {
  169. case keyTypeVal:
  170. value = fval
  171. err = nil
  172. case keyTypeDel:
  173. default:
  174. panic("leveldb: invalid internalKey type")
  175. }
  176. return false
  177. }
  178. }
  179. } else {
  180. err = fkerr
  181. return false
  182. }
  183. return true
  184. }, func(level int) bool {
  185. if zfound {
  186. switch zkt {
  187. case keyTypeVal:
  188. value = zval
  189. err = nil
  190. case keyTypeDel:
  191. default:
  192. panic("leveldb: invalid internalKey type")
  193. }
  194. return false
  195. }
  196. return true
  197. })
  198. if tseek && tset.table.consumeSeek() <= 0 {
  199. tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
  200. }
  201. return
  202. }
  203. func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
  204. var tset *tSet
  205. v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
  206. if tset == nil {
  207. tset = &tSet{level, t}
  208. return true
  209. }
  210. if tset.table.consumeSeek() <= 0 {
  211. tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
  212. }
  213. return false
  214. }, nil)
  215. return
  216. }
  217. func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
  218. strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
  219. for level, tables := range v.levels {
  220. if level == 0 {
  221. // Merge all level zero files together since they may overlap.
  222. for _, t := range tables {
  223. its = append(its, v.s.tops.newIterator(t, slice, ro))
  224. }
  225. } else if len(tables) != 0 {
  226. its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
  227. }
  228. }
  229. return
  230. }
  231. func (v *version) newStaging() *versionStaging {
  232. return &versionStaging{base: v}
  233. }
  234. // Spawn a new version based on this version.
  235. func (v *version) spawn(r *sessionRecord) *version {
  236. staging := v.newStaging()
  237. staging.commit(r)
  238. return staging.finish()
  239. }
  240. func (v *version) fillRecord(r *sessionRecord) {
  241. for level, tables := range v.levels {
  242. for _, t := range tables {
  243. r.addTableFile(level, t)
  244. }
  245. }
  246. }
  247. func (v *version) tLen(level int) int {
  248. if level < len(v.levels) {
  249. return len(v.levels[level])
  250. }
  251. return 0
  252. }
  253. func (v *version) offsetOf(ikey internalKey) (n int64, err error) {
  254. for level, tables := range v.levels {
  255. for _, t := range tables {
  256. if v.s.icmp.Compare(t.imax, ikey) <= 0 {
  257. // Entire file is before "ikey", so just add the file size
  258. n += t.size
  259. } else if v.s.icmp.Compare(t.imin, ikey) > 0 {
  260. // Entire file is after "ikey", so ignore
  261. if level > 0 {
  262. // Files other than level 0 are sorted by meta->min, so
  263. // no further files in this level will contain data for
  264. // "ikey".
  265. break
  266. }
  267. } else {
  268. // "ikey" falls in the range for this table. Add the
  269. // approximate offset of "ikey" within the table.
  270. if m, err := v.s.tops.offsetOf(t, ikey); err == nil {
  271. n += m
  272. } else {
  273. return 0, err
  274. }
  275. }
  276. }
  277. }
  278. return
  279. }
  280. func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
  281. if maxLevel > 0 {
  282. if len(v.levels) == 0 {
  283. return maxLevel
  284. }
  285. if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
  286. var overlaps tFiles
  287. for ; level < maxLevel; level++ {
  288. if pLevel := level + 1; pLevel >= len(v.levels) {
  289. return maxLevel
  290. } else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
  291. break
  292. }
  293. if gpLevel := level + 2; gpLevel < len(v.levels) {
  294. overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
  295. if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
  296. break
  297. }
  298. }
  299. }
  300. }
  301. }
  302. return
  303. }
  304. func (v *version) computeCompaction() {
  305. // Precomputed best level for next compaction
  306. bestLevel := int(-1)
  307. bestScore := float64(-1)
  308. statFiles := make([]int, len(v.levels))
  309. statSizes := make([]string, len(v.levels))
  310. statScore := make([]string, len(v.levels))
  311. statTotSize := int64(0)
  312. for level, tables := range v.levels {
  313. var score float64
  314. size := tables.size()
  315. if level == 0 {
  316. // We treat level-0 specially by bounding the number of files
  317. // instead of number of bytes for two reasons:
  318. //
  319. // (1) With larger write-buffer sizes, it is nice not to do too
  320. // many level-0 compaction.
  321. //
  322. // (2) The files in level-0 are merged on every read and
  323. // therefore we wish to avoid too many files when the individual
  324. // file size is small (perhaps because of a small write-buffer
  325. // setting, or very high compression ratios, or lots of
  326. // overwrites/deletions).
  327. score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
  328. } else {
  329. score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
  330. }
  331. if score > bestScore {
  332. bestLevel = level
  333. bestScore = score
  334. }
  335. statFiles[level] = len(tables)
  336. statSizes[level] = shortenb(int(size))
  337. statScore[level] = fmt.Sprintf("%.2f", score)
  338. statTotSize += size
  339. }
  340. v.cLevel = bestLevel
  341. v.cScore = bestScore
  342. v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore)
  343. }
  344. func (v *version) needCompaction() bool {
  345. return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
  346. }
  347. type tablesScratch struct {
  348. added map[int64]atRecord
  349. deleted map[int64]struct{}
  350. }
  351. type versionStaging struct {
  352. base *version
  353. levels []tablesScratch
  354. }
  355. func (p *versionStaging) getScratch(level int) *tablesScratch {
  356. if level >= len(p.levels) {
  357. newLevels := make([]tablesScratch, level+1)
  358. copy(newLevels, p.levels)
  359. p.levels = newLevels
  360. }
  361. return &(p.levels[level])
  362. }
  363. func (p *versionStaging) commit(r *sessionRecord) {
  364. // Deleted tables.
  365. for _, r := range r.deletedTables {
  366. scratch := p.getScratch(r.level)
  367. if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
  368. if scratch.deleted == nil {
  369. scratch.deleted = make(map[int64]struct{})
  370. }
  371. scratch.deleted[r.num] = struct{}{}
  372. }
  373. if scratch.added != nil {
  374. delete(scratch.added, r.num)
  375. }
  376. }
  377. // New tables.
  378. for _, r := range r.addedTables {
  379. scratch := p.getScratch(r.level)
  380. if scratch.added == nil {
  381. scratch.added = make(map[int64]atRecord)
  382. }
  383. scratch.added[r.num] = r
  384. if scratch.deleted != nil {
  385. delete(scratch.deleted, r.num)
  386. }
  387. }
  388. }
  389. func (p *versionStaging) finish() *version {
  390. // Build new version.
  391. nv := newVersion(p.base.s)
  392. numLevel := len(p.levels)
  393. if len(p.base.levels) > numLevel {
  394. numLevel = len(p.base.levels)
  395. }
  396. nv.levels = make([]tFiles, numLevel)
  397. for level := 0; level < numLevel; level++ {
  398. var baseTabels tFiles
  399. if level < len(p.base.levels) {
  400. baseTabels = p.base.levels[level]
  401. }
  402. if level < len(p.levels) {
  403. scratch := p.levels[level]
  404. var nt tFiles
  405. // Prealloc list if possible.
  406. if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
  407. nt = make(tFiles, 0, n)
  408. }
  409. // Base tables.
  410. for _, t := range baseTabels {
  411. if _, ok := scratch.deleted[t.fd.Num]; ok {
  412. continue
  413. }
  414. if _, ok := scratch.added[t.fd.Num]; ok {
  415. continue
  416. }
  417. nt = append(nt, t)
  418. }
  419. // New tables.
  420. for _, r := range scratch.added {
  421. nt = append(nt, tableFileFromRecord(r))
  422. }
  423. if len(nt) != 0 {
  424. // Sort tables.
  425. if level == 0 {
  426. nt.sortByNum()
  427. } else {
  428. nt.sortByKey(p.base.s.icmp)
  429. }
  430. nv.levels[level] = nt
  431. }
  432. } else {
  433. nv.levels[level] = baseTabels
  434. }
  435. }
  436. // Trim levels.
  437. n := len(nv.levels)
  438. for ; n > 0 && nv.levels[n-1] == nil; n-- {
  439. }
  440. nv.levels = nv.levels[:n]
  441. // Compute compaction score for new version.
  442. nv.computeCompaction()
  443. return nv
  444. }
  445. type versionReleaser struct {
  446. v *version
  447. once bool
  448. }
  449. func (vr *versionReleaser) Release() {
  450. v := vr.v
  451. v.s.vmu.Lock()
  452. if !vr.once {
  453. v.releaseNB()
  454. vr.once = true
  455. }
  456. v.s.vmu.Unlock()
  457. }
上海开阖软件有限公司 沪ICP备12045867号-1