本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1079 lines
26KB

  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:generate protoc --gofast_out=. upsidedown.proto
  15. package upsidedown
  16. import (
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/blevesearch/bleve/analysis"
  25. "github.com/blevesearch/bleve/document"
  26. "github.com/blevesearch/bleve/index"
  27. "github.com/blevesearch/bleve/index/store"
  28. "github.com/blevesearch/bleve/registry"
  29. "github.com/golang/protobuf/proto"
  30. )
// Name is the name under which this index type is registered.
const Name = "upside_down"

// RowBufferSize is the default size of pooled row buffers. Ideally it
// is the smallest size that can contain an index row key and its
// corresponding value. It is not a limit: a larger buffer is allocated
// when needed, but performance is best when *most* rows fit this size.
const RowBufferSize = 4 * 1024

// VersionKey is the KV key under which the index format version row is stored.
var VersionKey = []byte{'v'}

// Version is the index format version written and understood by this code.
const Version uint8 = 7

// IncompatibleVersion is returned when the on-disk index was written
// with a format version other than Version.
var IncompatibleVersion = fmt.Errorf("incompatible version, %d is supported", Version)
// UpsideDownCouch implements index.Index on top of a pluggable KV store.
// All mutating operations are serialized by writeMutex.
type UpsideDownCouch struct {
	version     uint8  // index format version (see Version)
	path        string // NOTE(review): not assigned anywhere in this file — confirm usage
	storeName   string // registered name of the KV store implementation
	storeConfig map[string]interface{}
	store       store.KVStore

	fieldCache    *index.FieldCache     // maps field names <-> numeric field indexes
	analysisQueue *index.AnalysisQueue  // shared queue that performs document analysis
	stats         *indexStat

	m sync.RWMutex
	// fields protected by m
	docCount uint64

	// writeMutex serializes Open/Update/Delete/Batch/SetInternal/DeleteInternal
	writeMutex sync.Mutex
}
// docBackIndexRow pairs a document with its pre-existing back index row,
// streamed from the back-index-lookup goroutine to the Batch loop.
type docBackIndexRow struct {
	docID        string
	doc          *document.Document // If deletion, doc will be nil.
	backIndexRow *BackIndexRow      // nil when the doc was not previously indexed
}
  60. func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
  61. rv := &UpsideDownCouch{
  62. version: Version,
  63. fieldCache: index.NewFieldCache(),
  64. storeName: storeName,
  65. storeConfig: storeConfig,
  66. analysisQueue: analysisQueue,
  67. }
  68. rv.stats = &indexStat{i: rv}
  69. return rv, nil
  70. }
  71. func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
  72. // version marker
  73. rowsAll := [][]UpsideDownCouchRow{
  74. {NewVersionRow(udc.version)},
  75. }
  76. err = udc.batchRows(kvwriter, nil, rowsAll, nil)
  77. return
  78. }
// loadSchema populates the field cache from the stored field rows
// (key prefix 'f') and verifies the on-disk version row ('v') matches
// this implementation, returning IncompatibleVersion if it does not.
func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
	it := kvreader.PrefixIterator([]byte{'f'})
	defer func() {
		// keep the first error; otherwise surface a Close failure
		if cerr := it.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	key, val, valid := it.Current()
	for valid {
		var fieldRow *FieldRow
		fieldRow, err = NewFieldRowKV(key, val)
		if err != nil {
			return
		}
		// register the previously assigned name/index pair
		udc.fieldCache.AddExisting(fieldRow.name, fieldRow.index)

		it.Next()
		key, val, valid = it.Current()
	}

	// load and check the version row
	val, err = kvreader.Get([]byte{'v'})
	if err != nil {
		return
	}
	var vr *VersionRow
	vr, err = NewVersionRowKV([]byte{'v'}, val)
	if err != nil {
		return
	}
	if vr.version != Version {
		err = IncompatibleVersion
		return
	}

	return
}
  112. var rowBufferPool sync.Pool
  113. func GetRowBuffer() []byte {
  114. if rb, ok := rowBufferPool.Get().([]byte); ok {
  115. return rb
  116. } else {
  117. return make([]byte, RowBufferSize)
  118. }
  119. }
  120. func PutRowBuffer(buf []byte) {
  121. rowBufferPool.Put(buf)
  122. }
// batchRows serializes all of the given add/update/delete rows into a
// single KV batch and executes it on writer. Term dictionary counts are
// not written directly: the net delta per dictionary key is accumulated
// here and applied through the store's merge operator.
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
	// net change to each dictionary row implied by the adds/deletes below
	dictionaryDeltas := make(map[string]int64)

	// count up bytes needed for buffering.
	addNum := 0
	addKeyBytes := 0
	addValBytes := 0

	updateNum := 0
	updateKeyBytes := 0
	updateValBytes := 0

	deleteNum := 0
	deleteKeyBytes := 0

	rowBuf := GetRowBuffer()

	for _, addRows := range addRowsAll {
		for _, row := range addRows {
			tfr, ok := row.(*TermFrequencyRow)
			if ok {
				// each added term occurrence increments its dictionary count
				if tfr.DictionaryRowKeySize() > len(rowBuf) {
					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
				}
				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
				if err != nil {
					// NOTE(review): early return drops rowBuf instead of
					// returning it to the pool (pool entries are GC-able,
					// so this is only a missed reuse, not a leak)
					return err
				}
				dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
			}
			addKeyBytes += row.KeySize()
			addValBytes += row.ValueSize()
		}
		addNum += len(addRows)
	}

	for _, updateRows := range updateRowsAll {
		for _, row := range updateRows {
			updateKeyBytes += row.KeySize()
			updateValBytes += row.ValueSize()
		}
		updateNum += len(updateRows)
	}

	for _, deleteRows := range deleteRowsAll {
		for _, row := range deleteRows {
			tfr, ok := row.(*TermFrequencyRow)
			if ok {
				// need to decrement counter
				if tfr.DictionaryRowKeySize() > len(rowBuf) {
					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
				}
				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
				if err != nil {
					return err
				}
				dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
			}
			deleteKeyBytes += row.KeySize()
		}
		deleteNum += len(deleteRows)
	}

	PutRowBuffer(rowBuf)

	mergeNum := len(dictionaryDeltas)
	mergeKeyBytes := 0
	mergeValBytes := mergeNum * DictionaryRowMaxValueSize

	for dictRowKey := range dictionaryDeltas {
		mergeKeyBytes += len(dictRowKey)
	}

	// prepare batch
	// NOTE(review): merge bytes are counted twice — presumably headroom
	// required by the NewBatchEx contract; confirm against store docs
	totBytes := addKeyBytes + addValBytes +
		updateKeyBytes + updateValBytes +
		deleteKeyBytes +
		2*(mergeKeyBytes+mergeValBytes)

	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
		TotalBytes: totBytes,
		NumSets:    addNum + updateNum,
		NumDeletes: deleteNum,
		NumMerges:  mergeNum,
	})
	if err != nil {
		return err
	}
	defer func() {
		_ = wb.Close()
	}()

	// fill the batch: serialize each row's key/value into buf, then
	// advance buf past the consumed bytes so later rows don't clobber
	// earlier ones (wb retains sub-slices of buf)
	for _, addRows := range addRowsAll {
		for _, row := range addRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			valSize, err := row.ValueTo(buf[keySize:])
			if err != nil {
				return err
			}
			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
			buf = buf[keySize+valSize:]
		}
	}

	for _, updateRows := range updateRowsAll {
		for _, row := range updateRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			valSize, err := row.ValueTo(buf[keySize:])
			if err != nil {
				return err
			}
			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
			buf = buf[keySize+valSize:]
		}
	}

	for _, deleteRows := range deleteRowsAll {
		for _, row := range deleteRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			wb.Delete(buf[:keySize])
			buf = buf[keySize:]
		}
	}

	// one merge op per dictionary key, carrying the little-endian delta
	for dictRowKey, delta := range dictionaryDeltas {
		dictRowKeyLen := copy(buf, dictRowKey)
		binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
		wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+DictionaryRowMaxValueSize])
		buf = buf[dictRowKeyLen+DictionaryRowMaxValueSize:]
	}

	// write out the batch
	return writer.ExecuteBatch(wb)
}
// Open opens (or creates) the underlying KV store. For an existing
// index it loads the schema and document count; for a brand new store
// it writes the initial version row. Returns IncompatibleVersion when
// the on-disk format does not match this implementation.
func (udc *UpsideDownCouch) Open() (err error) {
	// acquire the write mutex for the duration of Open()
	udc.writeMutex.Lock()
	defer udc.writeMutex.Unlock()

	// open the kv store
	storeConstructor := registry.KVStoreConstructorByName(udc.storeName)
	if storeConstructor == nil {
		err = index.ErrorUnknownStorageType
		return
	}

	// now open the store
	udc.store, err = storeConstructor(&mergeOperator, udc.storeConfig)
	if err != nil {
		return
	}

	// start a reader to look at the index
	var kvreader store.KVReader
	kvreader, err = udc.store.Reader()
	if err != nil {
		return
	}

	// the presence of a version row distinguishes existing vs. new index
	var value []byte
	value, err = kvreader.Get(VersionKey)
	if err != nil {
		_ = kvreader.Close()
		return
	}

	if value != nil {
		// existing index: load schema and count documents
		err = udc.loadSchema(kvreader)
		if err != nil {
			_ = kvreader.Close()
			return
		}
		// set doc count
		udc.m.Lock()
		udc.docCount, err = udc.countDocs(kvreader)
		udc.m.Unlock()
		// NOTE(review): a countDocs error is overwritten by the Close
		// result here — confirm this is intended
		err = kvreader.Close()
	} else {
		// new index, close the reader and open writer to init
		err = kvreader.Close()
		if err != nil {
			return
		}

		var kvwriter store.KVWriter
		kvwriter, err = udc.store.Writer()
		if err != nil {
			return
		}
		defer func() {
			if cerr := kvwriter.Close(); err == nil && cerr != nil {
				err = cerr
			}
		}()

		// init the index
		err = udc.init(kvwriter)
	}

	return
}
  309. func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) (count uint64, err error) {
  310. it := kvreader.PrefixIterator([]byte{'b'})
  311. defer func() {
  312. if cerr := it.Close(); err == nil && cerr != nil {
  313. err = cerr
  314. }
  315. }()
  316. _, _, valid := it.Current()
  317. for valid {
  318. count++
  319. it.Next()
  320. _, _, valid = it.Current()
  321. }
  322. return
  323. }
  324. func (udc *UpsideDownCouch) rowCount() (count uint64, err error) {
  325. // start an isolated reader for use during the rowcount
  326. kvreader, err := udc.store.Reader()
  327. if err != nil {
  328. return
  329. }
  330. defer func() {
  331. if cerr := kvreader.Close(); err == nil && cerr != nil {
  332. err = cerr
  333. }
  334. }()
  335. it := kvreader.RangeIterator(nil, nil)
  336. defer func() {
  337. if cerr := it.Close(); err == nil && cerr != nil {
  338. err = cerr
  339. }
  340. }()
  341. _, _, valid := it.Current()
  342. for valid {
  343. count++
  344. it.Next()
  345. _, _, valid = it.Current()
  346. }
  347. return
  348. }
// Close closes the underlying KV store.
func (udc *UpsideDownCouch) Close() error {
	return udc.store.Close()
}
// Update analyzes doc and indexes it, replacing any previously indexed
// version with the same document ID. Analysis runs before the write
// lock is taken so concurrent Updates can analyze in parallel.
func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
	// do analysis before acquiring write lock
	analysisStart := time.Now()
	numPlainTextBytes := doc.NumPlainTextBytes()
	resultChan := make(chan *index.AnalysisResult)
	aw := index.NewAnalysisWork(udc, doc, resultChan)

	// put the work on the queue
	udc.analysisQueue.Queue(aw)

	// wait for the result
	result := <-resultChan
	close(resultChan)
	atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))

	udc.writeMutex.Lock()
	defer udc.writeMutex.Unlock()

	// open a reader for backindex lookup
	var kvreader store.KVReader
	kvreader, err = udc.store.Reader()
	if err != nil {
		return
	}

	// first we lookup the backindex row for the doc id if it exists
	// lookup the back index row
	var backIndexRow *BackIndexRow
	backIndexRow, err = backIndexRowForDoc(kvreader, index.IndexInternalID(doc.ID))
	if err != nil {
		_ = kvreader.Close()
		atomic.AddUint64(&udc.stats.errors, 1)
		return
	}

	err = kvreader.Close()
	if err != nil {
		return
	}

	// start a writer for this update
	indexStart := time.Now()
	var kvwriter store.KVWriter
	kvwriter, err = udc.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := kvwriter.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	// prepare a list of rows
	var addRowsAll [][]UpsideDownCouchRow
	var updateRowsAll [][]UpsideDownCouchRow
	var deleteRowsAll [][]UpsideDownCouchRow

	// diff the freshly analyzed rows against the back index to decide
	// which rows to add, overwrite, or delete
	addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows)
	if len(addRows) > 0 {
		addRowsAll = append(addRowsAll, addRows)
	}
	if len(updateRows) > 0 {
		updateRowsAll = append(updateRowsAll, updateRows)
	}
	if len(deleteRows) > 0 {
		deleteRowsAll = append(deleteRowsAll, deleteRows)
	}

	err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
	if err == nil && backIndexRow == nil {
		// no prior back index row means this doc is brand new
		udc.m.Lock()
		udc.docCount++
		udc.m.Unlock()
	}
	atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
	if err == nil {
		atomic.AddUint64(&udc.stats.updates, 1)
		atomic.AddUint64(&udc.stats.numPlainTextBytesIndexed, numPlainTextBytes)
	} else {
		atomic.AddUint64(&udc.stats.errors, 1)
	}
	return
}
// mergeOldAndNew diffs the freshly analyzed rows against the document's
// existing back index row. Each new row becomes an add (key not seen
// before) or an update (key already present); existing keys not covered
// by any new row become deletes. When backIndexRow is nil the document
// is new and every row is an add.
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
	addRows = make([]UpsideDownCouchRow, 0, len(rows))

	if backIndexRow == nil {
		// brand new doc: everything is an add
		addRows = addRows[0:len(rows)]
		for i, row := range rows {
			addRows[i] = row
		}
		return addRows, nil, nil
	}

	updateRows = make([]UpsideDownCouchRow, 0, len(rows))
	deleteRows = make([]UpsideDownCouchRow, 0, len(rows))

	// build lookup sets of the previously written term and stored keys
	var existingTermKeys map[string]struct{}
	backIndexTermKeys := backIndexRow.AllTermKeys()
	if len(backIndexTermKeys) > 0 {
		existingTermKeys = make(map[string]struct{}, len(backIndexTermKeys))
		for _, key := range backIndexTermKeys {
			existingTermKeys[string(key)] = struct{}{}
		}
	}

	var existingStoredKeys map[string]struct{}
	backIndexStoredKeys := backIndexRow.AllStoredKeys()
	if len(backIndexStoredKeys) > 0 {
		existingStoredKeys = make(map[string]struct{}, len(backIndexStoredKeys))
		for _, key := range backIndexStoredKeys {
			existingStoredKeys[string(key)] = struct{}{}
		}
	}

	keyBuf := GetRowBuffer()
	for _, row := range rows {
		switch row := row.(type) {
		case *TermFrequencyRow:
			if existingTermKeys != nil {
				// grow the scratch buffer if this key doesn't fit
				if row.KeySize() > len(keyBuf) {
					keyBuf = make([]byte, row.KeySize())
				}
				keySize, _ := row.KeyTo(keyBuf)
				if _, ok := existingTermKeys[string(keyBuf[:keySize])]; ok {
					updateRows = append(updateRows, row)
					// remove from the set; leftovers become deletes below
					delete(existingTermKeys, string(keyBuf[:keySize]))
					continue
				}
			}
			addRows = append(addRows, row)
		case *StoredRow:
			if existingStoredKeys != nil {
				if row.KeySize() > len(keyBuf) {
					keyBuf = make([]byte, row.KeySize())
				}
				keySize, _ := row.KeyTo(keyBuf)
				if _, ok := existingStoredKeys[string(keyBuf[:keySize])]; ok {
					updateRows = append(updateRows, row)
					delete(existingStoredKeys, string(keyBuf[:keySize]))
					continue
				}
			}
			addRows = append(addRows, row)
		default:
			// all other row types are always written as updates
			updateRows = append(updateRows, row)
		}
	}
	PutRowBuffer(keyBuf)

	// any of the existing rows that weren't updated need to be deleted
	for existingTermKey := range existingTermKeys {
		termFreqRow, err := NewTermFrequencyRowK([]byte(existingTermKey))
		if err == nil {
			deleteRows = append(deleteRows, termFreqRow)
		}
	}

	// any of the existing stored fields that weren't updated need to be deleted
	for existingStoredKey := range existingStoredKeys {
		storedRow, err := NewStoredRowK([]byte(existingStoredKey))
		if err == nil {
			deleteRows = append(deleteRows, storedRow)
		}
	}

	return addRows, updateRows, deleteRows
}
  503. func (udc *UpsideDownCouch) storeField(docID []byte, field document.Field, fieldIndex uint16, rows []index.IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]index.IndexRow, []*BackIndexStoreEntry) {
  504. fieldType := encodeFieldType(field)
  505. storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())
  506. // record the back index entry
  507. backIndexStoredEntry := BackIndexStoreEntry{Field: proto.Uint32(uint32(fieldIndex)), ArrayPositions: field.ArrayPositions()}
  508. return append(rows, storedRow), append(backIndexStoredEntries, &backIndexStoredEntry)
  509. }
  510. func encodeFieldType(f document.Field) byte {
  511. fieldType := byte('x')
  512. switch f.(type) {
  513. case *document.TextField:
  514. fieldType = 't'
  515. case *document.NumericField:
  516. fieldType = 'n'
  517. case *document.DateTimeField:
  518. fieldType = 'd'
  519. case *document.BooleanField:
  520. fieldType = 'b'
  521. case *document.GeoPointField:
  522. fieldType = 'g'
  523. case *document.CompositeField:
  524. fieldType = 'c'
  525. }
  526. return fieldType
  527. }
// indexField builds one TermFrequencyRow per distinct token of a field
// occurrence, appends them (plus any FieldRows created for term vector
// fields) to rows, and records the field's terms in a new back index
// terms entry.
func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies, rows []index.IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]index.IndexRow, []*BackIndexTermsEntry) {
	// field norm = 1/sqrt(length); stored per row for scoring
	fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))

	// allocate all rows in one backing slice; pointers into it stay
	// valid because the slice is sized up front and never resized
	termFreqRows := make([]TermFrequencyRow, len(tokenFreqs))
	termFreqRowsUsed := 0

	terms := make([]string, 0, len(tokenFreqs))
	for k, tf := range tokenFreqs {
		termFreqRow := &termFreqRows[termFreqRowsUsed]
		termFreqRowsUsed++

		InitTermFrequencyRow(termFreqRow, tf.Term, fieldIndex, docID,
			uint64(frequencyFromTokenFreq(tf)), fieldNorm)

		if includeTermVectors {
			termFreqRow.vectors, rows = udc.termVectorsFromTokenFreq(fieldIndex, tf, rows)
		}

		// record the back index entry
		terms = append(terms, k)

		rows = append(rows, termFreqRow)
	}
	backIndexTermsEntry := BackIndexTermsEntry{Field: proto.Uint32(uint32(fieldIndex)), Terms: terms}
	backIndexTermsEntries = append(backIndexTermsEntries, &backIndexTermsEntry)

	return rows, backIndexTermsEntries
}
// Delete removes the document with the given id from the index. When
// the document does not exist this is a no-op that is still counted as
// a delete in the stats.
func (udc *UpsideDownCouch) Delete(id string) (err error) {
	indexStart := time.Now()

	udc.writeMutex.Lock()
	defer udc.writeMutex.Unlock()

	// open a reader for backindex lookup
	var kvreader store.KVReader
	kvreader, err = udc.store.Reader()
	if err != nil {
		return
	}

	// first we lookup the backindex row for the doc id if it exists
	// lookup the back index row
	var backIndexRow *BackIndexRow
	backIndexRow, err = backIndexRowForDoc(kvreader, index.IndexInternalID(id))
	if err != nil {
		_ = kvreader.Close()
		atomic.AddUint64(&udc.stats.errors, 1)
		return
	}

	err = kvreader.Close()
	if err != nil {
		return
	}

	if backIndexRow == nil {
		// document not present; nothing to write
		atomic.AddUint64(&udc.stats.deletes, 1)
		return
	}

	// start a writer for this delete
	var kvwriter store.KVWriter
	kvwriter, err = udc.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := kvwriter.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	// the back index row tells us every row this doc contributed
	var deleteRowsAll [][]UpsideDownCouchRow
	deleteRows := udc.deleteSingle(id, backIndexRow, nil)
	if len(deleteRows) > 0 {
		deleteRowsAll = append(deleteRowsAll, deleteRows)
	}

	err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll)
	if err == nil {
		udc.m.Lock()
		udc.docCount--
		udc.m.Unlock()
	}
	atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
	if err == nil {
		atomic.AddUint64(&udc.stats.deletes, 1)
	} else {
		atomic.AddUint64(&udc.stats.errors, 1)
	}
	return
}
  606. func (udc *UpsideDownCouch) deleteSingle(id string, backIndexRow *BackIndexRow, deleteRows []UpsideDownCouchRow) []UpsideDownCouchRow {
  607. idBytes := []byte(id)
  608. for _, backIndexEntry := range backIndexRow.termsEntries {
  609. for i := range backIndexEntry.Terms {
  610. tfr := NewTermFrequencyRow([]byte(backIndexEntry.Terms[i]), uint16(*backIndexEntry.Field), idBytes, 0, 0)
  611. deleteRows = append(deleteRows, tfr)
  612. }
  613. }
  614. for _, se := range backIndexRow.storedEntries {
  615. sf := NewStoredRow(idBytes, uint16(*se.Field), se.ArrayPositions, 'x', nil)
  616. deleteRows = append(deleteRows, sf)
  617. }
  618. // also delete the back entry itself
  619. deleteRows = append(deleteRows, backIndexRow)
  620. return deleteRows
  621. }
  622. func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field {
  623. switch typ {
  624. case 't':
  625. return document.NewTextField(name, pos, value)
  626. case 'n':
  627. return document.NewNumericFieldFromBytes(name, pos, value)
  628. case 'd':
  629. return document.NewDateTimeFieldFromBytes(name, pos, value)
  630. case 'b':
  631. return document.NewBooleanFieldFromBytes(name, pos, value)
  632. case 'g':
  633. return document.NewGeoPointFieldFromBytes(name, pos, value)
  634. }
  635. return nil
  636. }
// frequencyFromTokenFreq returns the occurrence count for a token.
func frequencyFromTokenFreq(tf *analysis.TokenFreq) int {
	return tf.Frequency()
}
// termVectorsFromTokenFreq converts one token's location info into
// TermVector rows. A location naming a different field is resolved via
// the field cache; any FieldRow newly created by that lookup is also
// appended to rows so it gets persisted.
func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) {
	// single backing allocation for all vectors; rv holds pointers into it
	a := make([]TermVector, len(tf.Locations))
	rv := make([]*TermVector, len(tf.Locations))

	for i, l := range tf.Locations {
		var newFieldRow *FieldRow
		fieldIndex := field
		if l.Field != "" {
			// lookup correct field
			fieldIndex, newFieldRow = udc.fieldIndexOrNewRow(l.Field)
			if newFieldRow != nil {
				rows = append(rows, newFieldRow)
			}
		}
		a[i] = TermVector{
			field:          fieldIndex,
			arrayPositions: l.ArrayPositions,
			pos:            uint64(l.Position),
			start:          uint64(l.Start),
			end:            uint64(l.End),
		}
		rv[i] = &a[i]
	}

	return rv, rows
}
  664. func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []*index.TermFieldVector {
  665. if len(in) == 0 {
  666. return nil
  667. }
  668. a := make([]index.TermFieldVector, len(in))
  669. rv := make([]*index.TermFieldVector, len(in))
  670. for i, tv := range in {
  671. fieldName := udc.fieldCache.FieldIndexed(tv.field)
  672. a[i] = index.TermFieldVector{
  673. Field: fieldName,
  674. ArrayPositions: tv.arrayPositions,
  675. Pos: tv.pos,
  676. Start: tv.start,
  677. End: tv.end,
  678. }
  679. rv[i] = &a[i]
  680. }
  681. return rv
  682. }
  683. func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
  684. analysisStart := time.Now()
  685. resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
  686. var numUpdates uint64
  687. var numPlainTextBytes uint64
  688. for _, doc := range batch.IndexOps {
  689. if doc != nil {
  690. numUpdates++
  691. numPlainTextBytes += doc.NumPlainTextBytes()
  692. }
  693. }
  694. if len(batch.IndexOps) > 0 {
  695. go func() {
  696. for _, doc := range batch.IndexOps {
  697. if doc != nil {
  698. aw := index.NewAnalysisWork(udc, doc, resultChan)
  699. // put the work on the queue
  700. udc.analysisQueue.Queue(aw)
  701. }
  702. }
  703. }()
  704. }
  705. // retrieve back index rows concurrent with analysis
  706. docBackIndexRowErr := error(nil)
  707. docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps))
  708. udc.writeMutex.Lock()
  709. defer udc.writeMutex.Unlock()
  710. go func() {
  711. defer close(docBackIndexRowCh)
  712. // open a reader for backindex lookup
  713. var kvreader store.KVReader
  714. kvreader, err = udc.store.Reader()
  715. if err != nil {
  716. docBackIndexRowErr = err
  717. return
  718. }
  719. defer func() {
  720. if cerr := kvreader.Close(); err == nil && cerr != nil {
  721. docBackIndexRowErr = cerr
  722. }
  723. }()
  724. for docID, doc := range batch.IndexOps {
  725. backIndexRow, err := backIndexRowForDoc(kvreader, index.IndexInternalID(docID))
  726. if err != nil {
  727. docBackIndexRowErr = err
  728. return
  729. }
  730. docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow}
  731. }
  732. }()
  733. // wait for analysis result
  734. newRowsMap := make(map[string][]index.IndexRow)
  735. var itemsDeQueued uint64
  736. for itemsDeQueued < numUpdates {
  737. result := <-resultChan
  738. newRowsMap[result.DocID] = result.Rows
  739. itemsDeQueued++
  740. }
  741. close(resultChan)
  742. atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
  743. docsAdded := uint64(0)
  744. docsDeleted := uint64(0)
  745. indexStart := time.Now()
  746. // prepare a list of rows
  747. var addRowsAll [][]UpsideDownCouchRow
  748. var updateRowsAll [][]UpsideDownCouchRow
  749. var deleteRowsAll [][]UpsideDownCouchRow
  750. // add the internal ops
  751. var updateRows []UpsideDownCouchRow
  752. var deleteRows []UpsideDownCouchRow
  753. for internalKey, internalValue := range batch.InternalOps {
  754. if internalValue == nil {
  755. // delete
  756. deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
  757. deleteRows = append(deleteRows, deleteInternalRow)
  758. } else {
  759. updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
  760. updateRows = append(updateRows, updateInternalRow)
  761. }
  762. }
  763. if len(updateRows) > 0 {
  764. updateRowsAll = append(updateRowsAll, updateRows)
  765. }
  766. if len(deleteRows) > 0 {
  767. deleteRowsAll = append(deleteRowsAll, deleteRows)
  768. }
  769. // process back index rows as they arrive
  770. for dbir := range docBackIndexRowCh {
  771. if dbir.doc == nil && dbir.backIndexRow != nil {
  772. // delete
  773. deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil)
  774. if len(deleteRows) > 0 {
  775. deleteRowsAll = append(deleteRowsAll, deleteRows)
  776. }
  777. docsDeleted++
  778. } else if dbir.doc != nil {
  779. addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID])
  780. if len(addRows) > 0 {
  781. addRowsAll = append(addRowsAll, addRows)
  782. }
  783. if len(updateRows) > 0 {
  784. updateRowsAll = append(updateRowsAll, updateRows)
  785. }
  786. if len(deleteRows) > 0 {
  787. deleteRowsAll = append(deleteRowsAll, deleteRows)
  788. }
  789. if dbir.backIndexRow == nil {
  790. docsAdded++
  791. }
  792. }
  793. }
  794. if docBackIndexRowErr != nil {
  795. return docBackIndexRowErr
  796. }
  797. // start a writer for this batch
  798. var kvwriter store.KVWriter
  799. kvwriter, err = udc.store.Writer()
  800. if err != nil {
  801. return
  802. }
  803. err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
  804. if err != nil {
  805. _ = kvwriter.Close()
  806. atomic.AddUint64(&udc.stats.errors, 1)
  807. return
  808. }
  809. err = kvwriter.Close()
  810. atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
  811. if err == nil {
  812. udc.m.Lock()
  813. udc.docCount += docsAdded
  814. udc.docCount -= docsDeleted
  815. udc.m.Unlock()
  816. atomic.AddUint64(&udc.stats.updates, numUpdates)
  817. atomic.AddUint64(&udc.stats.deletes, docsDeleted)
  818. atomic.AddUint64(&udc.stats.batches, 1)
  819. atomic.AddUint64(&udc.stats.numPlainTextBytesIndexed, numPlainTextBytes)
  820. } else {
  821. atomic.AddUint64(&udc.stats.errors, 1)
  822. }
  823. persistedCallback := batch.PersistedCallback()
  824. if persistedCallback != nil {
  825. persistedCallback(err)
  826. }
  827. return
  828. }
  829. func (udc *UpsideDownCouch) SetInternal(key, val []byte) (err error) {
  830. internalRow := NewInternalRow(key, val)
  831. udc.writeMutex.Lock()
  832. defer udc.writeMutex.Unlock()
  833. var writer store.KVWriter
  834. writer, err = udc.store.Writer()
  835. if err != nil {
  836. return
  837. }
  838. defer func() {
  839. if cerr := writer.Close(); err == nil && cerr != nil {
  840. err = cerr
  841. }
  842. }()
  843. batch := writer.NewBatch()
  844. batch.Set(internalRow.Key(), internalRow.Value())
  845. return writer.ExecuteBatch(batch)
  846. }
  847. func (udc *UpsideDownCouch) DeleteInternal(key []byte) (err error) {
  848. internalRow := NewInternalRow(key, nil)
  849. udc.writeMutex.Lock()
  850. defer udc.writeMutex.Unlock()
  851. var writer store.KVWriter
  852. writer, err = udc.store.Writer()
  853. if err != nil {
  854. return
  855. }
  856. defer func() {
  857. if cerr := writer.Close(); err == nil && cerr != nil {
  858. err = cerr
  859. }
  860. }()
  861. batch := writer.NewBatch()
  862. batch.Delete(internalRow.Key())
  863. return writer.ExecuteBatch(batch)
  864. }
  865. func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) {
  866. kvr, err := udc.store.Reader()
  867. if err != nil {
  868. return nil, fmt.Errorf("error opening store reader: %v", err)
  869. }
  870. udc.m.RLock()
  871. defer udc.m.RUnlock()
  872. return &IndexReader{
  873. index: udc,
  874. kvreader: kvr,
  875. docCount: udc.docCount,
  876. }, nil
  877. }
// Stats returns a JSON-marshalable view of the index statistics.
func (udc *UpsideDownCouch) Stats() json.Marshaler {
	return udc.stats
}
// StatsMap returns the index statistics as a plain map.
func (udc *UpsideDownCouch) StatsMap() map[string]interface{} {
	return udc.stats.statsMap()
}
// Advanced exposes the underlying KV store for advanced callers.
func (udc *UpsideDownCouch) Advanced() (store.KVStore, error) {
	return udc.store, nil
}
  887. func (udc *UpsideDownCouch) fieldIndexOrNewRow(name string) (uint16, *FieldRow) {
  888. index, existed := udc.fieldCache.FieldNamed(name, true)
  889. if !existed {
  890. return index, NewFieldRow(index, name)
  891. }
  892. return index, nil
  893. }
// init registers this index type with the bleve registry under Name.
func init() {
	registry.RegisterIndexType(Name, NewUpsideDownCouch)
}
// backIndexRowForDoc fetches and parses the back index row for docID.
// Returns (nil, nil) when no back index row exists, i.e. the document
// is not in the index.
func backIndexRowForDoc(kvreader store.KVReader, docID index.IndexInternalID) (*BackIndexRow, error) {
	// use a temporary row structure to build key
	tempRow := BackIndexRow{
		doc: docID,
	}

	keyBuf := GetRowBuffer()
	if tempRow.KeySize() > len(keyBuf) {
		// grow with headroom when the pooled buffer is too small
		keyBuf = make([]byte, 2*tempRow.KeySize())
	}
	defer PutRowBuffer(keyBuf)
	keySize, err := tempRow.KeyTo(keyBuf)
	if err != nil {
		return nil, err
	}

	value, err := kvreader.Get(keyBuf[:keySize])
	if err != nil {
		return nil, err
	}
	if value == nil {
		// absent row: document is not indexed
		return nil, nil
	}
	backIndexRow, err := NewBackIndexRowKV(keyBuf[:keySize], value)
	if err != nil {
		return nil, err
	}
	return backIndexRow, nil
}
上海开阖软件有限公司 沪ICP备12045867号-1