From 7c526cce4e780a7ad2cf9bb122ec049f8330cc09 Mon Sep 17 00:00:00 2001 From: chengxy Date: Fri, 11 Aug 2023 11:54:33 +0800 Subject: [PATCH] add fetch service --- .../adapter/runtime/buffer/MemoryBuffer.java | 57 +++++++++++ .../adapter/runtime/buffer/OutBuffer.java | 95 ++++++++++++++++++ .../env/EnvironmentArgumentParser.java | 4 +- .../runtime/fetcher/FetchListenerImpl.java | 29 ++++++ .../adapter/runtime/fetcher/FetchRequest.java | 96 +++++++++++++++++++ .../runtime/fetcher/FetcherListener.java | 16 ++++ .../runtime/fetcher/IFetchRequest.java | 6 ++ .../runtime/fetcher/InitFetchRequest.java | 85 ++++++++++++++++ .../runtime/fetcher/PipelineInputFetcher.java | 85 ++++++++++++++++ .../adapter/runtime/protocol/IMessage.java | 7 ++ .../adapter/runtime/protocol/Message.java | 41 ++++++++ .../runtime/task/pipeline/PipelineEvent.java | 12 +++ .../task/pipeline/PipelineMessage.java | 17 ++++ .../task/record/AbstractMessageIterator.java | 18 ++++ .../runtime/task/record/IMessageIterator.java | 17 ++++ .../runtime/task/record/RecordArgs.java | 36 +++++++ .../runtime/task/runner/FetcherRunner.java | 39 ++++++++ .../runtime/task/service/FetcherService.java | 38 ++++++++ .../runtime/task/service/TaskService.java | 7 +- .../runtime/worker/DispatcherService.java | 2 +- .../adapter/runtime/worker/InputReader.java | 36 +++++++ 21 files changed, 739 insertions(+), 4 deletions(-) create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/MemoryBuffer.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/OutBuffer.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchListenerImpl.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchRequest.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetcherListener.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/IFetchRequest.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/InitFetchRequest.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/PipelineInputFetcher.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/IMessage.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/Message.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineEvent.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineMessage.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/AbstractMessageIterator.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/IMessageIterator.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/RecordArgs.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/runner/FetcherRunner.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/FetcherService.java create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/InputReader.java diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/MemoryBuffer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/MemoryBuffer.java new file mode 100644 index 00000000..a062e9e0 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/MemoryBuffer.java @@ -0,0 +1,57 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.buffer; + +import io.netty.channel.FileRegion; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class MemoryBuffer implements OutBuffer { + + private static final int INITIAL_BUFFER_SIZE = 4096; + private byte[] bytes; + + + public MemoryBuffer(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public InputStream getInputStream() { + return new ByteArrayInputStream(bytes); + } + + @Override + public int getBufferSize() { + return bytes == null ? 0 : bytes.length; + } + + // TODO Implement reference counting + @Override + public void write(OutputStream outputStream) throws IOException { + if (bytes != null) { + outputStream.write(bytes); + } + } + + @Override + public void setRefCount(int refCount) { + + } + + @Override + public boolean isDisposable() { + return false; + } + + @Override + public void release() { + + } + + @Override + public FileRegion toFileRegion() { + return null; + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/OutBuffer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/OutBuffer.java new file mode 100644 index 00000000..908299df --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/buffer/OutBuffer.java @@ -0,0 +1,95 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.buffer; + +import io.netty.channel.FileRegion; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface OutBuffer { + + /** + * Get the input stream of this buffer. + * + * @return buffer input stream. + */ + InputStream getInputStream(); + + /** + * Convert this buffer to file region. + * + * @return file region. + */ + FileRegion toFileRegion(); + + /** + * Get the buffer size of this buffer. + * + * @return buffer size. + */ + int getBufferSize(); + + /** + * Write data from a output stream. + * + * @param outputStream output stream. + * @throws IOException io exception. + */ + void write(OutputStream outputStream) throws IOException; + + /** + * Set ref count, the number of consumer which handle this buffer. + * + * @param refCount ref count. + */ + void setRefCount(int refCount); + + /** + * Check if this buffer disposable. + * + * @return if disposable. + */ + boolean isDisposable(); + + /** + * Release this buffer. + */ + void release(); + + interface BufferBuilder { + + OutputStream getOutputStream(); + + /** + * Set the position of the stream. + * + * @param position position + */ + void positionStream(int position); + + /** + * Get the buffer size. + * + * @return buffer size. + */ + int getBufferSize(); + + /** + * Set memory track. + */ + void enableMemoryTrack(); + + /** + * Build the buffer. + * + * @return buffer. + */ + OutBuffer build(); + + /** + * Close this builder. + */ + void close(); + + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/env/EnvironmentArgumentParser.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/env/EnvironmentArgumentParser.java index f1763127..00e849b5 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/env/EnvironmentArgumentParser.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/env/EnvironmentArgumentParser.java @@ -16,7 +16,7 @@ public class EnvironmentArgumentParser implements IEnvironmentArgsParser { private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentArgumentParser.class); private static final String CLUSTER_ARGS = "cluster"; - private static final String GEAFLOW_PREFIX = "bridge"; + private static final String BRIDGE_PREFIX = "bridge"; @Override @@ -63,7 +63,7 @@ private static void fillSystemConfig(Map finalSystemArgs, Map entry : tmp.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); - if (key.startsWith(GEAFLOW_PREFIX)) { + if (key.startsWith(BRIDGE_PREFIX)) { finalSystemArgs.put(key, value); } else { LOGGER.warn("ignore nonstandard system config: {} {}", key, value); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchListenerImpl.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchListenerImpl.java new file mode 100644 index 00000000..fd345a7f --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchListenerImpl.java @@ -0,0 +1,29 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + + +import org.apache.rocketmq.eventbridge.adapter.runtime.protocol.Message; +import org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline.PipelineMessage; +import org.apache.rocketmq.eventbridge.adapter.runtime.worker.InputReader; + +public class FetchListenerImpl implements FetcherListener { + + private InputReader inputReader; + + public FetchListenerImpl(InputReader inputReader) { + this.inputReader = inputReader; + } + + /** + * Trigger processor to process message. + */ + @Override + public void onMessage(PipelineMessage message) { + long windowId = message.getWindowId(); + inputReader.add(new Message(windowId, message)); + } + + @Override + public void onCompleted(long windowId, long windowCount) { + inputReader.add(new Message(windowId, windowCount)); + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchRequest.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchRequest.java new file mode 100644 index 00000000..29a9c6e2 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetchRequest.java @@ -0,0 +1,96 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + +import org.apache.rocketmq.eventbridge.adapter.runtime.pipline.encoder.IEncoder; + +import java.util.Map; + +public class FetchRequest { + + + private long pipelineId; + private String pipelineName; + private int componentId; + private int taskId; + private int taskIndex; + private String taskName; + private long targetBatchId; + + //transformMap + private Map inputStreamMap; + private Map> encoders; + + public long getPipelineId() { + return pipelineId; + } + + public void setPipelineId(long pipelineId) { + this.pipelineId = pipelineId; + } + + public String getPipelineName() { + return pipelineName; + } + + public void setPipelineName(String pipelineName) { + this.pipelineName = pipelineName; + } + + public long getTargetBatchId() { + return targetBatchId; + } + + public void setTargetBatchId(long targetBatchId) { + this.targetBatchId = targetBatchId; + } + + public int getTaskIndex() { + return taskIndex; + } + + public void setTaskIndex(int taskIndex) { + this.taskIndex = taskIndex; + } + + public int getComponentId() { + return componentId; + } + + public void setComponentId(int componentId) { + this.componentId = componentId; + } + + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public void setInputStreamMap(Map inputStreamMap) { + this.inputStreamMap = inputStreamMap; + } + + public Map getInputStreamMap() { + return inputStreamMap; + } + + public Map> getEncoders() { + return this.encoders; + } + + public void setEncoders(Map> encoders) { + this.encoders = encoders; + } + + +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetcherListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetcherListener.java new file mode 100644 index 00000000..59ac4e92 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/FetcherListener.java @@ -0,0 +1,16 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + +import org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline.PipelineMessage; + +public interface FetcherListener { + + /** + * Trigger processor to process message. + */ + void onMessage(PipelineMessage message); + + /** + * Trigger processor to process barrier. + */ + void onCompleted(long windowId, long windowCount); +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/IFetchRequest.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/IFetchRequest.java new file mode 100644 index 00000000..093742eb --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/IFetchRequest.java @@ -0,0 +1,6 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + +import java.io.Serializable; + +public interface IFetchRequest extends Serializable { +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/InitFetchRequest.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/InitFetchRequest.java new file mode 100644 index 00000000..f3841868 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/InitFetchRequest.java @@ -0,0 +1,85 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InitFetchRequest implements IFetchRequest { + + private final long pipelineId; + private final String pipelineName; + + private int taskId; + private int taskIndex; + private String taskName; + + private int componentId; + + private Map inputStreamMap; + + private List fetchListeners; + + + public InitFetchRequest(long pipelineId, String pipelineName, String taskName) { + this.pipelineId = pipelineId; + this.pipelineName = pipelineName; + this.taskName = taskName; + } + + public long getPipelineId() { + return pipelineId; + } + + public String getPipelineName() { + return pipelineName; + } + + public int getTaskId() { + return taskId; + } + + public int getTaskIndex() { + return taskIndex; + } + + public String getTaskName() { + return taskName; + } + + public int getComponentId() { + return componentId; + } + + public Map getInputStreamMap() { + return inputStreamMap; + } + + + public InitFetchRequest setTaskId(int taskId) { + this.taskId = taskId; + return this; + } + + public InitFetchRequest setTaskIndex(int taskIndex) { + this.taskIndex = taskIndex; + return this; + } + + public InitFetchRequest setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public InitFetchRequest setComponentId(int componentId) { + this.componentId = componentId; + return this; + } + + + public List getFetchListeners() { + return fetchListeners; + } + + +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/PipelineInputFetcher.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/PipelineInputFetcher.java new file mode 100644 index 00000000..6e7e7fb8 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/fetcher/PipelineInputFetcher.java @@ -0,0 +1,85 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher; + +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventRuleTransfer; +import org.apache.rocketmq.eventbridge.adapter.runtime.env.config.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class PipelineInputFetcher { + + + private static final Logger logger = LoggerFactory.getLogger(PipelineInputFetcher.class); + + private Configuration config; + private InitFetchRequest initRequest; + + private List fetchListeners; + + private long pipelineId; + private String pipelineName; + + public PipelineInputFetcher(Configuration config) { + this.config = config; + } + + /** + * Init input fetcher reader. + * + * @param request + */ + public void init(InitFetchRequest request) { + + + this.pipelineId = request.getPipelineId(); + this.pipelineName = request.getPipelineName(); + this.initRequest = request; + this.fetchListeners = request.getFetchListeners(); + } + + /** + * Fetch batch data and trigger process. + */ + public void fetch(long startWindowId, long windowCount) { + logger.info("task {} start fetch windowId:{} count:{}", initRequest.getTaskId(), + startWindowId, windowCount); + long targetWindowId = startWindowId + windowCount - 1; + fetch(buildFetchRequest(targetWindowId)); + } + + /** + * Build the fetch request. + * @return + */ + protected FetchRequest buildFetchRequest(long targetBatchId) { + FetchRequest request = new FetchRequest(); + request.setPipelineId(pipelineId); + request.setPipelineName(pipelineName); + request.setTaskId(initRequest.getTaskId()); + request.setTaskIndex(initRequest.getTaskIndex()); + request.setTaskName(initRequest.getTaskName()); + request.setComponentId(initRequest.getComponentId()); + request.setTargetBatchId(targetBatchId); + request.setInputStreamMap(initRequest.getInputStreamMap()); + return request; + } + + /** + * Fetch data according to fetch request and process by worker. + * + * @param request + */ + protected void fetch(FetchRequest request) { + //TODO Distribute tasks among workers + } + + public void cancel() { + // TODO Cancel fetching task. + // Shuffle reader should support cancel. + } + + public void close() { + //TODO Terminate the task when the task is abnormal + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/IMessage.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/IMessage.java new file mode 100644 index 00000000..dd039e86 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/IMessage.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.protocol; + +/** + * A message is the event of data flow among cycle scheduling. + */ +public interface IMessage extends IEvent { +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/Message.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/Message.java new file mode 100644 index 00000000..52b6277c --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/protocol/Message.java @@ -0,0 +1,41 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.protocol; + + +import org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline.PipelineMessage; + +/** + * A message which is processed by worker. + */ +public class Message implements IMessage { + + private final long windowId; + private PipelineMessage message; + private long windowCount; + + public Message(long windowId, PipelineMessage message) { + this.windowId = windowId; + this.message = message; + } + + public Message(long windowId, long windowCount) { + this.windowId = windowId; + this.windowCount = windowCount; + } + + public long getWindowId() { + return windowId; + } + + public PipelineMessage getMessage() { + return message; + } + + public long getWindowCount() { + return windowCount; + } + + @Override + public EventType getEventType() { + return EventType.MESSAGE; + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineEvent.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineEvent.java new file mode 100644 index 00000000..1dec5fa0 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineEvent.java @@ -0,0 +1,12 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline; + +public interface PipelineEvent { + + /** + * Get the window id of this pipeline event. + * + * @return window id. + */ + long getWindowId(); + +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineMessage.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineMessage.java new file mode 100644 index 00000000..4dcc6d1a --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/pipeline/PipelineMessage.java @@ -0,0 +1,17 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline; + +import org.apache.rocketmq.eventbridge.adapter.runtime.task.record.RecordArgs; + +public class PipelineMessage implements PipelineEvent{ + + private final RecordArgs recordArgs; + + public PipelineMessage(long batchId, String streamName, RecordArgs recordArgs) { + this.recordArgs = recordArgs; + } + + @Override + public long getWindowId() { + return 0; + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/AbstractMessageIterator.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/AbstractMessageIterator.java new file mode 100644 index 00000000..aa8cb49c --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/AbstractMessageIterator.java @@ -0,0 +1,18 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.record; + +import java.io.InputStream; +import java.io.OutputStream; + +public abstract class AbstractMessageIterator implements IMessageIterator{ + + private long recordNum; + protected T currentValue; + + protected OutputStream outputStream; + protected InputStream inputStream; + + public AbstractMessageIterator(OutputStream outBuffer) { + this.outputStream = outBuffer; + } + +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/IMessageIterator.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/IMessageIterator.java new file mode 100644 index 00000000..e76d13f1 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/IMessageIterator.java @@ -0,0 +1,17 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.record; + +import java.util.Iterator; + +public interface IMessageIterator extends Iterator { + /** + * Get total record accessed. + * + * @return total record number. + */ + long getSize(); + + /** + * Close this iterator. + */ + void close(); +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/RecordArgs.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/RecordArgs.java new file mode 100644 index 00000000..4686b3e6 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/record/RecordArgs.java @@ -0,0 +1,36 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.record; + +public class RecordArgs { + + private long windowId; + private String name; + + public RecordArgs() { + this.windowId = 0; + this.name = ""; + } + + public RecordArgs(long windowId) { + this.windowId = windowId; + } + + + public RecordArgs(long windowId, String name) { + this.windowId = windowId; + this.name = name; + } + + public long getWindowId() { + return windowId; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "RecordArgs{" + "windowId=" + windowId + ", name='" + name + '\'' + '}'; + } + +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/runner/FetcherRunner.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/runner/FetcherRunner.java new file mode 100644 index 00000000..c67998a6 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/runner/FetcherRunner.java @@ -0,0 +1,39 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.runner; + +import org.apache.rocketmq.eventbridge.adapter.runtime.env.config.Configuration; +import org.apache.rocketmq.eventbridge.adapter.runtime.fetcher.IFetchRequest; +import org.apache.rocketmq.eventbridge.adapter.runtime.fetcher.InitFetchRequest; +import org.apache.rocketmq.eventbridge.adapter.runtime.fetcher.PipelineInputFetcher; +import org.apache.rocketmq.eventbridge.adapter.runtime.task.AbstractTaskRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FetcherRunner extends AbstractTaskRunner { + + private static final Logger LOGGER = LoggerFactory.getLogger(FetcherRunner.class); + + private final PipelineInputFetcher fetcher; + + public FetcherRunner(Configuration configuration) { + this.fetcher = new PipelineInputFetcher(configuration); + } + + @Override + protected void process(IFetchRequest task) { + if (task instanceof InitFetchRequest) { + fetcher.init((InitFetchRequest) task); + } + } + + @Override + public void interrupt() { + LOGGER.info("cancel fetcher runner"); + fetcher.cancel(); + } + + @Override + public void shutdown() { + super.shutdown(); + fetcher.close(); + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/FetcherService.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/FetcherService.java new file mode 100644 index 00000000..054c4768 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/FetcherService.java @@ -0,0 +1,38 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.task.service; + +import com.google.common.base.Preconditions; +import org.apache.rocketmq.eventbridge.adapter.runtime.env.config.Configuration; +import org.apache.rocketmq.eventbridge.adapter.runtime.fetcher.IFetchRequest; +import org.apache.rocketmq.eventbridge.adapter.runtime.task.runner.AbstractTaskService; +import org.apache.rocketmq.eventbridge.adapter.runtime.task.runner.FetcherRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class FetcherService extends AbstractTaskService implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(FetcherService.class); + + private static final String FETCHER_FORMAT = "bridge-fetcher-%d"; + + private int slots; + private Configuration configuration; + + public FetcherService(int slots, Configuration configuration) { + super(FETCHER_FORMAT); + this.slots = slots; + this.configuration = configuration; + } + + @Override + protected FetcherRunner[] buildTaskRunner() { + Preconditions.checkArgument(slots > 0, "fetcher pool should be larger than 0"); + FetcherRunner[] fetcherRunners = new FetcherRunner[slots]; + for (int i = 0; i < slots; i++) { + FetcherRunner runner = new FetcherRunner(configuration); + fetcherRunners[i] = runner; + } + return fetcherRunners; + } +} diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/TaskService.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/TaskService.java index 114c2542..d12d77ae 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/TaskService.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/task/service/TaskService.java @@ -1,5 +1,6 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.task.service; +import org.apache.rocketmq.eventbridge.adapter.runtime.env.config.Configuration; import org.apache.rocketmq.eventbridge.adapter.runtime.protocol.ICommand; import org.apache.rocketmq.eventbridge.adapter.runtime.task.runner.AbstractTaskService; import org.apache.rocketmq.eventbridge.adapter.runtime.task.runner.TaskRunner; @@ -12,7 +13,11 @@ public class TaskService extends AbstractTaskService { private static final String WORKER_FORMAT = "bridge-worker-%d"; - public TaskService(String threadFormat) { + private int containerId; + + private int taskNum; + + public TaskService(int containerId, int taskNum, Configuration configuration, String threadFormat) { super(threadFormat); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/DispatcherService.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/DispatcherService.java index 56dcebf6..e54b8328 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/DispatcherService.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/DispatcherService.java @@ -6,7 +6,7 @@ public class DispatcherService extends AbstractTaskService { - private static final String MESSAGE_FORMAT = "geaflow-message-%d"; + private static final String MESSAGE_FORMAT = "bridge-message-%d"; private Dispatcher dispatcher; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/InputReader.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/InputReader.java new file mode 100644 index 00000000..01073c91 --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/worker/InputReader.java @@ -0,0 +1,36 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.worker; + +import org.apache.rocketmq.eventbridge.adapter.runtime.protocol.Message; + +import java.io.Serializable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class InputReader implements Serializable { + + private LinkedBlockingQueue inputQueue; + + public InputReader() { + this.inputQueue = new LinkedBlockingQueue<>(); + } + + /** + * Add message into input queue. + */ + public void add(Message message) { + inputQueue.add(message); + } + + /** + * Returns message from input queue. + */ + public Message poll(long timeout, TimeUnit unit) { + Message message; + try { + message = inputQueue.poll(timeout, unit); + } catch (Throwable t) { + throw new RuntimeException(t); + } + return message; + } +}