Skip to content

Commit

Permalink
[INLONG-10543][Manager] The delimiter and other configurations in CLS…
Browse files Browse the repository at this point in the history
… and ES sink are obtained from the stream (apache#10545)
  • Loading branch information
fuweng11 authored Jul 1, 2024
1 parent ada439c commit 1dde4f7
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
ClsSink clsSink = (ClsSink) sink;
ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(clsSink, ClsSinkConfig::new);
sinkConfig.setSeparator(String.valueOf((char) (Integer.parseInt(streamInfo.getDataSeparator()))));
sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
sinkConfig.setContentOffset(0);
List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo stre
ElasticsearchSinkDTO elasticsearchSinkDTO = ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
EsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(elasticsearchSink, EsSinkConfig::new);
CommonBeanUtils.copyProperties(elasticsearchSinkDTO, sinkConfig);
sinkConfig.setSeparator(String.valueOf((char) (Integer.parseInt(streamInfo.getDataSeparator()))));
sinkConfig.setFieldOffset(streamInfo.getExtendedFieldSize());
sinkConfig.setContentOffset(0);
List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
Expand Down

0 comments on commit 1dde4f7

Please sign in to comment.