|
- package rupture
-
- import (
- "fmt"
- "hash/fnv"
- "path/filepath"
- "strconv"
-
- "github.com/blevesearch/bleve"
- "github.com/blevesearch/bleve/document"
- "github.com/blevesearch/bleve/mapping"
- )
-
- // ShardedIndex an index that is built onto of multiple underlying bleve
- // indices (i.e. shards). Similar to bleve's index aliases, some methods may
- // not be supported.
- type ShardedIndex interface {
- bleve.Index
- shards() []bleve.Index
- }
-
- // a type alias for bleve.Index, so that the anonymous field of
- // shardedIndex does not conflict with the Index(..) method.
- type bleveIndex bleve.Index
-
- type shardedIndex struct {
- bleveIndex
- indices []bleve.Index
- }
-
- func hash(id string, n int) uint64 {
- fnvHash := fnv.New64()
- fnvHash.Write([]byte(id))
- return fnvHash.Sum64() % uint64(n)
- }
-
- func childIndexerPath(rootPath string, i int) string {
- return filepath.Join(rootPath, strconv.Itoa(i))
- }
-
- // NewShardedIndex creates a sharded index at the specified path, with the
- // specified mapping and number of shards.
- func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) {
- if numShards <= 0 {
- return nil, fmt.Errorf("Invalid number of shards: %d", numShards)
- }
- err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards})
- if err != nil {
- return nil, err
- }
-
- s := &shardedIndex{
- indices: make([]bleve.Index, numShards),
- }
- for i := 0; i < numShards; i++ {
- s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping)
- if err != nil {
- return nil, err
- }
- }
- s.bleveIndex = bleve.NewIndexAlias(s.indices...)
- return s, nil
- }
-
- // OpenShardedIndex opens a sharded index at the specified path.
- func OpenShardedIndex(path string) (ShardedIndex, error) {
- var meta shardedIndexMetadata
- var err error
- if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil {
- return nil, err
- }
-
- s := &shardedIndex{
- indices: make([]bleve.Index, meta.NumShards),
- }
- for i := 0; i < meta.NumShards; i++ {
- s.indices[i], err = bleve.Open(childIndexerPath(path, i))
- if err != nil {
- return nil, err
- }
- }
- s.bleveIndex = bleve.NewIndexAlias(s.indices...)
- return s, nil
- }
-
- func (s *shardedIndex) Index(id string, data interface{}) error {
- return s.indices[hash(id, len(s.indices))].Index(id, data)
- }
-
- func (s *shardedIndex) Delete(id string) error {
- return s.indices[hash(id, len(s.indices))].Delete(id)
- }
-
- func (s *shardedIndex) Document(id string) (*document.Document, error) {
- return s.indices[hash(id, len(s.indices))].Document(id)
- }
-
- func (s *shardedIndex) Close() error {
- if err := s.bleveIndex.Close(); err != nil {
- return err
- }
- for _, index := range s.indices {
- if err := index.Close(); err != nil {
- return err
- }
- }
- return nil
- }
-
- func (s *shardedIndex) shards() []bleve.Index {
- return s.indices
- }
-
- type shardedIndexFlushingBatch struct {
- batches []*singleIndexFlushingBatch
- }
-
- // NewShardedFlushingBatch creates a flushing batch with the specified batch
- // size for the specified sharded index.
- func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch {
- indices := index.shards()
- b := &shardedIndexFlushingBatch{
- batches: make([]*singleIndexFlushingBatch, len(indices)),
- }
- for i, index := range indices {
- b.batches[i] = newFlushingBatch(index, maxBatchSize)
- }
- return b
- }
-
- func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error {
- return b.batches[hash(id, len(b.batches))].Index(id, data)
- }
-
- func (b *shardedIndexFlushingBatch) Delete(id string) error {
- return b.batches[hash(id, len(b.batches))].Delete(id)
- }
-
- func (b *shardedIndexFlushingBatch) Flush() error {
- for _, batch := range b.batches {
- if err := batch.Flush(); err != nil {
- return err
- }
- }
- return nil
- }
|