Skip to content

Commit

Permalink
add fetch service
Browse files Browse the repository at this point in the history
  • Loading branch information
chengxy committed Aug 11, 2023
1 parent c986657 commit 7c526cc
Show file tree
Hide file tree
Showing 21 changed files with 739 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.buffer;

import io.netty.channel.FileRegion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class MemoryBuffer implements OutBuffer {

private static final int INITIAL_BUFFER_SIZE = 4096;
private byte[] bytes;


public MemoryBuffer(byte[] bytes) {
this.bytes = bytes;
}

@Override
public InputStream getInputStream() {
return new ByteArrayInputStream(bytes);
}

@Override
public int getBufferSize() {
return bytes == null ? 0 : bytes.length;
}

// TODO Implement reference counting
@Override
public void write(OutputStream outputStream) throws IOException {
if (bytes != null) {
outputStream.write(bytes);
}
}

@Override
public void setRefCount(int refCount) {

}

@Override
public boolean isDisposable() {
return false;
}

@Override
public void release() {

}

@Override
public FileRegion toFileRegion() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.buffer;

import io.netty.channel.FileRegion;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface OutBuffer {

/**
* Get the input stream of this buffer.
*
* @return buffer input stream.
*/
InputStream getInputStream();

/**
* Convert this buffer to file region.
*
* @return file region.
*/
FileRegion toFileRegion();

/**
* Get the buffer size of this buffer.
*
* @return buffer size.
*/
int getBufferSize();

/**
* Write data from a output stream.
*
* @param outputStream output stream.
* @throws IOException io exception.
*/
void write(OutputStream outputStream) throws IOException;

/**
* Set ref count, the number of consumer which handle this buffer.
*
* @param refCount ref count.
*/
void setRefCount(int refCount);

/**
* Check if this buffer disposable.
*
* @return if disposable.
*/
boolean isDisposable();

/**
* Release this buffer.
*/
void release();

interface BufferBuilder {

OutputStream getOutputStream();

/**
* Set the position of the stream.
*
* @param position position
*/
void positionStream(int position);

/**
* Get the buffer size.
*
* @return buffer size.
*/
int getBufferSize();

/**
* Set memory track.
*/
void enableMemoryTrack();

/**
* Build the buffer.
*
* @return buffer.
*/
OutBuffer build();

/**
* Close this builder.
*/
void close();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class EnvironmentArgumentParser implements IEnvironmentArgsParser {
private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentArgumentParser.class);

private static final String CLUSTER_ARGS = "cluster";
private static final String GEAFLOW_PREFIX = "bridge";
private static final String BRIDGE_PREFIX = "bridge";


@Override
Expand Down Expand Up @@ -63,7 +63,7 @@ private static void fillSystemConfig(Map<String, String> finalSystemArgs, Map<St
for (Map.Entry<String, String> entry : tmp.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(GEAFLOW_PREFIX)) {
if (key.startsWith(BRIDGE_PREFIX)) {
finalSystemArgs.put(key, value);
} else {
LOGGER.warn("ignore nonstandard system config: {} {}", key, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher;


import org.apache.rocketmq.eventbridge.adapter.runtime.protocol.Message;
import org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline.PipelineMessage;
import org.apache.rocketmq.eventbridge.adapter.runtime.worker.InputReader;

public class FetchListenerImpl implements FetcherListener {

private InputReader inputReader;

public FetchListenerImpl(InputReader inputReader) {
this.inputReader = inputReader;
}

/**
* Trigger processor to process message.
*/
@Override
public void onMessage(PipelineMessage message) {
long windowId = message.getWindowId();
inputReader.add(new Message(windowId, message));
}

@Override
public void onCompleted(long windowId, long windowCount) {
inputReader.add(new Message(windowId, windowCount));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher;

import org.apache.rocketmq.eventbridge.adapter.runtime.pipline.encoder.IEncoder;

import java.util.Map;

public class FetchRequest {


private long pipelineId;
private String pipelineName;
private int componentId;
private int taskId;
private int taskIndex;
private String taskName;
private long targetBatchId;

//transformMap
private Map<Integer, String> inputStreamMap;
private Map<Integer, IEncoder<?>> encoders;

public long getPipelineId() {
return pipelineId;
}

public void setPipelineId(long pipelineId) {
this.pipelineId = pipelineId;
}

public String getPipelineName() {
return pipelineName;
}

public void setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
}

public long getTargetBatchId() {
return targetBatchId;
}

public void setTargetBatchId(long targetBatchId) {
this.targetBatchId = targetBatchId;
}

public int getTaskIndex() {
return taskIndex;
}

public void setTaskIndex(int taskIndex) {
this.taskIndex = taskIndex;
}

public int getComponentId() {
return componentId;
}

public void setComponentId(int componentId) {
this.componentId = componentId;
}


public int getTaskId() {
return taskId;
}

public void setTaskId(int taskId) {
this.taskId = taskId;
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public void setInputStreamMap(Map<Integer, String> inputStreamMap) {
this.inputStreamMap = inputStreamMap;
}

public Map<Integer, String> getInputStreamMap() {
return inputStreamMap;
}

public Map<Integer, IEncoder<?>> getEncoders() {
return this.encoders;
}

public void setEncoders(Map<Integer, IEncoder<?>> encoders) {
this.encoders = encoders;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher;

import org.apache.rocketmq.eventbridge.adapter.runtime.task.pipeline.PipelineMessage;

public interface FetcherListener {

/**
* Trigger processor to process message.
*/
void onMessage(PipelineMessage message);

/**
* Trigger processor to process barrier.
*/
void onCompleted(long windowId, long windowCount);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.fetcher;

import java.io.Serializable;

public interface IFetchRequest extends Serializable {
}
Loading

0 comments on commit 7c526cc

Please sign in to comment.