[Feature](batch write part1) Implement the basic workflow on FE side
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Oct 20, 2024
1 parent 5dd0cc5 commit cc0f423
Showing 33 changed files with 3,495 additions and 70 deletions.
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -3268,4 +3268,18 @@ public class Config extends ConfigBase {

@ConfField(mutable = false)
public static int lake_remove_table_thread_num = 4;

@ConfField(mutable = true)
public static int batch_write_gc_check_interval_ms = 60000;

@ConfField(mutable = true)
public static int batch_write_idle_ms = 3600000;

@ConfField(mutable = true)
public static int batch_write_be_assigner_schedule_interval_ms = 5000;

@ConfField(mutable = true)
public static double batch_write_balance_diff_ratio = 0.1;

public static int batch_write_load_executor_threads_num = 4096;
}
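
The four annotated options above are declared mutable, so they can be changed at runtime (in StarRocks via ADMIN SET FRONTEND CONFIG) and are reread by their consumers on every cycle; BatchWriteMgr below, for example, calls setInterval(Config.batch_write_gc_check_interval_ms) on each run. A minimal, illustrative sketch of that pattern (not StarRocks code; the static field stands in for the real config value):

import java.util.concurrent.TimeUnit;

public class MutableIntervalDaemon extends Thread {

    // Stand-in for Config.batch_write_gc_check_interval_ms; a mutable config can be
    // updated while the FE is running, so consumers must not cache it at startup.
    static volatile int gcCheckIntervalMs = 60_000;

    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Reread the interval on every iteration so an updated value takes
                // effect on the next cycle, without restarting the daemon.
                TimeUnit.MILLISECONDS.sleep(gcCheckIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            cleanupInactiveBatchWrites();
        }
    }

    private void cleanupInactiveBatchWrites() {
        // Release batch write groups that have been idle longer than batch_write_idle_ms.
    }

    public void shutdown() {
        stopped = true;
        interrupt();
    }
}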
53 changes: 51 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java
@@ -40,6 +40,10 @@
import com.starrocks.http.BaseRequest;
import com.starrocks.http.BaseResponse;
import com.starrocks.http.IllegalArgException;
import com.starrocks.load.batchwrite.RequestCoordinatorBackendResult;
import com.starrocks.load.batchwrite.TableId;
import com.starrocks.load.streamload.StreamLoadHttpHeader;
import com.starrocks.load.streamload.StreamLoadKvParams;
import com.starrocks.privilege.AccessDeniedException;
import com.starrocks.privilege.PrivilegeType;
import com.starrocks.qe.ConnectContext;
@@ -59,6 +63,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class LoadAction extends RestBaseAction {
private static final Logger LOG = LogManager.getLogger(LoadAction.class);
@@ -95,6 +100,12 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res
throw new DdlException("There is no 100-continue header");
}

boolean enableBatchWrite = "true".equalsIgnoreCase(
request.getRequest().headers().get(StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE));
if (enableBatchWrite && redirectToLeader(request, response)) {
return;
}

String dbName = request.getSingleParameter(DB_KEY);
if (Strings.isNullOrEmpty(dbName)) {
throw new DdlException("No database selected.");
@@ -105,11 +116,20 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res
throw new DdlException("No table selected.");
}

String label = request.getRequest().headers().get(LABEL_KEY);

Authorizer.checkTableAction(ConnectContext.get().getCurrentUserIdentity(), ConnectContext.get().getCurrentRoleIds(),
dbName, tableName, PrivilegeType.INSERT);

if (!enableBatchWrite) {
processNormalStreamLoad(request, response, dbName, tableName);
} else {
processBatchWriteStreamLoad(request, response, dbName, tableName);
}
}

private void processNormalStreamLoad(
BaseRequest request, BaseResponse response, String dbName, String tableName) throws DdlException {
String label = request.getRequest().headers().get(LABEL_KEY);

String warehouseName = WarehouseManager.DEFAULT_WAREHOUSE_NAME;
if (request.getRequest().headers().contains(WAREHOUSE_KEY)) {
warehouseName = request.getRequest().headers().get(WAREHOUSE_KEY);
@@ -147,5 +167,34 @@ public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse res
redirectAddr.toString(), dbName, tableName, label, warehouseName);
redirectTo(request, response, redirectAddr);
}

private void processBatchWriteStreamLoad(
BaseRequest request, BaseResponse response, String dbName, String tableName) throws DdlException {
TableId tableId = new TableId(dbName, tableName);
StreamLoadKvParams params = StreamLoadKvParams.fromHttpHeaders(request.getRequest().headers());
RequestCoordinatorBackendResult result = GlobalStateMgr.getCurrentState()
.getBatchWriteMgr().requestCoordinatorBackends(tableId, params);
if (!result.isOk()) {
BatchWriteResponseResult responseResult = new BatchWriteResponseResult(
result.getStatus().status_code.name(), ActionStatus.FAILED,
result.getStatus().error_msgs.get(0));
sendResult(request, response, responseResult);
return;
}

List<ComputeNode> nodes = result.getValue();
int index = ThreadLocalRandom.current().nextInt(nodes.size());
ComputeNode node = nodes.get(index);
TNetworkAddress redirectAddr = new TNetworkAddress(node.getHost(), node.getHttpPort());
LOG.info("redirect batch write to destination={}, db: {}, tbl: {}", redirectAddr.toString(), dbName, tableName);
redirectTo(request, response, redirectAddr);
}

public static class BatchWriteResponseResult extends RestBaseResult {

public BatchWriteResponseResult(String code, ActionStatus status, String msg) {
super(code, status, msg);
}
}
}
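
With these changes, a stream load request that carries the batch-write header is no longer planned on the FE: LoadAction asks BatchWriteMgr for a set of coordinator backends and redirects the client to a randomly chosen one. A minimal client-side sketch of that flow (illustrative only: the header names enable_batch_write and batch_write_interval_ms are assumptions for the constants defined in StreamLoadHttpHeader/StreamLoadKvParams, and host, port, and credentials are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BatchWriteClientSketch {

    public static void main(String[] args) throws Exception {
        // Do not follow redirects automatically; we want to see which coordinator node
        // LoadAction picked from the BatchWriteMgr result.
        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NEVER)
                .build();

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://fe_host:8030/api/test_db/test_tbl/_stream_load"))
                .expectContinue(true) // LoadAction rejects requests without 100-continue
                .header("Authorization", basicAuth("root", ""))
                // Assumed header names; the real constants live in StreamLoadHttpHeader.
                .header("enable_batch_write", "true")
                .header("batch_write_interval_ms", "1000")
                .PUT(HttpRequest.BodyPublishers.ofString("1,hello\n2,world\n"))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expect a redirect whose Location points at the HTTP port of the chosen node;
        // a real client would replay the load against that address.
        System.out.println(response.statusCode());
        System.out.println(response.headers().firstValue("Location").orElse("<no redirect>"));
    }

    private static String basicAuth(String user, String password) {
        return "Basic " + Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
    }
}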

fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseResult.java
@@ -84,6 +84,13 @@ public RestBaseResult(String msg) {
this.message = msg;
}

public RestBaseResult(String code, ActionStatus status, String msg) {
this.code = code;
this.status = status;
this.msg = msg;
this.message = msg;
}

public static RestBaseResult getOk() {
return OK;
}
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteId.java
@@ -0,0 +1,54 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.batchwrite;

import com.starrocks.load.streamload.StreamLoadKvParams;

import java.util.Objects;

/**
* Represents an identifier for a batch write.
*/
public class BatchWriteId {

/** The ID of the table associated with the batch write. */
private final TableId tableId;

/** The parameters for the stream load associated with the batch write. */
private final StreamLoadKvParams params;


public BatchWriteId(TableId tableId, StreamLoadKvParams params) {
this.tableId = tableId;
this.params = params;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BatchWriteId that = (BatchWriteId) o;
return Objects.equals(tableId, that.tableId) && Objects.equals(params, that.params);
}

@Override
public int hashCode() {
return Objects.hash(tableId, params);
}
}
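
BatchWriteId keys the map in BatchWriteMgr, so two loads merge into the same group only when they target the same table and carry an identical parameter set, which presumes TableId and StreamLoadKvParams implement value equality. A self-contained sketch of the grouping behavior using simplified, hypothetical types (a plain table name and a parameter map standing in for the real classes):

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class BatchWriteGroupingSketch {

    // Simplified stand-in for BatchWriteId: a table name plus the full parameter map.
    static final class GroupKey {
        final String table;
        final Map<String, String> params;

        GroupKey(String table, Map<String, String> params) {
            this.table = table;
            this.params = params;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupKey)) {
                return false;
            }
            GroupKey that = (GroupKey) o;
            return table.equals(that.table) && params.equals(that.params);
        }

        @Override
        public int hashCode() {
            return Objects.hash(table, params);
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<GroupKey, String> groups = new ConcurrentHashMap<>();
        groups.computeIfAbsent(new GroupKey("db.tbl", Map.of("format", "csv")), k -> "group-1");
        // Same table, identical parameters: reuses group-1, no new group is created.
        groups.computeIfAbsent(new GroupKey("db.tbl", Map.of("format", "csv")), k -> "group-2");
        // Same table but a different parameter value: a separate group.
        groups.computeIfAbsent(new GroupKey("db.tbl", Map.of("format", "json")), k -> "group-3");
        System.out.println(groups.size()); // prints 2
    }
}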
218 changes: 218 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/batchwrite/BatchWriteMgr.java
@@ -0,0 +1,218 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.batchwrite;

import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.streamload.StreamLoadInfo;
import com.starrocks.load.streamload.StreamLoadKvParams;
import com.starrocks.qe.ConnectContext;
import com.starrocks.thrift.TStatus;
import com.starrocks.thrift.TStatusCode;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static com.starrocks.server.WarehouseManager.DEFAULT_WAREHOUSE_NAME;

/**
* Manages batch write operations.
*/
public class BatchWriteMgr extends FrontendDaemon {

private static final Logger LOG = LoggerFactory.getLogger(BatchWriteMgr.class);

// An atomic counter used to generate unique ids for isomorphic batch writes.
private final AtomicLong idGenerator;

// A read-write lock to ensure thread-safe access to the loadMap.
private final ReentrantReadWriteLock lock;

// A concurrent map that stores IsomorphicBatchWrite instances, keyed by BatchWriteId.
private final ConcurrentHashMap<BatchWriteId, IsomorphicBatchWrite> isomorphicBatchWriteMap;

// An assigner that manages the assignment of coordinator backends.
private final CoordinatorBackendAssigner coordinatorBackendAssigner;

// A thread pool executor for executing batch write tasks.
private final ThreadPoolExecutor threadPoolExecutor;

public BatchWriteMgr() {
super("group-commit-mgr", Config.batch_write_gc_check_interval_ms);
this.idGenerator = new AtomicLong(0L);
this.isomorphicBatchWriteMap = new ConcurrentHashMap<>();
this.lock = new ReentrantReadWriteLock();
this.coordinatorBackendAssigner = new CoordinatorBackendAssignerImpl();
this.threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(
Config.batch_write_load_executor_threads_num, "group-commit-load", true);
}

@Override
public void start() {
super.start();
this.coordinatorBackendAssigner.start();
LOG.info("Start batch write manager");
}

@Override
protected void runAfterCatalogReady() {
setInterval(Config.batch_write_gc_check_interval_ms);
cleanupInactiveBatchWrite();
}

/**
* Requests coordinator backends for the specified table and load parameters.
*
* @param tableId The ID of the table for which the coordinator backends are requested.
* @param params The parameters for the stream load.
* @return A RequestCoordinatorBackendResult containing the status of the operation and the coordinator backends.
*/
public RequestCoordinatorBackendResult requestCoordinatorBackends(TableId tableId, StreamLoadKvParams params) {
lock.readLock().lock();
try {
Pair<TStatus, IsomorphicBatchWrite> result = getOrCreateTableBatchWrite(tableId, params);
if (result.first.getStatus_code() != TStatusCode.OK) {
return new RequestCoordinatorBackendResult(result.first, null);
}
return result.second.requestCoordinatorBackends();
} finally {
lock.readLock().unlock();
}
}

/**
* Requests a load operation for the specified table and load parameters.
*
* @param tableId The ID of the table for which the load is requested.
* @param params The parameters for the stream load.
* @param backendId The id of the backend from which the request originates.
* @param backendHost The host of the backend from which the request originates.
* @return A RequestLoadResult containing the status of the operation and the load result.
*/
public RequestLoadResult requestLoad(
TableId tableId, StreamLoadKvParams params, long backendId, String backendHost) {
lock.readLock().lock();
try {
Pair<TStatus, IsomorphicBatchWrite> result = getOrCreateTableBatchWrite(tableId, params);
if (result.first.getStatus_code() != TStatusCode.OK) {
return new RequestLoadResult(result.first, null);
}
return result.second.requestLoad(backendId, backendHost);
} finally {
lock.readLock().unlock();
}
}

/**
* Cleans up inactive batch writes to release resources.
*/
@VisibleForTesting
void cleanupInactiveBatchWrite() {
lock.writeLock().lock();
try {
List<Map.Entry<BatchWriteId, IsomorphicBatchWrite>> loads = isomorphicBatchWriteMap.entrySet().stream()
.filter(entry -> !entry.getValue().isActive())
.collect(Collectors.toList());
for (Map.Entry<BatchWriteId, IsomorphicBatchWrite> entry : loads) {
isomorphicBatchWriteMap.remove(entry.getKey());
coordinatorBackendAssigner.unregisterBatchWrite(entry.getValue().getId());
}
} finally {
lock.writeLock().unlock();
}
}

/**
* Retrieves or creates an IsomorphicBatchWrite instance for the specified table and parameters.
*
* @param tableId The ID of the table for which the batch write is requested.
* @param params The parameters for the stream load.
* @return A Pair containing the status of the operation and the IsomorphicBatchWrite instance.
*/
private Pair<TStatus, IsomorphicBatchWrite> getOrCreateTableBatchWrite(TableId tableId, StreamLoadKvParams params) {
BatchWriteId uniqueId = new BatchWriteId(tableId, params);
IsomorphicBatchWrite load = isomorphicBatchWriteMap.get(uniqueId);
if (load != null) {
return new Pair<>(new TStatus(TStatusCode.OK), load);
}

String warehouseName = params.getWarehouse().orElse(DEFAULT_WAREHOUSE_NAME);
StreamLoadInfo streamLoadInfo;
try {
streamLoadInfo = StreamLoadInfo.fromHttpStreamLoadRequest(null, -1, Optional.empty(), params);
} catch (Exception e) {
TStatus status = new TStatus();
status.setStatus_code(TStatusCode.INVALID_ARGUMENT);
status.setError_msgs(Collections.singletonList(
String.format("Failed to build stream load info, error: %s", e.getMessage())));
return new Pair<>(status, null);
}

Integer batchWriteIntervalMs = params.getBatchWriteIntervalMs().orElse(null);
if (batchWriteIntervalMs == null || batchWriteIntervalMs <= 0) {
TStatus status = new TStatus();
status.setStatus_code(TStatusCode.INVALID_ARGUMENT);
status.setError_msgs(Collections.singletonList(
"Batch write interval must be set positive, but is " + batchWriteIntervalMs));
return new Pair<>(status, null);
}

Integer batchWriteParallel = params.getBatchWriteParallel().orElse(null);
if (batchWriteParallel == null || batchWriteParallel <= 0) {
TStatus status = new TStatus();
status.setStatus_code(TStatusCode.INVALID_ARGUMENT);
status.setError_msgs(Collections.singletonList(
"Bathc load parallel must be set positive, but is " + batchWriteParallel));
return new Pair<>(status, null);
}

load = isomorphicBatchWriteMap.computeIfAbsent(uniqueId, uid -> {
long id = idGenerator.getAndIncrement();
IsomorphicBatchWrite newLoad = new IsomorphicBatchWrite(
id, tableId, warehouseName, streamLoadInfo, batchWriteIntervalMs, batchWriteParallel,
params.toMap(), new ConnectContext(), coordinatorBackendAssigner, threadPoolExecutor);
coordinatorBackendAssigner.registerBatchWrite(id, newLoad.getWarehouse(), tableId, newLoad.getBatchWriteParallel());
return newLoad;
});

return new Pair<>(new TStatus(TStatusCode.OK), load);
}

/**
* Returns the number of batch writes currently managed.
*
* @return The number of batch writes.
*/
public int numBatchWrites() {
return isomorphicBatchWriteMap.size();
}

@VisibleForTesting
Map<BatchWriteId, IsomorphicBatchWrite> getIsomorphicBatchWriteMap() {
return isomorphicBatchWriteMap;
}
}
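
Note that requestCoordinatorBackends and requestLoad only take the read lock, so concurrent requests can look up or create groups in parallel, with ConcurrentHashMap.computeIfAbsent guaranteeing a single IsomorphicBatchWrite per key, while cleanupInactiveBatchWrite takes the write lock so removal never races with an in-flight get-or-create. A self-contained sketch of the same locking pattern with illustrative names:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

public class GetOrCreateWithCleanup<K, V> {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<K, V> entries = new ConcurrentHashMap<>();

    public V getOrCreate(K key, Function<K, V> factory) {
        lock.readLock().lock();
        try {
            // Many callers may run this concurrently; computeIfAbsent guarantees the
            // factory is invoked at most once per key.
            return entries.computeIfAbsent(key, factory);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void cleanupInactive(Predicate<V> inactive) {
        lock.writeLock().lock();
        try {
            // Exclusive section: no getOrCreate can interleave while idle entries are
            // removed and their resources released.
            entries.values().removeIf(inactive);
        } finally {
            lock.writeLock().unlock();
        }
    }
}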