configurable providerquerymanager #641
base: main
Branch updated from a6677c4 to f538d1f.
Codecov Report
Attention: Patch coverage is
@@            Coverage Diff             @@
##             main     #641      +/-   ##
==========================================
- Coverage   60.43%   60.33%   -0.11%
==========================================
  Files         243      244       +1
  Lines       31059    31034      -25
==========================================
- Hits        18771    18724      -47
- Misses      10628    10641      +13
- Partials     1660     1669       +9
... and 7 files with indirect coverage changes
…le to turn off the ProviderQueryManager
…add options, add WithMaxTimeout option
…rocessRequests option
Branch updated from f538d1f to e95eeb2.
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
}

type FindAllProviders struct {
	Router bsnet.BitSwapNetwork
}
If we're going to keep the function here as not taking a count, the name should be changed from FindProvidersAsync to one that doesn't collide with the more widely used function that does take a count, so implementations can use both if they want.
	provCh := make(chan peer.ID)
	wg := &sync.WaitGroup{}
	for p := range providers {
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
			err := r.Router.ConnectTo(ctx, p)
			if err != nil {
				span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p)))
				log.Debugf("failed to connect to provider %s: %s", p, err)
				return
			}
			span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
			select {
			case provCh <- p:
			case <-ctx.Done():
				return
			}
		}(p)
	}
	go func() {
		wg.Wait()
		close(provCh)
	}()
	return provCh
Note: this was copied out of the ProviderQueryManager because the internals (or at least the tests) seem to rely on having the connections made explicit here. It seems reasonable to extract this up a level out of the routing component though.
@aschmahmann Does this mean we still want the provider query manager, and that we need to close #536, which removes it?
I guess this comes from the need not to replace the default provider itself, but to be able to create it by hand and turn the knobs (I'm guessing it's mostly about increasing the number of workers).
It is similar to #535 although that includes some other things (FindProvidersAsync returns AddrInfos instead of PeerIDs, and Router and Network are separate things).
If we need it, we can merge this; otherwise the direction of #535 seems better, though I guess it's a more invasive change (I haven't followed it closely). Code-wise I don't see anything weird, so green light on that front.

func WithClientOption(opt client.Option) Option {
	return Option{opt}
}

func WithServerOption(opt server.Option) Option {
	return Option{opt}
}
?
This was an unrelated change and should be a separate PR; I'll raise it separately. The TL;DR, though, is that bitswap.New can take options for the client or the server, but right now the only way to construct them is if they're forwarded as regular Bitswap options; otherwise you have to construct the client and server yourself. This saves on duplication while being more explicit, and it was something I ran into here because a new client option was introduced.
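A minimal sketch of the idea behind `WithClientOption` and `WithServerOption`: one wrapper type lets a constructor accept a single variadic list and route each entry to the client or the server. The config structs, the `ClientOption`/`ServerOption` function types and this `New` are hypothetical stand-ins, not boxo's definitions.

```go
package main

import "fmt"

// Hypothetical stand-ins for client.Option and server.Option.
type clientConfig struct{ maxProviders int }
type serverConfig struct{ taskWorkers int }

type ClientOption func(*clientConfig)
type ServerOption func(*serverConfig)

// Option wraps either kind so New can accept one variadic list.
type Option struct{ v interface{} }

func WithClientOption(opt ClientOption) Option { return Option{opt} }
func WithServerOption(opt ServerOption) Option { return Option{opt} }

// New routes each wrapped option to the config it belongs to.
func New(opts ...Option) (clientConfig, serverConfig) {
	var c clientConfig
	var s serverConfig
	for _, o := range opts {
		switch opt := o.v.(type) {
		case ClientOption:
			opt(&c)
		case ServerOption:
			opt(&s)
		}
	}
	return c, s
}

func main() {
	c, s := New(
		WithClientOption(func(c *clientConfig) { c.maxProviders = 10 }),
		WithServerOption(func(s *serverConfig) { s.taskWorkers = 8 }),
	)
	fmt.Println(c.maxProviders, s.taskWorkers) // 10 8
}
```

The type switch is what removes the duplication: callers no longer need a Bitswap-level copy of every client or server option.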
bitswap/client/client.go (outdated)
func WithDefaultLookupManagement(b bool) Option {
	return func(bs *Client) {
		bs.useDefaultLookupManagement = true
	}
}
In #535, the Network is separated from the Router, and that makes sense to me. As a user, if I want to use a custom Router, I am going to have to glue the normal Network (ConnectTo) and my custom Router (FindProvidersAsync) into a custom BitswapNetwork. I'm not sure whether that requires much more refactoring, though, given that #535 is based on other additional changes.
This would also make it possible to disable the ProviderQueryManager altogether when it is not passed in (here it's impossible).
Observations:
- The ContentRouting interface returns peer.AddrInfo, but we used peer.ID everywhere.
- The ProviderQueryManager only used the bitswap.Network for ConnectTo(peerID).
- ConnectTo(peerID) is actually translated to Host.Connect(addrInfo).
- The ProviderQueryManager can just use libp2p.Host.Connect(), or it can use a Network if the method in Network is named Connect and not ConnectTo().
- Since ContentRouting returns AddrInfos, ConnectTo() should have been taking AddrInfos anyway!
- Since we can now connect using AddrInfos from the ProviderQueryManager, these are not "lost", and we do not need BitswapNetwork to call AddAddrs() on every provider just because the ProviderQueryManager could only Connect() by peer ID.

BitswapNetwork should probably stop being a content router, but for now it serves as a convenience so that a Router does not have to be passed around explicitly.

Based on that:
- ProviderQueryManager does not need a Bitswap Network; it needs a ContentRouter and a Dialer (which can be the Network or a libp2p.Host).
- FindProvidersAsync() interfaces are aligned all around so that this works.
- ConnectTo(peerID) -> Connect(AddrInfo) for sanity and simplicity.
- BitSwapNetwork no longer does AddAddrs and calls the ContentRouter directly.
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
	out := make(chan peer.ID, max)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
			if info.ID == bsnet.host.ID() {
				continue // ignore self as provider
			}
			bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
			select {
			case <-ctx.Done():
				return
			case out <- info.ID:
			}
		}
	}()
	return out
}
All of this is not needed because our query manager now does Connect(peerAddr) instead of ConnectTo(peerID). Addresses will be absorbed into the peerstore directly. At this point BSnet is only a convenience for having a content router available where one is not passed explicitly.
- span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
- err := pqm.network.ConnectTo(findProviderCtx, p)
+ span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID)))
+ err := pqm.dialer.Connect(findProviderCtx, p)
The beauty is that this can now be a libp2p.Host directly. We no longer need to rely on the network having absorbed provider peer addresses into the peerstore before connecting, because we already have them here.
bitswap/client/client.go (outdated)
- pqm := bspqm.New(ctx, network)
+ if bs.pqm == nil { // not set with the options
+ 	// network can do dialing and also content routing.
+ 	pqm, err := rpqm.New(ctx, network, network, rpqm.WithMaxProviders(10))
If we don't want to pass in a Host and a ContentRouter directly to the Client, we need to rely on the Network, which has them, but it would be better to remove content-routing capabilities from the Network, because at this point it is just wrapping the content Router and nothing else.
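The "use the injected manager, otherwise build a default" logic from the client.go hunk can be sketched with functional options. `providerQueryManager`, `WithProviderQueryManager` and the field names are hypothetical; the default of 10 max providers mirrors the `rpqm.WithMaxProviders(10)` call in the hunk. Unlike the outdated `WithDefaultLookupManagement` excerpt, which assigns `true` regardless of `b`, this version honors its argument, so the default manager can actually be switched off.

```go
package main

import "fmt"

// providerQueryManager is a hypothetical stand-in for the rpqm type.
type providerQueryManager struct{ maxProviders int }

type Client struct {
	pqm                        *providerQueryManager
	useDefaultLookupManagement bool
}

type Option func(*Client)

// WithProviderQueryManager injects a caller-configured manager.
func WithProviderQueryManager(pqm *providerQueryManager) Option {
	return func(c *Client) { c.pqm = pqm }
}

// WithDefaultLookupManagement honors its argument, so it can also disable.
func WithDefaultLookupManagement(b bool) Option {
	return func(c *Client) { c.useDefaultLookupManagement = b }
}

func New(opts ...Option) *Client {
	c := &Client{useDefaultLookupManagement: true}
	for _, opt := range opts {
		opt(c)
	}
	// Only build the default manager when none was injected and default
	// lookup management has not been switched off.
	if c.pqm == nil && c.useDefaultLookupManagement {
		c.pqm = &providerQueryManager{maxProviders: 10}
	}
	return c
}

func main() {
	fmt.Println(New().pqm.maxProviders) // 10
	custom := &providerQueryManager{maxProviders: 50}
	fmt.Println(New(WithProviderQueryManager(custom)).pqm.maxProviders) // 50
	fmt.Println(New(WithDefaultLookupManagement(false)).pqm == nil)     // true
}
```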
lol I meant to just comment, not approve my own changes
Thanks @hsanjuan for the changes. Looks reasonable to me; I left a couple of questions.
@@ -51,7 +51,7 @@ func (c *client) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-c
	go func() {
		defer close(out)
		for i, p := range c.server.Providers(k) {
			if max <= i {
This is an unrelated fix due to this code not understanding that 0 means infinity, right?
I don't know, you did this change 🤷♂️
oh, lol. Yeah, that's why it's there. I was noticing some weirdness with testing.
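The fix discussed here comes down to treating a non-positive `max` as "no limit" before comparing it to the index, since with `max == 0` the check `max <= i` stops at the very first provider. A minimal sketch, with `limitProviders` and `collect` as hypothetical helpers over a plain string slice:

```go
package main

import "fmt"

// limitProviders forwards at most max providers; max <= 0 means no limit.
func limitProviders(providers []string, max int) <-chan string {
	out := make(chan string, len(providers))
	go func() {
		defer close(out)
		for i, p := range providers {
			if max > 0 && i >= max {
				return // limit reached
			}
			out <- p
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(ch <-chan string) []string {
	var got []string
	for p := range ch {
		got = append(got, p)
	}
	return got
}

func main() {
	provs := []string{"a", "b", "c"}
	fmt.Println(collect(limitProviders(provs, 2))) // [a b]
	fmt.Println(collect(limitProviders(provs, 0))) // [a b c]
}
```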
Sync with @aschmahmann:
I think this is the gist, @aschmahmann. I think the last option would be OK (there's breakage, but it both a) has a stronger reason and b) is easy to fix).
Do you mean to do this?
Existing consumers would migrate by moving their ContentRouter from bitswap.Network into the bitswap.Client option.
If so, that sounds reasonable to me. If you think pulling out the Providing is too much to do here, we could also do something like:
Branch updated from 7d75a7e to 94d755b.
Branch updated from 94d755b to 0ca70f4.
@aschmahmann @gammazero I have a question: I was going to port the "Provider" logic from the bitswap server, with its workers, queues and whatnot. Then I saw that we have a provider.System (the reprovider) that does have queues, does batching and even stores pending provides to disk. Naively, my thought is that if such logic is already implemented in the reprovider, a providing.Exchange should not have yet another way of doing it, and we should just pass in the reprovider when using it. Plus, I suspect that this way we get to make use of things like Accelerated DHT providing. What do you think?
Makes sense to me to leave it out. It's unclear how useful the parallel queues were beforehand, but it seems like the kind of thing that's situational enough that people can choose to add whatever queues they need. We should start off keeping it simple and encouraging reuse of the existing queue.
This aligns the ProviderQueryManager with the routing.ContentDiscovery interface.
Branch updated from b7c5f16 to 19cd67f.
ipfs/kubo#10595 is testing this.
fixes #640
This is an attempt at splitting the ProviderQueryManager out of Bitswap so that we can configure it more in consumers.