本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

330 lines
8.9KB

  1. // Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "errors"
  9. "sync"
  10. "time"
  11. "github.com/syndtr/goleveldb/leveldb/iterator"
  12. "github.com/syndtr/goleveldb/leveldb/opt"
  13. "github.com/syndtr/goleveldb/leveldb/util"
  14. )
  15. var errTransactionDone = errors.New("leveldb: transaction already closed")
  16. // Transaction is the transaction handle.
  17. type Transaction struct {
  18. db *DB
  19. lk sync.RWMutex
  20. seq uint64
  21. mem *memDB
  22. tables tFiles
  23. ikScratch []byte
  24. rec sessionRecord
  25. stats cStatStaging
  26. closed bool
  27. }
  28. // Get gets the value for the given key. It returns ErrNotFound if the
  29. // DB does not contains the key.
  30. //
  31. // The returned slice is its own copy, it is safe to modify the contents
  32. // of the returned slice.
  33. // It is safe to modify the contents of the argument after Get returns.
  34. func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
  35. tr.lk.RLock()
  36. defer tr.lk.RUnlock()
  37. if tr.closed {
  38. return nil, errTransactionDone
  39. }
  40. return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
  41. }
  42. // Has returns true if the DB does contains the given key.
  43. //
  44. // It is safe to modify the contents of the argument after Has returns.
  45. func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
  46. tr.lk.RLock()
  47. defer tr.lk.RUnlock()
  48. if tr.closed {
  49. return false, errTransactionDone
  50. }
  51. return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
  52. }
  53. // NewIterator returns an iterator for the latest snapshot of the transaction.
  54. // The returned iterator is not safe for concurrent use, but it is safe to use
  55. // multiple iterators concurrently, with each in a dedicated goroutine.
  56. // It is also safe to use an iterator concurrently while writes to the
  57. // transaction. The resultant key/value pairs are guaranteed to be consistent.
  58. //
  59. // Slice allows slicing the iterator to only contains keys in the given
  60. // range. A nil Range.Start is treated as a key before all keys in the
  61. // DB. And a nil Range.Limit is treated as a key after all keys in
  62. // the DB.
  63. //
  64. // WARNING: Any slice returned by interator (e.g. slice returned by calling
  65. // Iterator.Key() or Iterator.Key() methods), its content should not be modified
  66. // unless noted otherwise.
  67. //
  68. // The iterator must be released after use, by calling Release method.
  69. //
  70. // Also read Iterator documentation of the leveldb/iterator package.
  71. func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
  72. tr.lk.RLock()
  73. defer tr.lk.RUnlock()
  74. if tr.closed {
  75. return iterator.NewEmptyIterator(errTransactionDone)
  76. }
  77. tr.mem.incref()
  78. return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
  79. }
  80. func (tr *Transaction) flush() error {
  81. // Flush memdb.
  82. if tr.mem.Len() != 0 {
  83. tr.stats.startTimer()
  84. iter := tr.mem.NewIterator(nil)
  85. t, n, err := tr.db.s.tops.createFrom(iter)
  86. iter.Release()
  87. tr.stats.stopTimer()
  88. if err != nil {
  89. return err
  90. }
  91. if tr.mem.getref() == 1 {
  92. tr.mem.Reset()
  93. } else {
  94. tr.mem.decref()
  95. tr.mem = tr.db.mpoolGet(0)
  96. tr.mem.incref()
  97. }
  98. tr.tables = append(tr.tables, t)
  99. tr.rec.addTableFile(0, t)
  100. tr.stats.write += t.size
  101. tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
  102. }
  103. return nil
  104. }
  105. func (tr *Transaction) put(kt keyType, key, value []byte) error {
  106. tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
  107. if tr.mem.Free() < len(tr.ikScratch)+len(value) {
  108. if err := tr.flush(); err != nil {
  109. return err
  110. }
  111. }
  112. if err := tr.mem.Put(tr.ikScratch, value); err != nil {
  113. return err
  114. }
  115. tr.seq++
  116. return nil
  117. }
  118. // Put sets the value for the given key. It overwrites any previous value
  119. // for that key; a DB is not a multi-map.
  120. // Please note that the transaction is not compacted until committed, so if you
  121. // writes 10 same keys, then those 10 same keys are in the transaction.
  122. //
  123. // It is safe to modify the contents of the arguments after Put returns.
  124. func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
  125. tr.lk.Lock()
  126. defer tr.lk.Unlock()
  127. if tr.closed {
  128. return errTransactionDone
  129. }
  130. return tr.put(keyTypeVal, key, value)
  131. }
  132. // Delete deletes the value for the given key.
  133. // Please note that the transaction is not compacted until committed, so if you
  134. // writes 10 same keys, then those 10 same keys are in the transaction.
  135. //
  136. // It is safe to modify the contents of the arguments after Delete returns.
  137. func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
  138. tr.lk.Lock()
  139. defer tr.lk.Unlock()
  140. if tr.closed {
  141. return errTransactionDone
  142. }
  143. return tr.put(keyTypeDel, key, nil)
  144. }
  145. // Write apply the given batch to the transaction. The batch will be applied
  146. // sequentially.
  147. // Please note that the transaction is not compacted until committed, so if you
  148. // writes 10 same keys, then those 10 same keys are in the transaction.
  149. //
  150. // It is safe to modify the contents of the arguments after Write returns.
  151. func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
  152. if b == nil || b.Len() == 0 {
  153. return nil
  154. }
  155. tr.lk.Lock()
  156. defer tr.lk.Unlock()
  157. if tr.closed {
  158. return errTransactionDone
  159. }
  160. return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
  161. return tr.put(kt, k, v)
  162. })
  163. }
  164. func (tr *Transaction) setDone() {
  165. tr.closed = true
  166. tr.db.tr = nil
  167. tr.mem.decref()
  168. <-tr.db.writeLockC
  169. }
  170. // Commit commits the transaction. If error is not nil, then the transaction is
  171. // not committed, it can then either be retried or discarded.
  172. //
  173. // Other methods should not be called after transaction has been committed.
  174. func (tr *Transaction) Commit() error {
  175. if err := tr.db.ok(); err != nil {
  176. return err
  177. }
  178. tr.lk.Lock()
  179. defer tr.lk.Unlock()
  180. if tr.closed {
  181. return errTransactionDone
  182. }
  183. if err := tr.flush(); err != nil {
  184. // Return error, lets user decide either to retry or discard
  185. // transaction.
  186. return err
  187. }
  188. if len(tr.tables) != 0 {
  189. // Committing transaction.
  190. tr.rec.setSeqNum(tr.seq)
  191. tr.db.compCommitLk.Lock()
  192. tr.stats.startTimer()
  193. var cerr error
  194. for retry := 0; retry < 3; retry++ {
  195. cerr = tr.db.s.commit(&tr.rec)
  196. if cerr != nil {
  197. tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
  198. select {
  199. case <-time.After(time.Second):
  200. case <-tr.db.closeC:
  201. tr.db.logf("transaction@commit exiting")
  202. tr.db.compCommitLk.Unlock()
  203. return cerr
  204. }
  205. } else {
  206. // Success. Set db.seq.
  207. tr.db.setSeq(tr.seq)
  208. break
  209. }
  210. }
  211. tr.stats.stopTimer()
  212. if cerr != nil {
  213. // Return error, lets user decide either to retry or discard
  214. // transaction.
  215. return cerr
  216. }
  217. // Update compaction stats. This is safe as long as we hold compCommitLk.
  218. tr.db.compStats.addStat(0, &tr.stats)
  219. // Trigger table auto-compaction.
  220. tr.db.compTrigger(tr.db.tcompCmdC)
  221. tr.db.compCommitLk.Unlock()
  222. // Additionally, wait compaction when certain threshold reached.
  223. // Ignore error, returns error only if transaction can't be committed.
  224. tr.db.waitCompaction()
  225. }
  226. // Only mark as done if transaction committed successfully.
  227. tr.setDone()
  228. return nil
  229. }
  230. func (tr *Transaction) discard() {
  231. // Discard transaction.
  232. for _, t := range tr.tables {
  233. tr.db.logf("transaction@discard @%d", t.fd.Num)
  234. if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
  235. tr.db.s.reuseFileNum(t.fd.Num)
  236. }
  237. }
  238. }
  239. // Discard discards the transaction.
  240. //
  241. // Other methods should not be called after transaction has been discarded.
  242. func (tr *Transaction) Discard() {
  243. tr.lk.Lock()
  244. if !tr.closed {
  245. tr.discard()
  246. tr.setDone()
  247. }
  248. tr.lk.Unlock()
  249. }
  250. func (db *DB) waitCompaction() error {
  251. if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
  252. return db.compTriggerWait(db.tcompCmdC)
  253. }
  254. return nil
  255. }
  256. // OpenTransaction opens an atomic DB transaction. Only one transaction can be
  257. // opened at a time. Subsequent call to Write and OpenTransaction will be blocked
  258. // until in-flight transaction is committed or discarded.
  259. // The returned transaction handle is safe for concurrent use.
  260. //
  261. // Transaction is expensive and can overwhelm compaction, especially if
  262. // transaction size is small. Use with caution.
  263. //
  264. // The transaction must be closed once done, either by committing or discarding
  265. // the transaction.
  266. // Closing the DB will discard open transaction.
  267. func (db *DB) OpenTransaction() (*Transaction, error) {
  268. if err := db.ok(); err != nil {
  269. return nil, err
  270. }
  271. // The write happen synchronously.
  272. select {
  273. case db.writeLockC <- struct{}{}:
  274. case err := <-db.compPerErrC:
  275. return nil, err
  276. case <-db.closeC:
  277. return nil, ErrClosed
  278. }
  279. if db.tr != nil {
  280. panic("leveldb: has open transaction")
  281. }
  282. // Flush current memdb.
  283. if db.mem != nil && db.mem.Len() != 0 {
  284. if _, err := db.rotateMem(0, true); err != nil {
  285. return nil, err
  286. }
  287. }
  288. // Wait compaction when certain threshold reached.
  289. if err := db.waitCompaction(); err != nil {
  290. return nil, err
  291. }
  292. tr := &Transaction{
  293. db: db,
  294. seq: db.seq,
  295. mem: db.mpoolGet(0),
  296. }
  297. tr.mem.incref()
  298. db.tr = tr
  299. return tr, nil
  300. }
上海开阖软件有限公司 沪ICP备12045867号-1