|
- // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
-
- package leveldb
-
- import (
- "fmt"
- "sync/atomic"
- "unsafe"
-
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
-
- type tSet struct {
- level int
- table *tFile
- }
-
- type version struct {
- s *session
-
- levels []tFiles
-
- // Level that should be compacted next and its compaction score.
- // Score < 1 means compaction is not strictly needed. These fields
- // are initialized by computeCompaction()
- cLevel int
- cScore float64
-
- cSeek unsafe.Pointer
-
- closing bool
- ref int
- released bool
- }
-
- func newVersion(s *session) *version {
- return &version{s: s}
- }
-
- func (v *version) incref() {
- if v.released {
- panic("already released")
- }
-
- v.ref++
- if v.ref == 1 {
- // Incr file ref.
- for _, tt := range v.levels {
- for _, t := range tt {
- v.s.addFileRef(t.fd, 1)
- }
- }
- }
- }
-
- func (v *version) releaseNB() {
- v.ref--
- if v.ref > 0 {
- return
- } else if v.ref < 0 {
- panic("negative version ref")
- }
-
- for _, tt := range v.levels {
- for _, t := range tt {
- if v.s.addFileRef(t.fd, -1) == 0 {
- v.s.tops.remove(t)
- }
- }
- }
-
- v.released = true
- }
-
- func (v *version) release() {
- v.s.vmu.Lock()
- v.releaseNB()
- v.s.vmu.Unlock()
- }
-
- func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
- ukey := ikey.ukey()
-
- // Aux level.
- if aux != nil {
- for _, t := range aux {
- if t.overlaps(v.s.icmp, ukey, ukey) {
- if !f(-1, t) {
- return
- }
- }
- }
-
- if lf != nil && !lf(-1) {
- return
- }
- }
-
- // Walk tables level-by-level.
- for level, tables := range v.levels {
- if len(tables) == 0 {
- continue
- }
-
- if level == 0 {
- // Level-0 files may overlap each other. Find all files that
- // overlap ukey.
- for _, t := range tables {
- if t.overlaps(v.s.icmp, ukey, ukey) {
- if !f(level, t) {
- return
- }
- }
- }
- } else {
- if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
- t := tables[i]
- if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
- if !f(level, t) {
- return
- }
- }
- }
- }
-
- if lf != nil && !lf(level) {
- return
- }
- }
- }
-
- func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
- if v.closing {
- return nil, false, ErrClosed
- }
-
- ukey := ikey.ukey()
-
- var (
- tset *tSet
- tseek bool
-
- // Level-0.
- zfound bool
- zseq uint64
- zkt keyType
- zval []byte
- )
-
- err = ErrNotFound
-
- // Since entries never hop across level, finding key/value
- // in smaller level make later levels irrelevant.
- v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
- if level >= 0 && !tseek {
- if tset == nil {
- tset = &tSet{level, t}
- } else {
- tseek = true
- }
- }
-
- var (
- fikey, fval []byte
- ferr error
- )
- if noValue {
- fikey, ferr = v.s.tops.findKey(t, ikey, ro)
- } else {
- fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
- }
-
- switch ferr {
- case nil:
- case ErrNotFound:
- return true
- default:
- err = ferr
- return false
- }
-
- if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
- if v.s.icmp.uCompare(ukey, fukey) == 0 {
- // Level <= 0 may overlaps each-other.
- if level <= 0 {
- if fseq >= zseq {
- zfound = true
- zseq = fseq
- zkt = fkt
- zval = fval
- }
- } else {
- switch fkt {
- case keyTypeVal:
- value = fval
- err = nil
- case keyTypeDel:
- default:
- panic("leveldb: invalid internalKey type")
- }
- return false
- }
- }
- } else {
- err = fkerr
- return false
- }
-
- return true
- }, func(level int) bool {
- if zfound {
- switch zkt {
- case keyTypeVal:
- value = zval
- err = nil
- case keyTypeDel:
- default:
- panic("leveldb: invalid internalKey type")
- }
- return false
- }
-
- return true
- })
-
- if tseek && tset.table.consumeSeek() <= 0 {
- tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
- }
-
- return
- }
-
- func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
- var tset *tSet
-
- v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
- if tset == nil {
- tset = &tSet{level, t}
- return true
- }
- if tset.table.consumeSeek() <= 0 {
- tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
- }
- return false
- }, nil)
-
- return
- }
-
- func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
- strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
- for level, tables := range v.levels {
- if level == 0 {
- // Merge all level zero files together since they may overlap.
- for _, t := range tables {
- its = append(its, v.s.tops.newIterator(t, slice, ro))
- }
- } else if len(tables) != 0 {
- its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
- }
- }
- return
- }
-
- func (v *version) newStaging() *versionStaging {
- return &versionStaging{base: v}
- }
-
- // Spawn a new version based on this version.
- func (v *version) spawn(r *sessionRecord) *version {
- staging := v.newStaging()
- staging.commit(r)
- return staging.finish()
- }
-
- func (v *version) fillRecord(r *sessionRecord) {
- for level, tables := range v.levels {
- for _, t := range tables {
- r.addTableFile(level, t)
- }
- }
- }
-
- func (v *version) tLen(level int) int {
- if level < len(v.levels) {
- return len(v.levels[level])
- }
- return 0
- }
-
- func (v *version) offsetOf(ikey internalKey) (n int64, err error) {
- for level, tables := range v.levels {
- for _, t := range tables {
- if v.s.icmp.Compare(t.imax, ikey) <= 0 {
- // Entire file is before "ikey", so just add the file size
- n += t.size
- } else if v.s.icmp.Compare(t.imin, ikey) > 0 {
- // Entire file is after "ikey", so ignore
- if level > 0 {
- // Files other than level 0 are sorted by meta->min, so
- // no further files in this level will contain data for
- // "ikey".
- break
- }
- } else {
- // "ikey" falls in the range for this table. Add the
- // approximate offset of "ikey" within the table.
- if m, err := v.s.tops.offsetOf(t, ikey); err == nil {
- n += m
- } else {
- return 0, err
- }
- }
- }
- }
-
- return
- }
-
- func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
- if maxLevel > 0 {
- if len(v.levels) == 0 {
- return maxLevel
- }
- if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
- var overlaps tFiles
- for ; level < maxLevel; level++ {
- if pLevel := level + 1; pLevel >= len(v.levels) {
- return maxLevel
- } else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
- break
- }
- if gpLevel := level + 2; gpLevel < len(v.levels) {
- overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
- if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
- break
- }
- }
- }
- }
- }
- return
- }
-
- func (v *version) computeCompaction() {
- // Precomputed best level for next compaction
- bestLevel := int(-1)
- bestScore := float64(-1)
-
- statFiles := make([]int, len(v.levels))
- statSizes := make([]string, len(v.levels))
- statScore := make([]string, len(v.levels))
- statTotSize := int64(0)
-
- for level, tables := range v.levels {
- var score float64
- size := tables.size()
- if level == 0 {
- // We treat level-0 specially by bounding the number of files
- // instead of number of bytes for two reasons:
- //
- // (1) With larger write-buffer sizes, it is nice not to do too
- // many level-0 compaction.
- //
- // (2) The files in level-0 are merged on every read and
- // therefore we wish to avoid too many files when the individual
- // file size is small (perhaps because of a small write-buffer
- // setting, or very high compression ratios, or lots of
- // overwrites/deletions).
- score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
- } else {
- score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
- }
-
- if score > bestScore {
- bestLevel = level
- bestScore = score
- }
-
- statFiles[level] = len(tables)
- statSizes[level] = shortenb(int(size))
- statScore[level] = fmt.Sprintf("%.2f", score)
- statTotSize += size
- }
-
- v.cLevel = bestLevel
- v.cScore = bestScore
-
- v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore)
- }
-
- func (v *version) needCompaction() bool {
- return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
- }
-
- type tablesScratch struct {
- added map[int64]atRecord
- deleted map[int64]struct{}
- }
-
- type versionStaging struct {
- base *version
- levels []tablesScratch
- }
-
- func (p *versionStaging) getScratch(level int) *tablesScratch {
- if level >= len(p.levels) {
- newLevels := make([]tablesScratch, level+1)
- copy(newLevels, p.levels)
- p.levels = newLevels
- }
- return &(p.levels[level])
- }
-
- func (p *versionStaging) commit(r *sessionRecord) {
- // Deleted tables.
- for _, r := range r.deletedTables {
- scratch := p.getScratch(r.level)
- if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
- if scratch.deleted == nil {
- scratch.deleted = make(map[int64]struct{})
- }
- scratch.deleted[r.num] = struct{}{}
- }
- if scratch.added != nil {
- delete(scratch.added, r.num)
- }
- }
-
- // New tables.
- for _, r := range r.addedTables {
- scratch := p.getScratch(r.level)
- if scratch.added == nil {
- scratch.added = make(map[int64]atRecord)
- }
- scratch.added[r.num] = r
- if scratch.deleted != nil {
- delete(scratch.deleted, r.num)
- }
- }
- }
-
- func (p *versionStaging) finish() *version {
- // Build new version.
- nv := newVersion(p.base.s)
- numLevel := len(p.levels)
- if len(p.base.levels) > numLevel {
- numLevel = len(p.base.levels)
- }
- nv.levels = make([]tFiles, numLevel)
- for level := 0; level < numLevel; level++ {
- var baseTabels tFiles
- if level < len(p.base.levels) {
- baseTabels = p.base.levels[level]
- }
-
- if level < len(p.levels) {
- scratch := p.levels[level]
-
- var nt tFiles
- // Prealloc list if possible.
- if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
- nt = make(tFiles, 0, n)
- }
-
- // Base tables.
- for _, t := range baseTabels {
- if _, ok := scratch.deleted[t.fd.Num]; ok {
- continue
- }
- if _, ok := scratch.added[t.fd.Num]; ok {
- continue
- }
- nt = append(nt, t)
- }
-
- // New tables.
- for _, r := range scratch.added {
- nt = append(nt, tableFileFromRecord(r))
- }
-
- if len(nt) != 0 {
- // Sort tables.
- if level == 0 {
- nt.sortByNum()
- } else {
- nt.sortByKey(p.base.s.icmp)
- }
-
- nv.levels[level] = nt
- }
- } else {
- nv.levels[level] = baseTabels
- }
- }
-
- // Trim levels.
- n := len(nv.levels)
- for ; n > 0 && nv.levels[n-1] == nil; n-- {
- }
- nv.levels = nv.levels[:n]
-
- // Compute compaction score for new version.
- nv.computeCompaction()
-
- return nv
- }
-
- type versionReleaser struct {
- v *version
- once bool
- }
-
- func (vr *versionReleaser) Release() {
- v := vr.v
- v.s.vmu.Lock()
- if !vr.once {
- v.releaseNB()
- vr.once = true
- }
- v.s.vmu.Unlock()
- }
|