本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

370 lines
9.9KB

  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package collector
  15. import (
  16. "context"
  17. "reflect"
  18. "time"
  19. "github.com/blevesearch/bleve/index"
  20. "github.com/blevesearch/bleve/search"
  21. "github.com/blevesearch/bleve/size"
  22. )
  23. var reflectStaticSizeTopNCollector int
  24. func init() {
  25. var coll TopNCollector
  26. reflectStaticSizeTopNCollector = int(reflect.TypeOf(coll).Size())
  27. }
  28. type collectorStore interface {
  29. // Add the document, and if the new store size exceeds the provided size
  30. // the last element is removed and returned. If the size has not been
  31. // exceeded, nil is returned.
  32. AddNotExceedingSize(doc *search.DocumentMatch, size int) *search.DocumentMatch
  33. Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error)
  34. }
  35. // PreAllocSizeSkipCap will cap preallocation to this amount when
  36. // size+skip exceeds this value
  37. var PreAllocSizeSkipCap = 1000
  38. type collectorCompare func(i, j *search.DocumentMatch) int
  39. type collectorFixup func(d *search.DocumentMatch) error
  40. // TopNCollector collects the top N hits, optionally skipping some results
  41. type TopNCollector struct {
  42. size int
  43. skip int
  44. total uint64
  45. maxScore float64
  46. took time.Duration
  47. sort search.SortOrder
  48. results search.DocumentMatchCollection
  49. facetsBuilder *search.FacetsBuilder
  50. store collectorStore
  51. needDocIds bool
  52. neededFields []string
  53. cachedScoring []bool
  54. cachedDesc []bool
  55. lowestMatchOutsideResults *search.DocumentMatch
  56. updateFieldVisitor index.DocumentFieldTermVisitor
  57. dvReader index.DocValueReader
  58. }
  59. // CheckDoneEvery controls how frequently we check the context deadline
  60. const CheckDoneEvery = uint64(1024)
  61. // NewTopNCollector builds a collector to find the top 'size' hits
  62. // skipping over the first 'skip' hits
  63. // ordering hits by the provided sort order
  64. func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
  65. hc := &TopNCollector{size: size, skip: skip, sort: sort}
  66. // pre-allocate space on the store to avoid reslicing
  67. // unless the size + skip is too large, then cap it
  68. // everything should still work, just reslices as necessary
  69. backingSize := size + skip + 1
  70. if size+skip > PreAllocSizeSkipCap {
  71. backingSize = PreAllocSizeSkipCap + 1
  72. }
  73. if size+skip > 10 {
  74. hc.store = newStoreHeap(backingSize, func(i, j *search.DocumentMatch) int {
  75. return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j)
  76. })
  77. } else {
  78. hc.store = newStoreSlice(backingSize, func(i, j *search.DocumentMatch) int {
  79. return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j)
  80. })
  81. }
  82. // these lookups traverse an interface, so do once up-front
  83. if sort.RequiresDocID() {
  84. hc.needDocIds = true
  85. }
  86. hc.neededFields = sort.RequiredFields()
  87. hc.cachedScoring = sort.CacheIsScore()
  88. hc.cachedDesc = sort.CacheDescending()
  89. return hc
  90. }
  91. func (hc *TopNCollector) Size() int {
  92. sizeInBytes := reflectStaticSizeTopNCollector + size.SizeOfPtr
  93. if hc.facetsBuilder != nil {
  94. sizeInBytes += hc.facetsBuilder.Size()
  95. }
  96. for _, entry := range hc.neededFields {
  97. sizeInBytes += len(entry) + size.SizeOfString
  98. }
  99. sizeInBytes += len(hc.cachedScoring) + len(hc.cachedDesc)
  100. return sizeInBytes
  101. }
  102. // Collect goes to the index to find the matching documents
  103. func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
  104. startTime := time.Now()
  105. var err error
  106. var next *search.DocumentMatch
  107. // pre-allocate enough space in the DocumentMatchPool
  108. // unless the size + skip is too large, then cap it
  109. // everything should still work, just allocates DocumentMatches on demand
  110. backingSize := hc.size + hc.skip + 1
  111. if hc.size+hc.skip > PreAllocSizeSkipCap {
  112. backingSize = PreAllocSizeSkipCap + 1
  113. }
  114. searchContext := &search.SearchContext{
  115. DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), len(hc.sort)),
  116. Collector: hc,
  117. }
  118. hc.dvReader, err = reader.DocValueReader(hc.neededFields)
  119. if err != nil {
  120. return err
  121. }
  122. hc.updateFieldVisitor = func(field string, term []byte) {
  123. if hc.facetsBuilder != nil {
  124. hc.facetsBuilder.UpdateVisitor(field, term)
  125. }
  126. hc.sort.UpdateVisitor(field, term)
  127. }
  128. dmHandlerMaker := MakeTopNDocumentMatchHandler
  129. if cv := ctx.Value(search.MakeDocumentMatchHandlerKey); cv != nil {
  130. dmHandlerMaker = cv.(search.MakeDocumentMatchHandler)
  131. }
  132. // use the application given builder for making the custom document match
  133. // handler and perform callbacks/invocations on the newly made handler.
  134. dmHandler, loadID, err := dmHandlerMaker(searchContext)
  135. if err != nil {
  136. return err
  137. }
  138. hc.needDocIds = hc.needDocIds || loadID
  139. select {
  140. case <-ctx.Done():
  141. return ctx.Err()
  142. default:
  143. next, err = searcher.Next(searchContext)
  144. }
  145. for err == nil && next != nil {
  146. if hc.total%CheckDoneEvery == 0 {
  147. select {
  148. case <-ctx.Done():
  149. return ctx.Err()
  150. default:
  151. }
  152. }
  153. err = hc.prepareDocumentMatch(searchContext, reader, next)
  154. if err != nil {
  155. break
  156. }
  157. err = dmHandler(next)
  158. if err != nil {
  159. break
  160. }
  161. next, err = searcher.Next(searchContext)
  162. }
  163. // help finalize/flush the results in case
  164. // of custom document match handlers.
  165. err = dmHandler(nil)
  166. if err != nil {
  167. return err
  168. }
  169. // compute search duration
  170. hc.took = time.Since(startTime)
  171. if err != nil {
  172. return err
  173. }
  174. // finalize actual results
  175. err = hc.finalizeResults(reader)
  176. if err != nil {
  177. return err
  178. }
  179. return nil
  180. }
  181. var sortByScoreOpt = []string{"_score"}
  182. func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext,
  183. reader index.IndexReader, d *search.DocumentMatch) (err error) {
  184. // visit field terms for features that require it (sort, facets)
  185. if len(hc.neededFields) > 0 {
  186. err = hc.visitFieldTerms(reader, d)
  187. if err != nil {
  188. return err
  189. }
  190. }
  191. // increment total hits
  192. hc.total++
  193. d.HitNumber = hc.total
  194. // update max score
  195. if d.Score > hc.maxScore {
  196. hc.maxScore = d.Score
  197. }
  198. // see if we need to load ID (at this early stage, for example to sort on it)
  199. if hc.needDocIds {
  200. d.ID, err = reader.ExternalID(d.IndexInternalID)
  201. if err != nil {
  202. return err
  203. }
  204. }
  205. // compute this hits sort value
  206. if len(hc.sort) == 1 && hc.cachedScoring[0] {
  207. d.Sort = sortByScoreOpt
  208. } else {
  209. hc.sort.Value(d)
  210. }
  211. return nil
  212. }
  213. func MakeTopNDocumentMatchHandler(
  214. ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) {
  215. var hc *TopNCollector
  216. var ok bool
  217. if hc, ok = ctx.Collector.(*TopNCollector); ok {
  218. return func(d *search.DocumentMatch) error {
  219. if d == nil {
  220. return nil
  221. }
  222. // optimization, we track lowest sorting hit already removed from heap
  223. // with this one comparison, we can avoid all heap operations if
  224. // this hit would have been added and then immediately removed
  225. if hc.lowestMatchOutsideResults != nil {
  226. cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d,
  227. hc.lowestMatchOutsideResults)
  228. if cmp >= 0 {
  229. // this hit can't possibly be in the result set, so avoid heap ops
  230. ctx.DocumentMatchPool.Put(d)
  231. return nil
  232. }
  233. }
  234. removed := hc.store.AddNotExceedingSize(d, hc.size+hc.skip)
  235. if removed != nil {
  236. if hc.lowestMatchOutsideResults == nil {
  237. hc.lowestMatchOutsideResults = removed
  238. } else {
  239. cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc,
  240. removed, hc.lowestMatchOutsideResults)
  241. if cmp < 0 {
  242. tmp := hc.lowestMatchOutsideResults
  243. hc.lowestMatchOutsideResults = removed
  244. ctx.DocumentMatchPool.Put(tmp)
  245. }
  246. }
  247. }
  248. return nil
  249. }, false, nil
  250. }
  251. return nil, false, nil
  252. }
  253. // visitFieldTerms is responsible for visiting the field terms of the
  254. // search hit, and passing visited terms to the sort and facet builder
  255. func (hc *TopNCollector) visitFieldTerms(reader index.IndexReader, d *search.DocumentMatch) error {
  256. if hc.facetsBuilder != nil {
  257. hc.facetsBuilder.StartDoc()
  258. }
  259. err := hc.dvReader.VisitDocValues(d.IndexInternalID, hc.updateFieldVisitor)
  260. if hc.facetsBuilder != nil {
  261. hc.facetsBuilder.EndDoc()
  262. }
  263. return err
  264. }
  265. // SetFacetsBuilder registers a facet builder for this collector
  266. func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
  267. hc.facetsBuilder = facetsBuilder
  268. hc.neededFields = append(hc.neededFields, hc.facetsBuilder.RequiredFields()...)
  269. }
  270. // finalizeResults starts with the heap containing the final top size+skip
  271. // it now throws away the results to be skipped
  272. // and does final doc id lookup (if necessary)
  273. func (hc *TopNCollector) finalizeResults(r index.IndexReader) error {
  274. var err error
  275. hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error {
  276. if doc.ID == "" {
  277. // look up the id since we need it for lookup
  278. var err error
  279. doc.ID, err = r.ExternalID(doc.IndexInternalID)
  280. if err != nil {
  281. return err
  282. }
  283. }
  284. doc.Complete(nil)
  285. return nil
  286. })
  287. return err
  288. }
  289. // Results returns the collected hits
  290. func (hc *TopNCollector) Results() search.DocumentMatchCollection {
  291. return hc.results
  292. }
  293. // Total returns the total number of hits
  294. func (hc *TopNCollector) Total() uint64 {
  295. return hc.total
  296. }
  297. // MaxScore returns the maximum score seen across all the hits
  298. func (hc *TopNCollector) MaxScore() float64 {
  299. return hc.maxScore
  300. }
  301. // Took returns the time spent collecting hits
  302. func (hc *TopNCollector) Took() time.Duration {
  303. return hc.took
  304. }
  305. // FacetResults returns the computed facets results
  306. func (hc *TopNCollector) FacetResults() search.FacetResults {
  307. if hc.facetsBuilder != nil {
  308. return hc.facetsBuilder.Results()
  309. }
  310. return nil
  311. }
上海开阖软件有限公司 沪ICP备12045867号-1