diff --git a/internal/infra/backend.go b/internal/infra/backend.go index 3352271..aa05ff1 100644 --- a/internal/infra/backend.go +++ b/internal/infra/backend.go @@ -31,6 +31,11 @@ import ( "github.com/rs/zerolog/log" ) +var ( + prefixPulled = []byte{0} //nolint:gochecknoglobals + prefixNormal = []byte{1} //nolint:gochecknoglobals +) + func decode(value []byte) ([]silo.DataNode, error) { var set map[silo.DataNode]any @@ -68,11 +73,11 @@ func encode(items []silo.DataNode) ([]byte, error) { } type Snapshot struct { - db *pebble.Batch + db *pebble.DB } func (s Snapshot) Next() (silo.DataNode, bool, error) { - iter, err := s.db.NewIter(&pebble.IterOptions{}) //nolint:exhaustruct + iter, err := s.db.NewIter(&pebble.IterOptions{LowerBound: prefixNormal}) //nolint:exhaustruct if errors.Is(err, pebble.ErrNotFound) { return silo.DataNode{Key: "", Data: ""}, false, nil } else if err != nil { @@ -85,7 +90,7 @@ func (s Snapshot) Next() (silo.DataNode, bool, error) { return silo.DataNode{Key: "", Data: ""}, false, nil } - key, err := silo.DecodeDataNode(iter.Key()) + key, err := silo.DecodeDataNode(iter.Key()[1:]) if err != nil { return silo.DataNode{Key: "", Data: ""}, false, fmt.Errorf("%w", err) } @@ -99,7 +104,10 @@ func (s Snapshot) PullAll(node silo.DataNode) ([]silo.DataNode, error) { return nil, fmt.Errorf("%w", err) } - item, closer, err := s.db.Get(key) + keyNormal := append(prefixNormal, key...) //nolint:gocritic + keyPulled := append(prefixPulled, key...) //nolint:gocritic + + item, closer, err := s.db.Get(keyNormal) if errors.Is(err, pebble.ErrNotFound) { return []silo.DataNode{}, nil } else if err != nil { @@ -112,7 +120,11 @@ func (s Snapshot) PullAll(node silo.DataNode) ([]silo.DataNode, error) { return nil, fmt.Errorf("%w", err) } - if err := s.db.Delete(key, pebble.NoSync); err != nil { + if err := s.db.Set(keyPulled, item, pebble.NoSync); err != nil { + return nil, fmt.Errorf("%w", err) + } + + if err := s.db.Delete(keyNormal, pebble.NoSync); err != nil { return nil, fmt.Errorf("%w", err) } @@ -120,10 +132,27 @@ func (s Snapshot) PullAll(node silo.DataNode) ([]silo.DataNode, error) { } func (s Snapshot) Close() error { - if err := s.db.Close(); err != nil { + iter, err := s.db.NewIter(&pebble.IterOptions{UpperBound: prefixNormal}) //nolint:exhaustruct + if err != nil && !errors.Is(err, pebble.ErrNotFound) { return fmt.Errorf("%w", err) } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + key := iter.Key()[1:] + keyNormal := append(prefixNormal, key...) //nolint:gocritic + keyPulled := append(prefixPulled, key...) //nolint:gocritic + + if err := s.db.Set(keyNormal, iter.Value(), pebble.NoSync); err != nil { + return fmt.Errorf("%w", err) + } + + if err := s.db.Delete(keyPulled, pebble.NoSync); err != nil { + return fmt.Errorf("%w", err) + } + } + return nil } @@ -137,7 +166,7 @@ func (b Backend) Get(node silo.DataNode) ([]silo.DataNode, error) { return nil, fmt.Errorf("%w", err) } - item, closer, err := b.db.Get(key) + item, closer, err := b.db.Get(append(prefixNormal, key...)) if errors.Is(err, pebble.ErrNotFound) { return []silo.DataNode{}, nil } else if err != nil { @@ -167,7 +196,7 @@ func (b Backend) Store(key silo.DataNode, value silo.DataNode) error { return fmt.Errorf("%w", err) } - if err := b.db.Set(rawKey, rawNodes, pebble.NoSync); err != nil { + if err := b.db.Set(append(prefixNormal, rawKey...), rawNodes, pebble.NoSync); err != nil { return fmt.Errorf("%w", err) } @@ -175,7 +204,7 @@ func (b Backend) Store(key silo.DataNode, value silo.DataNode) error { } func (b Backend) Snapshot() silo.Snapshot { //nolint:ireturn - return Snapshot{b.db.NewIndexedBatch()} + return Snapshot(b) } func (b Backend) Close() error {