|
- // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
-
- package leveldb
-
- import (
- "sync"
- "time"
-
- "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
- )
-
var (
	// errCompactionTransactExiting is the panic value raised by
	// compactionExitTransact to abandon a compaction transaction.
	// compactionTransact recovers it to revert the transaction and then
	// re-panics; mCompaction and tCompaction recover it and stop quietly.
	errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
)
-
// cStat accumulates compaction statistics for a single level: total time
// spent and bytes read and written.
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

// add folds the figures collected in the staging record n into p.
func (p *cStat) add(n *cStatStaging) {
	p.duration += n.duration
	p.read += n.read
	p.write += n.write
}

// get returns the accumulated duration, bytes read and bytes written.
func (p *cStat) get() (duration time.Duration, read, write int64) {
	return p.duration, p.read, p.write
}

// cStatStaging collects the statistics of one compaction before they are
// merged into the per-level totals with cStats.addStat. The timer may be
// started and stopped several times; every running interval is added to
// duration.
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool // whether the timer is currently running
	read     int64
	write    int64
}

// startTimer starts the timer. It is a no-op if the timer is already running.
func (p *cStatStaging) startTimer() {
	if !p.on {
		p.start = time.Now()
		p.on = true
	}
}

// stopTimer stops the timer and adds the elapsed interval to duration. It is
// a no-op if the timer is not running.
func (p *cStatStaging) stopTimer() {
	if p.on {
		p.duration += time.Since(p.start)
		p.on = false
	}
}

// cStats holds per-level compaction statistics. Access is serialized by lk.
type cStats struct {
	lk    sync.Mutex
	stats []cStat
}

// addStat merges n into the statistics of the given level and grows the
// per-level table as needed. level must be non-negative.
func (p *cStats) addStat(level int, n *cStatStaging) {
	p.lk.Lock()
	if level >= len(p.stats) {
		newStats := make([]cStat, level+1)
		copy(newStats, p.stats)
		p.stats = newStats
	}
	p.stats[level].add(n)
	p.lk.Unlock()
}

// getStat returns the accumulated statistics for level. A level that has
// never been recorded, including a negative level, reports zero values
// instead of panicking.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
	p.lk.Lock()
	defer p.lk.Unlock()
	if level >= 0 && level < len(p.stats) {
		return p.stats[level].get()
	}
	return 0, 0, 0
}
-
// compactionError tracks the current compaction error status. It is a state
// machine with three states, each implemented as a loop:
//
//   - noerr: no error; waits for a status update on db.compErrSetC.
//   - haserr: a transient error; keeps offering it on db.compErrC until a new
//     status arrives on db.compErrSetC.
//   - hasperr: a persistent error (ErrReadOnly or a corruption error); offers
//     it on db.compErrC and db.compPerErrC and also sends on db.writeLockC to
//     hold the write lock, so that writes do not pass through.
//
// A nil status moves haserr back to noerr. Once in hasperr, the only way out
// is closing the DB. The function returns when db.closeC becomes ready, and
// releases the write lock first if it took it.
func (db *DB) compactionError() {
	var err error
noerr:
	// No error.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				// Still no error; stay in this state.
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case <-db.closeC:
			return
		}
	}
haserr:
	// Transient error.
	for {
		select {
		case db.compErrC <- err:
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				// Another transient error replaces the current one.
			}
		case <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// Hold write lock, so that write won't pass-through.
			db.compWriteLocking = true
		case <-db.closeC:
			if db.compWriteLocking {
				// We should release the lock or Close will hang.
				<-db.writeLockC
			}
			return
		}
	}
}
-
// compactionTransactCounter measures how far a single attempt of a compaction
// transaction got. compactionTransact resets its retry backoff whenever an
// attempt gets further than the best previous one.
type compactionTransactCounter int

// incr records one more unit of progress.
func (cnt *compactionTransactCounter) incr() {
	(*cnt)++
}

// compactionTransactInterface is a retryable unit of compaction work driven
// by compactionTransact.
type compactionTransactInterface interface {
	// run performs one attempt and reports its progress through cnt.
	run(cnt *compactionTransactCounter) error
	// revert is called by compactionTransact when the transaction is
	// abandoned.
	revert() error
}
-
- func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
- defer func() {
- if x := recover(); x != nil {
- if x == errCompactionTransactExiting {
- if err := t.revert(); err != nil {
- db.logf("%s revert error %q", name, err)
- }
- }
- panic(x)
- }
- }()
-
- const (
- backoffMin = 1 * time.Second
- backoffMax = 8 * time.Second
- backoffMul = 2 * time.Second
- )
- var (
- backoff = backoffMin
- backoffT = time.NewTimer(backoff)
- lastCnt = compactionTransactCounter(0)
-
- disableBackoff = db.s.o.GetDisableCompactionBackoff()
- )
- for n := 0; ; n++ {
- // Check whether the DB is closed.
- if db.isClosed() {
- db.logf("%s exiting", name)
- db.compactionExitTransact()
- } else if n > 0 {
- db.logf("%s retrying N·%d", name, n)
- }
-
- // Execute.
- cnt := compactionTransactCounter(0)
- err := t.run(&cnt)
- if err != nil {
- db.logf("%s error I·%d %q", name, cnt, err)
- }
-
- // Set compaction error status.
- select {
- case db.compErrSetC <- err:
- case perr := <-db.compPerErrC:
- if err != nil {
- db.logf("%s exiting (persistent error %q)", name, perr)
- db.compactionExitTransact()
- }
- case <-db.closeC:
- db.logf("%s exiting", name)
- db.compactionExitTransact()
- }
- if err == nil {
- return
- }
- if errors.IsCorrupted(err) {
- db.logf("%s exiting (corruption detected)", name)
- db.compactionExitTransact()
- }
-
- if !disableBackoff {
- // Reset backoff duration if counter is advancing.
- if cnt > lastCnt {
- backoff = backoffMin
- lastCnt = cnt
- }
-
- // Backoff.
- backoffT.Reset(backoff)
- if backoff < backoffMax {
- backoff *= backoffMul
- if backoff > backoffMax {
- backoff = backoffMax
- }
- }
- select {
- case <-backoffT.C:
- case <-db.closeC:
- db.logf("%s exiting", name)
- db.compactionExitTransact()
- }
- }
- }
- }
-
- type compactionTransactFunc struct {
- runFunc func(cnt *compactionTransactCounter) error
- revertFunc func() error
- }
-
- func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
- return t.runFunc(cnt)
- }
-
- func (t *compactionTransactFunc) revert() error {
- if t.revertFunc != nil {
- return t.revertFunc()
- }
- return nil
- }
-
- func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
- db.compactionTransact(name, &compactionTransactFunc{run, revert})
- }
-
// compactionExitTransact abandons the current compaction transaction by
// panicking with errCompactionTransactExiting, so it never returns normally.
// compactionTransact recovers the value to revert the transaction and then
// re-panics. mCompaction and tCompaction recover it and exit quietly.
func (db *DB) compactionExitTransact() {
	panic(errCompactionTransactExiting)
}
-
- func (db *DB) compactionCommit(name string, rec *sessionRecord) {
- db.compCommitLk.Lock()
- defer db.compCommitLk.Unlock() // Defer is necessary.
- db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
- return db.s.commit(rec)
- }, nil)
- }
-
// memCompaction flushes the frozen memdb, if there is one, into table files
// and commits the resulting session record.
//
// Table compaction is paused around the flush: a resume channel is sent on
// db.tcompPauseC, unless a persistent compaction error is pending. The table
// compaction side then blocks in pauseCompaction until this function receives
// from that channel after the commit. Afterwards the frozen memdb is dropped
// and a table compaction is triggered. If the DB is closed while waiting, the
// function leaves through compactionExitTransact.
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))

	// Don't compact empty memdb.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")
		// drop frozen memdb
		db.dropFrozenMem()
		return
	}

	// Pause table compaction.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		// A persistent error is pending: flush without pausing. A nil
		// resumeC turns the resume step below into a no-op.
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int // level returned by flushMemdb; stats are recorded against it
	)

	// Generate tables. compactionTransactFunc retries the run function until
	// it succeeds. If the transaction is abandoned, the revert function
	// removes the tables added so far.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	// NOTE(review): presumably these record the journal and sequence number
	// covered by the flushed memdb, so they need not be replayed — confirm.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	// Account the bytes written and record the stats.
	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)

	// Drop frozen memdb.
	db.dropFrozenMem()

	// Resume table compaction: receiving on resumeC unblocks the side waiting
	// in pauseCompaction.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Trigger table compaction.
	db.compTrigger(db.tcompCmdC)
}
-
// tableCompactionBuilder is the compactionTransactInterface used by
// tableCompaction. It merges the input tables of c into new tables at level
// c.sourceLevel+1 and records them in rec. The snap* fields hold a checkpoint
// taken after each finished output table, so that a retried run can skip the
// input that is already written.
type tableCompactionBuilder struct {
	db           *DB
	s            *session
	c            *compaction
	rec          *sessionRecord
	stat0, stat1 *cStatStaging // stat1 receives output stats; stat0 is not set or read in this part of the file

	// Checkpoint taken right after an output table was flushed.
	snapHasLastUkey bool
	snapLastUkey    []byte
	snapLastSeq     uint64
	snapIter        int // index of the first input entry not yet in a finished table
	snapKerrCnt     int
	snapDropCnt     int

	kerrCnt int // entries with unparsable internal keys (kept, not dropped)
	dropCnt int // entries dropped as obsolete

	minSeq    uint64 // sequence threshold used to decide which entries are obsolete
	strict    bool   // fail on unparsable keys instead of keeping them
	tableSize int    // output size at which the current table is rotated

	tw *tWriter // writer of the output table being built, or nil
}
-
- func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
- // Create new table if not already.
- if b.tw == nil {
- // Check for pause event.
- if b.db != nil {
- select {
- case ch := <-b.db.tcompPauseC:
- b.db.pauseCompaction(ch)
- case <-b.db.closeC:
- b.db.compactionExitTransact()
- default:
- }
- }
-
- // Create new table.
- var err error
- b.tw, err = b.s.tops.create()
- if err != nil {
- return err
- }
- }
-
- // Write key/value into table.
- return b.tw.append(key, value)
- }
-
- func (b *tableCompactionBuilder) needFlush() bool {
- return b.tw.tw.BytesLen() >= b.tableSize
- }
-
- func (b *tableCompactionBuilder) flush() error {
- t, err := b.tw.finish()
- if err != nil {
- return err
- }
- b.rec.addTableFile(b.c.sourceLevel+1, t)
- b.stat1.write += t.size
- b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
- b.tw = nil
- return nil
- }
-
- func (b *tableCompactionBuilder) cleanup() {
- if b.tw != nil {
- b.tw.drop()
- b.tw = nil
- }
- }
-
// run performs one attempt of the table compaction. It walks the merged input
// entries of c in order, drops obsolete entries and appends the rest to
// output tables. The current table is rotated when c.shouldStopBefore asks
// for it or needFlush reports the size limit, but only at the first entry of
// a new user key.
//
// The attempt starts from the snap* checkpoint, so a retry skips the input
// entries that are already in finished tables. Every input entry, skipped or
// not, increments cnt.
//
// An entry whose internal key cannot be parsed makes the attempt fail in
// strict mode. Otherwise the entry is kept and counted in kerrCnt.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
	// Restore iteration state from the checkpoint (all zero on the first
	// attempt).
	snapResumed := b.snapIter > 0
	hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
	lastUkey := append([]byte{}, b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt
	// Restore compaction state.
	b.c.restore()

	// Drop any table left unfinished when this attempt returns.
	defer b.cleanup()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Incr transact counter.
		cnt.incr()

		// Skip until last state.
		if i < b.snapIter {
			continue
		}

		// resumed is true only for the first entry handled after skipping to
		// the checkpoint. The checkpoint was saved (b.c.save) after
		// shouldStopBefore had already been evaluated for that entry, so it
		// is not evaluated again.
		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First occurrence of this user key.

				// Only rotate tables if ukey doesn't hop across.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Creates snapshot of the state.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// Dropped because newer entry for same user key exist
				fallthrough // (A)
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger seq numbers
				// (3) data in layers that are being compacted here and have
				//     smaller seq numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Don't drop corrupted keys.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish last table.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}
-
- func (b *tableCompactionBuilder) revert() error {
- for _, at := range b.rec.addedTables {
- b.s.logf("table@build revert @%d", at.num)
- if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
- return err
- }
- }
- return nil
- }
-
- func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
- defer c.release()
-
- rec := &sessionRecord{}
- rec.addCompPtr(c.sourceLevel, c.imax)
-
- if !noTrivial && c.trivial() {
- t := c.levels[0][0]
- db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
- rec.delTable(c.sourceLevel, t.fd.Num)
- rec.addTableFile(c.sourceLevel+1, t)
- db.compactionCommit("table-move", rec)
- return
- }
-
- var stats [2]cStatStaging
- for i, tables := range c.levels {
- for _, t := range tables {
- stats[i].read += t.size
- // Insert deleted tables into record
- rec.delTable(c.sourceLevel+i, t.fd.Num)
- }
- }
- sourceSize := int(stats[0].read + stats[1].read)
- minSeq := db.minSeq()
- db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
-
- b := &tableCompactionBuilder{
- db: db,
- s: db.s,
- c: c,
- rec: rec,
- stat1: &stats[1],
- minSeq: minSeq,
- strict: db.s.o.GetStrict(opt.StrictCompaction),
- tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
- }
- db.compactionTransact("table@build", b)
-
- // Commit.
- stats[1].startTimer()
- db.compactionCommit("table", rec)
- stats[1].stopTimer()
-
- resultSize := int(stats[1].write)
- db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
-
- // Save compaction stats
- for i := range stats {
- db.compStats.addStat(c.sourceLevel+1, &stats[i])
- }
- }
-
- func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
- db.logf("table@compaction range L%d %q:%q", level, umin, umax)
- if level >= 0 {
- if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
- db.tableCompaction(c, true)
- }
- } else {
- // Retry until nothing to compact.
- for {
- compacted := false
-
- // Scan for maximum level with overlapped tables.
- v := db.s.version()
- m := 1
- for i := m; i < len(v.levels); i++ {
- tables := v.levels[i]
- if tables.overlaps(db.s.icmp, umin, umax, false) {
- m = i
- }
- }
- v.release()
-
- for level := 0; level < m; level++ {
- if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
- db.tableCompaction(c, true)
- compacted = true
- }
- }
-
- if !compacted {
- break
- }
- }
- }
-
- return nil
- }
-
- func (db *DB) tableAutoCompaction() {
- if c := db.s.pickCompaction(); c != nil {
- db.tableCompaction(c, false)
- }
- }
-
- func (db *DB) tableNeedCompaction() bool {
- v := db.s.version()
- defer v.release()
- return v.needCompaction()
- }
-
- // resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
- func (db *DB) resumeWrite() bool {
- v := db.s.version()
- defer v.release()
- if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
- return true
- }
- return false
- }
-
// pauseCompaction blocks the calling table compaction code until a value can
// be sent on ch. For the unbuffered channel created by memCompaction, that
// happens when memCompaction receives from it after its flush. If the DB is
// closed first, the transaction is abandoned via compactionExitTransact.
func (db *DB) pauseCompaction(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	case <-db.closeC:
		db.compactionExitTransact()
	}
}
-
// cCmd is a command sent to a compaction goroutine. ack reports the outcome
// to the requester, if the requester asked for one.
type cCmd interface {
	ack(err error)
}

// cAuto requests an automatic compaction.
type cAuto struct {
	// For table compaction, a non-nil ackC marks a command whose sender waits
	// until writes may resume.
	ackC chan<- error
}

// ack delivers err on ackC when one was supplied. The requester may already
// have given up and closed ackC; the resulting send panic is swallowed.
func (r cAuto) ack(err error) {
	if r.ackC == nil {
		return
	}
	defer func() {
		_ = recover()
	}()
	r.ackC <- err
}

// cRange requests compaction of the key range min..max at level, or across
// levels when level is negative.
type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

// ack has the same semantics as cAuto.ack.
func (r cRange) ack(err error) {
	cAuto{ackC: r.ackC}.ack(err)
}
-
- // This will trigger auto compaction but will not wait for it.
- func (db *DB) compTrigger(compC chan<- cCmd) {
- select {
- case compC <- cAuto{}:
- default:
- }
- }
-
- // This will trigger auto compaction and/or wait for all compaction to be done.
- func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
- ch := make(chan error)
- defer close(ch)
- // Send cmd.
- select {
- case compC <- cAuto{ch}:
- case err = <-db.compErrC:
- return
- case <-db.closeC:
- return ErrClosed
- }
- // Wait cmd.
- select {
- case err = <-ch:
- case err = <-db.compErrC:
- case <-db.closeC:
- return ErrClosed
- }
- return err
- }
-
- // Send range compaction request.
- func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
- ch := make(chan error)
- defer close(ch)
- // Send cmd.
- select {
- case compC <- cRange{level, min, max, ch}:
- case err := <-db.compErrC:
- return err
- case <-db.closeC:
- return ErrClosed
- }
- // Wait cmd.
- select {
- case err = <-ch:
- case err = <-db.compErrC:
- case <-db.closeC:
- return ErrClosed
- }
- return err
- }
-
// mCompaction is the memdb compaction goroutine. It serves cAuto commands
// from db.mcompCmdC by running memCompaction and acknowledging them, and it
// returns when db.closeC becomes ready.
//
// If a transaction is abandoned with errCompactionTransactExiting, the
// goroutine ends quietly and acknowledges the in-flight command, if any, with
// ErrClosed. Any other panic is re-raised before db.closeW.Done is reached.
// On every other exit, db.closeW.Done is called.
func (db *DB) mCompaction() {
	var x cCmd // command being handled, nil between commands

	defer func() {
		// This x (the recovered value) shadows the command variable only
		// inside the if statement.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		// A non-nil x here means memCompaction was interrupted mid-command.
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		select {
		case x = <-db.mcompCmdC:
			switch x.(type) {
			case cAuto:
				db.memCompaction()
				x.ack(nil)
				x = nil
			default:
				panic("leveldb: unknown command")
			}
		case <-db.closeC:
			return
		}
	}
}
-
// tCompaction is the table compaction goroutine. Each iteration it:
//
//   - takes a command from db.tcompCmdC. It polls without blocking while the
//     current version needs compaction, and blocks otherwise. Meanwhile it
//     serves pause requests from db.tcompPauseC and returns when db.closeC
//     becomes ready;
//   - acknowledges queued waiting cAuto commands once resumeWrite allows it,
//     or right away when no compaction is needed;
//   - runs a range compaction for a cRange command and acknowledges it with
//     the result;
//   - runs one automatic table compaction.
//
// On exit, including after an abandoned transaction, queued and in-flight
// commands are acknowledged with ErrClosed and db.closeW.Done is called.
// Panics other than errCompactionTransactExiting are re-raised.
func (db *DB) tCompaction() {
	var (
		x     cCmd   // command being handled, nil between commands
		waitQ []cCmd // waiting cAuto commands, acknowledged once writes may resume
	)

	defer func() {
		// This x (the recovered value) shadows the command variable only
		// inside the if statement.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range waitQ {
			waitQ[i].ack(ErrClosed)
			waitQ[i] = nil
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			// Work is pending: poll for commands without blocking.
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
			// Resume write operation as soon as possible.
			if len(waitQ) > 0 && db.resumeWrite() {
				for i := range waitQ {
					waitQ[i].ack(nil)
					waitQ[i] = nil
				}
				waitQ = waitQ[:0]
			}
		} else {
			// Nothing to compact: release every waiter, then block until a
			// command, a pause request or close arrives.
			for i := range waitQ {
				waitQ[i].ack(nil)
				waitQ[i] = nil
			}
			waitQ = waitQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				if cmd.ackC != nil {
					// Check the write pause state before caching it.
					if db.resumeWrite() {
						x.ack(nil)
					} else {
						waitQ = append(waitQ, x)
					}
				}
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		// Run one automatic compaction, if the session picks one.
		db.tableAutoCompaction()
	}
}
|