From 579ab9fdf001614c97f3655e53b5bfe2c790a17b Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 27 Mar 2024 10:43:54 +0800 Subject: [PATCH] fix(interactive): add store-state service back (#3664) --- .../groot/common/config/ZkConfig.java | 2 +- .../groot/coordinator/GraphInitializer.java | 10 +- .../groot/frontend/BatchDdlClient.java | 12 +- .../groot/frontend/ClientService.java | 309 ++++++++++-------- ...stClient.java => FrontendStoreClient.java} | 27 +- .../groot/frontend/SchemaWriter.java | 39 --- .../frontend/SnapshotUpdateCommitter.java | 32 -- .../groot/frontend/StoreIngestClients.java | 59 ---- .../groot/store/DefaultSnapshotCommitter.java | 34 -- ...Service.java => FrontendStoreService.java} | 20 +- .../groot/store/SnapshotCommitter.java | 20 -- .../groot/store/StoreWriteService.java | 53 --- .../graphscope/groot/store/WriterAgent.java | 16 +- .../graphscope/groot/servers/Frontend.java | 30 +- .../groot/servers/NodeLauncher.java | 8 +- .../graphscope/groot/servers/Store.java | 36 +- .../servers/ir/FrontendQueryManager.java | 9 +- .../groot/servers/ir/GaiaEngine.java | 6 +- .../groot/servers/ir/IrServiceProducer.java | 6 +- .../groot/tests/frontend/FrontendRpcTest.java | 6 +- .../groot/tests/store/StoreRpcTest.java | 12 - .../groot/tests/store/WriterAgentTest.java | 7 +- ...ice.proto => frontend_store_service.proto} | 3 +- proto/groot/store_write_service.proto | 4 - python/graphscope/client/connection.py | 7 + 25 files changed, 303 insertions(+), 464 deletions(-) rename interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/{StoreIngestClient.java => FrontendStoreClient.java} (80%) delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateCommitter.java delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/DefaultSnapshotCommitter.java rename interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/{StoreIngestService.java => FrontendStoreService.java} (83%) delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotCommitter.java delete mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreWriteService.java rename proto/groot/{store_ingest_service.proto => frontend_store_service.proto} (91%) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java index 1a8ac07e57b2..d87de555811b 100755 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java @@ -26,7 +26,7 @@ public class ZkConfig { Config.intConfig("zk.connection.timeout.ms", 1000); public static final Config ZK_SESSION_TIMEOUT_MS = - Config.intConfig("zk.session.timeout.ms", 10000); + Config.intConfig("zk.session.timeout.ms", 30000); public static final Config ZK_BASE_SLEEP_MS = Config.intConfig("zk.base.sleep.ms", 1000); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java index c11dc1d7fecc..a51a00cf264b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java @@ -113,10 +113,12 @@ public void destroyAll() throws Exception { if (this.logService.initialized()) { this.logService.destroy(); } - String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs); - Stat stat = this.curator.checkExists().forPath(zkRoot); - if (stat != null) { - this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot); + if (this.curator != null) { + String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs); + Stat stat = this.curator.checkExists().forPath(zkRoot); + if (stat != null) { + this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot); + } } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java index 312137627a07..8d3ab2253310 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java @@ -3,6 +3,7 @@ import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef; import com.alibaba.graphscope.groot.common.util.UuidUtils; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.schema.ddl.DdlExecutors; import com.alibaba.graphscope.groot.schema.request.DdlException; import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch; @@ -12,10 +13,12 @@ public class BatchDdlClient { private final DdlExecutors ddlExecutors; private final SnapshotCache snapshotCache; - private final SchemaWriter schemaWriter; + private final RoleClients schemaWriter; public BatchDdlClient( - DdlExecutors ddlExecutors, SnapshotCache snapshotCache, SchemaWriter schemaWriter) { + DdlExecutors ddlExecutors, + SnapshotCache snapshotCache, + RoleClients schemaWriter) { this.ddlExecutors = ddlExecutors; this.snapshotCache = snapshotCache; this.schemaWriter = schemaWriter; @@ -28,7 +31,8 @@ public long batchDdl(DdlRequestBatch ddlRequestBatch) { } catch (InvalidProtocolBufferException e) { throw new DdlException(e); } - return this.schemaWriter.submitBatchDdl( - UuidUtils.getBase64UUIDString(), "ddl", ddlRequestBatch); + return this.schemaWriter + .getClient(0) + .submitBatchDdl(UuidUtils.getBase64UUIDString(), "ddl", ddlRequestBatch.toProto()); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 16ab8eee49e3..41b05991eb67 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.groot.common.util.DataLoadTarget; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.metrics.MetricsAggregator; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.schema.request.AddEdgeKindRequest; import com.alibaba.graphscope.groot.schema.request.CreateEdgeTypeRequest; import com.alibaba.graphscope.groot.schema.request.CreateVertexTypeRequest; @@ -46,19 +47,19 @@ public class ClientService extends ClientGrpc.ClientImplBase { private final SnapshotCache snapshotCache; private final MetricsAggregator metricsAggregator; - private final StoreIngestClients storeIngestor; + private final RoleClients frontendStoreClients; private final MetaService metaService; private final BatchDdlClient batchDdlClient; public ClientService( SnapshotCache snapshotCache, MetricsAggregator metricsAggregator, - StoreIngestClients storeIngestor, + RoleClients frontendStoreClients, MetaService metaService, BatchDdlClient batchDdlClient) { this.snapshotCache = snapshotCache; this.metricsAggregator = metricsAggregator; - this.storeIngestor = storeIngestor; + this.frontendStoreClients = frontendStoreClients; this.metaService = metaService; this.batchDdlClient = batchDdlClient; } @@ -339,37 +340,39 @@ public void ingestData( AtomicBoolean finished = new AtomicBoolean(false); for (int i = 0; i < storeCount; i++) { logger.info("Store [" + i + "] started to ingest..."); - this.storeIngestor.ingest( - i, - dataPath, - config, - new CompletionCallback() { - @Override - public void onCompleted(Void res) { - if (!finished.get() && counter.decrementAndGet() == 0) { - finish(null); - } - } - - @Override - public void onError(Throwable t) { - logger.error("failed ingest", t); - finish(t); - } - - private void finish(Throwable t) { - if (finished.getAndSet(true)) { - return; - } - logger.info("ingest finished. Error [" + t + "]"); - if (t != null) { - responseObserver.onError(t); - } else { - responseObserver.onNext(IngestDataResponse.newBuilder().build()); - responseObserver.onCompleted(); - } - } - }); + this.frontendStoreClients + .getClient(i) + .storeIngest( + dataPath, + config, + new CompletionCallback() { + @Override + public void onCompleted(Void res) { + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed ingest", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + logger.info("ingest finished. Error [" + t + "]"); + if (t != null) { + responseObserver.onError(t); + } else { + responseObserver.onNext( + IngestDataResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + } + }); } } @@ -382,36 +385,38 @@ public void clearIngest( AtomicBoolean finished = new AtomicBoolean(false); String dataPath = request.getDataPath(); for (int i = 0; i < storeCount; i++) { - this.storeIngestor.clearIngest( - i, - dataPath, - new CompletionCallback() { - @Override - public void onCompleted(Void res) { - if (!finished.get() && counter.decrementAndGet() == 0) { - finish(null); - } - } - - @Override - public void onError(Throwable t) { - logger.error("failed clear ingest", t); - finish(t); - } - - private void finish(Throwable t) { - if (finished.getAndSet(true)) { - return; - } - logger.info("clearing ingest finished. Error [" + t + "]"); - if (t != null) { - responseObserver.onError(t); - } else { - responseObserver.onNext(ClearIngestResponse.newBuilder().build()); - responseObserver.onCompleted(); - } - } - }); + this.frontendStoreClients + .getClient(i) + .storeClearIngest( + dataPath, + new CompletionCallback() { + @Override + public void onCompleted(Void res) { + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed clear ingest", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + logger.info("clearing ingest finished. Error [" + t + "]"); + if (t != null) { + responseObserver.onError(t); + } else { + responseObserver.onNext( + ClearIngestResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + } + }); } } @@ -424,39 +429,40 @@ public void reopenSecondary( AtomicInteger counter = new AtomicInteger(storeCount); AtomicBoolean finished = new AtomicBoolean(false); for (int i = 0; i < storeCount; i++) { - this.storeIngestor.reopenSecondary( - i, - new CompletionCallback() { - @Override - public void onCompleted(Void res) { - if (!finished.get() && counter.decrementAndGet() == 0) { - finish(null); - } - } - - @Override - public void onError(Throwable t) { - logger.error("failed reopen secondary", t); - finish(t); - } - - private void finish(Throwable t) { - if (finished.getAndSet(true)) { - return; - } - logger.info("reopen secondary finished. Error [" + t + "]"); - if (t != null) { - responseObserver.onError(t); - } else { - ReopenSecondaryResponse res = - ReopenSecondaryResponse.newBuilder() - .setSuccess(true) - .build(); - responseObserver.onNext(res); - responseObserver.onCompleted(); - } - } - }); + this.frontendStoreClients + .getClient(i) + .reopenSecondary( + new CompletionCallback() { + @Override + public void onCompleted(Void res) { + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed reopen secondary", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + logger.info("reopen secondary finished. Error [" + t + "]"); + if (t != null) { + responseObserver.onError(t); + } else { + ReopenSecondaryResponse res = + ReopenSecondaryResponse.newBuilder() + .setSuccess(true) + .build(); + responseObserver.onNext(res); + responseObserver.onCompleted(); + } + } + }); } } @@ -468,37 +474,84 @@ public void compactDB( AtomicInteger counter = new AtomicInteger(storeCount); AtomicBoolean finished = new AtomicBoolean(false); for (int i = 0; i < storeCount; i++) { - this.storeIngestor.compactDB( - i, - new CompletionCallback() { - @Override - public void onCompleted(Void res) { - if (!finished.get() && counter.decrementAndGet() == 0) { - finish(null); - } - } - - @Override - public void onError(Throwable t) { - logger.error("failed compact", t); - finish(t); - } - - private void finish(Throwable t) { - if (finished.getAndSet(true)) { - return; - } - logger.info("compact finished. Error [" + t + "]"); - if (t != null) { - responseObserver.onError(t); - } else { - CompactDBResponse res = - CompactDBResponse.newBuilder().setSuccess(true).build(); - responseObserver.onNext(res); - responseObserver.onCompleted(); - } - } - }); + this.frontendStoreClients + .getClient(i) + .storeCompact( + new CompletionCallback() { + @Override + public void onCompleted(Void res) { + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed compact", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + logger.info("compact finished. Error [" + t + "]"); + if (t != null) { + responseObserver.onError(t); + } else { + CompactDBResponse res = + CompactDBResponse.newBuilder() + .setSuccess(true) + .build(); + responseObserver.onNext(res); + responseObserver.onCompleted(); + } + } + }); + } + } + + @Override + public void getStoreState( + GetStoreStateRequest request, StreamObserver responseObserver) { + GetStoreStateResponse.Builder response = GetStoreStateResponse.newBuilder(); + logger.info("getStoreState"); + int storeCount = this.metaService.getStoreCount(); + AtomicInteger counter = new AtomicInteger(storeCount); + AtomicBoolean finished = new AtomicBoolean(false); + + for (int i = 0; i < storeCount; i++) { + this.frontendStoreClients + .getClient(i) + .getStoreState( + new CompletionCallback() { + @Override + public void onCompleted(GetStoreStateResponse res) { + response.mergeFrom(res); + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed clear ingest", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + if (t != null) { + responseObserver.onError(t); + } else { + responseObserver.onNext( + GetStoreStateResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + } + }); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java similarity index 80% rename from interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java rename to interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java index 48d53b09c969..5a79f4c8cf73 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java @@ -22,13 +22,13 @@ import java.util.Map; -public class StoreIngestClient extends RpcClient { +public class FrontendStoreClient extends RpcClient { - private final StoreIngestGrpc.StoreIngestStub stub; + private final FrontendStoreServiceGrpc.FrontendStoreServiceStub stub; - public StoreIngestClient(ManagedChannel channel) { + public FrontendStoreClient(ManagedChannel channel) { super(channel); - this.stub = StoreIngestGrpc.newStub(channel); + this.stub = FrontendStoreServiceGrpc.newStub(channel); } public void storeIngest( @@ -110,4 +110,23 @@ public void onError(Throwable t) { public void onCompleted() {} }); } + + public void getStoreState(CompletionCallback callback) { + this.stub.getState( + GetStoreStateRequest.newBuilder().build(), + new StreamObserver<>() { + @Override + public void onNext(GetStoreStateResponse value) { + callback.onCompleted(value); + } + + @Override + public void onError(Throwable t) { + callback.onError(t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java deleted file mode 100644 index 650204074132..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.frontend; - -import com.alibaba.graphscope.groot.rpc.RoleClients; -import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch; -import com.alibaba.graphscope.proto.groot.DdlRequestBatchPb; - -public class SchemaWriter { - - private final RoleClients schemaClients; - - public SchemaWriter(RoleClients schemaClients) { - this.schemaClients = schemaClients; - } - - public long submitBatchDdl( - String requestId, String sessionId, DdlRequestBatch ddlRequestBatch) { - return this.submitBatchDdl(requestId, sessionId, ddlRequestBatch.toProto()); - } - - public long submitBatchDdl( - String requestId, String sessionId, DdlRequestBatchPb ddlRequestBatchPb) { - return this.schemaClients - .getClient(0) - .submitBatchDdl(requestId, sessionId, ddlRequestBatchPb); - } -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateCommitter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateCommitter.java deleted file mode 100644 index d5931bfb4436..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateCommitter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.graphscope.groot.frontend; - -import com.alibaba.graphscope.groot.common.RoleType; -import com.alibaba.graphscope.groot.rpc.ChannelManager; -import com.alibaba.graphscope.groot.rpc.RoleClients; - -// wrapper to create SnapshotUpdateClient by getting channel of Coordinator -public class SnapshotUpdateCommitter extends RoleClients { - public SnapshotUpdateCommitter(ChannelManager channelManager) { - super(channelManager, RoleType.COORDINATOR, SnapshotUpdateClient::new); - } - - public void updateSnapshot(int frontendId, long snapshotId) throws RuntimeException { - getClient(0).updateSnapshot(frontendId, snapshotId); - } -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java deleted file mode 100644 index c2249376f973..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.frontend; - -import com.alibaba.graphscope.groot.CompletionCallback; -import com.alibaba.graphscope.groot.common.RoleType; -import com.alibaba.graphscope.groot.rpc.ChannelManager; -import com.alibaba.graphscope.groot.rpc.RoleClients; - -import io.grpc.ManagedChannel; - -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; - -public class StoreIngestClients extends RoleClients { - - public StoreIngestClients( - ChannelManager channelManager, - RoleType targetRole, - Function clientBuilder) { - super(channelManager, targetRole, clientBuilder); - } - - public void ingest(int storeId, String path, CompletionCallback callback) { - this.ingest(storeId, path, new HashMap(), callback); - } - - public void ingest( - int storeId, - String path, - Map config, - CompletionCallback callback) { - this.getClient(storeId).storeIngest(path, config, callback); - } - - public void clearIngest(int storeId, String path, CompletionCallback callback) { - this.getClient(storeId).storeClearIngest(path, callback); - } - - public void compactDB(int storeId, CompletionCallback callback) { - this.getClient(storeId).storeCompact(callback); - } - - public void reopenSecondary(int storeId, CompletionCallback callback) { - this.getClient(storeId).reopenSecondary(callback); - } -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/DefaultSnapshotCommitter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/DefaultSnapshotCommitter.java deleted file mode 100644 index 4890e89676ad..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/DefaultSnapshotCommitter.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.store; - -import com.alibaba.graphscope.groot.common.RoleType; -import com.alibaba.graphscope.groot.rpc.ChannelManager; -import com.alibaba.graphscope.groot.rpc.RoleClients; - -import java.util.List; - -public class DefaultSnapshotCommitter extends RoleClients - implements SnapshotCommitter { - - public DefaultSnapshotCommitter(ChannelManager channelManager) { - super(channelManager, RoleType.COORDINATOR, SnapshotCommitClient::new); - } - - @Override - public void commitSnapshotId( - int storeId, long snapshotId, long ddlSnapshotId, List offsets) { - getClient(0).commitSnapshotId(storeId, snapshotId, ddlSnapshotId, offsets); - } -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java similarity index 83% rename from interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java rename to interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java index dc93dff8fd21..621ad6faccdb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java @@ -22,11 +22,11 @@ import java.io.IOException; import java.util.Map; -public class StoreIngestService extends StoreIngestGrpc.StoreIngestImplBase { +public class FrontendStoreService extends FrontendStoreServiceGrpc.FrontendStoreServiceImplBase { private final StoreService storeService; - public StoreIngestService(StoreService storeService) { + public FrontendStoreService(StoreService storeService) { this.storeService = storeService; } @@ -111,4 +111,20 @@ public void onError(Throwable t) { } }); } + + @Override + public void getState( + GetStoreStateRequest request, StreamObserver responseObserver) { + long[] spaces = this.storeService.getDiskStatus(); + GetStoreStateResponse.Builder builder = GetStoreStateResponse.newBuilder(); + PartitionStatePb state = + PartitionStatePb.newBuilder() + .setTotalSpace(spaces[0]) + .setUsableSpace(spaces[1]) + .build(); + + builder.putPartitionStates(storeService.getStoreId(), state); + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotCommitter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotCommitter.java deleted file mode 100644 index 73918737b223..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotCommitter.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.store; - -import java.util.List; - -public interface SnapshotCommitter { - void commitSnapshotId(int storeId, long snapshotId, long ddlSnapshotId, List offsets); -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreWriteService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreWriteService.java deleted file mode 100644 index 92afa73e3f02..000000000000 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreWriteService.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.store; - -import com.alibaba.graphscope.groot.operation.StoreDataBatch; -import com.alibaba.graphscope.proto.groot.StoreDataBatchPb; -import com.alibaba.graphscope.proto.groot.StoreWriteGrpc; -import com.alibaba.graphscope.proto.groot.WriteStoreRequest; -import com.alibaba.graphscope.proto.groot.WriteStoreResponse; - -import io.grpc.stub.StreamObserver; - -import java.util.ArrayList; -import java.util.List; - -public class StoreWriteService extends StoreWriteGrpc.StoreWriteImplBase { - - private final WriterAgent writerAgent; - - public StoreWriteService(WriterAgent writerAgent) { - this.writerAgent = writerAgent; - } - - @Override - public void writeStore( - WriteStoreRequest request, StreamObserver responseObserver) { - List dataBatchesList = request.getDataBatchesList(); - List batches = new ArrayList<>(dataBatchesList.size()); - try { - for (StoreDataBatchPb pb : dataBatchesList) { - batches.add(StoreDataBatch.parseProto(pb)); - } - boolean success = writerAgent.writeStore2(batches); - WriteStoreResponse response = - WriteStoreResponse.newBuilder().setSuccess(success).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Exception e) { - responseObserver.onError(e); - } - } -} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index b869914961bc..99e6d72325ac 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -22,6 +22,7 @@ import com.alibaba.graphscope.groot.metrics.MetricsAgent; import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.StoreDataBatch; +import com.alibaba.graphscope.groot.rpc.RoleClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,7 @@ public class WriterAgent implements MetricsAgent { private final int queueCount; private final StoreService storeService; private final MetaService metaService; - private final SnapshotCommitter snapshotCommitter; + private final RoleClients snapshotCommitter; private volatile boolean shouldStop = true; private SnapshotSortQueue bufferQueue; @@ -78,7 +79,7 @@ public WriterAgent( Configs configs, StoreService storeService, MetaService metaService, - SnapshotCommitter snapshotCommitter, + RoleClients snapshotCommitter, MetricsCollector metricsCollector) { this.configs = configs; this.storeId = CommonConfig.NODE_IDX.get(configs); @@ -91,15 +92,11 @@ public WriterAgent( metricsCollector.register(this, this::updateMetrics); } - /** should be called once, before start */ - public void init(long availSnapshotId) { - this.availSnapshotInfoRef.set(new SnapshotInfo(availSnapshotId, availSnapshotId)); - } - public void start() { this.lastCommitSI = -1L; this.consumeSI = 0L; this.consumeDdlSnapshotId = 0L; + this.availSnapshotInfoRef.set(new SnapshotInfo(0, 0)); this.shouldStop = false; this.bufferQueue = new SnapshotSortQueue(this.configs, this.metaService); @@ -222,8 +219,9 @@ private void asyncCommit() { List queueOffsets = new ArrayList<>(this.consumedQueueOffsets); try { // logger.info("commit SI {}, last DDL SI {}", availSnapshotId, ddlSnapshotId); - this.snapshotCommitter.commitSnapshotId( - storeId, curSI, ddlSnapshotId, queueOffsets); + this.snapshotCommitter + .getClient(0) + .commitSnapshotId(storeId, curSI, ddlSnapshotId, queueOffsets); this.lastCommitSI = curSI; } catch (Exception e) { logger.warn("commit failed. SI {}, offset {}. ignored", curSI, queueOffsets, e); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 6207d4ca7918..ab023dbc38f2 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -61,17 +61,17 @@ public class Frontend extends NodeBase { private static final Logger logger = LoggerFactory.getLogger(Frontend.class); private CuratorFramework curator; - private NodeDiscovery discovery; - private ChannelManager channelManager; - private MetaService metaService; - private RpcServer rpcServer; - private RpcServer serviceServer; - private ClientService clientService; - private AbstractService graphService; + private final NodeDiscovery discovery; + private final ChannelManager channelManager; + private final MetaService metaService; + private final RpcServer rpcServer; + private final RpcServer serviceServer; + private final ClientService clientService; + private final AbstractService graphService; - private SnapshotCache snapshotCache; + private final SnapshotCache snapshotCache; - private GraphWriter graphWriter; + private final GraphWriter graphWriter; public Frontend(Configs configs) { super(configs); @@ -97,12 +97,10 @@ public Frontend(Configs configs) { new MetricsAggregator( configs, frontendMetricsCollectClients, storeMetricsCollectClients); - StoreIngestClients storeIngestClients = - new StoreIngestClients(this.channelManager, RoleType.STORE, StoreIngestClient::new); - SchemaWriter schemaWriter = - new SchemaWriter( - new RoleClients<>( - this.channelManager, RoleType.COORDINATOR, SchemaClient::new)); + RoleClients frontendStoreClients = + new RoleClients<>(this.channelManager, RoleType.STORE, FrontendStoreClient::new); + RoleClients schemaWriter = + new RoleClients<>(this.channelManager, RoleType.COORDINATOR, SchemaClient::new); BatchDdlClient batchDdlClient = new BatchDdlClient(new DdlExecutors(), snapshotCache, schemaWriter); @@ -113,7 +111,7 @@ public Frontend(Configs configs) { new ClientService( snapshotCache, metricsAggregator, - storeIngestClients, + frontendStoreClients, this.metaService, batchDdlClient); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeLauncher.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeLauncher.java index 9b1447e4cf3b..479452baafa6 100755 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeLauncher.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeLauncher.java @@ -23,10 +23,10 @@ public class NodeLauncher { private static final Logger logger = LoggerFactory.getLogger(NodeLauncher.class); - private NodeBase node; + private final NodeBase node; - private Thread keepAliveThread; - private CountDownLatch keepAliveLatch = new CountDownLatch(1); + private final Thread keepAliveThread; + private final CountDownLatch keepAliveLatch = new CountDownLatch(1); public NodeLauncher(NodeBase node) { this.node = node; @@ -54,7 +54,7 @@ public void start() { logger.error("failed to stop node", e); } })); - Runtime.getRuntime().addShutdownHook(new Thread(() -> keepAliveLatch.countDown())); + Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown)); try { this.node.start(); } catch (Exception e) { diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java index e8dcdb269c3c..2537efd5b38b 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.servers; +import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.discovery.*; @@ -22,6 +23,7 @@ import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.rpc.ChannelManager; import com.alibaba.graphscope.groot.rpc.GrootNameResolverFactory; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.rpc.RpcServer; import com.alibaba.graphscope.groot.servers.ir.IrServiceProducer; import com.alibaba.graphscope.groot.store.*; @@ -29,7 +31,6 @@ import com.alibaba.graphscope.groot.store.backup.StoreBackupService; import com.alibaba.graphscope.groot.wal.LogService; import com.alibaba.graphscope.groot.wal.LogServiceFactory; -import com.google.common.annotations.VisibleForTesting; import io.grpc.NameResolver; @@ -37,18 +38,18 @@ public class Store extends NodeBase { - private NodeDiscovery discovery; - private ChannelManager channelManager; - private MetaService metaService; - private StoreService storeService; - private WriterAgent writerAgent; - private BackupAgent backupAgent; - private RpcServer rpcServer; - private AbstractService executorService; + private final NodeDiscovery discovery; + private final ChannelManager channelManager; + private final MetaService metaService; + private final StoreService storeService; + private final WriterAgent writerAgent; + private final BackupAgent backupAgent; + private final RpcServer rpcServer; + private final AbstractService executorService; - private KafkaProcessor processor; + private final KafkaProcessor processor; - private PartitionService partitionService; + private final PartitionService partitionService; public Store(Configs configs) { super(configs); @@ -61,7 +62,8 @@ public Store(Configs configs) { this.metaService = new DefaultMetaService(configs); MetricsCollector metricsCollector = new MetricsCollector(configs); this.storeService = new StoreService(configs, this.metaService, metricsCollector); - SnapshotCommitter snapshotCommitter = new DefaultSnapshotCommitter(this.channelManager); + RoleClients snapshotCommitter = + new RoleClients<>(channelManager, RoleType.COORDINATOR, SnapshotCommitClient::new); MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector); LogService logService = LogServiceFactory.makeLogService(configs); @@ -72,17 +74,15 @@ public Store(Configs configs) { this.metaService, snapshotCommitter, metricsCollector); - StoreWriteService storeWriteService = new StoreWriteService(this.writerAgent); this.backupAgent = new BackupAgent(configs, this.storeService); StoreBackupService storeBackupService = new StoreBackupService(this.backupAgent); StoreSchemaService storeSchemaService = new StoreSchemaService(this.storeService); - StoreIngestService storeIngestService = new StoreIngestService(this.storeService); + FrontendStoreService storeIngestService = new FrontendStoreService(this.storeService); StoreSnapshotService storeSnapshotService = new StoreSnapshotService(this.storeService); this.rpcServer = new RpcServer( configs, localNodeProvider, - storeWriteService, storeBackupService, storeSchemaService, storeIngestService, @@ -103,7 +103,6 @@ public void start() { } catch (IOException e) { throw new GrootException(e); } - this.writerAgent.init(0); this.writerAgent.start(); this.backupAgent.start(); try { @@ -139,9 +138,4 @@ public static void main(String[] args) throws IOException { NodeLauncher nodeLauncher = new NodeLauncher(store); nodeLauncher.start(); } - - @VisibleForTesting - public StoreService getStoreService() { - return storeService; - } } diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/FrontendQueryManager.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/FrontendQueryManager.java index c8b273aa5498..9f467a4c5a9d 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/FrontendQueryManager.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/FrontendQueryManager.java @@ -19,7 +19,8 @@ import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.store.IrMeta; import com.alibaba.graphscope.common.store.IrMetaFetcher; -import com.alibaba.graphscope.groot.frontend.SnapshotUpdateCommitter; +import com.alibaba.graphscope.groot.frontend.SnapshotUpdateClient; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; @@ -33,13 +34,13 @@ public class FrontendQueryManager extends IrMetaQueryCallback { // manage queries private final BlockingQueue queryQueue; - private final SnapshotUpdateCommitter committer; + private final RoleClients committer; private ScheduledExecutorService updateExecutor; private long oldSnapshotId = Long.MIN_VALUE; private final int frontendId; public FrontendQueryManager( - IrMetaFetcher fetcher, int frontendId, SnapshotUpdateCommitter committer) { + IrMetaFetcher fetcher, int frontendId, RoleClients committer) { super(fetcher); this.queryQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); this.committer = committer; @@ -116,7 +117,7 @@ public void run() { minSnapshotId = queryQueue.peek().snapshotId; } if (minSnapshotId > oldSnapshotId) { - committer.updateSnapshot(frontendId, minSnapshotId); + committer.getClient(0).updateSnapshot(frontendId, minSnapshotId); oldSnapshotId = minSnapshotId; } } catch (Exception e) { diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java index 0e71beb03609..f4429242f033 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java @@ -59,11 +59,7 @@ public GaiaEngine(Configs configs, DiscoveryFactory discoveryFactory) { @Override public void init() { Configs engineConfigs = - Configs.newBuilder(this.configs) - .put( - "worker.num", - String.valueOf(CommonConfig.STORE_NODE_COUNT.get(this.configs))) - .build(); + Configs.newBuilder(configs).put("worker.num", String.valueOf(nodeCount)).build(); byte[] configBytes = engineConfigs.toProto().toByteArray(); this.pointer = GaiaLibrary.INSTANCE.initialize(configBytes, configBytes.length); } diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java index 40b3dacd2b9f..fc39e1d5ff69 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/IrServiceProducer.java @@ -28,9 +28,10 @@ import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.schema.api.SchemaFetcher; import com.alibaba.graphscope.groot.discovery.DiscoveryFactory; -import com.alibaba.graphscope.groot.frontend.SnapshotUpdateCommitter; +import com.alibaba.graphscope.groot.frontend.SnapshotUpdateClient; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.rpc.ChannelManager; +import com.alibaba.graphscope.groot.rpc.RoleClients; import com.alibaba.graphscope.groot.servers.AbstractService; import com.alibaba.graphscope.groot.store.StoreService; @@ -56,7 +57,8 @@ public AbstractService makeGraphService( com.alibaba.graphscope.common.config.Configs irConfigs = getConfigs(); logger.info("IR configs: {}", irConfigs); IrMetaFetcher irMetaFetcher = new GrootMetaFetcher(schemaFetcher); - SnapshotUpdateCommitter updateCommitter = new SnapshotUpdateCommitter(channelManager); + RoleClients updateCommitter = + new RoleClients<>(channelManager, RoleType.COORDINATOR, SnapshotUpdateClient::new); int frontendId = CommonConfig.NODE_IDX.get(configs); FrontendQueryManager queryManager = new FrontendQueryManager(irMetaFetcher, frontendId, updateCommitter); diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java index 141647146587..016f825e1283 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/frontend/FrontendRpcTest.java @@ -83,9 +83,9 @@ void testSchemaWriter() { SchemaClient client = mock(SchemaClient.class); when(clients.getClient(0)).thenReturn(client); - SchemaWriter schemaWriter = new SchemaWriter(clients); - schemaWriter.submitBatchDdl( - "test_req", "test_session", DdlRequestBatch.newBuilder().build()); + clients.getClient(0) + .submitBatchDdl( + "test_req", "test_session", DdlRequestBatch.newBuilder().build().toProto()); verify(client) .submitBatchDdl( "test_req", "test_session", DdlRequestBatch.newBuilder().build().toProto()); diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/StoreRpcTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/StoreRpcTest.java index b1425d7987dd..26ca24084626 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/StoreRpcTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/StoreRpcTest.java @@ -18,7 +18,6 @@ import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.store.*; import com.alibaba.graphscope.groot.store.StoreSchemaService; -import com.alibaba.graphscope.groot.store.StoreWriteService; import com.alibaba.graphscope.groot.store.backup.BackupAgent; import com.alibaba.graphscope.groot.store.backup.StoreBackupId; import com.alibaba.graphscope.groot.store.backup.StoreBackupService; @@ -64,17 +63,6 @@ void testStoreSchemaService() throws IOException { verify(observer).onCompleted(); } - @Test - void testStoreWriteService() throws InterruptedException { - WriterAgent writerAgent = mock(WriterAgent.class); - when(writerAgent.writeStore2(any())).thenReturn(true); - StoreWriteService storeWriteService = new StoreWriteService(writerAgent); - StreamObserver observer = mock(StreamObserver.class); - storeWriteService.writeStore(WriteStoreRequest.newBuilder().build(), observer); - verify(observer).onNext(WriteStoreResponse.newBuilder().setSuccess(true).build()); - verify(observer).onCompleted(); - } - @Test void testStoreBackupService() { BackupAgent mockBackupAgent = mock(BackupAgent.class); diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java index 1242a324294c..1a78382046fa 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/store/WriterAgentTest.java @@ -21,7 +21,8 @@ import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.StoreDataBatch; -import com.alibaba.graphscope.groot.store.SnapshotCommitter; +import com.alibaba.graphscope.groot.rpc.RoleClients; +import com.alibaba.graphscope.groot.store.SnapshotCommitClient; import com.alibaba.graphscope.groot.store.StoreService; import com.alibaba.graphscope.groot.store.WriterAgent; @@ -44,7 +45,7 @@ void testWriterAgent() throws InterruptedException, ExecutionException { MetaService mockMetaService = mock(MetaService.class); when(mockMetaService.getQueueCount()).thenReturn(1); - SnapshotCommitter mockSnapshotCommitter = mock(SnapshotCommitter.class); + RoleClients mockSnapshotCommitter = mock(RoleClients.class); WriterAgent writerAgent = new WriterAgent( @@ -53,7 +54,6 @@ void testWriterAgent() throws InterruptedException, ExecutionException { mockMetaService, mockSnapshotCommitter, new MetricsCollector(configs)); - writerAgent.init(0L); writerAgent.start(); @@ -63,6 +63,7 @@ void testWriterAgent() throws InterruptedException, ExecutionException { verify(mockStoreService, timeout(5000L).times(1)).batchWrite(storeDataBatch); verify(mockSnapshotCommitter, timeout(5000L).times(1)) + .getClient(0) .commitSnapshotId(0, 1L, 0L, Collections.singletonList(10L)); writerAgent.stop(); diff --git a/proto/groot/store_ingest_service.proto b/proto/groot/frontend_store_service.proto similarity index 91% rename from proto/groot/store_ingest_service.proto rename to proto/groot/frontend_store_service.proto index c2151a7a77fc..0280a60d9b00 100644 --- a/proto/groot/store_ingest_service.proto +++ b/proto/groot/frontend_store_service.proto @@ -21,9 +21,10 @@ import "groot/sdk/model.proto"; option java_package = "com.alibaba.graphscope.proto.groot"; option java_multiple_files = true; -service StoreIngest { +service FrontendStoreService { rpc storeIngest(IngestDataRequest) returns(IngestDataResponse); rpc storeClearIngest(ClearIngestRequest) returns(ClearIngestResponse); rpc compactDB(CompactDBRequest) returns(CompactDBResponse); rpc reopenSecondary(ReopenSecondaryRequest) returns (ReopenSecondaryResponse); + rpc GetState(GetStoreStateRequest) returns (GetStoreStateResponse); } diff --git a/proto/groot/store_write_service.proto b/proto/groot/store_write_service.proto index eeff60faebbf..7586e3110325 100644 --- a/proto/groot/store_write_service.proto +++ b/proto/groot/store_write_service.proto @@ -21,10 +21,6 @@ option java_multiple_files = true; import "groot/sdk/model.proto"; -service StoreWrite { - rpc writeStore(WriteStoreRequest) returns (WriteStoreResponse); -} - message PartitionToBatchPb { map partitionToBatch = 1; } diff --git a/python/graphscope/client/connection.py b/python/graphscope/client/connection.py index d3b71fdd9c25..751540c6f9ef 100644 --- a/python/graphscope/client/connection.py +++ b/python/graphscope/client/connection.py @@ -185,6 +185,13 @@ def remote_flush(self, snapshot_id, timeout_ms=3000): ) return response.success + def get_store_state(self): + request = model_pb2.GetStoreStateRequest() + response = self._client_service_stub.getStoreState( + request, metadata=self._metadata + ) + return response.partitionStates + def replay_records(self, offset: int, timestamp: int): request = write_service_pb2.ReplayRecordsRequest() request.offset = offset