-
Notifications
You must be signed in to change notification settings - Fork 11
客户端使用指南
ShanceWang edited this page Apr 13, 2022
·
11 revisions
在启动客户端之前,需要保证:
- 服务端已经启动,参考 QuickStart
- zookeeper已启动,并获得地址 zk-ip:zk-port
执行git命令,获取ptubes代码
git clone [email protected]:meituan/ptubes.git
在IDE中找到类
com.meituan.ptubes.sdk.example.ExampleByMainFunction
替换ExampleByMainFunction
中zookeeper地址为真实地址zk-ip:zk-port,并按需修改reader地址,订阅列表等。示例如下:
public <T> T getConfig(String confName, Class<T> confClass) {
// ignore confName, when there is only one implement for subscript or consumer config in task.
try {
// ...
if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
return (T) new PtubesSdkSubscriptionConfig(
"demoR1", // reader taskName, eg. demoR1
taskName, // SDK task name
"127.0.0.1:2181", // change the zookeeper address to a real address.
"test_test.wm_risk_list_account,test_test.test_table" // tables for subscription
);
}
// ...
} catch (Exception e) {
System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
e.printStackTrace();
}
return null;
}
运行编译好的客户端
- 下载编译好的SDK客户端 ptubes-sdk-client.tar.gz
- 解压客户端压缩包
- 检查目录和配置
压缩包内的结构如下:
drwxr-xr-x 4 admin root 12:00 bin
drwxr-xr-x 5 admin root 12:00 conf
drwxr-xr-x 4 admin root 12:10 lib
bin目录存储启动和停止脚本, conf目录存储依赖的配置文件, lib目录存储所需要的程序包。
./conf/ptubes_demo_task.properties 是示例任务的配置文件,其中 ptubes.sdk.zookeeper.address=127.0.0.1:2181应该替换为已启动的真实zookeeper地址zk-ip:zk-port
sdk.conf
ptubes.sdk.task.set=ptubes_demo_task // 每个任务名都需要创建一个对应的配置文件, 以逗号分隔
ptubes_demo_task.properties
ptubes.sdk.reader.name=demoR1 // reader中对应的Task名字, 比如 demoR1
ptubes.sdk.task.name=ptubes_demo_task // SDK自身的任务名
ptubes.sdk.zookeeper.address=127.0.0.1:2181 // 对应的zk地址
ptubes.sdk.subs=test.test_table// 订阅的库表名, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332 // 对应的Reader地址, 以逗号分隔
- 执行
# 此命令会运行 ./conf/sdk.conf 文件中所列出的所有sdk任务
# 同时这些任务的配置也需要在./conf/${sdkTaskName}.properties 中存在
cd bin && sh start.sh
- 停止
# 此命令会停止本机上所有在运行中的SDK任务
sh stop.sh
新建maven工程并依赖ptubes项目
- 新建maven工程并添加依赖
<dependency>
<groupId>com.meituan</groupId>
<artifactId>ptubes-sdk</artifactId>
<version>1.0.0</version>
</dependency>
- 在工程 resource 目录下新增两个配置文件并调整参数 sdk.conf
ptubes.sdk.task.set=ptubes_demo_task
ptubes_demo_task.properties
# reader中对应的Task名字, 比如 demoR1
ptubes.sdk.reader.name=demoR1
#SDK自身的任务名
ptubes.sdk.task.name=ptubes_demo_task
# 对应的zk地址
ptubes.sdk.zookeeper.address=127.0.0.1:2181
# 订阅的库表名, 以逗号分隔
ptubes.sdk.subs=test.test_table
# 对应的Reader地址, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332
- 新增一个类TestMain
package com.example.buffalo;
import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;
import java.util.List;
public class TestMain {
public static void main(String[] args) {
String taskName = "ptubes_demo_task";
// set log file directory
System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath() + "/logs");
// set log type
System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
System.out.println(System.getProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY));
IPtubesConnector rdsCdcConnector = null;
IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);
try {
rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {
@Override
public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
for (RdsPacket.RdsEvent event : events) {
System.out.println(event.getRowData()
.getBeforeColumnsMap()
.get("id"));
System.out.println(event.getRowData()
.getAfterColumnsMap()
.get("id"));
System.out.println(PbJsonUtil.printToStringDefaultNull(event));
}
return RdsCdcEventStatus.SUCCESS;
}
});
rdsCdcConnector.startup();
long eachSleepTime = 30000L;
long sleepTimeCount = 300000L;
do {
Thread.sleep(eachSleepTime);
sleepTimeCount -= eachSleepTime;
try {
if (null != rdsCdcConnector) {
/**
* hot to get a buffalo sdk task runtime info
*/
System.out.println(rdsCdcConnector.getConnectorMonitorInfo());
}
} catch (Exception e) {
System.out.println(e);
}
} while (sleepTimeCount > 0);
} catch (Exception e) {
e.printStackTrace();
} finally {
// rdsCdcConnector.shutdown();
}
}
}
- 执行TestMain.main方法。并观察控制台输出