本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

607 lines
12KB

  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bleve
  15. import (
  16. "context"
  17. "sort"
  18. "sync"
  19. "time"
  20. "github.com/blevesearch/bleve/document"
  21. "github.com/blevesearch/bleve/index"
  22. "github.com/blevesearch/bleve/index/store"
  23. "github.com/blevesearch/bleve/mapping"
  24. "github.com/blevesearch/bleve/search"
  25. )
  26. type indexAliasImpl struct {
  27. name string
  28. indexes []Index
  29. mutex sync.RWMutex
  30. open bool
  31. }
  32. // NewIndexAlias creates a new IndexAlias over the provided
  33. // Index objects.
  34. func NewIndexAlias(indexes ...Index) *indexAliasImpl {
  35. return &indexAliasImpl{
  36. name: "alias",
  37. indexes: indexes,
  38. open: true,
  39. }
  40. }
  41. func (i *indexAliasImpl) isAliasToSingleIndex() error {
  42. if len(i.indexes) < 1 {
  43. return ErrorAliasEmpty
  44. } else if len(i.indexes) > 1 {
  45. return ErrorAliasMulti
  46. }
  47. return nil
  48. }
  49. func (i *indexAliasImpl) Index(id string, data interface{}) error {
  50. i.mutex.RLock()
  51. defer i.mutex.RUnlock()
  52. if !i.open {
  53. return ErrorIndexClosed
  54. }
  55. err := i.isAliasToSingleIndex()
  56. if err != nil {
  57. return err
  58. }
  59. return i.indexes[0].Index(id, data)
  60. }
  61. func (i *indexAliasImpl) Delete(id string) error {
  62. i.mutex.RLock()
  63. defer i.mutex.RUnlock()
  64. if !i.open {
  65. return ErrorIndexClosed
  66. }
  67. err := i.isAliasToSingleIndex()
  68. if err != nil {
  69. return err
  70. }
  71. return i.indexes[0].Delete(id)
  72. }
  73. func (i *indexAliasImpl) Batch(b *Batch) error {
  74. i.mutex.RLock()
  75. defer i.mutex.RUnlock()
  76. if !i.open {
  77. return ErrorIndexClosed
  78. }
  79. err := i.isAliasToSingleIndex()
  80. if err != nil {
  81. return err
  82. }
  83. return i.indexes[0].Batch(b)
  84. }
  85. func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
  86. i.mutex.RLock()
  87. defer i.mutex.RUnlock()
  88. if !i.open {
  89. return nil, ErrorIndexClosed
  90. }
  91. err := i.isAliasToSingleIndex()
  92. if err != nil {
  93. return nil, err
  94. }
  95. return i.indexes[0].Document(id)
  96. }
  97. func (i *indexAliasImpl) DocCount() (uint64, error) {
  98. i.mutex.RLock()
  99. defer i.mutex.RUnlock()
  100. rv := uint64(0)
  101. if !i.open {
  102. return 0, ErrorIndexClosed
  103. }
  104. for _, index := range i.indexes {
  105. otherCount, err := index.DocCount()
  106. if err == nil {
  107. rv += otherCount
  108. }
  109. // tolerate errors to produce partial counts
  110. }
  111. return rv, nil
  112. }
  113. func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
  114. return i.SearchInContext(context.Background(), req)
  115. }
  116. func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  117. i.mutex.RLock()
  118. defer i.mutex.RUnlock()
  119. if !i.open {
  120. return nil, ErrorIndexClosed
  121. }
  122. if len(i.indexes) < 1 {
  123. return nil, ErrorAliasEmpty
  124. }
  125. // short circuit the simple case
  126. if len(i.indexes) == 1 {
  127. return i.indexes[0].SearchInContext(ctx, req)
  128. }
  129. return MultiSearch(ctx, req, i.indexes...)
  130. }
  131. func (i *indexAliasImpl) Fields() ([]string, error) {
  132. i.mutex.RLock()
  133. defer i.mutex.RUnlock()
  134. if !i.open {
  135. return nil, ErrorIndexClosed
  136. }
  137. err := i.isAliasToSingleIndex()
  138. if err != nil {
  139. return nil, err
  140. }
  141. return i.indexes[0].Fields()
  142. }
  143. func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
  144. i.mutex.RLock()
  145. if !i.open {
  146. i.mutex.RUnlock()
  147. return nil, ErrorIndexClosed
  148. }
  149. err := i.isAliasToSingleIndex()
  150. if err != nil {
  151. i.mutex.RUnlock()
  152. return nil, err
  153. }
  154. fieldDict, err := i.indexes[0].FieldDict(field)
  155. if err != nil {
  156. i.mutex.RUnlock()
  157. return nil, err
  158. }
  159. return &indexAliasImplFieldDict{
  160. index: i,
  161. fieldDict: fieldDict,
  162. }, nil
  163. }
  164. func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
  165. i.mutex.RLock()
  166. if !i.open {
  167. i.mutex.RUnlock()
  168. return nil, ErrorIndexClosed
  169. }
  170. err := i.isAliasToSingleIndex()
  171. if err != nil {
  172. i.mutex.RUnlock()
  173. return nil, err
  174. }
  175. fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
  176. if err != nil {
  177. i.mutex.RUnlock()
  178. return nil, err
  179. }
  180. return &indexAliasImplFieldDict{
  181. index: i,
  182. fieldDict: fieldDict,
  183. }, nil
  184. }
  185. func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
  186. i.mutex.RLock()
  187. if !i.open {
  188. i.mutex.RUnlock()
  189. return nil, ErrorIndexClosed
  190. }
  191. err := i.isAliasToSingleIndex()
  192. if err != nil {
  193. i.mutex.RUnlock()
  194. return nil, err
  195. }
  196. fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
  197. if err != nil {
  198. i.mutex.RUnlock()
  199. return nil, err
  200. }
  201. return &indexAliasImplFieldDict{
  202. index: i,
  203. fieldDict: fieldDict,
  204. }, nil
  205. }
  206. func (i *indexAliasImpl) Close() error {
  207. i.mutex.Lock()
  208. defer i.mutex.Unlock()
  209. i.open = false
  210. return nil
  211. }
  212. func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
  213. i.mutex.RLock()
  214. defer i.mutex.RUnlock()
  215. if !i.open {
  216. return nil
  217. }
  218. err := i.isAliasToSingleIndex()
  219. if err != nil {
  220. return nil
  221. }
  222. return i.indexes[0].Mapping()
  223. }
  224. func (i *indexAliasImpl) Stats() *IndexStat {
  225. i.mutex.RLock()
  226. defer i.mutex.RUnlock()
  227. if !i.open {
  228. return nil
  229. }
  230. err := i.isAliasToSingleIndex()
  231. if err != nil {
  232. return nil
  233. }
  234. return i.indexes[0].Stats()
  235. }
  236. func (i *indexAliasImpl) StatsMap() map[string]interface{} {
  237. i.mutex.RLock()
  238. defer i.mutex.RUnlock()
  239. if !i.open {
  240. return nil
  241. }
  242. err := i.isAliasToSingleIndex()
  243. if err != nil {
  244. return nil
  245. }
  246. return i.indexes[0].StatsMap()
  247. }
  248. func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
  249. i.mutex.RLock()
  250. defer i.mutex.RUnlock()
  251. if !i.open {
  252. return nil, ErrorIndexClosed
  253. }
  254. err := i.isAliasToSingleIndex()
  255. if err != nil {
  256. return nil, err
  257. }
  258. return i.indexes[0].GetInternal(key)
  259. }
  260. func (i *indexAliasImpl) SetInternal(key, val []byte) error {
  261. i.mutex.RLock()
  262. defer i.mutex.RUnlock()
  263. if !i.open {
  264. return ErrorIndexClosed
  265. }
  266. err := i.isAliasToSingleIndex()
  267. if err != nil {
  268. return err
  269. }
  270. return i.indexes[0].SetInternal(key, val)
  271. }
  272. func (i *indexAliasImpl) DeleteInternal(key []byte) error {
  273. i.mutex.RLock()
  274. defer i.mutex.RUnlock()
  275. if !i.open {
  276. return ErrorIndexClosed
  277. }
  278. err := i.isAliasToSingleIndex()
  279. if err != nil {
  280. return err
  281. }
  282. return i.indexes[0].DeleteInternal(key)
  283. }
  284. func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
  285. i.mutex.RLock()
  286. defer i.mutex.RUnlock()
  287. if !i.open {
  288. return nil, nil, ErrorIndexClosed
  289. }
  290. err := i.isAliasToSingleIndex()
  291. if err != nil {
  292. return nil, nil, err
  293. }
  294. return i.indexes[0].Advanced()
  295. }
  296. func (i *indexAliasImpl) Add(indexes ...Index) {
  297. i.mutex.Lock()
  298. defer i.mutex.Unlock()
  299. i.indexes = append(i.indexes, indexes...)
  300. }
  301. func (i *indexAliasImpl) removeSingle(index Index) {
  302. for pos, in := range i.indexes {
  303. if in == index {
  304. i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
  305. break
  306. }
  307. }
  308. }
  309. func (i *indexAliasImpl) Remove(indexes ...Index) {
  310. i.mutex.Lock()
  311. defer i.mutex.Unlock()
  312. for _, in := range indexes {
  313. i.removeSingle(in)
  314. }
  315. }
  316. func (i *indexAliasImpl) Swap(in, out []Index) {
  317. i.mutex.Lock()
  318. defer i.mutex.Unlock()
  319. // add
  320. i.indexes = append(i.indexes, in...)
  321. // delete
  322. for _, ind := range out {
  323. i.removeSingle(ind)
  324. }
  325. }
  326. // createChildSearchRequest creates a separate
  327. // request from the original
  328. // For now, avoid data race on req structure.
  329. // TODO disable highlight/field load on child
  330. // requests, and add code to do this only on
  331. // the actual final results.
  332. // Perhaps that part needs to be optional,
  333. // could be slower in remote usages.
  334. func createChildSearchRequest(req *SearchRequest) *SearchRequest {
  335. rv := SearchRequest{
  336. Query: req.Query,
  337. Size: req.Size + req.From,
  338. From: 0,
  339. Highlight: req.Highlight,
  340. Fields: req.Fields,
  341. Facets: req.Facets,
  342. Explain: req.Explain,
  343. Sort: req.Sort.Copy(),
  344. IncludeLocations: req.IncludeLocations,
  345. Score: req.Score,
  346. }
  347. return &rv
  348. }
  349. type asyncSearchResult struct {
  350. Name string
  351. Result *SearchResult
  352. Err error
  353. }
  354. // MultiSearch executes a SearchRequest across multiple Index objects,
  355. // then merges the results. The indexes must honor any ctx deadline.
  356. func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
  357. searchStart := time.Now()
  358. asyncResults := make(chan *asyncSearchResult, len(indexes))
  359. // run search on each index in separate go routine
  360. var waitGroup sync.WaitGroup
  361. var searchChildIndex = func(in Index, childReq *SearchRequest) {
  362. rv := asyncSearchResult{Name: in.Name()}
  363. rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
  364. asyncResults <- &rv
  365. waitGroup.Done()
  366. }
  367. waitGroup.Add(len(indexes))
  368. for _, in := range indexes {
  369. go searchChildIndex(in, createChildSearchRequest(req))
  370. }
  371. // on another go routine, close after finished
  372. go func() {
  373. waitGroup.Wait()
  374. close(asyncResults)
  375. }()
  376. var sr *SearchResult
  377. indexErrors := make(map[string]error)
  378. for asr := range asyncResults {
  379. if asr.Err == nil {
  380. if sr == nil {
  381. // first result
  382. sr = asr.Result
  383. } else {
  384. // merge with previous
  385. sr.Merge(asr.Result)
  386. }
  387. } else {
  388. indexErrors[asr.Name] = asr.Err
  389. }
  390. }
  391. // merge just concatenated all the hits
  392. // now lets clean it up
  393. // handle case where no results were successful
  394. if sr == nil {
  395. sr = &SearchResult{
  396. Status: &SearchStatus{
  397. Errors: make(map[string]error),
  398. },
  399. }
  400. }
  401. // sort all hits with the requested order
  402. if len(req.Sort) > 0 {
  403. sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
  404. sort.Sort(sorter)
  405. }
  406. // now skip over the correct From
  407. if req.From > 0 && len(sr.Hits) > req.From {
  408. sr.Hits = sr.Hits[req.From:]
  409. } else if req.From > 0 {
  410. sr.Hits = search.DocumentMatchCollection{}
  411. }
  412. // now trim to the correct size
  413. if req.Size > 0 && len(sr.Hits) > req.Size {
  414. sr.Hits = sr.Hits[0:req.Size]
  415. }
  416. // fix up facets
  417. for name, fr := range req.Facets {
  418. sr.Facets.Fixup(name, fr.Size)
  419. }
  420. // fix up original request
  421. sr.Request = req
  422. searchDuration := time.Since(searchStart)
  423. sr.Took = searchDuration
  424. // fix up errors
  425. if len(indexErrors) > 0 {
  426. if sr.Status.Errors == nil {
  427. sr.Status.Errors = make(map[string]error)
  428. }
  429. for indexName, indexErr := range indexErrors {
  430. sr.Status.Errors[indexName] = indexErr
  431. sr.Status.Total++
  432. sr.Status.Failed++
  433. }
  434. }
  435. return sr, nil
  436. }
  437. func (i *indexAliasImpl) NewBatch() *Batch {
  438. i.mutex.RLock()
  439. defer i.mutex.RUnlock()
  440. if !i.open {
  441. return nil
  442. }
  443. err := i.isAliasToSingleIndex()
  444. if err != nil {
  445. return nil
  446. }
  447. return i.indexes[0].NewBatch()
  448. }
  449. func (i *indexAliasImpl) Name() string {
  450. return i.name
  451. }
  452. func (i *indexAliasImpl) SetName(name string) {
  453. i.name = name
  454. }
  455. type indexAliasImplFieldDict struct {
  456. index *indexAliasImpl
  457. fieldDict index.FieldDict
  458. }
  459. func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
  460. return f.fieldDict.Next()
  461. }
  462. func (f *indexAliasImplFieldDict) Close() error {
  463. defer f.index.mutex.RUnlock()
  464. return f.fieldDict.Close()
  465. }
  466. type multiSearchHitSorter struct {
  467. hits search.DocumentMatchCollection
  468. sort search.SortOrder
  469. cachedScoring []bool
  470. cachedDesc []bool
  471. }
  472. func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
  473. return &multiSearchHitSorter{
  474. sort: sort,
  475. hits: hits,
  476. cachedScoring: sort.CacheIsScore(),
  477. cachedDesc: sort.CacheDescending(),
  478. }
  479. }
  480. func (m *multiSearchHitSorter) Len() int { return len(m.hits) }
  481. func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
  482. func (m *multiSearchHitSorter) Less(i, j int) bool {
  483. c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
  484. return c < 0
  485. }
上海开阖软件有限公司 沪ICP备12045867号-1