-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into migrate-gateway
- Loading branch information
Showing
7 changed files
with
766 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package bitswap | ||
|
||
import ( | ||
bsnet "github.com/ipfs/go-libipfs/bitswap/network" | ||
|
||
tnet "github.com/libp2p/go-libp2p-testing/net" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
) | ||
|
||
// Network is an interface for generating bitswap network interfaces | ||
// based on a test network. | ||
type Network interface { | ||
Adapter(tnet.Identity, ...bsnet.NetOpt) bsnet.BitSwapNetwork | ||
|
||
HasPeer(peer.ID) bool | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package bitswap | ||
|
||
import ( | ||
"math/rand" | ||
"time" | ||
|
||
"github.com/ipfs/go-ipfs-delay" | ||
) | ||
|
||
var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano())) | ||
|
||
// InternetLatencyDelayGenerator generates three clusters of delays, | ||
// typical of the type of peers you would encounter on the interenet. | ||
// Given a base delay time T, the wait time generated will be either: | ||
// 1. A normalized distribution around the base time | ||
// 2. A normalized distribution around the base time plus a "medium" delay | ||
// 3. A normalized distribution around the base time plus a "large" delay | ||
// The size of the medium & large delays are determined when the generator | ||
// is constructed, as well as the relative percentages with which delays fall | ||
// into each of the three different clusters, and the standard deviation for | ||
// the normalized distribution. | ||
// This can be used to generate a number of scenarios typical of latency | ||
// distribution among peers on the internet. | ||
func InternetLatencyDelayGenerator( | ||
mediumDelay time.Duration, | ||
largeDelay time.Duration, | ||
percentMedium float64, | ||
percentLarge float64, | ||
std time.Duration, | ||
rng *rand.Rand) delay.Generator { | ||
if rng == nil { | ||
rng = sharedRNG | ||
} | ||
|
||
return &internetLatencyDelayGenerator{ | ||
mediumDelay: mediumDelay, | ||
largeDelay: largeDelay, | ||
percentLarge: percentLarge, | ||
percentMedium: percentMedium, | ||
std: std, | ||
rng: rng, | ||
} | ||
} | ||
|
||
type internetLatencyDelayGenerator struct { | ||
mediumDelay time.Duration | ||
largeDelay time.Duration | ||
percentLarge float64 | ||
percentMedium float64 | ||
std time.Duration | ||
rng *rand.Rand | ||
} | ||
|
||
func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration { | ||
clusterDistribution := d.rng.Float64() | ||
baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t | ||
if clusterDistribution < d.percentLarge { | ||
return baseDelay + d.largeDelay | ||
} else if clusterDistribution < d.percentMedium+d.percentLarge { | ||
return baseDelay + d.mediumDelay | ||
} | ||
return baseDelay | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package bitswap | ||
|
||
import ( | ||
"math" | ||
"math/rand" | ||
"testing" | ||
"time" | ||
) | ||
|
||
const testSeed = 99 | ||
|
||
func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) { | ||
initialValue := 1000 * time.Millisecond | ||
deviation := 100 * time.Millisecond | ||
mediumDelay := 1000 * time.Millisecond | ||
largeDelay := 3000 * time.Millisecond | ||
percentMedium := 0.2 | ||
percentLarge := 0.4 | ||
buckets := make(map[string]int) | ||
internetLatencyDistributionDelay := InternetLatencyDelayGenerator( | ||
mediumDelay, | ||
largeDelay, | ||
percentMedium, | ||
percentLarge, | ||
deviation, | ||
rand.New(rand.NewSource(testSeed))) | ||
|
||
buckets["fast"] = 0 | ||
buckets["medium"] = 0 | ||
buckets["slow"] = 0 | ||
buckets["outside_1_deviation"] = 0 | ||
|
||
// strategy here is rather than mock randomness, just use enough samples to | ||
// get approximately the distribution you'd expect | ||
for i := 0; i < 10000; i++ { | ||
next := internetLatencyDistributionDelay.NextWaitTime(initialValue) | ||
if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() { | ||
buckets["fast"]++ | ||
} else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() { | ||
buckets["medium"]++ | ||
} else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() { | ||
buckets["slow"]++ | ||
} else { | ||
buckets["outside_1_deviation"]++ | ||
} | ||
} | ||
totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"]) | ||
oneDeviationPercentage := totalInOneDeviation / 10000 | ||
fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation | ||
mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation | ||
slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation | ||
|
||
// see 68-95-99 rule for normal distributions | ||
if math.Abs(oneDeviationPercentage-0.6827) >= 0.1 { | ||
t.Fatal("Failed to distribute values normally based on standard deviation") | ||
} | ||
|
||
if math.Abs(fastPercentageResult+percentMedium+percentLarge-1) >= 0.1 { | ||
t.Fatal("Incorrect percentage of values distributed around fast delay time") | ||
} | ||
|
||
if math.Abs(mediumPercentageResult-percentMedium) >= 0.1 { | ||
t.Fatal("Incorrect percentage of values distributed around medium delay time") | ||
} | ||
|
||
if math.Abs(slowPercentageResult-percentLarge) >= 0.1 { | ||
t.Fatal("Incorrect percentage of values distributed around slow delay time") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package bitswap | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
|
||
bsmsg "github.com/ipfs/go-libipfs/bitswap/message" | ||
bsnet "github.com/ipfs/go-libipfs/bitswap/network" | ||
|
||
delay "github.com/ipfs/go-ipfs-delay" | ||
mockrouting "github.com/ipfs/go-ipfs-routing/mock" | ||
blocks "github.com/ipfs/go-libipfs/blocks" | ||
|
||
tnet "github.com/libp2p/go-libp2p-testing/net" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
) | ||
|
||
func TestSendMessageAsyncButWaitForResponse(t *testing.T) { | ||
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) | ||
responderPeer := tnet.RandIdentityOrFatal(t) | ||
waiter := net.Adapter(tnet.RandIdentityOrFatal(t)) | ||
responder := net.Adapter(responderPeer) | ||
|
||
var wg sync.WaitGroup | ||
|
||
wg.Add(1) | ||
|
||
expectedStr := "received async" | ||
|
||
responder.Start(lambda(func( | ||
ctx context.Context, | ||
fromWaiter peer.ID, | ||
msgFromWaiter bsmsg.BitSwapMessage) { | ||
|
||
msgToWaiter := bsmsg.New(true) | ||
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) | ||
err := waiter.SendMessage(ctx, fromWaiter, msgToWaiter) | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
})) | ||
t.Cleanup(responder.Stop) | ||
|
||
waiter.Start(lambda(func( | ||
ctx context.Context, | ||
fromResponder peer.ID, | ||
msgFromResponder bsmsg.BitSwapMessage) { | ||
|
||
// TODO assert that this came from the correct peer and that the message contents are as expected | ||
ok := false | ||
for _, b := range msgFromResponder.Blocks() { | ||
if string(b.RawData()) == expectedStr { | ||
wg.Done() | ||
ok = true | ||
} | ||
} | ||
|
||
if !ok { | ||
t.Fatal("Message not received from the responder") | ||
} | ||
})) | ||
t.Cleanup(waiter.Stop) | ||
|
||
messageSentAsync := bsmsg.New(true) | ||
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) | ||
errSending := waiter.SendMessage( | ||
context.Background(), responderPeer.ID(), messageSentAsync) | ||
if errSending != nil { | ||
t.Fatal(errSending) | ||
} | ||
|
||
wg.Wait() // until waiter delegate function is executed | ||
} | ||
|
||
type receiverFunc func(ctx context.Context, p peer.ID, | ||
incoming bsmsg.BitSwapMessage) | ||
|
||
// lambda returns a Receiver instance given a receiver function | ||
func lambda(f receiverFunc) bsnet.Receiver { | ||
return &lambdaImpl{ | ||
f: f, | ||
} | ||
} | ||
|
||
type lambdaImpl struct { | ||
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) | ||
} | ||
|
||
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, | ||
p peer.ID, incoming bsmsg.BitSwapMessage) { | ||
lam.f(ctx, p, incoming) | ||
} | ||
|
||
func (lam *lambdaImpl) ReceiveError(err error) { | ||
// TODO log error | ||
} | ||
|
||
func (lam *lambdaImpl) PeerConnected(p peer.ID) { | ||
// TODO | ||
} | ||
func (lam *lambdaImpl) PeerDisconnected(peer.ID) { | ||
// TODO | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package bitswap | ||
|
||
import ( | ||
"context" | ||
|
||
bsnet "github.com/ipfs/go-libipfs/bitswap/network" | ||
|
||
ds "github.com/ipfs/go-datastore" | ||
mockrouting "github.com/ipfs/go-ipfs-routing/mock" | ||
|
||
tnet "github.com/libp2p/go-libp2p-testing/net" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
) | ||
|
||
type peernet struct { | ||
mockpeernet.Mocknet | ||
routingserver mockrouting.Server | ||
} | ||
|
||
// StreamNet is a testnet that uses libp2p's MockNet | ||
func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { | ||
return &peernet{net, rs}, nil | ||
} | ||
|
||
func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { | ||
client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address()) | ||
if err != nil { | ||
panic(err.Error()) | ||
} | ||
routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) | ||
return bsnet.NewFromIpfsHost(client, routing, opts...) | ||
} | ||
|
||
func (pn *peernet) HasPeer(p peer.ID) bool { | ||
for _, member := range pn.Mocknet.Peers() { | ||
if p == member { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
var _ Network = (*peernet)(nil) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package bitswap | ||
|
||
import ( | ||
"math/rand" | ||
) | ||
|
||
type fixedRateLimitGenerator struct { | ||
rateLimit float64 | ||
} | ||
|
||
// FixedRateLimitGenerator returns a rate limit generatoe that always generates | ||
// the specified rate limit in bytes/sec. | ||
func FixedRateLimitGenerator(rateLimit float64) RateLimitGenerator { | ||
return &fixedRateLimitGenerator{rateLimit} | ||
} | ||
|
||
func (rateLimitGenerator *fixedRateLimitGenerator) NextRateLimit() float64 { | ||
return rateLimitGenerator.rateLimit | ||
} | ||
|
||
type variableRateLimitGenerator struct { | ||
rateLimit float64 | ||
std float64 | ||
rng *rand.Rand | ||
} | ||
|
||
// VariableRateLimitGenerator makes rate limites that following a normal distribution. | ||
func VariableRateLimitGenerator(rateLimit float64, std float64, rng *rand.Rand) RateLimitGenerator { | ||
if rng == nil { | ||
rng = sharedRNG | ||
} | ||
|
||
return &variableRateLimitGenerator{ | ||
std: std, | ||
rng: rng, | ||
rateLimit: rateLimit, | ||
} | ||
} | ||
|
||
func (rateLimitGenerator *variableRateLimitGenerator) NextRateLimit() float64 { | ||
return rateLimitGenerator.rng.NormFloat64()*rateLimitGenerator.std + rateLimitGenerator.rateLimit | ||
} |
Oops, something went wrong.