diff --git a/baseorbitdb/events.go b/baseorbitdb/events.go
new file mode 100644
index 0000000..e49969a
--- /dev/null
+++ b/baseorbitdb/events.go
@@ -0,0 +1,18 @@
+package baseorbitdb
+
+import (
+	"berty.tech/go-orbit-db/iface"
+	"github.com/libp2p/go-libp2p/core/peer"
+)
+
+type EventExchangeHeads struct {
+	Peer    peer.ID
+	Message *iface.MessageExchangeHeads
+}
+
+func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
+	return EventExchangeHeads{
+		Peer:    p,
+		Message: msg,
+	}
+}
diff --git a/baseorbitdb/events_handler.go b/baseorbitdb/events_handler.go
new file mode 100644
index 0000000..d5dad13
--- /dev/null
+++ b/baseorbitdb/events_handler.go
@@ -0,0 +1,26 @@
+package baseorbitdb
+
+import (
+	"context"
+	"fmt"
+
+	ipfslog "berty.tech/go-ipfs-log"
+	"berty.tech/go-orbit-db/iface"
+)
+
+func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *iface.MessageExchangeHeads, store iface.Store) error {
+	untypedHeads := make([]ipfslog.Entry, len(e.Heads))
+	for i, h := range e.Heads {
+		untypedHeads[i] = h
+	}
+
+	o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(untypedHeads), e.Address))
+
+	if len(untypedHeads) > 0 {
+		if err := store.Sync(ctx, untypedHeads); err != nil {
+			return fmt.Errorf("unable to sync heads: %w", err)
+		}
+	}
+
+	return nil
+}
diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go
index a2fcf7a..98b85fc 100644
--- a/baseorbitdb/orbitdb.go
+++ b/baseorbitdb/orbitdb.go
@@ -18,7 +18,9 @@ import (
 	"berty.tech/go-orbit-db/cache/cacheleveldown"
 	"berty.tech/go-orbit-db/iface"
 	_ "berty.tech/go-orbit-db/internal/buildconstraints" // fail for bad go version
-	"berty.tech/go-orbit-db/stores"
+	"berty.tech/go-orbit-db/messagemarshaler"
+	"berty.tech/go-orbit-db/pubsub"
+	"berty.tech/go-orbit-db/pubsub/oneonone"
 	"berty.tech/go-orbit-db/utils"
 	cid "github.com/ipfs/go-cid"
 	datastore "github.com/ipfs/go-datastore"
@@ -108,7 +110,7 @@ type orbitDB struct {
 	cache                cache.Interface
 	logger               *zap.Logger
 	tracer               trace.Tracer
-	directChannelFactory iface.DirectChannelFactory
+	directChannel        iface.DirectChannel
 	messageMarshaler     iface.MessageMarshaler
 
 	// emitters
@@ -178,17 +180,36 @@ func (o *orbitDB) setStore(address string, store iface.Store) {
 	o.stores[address] = store
 }
 
-func (o *orbitDB) closeAllStores() {
+func (o *orbitDB) deleteStore(address string) {
 	o.muStores.Lock()
 	defer o.muStores.Unlock()
 
+	delete(o.stores, address)
+}
+
+func (o *orbitDB) getStore(address string) (iface.Store, bool) {
+	o.muStores.RLock()
+	defer o.muStores.RUnlock()
+
+	store, ok := o.stores[address]
+
+	return store, ok
+}
+
+func (o *orbitDB) closeAllStores() {
+	stores := []Store{}
+
+	o.muStores.Lock()
 	for _, store := range o.stores {
+		stores = append(stores, store)
+	}
+	o.muStores.Unlock()
+
+	for _, store := range stores {
 		if err := store.Close(); err != nil {
 			o.logger.Error("unable to close store", zap.Error(err))
 		}
 	}
-
-	o.stores = map[string]Store{}
 }
 
 func (o *orbitDB) closeCache() {
@@ -200,6 +221,12 @@
 	}
 }
 
+func (o *orbitDB) closeDirectConnections() {
+	if err := o.directChannel.Close(); err != nil {
+		o.logger.Error("unable to close connection", zap.Error(err))
+	}
+}
+
 func (o *orbitDB) closeKeyStore() {
 	o.muKeyStore.Lock()
 	defer o.muKeyStore.Unlock()
@@ -314,6 +341,16 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 		options.EventBus = eventbus.NewBus()
 	}
 
+	if options.DirectChannelFactory == nil {
+		options.DirectChannelFactory = oneonone.NewChannelFactory(is)
+	}
+	directConnections, err := makeDirectChannel(ctx, options.EventBus, options.DirectChannelFactory, &iface.DirectChannelOptions{
+		Logger: options.Logger,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("unable to create a direct connection with peer: %w", err)
+	}
+
 	k, err := is.Key().Self(ctx)
 	if err != nil {
 		return nil, err
@@ -324,6 +361,10 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 		options.PeerID = id
 	}
 
+	if options.MessageMarshaler == nil {
+		options.MessageMarshaler = &messagemarshaler.JSONMarshaler{}
+	}
+
 	if options.Cache == nil {
 		options.Cache = cacheleveldown.New(&cache.Options{Logger: options.Logger})
 	}
@@ -344,7 +385,7 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 		directory:             *options.Directory,
 		eventBus:              options.EventBus,
 		stores:                map[string]Store{},
-		directChannelFactory:  options.DirectChannelFactory,
+		directChannel:         directConnections,
 		closeKeystore:         options.CloseKeystore,
 		storeTypes:            map[string]iface.StoreConstructor{},
 		accessControllerTypes: map[string]iface.AccessControllerConstructor{},
@@ -354,10 +395,15 @@
 	}
 
 	// set new heads as stateful, so newly subscriber can replay last event in case they missed it
-	odb.emitters.newHeads, err = options.EventBus.Emitter(new(stores.EventExchangeHeads), eventbus.Stateful)
+	odb.emitters.newHeads, err = options.EventBus.Emitter(new(EventExchangeHeads), eventbus.Stateful)
 	if err != nil {
 		return nil, fmt.Errorf("unable to create global emitter: %w", err)
 	}
+
+	if err := odb.monitorDirectChannel(ctx, options.EventBus); err != nil {
+		return nil, fmt.Errorf("unable to monitor direct channel: %w", err)
+	}
+
 	return odb, nil
 }
 
@@ -430,9 +476,14 @@ func NewOrbitDB(ctx context.Context, ipfs coreapi.CoreAPI, options *NewOrbitDBOp
 func (o *orbitDB) Close() error {
 	o.closeAllStores()
+	o.closeDirectConnections()
 	o.closeCache()
 	o.closeKeyStore()
 
+	if err := o.emitters.newHeads.Close(); err != nil {
+		o.logger.Warn("unable to close emitter", zap.Error(err))
+	}
+
 	o.cancel()
 
 	return nil
 }
@@ -718,23 +769,35 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
 		}
 	}
+	if options.Logger == nil {
+		options.Logger = o.logger
+	}
+
+	if options.CloseFunc == nil {
+		options.CloseFunc = func() {}
+	}
+	closeFunc := func() {
+		options.CloseFunc()
+		o.deleteStore(parsedDBAddress.String())
+	}
+
 	store, err := storeFunc(o.IPFS(), identity, parsedDBAddress, &iface.NewStoreOptions{
-		EventBus:             options.EventBus,
-		AccessController:     accessController,
-		Cache:                options.Cache,
-		Replicate:            options.Replicate,
-		Directory:            *options.Directory,
-		SortFn:               options.SortFn,
-		CacheDestroy:         func() error { return o.cache.Destroy(o.directory, parsedDBAddress) },
-		Logger:               o.logger,
-		Tracer:               o.tracer,
-		IO:                   options.IO,
-		StoreSpecificOpts:    options.StoreSpecificOpts,
-		PubSub:               o.pubsub,
-		MessageMarshaler:     o.messageMarshaler,
-		PeerID:               o.id,
-		DirectChannelFactory: o.directChannelFactory,
-		NewHeadsEmitter:      o.emitters.newHeads,
+		EventBus:          options.EventBus,
+		AccessController:  accessController,
+		Cache:             options.Cache,
+		Replicate:         options.Replicate,
+		Directory:         *options.Directory,
+		SortFn:            options.SortFn,
+		CacheDestroy:      func() error { return o.cache.Destroy(o.directory, parsedDBAddress) },
+		Logger:            options.Logger,
+		Tracer:            o.tracer,
+		IO:                options.IO,
+		StoreSpecificOpts: options.StoreSpecificOpts,
+		PubSub:            o.pubsub,
+		MessageMarshaler:  o.messageMarshaler,
+		PeerID:            o.id,
+		DirectChannel:     o.directChannel,
+		CloseFunc:         closeFunc,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("unable to instantiate store: %w", err)
 	}
@@ -749,4 +812,56 @@ func (o *orbitDB) EventBus() event.Bus {
 	return o.eventBus
 }
 
+func (o *orbitDB) monitorDirectChannel(ctx context.Context, bus event.Bus) error {
+	sub, err := bus.Subscribe(new(iface.EventPubSubPayload), eventbus.BufSize(128))
+	if err != nil {
+		return fmt.Errorf("unable to init pubsub subscriber: %w", err)
+	}
+
+	go func() {
+		for {
+			var e interface{}
+			select {
+			case <-ctx.Done():
+				return
+			case e = <-sub.Out():
+			}
+
+			evt := e.(iface.EventPubSubPayload)
+
+			msg := iface.MessageExchangeHeads{}
+			if err := o.messageMarshaler.Unmarshal(evt.Payload, &msg); err != nil {
+				o.logger.Error("unable to unmarshal message payload", zap.Error(err))
+				continue
+			}
+
+			store, ok := o.getStore(msg.Address)
+			if !ok {
+				o.logger.Error("unable to get store from address", zap.Error(err))
+				continue
+			}
+
+			if err := o.handleEventExchangeHeads(ctx, &msg, store); err != nil {
+				o.logger.Error("unable to handle pubsub payload", zap.Error(err))
+				continue
+			}
+
+			if err := o.emitters.newHeads.Emit(NewEventExchangeHeads(evt.Peer, &msg)); err != nil {
+				o.logger.Warn("unable to emit new heads", zap.Error(err))
+			}
+		}
+	}()
+
+	return nil
+}
+
+func makeDirectChannel(ctx context.Context, bus event.Bus, df iface.DirectChannelFactory, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
+	emitter, err := pubsub.NewPayloadEmitter(bus)
+	if err != nil {
+		return nil, fmt.Errorf("unable to init pubsub emitter: %w", err)
+	}
+
+	return df(ctx, emitter, opts)
+}
+
 var _ BaseOrbitDB = &orbitDB{}
diff --git a/iface/interface.go b/iface/interface.go
index e39340f..f74654b 100644
--- a/iface/interface.go
+++ b/iface/interface.go
@@ -53,6 +53,8 @@ type CreateDBOptions struct {
 	IO                ipfslog.IO
 	Timeout           time.Duration
 	MessageMarshaler  MessageMarshaler
+	Logger            *zap.Logger
+	CloseFunc         func()
 	StoreSpecificOpts interface{}
 }
 
@@ -345,12 +347,12 @@ type NewStoreOptions struct {
 	Logger            *zap.Logger
 	Tracer            trace.Tracer
 	IO                ipfslog.IO
-	StoreSpecificOpts interface{}
 	PubSub            PubSubInterface
 	MessageMarshaler  MessageMarshaler
 	PeerID            peer.ID
-	DirectChannelFactory DirectChannelFactory
-	NewHeadsEmitter      event.Emitter
+	DirectChannel     DirectChannel
+	CloseFunc         func()
+	StoreSpecificOpts interface{}
 }
 
 type DirectChannelOptions struct {
diff --git a/pubsub/directchannel/channel.go b/pubsub/directchannel/channel.go
index da00da7..4064e68 100644
--- a/pubsub/directchannel/channel.go
+++ b/pubsub/directchannel/channel.go
@@ -82,6 +82,7 @@ func (d *directChannel) Connect(ctx context.Context, pid peer.ID) (err error) {
 // @NOTE(gfanton): we dont need this on direct channel
 // Close Closes the connection
 func (d *directChannel) Close() error {
+	d.host.RemoveStreamHandler(PROTOCOL)
 	return d.emitter.Close()
 }
 
diff --git a/stores/basestore/base_store.go b/stores/basestore/base_store.go
index da87aaa..8617397 100644
--- a/stores/basestore/base_store.go
+++ b/stores/basestore/base_store.go
@@ -19,8 +19,6 @@ import (
 	"berty.tech/go-orbit-db/events"
 	"berty.tech/go-orbit-db/iface"
 	"berty.tech/go-orbit-db/messagemarshaler"
-	"berty.tech/go-orbit-db/pubsub"
-	"berty.tech/go-orbit-db/pubsub/oneonone"
 	"berty.tech/go-orbit-db/pubsub/pubsubcoreapi"
 	"berty.tech/go-orbit-db/stores"
 	"berty.tech/go-orbit-db/stores/operation"
@@ -72,7 +70,6 @@ type BaseStore struct {
 	pubsub           iface.PubSubInterface
 	messageMarshaler iface.MessageMarshaler
 	directChannel    iface.DirectChannel
-	newHeadsEmitter  event.Emitter
 
 	muCache          sync.RWMutex
 	muIndex          sync.RWMutex
@@ -82,6 +79,7 @@ type BaseStore struct {
 	tracer           trace.Tracer
 	ctx              context.Context
 	cancel           context.CancelFunc
+	closeFunc        func()
 
 	// Deprecated: if possible don't use this, use EventBus() directly instead
 	events.EventEmitter
@@ -158,18 +156,7 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid
 		options.Logger = zap.NewNop()
 	}
 
-	if options.DirectChannelFactory == nil {
-		options.DirectChannelFactory = oneonone.NewChannelFactory(ipfs)
-	}
-	directChannel, err := makeDirectChannel(b.ctx, options.EventBus, options.DirectChannelFactory, &iface.DirectChannelOptions{
-		Logger: options.Logger,
-	})
-	if err != nil {
-		return fmt.Errorf("unable to create a direct connection with peer: %w", err)
-	}
-	b.directChannel = directChannel
-
-	b.newHeadsEmitter = options.NewHeadsEmitter
+	b.directChannel = options.DirectChannel
 
 	k, err := ipfs.Key().Self(b.ctx)
 	if err != nil {
@@ -200,7 +187,7 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid
 		return fmt.Errorf("identity required")
 	}
 
-	if err := b.generateEmitter(options.EventBus); err != nil {
+	if err := b.generateEmitter(b.eventBus); err != nil {
 		return err
 	}
 
@@ -252,7 +239,7 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid
 
 	b.replicator, err = replicator.NewReplicator(b, options.ReplicationConcurrency, &replicator.Options{
 		Logger:   b.logger,
-		EventBus: options.EventBus,
+		EventBus: b.eventBus,
 		Tracer:   b.tracer,
 	})
 	if err != nil {
@@ -274,6 +261,11 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid
 		options.Replicate = boolPtr(true)
 	}
 
+	if options.CloseFunc == nil {
+		options.CloseFunc = func() {}
+	}
+	b.closeFunc = options.CloseFunc
+
 	b.options = options
 
 	sub, err := b.replicator.EventBus().Subscribe(replicator.Events, eventbus.BufSize(128))
@@ -349,14 +341,14 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid
 		}
 	}()
 
-	if err := b.monitorDirectChannel(); err != nil {
-		return fmt.Errorf("unable to monitor direct channel: %w", err)
-	}
-
 	// Subscribe to pubsub to get updates from peers,
 	// this is what hooks us into the message propagation layer
 	// and the p2p network
 	if *options.Replicate {
+		if options.DirectChannel == nil {
+			return errors.New("replication needs DirectChannel")
+		}
+
 		if err := b.replicate(); err != nil {
 			return fmt.Errorf("unable to start store replication: %w", err)
 		}
@@ -379,7 +371,9 @@ func (b *BaseStore) Close() error {
 		return nil
 	}
 
-	b.closeDirectConnections()
+	b.cancel()
+
+	b.closeFunc()
 
 	// Replicator teardown logic
 	b.Replicator().Stop()
@@ -404,17 +398,9 @@ func (b *BaseStore) Close() error {
 		return fmt.Errorf("unable to close cache: %w", err)
 	}
 
-	b.cancel()
-
 	return nil
 }
 
-func (b *BaseStore) closeDirectConnections() {
-	if err := b.directChannel.Close(); err != nil {
-		b.logger.Error("unable to close connection", zap.Error(err))
-	}
-}
-
 func (b *BaseStore) Address() address.Address {
 	return b.address
 }
@@ -426,63 +412,6 @@ func (b *BaseStore) Index() iface.StoreIndex {
 	return b.index
 }
 
-func (b *BaseStore) monitorDirectChannel() error {
-	sub, err := b.eventBus.Subscribe(new(iface.EventPubSubPayload), eventbus.BufSize(128))
-	if err != nil {
-		return fmt.Errorf("unable to init pubsub subscriber: %w", err)
-	}
-
-	go func() {
-		for {
-			var e interface{}
-			select {
-			case <-b.ctx.Done():
-				return
-			case e = <-sub.Out():
-			}
-
-			evt := e.(iface.EventPubSubPayload)
-
-			msg := iface.MessageExchangeHeads{}
-			if err := b.messageMarshaler.Unmarshal(evt.Payload, &msg); err != nil {
-				b.logger.Error("unable to unmarshal message payload", zap.Error(err))
-				continue
-			}
-
-			b.logger.Debug("exchanging heads", zap.String("address", msg.Address))
-			if err := b.handleEventExchangeHeads(&msg); err != nil {
-				b.logger.Error("unable to handle pubsub payload", zap.Error(err))
-				continue
-			}
-
-			if b.newHeadsEmitter != nil {
-				if err := b.newHeadsEmitter.Emit(stores.NewEventExchangeHeads(evt.Peer, &msg)); err != nil {
-					b.logger.Warn("unable to emit new heads", zap.Error(err))
-				}
-			}
-		}
-	}()
-
-	return nil
-}
-
-func (b *BaseStore) handleEventExchangeHeads(e *iface.MessageExchangeHeads) error {
-	untypedHeads := make([]ipfslog.Entry, len(e.Heads))
-	for i, h := range e.Heads {
-		untypedHeads[i] = h
-	}
-
-	b.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", b.peerID, len(untypedHeads), e.Address))
-
-	if len(untypedHeads) > 0 {
-		if err := b.Sync(b.ctx, untypedHeads); err != nil {
-			return fmt.Errorf("unable to sync heads: %w", err)
-		}
-	}
-
-	return nil
-}
-
 func (b *BaseStore) Type() string {
 	return "store"
 }
@@ -1309,13 +1238,4 @@ func (c *CanAppendContext) GetLogEntries() []logac.LogEntry {
 	return entries
 }
 
-func makeDirectChannel(ctx context.Context, bus event.Bus, df iface.DirectChannelFactory, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
-	emitter, err := pubsub.NewPayloadEmitter(bus)
-	if err != nil {
-		return nil, fmt.Errorf("unable to init pubsub emitter: %w", err)
-	}
-
-	return df(ctx, emitter, opts)
-}
-
 var _ iface.Store = &BaseStore{}
diff --git a/stores/events.go b/stores/events.go
index a91ae3b..0b935f5 100644
--- a/stores/events.go
+++ b/stores/events.go
@@ -3,7 +3,6 @@ package stores
 import (
 	ipfslog "berty.tech/go-ipfs-log"
 	"berty.tech/go-orbit-db/address"
-	"berty.tech/go-orbit-db/iface"
 	"berty.tech/go-orbit-db/stores/replicator"
 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p/core/peer"
@@ -16,7 +15,6 @@ var Events = []interface{}{
 	new(EventLoad),
 	new(EventReplicated),
 	new(EventReplicate),
-	new(EventExchangeHeads),
 }
 
 type EventReplicate struct {
@@ -139,16 +137,3 @@ func NewEventNewPeer(p peer.ID) EventNewPeer {
 		Peer: p,
 	}
 }
-
-// EventExchangeHeadsset An event as stateful, sent when new exchange head is done, so newly subscriber can replay last event in case they missed it
-type EventExchangeHeads struct {
-	Peer    peer.ID
-	Message *iface.MessageExchangeHeads
-}
-
-func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
-	return EventExchangeHeads{
-		Peer:    p,
-		Message: msg,
-	}
-}
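
Usage note (not part of the patch): a minimal sketch of how a consumer might observe the new stateful EventExchangeHeads events, which after this change are emitted once per OrbitDB instance rather than per store. The package name headswatcher and the helper watchExchangedHeads are illustrative assumptions; EventExchangeHeads, MessageExchangeHeads, and the event-bus wiring come from the changes above. Because the emitter is registered with eventbus.Stateful, a subscriber attached after the fact still receives the most recent exchange.

package headswatcher

import (
	"context"
	"fmt"

	"berty.tech/go-orbit-db/baseorbitdb"
	"github.com/libp2p/go-libp2p/core/event"
)

// watchExchangedHeads subscribes to the bus used by OrbitDB (for example the
// value returned by odb.EventBus()) and reports every head exchange observed
// on the direct channel until the context is cancelled.
func watchExchangedHeads(ctx context.Context, bus event.Bus) error {
	sub, err := bus.Subscribe(new(baseorbitdb.EventExchangeHeads))
	if err != nil {
		return fmt.Errorf("unable to subscribe to exchange-heads events: %w", err)
	}
	defer sub.Close()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e, ok := <-sub.Out():
			if !ok {
				return nil
			}
			evt := e.(baseorbitdb.EventExchangeHeads)
			fmt.Printf("received %d heads for %q from peer %s\n",
				len(evt.Message.Heads), evt.Message.Address, evt.Peer)
		}
	}
}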