-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure the download algorithm exhausts all hosts that have the sector #1606
base: dev
Are you sure you want to change the base?
Conversation
ce5edbc
to
ce2e944
Compare
3923885
to
96b5198
Compare
@@ -245,7 +245,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { | |||
tt := test.NewTT(t) | |||
|
|||
// Ensure we don't hang | |||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) | |||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a test cluster hangs for a minute something is very wrong imo
download("file4", data4, 0, int64(len(data4))) | ||
tt.Retry(100, 100*time.Millisecond, func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem like something that should be non-deterministic?
sectors = append(sectors[:i], sectors[i+1:]...) | ||
s.unusedHostSectors[host] = sectors | ||
} | ||
// update pending sectors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get why we need this loop here with an index pointing from a sector to completed
instead of using a single sectors []*sectorDownloadInfo
where each sectorDownloadInfo
has a data []byte
field which can be used to store the downloaded data as well as figure out whether the download is complete.
} | ||
for _, hk := range sector.hks { | ||
if _, ok := candidates[hk]; !ok { | ||
candidates[hk] = sector |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is something I don't get and also what I believe makes this code kinda spaghetti.
At this point the sectors are sorted by number of times they have been selected in ascending order. So I'd loop over the sectors and then find the fastest host of the current sector, select it and launch it. If a sector can't be launched, move on to the next one.
I don't get why you collect candidates across sectors or need a minSelected
or exhausted
variable. The goto
is also a pretty good clue that something here is not quite right.
Imo exhausted
is also implicit. e.g. if you get through all sectors from least selected to most selected without launching a single host.
for i, hk := range s.hks { | ||
if hk == h { | ||
s.hks = append(s.hks[:i], s.hks[i+1:]...) // remove the host | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd increment selected
here to make sure it is only incremented when a valid host is selected.
This PR alters the download algorithm in a way we try and download the sector from all hosts it's on, not just the latest host, which is a field we'll be removing in #1597