-
Notifications
You must be signed in to change notification settings - Fork 150
Scale mutation queue with multiple shards #1048
Conversation
impl/sql/mutationstorage/queue.go
Outdated
} | ||
for rows.Next() { | ||
var shardID int64 | ||
rows.Scan(&shardID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
G104: Errors unhandled.
impl/sql/mutationstorage/queue.go
Outdated
} | ||
|
||
// ReadQueue reads all mutations that are still in the queue up to batchSize. | ||
func (m *Mutations) ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line is 128 characters
Codecov Report
@@ Coverage Diff @@
## master #1048 +/- ##
==========================================
+ Coverage 65.24% 65.26% +0.01%
==========================================
Files 39 39
Lines 2728 2764 +36
==========================================
+ Hits 1780 1804 +24
- Misses 630 633 +3
- Partials 318 327 +9
Continue to review full report at Codecov.
|
b0d86e4
to
3c088f0
Compare
Send - Write to multiple shards HighWatermarks - return multiple high watermarks
3c088f0
to
bf5e4e2
Compare
if err != nil { | ||
t.Fatalf("Failed to create Mutations: %v", err) | ||
} | ||
domainID := "foo" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string foo
has 3 occurrences, make it a constant
impl/sql/mutationstorage/queue.go
Outdated
} | ||
for rows.Next() { | ||
var shardID int64 | ||
rows.Scan(&shardID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
G104: Errors unhandled.
bf5e4e2
to
75969ea
Compare
impl/sql/mutationstorage/queue.go
Outdated
} | ||
|
||
// randShard returns a random, enabled shard for domainID. | ||
func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
47-73 lines are duplicate of impl/sql/mutationstorage/queue.go:92-118
impl/sql/mutationstorage/queue.go
Outdated
} | ||
|
||
// randomShard returns a random shard from the list of active shards for domainID. | ||
func (m *Mutations) randomShard(ctx context.Context, domainID string) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
92-118 lines are duplicate of impl/sql/mutationstorage/queue.go:47-73
75969ea
to
6ebf36c
Compare
Updated and nice and green. PTAL. |
core/adminserver/admin_server.go
Outdated
@@ -260,6 +270,13 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) ( | |||
}); err != nil { | |||
return nil, fmt.Errorf("adminserver: domains.Write(): %v", err) | |||
} | |||
|
|||
// Create shards for queue. | |||
shardIDs := []int64{1} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who is supposed to create shards if they are more than 1? This looks like a place for some TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created issue #1048
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably meant #1063
(putting it to create a link between github issues)
@@ -85,6 +85,12 @@ func (e *miniEnv) Close() { | |||
e.stopMockServer() | |||
} | |||
|
|||
type fakeQueueAdmin struct{} | |||
|
|||
func (*fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the receiver simply be taken by value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that would simplify things a bit.
@@ -36,8 +36,9 @@ message MapMetadata { | |||
// of logged items MUST be monotonically increasing. | |||
int64 highest_watermark = 2; | |||
} | |||
// source defines the range of inputs used for this map revision. | |||
SourceSlice source = 1; | |||
reserved 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you at a point where you can't afford breaking compatibility by simply removing this field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fortunately we're early enough that we're happy to break compatibility.
This PR is also introducing backwards-incompatible database schema changes as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that was my point. Why using reserved 1
instead of just dropping / reassigning the tag to the new field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good habits I suppose :-)
PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % nits.
core/sequencer/server.go
Outdated
ReadQueue(ctx context.Context, domainID string, low, high int64) ([]*mutator.QueueMessage, error) | ||
// HighWatermark returns the highest timestamp in the mutations table. | ||
HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) | ||
// Read returns up to batchSize messages for domainID. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think yes. How about renaming the ReadQueue
method to just Read
, and nicely naming the interface (e.g. Logs
as you say)? I think that a more verbose interface name like ShardedMutationLog
or similar could be fine as well.
Thanks very much for the review! |
Tables with an incrementing value in the lower bits of the primary key are notoriously hard to scale because all the writes will go to a single server. This PR allows the queue to scale by adding write support for multiple queues.
Depends on: read by high/low watermarks #1045
Part of: retry-able mutations #1044