Skip to content

Commit

Permalink
fix(interactive): add store-state service back (alibaba#3664)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Mar 27, 2024
1 parent be47b23 commit 579ab9f
Show file tree
Hide file tree
Showing 25 changed files with 303 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ZkConfig {
Config.intConfig("zk.connection.timeout.ms", 1000);

public static final Config<Integer> ZK_SESSION_TIMEOUT_MS =
Config.intConfig("zk.session.timeout.ms", 10000);
Config.intConfig("zk.session.timeout.ms", 30000);

public static final Config<Integer> ZK_BASE_SLEEP_MS =
Config.intConfig("zk.base.sleep.ms", 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ public void destroyAll() throws Exception {
if (this.logService.initialized()) {
this.logService.destroy();
}
String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs);
Stat stat = this.curator.checkExists().forPath(zkRoot);
if (stat != null) {
this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot);
if (this.curator != null) {
String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs);
Stat stat = this.curator.checkExists().forPath(zkRoot);
if (stat != null) {
this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.alibaba.graphscope.groot.SnapshotCache;
import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef;
import com.alibaba.graphscope.groot.common.util.UuidUtils;
import com.alibaba.graphscope.groot.rpc.RoleClients;
import com.alibaba.graphscope.groot.schema.ddl.DdlExecutors;
import com.alibaba.graphscope.groot.schema.request.DdlException;
import com.alibaba.graphscope.groot.schema.request.DdlRequestBatch;
Expand All @@ -12,10 +13,12 @@ public class BatchDdlClient {

private final DdlExecutors ddlExecutors;
private final SnapshotCache snapshotCache;
private final SchemaWriter schemaWriter;
private final RoleClients<SchemaClient> schemaWriter;

public BatchDdlClient(
DdlExecutors ddlExecutors, SnapshotCache snapshotCache, SchemaWriter schemaWriter) {
DdlExecutors ddlExecutors,
SnapshotCache snapshotCache,
RoleClients<SchemaClient> schemaWriter) {
this.ddlExecutors = ddlExecutors;
this.snapshotCache = snapshotCache;
this.schemaWriter = schemaWriter;
Expand All @@ -28,7 +31,8 @@ public long batchDdl(DdlRequestBatch ddlRequestBatch) {
} catch (InvalidProtocolBufferException e) {
throw new DdlException(e);
}
return this.schemaWriter.submitBatchDdl(
UuidUtils.getBase64UUIDString(), "ddl", ddlRequestBatch);
return this.schemaWriter
.getClient(0)
.submitBatchDdl(UuidUtils.getBase64UUIDString(), "ddl", ddlRequestBatch.toProto());
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@

import java.util.Map;

public class StoreIngestClient extends RpcClient {
public class FrontendStoreClient extends RpcClient {

private final StoreIngestGrpc.StoreIngestStub stub;
private final FrontendStoreServiceGrpc.FrontendStoreServiceStub stub;

public StoreIngestClient(ManagedChannel channel) {
public FrontendStoreClient(ManagedChannel channel) {
super(channel);
this.stub = StoreIngestGrpc.newStub(channel);
this.stub = FrontendStoreServiceGrpc.newStub(channel);
}

public void storeIngest(
Expand Down Expand Up @@ -110,4 +110,23 @@ public void onError(Throwable t) {
public void onCompleted() {}
});
}

public void getStoreState(CompletionCallback<GetStoreStateResponse> callback) {
this.stub.getState(
GetStoreStateRequest.newBuilder().build(),
new StreamObserver<>() {
@Override
public void onNext(GetStoreStateResponse value) {
callback.onCompleted(value);
}

@Override
public void onError(Throwable t) {
callback.onError(t);
}

@Override
public void onCompleted() {}
});
}
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import java.io.IOException;
import java.util.Map;

public class StoreIngestService extends StoreIngestGrpc.StoreIngestImplBase {
public class FrontendStoreService extends FrontendStoreServiceGrpc.FrontendStoreServiceImplBase {

private final StoreService storeService;

public StoreIngestService(StoreService storeService) {
public FrontendStoreService(StoreService storeService) {
this.storeService = storeService;
}

Expand Down Expand Up @@ -111,4 +111,20 @@ public void onError(Throwable t) {
}
});
}

@Override
public void getState(
GetStoreStateRequest request, StreamObserver<GetStoreStateResponse> responseObserver) {
long[] spaces = this.storeService.getDiskStatus();
GetStoreStateResponse.Builder builder = GetStoreStateResponse.newBuilder();
PartitionStatePb state =
PartitionStatePb.newBuilder()
.setTotalSpace(spaces[0])
.setUsableSpace(spaces[1])
.build();

builder.putPartitionStates(storeService.getStoreId(), state);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 579ab9f

Please sign in to comment.