|
- // Copyright (c) 2014 Couchbase, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
-
- package bleve
-
- import (
- "context"
- "sort"
- "sync"
- "time"
-
- "github.com/blevesearch/bleve/document"
- "github.com/blevesearch/bleve/index"
- "github.com/blevesearch/bleve/index/store"
- "github.com/blevesearch/bleve/mapping"
- "github.com/blevesearch/bleve/search"
- )
-
- type indexAliasImpl struct {
- name string
- indexes []Index
- mutex sync.RWMutex
- open bool
- }
-
- // NewIndexAlias creates a new IndexAlias over the provided
- // Index objects.
- func NewIndexAlias(indexes ...Index) *indexAliasImpl {
- return &indexAliasImpl{
- name: "alias",
- indexes: indexes,
- open: true,
- }
- }
-
- func (i *indexAliasImpl) isAliasToSingleIndex() error {
- if len(i.indexes) < 1 {
- return ErrorAliasEmpty
- } else if len(i.indexes) > 1 {
- return ErrorAliasMulti
- }
- return nil
- }
-
- func (i *indexAliasImpl) Index(id string, data interface{}) error {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return err
- }
-
- return i.indexes[0].Index(id, data)
- }
-
- func (i *indexAliasImpl) Delete(id string) error {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return err
- }
-
- return i.indexes[0].Delete(id)
- }
-
- func (i *indexAliasImpl) Batch(b *Batch) error {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return err
- }
-
- return i.indexes[0].Batch(b)
- }
-
- func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil, err
- }
-
- return i.indexes[0].Document(id)
- }
-
- func (i *indexAliasImpl) DocCount() (uint64, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- rv := uint64(0)
-
- if !i.open {
- return 0, ErrorIndexClosed
- }
-
- for _, index := range i.indexes {
- otherCount, err := index.DocCount()
- if err == nil {
- rv += otherCount
- }
- // tolerate errors to produce partial counts
- }
-
- return rv, nil
- }
-
- func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
- return i.SearchInContext(context.Background(), req)
- }
-
- func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil, ErrorIndexClosed
- }
-
- if len(i.indexes) < 1 {
- return nil, ErrorAliasEmpty
- }
-
- // short circuit the simple case
- if len(i.indexes) == 1 {
- return i.indexes[0].SearchInContext(ctx, req)
- }
-
- return MultiSearch(ctx, req, i.indexes...)
- }
-
- func (i *indexAliasImpl) Fields() ([]string, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil, err
- }
-
- return i.indexes[0].Fields()
- }
-
- func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
- i.mutex.RLock()
-
- if !i.open {
- i.mutex.RUnlock()
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- fieldDict, err := i.indexes[0].FieldDict(field)
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- return &indexAliasImplFieldDict{
- index: i,
- fieldDict: fieldDict,
- }, nil
- }
-
- func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
- i.mutex.RLock()
-
- if !i.open {
- i.mutex.RUnlock()
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- return &indexAliasImplFieldDict{
- index: i,
- fieldDict: fieldDict,
- }, nil
- }
-
- func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
- i.mutex.RLock()
-
- if !i.open {
- i.mutex.RUnlock()
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
- if err != nil {
- i.mutex.RUnlock()
- return nil, err
- }
-
- return &indexAliasImplFieldDict{
- index: i,
- fieldDict: fieldDict,
- }, nil
- }
-
- func (i *indexAliasImpl) Close() error {
- i.mutex.Lock()
- defer i.mutex.Unlock()
-
- i.open = false
- return nil
- }
-
- func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil
- }
-
- return i.indexes[0].Mapping()
- }
-
- func (i *indexAliasImpl) Stats() *IndexStat {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil
- }
-
- return i.indexes[0].Stats()
- }
-
- func (i *indexAliasImpl) StatsMap() map[string]interface{} {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil
- }
-
- return i.indexes[0].StatsMap()
- }
-
- func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil, err
- }
-
- return i.indexes[0].GetInternal(key)
- }
-
- func (i *indexAliasImpl) SetInternal(key, val []byte) error {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return err
- }
-
- return i.indexes[0].SetInternal(key, val)
- }
-
- func (i *indexAliasImpl) DeleteInternal(key []byte) error {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return err
- }
-
- return i.indexes[0].DeleteInternal(key)
- }
-
- func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil, nil, ErrorIndexClosed
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil, nil, err
- }
-
- return i.indexes[0].Advanced()
- }
-
- func (i *indexAliasImpl) Add(indexes ...Index) {
- i.mutex.Lock()
- defer i.mutex.Unlock()
-
- i.indexes = append(i.indexes, indexes...)
- }
-
- func (i *indexAliasImpl) removeSingle(index Index) {
- for pos, in := range i.indexes {
- if in == index {
- i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
- break
- }
- }
- }
-
- func (i *indexAliasImpl) Remove(indexes ...Index) {
- i.mutex.Lock()
- defer i.mutex.Unlock()
-
- for _, in := range indexes {
- i.removeSingle(in)
- }
- }
-
- func (i *indexAliasImpl) Swap(in, out []Index) {
- i.mutex.Lock()
- defer i.mutex.Unlock()
-
- // add
- i.indexes = append(i.indexes, in...)
-
- // delete
- for _, ind := range out {
- i.removeSingle(ind)
- }
- }
-
- // createChildSearchRequest creates a separate
- // request from the original
- // For now, avoid data race on req structure.
- // TODO disable highlight/field load on child
- // requests, and add code to do this only on
- // the actual final results.
- // Perhaps that part needs to be optional,
- // could be slower in remote usages.
- func createChildSearchRequest(req *SearchRequest) *SearchRequest {
- rv := SearchRequest{
- Query: req.Query,
- Size: req.Size + req.From,
- From: 0,
- Highlight: req.Highlight,
- Fields: req.Fields,
- Facets: req.Facets,
- Explain: req.Explain,
- Sort: req.Sort.Copy(),
- IncludeLocations: req.IncludeLocations,
- Score: req.Score,
- }
- return &rv
- }
-
- type asyncSearchResult struct {
- Name string
- Result *SearchResult
- Err error
- }
-
- // MultiSearch executes a SearchRequest across multiple Index objects,
- // then merges the results. The indexes must honor any ctx deadline.
- func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
-
- searchStart := time.Now()
- asyncResults := make(chan *asyncSearchResult, len(indexes))
-
- // run search on each index in separate go routine
- var waitGroup sync.WaitGroup
-
- var searchChildIndex = func(in Index, childReq *SearchRequest) {
- rv := asyncSearchResult{Name: in.Name()}
- rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
- asyncResults <- &rv
- waitGroup.Done()
- }
-
- waitGroup.Add(len(indexes))
- for _, in := range indexes {
- go searchChildIndex(in, createChildSearchRequest(req))
- }
-
- // on another go routine, close after finished
- go func() {
- waitGroup.Wait()
- close(asyncResults)
- }()
-
- var sr *SearchResult
- indexErrors := make(map[string]error)
-
- for asr := range asyncResults {
- if asr.Err == nil {
- if sr == nil {
- // first result
- sr = asr.Result
- } else {
- // merge with previous
- sr.Merge(asr.Result)
- }
- } else {
- indexErrors[asr.Name] = asr.Err
- }
- }
-
- // merge just concatenated all the hits
- // now lets clean it up
-
- // handle case where no results were successful
- if sr == nil {
- sr = &SearchResult{
- Status: &SearchStatus{
- Errors: make(map[string]error),
- },
- }
- }
-
- // sort all hits with the requested order
- if len(req.Sort) > 0 {
- sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
- sort.Sort(sorter)
- }
-
- // now skip over the correct From
- if req.From > 0 && len(sr.Hits) > req.From {
- sr.Hits = sr.Hits[req.From:]
- } else if req.From > 0 {
- sr.Hits = search.DocumentMatchCollection{}
- }
-
- // now trim to the correct size
- if req.Size > 0 && len(sr.Hits) > req.Size {
- sr.Hits = sr.Hits[0:req.Size]
- }
-
- // fix up facets
- for name, fr := range req.Facets {
- sr.Facets.Fixup(name, fr.Size)
- }
-
- // fix up original request
- sr.Request = req
- searchDuration := time.Since(searchStart)
- sr.Took = searchDuration
-
- // fix up errors
- if len(indexErrors) > 0 {
- if sr.Status.Errors == nil {
- sr.Status.Errors = make(map[string]error)
- }
- for indexName, indexErr := range indexErrors {
- sr.Status.Errors[indexName] = indexErr
- sr.Status.Total++
- sr.Status.Failed++
- }
- }
-
- return sr, nil
- }
-
- func (i *indexAliasImpl) NewBatch() *Batch {
- i.mutex.RLock()
- defer i.mutex.RUnlock()
-
- if !i.open {
- return nil
- }
-
- err := i.isAliasToSingleIndex()
- if err != nil {
- return nil
- }
-
- return i.indexes[0].NewBatch()
- }
-
- func (i *indexAliasImpl) Name() string {
- return i.name
- }
-
- func (i *indexAliasImpl) SetName(name string) {
- i.name = name
- }
-
- type indexAliasImplFieldDict struct {
- index *indexAliasImpl
- fieldDict index.FieldDict
- }
-
- func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
- return f.fieldDict.Next()
- }
-
- func (f *indexAliasImplFieldDict) Close() error {
- defer f.index.mutex.RUnlock()
- return f.fieldDict.Close()
- }
-
- type multiSearchHitSorter struct {
- hits search.DocumentMatchCollection
- sort search.SortOrder
- cachedScoring []bool
- cachedDesc []bool
- }
-
- func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
- return &multiSearchHitSorter{
- sort: sort,
- hits: hits,
- cachedScoring: sort.CacheIsScore(),
- cachedDesc: sort.CacheDescending(),
- }
- }
-
- func (m *multiSearchHitSorter) Len() int { return len(m.hits) }
- func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
- func (m *multiSearchHitSorter) Less(i, j int) bool {
- c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
- return c < 0
- }
|