Skip to content

客户端使用指南

myksl edited this page Apr 11, 2022 · 11 revisions

Prerequisite

在启动客户端之前,需要保证:

1.服务端已经启动,参考“QuickStart”

2.zookeeper已启动,并获得地址 zk-ip:zk-port

1.启动方式一

1.在IDE中直接运行, 需要和Reader在同一台机器启动 替换类

com.meituan.ptubes.sdk.example.ExampleByMainFunction

中zookeeper地址为真实地址zk-ip:zk-port

        public <T> T getConfig(String confName, Class<T> confClass) {
            // ignore confName, when there is only one implement for subscript or consumer config in task.
            try {
                // ... 
                if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
                    return (T) new PtubesSdkSubscriptionConfig(
                        "dbustest2-5",  // reader中对应的Task名字, 比如 demoR1
                        taskName, // SDK自身的任务名
                        "127.0.0.1:2181", // change the zookeeper address to a real address.
                        "test_test.wm_risk_list_account,test_test.test_table" // 需要订阅的库表名
                    );
                }
                // ... 

            } catch (Exception e) {
                System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
                e.printStackTrace();
            }
            return null;
        }

2.启动方式二

运行编译好的客户端

  1. 下载编译好的SDK客户端
  2. 解压客户端压缩包
  3. 检查目录和配置 压缩包内的结构如下:
drwxr-xr-x   4 admin  root  12:00 bin
drwxr-xr-x   5 admin  root  12:00 conf
drwxr-xr-x   4 admin  root  12:10 lib

bin目录存储启动和停止脚本, conf目录存储依赖的配置文件, lib目录存储所需要的程序包。 ./conf/ptubes_demo_task.properties 是示例任务的配置文件,其中 ptubes.sdk.zookeeper.address=127.0.0.1:2181应该替换为已启动的真实zookeeper地址zk-ip:zk-port sdk.conf

ptubes.sdk.task.set=ptubes_demo_task // 每个任务名都需要创建一个对应的配置文件, 以逗号分隔

ptubes_demo_task.properties

ptubes.sdk.reader.name=dbustest2-5 // reader中对应的Task名字, 比如 demoR1
ptubes.sdk.task.name=ptubes_demo_task // SDK自身的任务名
ptubes.sdk.zookeeper.address=127.0.0.1:2181 // 对应的zk地址
ptubes.sdk.subs=test.test_table// 订阅的库表名, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332 // 对应的Reader地址, 以逗号分隔

4.执行

# 此命令会运行 ./conf/sdk.conf 文件中所列出的所有sdk任务
# 同时这些任务的配置也需要在./conf/${sdkTaskName}.properties 中存在
cd bin && sh start.sh

5.停止

# 此命令会停止本机上所有在运行中的SDK任务
sh stop.sh

3.启动方式三

新建maven工程并依赖ptubes项目 1.新建maven工程并添加依赖 org.meituan.ptubes sdk 6.6.6-SNAPSHOT 2.在工程 resource 目录下新增两个配置文件并调整参数 sdk.conf

ptubes.sdk.task.set=ptubes_demo_task

ptubes_demo_task.properties

# reader中对应的Task名字, 比如 demoR1
ptubes.sdk.reader.name=dbustest2-5 

#SDK自身的任务名
ptubes.sdk.task.name=ptubes_demo_task

# 对应的zk地址
ptubes.sdk.zookeeper.address=127.0.0.1:2181

# 订阅的库表名, 以逗号分隔
ptubes.sdk.subs=test.test_table

# 对应的Reader地址, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332

3.新增一个类TestMain

package com.example.buffalo;

import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;

import java.util.List;

public class TestMain {
    public static void main(String[] args) {
        String taskName = "ptubes_demo_task";
        // set log file directory
        System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath()  + "/logs");
        // set log type
        System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
        System.out.println(System.getProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY));
        IPtubesConnector rdsCdcConnector = null;
        IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);

        try {
            rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {

                @Override
                public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
                    for (RdsPacket.RdsEvent event : events) {
                        System.out.println(event.getRowData()
                                .getBeforeColumnsMap()
                                .get("id"));
                        System.out.println(event.getRowData()
                                .getAfterColumnsMap()
                                .get("id"));
                        System.out.println(PbJsonUtil.printToStringDefaultNull(event));
                    }

                    return RdsCdcEventStatus.SUCCESS;
                }
            });

            rdsCdcConnector.startup();
            long eachSleepTime = 30000L;
            long sleepTimeCount = 300000L;

            do {
                Thread.sleep(eachSleepTime);
                sleepTimeCount -= eachSleepTime;
                try {
                    if (null != rdsCdcConnector) {

                        /**
                         * hot to get a buffalo sdk task runtime info
                         */
                        System.out.println(rdsCdcConnector.getConnectorMonitorInfo());
                    }
                } catch (Exception e) {
                    System.out.println(e);
                }

            } while (sleepTimeCount > 0);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // rdsCdcConnector.shutdown();
        }
    }
}
  1. 执行TestMain.main方法。并观察控制台输出