Skip to content

Commit

Permalink
fix(RedisShardBackplane): Don't return execute-only workers in getSto…
Browse files Browse the repository at this point in the history
…rageWorkers (buildfarm#1813)

* fix(RedisShardBackplane): Don't return execute-only workers in getStorageWorkers()

BuildFarm now registers all workers in RedisBackplane. When an execute-only worker is registered, RedisShardSubscriber will add it to RedisBackplane's storageWorkers. Then it's possible for getStorageWorkers() to reurn execute-only workers.
To fix this bug, We add a field worker_type to the proto message WorkerChange.Add so that RedisShardSubscriber can tell if the worker type matches the expected value.

* Cleans up code to address review comments
  • Loading branch information
congt authored and Brian Dong committed Aug 23, 2024
1 parent 6c12082 commit 72acc9a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private void startSubscriptionThread() {
dequeueService = BuildfarmExecutors.getDequeuePool();
subscriber =
new RedisShardSubscriber(
watchers, storageWorkers, configs.getBackplane().getWorkerChannel(), subscriberService);
watchers, storageWorkers, WorkerType.STORAGE.getNumber(), configs.getBackplane().getWorkerChannel(), subscriberService);

operationSubscription =
new RedisShardSubscription(
Expand Down Expand Up @@ -624,13 +624,18 @@ public void observe(Operation operation) {
public void addWorker(ShardWorker shardWorker) throws IOException {
String json = JsonFormat.printer().print(shardWorker);
Timestamp effectiveAt = Timestamps.fromMillis(shardWorker.getFirstRegisteredAt());
WorkerChange.Add add =
WorkerChange.Add.newBuilder()
.setEffectiveAt(effectiveAt)
.setWorkerType(shardWorker.getWorkerType())
.build();
String workerChangeJson =
JsonFormat.printer()
.print(
WorkerChange.newBuilder()
.setEffectiveAt(toTimestamp(Instant.now()))
.setName(shardWorker.getEndpoint())
.setAdd(WorkerChange.Add.newBuilder().setEffectiveAt(effectiveAt).build())
.setAdd(add)
.build());
client.call(
jedis -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ void complete() {

private final ListMultimap<String, TimedWatchFuture> watchers;
private final Map<String, ShardWorker> workers;
private final int workerChangeTypeMask;
private final String workerChannel;
private final Executor executor;

RedisShardSubscriber(
ListMultimap<String, TimedWatchFuture> watchers,
Map<String, ShardWorker> workers,
int workerChangeTypeMask,
String workerChannel,
Executor executor) {
this.watchers = watchers;
this.workers = workers;
this.workerChangeTypeMask = workerChangeTypeMask;
this.workerChannel = workerChannel;
this.executor = executor;
}
Expand Down Expand Up @@ -232,13 +235,16 @@ void onWorkerChange(WorkerChange workerChange) {
}

void addWorker(WorkerChange workerChange) {
synchronized (workers) {
workers.put(
workerChange.getName(),
ShardWorker.newBuilder()
.setEndpoint(workerChange.getName())
.setFirstRegisteredAt(Timestamps.toMillis(workerChange.getAdd().getEffectiveAt()))
.build());
if ((workerChange.getAdd().getWorkerType() & workerChangeTypeMask) != 0) {
synchronized (workers) {
workers.put(
workerChange.getName(),
ShardWorker.newBuilder()
.setEndpoint(workerChange.getName())
.setWorkerType(workerChange.getAdd().getWorkerType())
.setFirstRegisteredAt(Timestamps.toMillis(workerChange.getAdd().getEffectiveAt()))
.build());
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/protobuf/build/buildfarm/v1test/buildfarm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ message ShardWorker {
message WorkerChange {
message Add {
google.protobuf.Timestamp effectiveAt = 1;

int32 worker_type = 2;
}

message Remove {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import build.buildfarm.instance.shard.RedisShardSubscriber.TimedWatchFuture;
import build.buildfarm.v1test.OperationChange;
import build.buildfarm.v1test.ShardWorker;
import build.buildfarm.v1test.WorkerChange;
import build.buildfarm.v1test.WorkerType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
Expand All @@ -39,9 +42,13 @@
import com.google.common.truth.Correspondence;
import com.google.longrunning.Operation;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -177,7 +184,7 @@ public void flush() {

RedisShardSubscriber createSubscriber(
ListMultimap<String, TimedWatchFuture> watchers, Executor executor) {
return new RedisShardSubscriber(watchers, /* workers= */ null, "worker-channel", executor);
return new RedisShardSubscriber(watchers, /* workers= */ null, WorkerType.NONE.getNumber(), "worker-channel", executor);
}

RedisShardSubscriber createSubscriber(ListMultimap<String, TimedWatchFuture> watchers) {
Expand Down Expand Up @@ -393,4 +400,40 @@ public void invalidOperationChangeIsIgnored() {

operationSubscriber.onMessage("invalid-operation-change", "not-json!#?");
}

@Test
public void addSupportedWorkerTypeOnWorkerChange() throws IOException {
Map<String, ShardWorker> workers = new HashMap<>();
int storageWorkerType = WorkerType.STORAGE.getNumber();
String workerChannel = "worker-channel";
RedisShardSubscriber operationSubscriber =
new RedisShardSubscriber(/* watchers */ null, workers, storageWorkerType, workerChannel, directExecutor());
String workerChangeJson =
JsonFormat.printer()
.print(
WorkerChange.newBuilder()
.setName("execute-worker")
.setAdd(WorkerChange.Add.newBuilder().setWorkerType(storageWorkerType).build())
.build());
operationSubscriber.onMessage(workerChannel, workerChangeJson);
assertThat(workers.size()).isEqualTo(1);
}

@Test
public void ignoreUnsupportedWorkerTypeOnWorkerChange() throws IOException {
Map<String, ShardWorker> workers = new HashMap<>();
int workerType = WorkerType.STORAGE.getNumber();
String workerChannel = "worker-channel";
RedisShardSubscriber operationSubscriber =
new RedisShardSubscriber(/* watchers */ null, workers, workerType, workerChannel, directExecutor());
String workerChangeJson =
JsonFormat.printer()
.print(
WorkerChange.newBuilder()
.setName("execute-worker")
.setAdd(WorkerChange.Add.newBuilder().setWorkerType(WorkerType.EXECUTE.getNumber()).build())
.build());
operationSubscriber.onMessage(workerChannel, workerChangeJson);
assertThat(workers.isEmpty()).isTrue();
}
}

0 comments on commit 72acc9a

Please sign in to comment.