Skip to content

Commit

Permalink
extend rocketmq-connect-common
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 8, 2023
1 parent 1f5080b commit 01d16c8
Show file tree
Hide file tree
Showing 68 changed files with 949 additions and 774 deletions.
5 changes: 5 additions & 0 deletions connectors/rocketmq-connect-debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@
<artifactId>logback-core</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 5 additions & 0 deletions metric-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
Expand All @@ -37,6 +38,10 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-common</artifactId>
</dependency>
</dependencies>

<build>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.connect.common.ProducerConfiguration;
import org.apache.rocketmq.connect.common.RocketMqBaseConfiguration;
import org.apache.rocketmq.connect.common.RocketMqUtils;
import org.apache.rocketmq.connect.metrics.MetricName;
import org.apache.rocketmq.connect.metrics.ScheduledMetricsReporter;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,27 +128,35 @@ public void config(Map<String, String> configs) {
}
this.topic = configs.get(METRICS_TOPIC);
String groupId = configs.get(GROUP_ID);
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = RocketMQClientUtil.startMQAdminTool(Boolean.valueOf(configs.get(ACL_ENABLED)), configs.get(ACCESS_KEY), configs.get(SECRET_KEY), groupId, configs.get(NAMESRV_ADDR));
if (!RocketMQClientUtil.topicExist(defaultMQAdminExt, topic)) {
RocketMQClientUtil.createTopic(defaultMQAdminExt, new TopicConfig(topic));
RocketMqBaseConfiguration baseConfiguration = RocketMqBaseConfiguration
.builder()
.namesrvAddr(configs.get(NAMESRV_ADDR))
.aclEnable(Boolean.valueOf(configs.get(ACL_ENABLED)))
.accessKey(configs.get(ACCESS_KEY))
.secretKey(configs.get(SECRET_KEY))
.groupId(groupId)
.build();

RocketMqUtils.maybeCreateTopic(baseConfiguration, new TopicConfig(topic));
if (!RocketMqUtils.fetchAllConsumerGroup(baseConfiguration).contains(groupId)) {
RocketMqUtils.createGroup(baseConfiguration, groupId);
}
if (!RocketMQClientUtil.fetchAllConsumerGroup(defaultMQAdminExt).contains(groupId)) {
RocketMQClientUtil.createSubGroup(defaultMQAdminExt, groupId);
}
this.producer = RocketMQClientUtil.initDefaultMQProducer(Boolean.valueOf(configs.get(ACL_ENABLED)), configs.get(ACCESS_KEY), configs.get(SECRET_KEY), groupId, configs.get(NAMESRV_ADDR));
ProducerConfiguration producerConfiguration = ProducerConfiguration
.producerBuilder()
.namesrvAddr(configs.get(NAMESRV_ADDR))
.aclEnable(Boolean.valueOf(configs.get(ACL_ENABLED)))
.accessKey(configs.get(ACCESS_KEY))
.secretKey(configs.get(SECRET_KEY))
.groupId(groupId)
.build();
this.producer = RocketMqUtils.initDefaultMQProducer(producerConfiguration);
this.producer.start();
} catch (Exception e) {
log.error("Init config failed ", e);
} finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
}
}
}


@Override
public void start() {
this.start(10, TimeUnit.SECONDS);
Expand Down
24 changes: 14 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<packaging>pom</packaging>
<version>0.0.1-SNAPSHOT</version>
<modules>
<module>rocketmq-connect-common</module>
<module>rocketmq-connect-sample</module>
<module>rocketmq-connect-runtime</module>
<module>rocketmq-connect-cli</module>
Expand Down Expand Up @@ -57,6 +58,9 @@
<commons-collections4.version>4.4</commons-collections4.version>
<!-- RocketMQ Version-->
<rocketmq.version>5.1.0</rocketmq.version>

<lombok.version>1.18.0</lombok.version>

<maven-artifact.version>3.8.1</maven-artifact.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
Expand Down Expand Up @@ -150,6 +154,11 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-cli</artifactId>
Expand Down Expand Up @@ -181,21 +190,16 @@
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
<version>${maven-artifact.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
34 changes: 34 additions & 0 deletions rocketmq-connect-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rocketmq-connect-common</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.common;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class ConsumerConfiguration extends RocketMqBaseConfiguration {
// consumer
private Integer batchSize;
private Long pollTimeoutMillis;


@Builder(builderMethodName = "consumerBuilder")
public ConsumerConfiguration(String namesrvAddr, String groupId, boolean aclEnable, String accessKey,
String secretKey,
Integer batchSize, Long pollTimeoutMillis) {
super(namesrvAddr, groupId, aclEnable, accessKey, secretKey);
this.batchSize = batchSize;
this.pollTimeoutMillis = pollTimeoutMillis;
}
}
Loading

0 comments on commit 01d16c8

Please sign in to comment.