diff --git a/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java b/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java index ceafdd114b..aaeecf5ed6 100644 --- a/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java +++ b/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java @@ -490,7 +490,7 @@ private void startSubscriptionThread() { dequeueService = BuildfarmExecutors.getDequeuePool(); subscriber = new RedisShardSubscriber( - watchers, storageWorkers, configs.getBackplane().getWorkerChannel(), subscriberService); + watchers, storageWorkers, WorkerType.STORAGE.getNumber(), configs.getBackplane().getWorkerChannel(), subscriberService); operationSubscription = new RedisShardSubscription( @@ -624,13 +624,18 @@ public void observe(Operation operation) { public void addWorker(ShardWorker shardWorker) throws IOException { String json = JsonFormat.printer().print(shardWorker); Timestamp effectiveAt = Timestamps.fromMillis(shardWorker.getFirstRegisteredAt()); + WorkerChange.Add add = + WorkerChange.Add.newBuilder() + .setEffectiveAt(effectiveAt) + .setWorkerType(shardWorker.getWorkerType()) + .build(); String workerChangeJson = JsonFormat.printer() .print( WorkerChange.newBuilder() .setEffectiveAt(toTimestamp(Instant.now())) .setName(shardWorker.getEndpoint()) - .setAdd(WorkerChange.Add.newBuilder().setEffectiveAt(effectiveAt).build()) + .setAdd(add) .build()); client.call( jedis -> { diff --git a/src/main/java/build/buildfarm/instance/shard/RedisShardSubscriber.java b/src/main/java/build/buildfarm/instance/shard/RedisShardSubscriber.java index edd2bfdfae..01096b440f 100644 --- a/src/main/java/build/buildfarm/instance/shard/RedisShardSubscriber.java +++ b/src/main/java/build/buildfarm/instance/shard/RedisShardSubscriber.java @@ -62,16 +62,19 @@ void complete() { private final ListMultimap watchers; private final Map workers; + private final int workerChangeTypeMask; private final String workerChannel; private final Executor executor; RedisShardSubscriber( ListMultimap watchers, Map workers, + int workerChangeTypeMask, String workerChannel, Executor executor) { this.watchers = watchers; this.workers = workers; + this.workerChangeTypeMask = workerChangeTypeMask; this.workerChannel = workerChannel; this.executor = executor; } @@ -232,13 +235,16 @@ void onWorkerChange(WorkerChange workerChange) { } void addWorker(WorkerChange workerChange) { - synchronized (workers) { - workers.put( - workerChange.getName(), - ShardWorker.newBuilder() - .setEndpoint(workerChange.getName()) - .setFirstRegisteredAt(Timestamps.toMillis(workerChange.getAdd().getEffectiveAt())) - .build()); + if ((workerChange.getAdd().getWorkerType() & workerChangeTypeMask) != 0) { + synchronized (workers) { + workers.put( + workerChange.getName(), + ShardWorker.newBuilder() + .setEndpoint(workerChange.getName()) + .setWorkerType(workerChange.getAdd().getWorkerType()) + .setFirstRegisteredAt(Timestamps.toMillis(workerChange.getAdd().getEffectiveAt())) + .build()); + } } } diff --git a/src/main/protobuf/build/buildfarm/v1test/buildfarm.proto b/src/main/protobuf/build/buildfarm/v1test/buildfarm.proto index a47f691438..96650ff927 100644 --- a/src/main/protobuf/build/buildfarm/v1test/buildfarm.proto +++ b/src/main/protobuf/build/buildfarm/v1test/buildfarm.proto @@ -281,6 +281,8 @@ message ShardWorker { message WorkerChange { message Add { google.protobuf.Timestamp effectiveAt = 1; + + int32 worker_type = 2; } message Remove { diff --git a/src/test/java/build/buildfarm/instance/shard/RedisShardSubscriberTest.java b/src/test/java/build/buildfarm/instance/shard/RedisShardSubscriberTest.java index 8169b01c69..1ac97373ce 100644 --- a/src/test/java/build/buildfarm/instance/shard/RedisShardSubscriberTest.java +++ b/src/test/java/build/buildfarm/instance/shard/RedisShardSubscriberTest.java @@ -30,6 +30,9 @@ import build.buildfarm.instance.shard.RedisShardSubscriber.TimedWatchFuture; import build.buildfarm.v1test.OperationChange; +import build.buildfarm.v1test.ShardWorker; +import build.buildfarm.v1test.WorkerChange; +import build.buildfarm.v1test.WorkerType; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; @@ -39,9 +42,13 @@ import com.google.common.truth.Correspondence; import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; @@ -177,7 +184,7 @@ public void flush() { RedisShardSubscriber createSubscriber( ListMultimap watchers, Executor executor) { - return new RedisShardSubscriber(watchers, /* workers= */ null, "worker-channel", executor); + return new RedisShardSubscriber(watchers, /* workers= */ null, WorkerType.NONE.getNumber(), "worker-channel", executor); } RedisShardSubscriber createSubscriber(ListMultimap watchers) { @@ -393,4 +400,40 @@ public void invalidOperationChangeIsIgnored() { operationSubscriber.onMessage("invalid-operation-change", "not-json!#?"); } + + @Test + public void addSupportedWorkerTypeOnWorkerChange() throws IOException { + Map workers = new HashMap<>(); + int storageWorkerType = WorkerType.STORAGE.getNumber(); + String workerChannel = "worker-channel"; + RedisShardSubscriber operationSubscriber = + new RedisShardSubscriber(/* watchers */ null, workers, storageWorkerType, workerChannel, directExecutor()); + String workerChangeJson = + JsonFormat.printer() + .print( + WorkerChange.newBuilder() + .setName("execute-worker") + .setAdd(WorkerChange.Add.newBuilder().setWorkerType(storageWorkerType).build()) + .build()); + operationSubscriber.onMessage(workerChannel, workerChangeJson); + assertThat(workers.size()).isEqualTo(1); + } + + @Test + public void ignoreUnsupportedWorkerTypeOnWorkerChange() throws IOException { + Map workers = new HashMap<>(); + int workerType = WorkerType.STORAGE.getNumber(); + String workerChannel = "worker-channel"; + RedisShardSubscriber operationSubscriber = + new RedisShardSubscriber(/* watchers */ null, workers, workerType, workerChannel, directExecutor()); + String workerChangeJson = + JsonFormat.printer() + .print( + WorkerChange.newBuilder() + .setName("execute-worker") + .setAdd(WorkerChange.Add.newBuilder().setWorkerType(WorkerType.EXECUTE.getNumber()).build()) + .build()); + operationSubscriber.onMessage(workerChannel, workerChangeJson); + assertThat(workers.isEmpty()).isTrue(); + } }