diff --git a/connectors/rocketmq-connect-enterprisewechat/README.md b/connectors/rocketmq-connect-enterprisewechat/README.md new file mode 100644 index 00000000..69ed31d3 --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/README.md @@ -0,0 +1,67 @@ +# rocketmq-connect-enterprisewechat + +## 快速开始 + +### 1. 构建 rocketmq-connect-enterprisewechat + +```bash +cd connectors/rocketmq-connect-enterprisewechat + +mvn -DskipTests clean package +``` + +打包好的文件在 `connectors/rocketmq-connect-enterprisewechat/target` 目录下 + +### 2. 修改配置 + +修改 `distribution/conf` 目录下对应的配置文件。如对于 standalone 的启动方式,修改 `connect-standalone.conf` +文件中的 `namesrvAddr` 和 `pluginPaths` 等字段: + +```properties +namesrvAddr=localhost:9876 +pluginPaths=connectors/rocketmq-connect-enterprisewechat/target/rocketmq-connect-enterprisewechat-0.0.1-SNAPSHOT-jar-with-dependencies.jar +``` + +### 3. 启动 Worker + +参见 [README](https://github.com/apache/rocketmq-connect/blob/master/README.md#3%E8%BF%90%E8%A1%8Cworker) + +### 4. 启动 Sink Connector + +```http request +POST http://127.0.0.1:8082/connectors/enterpriseWechatSinkConnector +Content-Type: application/json + +{ + "connector.class": "org.apache.rocketmq.connect.enterprisewechat.sink.EnterpriseWechatSinkConnector", + "connect.topicnames": "testTopic", + "webHook": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx" +} +``` + +### 5. 停止 Sink Connector + +```http request +GET http://${runtime-ip}:${runtime-port}/connectors/enterpriseWechatSinkConnector/stop +``` + +## 参数说明 + +| KEY | TYPE | Must be filled | Description | Example | +|--------------------|--------|----------------|-----------------|-----------------------------------------------------------| +| webHook | String | YES | 机器人的webhook地址 | https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxx | +| connect.topicnames | String | YES | sink消费消息的topics | testTopic | + +## Record 数据格式说明 +Record 应保存 JSON 字符串如: + +```json +{ + "msgtype": "text", + "text": { + "content": "Hello World!" + } +} +``` + +详情可参见[官方文档](https://developer.work.weixin.qq.com/document/path/99110#%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%E7%BE%A4%E6%9C%BA%E5%99%A8%E4%BA%BA)。 \ No newline at end of file diff --git a/connectors/rocketmq-connect-enterprisewechat/pom.xml b/connectors/rocketmq-connect-enterprisewechat/pom.xml new file mode 100644 index 00000000..589e3221 --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/pom.xml @@ -0,0 +1,199 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-enterprisewechat + 0.0.1-SNAPSHOT + + connect-enterprisewechat + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + UTF-8 + UTF-8 + 1.8 + 1.8 + 0.1.4 + 1.7.7 + 3.9.1 + 1.2.83 + 3.12.0 + 4.13.1 + + + + + io.openmessaging + openmessaging-connector + ${openmessaging-connector.version} + + + junit + junit + ${junit.version} + test + + + org.slf4j + slf4j-api + ${slf4j.version} + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + + + com.alibaba + fastjson + ${fastjson.version} + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/common/SinkConstants.java b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/common/SinkConstants.java new file mode 100644 index 00000000..290f0145 --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/common/SinkConstants.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.common; + +public class SinkConstants { + public static final String CONTENT_TYPE = "Content-Type"; + public static final String APPLICATION_JSON_UTF_8_TYPE = "application/json"; + public static final String WEB_HOOK = "webHook"; +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/config/SinkConfig.java b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/config/SinkConfig.java new file mode 100644 index 00000000..dcffbb9d --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/config/SinkConfig.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.config; + +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.connect.enterprisewechat.common.SinkConstants; + +public class SinkConfig { + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(SinkConstants.WEB_HOOK); + } + }; + + private String url; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnector.java b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnector.java new file mode 100644 index 00000000..c57c8ffd --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnector.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import io.openmessaging.connector.api.errors.ConnectException; +import java.net.URL; +import java.net.URLConnection; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.connect.enterprisewechat.common.SinkConstants; +import org.apache.rocketmq.connect.enterprisewechat.config.SinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EnterpriseWechatSinkConnector extends SinkConnector { + private static final Logger log = LoggerFactory.getLogger(EnterpriseWechatSinkConnector.class); + + private KeyValue connectConfig; + + @Override + public void validate(KeyValue config) { + for (String requestKey : SinkConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new ConnectException("Request config key: " + requestKey); + } + } + + try { + URL urlConnect = new URL(config.getString(SinkConstants.WEB_HOOK)); + URLConnection urlConnection = urlConnect.openConnection(); + urlConnection.setConnectTimeout(5000); + urlConnection.connect(); + log.info("url validate success"); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(maxTasks); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.connectConfig); + } + return configs; + } + + @Override + public Class taskClass() { + return EnterpriseWechatSinkTask.class; + } + + @Override + public void start(KeyValue config) { + for (String requestKey : SinkConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + this.connectConfig = config; + } + + @Override + public void stop() { + this.connectConfig = null; + } +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTask.java b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTask.java new file mode 100644 index 00000000..efe5680d --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import java.util.List; +import org.apache.rocketmq.connect.enterprisewechat.common.SinkConstants; +import org.apache.rocketmq.connect.enterprisewechat.util.OkHttpUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EnterpriseWechatSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(EnterpriseWechatSinkTask.class); + + private String webHook; + + @Override + public void put(List sinkRecords) throws ConnectException { + try { + sinkRecords.forEach(sinkRecord -> { + try { + String sync = OkHttpUtils.builder() + .url(webHook) + .addHeader(SinkConstants.CONTENT_TYPE, SinkConstants.APPLICATION_JSON_UTF_8_TYPE) + .postForStringBody(sinkRecord.getData()) + .sync(); + log.info("EnterpriseWechatSinkTask put sync : {}", sync); + } catch (Exception e) { + log.error("EnterpriseWechatSinkTask | put | addParam | error => ", e); + } + }); + } catch (Exception e) { + log.error("EnterpriseWechatSinkTask | put | error => ", e); + } + } + + @Override + public void start(KeyValue config) { + webHook = config.getString(SinkConstants.WEB_HOOK); + } + + @Override + public void stop() { + + } +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/util/OkHttpUtils.java b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/util/OkHttpUtils.java new file mode 100644 index 00000000..c80a78ce --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/main/java/org/apache/rocketmq/connect/enterprisewechat/util/OkHttpUtils.java @@ -0,0 +1,283 @@ +package org.apache.rocketmq.connect.enterprisewechat.util; + +import com.alibaba.fastjson.JSON; +import java.io.IOException; +import java.net.URLEncoder; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.FormBody; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OkHttpUtils { + private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class); + + private static volatile OkHttpClient okHttpClient = null; + private static volatile Semaphore semaphore = null; + private Map headerMap; + private Map paramMap; + private String url; + private Request.Builder request; + + private OkHttpUtils() { + if (okHttpClient == null) { + synchronized (OkHttpUtils.class) { + if (okHttpClient == null) { + TrustManager[] trustManagers = buildTrustManagers(); + okHttpClient = new OkHttpClient.Builder() + .connectTimeout(15, TimeUnit.SECONDS) + .writeTimeout(20, TimeUnit.SECONDS) + .readTimeout(20, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0]) + .hostnameVerifier((hostName, session) -> true) + .retryOnConnectionFailure(true) + .build(); + addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); + } + } + } + } + + private static Semaphore getSemaphoreInstance() { + synchronized (OkHttpUtils.class) { + if (semaphore == null) { + semaphore = new Semaphore(0); + } + } + return semaphore; + } + + public static OkHttpUtils builder() { + return new OkHttpUtils(); + } + + public OkHttpUtils url(String url) { + this.url = url; + return this; + } + + /** + * 添加参数 + * + * @param key 参数名 + * @param value 参数值 + * @return + */ + public OkHttpUtils addParam(String key, String value) { + if (paramMap == null) { + paramMap = new LinkedHashMap<>(16); + } + paramMap.put(key, value); + return this; + } + + /** + * 添加请求头 + * + * @param key 参数名 + * @param value 参数值 + * @return + */ + public OkHttpUtils addHeader(String key, String value) { + if (headerMap == null) { + headerMap = new LinkedHashMap<>(16); + } + headerMap.put(key, value); + return this; + } + + public OkHttpUtils get() { + request = new Request.Builder().get(); + StringBuilder urlBuilder = new StringBuilder(url); + if (paramMap != null) { + urlBuilder.append("?"); + try { + for (Map.Entry entry : paramMap.entrySet()) { + urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")). + append("="). + append(URLEncoder.encode(entry.getValue(), "utf-8")). + append("&"); + } + } catch (Exception e) { + log.error("OkHttpUtils | get | error => ", e); + } + urlBuilder.deleteCharAt(urlBuilder.length() - 1); + } + request.url(urlBuilder.toString()); + return this; + } + + /** + * 初始化post方法 + * + * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw + * false等于普通的表单提交 + * @return + */ + public OkHttpUtils post(boolean isJsonPost) { + RequestBody requestBody; + if (isJsonPost) { + String json = ""; + if (paramMap != null) { + json = JSON.toJSONString(paramMap); + } + requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json); + } else { + FormBody.Builder formBody = new FormBody.Builder(); + if (paramMap != null) { + paramMap.forEach(formBody::add); + } + requestBody = formBody.build(); + } + request = new Request.Builder().post(requestBody).url(url); + return this; + } + + public OkHttpUtils postForStringBody(Object data) { + RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), data.toString()); + request = new Request.Builder().post(requestBody).url(url); + return this; + } + + /** + * 同步请求 + * + * @return + */ + public String sync() { + setHeader(request); + try { + Response response = okHttpClient.newCall(request.build()).execute(); + assert response.body() != null; + return response.body().string(); + } catch (IOException e) { + log.error("OkHttpUtils | sync | error => ", e); + return "请求失败:" + e.getMessage(); + } + } + + /** + * 异步请求,有返回值 + */ + public String async() { + StringBuilder buffer = new StringBuilder(""); + setHeader(request); + okHttpClient.newCall(request.build()).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + buffer.append("请求出错:").append(e.getMessage()); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + assert response.body() != null; + buffer.append(response.body().string()); + getSemaphoreInstance().release(); + } + }); + try { + getSemaphoreInstance().acquire(); + } catch (InterruptedException e) { + log.error("OkHttpUtils | async | error => ", e); + } + return buffer.toString(); + } + + /** + * 异步请求,带有接口回调 + * + * @param callBack + */ + public void async(ICallBack callBack) { + setHeader(request); + okHttpClient.newCall(request.build()).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + callBack.onFailure(call, e.getMessage()); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + assert response.body() != null; + callBack.onSuccessful(call, response.body().string()); + } + }); + } + + /** + * 为request添加请求头 + * + * @param request + */ + private void setHeader(Request.Builder request) { + if (headerMap != null) { + try { + for (Map.Entry entry : headerMap.entrySet()) { + request.addHeader(entry.getKey(), entry.getValue()); + } + } catch (Exception e) { + log.error("OkHttpUtils | setHeader | error => ", e); + } + } + } + + /** + * 生成安全套接字工厂,用于https请求的证书跳过 + * + * @return + */ + private static SSLSocketFactory createSSLSocketFactory(TrustManager[] trustAllCerts) { + SSLSocketFactory ssfFactory = null; + try { + SSLContext sc = SSLContext.getInstance("SSL"); + sc.init(null, trustAllCerts, new SecureRandom()); + ssfFactory = sc.getSocketFactory(); + } catch (Exception e) { + log.error("OkHttpUtils | createSSLSocketFactory | error => ", e); + } + return ssfFactory; + } + + private static TrustManager[] buildTrustManagers() { + return new TrustManager[] { + new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) { + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[] {}; + } + } + }; + } + + /** + * 自定义一个接口回调 + */ + public interface ICallBack { + void onSuccessful(Call call, String data); + + void onFailure(Call call, String errorMsg); + } +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnectorTest.java b/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnectorTest.java new file mode 100644 index 00000000..511ab0e7 --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkConnectorTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.enterprisewechat.common.SinkConstants; +import org.junit.Test; + +public class EnterpriseWechatSinkConnectorTest { + private EnterpriseWechatSinkConnector connector = new EnterpriseWechatSinkConnector(); + + // Replace it with your own robot webhook. + private static final String webHook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx&debug=1"; + + @Test + public void testValidate() { + KeyValue keyValue = new DefaultKeyValue(); + // put webhook which need validation + keyValue.put(SinkConstants.WEB_HOOK, webHook); + connector.validate(keyValue); + } +} diff --git a/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTaskTest.java b/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTaskTest.java new file mode 100644 index 00000000..0f3b8b76 --- /dev/null +++ b/connectors/rocketmq-connect-enterprisewechat/src/test/java/org/apache/rocketmq/connect/enterprisewechat/sink/EnterpriseWechatSinkTaskTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.enterprisewechat.sink; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Test; + +public class EnterpriseWechatSinkTaskTest { + // Replace it with your own robot webhook. + private static final String webHook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx&debug=1"; + + @Test + public void testPut() { + EnterpriseWechatSinkTask wechatSinkTask = new EnterpriseWechatSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put("webHook", webHook); + wechatSinkTask.start(keyValue); + + Map map = new HashMap<>(); + map.put("msgtype", "text"); + + Map map1 = new HashMap<>(); + map1.put("content", "hello world"); + map.put("text", map1); + + List connectRecordList = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + connectRecord.setData(JSON.toJSONString(map)); + connectRecordList.add(connectRecord); + + wechatSinkTask.put(connectRecordList); + } +}