From cc0f423943625ae57cda3d1209c6235ffb3550bc Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Sun, 20 Oct 2024 22:11:33 +0800 Subject: [PATCH] [Feature](batch write part1) Implement the basic workflow on FE side Signed-off-by: PengFei Li --- .../java/com/starrocks/common/Config.java | 14 + .../com/starrocks/http/rest/LoadAction.java | 53 +- .../starrocks/http/rest/RestBaseResult.java | 7 + .../load/batchwrite/BatchWriteId.java | 54 ++ .../load/batchwrite/BatchWriteMgr.java | 218 +++++ .../CoordinatorBackendAssigner.java | 55 ++ .../CoordinatorBackendAssignerImpl.java | 802 ++++++++++++++++++ .../load/batchwrite/IsomorphicBatchWrite.java | 268 ++++++ .../load/batchwrite/LoadExecuteCallback.java | 28 + .../load/batchwrite/LoadExecutor.java | 276 ++++++ .../RequestCoordinatorBackendResult.java | 31 + .../load/batchwrite/RequestLoadResult.java | 28 + .../starrocks/load/batchwrite/StatusOr.java | 51 ++ .../starrocks/load/batchwrite/TableId.java | 67 ++ .../load/streamload/StreamLoadHttpHeader.java | 22 +- .../load/streamload/StreamLoadInfo.java | 15 +- .../load/streamload/StreamLoadKvParams.java | 111 +-- .../load/streamload/StreamLoadTask.java | 4 +- .../starrocks/planner/StreamLoadScanNode.java | 40 +- .../com/starrocks/server/GlobalStateMgr.java | 8 + .../service/FrontendServiceImpl.java | 33 + .../java/com/starrocks/sql/LoadPlanner.java | 20 + .../com/starrocks/http/LoadActionTest.java | 130 +++ .../load/batchwrite/BatchWriteMgrTest.java | 177 ++++ .../load/batchwrite/BatchWriteTestBase.java | 108 +++ .../CoordinatorBackendAssignerTest.java | 201 +++++ .../batchwrite/IsomorphicBatchWriteTest.java | 304 +++++++ .../load/batchwrite/LoadExecutorTest.java | 358 ++++++++ .../streamload/StreamLoadKvParamsTest.java | 28 + .../planner/StreamLoadPlannerTest.java | 3 +- .../service/FrontendServiceImplTest.java | 28 + gensrc/thrift/FrontendService.thrift | 19 + gensrc/thrift/PlanNodes.thrift | 4 + 33 files changed, 3495 insertions(+), 70 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteId.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteMgr.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssigner.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerImpl.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/IsomorphicBatchWrite.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecuteCallback.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestCoordinatorBackendResult.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestLoadResult.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/StatusOr.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TableId.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/http/LoadActionTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteMgrTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteTestBase.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/load/batchwrite/IsomorphicBatchWriteTest.java create mode 
100644 fe/fe-core/src/test/java/com/starrocks/load/batchwrite/LoadExecutorTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index ec2c116b733a8..d1d1d1aae4ffd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -3268,4 +3268,18 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static int lake_remove_table_thread_num = 4; + + @ConfField(mutable = true) + public static int batch_write_gc_check_interval_ms = 60000; + + @ConfField(mutable = true) + public static int batch_write_idle_ms = 3600000; + + @ConfField(mutable = true) + public static int batch_write_be_assigner_schedule_interval_ms = 5000; + + @ConfField(mutable = true) + public static double batch_write_balance_diff_ratio = 0.1; + + public static int batch_write_load_executor_threads_num = 4096; } diff --git a/fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java b/fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java index 817bc2ed57484..c751623e92815 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java @@ -40,6 +40,10 @@ import com.starrocks.http.BaseRequest; import com.starrocks.http.BaseResponse; import com.starrocks.http.IllegalArgException; +import com.starrocks.load.batchwrite.RequestCoordinatorBackendResult; +import com.starrocks.load.batchwrite.TableId; +import com.starrocks.load.streamload.StreamLoadHttpHeader; +import com.starrocks.load.streamload.StreamLoadKvParams; import com.starrocks.privilege.AccessDeniedException; import com.starrocks.privilege.PrivilegeType; import com.starrocks.qe.ConnectContext; @@ -59,6 +63,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; public class LoadAction extends RestBaseAction { private static final Logger LOG = LogManager.getLogger(LoadAction.class); @@ -95,6 +100,12 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res throw new DdlException("There is no 100-continue header"); } + boolean enableBatchWrite = "true".equalsIgnoreCase( + request.getRequest().headers().get(StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE)); + if (enableBatchWrite && redirectToLeader(request, response)) { + return; + } + String dbName = request.getSingleParameter(DB_KEY); if (Strings.isNullOrEmpty(dbName)) { throw new DdlException("No database selected."); @@ -105,11 +116,20 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res throw new DdlException("No table selected."); } - String label = request.getRequest().headers().get(LABEL_KEY); - Authorizer.checkTableAction(ConnectContext.get().getCurrentUserIdentity(), ConnectContext.get().getCurrentRoleIds(), dbName, tableName, PrivilegeType.INSERT); + if (!enableBatchWrite) { + processNormalStreamLoad(request, response, dbName, tableName); + } else { + processBatchWriteStreamLoad(request, response, dbName, tableName); + } + } + + private void processNormalStreamLoad( + BaseRequest request, BaseResponse response, String dbName, String tableName) throws DdlException { + String label = request.getRequest().headers().get(LABEL_KEY); + String warehouseName = WarehouseManager.DEFAULT_WAREHOUSE_NAME; if (request.getRequest().headers().contains(WAREHOUSE_KEY)) { warehouseName = 
request.getRequest().headers().get(WAREHOUSE_KEY); @@ -147,5 +167,34 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res redirectAddr.toString(), dbName, tableName, label, warehouseName); redirectTo(request, response, redirectAddr); } + + private void processBatchWriteStreamLoad( + BaseRequest request, BaseResponse response, String dbName, String tableName) throws DdlException { + TableId tableId = new TableId(dbName, tableName); + StreamLoadKvParams params = StreamLoadKvParams.fromHttpHeaders(request.getRequest().headers()); + RequestCoordinatorBackendResult result = GlobalStateMgr.getCurrentState() + .getBatchWriteMgr().requestCoordinatorBackends(tableId, params); + if (!result.isOk()) { + BatchWriteResponseResult responseResult = new BatchWriteResponseResult( + result.getStatus().status_code.name(), ActionStatus.FAILED, + result.getStatus().error_msgs.get(0)); + sendResult(request, response, responseResult); + return; + } + + List<ComputeNode> nodes = result.getValue(); + int index = ThreadLocalRandom.current().nextInt(nodes.size()); + ComputeNode node = nodes.get(index); + TNetworkAddress redirectAddr = new TNetworkAddress(node.getHost(), node.getHttpPort()); + LOG.info("redirect batch write to destination={}, db: {}, tbl: {}", redirectAddr.toString(), dbName, tableName); + redirectTo(request, response, redirectAddr); + } + + public static class BatchWriteResponseResult extends RestBaseResult { + + public BatchWriteResponseResult(String code, ActionStatus status, String msg) { + super(code, status, msg); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseResult.java b/fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseResult.java index 70085152db5ca..e86a12e6ef3f2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseResult.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseResult.java @@ -84,6 +84,13 @@ public RestBaseResult(String msg) { this.message = msg; } + public RestBaseResult(String code, ActionStatus status, String msg) { + this.code = code; + this.status = status; + this.msg = msg; + this.message = msg; + } + public static RestBaseResult getOk() { return OK; } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteId.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteId.java new file mode 100644 index 0000000000000..af065a40843b6 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteId.java @@ -0,0 +1,54 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.load.batchwrite; + +import com.starrocks.load.streamload.StreamLoadKvParams; + +import java.util.Objects; + +/** + * Represents an identifier for a batch write. + */ +public class BatchWriteId { + + /** The ID of the table associated with the batch write. */ + private final TableId tableId; + + /** The parameters for the stream load associated with the batch write. 
*/ + private final StreamLoadKvParams params; + + + public BatchWriteId(TableId tableId, StreamLoadKvParams params) { + this.tableId = tableId; + this.params = params; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BatchWriteId that = (BatchWriteId) o; + return Objects.equals(tableId, that.tableId) && Objects.equals(params, that.params); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, params); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteMgr.java new file mode 100644 index 0000000000000..58176c13cec9c --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteMgr.java @@ -0,0 +1,218 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.load.batchwrite; + +import com.starrocks.common.Config; +import com.starrocks.common.Pair; +import com.starrocks.common.ThreadPoolManager; +import com.starrocks.common.util.FrontendDaemon; +import com.starrocks.load.streamload.StreamLoadInfo; +import com.starrocks.load.streamload.StreamLoadKvParams; +import com.starrocks.qe.ConnectContext; +import com.starrocks.thrift.TStatus; +import com.starrocks.thrift.TStatusCode; +import org.apache.arrow.util.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static com.starrocks.server.WarehouseManager.DEFAULT_WAREHOUSE_NAME; + +/** + * Manages batch write operations. + */ +public class BatchWriteMgr extends FrontendDaemon { + + private static final Logger LOG = LoggerFactory.getLogger(BatchWriteMgr.class); + + // An atomic counter used to generate unique ids for isomorphic batch writes. + private final AtomicLong idGenerator; + + // A read-write lock to ensure thread-safe access to the isomorphicBatchWriteMap. + private final ReentrantReadWriteLock lock; + + // A concurrent map that stores IsomorphicBatchWrite instances, keyed by BatchWriteId. + private final ConcurrentHashMap<BatchWriteId, IsomorphicBatchWrite> isomorphicBatchWriteMap; + + // An assigner that manages the assignment of coordinator backends. + private final CoordinatorBackendAssigner coordinatorBackendAssigner; + + // A thread pool executor for executing batch write tasks. 
+ private final ThreadPoolExecutor threadPoolExecutor; + + public BatchWriteMgr() { + super("group-commit-mgr", Config.batch_write_gc_check_interval_ms); + this.idGenerator = new AtomicLong(0L); + this.isomorphicBatchWriteMap = new ConcurrentHashMap<>(); + this.lock = new ReentrantReadWriteLock(); + this.coordinatorBackendAssigner = new CoordinatorBackendAssignerImpl(); + this.threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool( + Config.batch_write_load_executor_threads_num, "group-commit-load", true); + } + + @Override + public void start() { + super.start(); + this.coordinatorBackendAssigner.start(); + LOG.info("Start batch write manager"); + } + + @Override + protected void runAfterCatalogReady() { + setInterval(Config.batch_write_gc_check_interval_ms); + cleanupInactiveBatchWrite(); + } + + /** + * Requests coordinator backends for the specified table and load parameters. + * + * @param tableId The ID of the table for which the coordinator backends are requested. + * @param params The parameters for the stream load. + * @return A RequestCoordinatorBackendResult containing the status of the operation and the coordinator backends. + */ + public RequestCoordinatorBackendResult requestCoordinatorBackends(TableId tableId, StreamLoadKvParams params) { + lock.readLock().lock(); + try { + Pair<TStatus, IsomorphicBatchWrite> result = getOrCreateTableBatchWrite(tableId, params); + if (result.first.getStatus_code() != TStatusCode.OK) { + return new RequestCoordinatorBackendResult(result.first, null); + } + return result.second.requestCoordinatorBackends(); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Requests a load operation for the specified table and load parameters. + * + * @param tableId The ID of the table for which the load is requested. + * @param params The parameters for the stream load. + * @param backendId The id of the backend where the request is from. + * @param backendHost The host of the backend where the request is from. + * @return A RequestLoadResult containing the status of the operation and the load result. + */ + public RequestLoadResult requestLoad( + TableId tableId, StreamLoadKvParams params, long backendId, String backendHost) { + lock.readLock().lock(); + try { + Pair<TStatus, IsomorphicBatchWrite> result = getOrCreateTableBatchWrite(tableId, params); + if (result.first.getStatus_code() != TStatusCode.OK) { + return new RequestLoadResult(result.first, null); + } + return result.second.requestLoad(backendId, backendHost); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Cleans up inactive batch writes to release resources. + */ + @VisibleForTesting + void cleanupInactiveBatchWrite() { + lock.writeLock().lock(); + try { + List<Map.Entry<BatchWriteId, IsomorphicBatchWrite>> loads = isomorphicBatchWriteMap.entrySet().stream() + .filter(entry -> !entry.getValue().isActive()) + .collect(Collectors.toList()); + for (Map.Entry<BatchWriteId, IsomorphicBatchWrite> entry : loads) { + isomorphicBatchWriteMap.remove(entry.getKey()); + coordinatorBackendAssigner.unregisterBatchWrite(entry.getValue().getId()); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Retrieves or creates an IsomorphicBatchWrite instance for the specified table and parameters. + * + * @param tableId The ID of the table for which the batch write is requested. + * @param params The parameters for the stream load. + * @return A Pair containing the status of the operation and the IsomorphicBatchWrite instance. 
+ */ + private Pair<TStatus, IsomorphicBatchWrite> getOrCreateTableBatchWrite(TableId tableId, StreamLoadKvParams params) { + BatchWriteId uniqueId = new BatchWriteId(tableId, params); + IsomorphicBatchWrite load = isomorphicBatchWriteMap.get(uniqueId); + if (load != null) { + return new Pair<>(new TStatus(TStatusCode.OK), load); + } + + String warehouseName = params.getWarehouse().orElse(DEFAULT_WAREHOUSE_NAME); + StreamLoadInfo streamLoadInfo; + try { + streamLoadInfo = StreamLoadInfo.fromHttpStreamLoadRequest(null, -1, Optional.empty(), params); + } catch (Exception e) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.INVALID_ARGUMENT); + status.setError_msgs(Collections.singletonList( + String.format("Failed to build stream load info, error: %s", e.getMessage()))); + return new Pair<>(status, null); + } + + Integer batchWriteIntervalMs = params.getBatchWriteIntervalMs().orElse(null); + if (batchWriteIntervalMs == null || batchWriteIntervalMs <= 0) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.INVALID_ARGUMENT); + status.setError_msgs(Collections.singletonList( + "Batch write interval must be positive, but is " + batchWriteIntervalMs)); + return new Pair<>(status, null); + } + + Integer batchWriteParallel = params.getBatchWriteParallel().orElse(null); + if (batchWriteParallel == null || batchWriteParallel <= 0) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.INVALID_ARGUMENT); + status.setError_msgs(Collections.singletonList( + "Batch write parallel must be positive, but is " + batchWriteParallel)); + return new Pair<>(status, null); + } + + load = isomorphicBatchWriteMap.computeIfAbsent(uniqueId, uid -> { + long id = idGenerator.getAndIncrement(); + IsomorphicBatchWrite newLoad = new IsomorphicBatchWrite( + id, tableId, warehouseName, streamLoadInfo, batchWriteIntervalMs, batchWriteParallel, + params.toMap(), new ConnectContext(), coordinatorBackendAssigner, threadPoolExecutor); + coordinatorBackendAssigner.registerBatchWrite(id, newLoad.getWarehouse(), tableId, newLoad.getBatchWriteParallel()); + return newLoad; + }); + + return new Pair<>(new TStatus(TStatusCode.OK), load); + } + + /** + * Returns the number of batch writes currently managed. + * + * @return The number of batch writes. + */ + public int numBatchWrites() { + return isomorphicBatchWriteMap.size(); + } + + @VisibleForTesting + Map<BatchWriteId, IsomorphicBatchWrite> getIsomorphicBatchWriteMap() { + return isomorphicBatchWriteMap; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssigner.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssigner.java new file mode 100644 index 0000000000000..87345e08db236 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssigner.java @@ -0,0 +1,55 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.starrocks.load.batchwrite; + +import com.starrocks.system.ComputeNode; + +import java.util.List; + +/** + * Interface for assigning coordinator backends to loads. + */ +public interface CoordinatorBackendAssigner { + + /** + * Starts the backend assigner. + */ + void start(); + + /** + * Registers a batch write with the specified parameters. + * + * @param id The ID of the batch write operation. + * @param warehouseName The name of the warehouse. + * @param tableId The identifier of the table. + * @param expectParallel The expected parallelism. + */ + void registerBatchWrite(long id, String warehouseName, TableId tableId, int expectParallel); + + /** + * Unregisters a batch write. + * + * @param id The ID of the batch write to unregister. + */ + void unregisterBatchWrite(long id); + + /** + * Retrieves the list of compute nodes assigned to the batch write. + * + * @param id The ID of the batch write. + * @return A list of compute nodes assigned to the batch write. + */ + List<ComputeNode> getBackends(long id); +} diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerImpl.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerImpl.java new file mode 100644 index 0000000000000..d9a8c3cca2740 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerImpl.java @@ -0,0 +1,802 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.load.batchwrite; + +import com.starrocks.common.Config; +import com.starrocks.common.ThreadPoolManager; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; +import com.starrocks.system.ComputeNode; +import org.apache.arrow.util.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; + +// Need balance when +// 1. nodes changed +// 2. 
unregister load +// there is no need to balance on register load because we allocate nodes from the least-loaded one +public class CoordinatorBackendAssignerImpl implements CoordinatorBackendAssigner { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorBackendAssignerImpl.class); + + private static final int MIN_CHECK_INTERVAL_MS = 1000; + + // load id -> LoadMeta + private final ConcurrentHashMap<Long, LoadMeta> registeredLoadMetas; + private final AtomicLong scheduleTaskIdAllocator; + private final PriorityBlockingQueue<ScheduleTask> scheduleTaskQueue; + private final ExecutorService executorService; + + private final AtomicLong pendingDetectUnavailableNodesTask; + private final AtomicLong numScheduledTasks; + private final AtomicBoolean isPeriodicalScheduleRunning; + + // warehouse -> scheduling meta of this warehouse. + // It's only read/written in the single schedule thread + private final Map<String, WarehouseScheduleMeta> warehouseScheduleMetas; + private final Map<Long, LoadMeta> loadWaitForGc; + + public CoordinatorBackendAssignerImpl() { + this.registeredLoadMetas = new ConcurrentHashMap<>(); + this.scheduleTaskIdAllocator = new AtomicLong(0); + this.scheduleTaskQueue = new PriorityBlockingQueue<>(32, ScheduleTaskComparator.INSTANCE); + this.executorService = ThreadPoolManager.newDaemonCacheThreadPool( + 1, "coordinator-be-assigner-scheduler", true); + this.pendingDetectUnavailableNodesTask = new AtomicLong(0); + this.numScheduledTasks = new AtomicLong(0); + this.isPeriodicalScheduleRunning = new AtomicBoolean(false); + this.warehouseScheduleMetas = new HashMap<>(); + this.loadWaitForGc = new ConcurrentHashMap<>(); + } + + @Override + public void start() { + this.executorService.submit(this::runSchedule); + LOG.info("Start coordinator be assigner"); + } + + private void runSchedule() { + while (true) { + ScheduleTask task; + try { + int checkIntervalMs; + if (warehouseScheduleMetas.isEmpty()) { + // There is no need to schedule periodically if there is no load, and + // it will be woken up by a REGISTER_LOAD task when receiving a new load + checkIntervalMs = Integer.MAX_VALUE; + LOG.info("Disable periodical schedule because there is no load"); + } else { + checkIntervalMs = Math.max(MIN_CHECK_INTERVAL_MS, Config.batch_write_be_assigner_schedule_interval_ms); + LOG.debug("Set schedule interval to {} ms", checkIntervalMs); + } + task = scheduleTaskQueue.poll(checkIntervalMs, TimeUnit.MILLISECONDS); + } catch (Throwable throwable) { + LOG.warn("Failed to poll schedule task queue", throwable); + continue; + } + + if (task == null) { + long startTime = System.currentTimeMillis(); + if (!isPeriodicalScheduleRunning.compareAndSet(false, true)) { + continue; + } + try { + Iterator<Map.Entry<Long, LoadMeta>> iterator = loadWaitForGc.entrySet().iterator(); + while (iterator.hasNext()) { + runScheduleOnUnregisterLoad(iterator.next().getValue()); + iterator.remove(); + } + periodicalCheckNodeChangedAndBalance(); + LOG.debug("Success to execute periodical schedule, cost: {} ms", + System.currentTimeMillis() - startTime); + } catch (Throwable throwable) { + LOG.error("Failed to execute periodical schedule, cost: {} ms", + System.currentTimeMillis() - startTime, throwable); + } finally { + isPeriodicalScheduleRunning.compareAndSet(true, false); + } + continue; + } + + long startTime = System.currentTimeMillis(); + LOG.info("Start to execute task, type: {}, task id: {}", task.getScheduleType(), task.getTaskId()); + try { + task.getScheduleRunnable().run(); + LOG.info("Success to execute task, type: {}, task id: {}, cost: {} ms", + task.getScheduleType(), task.getTaskId(), 
System.currentTimeMillis() - startTime); + } catch (Throwable throwable) { + LOG.error("Failed to execute task, type: {}, task id: {}, cost: {} ms", + task.getScheduleType(), task.getTaskId(), System.currentTimeMillis() - startTime, throwable); + } finally { + task.finish(); + if (task.getScheduleType() == ScheduleType.DETECT_UNAVAILABLE_NODES) { + pendingDetectUnavailableNodesTask.decrementAndGet(); + } + numScheduledTasks.incrementAndGet(); + } + } + } + + @Override + public void registerBatchWrite(long id, String warehouseName, TableId tableId, int expectParallel) { + LoadMeta loadMeta = registeredLoadMetas.computeIfAbsent( + id, k -> new LoadMeta(id, warehouseName, tableId, expectParallel)); + LOG.info("Register load, load id: {}, warehouse: {}, {}, parallel: {}", + id, warehouseName, tableId, expectParallel); + try { + syncScheduleOnRegisterLoad(loadMeta); + LOG.info("Register load finish schedule, load id: {}", id); + } catch (Exception e) { + LOG.warn("Failed to wait schedule result when registering load, load id: {}", id, e); + } + + } + + @Override + public void unregisterBatchWrite(long id) { + LoadMeta meta = registeredLoadMetas.remove(id); + if (meta != null) { + asyncScheduleOnUnregisterLoad(meta); + LOG.info("Unregister load, load id: {}, warehouse: {}, {}", id, meta.warehouseName, meta.tableId); + } + } + + @Override + public List<ComputeNode> getBackends(long id) { + LoadMeta meta = registeredLoadMetas.get(id); + if (meta == null) { + return null; + } + List<ComputeNode> nodes = meta.nodes; + if (nodes == null) { + return null; + } + + // lazily initialized when the first unavailable node is found + List<ComputeNode> availableNodes = null; + List<ComputeNode> unavailableNodes = null; + // check available + for (int i = 0; i < nodes.size(); i++) { + ComputeNode node = nodes.get(i); + if (!node.isAvailable()) { + // initialize availableNodes when finding the first unavailable node + if (availableNodes == null) { + availableNodes = new ArrayList<>(nodes.subList(0, i)); + unavailableNodes = new ArrayList<>(); + } + unavailableNodes.add(node); + } else { + if (availableNodes != null) { + availableNodes.add(node); + } + } + } + if (unavailableNodes != null) { + asyncScheduleOnDetectUnavailableNodes(meta, unavailableNodes); + } + + return Collections.unmodifiableList(availableNodes == null ? 
nodes : availableNodes); + } + + private void syncScheduleOnRegisterLoad(LoadMeta loadMeta) throws Exception { + ScheduleTask task = new ScheduleTask( + scheduleTaskIdAllocator.incrementAndGet(), + ScheduleType.REGISTER_LOAD, + () -> this.runScheduleOnRegisterLoad(loadMeta)); + boolean ret = scheduleTaskQueue.add(task); + if (!ret) { + LOG.error("Failed to submit schedule task {}, load id: {}, queue size: {}", + ScheduleType.REGISTER_LOAD, loadMeta.loadId, scheduleTaskQueue.size()); + throw new RejectedExecutionException( + String.format("Failed to submit schedule task %s, load id: %s", ScheduleType.REGISTER_LOAD, loadMeta.loadId)); + } + LOG.info("Submit schedule task {}, load id: {}, task id: {}", + ScheduleType.REGISTER_LOAD, loadMeta.loadId, task.getTaskId()); + + // TODO add timeout + task.waitFinish(); + } + + private void runScheduleOnRegisterLoad(LoadMeta loadMeta) { + String warehouseName = loadMeta.warehouseName; + WarehouseScheduleMeta warehouseMeta = + warehouseScheduleMetas.computeIfAbsent(warehouseName, WarehouseScheduleMeta::new); + + Map<Long, LoadMeta> loadMetas = warehouseMeta.schedulingLoadMetas; + long loadId = loadMeta.loadId; + if (loadMetas.putIfAbsent(loadId, loadMeta) != null) { + LOG.error("Register duplicate load id: {}, warehouse: {}, expect parallel: {}, nodes: {}", + loadMeta.loadId, loadMeta.warehouseName, loadMeta.expectParallel, loadMeta.nodes); + return; + } + + NavigableSet<NodeMeta> nodeMetas = warehouseMeta.sortedNodeMetaSet; + if (nodeMetas.isEmpty()) { + List<ComputeNode> nodes = getAvailableNodes(warehouseName); + nodes.forEach(node -> nodeMetas.add(new NodeMeta(node))); + } + + int needNodeNum = Math.min(nodeMetas.size(), loadMeta.expectParallel); + List<NodeMeta> changedNodeMetas = new ArrayList<>(needNodeNum); + List<ComputeNode> selectedNodes = new ArrayList<>(needNodeNum); + for (int i = 0; i < needNodeNum; i++) { + NodeMeta nodeMeta = nodeMetas.pollFirst(); + if (nodeMeta == null) { + // should not happen, just log an error message + LOG.error("There are not enough nodes for load id: {}, num nodes: {}, need nodes: {}, selected nodes: {}", + loadId, changedNodeMetas.size(), needNodeNum, selectedNodes); + break; + } + if (nodeMeta.loadIds.contains(loadId)) { + // should not happen, just log an error message + LOG.error("Node already contains load, node: {}, load id: {}", nodeMeta.node, loadId); + } else { + nodeMeta.loadIds.add(loadId); + selectedNodes.add(nodeMeta.node); + } + changedNodeMetas.add(nodeMeta); + } + nodeMetas.addAll(changedNodeMetas); + loadMeta.nodes = selectedNodes; + LOG.info("Allocate nodes for load id: {}, warehouse: {}, {}, expect parallel: {}, actual parallel: {}, " + + "selected nodes: {}", loadMeta.loadId, loadMeta.warehouseName, loadMeta.tableId, + loadMeta.expectParallel, needNodeNum, selectedNodes); + logStat(warehouseMeta); + } + + private void asyncScheduleOnUnregisterLoad(LoadMeta loadMeta) { + ScheduleTask task = new ScheduleTask( + scheduleTaskIdAllocator.incrementAndGet(), + ScheduleType.UNREGISTER_LOAD, + () -> this.runScheduleOnUnregisterLoad(loadMeta)); + boolean ret = scheduleTaskQueue.add(task); + if (!ret) { + loadWaitForGc.put(loadMeta.loadId, loadMeta); + LOG.error("Failed to submit schedule task {}, load id: {}, queue size: {}", + ScheduleType.UNREGISTER_LOAD, loadMeta.loadId, scheduleTaskQueue.size()); + } else { + LOG.info("Submit schedule task {}, load id: {}, task id: {}", + ScheduleType.UNREGISTER_LOAD, loadMeta.loadId, task.getTaskId()); + } + } + + private void runScheduleOnUnregisterLoad(LoadMeta loadMeta) { + String warehouseName = loadMeta.warehouseName; + 
WarehouseScheduleMeta warehouseMeta = warehouseScheduleMetas.get(warehouseName); + if (warehouseMeta == null) { + LOG.error("Unregister a load from a non-existent warehouse, load id: {}, warehouse: {}, expect parallel: {}," + + " nodes: {}", loadMeta.loadId, loadMeta.warehouseName, loadMeta.expectParallel, loadMeta.nodes); + return; + } + + Map<Long, LoadMeta> loadMetas = warehouseMeta.schedulingLoadMetas; + long loadId = loadMeta.loadId; + if (loadMetas == null || !loadMetas.containsKey(loadId)) { + LOG.error("Unregister a non-scheduling load, load id: {}, warehouse: {}, expect parallel: {}, nodes: {}", + loadMeta.loadId, loadMeta.warehouseName, loadMeta.expectParallel, loadMeta.nodes); + return; + } + + NavigableSet<NodeMeta> nodeMetas = warehouseMeta.sortedNodeMetaSet; + List<NodeMeta> changedNodeMetas = new ArrayList<>(); + Iterator<NodeMeta> iterator = nodeMetas.iterator(); + // TODO make it more efficient without looping over all nodes + while (iterator.hasNext()) { + NodeMeta nodeMeta = iterator.next(); + if (nodeMeta.loadIds.remove(loadId)) { + changedNodeMetas.add(nodeMeta); + iterator.remove(); + } + } + nodeMetas.addAll(changedNodeMetas); + loadMetas.remove(loadId); + warehouseMeta.needBalance.set(true); + LOG.info("Deallocate nodes for load id: {}, warehouse: {}, {}, nodes: {}", + loadMeta.loadId, loadMeta.warehouseName, loadMeta.tableId, loadMeta.nodes); + logStat(warehouseMeta); + } + + private void asyncScheduleOnDetectUnavailableNodes(LoadMeta loadMeta, List<ComputeNode> unavailableNodes) { + // remove duplicated tasks, allowing one to be executing and one to be waiting in the queue + if (pendingDetectUnavailableNodesTask.incrementAndGet() > 3) { + pendingDetectUnavailableNodesTask.decrementAndGet(); + return; + } + + ScheduleTask task = new ScheduleTask( + scheduleTaskIdAllocator.incrementAndGet(), + ScheduleType.DETECT_UNAVAILABLE_NODES, + () -> this.runScheduleOnDetectUnavailableNodes(loadMeta)); + boolean ret = scheduleTaskQueue.add(task); + if (!ret) { + pendingDetectUnavailableNodesTask.decrementAndGet(); + LOG.error("Failed to submit schedule task {}, load id {}, unavailable node ids: {}, queue size: {}", + ScheduleType.DETECT_UNAVAILABLE_NODES, loadMeta.loadId, unavailableNodes, scheduleTaskQueue.size()); + } else { + LOG.info("Submit schedule task {}, load id: {}, task id: {}, pending task num: {}", + ScheduleType.DETECT_UNAVAILABLE_NODES, loadMeta.loadId, task.getTaskId(), + pendingDetectUnavailableNodesTask.get()); + } + } + + private void runScheduleOnDetectUnavailableNodes(LoadMeta loadMeta) { + WarehouseScheduleMeta warehouseMeta = warehouseScheduleMetas.get(loadMeta.warehouseName); + if (warehouseMeta == null) { + return; + } + runScheduleIfNodeChanged(warehouseMeta); + } + + @VisibleForTesting + void periodicalCheckNodeChangedAndBalance() { + List<String> warehouseNames = new ArrayList<>(warehouseScheduleMetas.keySet()); + for (String name : warehouseNames) { + WarehouseScheduleMeta warehouseMeta = warehouseScheduleMetas.get(name); + if (warehouseMeta.schedulingLoadMetas.isEmpty()) { + warehouseScheduleMetas.remove(warehouseMeta.warehouseName); + LOG.info("Remove warehouse {} because there is no scheduling load", warehouseMeta.warehouseName); + } else { + runScheduleIfNodeChanged(warehouseMeta); + doBalanceIfNeeded(warehouseMeta, Config.batch_write_balance_diff_ratio); + if (LOG.isDebugEnabled()) { + logStat(warehouseMeta); + } + } + } + } + + private void runScheduleIfNodeChanged(WarehouseScheduleMeta warehouseMeta) { + // 1. 
find the newest unavailable/available nodes + Map<Long, ComputeNode> systemAvailableNodes = + getAvailableNodes(warehouseMeta.warehouseName).stream() + .collect(Collectors.toMap(ComputeNode::getId, Function.identity())); + Map<Long, NodeMeta> unavailableNodeMetas = new HashMap<>(); + Map<Long, NodeMeta> availableNodeMetas = new HashMap<>(); + for (NodeMeta nodeMeta : warehouseMeta.sortedNodeMetaSet) { + ComputeNode node = nodeMeta.node; + if (systemAvailableNodes.containsKey(node.getId())) { + availableNodeMetas.put(node.getId(), nodeMeta); + } else { + unavailableNodeMetas.put(node.getId(), nodeMeta); + } + } + boolean nodesChanged = !unavailableNodeMetas.isEmpty() || availableNodeMetas.size() < systemAvailableNodes.size(); + if (!nodesChanged) { + LOG.debug("There is no node change for warehouse {}, num nodes: {}", + warehouseMeta.warehouseName, systemAvailableNodes.size()); + return; + } + List<ComputeNode> newAvailableNodes = new ArrayList<>(); + // add those new available nodes to the map + for (ComputeNode node : systemAvailableNodes.values()) { + if (!availableNodeMetas.containsKey(node.getId())) { + availableNodeMetas.put(node.getId(), new NodeMeta(node)); + newAvailableNodes.add(node); + } + } + + LOG.info("Trigger schedule for node change, warehouse: {}, num new nodes: {}, num unavailable nodes: {}," + + " to add nodes: {}, to remove nodes: {}", + warehouseMeta.warehouseName, newAvailableNodes.size(), unavailableNodeMetas.size(), newAvailableNodes, + unavailableNodeMetas.values().stream().map(meta -> meta.node).collect(Collectors.toList())); + + // 2. Reallocate nodes to each load if + // 1. the old node is unavailable + // 2. there are new nodes to increase the parallelism to the expected value + // For each load + // a. calculate the new parallel as min(expectParallel, #availableNodes) + // b. remove the old unavailable nodes + // c. if the current number of allocated nodes does not meet the new parallel, + // iterate from the node with the least number of loads, and choose + // those not containing this load id + // The iteration order allocates loads to the nodes with the least loads as + // much as possible, but the result may not be balanced, and we will check + // balance in doBalanceIfNeeded() + long startTime = System.currentTimeMillis(); + NavigableSet<NodeMeta> sortedNodeMetaSet = warehouseMeta.sortedNodeMetaSet; + sortedNodeMetaSet.clear(); + sortedNodeMetaSet.addAll(availableNodeMetas.values()); + // load id -> new node id set + Map<Long, Set<Long>> changedLoadIds = new HashMap<>(); + for (LoadMeta loadMeta : warehouseMeta.schedulingLoadMetas.values()) { + long loadId = loadMeta.loadId; + int newParallel = Math.min(loadMeta.expectParallel, availableNodeMetas.size()); + Set<Long> allocatedNodeIds = new HashSet<>(); + boolean removeUnavailableNodes = false; + for (ComputeNode node : loadMeta.nodes) { + if (availableNodeMetas.containsKey(node.getId())) { + allocatedNodeIds.add(node.getId()); + } else { + removeUnavailableNodes = true; + } + } + List<NodeMeta> changedNodeMetas = new ArrayList<>(newParallel - allocatedNodeIds.size()); + Iterator<NodeMeta> iterator = sortedNodeMetaSet.iterator(); + while (iterator.hasNext() && allocatedNodeIds.size() < newParallel) { + NodeMeta nodeMeta = iterator.next(); + long nodeId = nodeMeta.node.getId(); + if (allocatedNodeIds.add(nodeId)) { + nodeMeta.loadIds.add(loadId); + changedNodeMetas.add(nodeMeta); + iterator.remove(); + } + } + sortedNodeMetaSet.addAll(changedNodeMetas); + if (removeUnavailableNodes || !changedNodeMetas.isEmpty()) { + changedLoadIds.put(loadId, allocatedNodeIds); + } + } + + // 3. 
update load metas + for (Map.Entry<Long, Set<Long>> entry : changedLoadIds.entrySet()) { + long loadId = entry.getKey(); + Set<Long> newNodeIds = entry.getValue(); + LoadMeta loadMeta = warehouseMeta.schedulingLoadMetas.get(loadId); + List<ComputeNode> newNodes = new ArrayList<>(newNodeIds.size()); + newNodeIds.forEach(id -> newNodes.add(availableNodeMetas.get(id).node)); + List<ComputeNode> oldNodes = loadMeta.nodes; + loadMeta.nodes = newNodes; + LOG.info("Node change trigger reassign, load id: {}, warehouse: {}, {}, num old nodes: {}, " + + "num new nodes: {}, old nodes: {}, new nodes: {}", + loadMeta.loadId, loadMeta.warehouseName, loadMeta.tableId, + oldNodes.size(), newNodes.size(), oldNodes, newNodes); + } + warehouseMeta.needBalance.set(true); + LOG.info("Finish to schedule triggered by node change, warehouse: {}, num changed loads: {}, cost: {} ms", + warehouseMeta.warehouseName, changedLoadIds.size(), System.currentTimeMillis() - startTime); + logStat(warehouseMeta); + } + + private void doBalanceIfNeeded(WarehouseScheduleMeta warehouseMeta, double expectDiffRatio) { + if (!warehouseMeta.needBalance.compareAndSet(true, false)) { + return; + } + + double loadDiffRatio = calculateLoadDiffRatio(warehouseMeta.sortedNodeMetaSet); + if (loadDiffRatio <= expectDiffRatio) { + return; + } + + LOG.info("Start to balance warehouse {}, expect load diff ratio: {}, current load diff ratio: {}", + warehouseMeta.warehouseName, expectDiffRatio, loadDiffRatio); + // TODO limit the number of loads in one balance cycle + long startTime = System.currentTimeMillis(); + NavigableSet<NodeMeta> unfinishedNodeMetas = new TreeSet<>(warehouseMeta.sortedNodeMetaSet); + NavigableSet<NodeMeta> finishedNodeMetas = new TreeSet<>(NodeMetaComparator.INSTANCE); + int unfinishedParallel = unfinishedNodeMetas.stream().mapToInt(meta -> meta.loadIds.size()).sum(); + Set<Long> needUpdateLoadIds = new HashSet<>(); + while (!unfinishedNodeMetas.isEmpty()) { + int expectMaxLoadNum = (unfinishedParallel + unfinishedNodeMetas.size() - 1) / unfinishedNodeMetas.size(); + NodeMeta largeNodeMeta = unfinishedNodeMetas.pollLast(); + Set<Long> largeLoadIds = largeNodeMeta.loadIds; + List<NodeMeta> changedNodeMetas = new ArrayList<>(); + while (!unfinishedNodeMetas.isEmpty()) { + NodeMeta smallNodeMeta = unfinishedNodeMetas.pollFirst(); + changedNodeMetas.add(smallNodeMeta); + Set<Long> smallLoadIds = smallNodeMeta.loadIds; + List<Long> changedLoadIds = new ArrayList<>(); + for (long loadId : largeLoadIds) { + if (largeLoadIds.size() - changedLoadIds.size() <= expectMaxLoadNum) { + break; + } + if (smallLoadIds.size() + changedLoadIds.size() >= expectMaxLoadNum) { + break; + } + if (!smallLoadIds.contains(loadId)) { + changedLoadIds.add(loadId); + } + } + changedLoadIds.forEach(largeLoadIds::remove); + smallLoadIds.addAll(changedLoadIds); + needUpdateLoadIds.addAll(changedLoadIds); + if (largeLoadIds.size() <= expectMaxLoadNum) { + break; + } + } + unfinishedNodeMetas.addAll(changedNodeMetas); + finishedNodeMetas.add(largeNodeMeta); + unfinishedParallel -= largeNodeMeta.loadIds.size(); + double diffRatio = calculateLoadDiffRatio(finishedNodeMetas, unfinishedNodeMetas); + if (diffRatio <= expectDiffRatio) { + break; + } + } + warehouseMeta.sortedNodeMetaSet.clear(); + warehouseMeta.sortedNodeMetaSet.addAll(finishedNodeMetas); + warehouseMeta.sortedNodeMetaSet.addAll(unfinishedNodeMetas); + + for (long loadId : needUpdateLoadIds) { + LoadMeta loadMeta = warehouseMeta.schedulingLoadMetas.get(loadId); + List<ComputeNode> newNodes = new ArrayList<>(loadMeta.expectParallel); + for (NodeMeta nodeMeta : warehouseMeta.sortedNodeMetaSet) { + if 
(nodeMeta.loadIds.contains(loadId)) { + newNodes.add(nodeMeta.node); + } + } + List<ComputeNode> oldNodes = loadMeta.nodes; + loadMeta.nodes = newNodes; + LOG.info("Balance trigger reassign, load id: {}, warehouse: {}, {}, num old nodes: {}, " + + "num new nodes: {}, old nodes: {}, new nodes: {}", + loadMeta.loadId, loadMeta.warehouseName, loadMeta.tableId, + oldNodes.size(), newNodes.size(), oldNodes, newNodes); + } + LOG.info("Finish to balance warehouse {}, num changed loads: {}, cost: {} ms", + warehouseMeta.warehouseName, needUpdateLoadIds.size(), System.currentTimeMillis() - startTime); + logStat(warehouseMeta); + } + + private double calculateLoadDiffRatio(NavigableSet<NodeMeta> nodeMetas) { + if (nodeMetas.size() <= 1) { + return 0; + } + + int minLoadNum = nodeMetas.first().loadIds.size(); + int maxLoadNum = nodeMetas.last().loadIds.size(); + if (maxLoadNum <= 1) { + return 0; + } + return (maxLoadNum - minLoadNum) / (double) maxLoadNum; + } + + private double calculateLoadDiffRatio(NavigableSet<NodeMeta> left, NavigableSet<NodeMeta> right) { + if (left.isEmpty()) { + return calculateLoadDiffRatio(right); + } + + if (right.isEmpty()) { + return calculateLoadDiffRatio(left); + } + + int minLoadNum = Math.min(left.first().loadIds.size(), right.first().loadIds.size()); + int maxLoadNum = Math.max(left.last().loadIds.size(), right.last().loadIds.size()); + if (maxLoadNum <= 1) { + return 0; + } + return (maxLoadNum - minLoadNum) / (double) maxLoadNum; + } + + private void logStat(WarehouseScheduleMeta warehouseMeta) { + StringBuilder nodeStat = new StringBuilder(); + NavigableSet<NodeMeta> nodeMetas = warehouseMeta.sortedNodeMetaSet; + Map<Long, LoadMeta> loadMetas = warehouseMeta.schedulingLoadMetas; + int totalExpectParallel = loadMetas.values().stream().mapToInt(meta -> meta.expectParallel).sum(); + int totalActualParallel = loadMetas.values().stream().mapToInt(meta -> meta.nodes.size()).sum(); + int totalNodeLoadParallel = nodeMetas.stream().mapToInt(meta -> meta.loadIds.size()).sum(); + for (NodeMeta nodeMeta : nodeMetas) { + if (nodeStat.length() > 0) { + nodeStat.append(","); + } + ComputeNode node = nodeMeta.node; + nodeStat.append("["); + nodeStat.append(node.getId()); + nodeStat.append(","); + nodeStat.append(node.getHost()); + nodeStat.append(","); + nodeStat.append(nodeMeta.loadIds.size()); + nodeStat.append("]"); + } + + LOG.info("Statistics for warehouse {}, num nodes: {}, num loads: {}, total parallel expect/actual: {}/{}, " + + "node parallel: {}, load diff ratio actual: {}, load distribution [node id,host,#loads]: {}", + warehouseMeta.warehouseName, nodeMetas.size(), loadMetas.size(), totalExpectParallel, + totalActualParallel, totalNodeLoadParallel, calculateLoadDiffRatio(nodeMetas), nodeStat); + + if (LOG.isDebugEnabled()) { + for (LoadMeta meta : loadMetas.values()) { + LOG.debug("Load details, warehouse: {}, load id: {}, {}, parallel expect/actual: {}/{}, nodes: {}", + meta.warehouseName, meta.loadId, meta.tableId, meta.expectParallel, + meta.nodes.size(), meta.nodes); + } + for (NodeMeta nodeMeta : nodeMetas) { + LOG.debug("Node details, warehouse: {}, node: {}, num load: {}, load ids: {}", + warehouseMeta.warehouseName, nodeMeta.node, nodeMeta.loadIds.size(), nodeMeta.loadIds); + } + } + } + + private List<ComputeNode> getAvailableNodes(String warehouseName) { + List<ComputeNode> nodes = new ArrayList<>(); + if (RunMode.isSharedDataMode()) { + List<Long> computeIds = GlobalStateMgr.getCurrentState().getWarehouseMgr().getAllComputeNodeIds(warehouseName); + for (long nodeId : computeIds) { + ComputeNode node = 
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(nodeId); + if (node != null && node.isAvailable()) { + nodes.add(node); + } + } + } else { + nodes.addAll(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getAvailableBackends()); + } + return nodes; + } + + @VisibleForTesting + long numScheduledTasks() { + return numScheduledTasks.get(); + } + + @VisibleForTesting + WarehouseScheduleMeta getWarehouseMeta(String warehouse) { + return warehouseScheduleMetas.get(warehouse); + } + + @VisibleForTesting + void disablePeriodicalScheduleForTest() { + while (!isPeriodicalScheduleRunning.compareAndSet(false, true)) { + } + } + + @VisibleForTesting + double currentLoadDiffRatio(String warehouse) { + WarehouseScheduleMeta whMeta = warehouseScheduleMetas.get(warehouse); + if (whMeta == null) { + return 0; + } + return calculateLoadDiffRatio(whMeta.sortedNodeMetaSet); + } + + enum ScheduleType { + UNREGISTER_LOAD(0), + DETECT_UNAVAILABLE_NODES(1), + REGISTER_LOAD(2); + + // a larger number is with a higher priority + private final int priority; + + ScheduleType(int priority) { + this.priority = priority; + } + + public int getPriority() { + return priority; + } + } + + static class ScheduleTask { + + final long taskId; + final ScheduleType scheduleType; + final Runnable scheduleRunnable; + final CompletableFuture<Void> future; + + public ScheduleTask(long taskId, ScheduleType scheduleType, Runnable scheduleRunnable) { + this.taskId = taskId; + this.scheduleType = scheduleType; + this.scheduleRunnable = scheduleRunnable; + this.future = new CompletableFuture<>(); + } + + public long getTaskId() { + return taskId; + } + + public ScheduleType getScheduleType() { + return scheduleType; + } + + public Runnable getScheduleRunnable() { + return scheduleRunnable; + } + + public void finish() { + future.complete(null); + } + + public void waitFinish() throws Exception { + future.get(); + } + } + + // a task is larger with a higher ScheduleType priority and a smaller unique task id + static class ScheduleTaskComparator implements Comparator<ScheduleTask> { + + static final ScheduleTaskComparator INSTANCE = new ScheduleTaskComparator(); + + @Override + public int compare(ScheduleTask task1, ScheduleTask task2) { + int ret = Integer.compare(task1.getScheduleType().getPriority(), task2.getScheduleType().getPriority()); + if (ret != 0) { + return ret; + } + + // a smaller task id is with a higher priority + return Long.compare(task2.getTaskId(), task1.getTaskId()); + } + } + + static class NodeMeta { + ComputeNode node; + Set<Long> loadIds; + + public NodeMeta(ComputeNode node) { + this.node = node; + this.loadIds = new HashSet<>(); + } + } + + // a node is larger with a larger number of loads and a larger node id + static class NodeMetaComparator implements Comparator<NodeMeta> { + + public static final NodeMetaComparator INSTANCE = new NodeMetaComparator(); + + @Override + public int compare(NodeMeta node1, NodeMeta node2) { + int cmp = Integer.compare(node1.loadIds.size(), node2.loadIds.size()); + if (cmp != 0) { + return cmp; + } + return Long.compare(node1.node.getId(), node2.node.getId()); + } + } + + static class LoadMeta { + final long loadId; + final String warehouseName; + final TableId tableId; + final int expectParallel; + // copy-on-write. 
it can only be set in the single schedule thread + volatile List<ComputeNode> nodes; + + public LoadMeta(long loadId, String warehouseName, TableId tableId, int expectParallel) { + this.loadId = loadId; + this.warehouseName = warehouseName; + this.tableId = tableId; + this.expectParallel = expectParallel; + this.nodes = new ArrayList<>(); + } + } + + static class WarehouseScheduleMeta { + final String warehouseName; + + // A set of node metas sorted according to NodeMetaComparator + final NavigableSet<NodeMeta> sortedNodeMetaSet; + + // load id -> LoadMeta. Loads that are being scheduled in sortedNodeMetaSet. It may not be consistent + // with CoordinatorBackendAssignerImpl.registeredLoadMetas because registerBatchWrite()/unregisterBatchWrite() + // will submit schedule tasks which will be executed asynchronously + final Map<Long, LoadMeta> schedulingLoadMetas; + final AtomicBoolean needBalance; + + public WarehouseScheduleMeta(String warehouseName) { + this.warehouseName = warehouseName; + this.sortedNodeMetaSet = new TreeSet<>(NodeMetaComparator.INSTANCE); + this.schedulingLoadMetas = new HashMap<>(); + this.needBalance = new AtomicBoolean(false); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/IsomorphicBatchWrite.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/IsomorphicBatchWrite.java new file mode 100644 index 0000000000000..9800ddb25a61c --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/IsomorphicBatchWrite.java @@ -0,0 +1,268 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.load.batchwrite; + +import com.google.common.collect.ImmutableMap; +import com.starrocks.common.Config; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.common.util.UUIDUtil; +import com.starrocks.load.streamload.StreamLoadInfo; +import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.DefaultCoordinator; +import com.starrocks.qe.scheduler.Coordinator; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.system.ComputeNode; +import com.starrocks.thrift.TStatus; +import com.starrocks.thrift.TStatusCode; +import com.starrocks.thrift.TUniqueId; +import org.apache.arrow.util.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * Responsible for managing batch write operations with isomorphic parameters. + * It will allocate a load for each write operation, and monitor the status of the load. 
+ */ +public class IsomorphicBatchWrite implements LoadExecuteCallback { + + private static final Logger LOG = LoggerFactory.getLogger(IsomorphicBatchWrite.class); + + private static final String LABEL_PREFIX = "batch_write_"; + + private final long id; + private final TableId tableId; + private final String warehouseName; + private final StreamLoadInfo streamLoadInfo; + private final int batchWriteIntervalMs; + private final int batchWriteParallel; + private final ImmutableMap<String, String> loadParameters; + private final ConnectContext connectContext; + + /** + * The assigner for coordinator backends. + */ + private final CoordinatorBackendAssigner coordinatorBackendAssigner; + + /** + * The executor to run batch write tasks. + */ + private final Executor executor; + + /** + * The factory to create query coordinators. + */ + private final Coordinator.Factory queryCoordinatorFactory; + + /** + * The lock to manage concurrent access to the load executor map. + */ + private final ReentrantReadWriteLock lock; + + /** + * The map to store load executors, keyed by their labels. + */ + private final ConcurrentHashMap<String, LoadExecutor> loadExecutorMap; + + private final AtomicLong lastLoadCreateTimeMs; + + public IsomorphicBatchWrite( + long id, + TableId tableId, + String warehouseName, + StreamLoadInfo streamLoadInfo, + int batchWriteIntervalMs, + int batchWriteParallel, + Map<String, String> loadParameters, + ConnectContext connectContext, + CoordinatorBackendAssigner coordinatorBackendAssigner, + Executor executor) { + this.id = id; + this.tableId = tableId; + this.warehouseName = warehouseName; + this.streamLoadInfo = streamLoadInfo; + this.batchWriteIntervalMs = batchWriteIntervalMs; + this.batchWriteParallel = batchWriteParallel; + this.loadParameters = ImmutableMap.<String, String>builder().putAll(loadParameters).build(); + this.connectContext = connectContext; + this.coordinatorBackendAssigner = coordinatorBackendAssigner; + this.executor = executor; + this.queryCoordinatorFactory = new DefaultCoordinator.Factory(); + this.loadExecutorMap = new ConcurrentHashMap<>(); + this.lock = new ReentrantReadWriteLock(); + this.lastLoadCreateTimeMs = new AtomicLong(System.currentTimeMillis()); + } + + public long getId() { + return id; + } + + public TableId getTableId() { + return tableId; + } + + public String getWarehouse() { + return warehouseName; + } + + public int getBatchWriteParallel() { + return batchWriteParallel; + } + + public int numRunningLoads() { + return loadExecutorMap.size(); + } + + /** + * Requests coordinator backends for the batch write operation. + * The backend can accept write operations for the specified table. + * + * @return The result of the request for coordinator backends. 
+ */ + public RequestCoordinatorBackendResult requestCoordinatorBackends() { + TStatus status = new TStatus(); + List<ComputeNode> backends = null; + try { + backends = coordinatorBackendAssigner.getBackends(id); + if (!backends.isEmpty()) { + status.setStatus_code(TStatusCode.OK); + } else { + status.setStatus_code(TStatusCode.SERVICE_UNAVAILABLE); + String errMsg = String.format( + "Can't find available backends, db: %s, table: %s, warehouse: %s, load id: %s", + tableId.getDbName(), tableId.getTableName(), warehouseName, id); + status.setError_msgs(Collections.singletonList(errMsg)); + backends = null; + LOG.error(errMsg); + } + } catch (Throwable throwable) { + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + String msg = String.format("Unexpected exception happened when getting backends, db: %s, table: %s, " + + "warehouse: %s, load id: %s, error: %s", tableId.getDbName(), tableId.getTableName(), + warehouseName, id, throwable.getMessage()); + status.setError_msgs(Collections.singletonList(msg)); + LOG.error("Failed to get backends, db: {}, table: {}, warehouse: {}, load id: {}", + tableId.getDbName(), tableId.getTableName(), warehouseName, id, throwable); + } + return new RequestCoordinatorBackendResult(status, backends); + } + + /** + * Requests a load for the write operation from the specified backend. + * + * @param backendId The id of the backend. + * @param backendHost The host of the backend. + * @return The result of the request for the load. + */ + public RequestLoadResult requestLoad(long backendId, String backendHost) { + TStatus status = new TStatus(); + lock.readLock().lock(); + try { + for (LoadExecutor loadExecutor : loadExecutorMap.values()) { + if (loadExecutor.isActive() && loadExecutor.containCoordinatorBackend(backendId)) { + status.setStatus_code(TStatusCode.OK); + return new RequestLoadResult(status, loadExecutor.getLabel()); + } + } + } finally { + lock.readLock().unlock(); + } + + lock.writeLock().lock(); + try { + for (LoadExecutor loadExecutor : loadExecutorMap.values()) { + if (loadExecutor.isActive() && loadExecutor.containCoordinatorBackend(backendId)) { + status.setStatus_code(TStatusCode.OK); + return new RequestLoadResult(status, loadExecutor.getLabel()); + } + } + + RequestCoordinatorBackendResult requestCoordinatorBackendResult = requestCoordinatorBackends(); + if (!requestCoordinatorBackendResult.isOk()) { + return new RequestLoadResult(requestCoordinatorBackendResult.getStatus(), null); + } + + Set<Long> backendIds = requestCoordinatorBackendResult.getValue().stream() + .map(ComputeNode::getId).collect(Collectors.toSet()); + if (!backendIds.contains(backendId)) { + ComputeNode backend = GlobalStateMgr.getCurrentState() + .getNodeMgr().getClusterInfo().getBackendOrComputeNode(backendId); + if (backend == null || !backend.isAvailable()) { + status.setStatus_code(TStatusCode.SERVICE_UNAVAILABLE); + status.setError_msgs(Collections.singletonList( + String.format("Backend [%s, %s] is not available", backendId, backendHost))); + return new RequestLoadResult(status, null); + } + backendIds.add(backendId); + } + + String label = LABEL_PREFIX + DebugUtil.printId(UUIDUtil.toTUniqueId(UUID.randomUUID())); + TUniqueId loadId = UUIDUtil.toTUniqueId(UUID.randomUUID()); + LoadExecutor loadExecutor = new LoadExecutor( + tableId, label, loadId, streamLoadInfo, batchWriteIntervalMs, loadParameters, + connectContext, backendIds, queryCoordinatorFactory, this); + loadExecutorMap.put(label, loadExecutor); + try { + executor.execute(loadExecutor); + } catch (Exception e) { + 
+
+    /**
+     * Checks if the batch write operation is active.
+     *
+     * @return true if there are active load executors or the idle time is less than
+     *         the configured threshold, false otherwise.
+     */
+    public boolean isActive() {
+        long idleTime = System.currentTimeMillis() - lastLoadCreateTimeMs.get();
+        return !loadExecutorMap.isEmpty() || idleTime < Config.batch_write_idle_ms;
+    }
+
+    @Override
+    public void finishLoad(String label) {
+        lock.writeLock().lock();
+        try {
+            loadExecutorMap.remove(label);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @VisibleForTesting
+    LoadExecutor getLoadExecutor(String label) {
+        return loadExecutorMap.get(label);
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecuteCallback.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecuteCallback.java
new file mode 100644
index 0000000000000..fdad8c476cd9a
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecuteCallback.java
@@ -0,0 +1,28 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+/**
+ * Callback interface for load execution.
+ */
+public interface LoadExecuteCallback {
+
+    /**
+     * Called when the load operation is finished.
+     *
+     * @param label The label associated with the load operation.
+     */
+    void finishLoad(String label);
+}
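isActive() above is the hook the manager-level garbage collection keys off; the tests later in this patch exercise BatchWriteMgr.cleanupInactiveBatchWrite() for exactly this. A periodic sweep built on it could be as small as the following sketch, where the registry map and the InactiveSweep class are assumptions:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sweep over registered batch writes; only isActive() is from this patch.
    class InactiveSweep {
        private final Map<Long, IsomorphicBatchWrite> batchWrites = new ConcurrentHashMap<>();

        void cleanupInactive() {
            // Drops entries that have no running load executors and whose idle
            // window (Config.batch_write_idle_ms) has elapsed.
            batchWrites.values().removeIf(bw -> !bw.isActive());
        }
    }
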
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java
new file mode 100644
index 0000000000000..c3dcac7c7d829
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java
@@ -0,0 +1,276 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.starrocks.catalog.Database;
+import com.starrocks.catalog.OlapTable;
+import com.starrocks.catalog.Table;
+import com.starrocks.common.LoadException;
+import com.starrocks.common.Pair;
+import com.starrocks.common.Status;
+import com.starrocks.common.util.DebugUtil;
+import com.starrocks.common.util.concurrent.lock.LockType;
+import com.starrocks.common.util.concurrent.lock.Locker;
+import com.starrocks.load.streamload.StreamLoadInfo;
+import com.starrocks.qe.ConnectContext;
+import com.starrocks.qe.QeProcessorImpl;
+import com.starrocks.qe.scheduler.Coordinator;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.sql.LoadPlanner;
+import com.starrocks.thrift.TUniqueId;
+import com.starrocks.transaction.TabletCommitInfo;
+import com.starrocks.transaction.TabletFailInfo;
+import com.starrocks.transaction.TransactionState;
+import org.apache.arrow.util.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Responsible for executing a load.
+ */
+public class LoadExecutor implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoadExecutor.class);
+
+    // Initialized in constructor ==================================
+    private final TableId tableId;
+    private final String label;
+    private final TUniqueId loadId;
+    private final StreamLoadInfo streamLoadInfo;
+    private final ImmutableMap<String, String> loadParameters;
+    private final ConnectContext connectContext;
+    private final Set<Long> coordinatorBackendIds;
+    private final int batchWriteIntervalMs;
+    private final Coordinator.Factory coordinatorFactory;
+    private final LoadExecuteCallback loadExecuteCallback;
+    private final TimeTrace timeTrace;
+    private final AtomicReference<Throwable> failure;
+
+    // Initialized in beginTxn() ==================================
+    private long txnId = -1;
+
+    // Initialized in executeLoad() ==================================
+    private Coordinator coordinator;
+    private List<TabletCommitInfo> tabletCommitInfo;
+    private List<TabletFailInfo> tabletFailInfo;
+
+    public LoadExecutor(
+            TableId tableId,
+            String label,
+            TUniqueId loadId,
+            StreamLoadInfo streamLoadInfo,
+            int batchWriteIntervalMs,
+            ImmutableMap<String, String> loadParameters,
+            ConnectContext connectContext,
+            Set<Long> coordinatorBackendIds,
+            Coordinator.Factory coordinatorFactory,
+            LoadExecuteCallback loadExecuteCallback) {
+        this.tableId = tableId;
+        this.label = label;
+        this.loadId = loadId;
+        this.streamLoadInfo = streamLoadInfo;
+        this.batchWriteIntervalMs = batchWriteIntervalMs;
+        this.loadParameters = loadParameters;
+        this.connectContext = connectContext;
+        this.coordinatorBackendIds = coordinatorBackendIds;
+        this.coordinatorFactory = coordinatorFactory;
+        this.loadExecuteCallback = loadExecuteCallback;
+        this.timeTrace = new TimeTrace();
+        this.failure = new AtomicReference<>();
+    }
+
+    @Override
+    public void run() {
+        timeTrace.startRunTsMs.set(System.currentTimeMillis());
+        try {
+            beginTxn();
+            executeLoad();
+            commitAndPublishTxn();
+        } catch (Throwable e) {
+            failure.set(e);
+            abortTxn(e);
+            LOG.error("Failed to execute load, label: {}, load id: {}, txn id: {}",
+                    label, DebugUtil.printId(loadId), txnId, e);
+        } finally {
+            loadExecuteCallback.finishLoad(label);
+            timeTrace.finishTimeMs.set(System.currentTimeMillis());
+        }
+    }
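run() pins the whole transaction lifecycle to a single executor thread and uses the finally block to guarantee that the callback fires and the finish timestamp is recorded no matter how the load ends. Stripped of StarRocks types, the control flow is roughly this sketch (names are illustrative, not from the patch):

    import java.util.concurrent.atomic.AtomicReference;

    // Generic shape of LoadExecutor.run(); names are illustrative.
    void runLifecycle(Runnable begin, Runnable execute, Runnable commit,
                      Runnable abort, Runnable onFinish,
                      AtomicReference<Throwable> failure) {
        try {
            begin.run();
            execute.run();
            commit.run();
        } catch (Throwable t) {
            failure.set(t); // remember the cause so isActive() can report it
            abort.run();    // roll back the transaction before returning
        } finally {
            onFinish.run(); // always deregister, success or failure
        }
    }
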
+
+    public String getLabel() {
+        return label;
+    }
+
+    /**
+     * Checks if the given backend id is contained in the coordinator backend IDs.
+     */
+    public boolean containCoordinatorBackend(long backendId) {
+        return coordinatorBackendIds.contains(backendId);
+    }
+
+    /**
+     * Checks if this batch is active and can accept new load requests.
+     */
+    public boolean isActive() {
+        if (failure.get() != null) {
+            return false;
+        }
+        long joinPlanTimeMs = timeTrace.joinPlanTimeMs.get();
+        return joinPlanTimeMs <= 0 || (System.currentTimeMillis() - joinPlanTimeMs < batchWriteIntervalMs);
+    }
+
+    public Throwable getFailure() {
+        return failure.get();
+    }
+
+    private void beginTxn() throws Exception {
+        timeTrace.beginTxnTsMs.set(System.currentTimeMillis());
+        Pair<Database, OlapTable> pair = getDbAndTable();
+        txnId = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().beginTransaction(
+                pair.first.getId(), Lists.newArrayList(pair.second.getId()), label,
+                TransactionState.TxnCoordinator.fromThisFE(),
+                TransactionState.LoadJobSourceType.FRONTEND_STREAMING,
+                streamLoadInfo.getTimeout(), streamLoadInfo.getWarehouseId());
+    }
+
+    private void commitAndPublishTxn() throws Exception {
+        timeTrace.commitTxnTimeMs.set(System.currentTimeMillis());
+        Pair<Database, OlapTable> pair = getDbAndTable();
+        long publishTimeoutMs =
+                streamLoadInfo.getTimeout() * 1000L - (timeTrace.commitTxnTimeMs.get() - timeTrace.createTimeMs.get());
+        boolean publishSuccess = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().commitAndPublishTransaction(
+                pair.first, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null);
+        if (!publishSuccess) {
+            throw new LoadException(String.format(
+                    "Publish timeout, total timeout time: %s ms, publish timeout time: %s ms",
+                    streamLoadInfo.getTimeout() * 1000, publishTimeoutMs));
+        }
+    }
+
+    private void abortTxn(Throwable reason) {
+        if (txnId == -1) {
+            return;
+        }
+        try {
+            Pair<Database, OlapTable> pair = getDbAndTable();
+            GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction(
+                    pair.first.getId(), txnId, reason == null ? "" : reason.getMessage());
+        } catch (Exception e) {
+            LOG.error("Failed to abort transaction {}", txnId, e);
+        }
+    }
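Note how commitAndPublishTxn() treats the stream load timeout as one overall budget measured from TimeTrace.createTimeMs: whatever planning and execution consumed is subtracted from what publish may wait. A worked example with assumed numbers:

    // Worked example; the numbers are assumptions, not values from the patch.
    public static void main(String[] args) {
        int timeoutSeconds = 600;          // streamLoadInfo.getTimeout()
        long createTimeMs = 1_000_000L;    // timeTrace.createTimeMs
        long commitTxnTimeMs = 1_450_000L; // timeTrace.commitTxnTimeMs
        // Same arithmetic as commitAndPublishTxn(): publish gets whatever
        // is left of the overall stream load timeout.
        long publishTimeoutMs = timeoutSeconds * 1000L - (commitTxnTimeMs - createTimeMs);
        System.out.println(publishTimeoutMs); // 600000 - 450000 = 150000 ms
    }
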
"" : reason.getMessage()); + } catch (Exception e) { + LOG.error("Failed to abort transaction {}", txnId, e); + } + } + + private void executeLoad() throws Exception { + timeTrace.executePlanTimeMs.set(System.currentTimeMillis()); + try { + Pair pair = getDbAndTable(); + timeTrace.buildPlanTimeMs.set(System.currentTimeMillis()); + LoadPlanner loadPlanner = new LoadPlanner(-1, loadId, txnId, pair.first.getId(), + tableId.getDbName(), pair.second, streamLoadInfo.isStrictMode(), streamLoadInfo.getTimezone(), + streamLoadInfo.isPartialUpdate(), connectContext, null, + streamLoadInfo.getLoadMemLimit(), streamLoadInfo.getExecMemLimit(), + streamLoadInfo.getNegative(), coordinatorBackendIds.size(), streamLoadInfo.getColumnExprDescs(), + streamLoadInfo, label, streamLoadInfo.getTimeout()); + loadPlanner.setWarehouseId(streamLoadInfo.getWarehouseId()); + loadPlanner.setBatchWrite(batchWriteIntervalMs, loadParameters, coordinatorBackendIds); + loadPlanner.plan(); + + coordinator = coordinatorFactory.createStreamLoadScheduler(loadPlanner); + QeProcessorImpl.INSTANCE.registerQuery(loadId, coordinator); + timeTrace.executePlanTimeMs.set(System.currentTimeMillis()); + coordinator.exec(); + + timeTrace.joinPlanTimeMs.set(System.currentTimeMillis()); + int waitSecond = streamLoadInfo.getTimeout() - + (int) (System.currentTimeMillis() - timeTrace.createTimeMs.get()) / 1000; + if (coordinator.join(waitSecond)) { + Status status = coordinator.getExecStatus(); + if (!status.ok()) { + throw new LoadException( + String.format("Failed to execute load, status code: %s, error message: %s", + status.getErrorCodeString(), status.getErrorMsg())); + } + tabletCommitInfo = TabletCommitInfo.fromThrift(coordinator.getCommitInfos()); + tabletFailInfo = TabletFailInfo.fromThrift(coordinator.getFailInfos()); + } else { + throw new LoadException( + String.format("Timeout to execute load after waiting for %s seconds", waitSecond)); + } + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(loadId); + } + } + + private Pair getDbAndTable() throws Exception { + GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); + Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName()); + if (db == null) { + throw new LoadException(String.format("Database %s does not exist", tableId.getDbName())); + } + + Table table; + Locker locker = new Locker(); + locker.lockDatabase(db.getId(), LockType.READ); + try { + table = GlobalStateMgr.getCurrentState() + .getLocalMetastore().getTable(db.getFullName(), tableId.getTableName()); + } finally { + locker.unLockDatabase(db.getId(), LockType.READ); + } + if (table == null) { + throw new LoadException(String.format( + "Table [%s.%s] does not exist", tableId.getDbName(), tableId.getTableName())); + } + return Pair.create(db, (OlapTable) table); + } + + @VisibleForTesting + Set getCoordinatorBackendIds() { + return coordinatorBackendIds; + } + + @VisibleForTesting + Coordinator getCoordinator() { + return coordinator; + } + + @VisibleForTesting + TimeTrace getTimeTrace() { + return timeTrace; + } + + // Trace the timing of various stages of the load operation. 
+
+    // Traces the timing of the various stages of the load operation.
+    static class TimeTrace {
+        AtomicLong createTimeMs;
+        AtomicLong startRunTsMs = new AtomicLong(-1);
+        AtomicLong beginTxnTsMs = new AtomicLong(-1);
+        AtomicLong buildPlanTimeMs = new AtomicLong(-1);
+        AtomicLong executePlanTimeMs = new AtomicLong(-1);
+        AtomicLong joinPlanTimeMs = new AtomicLong(-1);
+        AtomicLong commitTxnTimeMs = new AtomicLong(-1);
+        AtomicLong finishTimeMs = new AtomicLong(-1);
+
+        public TimeTrace() {
+            this.createTimeMs = new AtomicLong(System.currentTimeMillis());
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestCoordinatorBackendResult.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestCoordinatorBackendResult.java
new file mode 100644
index 0000000000000..5f5d3243e83b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestCoordinatorBackendResult.java
@@ -0,0 +1,31 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.starrocks.system.ComputeNode;
+import com.starrocks.thrift.TStatus;
+
+import java.util.List;
+
+/**
+ * Represents the result of a request for coordinator backends.
+ * The value is a list of compute nodes that can accept writes.
+ */
+public class RequestCoordinatorBackendResult extends StatusOr<List<ComputeNode>> {
+
+    public RequestCoordinatorBackendResult(TStatus status, List<ComputeNode> result) {
+        super(status, result);
+    }
+}
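RequestCoordinatorBackendResult and the RequestLoadResult that follows are thin typed wrappers over the StatusOr base defined further down, whose constructor enforces that a value is present exactly when the status is OK; callers can therefore branch on isOk() and dereference getValue() without a null check. A sketch of the intended call pattern, with the surrounding variables assumed:

    // Illustrative caller; RequestLoadResult and the StatusOr accessors are from
    // this patch, the surrounding variables (batchWrite, backendId, LOG, ...) are assumptions.
    RequestLoadResult result = batchWrite.requestLoad(backendId, backendHost);
    if (result.isOk()) {
        String label = result.getValue(); // non-null is enforced by StatusOr's constructor
        LOG.info("Joined batch write load with label {}", label);
    } else {
        LOG.warn("Request load failed: {}", result.getStatus().getError_msgs());
    }
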
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestLoadResult.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestLoadResult.java
new file mode 100644
index 0000000000000..9b56bc7de55b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/RequestLoadResult.java
@@ -0,0 +1,28 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.starrocks.thrift.TStatus;
+
+/**
+ * Represents the result of a load request. The value is a label
+ * that can be used to track the load.
+ */
+public class RequestLoadResult extends StatusOr<String> {
+
+    public RequestLoadResult(TStatus status, String label) {
+        super(status, label);
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/StatusOr.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/StatusOr.java
new file mode 100644
index 0000000000000..ae5fd30dad0f3
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/StatusOr.java
@@ -0,0 +1,51 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.google.common.base.Preconditions;
+import com.starrocks.thrift.TStatus;
+import com.starrocks.thrift.TStatusCode;
+
+/**
+ * Abstract class representing a status or a value.
+ * The value is only present if the status is OK.
+ *
+ * @param <T> The type of the value.
+ */
+public abstract class StatusOr<T> {
+
+    protected final TStatus status;
+    protected final T value;
+
+    public StatusOr(TStatus status, T value) {
+        Preconditions.checkArgument((
+                status.status_code == TStatusCode.OK && value != null)
+                || (status.status_code != TStatusCode.OK && value == null));
+        this.status = status;
+        this.value = value;
+    }
+
+    public boolean isOk() {
+        return status.status_code.equals(TStatusCode.OK);
+    }
+
+    public TStatus getStatus() {
+        return status;
+    }
+
+    public T getValue() {
+        return value;
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TableId.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TableId.java
new file mode 100644
index 0000000000000..0917aaca552dd
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TableId.java
@@ -0,0 +1,67 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import java.util.Objects;
+
+/**
+ * Represents an identifier for a table in the database.
+ */
+public class TableId {
+
+    /** The name of the database. */
+    private final String dbName;
+
+    /** The name of the table.
*/ + private final String tableName; + + public TableId(String dbName, String tableName) { + this.dbName = dbName; + this.tableName = tableName; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableId tableId = (TableId) o; + return Objects.equals(dbName, tableId.dbName) && Objects.equals(tableName, tableId.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tableName); + } + + @Override + public String toString() { + return "TableId{" + + "dbName='" + dbName + '\'' + + ", tableName='" + tableName + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadHttpHeader.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadHttpHeader.java index 792f87b849762..aadbf661a26e1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadHttpHeader.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadHttpHeader.java @@ -16,6 +16,8 @@ import java.util.Arrays; import java.util.List; +import static com.starrocks.http.rest.RestBaseAction.WAREHOUSE_KEY; + /** * Http header names for stream load. They should be consistent * with those in http_common.h @@ -41,6 +43,7 @@ public class StreamLoadHttpHeader { public static final String HTTP_MERGE_CONDITION = "merge_condition"; public static final String HTTP_LOG_REJECTED_RECORD_NUM = "log_rejected_record_num"; public static final String HTTP_COMPRESSION = "compression"; + public static final String HTTP_WAREHOUSE = WAREHOUSE_KEY; // Headers for csv format ============================ public static final String HTTP_COLUMN_SEPARATOR = "column_separator"; @@ -55,13 +58,20 @@ public class StreamLoadHttpHeader { public static final String HTTP_JSONROOT = "json_root"; public static final String HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; + // Headers for batch write ========================== + public static final String HTTP_ENABLE_BATCH_WRITE = "enable_batch_write"; + public static final String HTTP_BATCH_WRITE_ASYNC = "batch_write_async"; + public static final String HTTP_BATCH_WRITE_INTERVAL_MS = "batch_write_interval_ms"; + public static final String HTTP_BATCH_WRITE_PARALLEL = "batch_write_parallel"; + // A list of all headers. If add a new header, should also add it to the list. 
public static final List HTTP_HEADER_LIST = Arrays.asList( - HTTP_FORMAT, HTTP_COLUMNS, HTTP_WHERE, HTTP_COLUMN_SEPARATOR, HTTP_ROW_DELIMITER, - HTTP_SKIP_HEADER, HTTP_TRIM_SPACE, HTTP_ENCLOSE, HTTP_ESCAPE, HTTP_MAX_FILTER_RATIO, - HTTP_TIMEOUT, HTTP_PARTITIONS, HTTP_TEMP_PARTITIONS, HTTP_NEGATIVE, HTTP_STRICT_MODE, - HTTP_TIMEZONE, HTTP_LOAD_MEM_LIMIT, HTTP_JSONPATHS, HTTP_JSONROOT, HTTP_STRIP_OUTER_ARRAY, - HTTP_PARTIAL_UPDATE, HTTP_PARTIAL_UPDATE_MODE, HTTP_TRANSMISSION_COMPRESSION_TYPE, HTTP_LOAD_DOP, - HTTP_ENABLE_REPLICATED_STORAGE, HTTP_MERGE_CONDITION, HTTP_LOG_REJECTED_RECORD_NUM, HTTP_COMPRESSION + HTTP_FORMAT, HTTP_COLUMNS, HTTP_WHERE, HTTP_COLUMN_SEPARATOR, HTTP_ROW_DELIMITER, HTTP_SKIP_HEADER, + HTTP_TRIM_SPACE, HTTP_ENCLOSE, HTTP_ESCAPE, HTTP_MAX_FILTER_RATIO, HTTP_TIMEOUT, HTTP_PARTITIONS, + HTTP_TEMP_PARTITIONS, HTTP_NEGATIVE, HTTP_STRICT_MODE, HTTP_TIMEZONE, HTTP_LOAD_MEM_LIMIT, + HTTP_JSONPATHS, HTTP_JSONROOT, HTTP_STRIP_OUTER_ARRAY, HTTP_PARTIAL_UPDATE, HTTP_PARTIAL_UPDATE_MODE, + HTTP_TRANSMISSION_COMPRESSION_TYPE, HTTP_LOAD_DOP, HTTP_ENABLE_REPLICATED_STORAGE, HTTP_MERGE_CONDITION, + HTTP_LOG_REJECTED_RECORD_NUM, HTTP_COMPRESSION, HTTP_WAREHOUSE, HTTP_ENABLE_BATCH_WRITE, + HTTP_BATCH_WRITE_ASYNC, HTTP_BATCH_WRITE_INTERVAL_MS, HTTP_BATCH_WRITE_PARALLEL ); } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadInfo.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadInfo.java index 7d9c63e515e0a..5e7143cdca586 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadInfo.java @@ -48,6 +48,8 @@ import java.util.List; import java.util.Optional; +import static com.starrocks.server.WarehouseManager.DEFAULT_WAREHOUSE_NAME; + public class StreamLoadInfo { private static final Logger LOG = LogManager.getLogger(StreamLoadInfo.class); @@ -100,7 +102,7 @@ public StreamLoadInfo(TUniqueId id, long txnId, TFileType fileType, TFileFormatT this.stripOuterArray = false; } - public StreamLoadInfo(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType, int timeout) { + public StreamLoadInfo(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType, Optional timeout) { this.id = id; this.txnId = txnId; this.fileType = fileType; @@ -108,7 +110,7 @@ public StreamLoadInfo(TUniqueId id, long txnId, TFileType fileType, TFileFormatT this.jsonPaths = ""; this.jsonRoot = ""; this.stripOuterArray = false; - this.timeout = timeout; + timeout.ifPresent(integer -> this.timeout = integer); } public String getConfluentSchemaRegistryUrl() { @@ -271,12 +273,19 @@ public long getWarehouseId() { return warehouseId; } - public static StreamLoadInfo fromHttpStreamLoadRequest(TUniqueId id, long txnId, int timeout, StreamLoadKvParams params) + public static StreamLoadInfo fromHttpStreamLoadRequest( + TUniqueId id, long txnId, Optional timeout, StreamLoadKvParams params) throws UserException { StreamLoadInfo streamLoadInfo = new StreamLoadInfo(id, txnId, params.getFileType().orElse(TFileType.FILE_STREAM), params.getFileFormatType().orElse(TFileFormatType.FORMAT_CSV_PLAIN), timeout); streamLoadInfo.setOptionalFromStreamLoad(params); + String warehouseName = params.getWarehouse().orElse(DEFAULT_WAREHOUSE_NAME); + Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseName); + if (warehouse == null) { + throw new UserException(String.format("Warehouse [%s] does not exist", warehouseName)); + } + 
streamLoadInfo.setWarehouseId(warehouse.getId()); return streamLoadInfo; } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadKvParams.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadKvParams.java index 6a4454ce7a6ac..4ddaf50e6f6ff 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadKvParams.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadKvParams.java @@ -19,15 +19,20 @@ import com.starrocks.thrift.TPartialUpdateMode; import io.netty.handler.codec.http.HttpHeaders; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; import static com.starrocks.http.rest.RestBaseAction.WAREHOUSE_KEY; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_ASYNC; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_INTERVAL_MS; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_PARALLEL; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COLUMNS; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COLUMN_SEPARATOR; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COMPRESSION; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_REPLICATED_STORAGE; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENCLOSE; import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ESCAPE; @@ -145,11 +150,7 @@ public Optional getIsTempPartition() { @Override public Optional getNegative() { - String negative = params.get(HTTP_NEGATIVE); - if (negative == null) { - return Optional.empty(); - } - return Optional.of(Boolean.parseBoolean(negative)); + return getBoolParam(HTTP_NEGATIVE); } @Override @@ -163,20 +164,12 @@ public Optional getMaxFilterRatio() { @Override public Optional getTimeout() { - String timeout = params.get(HTTP_TIMEOUT); - if (timeout == null) { - return Optional.empty(); - } - return Optional.of(Integer.parseInt(timeout)); + return getIntParam(HTTP_TIMEOUT); } @Override public Optional getStrictMode() { - String strictMode = params.get(HTTP_STRICT_MODE); - if (strictMode == null) { - return Optional.empty(); - } - return Optional.of(Boolean.parseBoolean(strictMode)); + return getBoolParam(HTTP_STRICT_MODE); } @Override @@ -186,11 +179,7 @@ public Optional getTimezone() { @Override public Optional getLoadMemLimit() { - String loadMemLimit = params.get(HTTP_LOAD_MEM_LIMIT); - if (loadMemLimit == null) { - return Optional.empty(); - } - return Optional.of(Long.parseLong(loadMemLimit)); + return getLongParam(HTTP_LOAD_MEM_LIMIT); } @Override @@ -200,20 +189,12 @@ public Optional getTransmissionCompressionType() { @Override public Optional getLoadDop() { - String loadDop = params.get(HTTP_LOAD_DOP); - if (loadDop == null) { - return Optional.empty(); - } - return Optional.of(Integer.parseInt(loadDop)); + return getIntParam(HTTP_LOAD_DOP); } @Override public Optional getEnableReplicatedStorage() { - String enableReplicatedStorage = params.get(HTTP_ENABLE_REPLICATED_STORAGE); - if (enableReplicatedStorage == null) { - return Optional.empty(); - } - return Optional.of(Boolean.parseBoolean(enableReplicatedStorage)); + return getBoolParam(HTTP_ENABLE_REPLICATED_STORAGE); } @Override @@ -223,20 +204,12 @@ public Optional getMergeCondition() { 
@Override public Optional getLogRejectedRecordNum() { - String logRejectedRecordNum = params.get(HTTP_LOG_REJECTED_RECORD_NUM); - if (logRejectedRecordNum == null) { - return Optional.empty(); - } - return Optional.of(Long.parseLong(logRejectedRecordNum)); + return getLongParam(HTTP_LOG_REJECTED_RECORD_NUM); } @Override public Optional getPartialUpdate() { - String partialUpdate = params.get(HTTP_PARTIAL_UPDATE); - if (partialUpdate == null) { - return Optional.empty(); - } - return Optional.of(Boolean.parseBoolean(partialUpdate)); + return getBoolParam(HTTP_PARTIAL_UPDATE); } @Override @@ -282,11 +255,7 @@ public Optional getRowDelimiter() { @Override public Optional getSkipHeader() { - String skipHeader = params.get(HTTP_SKIP_HEADER); - if (skipHeader == null) { - return Optional.empty(); - } - return Optional.of(Long.parseLong(skipHeader)); + return getLongParam(HTTP_SKIP_HEADER); } @Override @@ -309,11 +278,7 @@ public Optional getEscape() { @Override public Optional getTrimSpace() { - String trimSpace = params.get(HTTP_TRIM_SPACE); - if (trimSpace == null) { - return Optional.empty(); - } - return Optional.of(Boolean.parseBoolean(trimSpace)); + return getBoolParam(HTTP_TRIM_SPACE); } @Override @@ -328,11 +293,51 @@ public Optional getJsonRoot() { @Override public Optional getStripOuterArray() { - String stripOuterArray = params.get(HTTP_STRIP_OUTER_ARRAY); - if (stripOuterArray == null) { + return getBoolParam(HTTP_STRIP_OUTER_ARRAY); + } + + public Optional getEnableBatchWrite() { + return getBoolParam(HTTP_ENABLE_BATCH_WRITE); + } + + public Optional getBatchWriteAsync() { + return getBoolParam(HTTP_BATCH_WRITE_ASYNC); + } + + public Optional getBatchWriteIntervalMs() { + return getIntParam(HTTP_BATCH_WRITE_INTERVAL_MS); + } + + public Optional getBatchWriteParallel() { + return getIntParam(HTTP_BATCH_WRITE_PARALLEL); + } + + private Optional getBoolParam(String paramName) { + String value = params.get(paramName); + if (value == null) { return Optional.empty(); } - return Optional.of(Boolean.parseBoolean(stripOuterArray)); + return Optional.of(Boolean.parseBoolean(value)); + } + + private Optional getIntParam(String paramName) { + String value = params.get(paramName); + if (value == null) { + return Optional.empty(); + } + return Optional.of(Integer.parseInt(value)); + } + + private Optional getLongParam(String paramName) { + String value = params.get(paramName); + if (value == null) { + return Optional.empty(); + } + return Optional.of(Long.parseLong(value)); + } + + public Map toMap() { + return Collections.unmodifiableMap(params); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java index 2376ae78bf1ba..d44462759a8a4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java @@ -79,6 +79,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -767,7 +768,8 @@ private Coordinator.Factory getCoordinatorFactory() { public void unprotectedExecute(HttpHeaders headers) throws UserException { streamLoadParams = StreamLoadKvParams.fromHttpHeaders(headers); - streamLoadInfo = StreamLoadInfo.fromHttpStreamLoadRequest(loadId, txnId, (int) timeoutMs / 1000, streamLoadParams); + streamLoadInfo = 
StreamLoadInfo.fromHttpStreamLoadRequest( + loadId, txnId, Optional.of((int) timeoutMs / 1000), streamLoadParams); if (table == null) { getTable(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/StreamLoadScanNode.java index b321023081cb6..1e9b1a2ce0e26 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/StreamLoadScanNode.java @@ -34,6 +34,7 @@ package com.starrocks.planner; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.starrocks.analysis.Analyzer; @@ -80,9 +81,11 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import static com.starrocks.catalog.DefaultExpr.SUPPORTED_DEFAULT_FNS; @@ -117,6 +120,11 @@ public class StreamLoadScanNode extends LoadScanNode { private boolean needAssignBE; + private boolean enableBatchWrite = false; + private int batchWriteIntervalMs; + private ImmutableMap batchWriteParameters; + private Set batchWriteBackendIds; + private List backends; private int nextBe = 0; private final Random random = new Random(System.currentTimeMillis()); @@ -175,6 +183,14 @@ public void setNeedAssignBE(boolean needAssignBE) { this.needAssignBE = needAssignBE; } + public void setBatchWrite(int batchWriteIntervalMs, ImmutableMap loadParameters, Set batchWriteBackendIds) { + setNeedAssignBE(true); + this.enableBatchWrite = true; + this.batchWriteIntervalMs = batchWriteIntervalMs; + this.batchWriteParameters = loadParameters; + this.batchWriteBackendIds = new HashSet<>(batchWriteBackendIds); + } + public boolean nullExprInAutoIncrement() { return nullExprInAutoIncrement; } @@ -252,15 +268,28 @@ public void finalizeStats(Analyzer analyzer) throws UserException, UserException private void assignBackends() throws UserException { backends = Lists.newArrayList(); - for (Backend be : GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getIdToBackend().values()) { - if (be.isAvailable()) { - backends.add(be); + if (enableBatchWrite) { + for (long backendId : batchWriteBackendIds) { + Backend backend = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackend(backendId); + if (backend == null) { + throw new UserException(String.format("Can't find batch write backend [%s]", backendId)); + } + if (!backend.isAvailable()) { + throw new UserException(String.format("Batch write backend [%s] is not available", backendId)); + } + backends.add(backend); + } + } else { + for (Backend be : GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getIdToBackend().values()) { + if (be.isAvailable()) { + backends.add(be); + } } + Collections.shuffle(backends, random); } if (backends.isEmpty()) { throw new UserException("No available backends"); } - Collections.shuffle(backends, random); } private void finalizeParams() throws UserException { @@ -398,6 +427,9 @@ private void createScanRange() throws UserException { brokerScanRange.setBroker_addresses(Lists.newArrayList()); if (needAssignBE) { brokerScanRange.setChannel_id(curChannelId++); + brokerScanRange.setEnable_batch_write(enableBatchWrite); + brokerScanRange.setBatch_write_interval_ms(batchWriteIntervalMs); + brokerScanRange.setBatch_write_parameters(batchWriteParameters); } TScanRangeLocations 
locations = new TScanRangeLocations(); TScanRange scanRange = new TScanRange(); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 60914c8e0e9c7..e67b84f5be3c9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -141,6 +141,7 @@ import com.starrocks.load.ExportMgr; import com.starrocks.load.InsertOverwriteJobMgr; import com.starrocks.load.Load; +import com.starrocks.load.batchwrite.BatchWriteMgr; import com.starrocks.load.loadv2.LoadEtlChecker; import com.starrocks.load.loadv2.LoadJobScheduler; import com.starrocks.load.loadv2.LoadLoadingChecker; @@ -299,6 +300,7 @@ public class GlobalStateMgr { private final LoadMgr loadMgr; private final RoutineLoadMgr routineLoadMgr; private final StreamLoadMgr streamLoadMgr; + private final BatchWriteMgr batchWriteMgr; private final ExportMgr exportMgr; private final MaterializedViewMgr materializedViewMgr; @@ -631,6 +633,7 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) { this.load = new Load(); this.streamLoadMgr = new StreamLoadMgr(); this.routineLoadMgr = new RoutineLoadMgr(); + this.batchWriteMgr = new BatchWriteMgr(); this.exportMgr = new ExportMgr(); this.materializedViewMgr = new MaterializedViewMgr(); @@ -1369,6 +1372,7 @@ private void startLeaderOnlyDaemonThreads() { // start routine load scheduler routineLoadScheduler.start(); routineLoadTaskScheduler.start(); + batchWriteMgr.start(); // start dynamic partition task dynamicPartitionScheduler.start(); // start daemon thread to update db used data quota for db txn manager periodically @@ -2125,6 +2129,10 @@ public StreamLoadMgr getStreamLoadMgr() { return streamLoadMgr; } + public BatchWriteMgr getBatchWriteMgr() { + return batchWriteMgr; + } + public RoutineLoadTaskScheduler getRoutineLoadTaskScheduler() { return routineLoadTaskScheduler; } diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index fbae7a0305252..04cfdf77d296d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -104,6 +104,8 @@ import com.starrocks.lake.compaction.CompactionMgr; import com.starrocks.leader.LeaderImpl; import com.starrocks.load.EtlJobType; +import com.starrocks.load.batchwrite.RequestLoadResult; +import com.starrocks.load.batchwrite.TableId; import com.starrocks.load.loadv2.LoadJob; import com.starrocks.load.loadv2.LoadMgr; import com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment; @@ -116,6 +118,7 @@ import com.starrocks.load.routineload.RoutineLoadJob; import com.starrocks.load.routineload.RoutineLoadMgr; import com.starrocks.load.streamload.StreamLoadInfo; +import com.starrocks.load.streamload.StreamLoadKvParams; import com.starrocks.load.streamload.StreamLoadMgr; import com.starrocks.load.streamload.StreamLoadTask; import com.starrocks.metric.MetricRepo; @@ -167,6 +170,8 @@ import com.starrocks.thrift.TAuthenticateParams; import com.starrocks.thrift.TBatchReportExecStatusParams; import com.starrocks.thrift.TBatchReportExecStatusResult; +import com.starrocks.thrift.TBatchWriteRequest; +import com.starrocks.thrift.TBatchWriteResult; import com.starrocks.thrift.TBeginRemoteTxnRequest; import com.starrocks.thrift.TBeginRemoteTxnResponse; import 
com.starrocks.thrift.TColumnDef; @@ -1681,6 +1686,34 @@ TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws } } + @Override + public TBatchWriteResult requestBatchWrite(TBatchWriteRequest request) throws TException { + TBatchWriteResult result = new TBatchWriteResult(); + try { + checkPasswordAndLoadPriv(request.getUser(), request.getPasswd(), request.getDb(), + request.getTbl(), request.getUser_ip()); + TableId tableId = new TableId(request.getDb(), request.getTbl()); + StreamLoadKvParams params = new StreamLoadKvParams(request.getParams()); + RequestLoadResult loadResult = GlobalStateMgr.getCurrentState() + .getBatchWriteMgr().requestLoad(tableId, params, request.getBackend_id(), request.getBackend_host()); + result.setStatus(loadResult.getStatus()); + if (loadResult.isOk()) { + result.setLabel(loadResult.getValue()); + } + } catch (AuthenticationException authenticationException) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.NOT_AUTHORIZED); + status.addToError_msgs(authenticationException.getMessage()); + result.setStatus(status); + } catch (Throwable throwable) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs(throwable.getMessage()); + result.setStatus(status); + } + return result; + } + @Override public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException { if (GlobalStateMgr.getCurrentState().getBackupHandler().report(request.getTask_type(), request.getJob_id(), diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/LoadPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/LoadPlanner.java index a3659a72e0d41..77f344347554e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/LoadPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/LoadPlanner.java @@ -15,6 +15,7 @@ package com.starrocks.sql; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.starrocks.analysis.Analyzer; @@ -74,6 +75,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -135,6 +137,7 @@ public class LoadPlanner { private TPartialUpdateMode partialUpdateMode = TPartialUpdateMode.ROW_MODE; private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID; + private Set candidateBackendIds = new HashSet<>(); private LoadJob.JSONOptions jsonOptions = new LoadJob.JSONOptions(); @@ -142,6 +145,12 @@ public class LoadPlanner { private String mergeConditionStr; + // Only valid for stream load + private boolean enableBatchWrite = false; + private int batchWriteIntervalMs; + private ImmutableMap batchWriteParameters; + private Set batchWriteBackendIds; + public LoadPlanner(long loadJobId, TUniqueId loadId, long txnId, long dbId, OlapTable destTable, boolean strictMode, String timezone, long timeoutS, long startTime, boolean partialUpdate, ConnectContext context, @@ -239,6 +248,14 @@ public long getWarehouseId() { return warehouseId; } + public void setBatchWrite( + int batchWriteIntervalMs, ImmutableMap loadParameters, Set batchWriteBackendIds) { + this.enableBatchWrite = true; + this.batchWriteIntervalMs = batchWriteIntervalMs; + this.batchWriteParameters = loadParameters; + this.batchWriteBackendIds = new HashSet<>(batchWriteBackendIds); + } + public void setPartialUpdateMode(TPartialUpdateMode mode) { 
this.partialUpdateMode = mode; } @@ -417,6 +434,9 @@ private ScanNode prepareScanNodes() throws UserException { StreamLoadScanNode streamScanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, streamLoadInfo, dbName, label, parallelInstanceNum, txnId, warehouseId); streamScanNode.setNeedAssignBE(true); + if (enableBatchWrite) { + streamScanNode.setBatchWrite(batchWriteIntervalMs, batchWriteParameters, batchWriteBackendIds); + } streamScanNode.setUseVectorizedLoad(true); streamScanNode.init(analyzer); streamScanNode.finalizeStats(analyzer); diff --git a/fe/fe-core/src/test/java/com/starrocks/http/LoadActionTest.java b/fe/fe-core/src/test/java/com/starrocks/http/LoadActionTest.java new file mode 100644 index 0000000000000..22d37a2f7c321 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/http/LoadActionTest.java @@ -0,0 +1,130 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.http; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.starrocks.load.batchwrite.BatchWriteMgr; +import com.starrocks.load.batchwrite.RequestCoordinatorBackendResult; +import com.starrocks.load.batchwrite.TableId; +import com.starrocks.load.streamload.StreamLoadKvParams; +import com.starrocks.system.ComputeNode; +import com.starrocks.thrift.TStatus; +import com.starrocks.thrift.TStatusCode; +import mockit.Mock; +import mockit.MockUp; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class LoadActionTest extends StarRocksHttpTestCase { + + private OkHttpClient noRedirectClient = new OkHttpClient.Builder() + .readTimeout(100, TimeUnit.SECONDS) + .followRedirects(false) + .build(); + + @Test + public void testBatchWriteStreamLoadSuccess() throws Exception { + Map map = new HashMap<>(); + map.put(HTTP_ENABLE_BATCH_WRITE, "true"); + Request request = buildRequest(map); + List computeNodes = new ArrayList<>(); + List redirectLocations = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + String host = "192.0.0." 
+ i; + int httpPort = 8040; + computeNodes.add(new ComputeNode(i, host, 9050)); + computeNodes.get(i - 1).setHttpPort(httpPort); + redirectLocations.add(getLoadUrl(host, httpPort)); + } + + new MockUp() { + @Mock + public RequestCoordinatorBackendResult requestCoordinatorBackends(TableId tableId, StreamLoadKvParams params) { + return new RequestCoordinatorBackendResult(new TStatus(TStatusCode.OK), computeNodes); + } + }; + + try (Response response = noRedirectClient.newCall(request).execute()) { + assertEquals(307, response.code()); + String location = response.header("Location"); + assertTrue(redirectLocations.contains(location)); + } + } + + @Test + public void testBatchWriteStreamLoadFailure() throws Exception { + Map map = new HashMap<>(); + map.put(HTTP_ENABLE_BATCH_WRITE, "true"); + Request request = buildRequest(map); + + new MockUp() { + @Mock + public RequestCoordinatorBackendResult requestCoordinatorBackends(TableId tableId, StreamLoadKvParams params) { + TStatus status = new TStatus(); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs("artificial failure"); + return new RequestCoordinatorBackendResult(status, null); + } + }; + + try (Response response = noRedirectClient.newCall(request).execute()) { + assertEquals(200, response.code()); + Map result = parseResponseBody(response); + assertEquals("INTERNAL_ERROR", result.get("code")); + assertEquals("FAILED", result.get("status")); + assertEquals("artificial failure", result.get("message")); + assertEquals("artificial failure", result.get("msg")); + } + } + + private Request buildRequest(Map headers) { + Request.Builder builder = new Request.Builder(); + builder.addHeader("Authorization", rootAuth); + builder.addHeader("Expect", "100-continue"); + for (Map.Entry entry : headers.entrySet()) { + builder.addHeader(entry.getKey(), entry.getValue()); + } + builder.put(RequestBody.create(new byte[0])); + builder.url(String.format(BASE_URL + "/api/%s/%s/_stream_load", DB_NAME, TABLE_NAME)); + return builder.build(); + } + + private String getLoadUrl(String host, int port) { + return String.format("http://%s:%d/api/%s/%s/_stream_load", host, port, DB_NAME, TABLE_NAME); + } + + private static Map parseResponseBody(Response response) throws IOException { + ResponseBody body = response.body(); + assertNotNull(body); + String bodyStr = body.string(); + return objectMapper.readValue(bodyStr, new TypeReference<>() {}); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteMgrTest.java new file mode 100644 index 0000000000000..ed264f60e3bdc --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteMgrTest.java @@ -0,0 +1,177 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.starrocks.load.batchwrite; + +import com.starrocks.load.streamload.StreamLoadKvParams; +import com.starrocks.thrift.TStatusCode; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_INTERVAL_MS; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_PARALLEL; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_FORMAT; +import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_WAREHOUSE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class BatchWriteMgrTest extends BatchWriteTestBase { + + private BatchWriteMgr batchWriteMgr; + + private TableId tableId1; + private TableId tableId2; + private TableId tableId3; + private TableId tableId4; + + @Before + public void setup() { + batchWriteMgr = new BatchWriteMgr(); + batchWriteMgr.start(); + + tableId1 = new TableId(DB_NAME_1, TABLE_NAME_1_1); + tableId2 = new TableId(DB_NAME_1, TABLE_NAME_1_2); + tableId3 = new TableId(DB_NAME_2, TABLE_NAME_2_1); + tableId4 = new TableId(DB_NAME_2, TABLE_NAME_2_2); + } + + @Test + public void testRequestBackends() { + StreamLoadKvParams params1 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "1000"); + put(HTTP_BATCH_WRITE_PARALLEL, "1"); + }}); + RequestCoordinatorBackendResult result1 = + batchWriteMgr.requestCoordinatorBackends(tableId1, params1); + assertTrue(result1.isOk()); + assertEquals(1, result1.getValue().size()); + assertEquals(1, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params2 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_FORMAT, "json"); + put(HTTP_BATCH_WRITE_INTERVAL_MS, "1000"); + put(HTTP_BATCH_WRITE_PARALLEL, "1"); + }}); + RequestCoordinatorBackendResult result2 = + batchWriteMgr.requestCoordinatorBackends(tableId1, params2); + assertTrue(result2.isOk()); + assertEquals(1, result2.getValue().size()); + assertEquals(2, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params3 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "10000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + }}); + RequestCoordinatorBackendResult result3 = + batchWriteMgr.requestCoordinatorBackends(tableId2, params3); + assertTrue(result3.isOk()); + assertEquals(4, result3.getValue().size()); + assertEquals(3, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params4 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "10000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + }}); + RequestCoordinatorBackendResult result4 = + batchWriteMgr.requestCoordinatorBackends(tableId3, params4); + assertTrue(result4.isOk()); + assertEquals(4, result4.getValue().size()); + assertEquals(4, batchWriteMgr.numBatchWrites()); + } + + @Test + public void testRequestLoad() { + StreamLoadKvParams params = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "100000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + }}); + RequestLoadResult result1 = batchWriteMgr.requestLoad( + tableId4, params, allNodes.get(0).getId(), allNodes.get(0).getHost()); + assertTrue(result1.isOk()); + assertNotNull(result1.getValue()); + assertEquals(1, batchWriteMgr.numBatchWrites()); + + RequestLoadResult result2 = batchWriteMgr.requestLoad( + tableId4, params, 
allNodes.get(0).getId(), allNodes.get(0).getHost()); + assertTrue(result2.isOk()); + assertEquals(result1.getValue(), result2.getValue()); + assertEquals(1, batchWriteMgr.numBatchWrites()); + } + + @Test + public void testCheckParameters() { + StreamLoadKvParams params1 = new StreamLoadKvParams(new HashMap<>()); + RequestCoordinatorBackendResult result1 = + batchWriteMgr.requestCoordinatorBackends(tableId1, params1); + assertFalse(result1.isOk()); + assertEquals(TStatusCode.INVALID_ARGUMENT, result1.getStatus().getStatus_code()); + assertEquals(0, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params2 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "10000"); + }}); + RequestCoordinatorBackendResult result2 = + batchWriteMgr.requestCoordinatorBackends(tableId2, params2); + assertFalse(result2.isOk()); + assertEquals(TStatusCode.INVALID_ARGUMENT, result2.getStatus().getStatus_code()); + assertEquals(0, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params3 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "10000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + put(HTTP_WAREHOUSE, "no_exist"); + }}); + RequestCoordinatorBackendResult result3 = + batchWriteMgr.requestCoordinatorBackends(tableId3, params3); + assertFalse(result3.isOk()); + assertEquals(TStatusCode.INVALID_ARGUMENT, result3.getStatus().getStatus_code()); + assertEquals(0, batchWriteMgr.numBatchWrites()); + } + + @Test + public void testCleanupInactiveBatchWrite() { + StreamLoadKvParams params1 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "10000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + }}); + RequestCoordinatorBackendResult result1 = batchWriteMgr.requestCoordinatorBackends(tableId1, params1); + assertTrue(result1.isOk()); + assertEquals(1, batchWriteMgr.numBatchWrites()); + + StreamLoadKvParams params2 = new StreamLoadKvParams(new HashMap<>() {{ + put(HTTP_BATCH_WRITE_INTERVAL_MS, "100000"); + put(HTTP_BATCH_WRITE_PARALLEL, "4"); + }}); + RequestLoadResult result2 = batchWriteMgr.requestLoad( + tableId4, params2, allNodes.get(0).getId(), allNodes.get(0).getHost()); + assertTrue(result2.isOk()); + assertEquals(2, batchWriteMgr.numBatchWrites()); + + new MockUp() { + + @Mock + public boolean isActive() { + return false; + } + }; + batchWriteMgr.cleanupInactiveBatchWrite(); + assertEquals(0, batchWriteMgr.numBatchWrites()); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteTestBase.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteTestBase.java new file mode 100644 index 0000000000000..90fd74861a304 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/BatchWriteTestBase.java @@ -0,0 +1,108 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.starrocks.load.batchwrite; + +import com.starrocks.catalog.Database; +import com.starrocks.catalog.MaterializedIndex; +import com.starrocks.catalog.OlapTable; +import com.starrocks.catalog.Partition; +import com.starrocks.catalog.Tablet; +import com.starrocks.qe.ConnectContext; +import com.starrocks.schema.MTable; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.system.ComputeNode; +import com.starrocks.thrift.TTabletCommitInfo; +import com.starrocks.transaction.TransactionStatus; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class BatchWriteTestBase { + + protected static final String DB_NAME_1 = "batch_write_load_db_1"; + protected static final String TABLE_NAME_1_1 = "batch_write_load_tbl_1_1"; + protected static final String TABLE_NAME_1_2 = "batch_write_load_tbl_1_2"; + protected static final String DB_NAME_2 = "batch_write_load_db_2"; + protected static final String TABLE_NAME_2_1 = "batch_write_load_tbl_2_1"; + protected static final String TABLE_NAME_2_2 = "batch_write_load_tbl_2_2"; + + protected static ConnectContext connectContext; + protected static StarRocksAssert starRocksAssert; + protected static Database DATABASE_1; + protected static OlapTable TABLE_1_1; + protected static OlapTable TABLE_1_2; + protected static Database DATABASE_2; + protected static OlapTable TABLE_2_1; + protected static OlapTable TABLE_2_2; + + protected static List allNodes; + + @BeforeClass + public static void beforeClass() throws Exception { + UtFrameUtils.createMinStarRocksCluster(); + UtFrameUtils.addMockBackend(10002); + UtFrameUtils.addMockBackend(10003); + UtFrameUtils.addMockBackend(10004); + UtFrameUtils.addMockBackend(10005); + connectContext = UtFrameUtils.createDefaultCtx(); + starRocksAssert = new StarRocksAssert(connectContext); + starRocksAssert.withDatabase(DB_NAME_1) + .useDatabase(DB_NAME_1) + .withTable(new MTable(TABLE_NAME_1_1, Arrays.asList("c0 INT", "c1 STRING"))) + .withTable(new MTable(TABLE_NAME_1_2, Arrays.asList("c0 INT"))); + starRocksAssert.withDatabase(DB_NAME_2) + .useDatabase(DB_NAME_2) + .withTable(new MTable(TABLE_NAME_2_1, Arrays.asList("c0 INT", "c1 STRING"))) + .withTable(new MTable(TABLE_NAME_2_2, Arrays.asList("c0 INT"))); + + DATABASE_1 = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME_1); + TABLE_1_1 = (OlapTable) DATABASE_1.getTable(TABLE_NAME_1_1); + TABLE_1_2 = (OlapTable) DATABASE_1.getTable(TABLE_NAME_1_2); + + DATABASE_2 = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME_2); + TABLE_2_1 = (OlapTable) DATABASE_2.getTable(TABLE_NAME_2_1); + TABLE_2_2 = (OlapTable) DATABASE_2.getTable(TABLE_NAME_2_2); + + allNodes = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getAvailableBackends() + .stream().map(node -> (ComputeNode) node).collect(Collectors.toList()); + Collections.shuffle(allNodes); + } + + protected List buildCommitInfos() { + List commitInfos = new ArrayList<>(); + for (Partition partition : TABLE_1_1.getPartitions()) { + List materializedIndices = + partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); + for (MaterializedIndex index : materializedIndices) { + for (Tablet tablet : index.getTablets()) { + for (long backendId : tablet.getBackendIds()) { + commitInfos.add(new TTabletCommitInfo(tablet.getId(), 
+    protected List<TTabletCommitInfo> buildCommitInfos() {
+        List<TTabletCommitInfo> commitInfos = new ArrayList<>();
+        for (Partition partition : TABLE_1_1.getPartitions()) {
+            List<MaterializedIndex> materializedIndices =
+                    partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+            for (MaterializedIndex index : materializedIndices) {
+                for (Tablet tablet : index.getTablets()) {
+                    for (long backendId : tablet.getBackendIds()) {
+                        commitInfos.add(new TTabletCommitInfo(tablet.getId(), backendId));
+                    }
+                }
+            }
+        }
+        return commitInfos;
+    }
+
+    protected TransactionStatus getTxnStatus(String label) {
+        return GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLabelStatus(DATABASE_1.getId(), label);
+    }
+}
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerTest.java
new file mode 100644
index 0000000000000..bcb6e9471f586
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/CoordinatorBackendAssignerTest.java
@@ -0,0 +1,201 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.starrocks.common.Config;
+import com.starrocks.system.ComputeNode;
+import com.starrocks.utframe.UtFrameUtils;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CoordinatorBackendAssignerTest extends BatchWriteTestBase {
+
+    private CoordinatorBackendAssignerImpl assigner;
+
+    @Before
+    public void setup() {
+        assigner = new CoordinatorBackendAssignerImpl();
+        assigner.start();
+    }
+
+    @Test
+    public void testRegisterBatchWrite() {
+        assigner.registerBatchWrite(
+                1L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_1), 1);
+        List<ComputeNode> nodes1 = assigner.getBackends(1);
+        assertNotNull(nodes1);
+        assertEquals(1, nodes1.size());
+        assertEquals(1, nodes1.stream().map(ComputeNode::getId).collect(Collectors.toSet()).size());
+
+        assigner.registerBatchWrite(
+                2L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_2), 2);
+        List<ComputeNode> nodes2 = assigner.getBackends(2);
+        assertNotNull(nodes2);
+        assertEquals(2, nodes2.size());
+        assertEquals(2, nodes2.stream().map(ComputeNode::getId).collect(Collectors.toSet()).size());
+
+        assigner.registerBatchWrite(
+                3L, "wh2", new TableId(DB_NAME_2, TABLE_NAME_2_1), 4);
+        List<ComputeNode> nodes3 = assigner.getBackends(3);
+        assertNotNull(nodes3);
+        assertEquals(4, nodes3.size());
+        assertEquals(4, nodes3.stream().map(ComputeNode::getId).collect(Collectors.toSet()).size());
+
+        assigner.registerBatchWrite(
+                4L, "wh2", new TableId(DB_NAME_2, TABLE_NAME_2_2), 10);
+        List<ComputeNode> nodes4 = assigner.getBackends(4);
+        assertNotNull(nodes4);
+        assertEquals(5, nodes4.size());
+        assertEquals(5, nodes4.stream().map(ComputeNode::getId).collect(Collectors.toSet()).size());
+    }
+
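+    // unregisterBatchWrite only queues a cleanup task; each check below bumps the
+    // expected scheduled-task count and waits (via Awaitility) until the assigner
+    // has processed it before asserting that the load meta is gone.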
+    @Test
+    public void testUnRegisterBatchWrite() {
+        assigner.registerBatchWrite(
+                1L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_1), 1);
+        assigner.registerBatchWrite(
+                2L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_2), 2);
+        assigner.registerBatchWrite(
+                3L, "wh2", new TableId(DB_NAME_2, TABLE_NAME_2_1), 4);
+        assigner.registerBatchWrite(
+                4L, "wh2", new TableId(DB_NAME_2, TABLE_NAME_2_2), 10);
+
+        final AtomicLong expectNumScheduledTask = new AtomicLong(assigner.numScheduledTasks());
+        assigner.unregisterBatchWrite(1);
+        assertNull(assigner.getBackends(1));
+        expectNumScheduledTask.incrementAndGet();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> assigner.numScheduledTasks() == expectNumScheduledTask.get());
+        assertFalse(containsLoadMeta(1, "wh1"));
+
+        assigner.unregisterBatchWrite(2);
+        assertNull(assigner.getBackends(2));
+        expectNumScheduledTask.incrementAndGet();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> assigner.numScheduledTasks() == expectNumScheduledTask.get());
+        assertFalse(containsLoadMeta(2, "wh1"));
+
+        assigner.unregisterBatchWrite(3);
+        assertNull(assigner.getBackends(3));
+        expectNumScheduledTask.incrementAndGet();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> assigner.numScheduledTasks() == expectNumScheduledTask.get());
+        assertFalse(containsLoadMeta(3, "wh2"));
+
+        assigner.unregisterBatchWrite(4);
+        assertNull(assigner.getBackends(4));
+        expectNumScheduledTask.incrementAndGet();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> assigner.numScheduledTasks() == expectNumScheduledTask.get());
+        assertFalse(containsLoadMeta(4, "wh2"));
+    }
+
+    @Test
+    public void testDetectUnavailableNodesWhenGetBackends() {
+        assigner.registerBatchWrite(
+                1L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_1), 1);
+        assigner.registerBatchWrite(
+                2L, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_2), 2);
+        assigner.registerBatchWrite(
+                3L, "wh1", new TableId(DB_NAME_2, TABLE_NAME_2_1), 3);
+        assigner.registerBatchWrite(
+                4L, "wh1", new TableId(DB_NAME_2, TABLE_NAME_2_2), 4);
+
+        assigner.disablePeriodicalScheduleForTest();
+        List<ComputeNode> nodes = assigner.getBackends(1);
+        assertEquals(1, nodes.size());
+        ComputeNode notAliveNode = nodes.get(0);
+        assertTrue(notAliveNode.isAvailable());
+        notAliveNode.setAlive(false);
+        assertFalse(notAliveNode.isAvailable());
+
+        final AtomicLong expectNumScheduledTask = new AtomicLong(assigner.numScheduledTasks());
+        assertTrue(assigner.getBackends(1).isEmpty());
+        expectNumScheduledTask.incrementAndGet();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> assigner.numScheduledTasks() == expectNumScheduledTask.get());
+
+        for (int i = 1; i <= 4; i++) {
+            List<ComputeNode> newNodes = assigner.getBackends(i);
+            assertEquals(i, newNodes.size());
+            assertFalse(newNodes.stream().map(ComputeNode::getId)
+                    .collect(Collectors.toSet())
+                    .contains(notAliveNode.getId()));
+        }
+        notAliveNode.setAlive(true);
+    }
+
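+    // Registers 100 loads that each need 4 of the 5 mock backends, then adds 5
+    // more backends and forces a rebalance; the assigner should spread the loads
+    // across all 10 nodes while keeping the skew below
+    // Config.batch_write_balance_diff_ratio.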
+    @Test
+    public void testBalance() throws Exception {
+        Set<Long> backendIds = new HashSet<>();
+        for (int i = 1; i <= 100; i++) {
+            assigner.registerBatchWrite(
+                    i, "wh1", new TableId(DB_NAME_1, TABLE_NAME_1_1), 4);
+            List<ComputeNode> nodes = assigner.getBackends(i);
+            assertEquals(4, nodes.size());
+            nodes.forEach(node -> backendIds.add(node.getId()));
+        }
+        assertEquals(5, backendIds.size());
+        assertTrue(assigner.currentLoadDiffRatio("wh1") < Config.batch_write_balance_diff_ratio);
+
+        assigner.disablePeriodicalScheduleForTest();
+        for (int i = 10006; i <= 10010; i++) {
+            UtFrameUtils.addMockBackend(i);
+        }
+        backendIds.clear();
+        assigner.periodicalCheckNodeChangedAndBalance();
+        assertTrue(assigner.currentLoadDiffRatio("wh1") < Config.batch_write_balance_diff_ratio);
+        for (int i = 1; i <= 100; i++) {
+            List<ComputeNode> nodes = assigner.getBackends(i);
+            assertEquals(4, nodes.size());
+            nodes.forEach(node -> backendIds.add(node.getId()));
+        }
+        assertEquals(10, backendIds.size());
+
+        for (int i = 10006; i <= 10010; i++) {
+            UtFrameUtils.dropMockBackend(i);
+        }
+    }
+
+    private boolean containsLoadMeta(long loadId, String warehouse) {
+        CoordinatorBackendAssignerImpl.WarehouseScheduleMeta whMeta = assigner.getWarehouseMeta(warehouse);
+        if (whMeta == null) {
+            return false;
+        }
+        if (whMeta.schedulingLoadMetas.containsKey(loadId)) {
+            return true;
+        }
+        for (CoordinatorBackendAssignerImpl.NodeMeta nodeMeta : whMeta.sortedNodeMetaSet) {
+            if (nodeMeta.loadIds.contains(loadId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/IsomorphicBatchWriteTest.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/IsomorphicBatchWriteTest.java
new file mode 100644
index 0000000000000..6de5318df75d6
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/IsomorphicBatchWriteTest.java
@@ -0,0 +1,304 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.google.common.collect.ImmutableMap;
+import com.starrocks.load.loadv2.LoadJob;
+import com.starrocks.load.streamload.StreamLoadHttpHeader;
+import com.starrocks.load.streamload.StreamLoadInfo;
+import com.starrocks.load.streamload.StreamLoadKvParams;
+import com.starrocks.qe.DefaultCoordinator;
+import com.starrocks.server.WarehouseManager;
+import com.starrocks.system.ComputeNode;
+import com.starrocks.task.LoadEtlTask;
+import com.starrocks.thrift.FrontendServiceVersion;
+import com.starrocks.thrift.TReportExecStatusParams;
+import com.starrocks.thrift.TStatus;
+import com.starrocks.thrift.TStatusCode;
+import com.starrocks.thrift.TTabletFailInfo;
+import com.starrocks.transaction.TransactionStatus;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class IsomorphicBatchWriteTest extends BatchWriteTestBase {
+
+    @Mocked
+    private CoordinatorBackendAssigner assigner;
+    private TestThreadPoolExecutor executor;
+    private int parallel;
+
+    private IsomorphicBatchWrite load;
+
+    @Before
+    public void setup() throws Exception {
+        executor = new TestThreadPoolExecutor();
+        parallel = 4;
+        assertTrue("Number of nodes " + allNodes.size(), parallel < allNodes.size());
+        Map<String, String> map = new HashMap<>();
+        map.put(StreamLoadHttpHeader.HTTP_FORMAT, "json");
+        map.put(StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE, "true");
+        map.put(StreamLoadHttpHeader.HTTP_BATCH_WRITE_ASYNC, "true");
+        StreamLoadKvParams params = new StreamLoadKvParams(map);
+        StreamLoadInfo streamLoadInfo =
+                StreamLoadInfo.fromHttpStreamLoadRequest(null, -1, Optional.empty(), params);
+        load = new IsomorphicBatchWrite(
+                1,
+                new TableId(DB_NAME_1, TABLE_NAME_1_1),
+                WarehouseManager.DEFAULT_WAREHOUSE_NAME,
+                streamLoadInfo,
+                1000,
+                parallel,
+                params.toMap(),
+                connectContext,
+                assigner,
+                executor);
+    }
+
+    @Test
+    public void testRequestBackendsSuccess() {
+        List<ComputeNode> nodes = allNodes.subList(0, parallel);
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = nodes;
+            }
+        };
+        RequestCoordinatorBackendResult requestResult = load.requestCoordinatorBackends();
+        assertTrue(requestResult.isOk());
+        assertSame(nodes, requestResult.getValue());
+    }
+
+    @Test
+    public void testRequestBackendsWithEmptyResult() {
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = Collections.emptyList();
+            }
+        };
+        RequestCoordinatorBackendResult requestResult = load.requestCoordinatorBackends();
+        assertFalse(requestResult.isOk());
+        assertEquals(TStatusCode.SERVICE_UNAVAILABLE, requestResult.getStatus().getStatus_code());
+    }
+
+    @Test
+    public void testRequestBackendsWithException() {
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = new Exception("artificial failure");
+            }
+        };
+        RequestCoordinatorBackendResult requestResult = load.requestCoordinatorBackends();
+        assertFalse(requestResult.isOk());
+        assertEquals(TStatusCode.INTERNAL_ERROR, requestResult.getStatus().getStatus_code());
+    }
+
+    @Test
+    public void testRequestLoadFromCoordinatorBackend() throws Exception {
+        List<ComputeNode> nodes = allNodes.subList(0, parallel);
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = nodes;
+            }
+        };
+        RequestLoadResult result1 = load.requestLoad(nodes.get(0).getId(), nodes.get(0).getHost());
+        assertTrue(result1.isOk());
+        String label = result1.getValue();
+        assertNotNull(label);
+        assertEquals(1, load.numRunningLoads());
+        LoadExecutor loadExecutor = load.getLoadExecutor(label);
+        assertNotNull(loadExecutor);
+        assertEquals(nodes.stream().map(ComputeNode::getId).collect(Collectors.toSet()),
+                loadExecutor.getCoordinatorBackendIds());
+
+        RequestLoadResult result2 = load.requestLoad(nodes.get(1).getId(), nodes.get(1).getHost());
+        assertTrue(result2.isOk());
+        assertEquals(label, result2.getValue());
+
+        executor.manualRun(loadExecutor);
+
+        assertEquals(TransactionStatus.VISIBLE, getTxnStatus(label));
+        assertNull(load.getLoadExecutor(label));
+        assertEquals(0, load.numRunningLoads());
+    }
+
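+    // A request landing on a backend outside the assigned coordinator set should
+    // still be accepted: it gets a fresh label and a second executor whose
+    // coordinator set additionally contains the requesting backend.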
+    @Test
+    public void testRequestLoadFromNonCoordinatorBackend() throws Exception {
+        List<ComputeNode> nodes = allNodes.subList(0, parallel);
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = nodes;
+            }
+        };
+
+        // Request from coordinator backend
+        RequestLoadResult result1 = load.requestLoad(nodes.get(0).getId(), nodes.get(0).getHost());
+        assertTrue(result1.isOk());
+        String label1 = result1.getValue();
+        assertNotNull(label1);
+        assertEquals(1, load.numRunningLoads());
+        LoadExecutor loadExecutor1 = load.getLoadExecutor(label1);
+        assertNotNull(loadExecutor1);
+        assertEquals(nodes.stream().map(ComputeNode::getId).collect(Collectors.toSet()),
+                loadExecutor1.getCoordinatorBackendIds());
+
+        RequestLoadResult result2 = load.requestLoad(allNodes.get(parallel).getId(), allNodes.get(parallel).getHost());
+        assertTrue(result2.isOk());
+        String label2 = result2.getValue();
+        assertNotNull(label2);
+        assertEquals(2, load.numRunningLoads());
+        assertNotEquals(label1, label2);
+        LoadExecutor loadExecutor2 = load.getLoadExecutor(label2);
+        assertNotNull(loadExecutor2);
+        assertNotSame(loadExecutor1, loadExecutor2);
+        Set<Long> expectNodeIds = nodes.stream().map(ComputeNode::getId).collect(Collectors.toSet());
+        expectNodeIds.add(allNodes.get(parallel).getId());
+        assertEquals(expectNodeIds, loadExecutor2.getCoordinatorBackendIds());
+
+        executor.manualRun(loadExecutor1);
+        executor.manualRun(loadExecutor2);
+
+        assertEquals(TransactionStatus.VISIBLE, getTxnStatus(label1));
+        assertEquals(TransactionStatus.VISIBLE, getTxnStatus(label2));
+        assertEquals(0, load.numRunningLoads());
+    }
+
+    @Test
+    public void testRequestLoadFromUnavailableBackend() {
+        List<ComputeNode> nodes = allNodes.subList(0, parallel);
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = nodes;
+            }
+        };
+
+        RequestLoadResult result = load.requestLoad(Integer.MAX_VALUE, "127.0.0.1");
+        assertFalse(result.isOk());
+        assertEquals(TStatusCode.SERVICE_UNAVAILABLE, result.getStatus().getStatus_code());
+    }
+
+    @Test
+    public void testExecuteLoadFail() {
+        List<ComputeNode> nodes = allNodes.subList(0, parallel);
+        new Expectations() {
+            {
+                assigner.getBackends(1);
+                result = nodes;
+            }
+        };
+
+        executor.setThrowException(true);
+        RequestLoadResult result = load.requestLoad(nodes.get(0).getId(), nodes.get(0).getHost());
+        assertFalse(result.isOk());
+        assertEquals(TStatusCode.INTERNAL_ERROR, result.getStatus().getStatus_code());
+        assertEquals(0, load.numRunningLoads());
+    }
+
+    private class TestThreadPoolExecutor implements Executor {
+
+        private final Set<Runnable> pendingRunnable;
+        private boolean throwException;
+
+        public TestThreadPoolExecutor() {
+            this.pendingRunnable = new HashSet<>();
+            this.throwException = false;
+        }
+
+        public boolean isThrowException() {
+            return throwException;
+        }
+
+        public void setThrowException(boolean throwException) {
+            this.throwException = throwException;
+        }
+
+        @Override
+        public void execute(@NotNull Runnable command) {
+            if (throwException) {
+                throw new RejectedExecutionException("artificial failure");
+            }
+
+            pendingRunnable.add(command);
+        }
+
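+        // Drives a previously submitted LoadExecutor to completion on a separate
+        // thread: waits until the load plan has been built, replies to the
+        // coordinator with one OK TReportExecStatusParams per fragment execution
+        // (carrying the fabricated commit infos and load counters) so the
+        // transaction can commit, and finally joins the executor thread.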
+        public void manualRun(Runnable runnable) throws Exception {
+            boolean exist = pendingRunnable.remove(runnable);
+            if (!exist) {
+                return;
+            }
+
+            if (!(runnable instanceof LoadExecutor)) {
+                runnable.run();
+                return;
+            }
+
+            LoadExecutor loadExecutor = (LoadExecutor) runnable;
+            Thread thread = new Thread(loadExecutor);
+            thread.start();
+            while (loadExecutor.getTimeTrace().joinPlanTimeMs.get() <= 0) {
+                Thread.sleep(10);
+            }
+            DefaultCoordinator coordinator = (DefaultCoordinator) loadExecutor.getCoordinator();
+            assertNotNull(coordinator);
+            coordinator.getExecutionDAG().getExecutions().forEach(execution -> {
+                int indexInJob = execution.getIndexInJob();
+                TReportExecStatusParams request = new TReportExecStatusParams(FrontendServiceVersion.V1);
+                request.setBackend_num(indexInJob)
+                        .setDone(true)
+                        .setStatus(new TStatus(TStatusCode.OK))
+                        .setFragment_instance_id(execution.getInstanceId());
+                request.setCommitInfos(buildCommitInfos());
+                TTabletFailInfo failInfo = new TTabletFailInfo();
+                request.setFailInfos(Collections.singletonList(failInfo));
+                Map<String, String> currLoadCounters = ImmutableMap.of(
+                        LoadEtlTask.DPP_NORMAL_ALL, String.valueOf(10),
+                        LoadEtlTask.DPP_ABNORMAL_ALL, String.valueOf(0),
+                        LoadJob.UNSELECTED_ROWS, String.valueOf(0),
+                        LoadJob.LOADED_BYTES, String.valueOf(40)
+                );
+                request.setLoad_counters(currLoadCounters);
+                coordinator.updateFragmentExecStatus(request);
+            });
+            thread.join();
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/LoadExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/LoadExecutorTest.java
new file mode 100644
index 0000000000000..9ab0ed7e9e7b5
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/LoadExecutorTest.java
@@ -0,0 +1,358 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.load.batchwrite;
+
+import com.google.common.collect.ImmutableMap;
+import com.starrocks.analysis.DescriptorTable;
+import com.starrocks.catalog.OlapTable;
+import com.starrocks.common.LoadException;
+import com.starrocks.common.Status;
+import com.starrocks.common.util.DebugUtil;
+import com.starrocks.common.util.UUIDUtil;
+import com.starrocks.load.streamload.StreamLoadHttpHeader;
+import com.starrocks.load.streamload.StreamLoadInfo;
+import com.starrocks.load.streamload.StreamLoadKvParams;
+import com.starrocks.planner.PlanFragment;
+import com.starrocks.planner.ScanNode;
+import com.starrocks.planner.StreamLoadPlanner;
+import com.starrocks.qe.ConnectContext;
+import com.starrocks.qe.scheduler.Coordinator;
+import com.starrocks.schema.MTable;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.sql.LoadPlanner;
+import com.starrocks.thrift.TDescriptorTable;
+import com.starrocks.thrift.TNetworkAddress;
+import com.starrocks.thrift.TStatusCode;
+import com.starrocks.thrift.TUniqueId;
+import com.starrocks.transaction.TransactionStatus;
+import com.starrocks.utframe.StarRocksAssert;
+import com.starrocks.utframe.UtFrameUtils;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class LoadExecutorTest extends BatchWriteTestBase {
+
+    private String label;
+    private TUniqueId loadId;
+    private StreamLoadKvParams kvParams;
+    private StreamLoadInfo streamLoadInfo;
+    private TestLoadExecuteCallback loadExecuteCallback;
+
+    @Mocked
+    private Coordinator coordinator;
+    private TestCoordinatorFactory coordinatorFactory;
+
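+    // Hides BatchWriteTestBase.beforeClass (same static signature), so JUnit runs
+    // only this lighter setup for these tests: three extra mock backends and a
+    // single test table.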
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        UtFrameUtils.createMinStarRocksCluster();
+        UtFrameUtils.addMockBackend(10002);
+        UtFrameUtils.addMockBackend(10003);
+        UtFrameUtils.addMockBackend(10004);
+        connectContext = UtFrameUtils.createDefaultCtx();
+        starRocksAssert = new StarRocksAssert(connectContext);
+        starRocksAssert.withDatabase(DB_NAME_1)
+                .useDatabase(DB_NAME_1)
+                .withTable(new MTable(TABLE_NAME_1_1, Arrays.asList("c0 INT", "c1 STRING")));
+        DATABASE_1 = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB_NAME_1);
+        TABLE_1_1 = (OlapTable) DATABASE_1.getTable(TABLE_NAME_1_1);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        label = "batch_write_" + DebugUtil.printId(UUIDUtil.toTUniqueId(UUID.randomUUID()));
+        loadId = UUIDUtil.toTUniqueId(UUID.randomUUID());
+
+        Map<String, String> map = new HashMap<>();
+        map.put(StreamLoadHttpHeader.HTTP_FORMAT, "json");
+        map.put(StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE, "true");
+        map.put(StreamLoadHttpHeader.HTTP_BATCH_WRITE_ASYNC, "true");
+        kvParams = new StreamLoadKvParams(map);
+        streamLoadInfo = StreamLoadInfo.fromHttpStreamLoadRequest(null, -1, Optional.empty(), kvParams);
+        loadExecuteCallback = new TestLoadExecuteCallback();
+        coordinatorFactory = new TestCoordinatorFactory(coordinator);
+    }
+
+    @Test
+    public void testLoadSuccess() throws Exception {
+        LoadExecutor executor = new LoadExecutor(
+                new TableId(DB_NAME_1, TABLE_NAME_1_1),
+                label,
+                loadId,
+                streamLoadInfo,
+                1000,
+                ImmutableMap.<String, String>builder().putAll(kvParams.toMap()).build(),
+                connectContext,
+                new HashSet<>(Arrays.asList(10002L, 10003L)),
+                coordinatorFactory,
+                loadExecuteCallback
+        );
+
+        new Expectations() {
+            {
+                coordinator.join(anyInt);
+                result = true;
+                coordinator.getExecStatus();
+                result = new Status();
+                coordinator.getCommitInfos();
+                result = buildCommitInfos();
+            }
+        };
+
+        executor.run();
+        assertNull(executor.getFailure());
+        assertEquals(1, loadExecuteCallback.getFinishedLoads().size());
+        assertEquals(label, loadExecuteCallback.getFinishedLoads().get(0));
+        TransactionStatus txnStatus =
+                GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLabelStatus(DATABASE_1.getId(), label);
+        assertEquals(TransactionStatus.VISIBLE, txnStatus);
+    }
+
+    @Test
+    public void testPlanExecuteFail() {
+        LoadExecutor executor = new LoadExecutor(
+                new TableId(DB_NAME_1, TABLE_NAME_1_1),
+                label,
+                loadId,
+                streamLoadInfo,
+                1000,
+                ImmutableMap.<String, String>builder().putAll(kvParams.toMap()).build(),
+                connectContext,
+                new HashSet<>(Arrays.asList(10002L, 10003L)),
+                coordinatorFactory,
+                loadExecuteCallback
+        );
+
+        Status status = new Status(TStatusCode.INTERNAL_ERROR, "artificial failure");
+        new Expectations() {
+            {
+                coordinator.join(anyInt);
+                result = true;
+                coordinator.getExecStatus();
+                result = status;
+            }
+        };
+
+        Exception expectException = new LoadException(
+                String.format("Failed to execute load, status code: %s, error message: %s",
+                        status.getErrorCodeString(), status.getErrorMsg()));
+        testLoadFailBase(executor, expectException, TransactionStatus.ABORTED);
+    }
+
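+    // join(timeout) returning false is how the coordinator signals that the plan
+    // did not finish within the timeout, so the executor is expected to abort the
+    // transaction rather than commit it.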
LoadException("Timeout to execute load"); + testLoadFailBase(executor, expectException, TransactionStatus.ABORTED); + } + + @Test + public void testTableDoesNotExist() { + String fakeTableName = TABLE_NAME_1_1 + "_fake"; + LoadExecutor executor = new LoadExecutor( + new TableId(DB_NAME_1, fakeTableName), + label, + loadId, + streamLoadInfo, + 1000, + ImmutableMap.builder().putAll(kvParams.toMap()).build(), + connectContext, + new HashSet<>(Arrays.asList(10002L, 10003L)), + coordinatorFactory, + loadExecuteCallback + ); + + Exception expectException = new LoadException( + String.format("Table [%s.%s] does not exist", DB_NAME_1, fakeTableName)); + testLoadFailBase(executor, expectException, TransactionStatus.UNKNOWN); + } + + + private void testLoadFailBase( + LoadExecutor executor, Exception expectedException, TransactionStatus expectedTxnStatus) { + executor.run(); + Throwable throwable = executor.getFailure(); + assertNotNull(throwable); + assertSame(expectedException.getClass(), throwable.getClass()); + assertTrue(throwable.getMessage().contains(expectedException.getMessage())); + assertEquals(1, loadExecuteCallback.getFinishedLoads().size()); + assertEquals(label, loadExecuteCallback.getFinishedLoads().get(0)); + TransactionStatus txnStatus = + GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLabelStatus(DATABASE_1.getId(), label); + assertEquals(expectedTxnStatus, txnStatus); + } + + @Test + public void testIsActive() { + LoadExecutor executor = new LoadExecutor( + new TableId(DB_NAME_1, TABLE_NAME_1_1), + label, + loadId, + streamLoadInfo, + 1000, + ImmutableMap.builder().putAll(kvParams.toMap()).build(), + connectContext, + new HashSet<>(Arrays.asList(10002L, 10003L)), + coordinatorFactory, + loadExecuteCallback + ); + + assertTrue(executor.isActive()); + + new Expectations() { + { + coordinator.join((anyInt)); + result = true; + coordinator.getExecStatus(); + result = new Status(TStatusCode.INTERNAL_ERROR, "artificial failure"); + } + }; + executor.run(); + assertFalse(executor.isActive()); + } + + @Test + public void testContainCoordinatorBackend() { + LoadExecutor executor = new LoadExecutor( + new TableId(DB_NAME_1, TABLE_NAME_1_1), + label, + loadId, + streamLoadInfo, + 1000, + ImmutableMap.builder().putAll(kvParams.toMap()).build(), + connectContext, + new HashSet<>(Arrays.asList(10002L, 10003L)), + coordinatorFactory, + loadExecuteCallback + ); + assertFalse(executor.containCoordinatorBackend(10001L)); + assertTrue(executor.containCoordinatorBackend(10002L)); + } + + private static class TestLoadExecuteCallback implements LoadExecuteCallback { + + private final List finishedLoads = new ArrayList<>(); + + public List getFinishedLoads() { + return finishedLoads; + } + + @Override + public void finishLoad(String label) { + finishedLoads.add(label); + } + } + + private static class TestCoordinatorFactor implements Coordinator.Factory { + + private final Coordinator coordinator; + + public TestCoordinatorFactor(Coordinator coordinator) { + this.coordinator = coordinator; + } + + @Override + public Coordinator createStreamLoadScheduler(LoadPlanner loadPlanner) { + return coordinator; + } + + @Override + public Coordinator createQueryScheduler(ConnectContext context, List fragments, + List scanNodes, TDescriptorTable descTable) { + return coordinator; + } + + @Override + public Coordinator createInsertScheduler(ConnectContext context, List fragments, + List scanNodes, TDescriptorTable descTable) { + return coordinator; + } + + @Override + public Coordinator 
+    private static class TestCoordinatorFactory implements Coordinator.Factory {
+
+        private final Coordinator coordinator;
+
+        public TestCoordinatorFactory(Coordinator coordinator) {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public Coordinator createStreamLoadScheduler(LoadPlanner loadPlanner) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createQueryScheduler(ConnectContext context, List<PlanFragment> fragments,
+                                                List<ScanNode> scanNodes, TDescriptorTable descTable) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createInsertScheduler(ConnectContext context, List<PlanFragment> fragments,
+                                                 List<ScanNode> scanNodes, TDescriptorTable descTable) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createBrokerLoadScheduler(LoadPlanner loadPlanner) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createSyncStreamLoadScheduler(StreamLoadPlanner planner, TNetworkAddress address) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createNonPipelineBrokerLoadScheduler(Long jobId, TUniqueId queryId,
+                                                                DescriptorTable descTable, List<PlanFragment> fragments,
+                                                                List<ScanNode> scanNodes, String timezone,
+                                                                long startTime, Map<String, String> sessionVariables,
+                                                                ConnectContext context, long execMemLimit,
+                                                                long warehouseId) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createBrokerExportScheduler(Long jobId, TUniqueId queryId, DescriptorTable descTable,
+                                                       List<PlanFragment> fragments, List<ScanNode> scanNodes,
+                                                       String timezone, long startTime,
+                                                       Map<String, String> sessionVariables, long execMemLimit,
+                                                       long warehouseId) {
+            return coordinator;
+        }
+
+        @Override
+        public Coordinator createRefreshDictionaryCacheScheduler(ConnectContext context, TUniqueId queryId,
+                                                                 DescriptorTable descTable,
+                                                                 List<PlanFragment> fragments,
+                                                                 List<ScanNode> scanNodes) {
+            return coordinator;
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadKvParamsTest.java b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadKvParamsTest.java
index 76809bbb0c43e..7c5599fda3169 100644
--- a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadKvParamsTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadKvParamsTest.java
@@ -24,9 +24,13 @@
 import java.util.Map;
 
 import static com.starrocks.http.rest.RestBaseAction.WAREHOUSE_KEY;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_ASYNC;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_INTERVAL_MS;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_PARALLEL;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COLUMNS;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COLUMN_SEPARATOR;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_COMPRESSION;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_REPLICATED_STORAGE;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENCLOSE;
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ESCAPE;
@@ -300,6 +304,30 @@ private StreamLoadParams buildParams(String key, String value) {
         return new StreamLoadKvParams(Collections.singletonMap(key, value));
     }
 
+    @Test
+    public void testBatchWrite() {
+        {
+            StreamLoadKvParams params = new StreamLoadKvParams(new HashMap<>());
+            assertFalse(params.getEnableBatchWrite().isPresent());
+            assertFalse(params.getBatchWriteAsync().isPresent());
+            assertFalse(params.getBatchWriteIntervalMs().isPresent());
+            assertFalse(params.getBatchWriteParallel().isPresent());
+        }
+
+        {
+            Map<String, String> map = new HashMap<>();
+            map.put(HTTP_ENABLE_BATCH_WRITE, "true");
+            map.put(HTTP_BATCH_WRITE_ASYNC, "true");
+            map.put(HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
+            map.put(HTTP_BATCH_WRITE_PARALLEL, "4");
+            StreamLoadKvParams params = new StreamLoadKvParams(map);
+            assertEquals(true, params.getEnableBatchWrite().orElse(null));
+            assertEquals(true, params.getBatchWriteAsync().orElse(null));
+            assertEquals(Integer.valueOf(1000), params.getBatchWriteIntervalMs().orElse(null));
+            assertEquals(Integer.valueOf(4), params.getBatchWriteParallel().orElse(null));
+        }
+    }
+
     @Test
     public void testHashCodeAndEquals() {
         Map<String, String> map1 = new HashMap<>();
diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/StreamLoadPlannerTest.java
index 46166d7ebdf72..6604295548947 100644
--- a/fe/fe-core/src/test/java/com/starrocks/planner/StreamLoadPlannerTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/planner/StreamLoadPlannerTest.java
@@ -73,6 +73,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 
 import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_PARTIAL_UPDATE_MODE;
@@ -215,7 +216,7 @@ public void testPartialUpdateMode() throws UserException {
                 Collections.singletonMap(HTTP_PARTIAL_UPDATE_MODE, "column"));
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        StreamLoadInfo streamLoadInfo2 = StreamLoadInfo.fromHttpStreamLoadRequest(loadId, 100, 100, param);
+        StreamLoadInfo streamLoadInfo2 = StreamLoadInfo.fromHttpStreamLoadRequest(loadId, 100, Optional.of(100), param);
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
         StreamLoadInfo streamLoadInfo3 = StreamLoadInfo.fromRoutineLoadJob(routineLoadJob);
     }
diff --git a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java
index 0d6ada7448397..4c380d1b578a5 100644
--- a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java
@@ -41,6 +41,8 @@
 import com.starrocks.sql.ast.PartitionDesc;
 import com.starrocks.sql.ast.SingleItemListPartitionDesc;
 import com.starrocks.thrift.TAuthInfo;
+import com.starrocks.thrift.TBatchWriteRequest;
+import com.starrocks.thrift.TBatchWriteResult;
 import com.starrocks.thrift.TColumnDef;
 import com.starrocks.thrift.TCreatePartitionRequest;
 import com.starrocks.thrift.TCreatePartitionResult;
@@ -100,6 +102,12 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_ASYNC;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_INTERVAL_MS;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_BATCH_WRITE_PARALLEL;
+import static com.starrocks.load.streamload.StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -1196,6 +1204,26 @@ public void testStreamLoadPutTimeout() throws UserException, TException, LockTim
         Assert.assertEquals(TStatusCode.TIMEOUT, result.status.status_code);
     }
 
+    @Test
+    public void testRequestBatchWrite() throws Exception {
+        FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv);
+        TBatchWriteRequest request = new TBatchWriteRequest();
+        request.setDb("test");
+        request.setTbl("site_access_hour");
+        request.setUser("root");
+        request.setPasswd("");
+        request.setBackend_id(10001);
+        request.setBackend_host("127.0.0.1");
+        request.putToParams(HTTP_ENABLE_BATCH_WRITE, "true");
+        request.putToParams(HTTP_BATCH_WRITE_ASYNC, "true");
+        request.putToParams(HTTP_BATCH_WRITE_INTERVAL_MS, "1000");
+        request.putToParams(HTTP_BATCH_WRITE_PARALLEL, "4");
+
+        TBatchWriteResult result = impl.requestBatchWrite(request);
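+        // The RPC is expected to have registered the batch write on the FE; the
+        // returned label is what the coordinator BE attaches its data to.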
+        assertEquals(TStatusCode.OK, result.getStatus().getStatus_code());
+        assertNotNull(result.getLabel());
+    }
+
     @Test
     public void testMetaNotFound() throws UserException {
         FrontendServiceImpl impl = spy(new FrontendServiceImpl(exeEnv));
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a316ea5d61b59..f1d0a009d5d8c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -962,6 +962,23 @@ struct TStreamLoadPutResult {
     2: optional InternalService.TExecPlanFragmentParams params
 }
 
+struct TBatchWriteRequest {
+    1: optional string db
+    2: optional string tbl
+    3: optional string user
+    4: optional string passwd
+    5: optional string user_ip
+    6: optional i64 backend_id
+    7: optional string backend_host;
+    8: optional map<string, string> params;
+}
+
+struct TBatchWriteResult {
+    1: optional Status.TStatus status;
+    // only valid for success
+    2: optional string label;
+}
+
 struct TKafkaRLTaskProgress {
     1: required map<i32, i64> partitionCmtOffset
     2: optional map<i32, i64> partitionCmtOffsetTimestamp
@@ -1903,6 +1920,8 @@ service FrontendService {
 
     TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
 
+    TBatchWriteResult requestBatchWrite(1: TBatchWriteRequest request)
+
     Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
 
     TRefreshTableResponse refreshTable(1:TRefreshTableRequest request)
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1cc56182d8702..008731902f610 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -280,6 +280,10 @@ struct TBrokerScanRange {
     3: required list<Types.TNetworkAddress> broker_addresses
     // used for channel stream load only
     4: optional i32 channel_id
+    // available when this is a stream load in batch write mode
+    5: optional bool enable_batch_write
+    6: optional i32 batch_write_interval_ms
+    7: optional map<string, string> batch_write_parameters;
 }
 
 // Es scan range