Skip to content

Commit

Permalink
feat(arrow): impl remain operator by converting batch to rows
Browse files Browse the repository at this point in the history
  • Loading branch information
aqni committed Nov 18, 2024
1 parent 0695627 commit 8c5de23
Show file tree
Hide file tree
Showing 55 changed files with 1,931 additions and 793 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.constraint.ConstraintManager;
import cn.edu.tsinghua.iginx.engine.shared.data.read.BatchStream;
import cn.edu.tsinghua.iginx.engine.shared.data.read.BatchStreams;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.Migration;
import cn.edu.tsinghua.iginx.engine.shared.operator.Operator;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.migration.MigrationPhysicalExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,25 +77,32 @@ public BatchStream execute(RequestContext ctx, Operator root) throws PhysicalExc
.execute(ctx, (Migration) root, storageTaskExecutor);
} else {
GlobalPhysicalTask task = new GlobalPhysicalTask(root, ctx);
try (TaskResult result = storageTaskExecutor.executeGlobalTask(task)) {
return result.nullableUnwrap();
try (TaskResult<RowStream> result = storageTaskExecutor.executeGlobalTask(task)) {
RowStream rowStream = result.unwrap();
if (rowStream == null) {
return null;
}
return BatchStreams.wrap(ctx.getAllocator(), result.unwrap(), ctx.getBatchRowCount());
}
}
}
PhysicalTask task = optimizer.optimize(root, ctx);
ctx.setPhysicalTree(task);
List<PhysicalTask> bottomTasks = new ArrayList<>();
getBottomTasks(bottomTasks, task);
PhysicalTask<?> task = optimizer.optimize(root, ctx);
PhysicalTask<BatchStream> batchTask = optimizer.convert(task, ctx, BatchStream.class);
ctx.setPhysicalTree(batchTask);
List<PhysicalTask<?>> bottomTasks = new ArrayList<>();
getBottomTasks(bottomTasks, batchTask);
commitBottomTasks(bottomTasks);
try (TaskResult result = task.getResult()) {
return result.nullableUnwrap();
try (TaskResult<BatchStream> result = batchTask.getResult().get()) {
return result.unwrap();
} catch (ExecutionException | InterruptedException e) {
throw new PhysicalException(e);
}
}

private void commitBottomTasks(List<PhysicalTask> bottomTasks) {
private void commitBottomTasks(List<PhysicalTask<?>> bottomTasks) throws PhysicalException {
List<StoragePhysicalTask> storageTasks = new ArrayList<>();
List<GlobalPhysicalTask> globalTasks = new ArrayList<>();
for (PhysicalTask task : bottomTasks) {
for (PhysicalTask<?> task : bottomTasks) {
if (task.getType().equals(TaskType.Storage)) {
storageTasks.add((StoragePhysicalTask) task);
} else if (task.getType().equals(TaskType.Global)) {
Expand All @@ -101,7 +111,7 @@ private void commitBottomTasks(List<PhysicalTask> bottomTasks) {
}
storageTaskExecutor.commit(storageTasks);
for (GlobalPhysicalTask globalTask : globalTasks) {
storageTaskExecutor.executeGlobalTask(globalTask);
storageTaskExecutor.executeGlobalTask(globalTask).close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.queue.MemoryPhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.memory.queue.MemoryPhysicalTaskQueueImpl;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskResult;
import cn.edu.tsinghua.iginx.engine.physical.task.memory.MemoryPhysicalTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void startDispatcher() {
() -> {
try {
while (true) {
final MemoryPhysicalTask task = taskQueue.getTask();
final MemoryPhysicalTask<?> task = taskQueue.getTask();
if (isCancelled(task.getContext().getSessionId())) {
LOGGER.warn(
"MemoryPhysicalTask[sessionId={}] is cancelled.",
Expand All @@ -71,19 +71,19 @@ public void startDispatcher() {
}
taskExecuteThreadPool.submit(
() -> {
MemoryPhysicalTask currentTask = task;
MemoryPhysicalTask<?> currentTask = task;
while (currentTask != null) {
TaskResult result;
TaskResult<?> result;
try {
result = currentTask.execute();
} catch (Throwable e) {
LOGGER.error("execute memory task failure: ", e);
result = new TaskResult(new PhysicalException(e));
result = new TaskResult<>(new PhysicalException(e));
}
currentTask.setResult(result);
if (currentTask.getFollowerTask() != null) { // 链式执行可以被执行的任务
MemoryPhysicalTask followerTask =
(MemoryPhysicalTask) currentTask.getFollowerTask();
MemoryPhysicalTask<?> followerTask =
(MemoryPhysicalTask<?>) currentTask.getFollowerTask();
if (followerTask.notifyParentReady()) {
currentTask = followerTask;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class JoinOption {
public static final JoinOption LEFT = new JoinOption(true, false);
public static final JoinOption RIGHT = new JoinOption(false, true);
public static final JoinOption FULL = new JoinOption(true, true);
public static final JoinOption SINGLE = new JoinOption(false, false, false, false, null, false);

private final boolean outputBuildSideUnmatched;
private final boolean outputProbeSideUnmatched;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,9 @@ public static Field flatten(DictionaryProvider dictionaryProvider, Field field)
Dictionary dictionary = dictionaryProvider.lookup(field.getDictionary().getId());
return Schemas.fieldWithName(dictionary.getVector().getField(), field.getName());
}

public static cn.edu.tsinghua.iginx.engine.shared.data.read.Field toIginxField(Field field) {
return new cn.edu.tsinghua.iginx.engine.shared.data.read.Field(
field.getName(), toDataType(field.getFieldType().getType()), field.getMetadata());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,18 @@ public RowStream executeBinaryOperator(
}
}

private Table transformToTable(RowStream stream) throws PhysicalException {
public static Table transformToTable(RowStream stream) throws PhysicalException {
if (stream instanceof Table) {
return (Table) stream;
}
Header header = stream.getHeader();
List<Row> rows = new ArrayList<>();
while (stream.hasNext()) {
rows.add(stream.next());
try (RowStream ignored = stream) {
Header header = stream.getHeader();
List<Row> rows = new ArrayList<>();
while (stream.hasNext()) {
rows.add(stream.next());
}
return new Table(header, rows);
}
stream.close();
return new Table(header, rows);
}

private RowStream executeProject(Project project, Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package cn.edu.tsinghua.iginx.engine.physical.memory.queue;

import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.memory.MemoryPhysicalTask;

public interface MemoryPhysicalTaskQueue {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package cn.edu.tsinghua.iginx.engine.physical.memory.queue;

import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.memory.MemoryPhysicalTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package cn.edu.tsinghua.iginx.engine.physical.optimizer;

import cn.edu.tsinghua.iginx.engine.physical.task.PhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.utils.PhysicalCloseable;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.constraint.ConstraintManager;
import cn.edu.tsinghua.iginx.engine.shared.operator.Operator;

public interface PhysicalOptimizer {

PhysicalTask optimize(Operator root, RequestContext context);
PhysicalTask<?> optimize(Operator root, RequestContext context);

<RESULT extends PhysicalCloseable> PhysicalTask<RESULT> convert(
PhysicalTask<?> task, RequestContext context, Class<RESULT> clazz);

ConstraintManager getConstraintManager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.task.*;
import cn.edu.tsinghua.iginx.engine.physical.task.memory.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.shared.data.read.FetchMetricsRowStream;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStreams;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
Expand All @@ -48,10 +51,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,8 +113,9 @@ private StoragePhysicalTaskExecutor() {
task.setStorageUnit(id);
task.setDummyStorageUnit(isDummy);
if (pair.v.getQueue().size() > maxCachedPhysicalTaskPerStorage) {
task.setResult(
new TaskResult(new TooManyPhysicalTasksException(storageId)));
setResult(
task,
new TaskExecuteResult(new TooManyPhysicalTasksException(storageId)));
continue;
}
if (isCancelled(task.getContext().getSessionId())) {
Expand Down Expand Up @@ -238,7 +239,7 @@ private StoragePhysicalTaskExecutor() {
}
long span = System.nanoTime() - startTime;
task.getMetrics().accumulateCpuTime(span);
task.setResult(result);
setResult(task, result);
if (task.getFollowerTask() != null
&& task.isSync()) { // 只有同步任务才会影响后续任务的执行
MemoryPhysicalTask followerTask =
Expand All @@ -255,7 +256,7 @@ private StoragePhysicalTaskExecutor() {
+ task
+ " will not broadcasting to replicas for the sake of exception",
result.getException());
task.setResult(new TaskResult(result.getException()));
setResult(task, new TaskExecuteResult(result.getException()));
} else {
StorageUnitMeta masterStorageUnit =
task.getTargetFragment().getMasterStorageUnit();
Expand Down Expand Up @@ -336,24 +337,28 @@ public void commitWithTargetStorageUnitId(StoragePhysicalTask task, String stora
storageTaskQueues.get(storageUnitId).addTask(task);
}

public TaskResult executeGlobalTask(GlobalPhysicalTask task) {
public TaskResult<RowStream> executeGlobalTask(GlobalPhysicalTask task) {
switch (task.getOperator().getType()) {
case ShowColumns:
long startTime = System.nanoTime();
TaskExecuteResult result = executeShowColumns((ShowColumns) task.getOperator());
long span = System.nanoTime() - startTime;
task.getMetrics().accumulateCpuTime(span);
task.setResult(result);
setResult(task, result);
if (task.getFollowerTask() != null) {
MemoryPhysicalTask followerTask = (MemoryPhysicalTask) task.getFollowerTask();
MemoryPhysicalTask<?> followerTask = (MemoryPhysicalTask<?>) task.getFollowerTask();
boolean isFollowerTaskReady = followerTask.notifyParentReady();
if (isFollowerTaskReady) {
memoryTaskExecutor.addMemoryTask(followerTask);
}
}
return task.getResult();
try {
return task.getResult().get();
} catch (InterruptedException | ExecutionException e) {
return new TaskResult<>(new PhysicalException(e));
}
default:
return new TaskResult(
return new TaskResult<>(
new UnexpectedOperatorException("unknown op: " + task.getOperator().getType()));
}
}
Expand Down Expand Up @@ -444,4 +449,22 @@ public void init(
public StorageManager getStorageManager() {
return storageManager;
}

private static void setResult(PhysicalTask<RowStream> task, TaskExecuteResult result) {
if (result != null) {
if (result.getException() != null) {
task.setResult(new TaskResult<>(result.getException()));
return;
}
RowStream sourceRowStream = result.getRowStream();
if (sourceRowStream != null) {
RowStream rowStream =
new FetchMetricsRowStream(
sourceRowStream, task.getMetrics(), task.getContext().getBatchRowCount());
task.setResult(new TaskResult<>(rowStream));
return;
}
}
task.setResult(new TaskResult<>(RowStreams.empty()));
}
}
Loading

0 comments on commit 8c5de23

Please sign in to comment.