Skip to content

Commit

Permalink
support gp->gp
Browse files Browse the repository at this point in the history
  • Loading branch information
wenjing.zwj committed Jan 16, 2017
1 parent 15c0e3b commit 48a902b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 46 deletions.
4 changes: 2 additions & 2 deletions dbsync/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#Change the following according to mysql installation path
# Minimal requriement: Mysql >= 5.6, Pgsql >= 9.3
mysql_install_dir=/usr
mysql_install_dir=/usr/local/Cellar/mysql/5.7.15
mysql_include_dir=$(mysql_install_dir)/include/mysql
mysql_lib_dir=$(mysql_install_dir)/lib64/mysql

pgsql_install_dir=/usr/pgsql-9.6
pgsql_install_dir=/usr/local/Cellar/postgresql/9.5.4_1

PGFILEDESC = "ali_recvlogical"
NAME = ali_recvlogical
Expand Down
16 changes: 12 additions & 4 deletions dbsync/misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,21 @@ quote_literal_internal(char *dst, const char *src, size_t len)
}

int
start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version)
start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version, bool is_greenplum)
{
PGresult *res;
const char *setup_query =
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n";
const char *setup_query = NULL;
StringInfoData query;

if (is_greenplum == false)
{
setup_query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n";
}
else
{
setup_query = "BEGIN";
}

initStringInfo(&query);
appendStringInfoString(&query, setup_query);

Expand All @@ -181,7 +189,7 @@ start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version)

PQclear(res);

setup_connection(conn, pg_version, false);
setup_connection(conn, pg_version, is_greenplum);

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion dbsync/misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ extern int ThreadCreate(Thread *th, void *(*start)(void *arg), void *arg);
extern PGconn *pglogical_connect(const char *connstring, const char *connname);
extern bool is_greenplum(PGconn *conn);
extern size_t quote_literal_internal(char *dst, const char *src, size_t len);
extern int start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version);
extern int start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version, bool is_greenplum);
extern int finish_copy_origin_tx(PGconn *conn);
extern int start_copy_target_tx(PGconn *conn, int pg_version, bool is_greenplum);
extern int finish_copy_target_tx(PGconn *conn);
Expand Down
87 changes: 48 additions & 39 deletions dbsync/pgsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static volatile bool time_to_abort = false;

#define RECONNECT_SLEEP_TIME 5

#define ALL_DB_TABLE_SQL "select n.nspname, c.relname from pg_class c, pg_namespace n where n.oid = c.relnamespace and c.relkind = 'r' and n.nspname not in ('pg_catalog','tiger','tiger_data','topology','postgis','information_schema') order by c.relpages desc;"
#define ALL_DB_TABLE_SQL "select n.nspname, c.relname from pg_class c, pg_namespace n where n.oid = c.relnamespace and c.relkind = 'r' and n.nspname not in ('pg_catalog','tiger','tiger_data','topology','postgis','information_schema','gp_toolkit','pg_aoseg','pg_toast') order by c.relpages desc;"
#define GET_NAPSHOT "SELECT pg_export_snapshot()"

#define TASK_ID "1"
Expand Down Expand Up @@ -136,7 +136,7 @@ copy_table_data(void *arg)
break;
}

start_copy_origin_tx(origin_conn, hd->snapshot, hd->src_version);
start_copy_origin_tx(origin_conn, hd->snapshot, hd->src_version, hd->desc_is_greenplum);
start_copy_target_tx(target_conn, hd->desc_version, hd->desc_is_greenplum);

nspname = curr->schemaname;
Expand Down Expand Up @@ -255,7 +255,6 @@ db_sync_main(char *src, char *desc, char *local, int nthread)
after;
double elapsed_msec = 0;
Decoder_handler *hander = NULL;
int src_version = 0;
struct Thread *decoder = NULL;
bool replication_sync = false;
bool need_full_sync = false;
Expand Down Expand Up @@ -283,6 +282,8 @@ db_sync_main(char *src, char *desc, char *local, int nthread)
fprintf(stderr, "conn to src faild: %s", PQerrorMessage(origin_conn_repl));
return 1;
}
th_hd.src_version = PQserverVersion(origin_conn_repl);
th_hd.src_is_greenplum = is_greenplum(origin_conn_repl);

desc_conn = pglogical_connect(desc, EXTENSION_NAME "_main");
if (desc_conn == NULL)
Expand All @@ -298,43 +299,43 @@ db_sync_main(char *src, char *desc, char *local, int nthread)
if (local_conn == NULL)
{
fprintf(stderr, "init local conn failed: %s", PQerrorMessage(local_conn));
return 1;
}
ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS sync_sqls(id bigserial, sql text)");
ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS db_sync_status(id bigserial primary key, full_s_start timestamp DEFAULT NULL, full_s_end timestamp DEFAULT NULL, decoder_start timestamp DEFAULT NULL, apply_id bigint DEFAULT NULL)");
ExecuteSqlStatement(local_conn, "insert into db_sync_status (id) values (" TASK_ID ");");
get_task_status(local_conn, &full_start, &full_end, &decoder_start, &apply_id);

if (full_start && full_end == NULL)
{
fprintf(stderr, "full sync start %s, but not finish.truncate all data and restart dbsync\n", full_start);
return 1;
}
else if(full_start == NULL && full_end == NULL)
{
need_full_sync = true;
fprintf(stderr, "new dbsync task");
}
else if(full_start && full_end)
else
{
fprintf(stderr, "full sync start %s, end %s restart decoder sync\n", full_start, full_end);
need_full_sync = false;
}
ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS sync_sqls(id bigserial, sql text)");
ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS db_sync_status(id bigserial primary key, full_s_start timestamp DEFAULT NULL, full_s_end timestamp DEFAULT NULL, decoder_start timestamp DEFAULT NULL, apply_id bigint DEFAULT NULL)");
ExecuteSqlStatement(local_conn, "insert into db_sync_status (id) values (" TASK_ID ");");
get_task_status(local_conn, &full_start, &full_end, &decoder_start, &apply_id);

if (decoder_start)
{
fprintf(stderr, "decoder sync start %s\n", decoder_start);
}

if (apply_id)
{
fprintf(stderr, "decoder apply id %s\n", apply_id);
if (full_start && full_end == NULL)
{
fprintf(stderr, "full sync start %s, but not finish.truncate all data and restart dbsync\n", full_start);
return 1;
}
else if(full_start == NULL && full_end == NULL)
{
need_full_sync = true;
fprintf(stderr, "new dbsync task");
}
else if(full_start && full_end)
{
fprintf(stderr, "full sync start %s, end %s restart decoder sync\n", full_start, full_end);
need_full_sync = false;
}

if (decoder_start)
{
fprintf(stderr, "decoder sync start %s\n", decoder_start);
}

if (apply_id)
{
fprintf(stderr, "decoder apply id %s\n", apply_id);
}
}

src_version = PQserverVersion(origin_conn_repl);
is_greenplum(origin_conn_repl);
th_hd.src_version = src_version;
if (src_version >= 90400)
if (th_hd.src_is_greenplum == false && th_hd.src_version >= 90400)
{
replication_sync = true;
if (!is_slot_exists(origin_conn_repl, EXTENSION_NAME "_slot"))
Expand Down Expand Up @@ -369,10 +370,18 @@ db_sync_main(char *src, char *desc, char *local, int nthread)

if (need_full_sync)
{
const char *setup_query =
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n";
PQExpBuffer query;

const char *setup_query = NULL;
PQExpBuffer query;

if (th_hd.src_is_greenplum == false)
{
setup_query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n";
}
else
{
setup_query = "BEGIN";
}

query = createPQExpBuffer();
appendPQExpBuffer(query, "%s", setup_query);

Expand All @@ -391,7 +400,7 @@ db_sync_main(char *src, char *desc, char *local, int nthread)

if (snapshot == NULL)
{
if (src_version >= 90200)
if (th_hd.src_version >= 90200)
{
snapshot = get_synchronized_snapshot(origin_conn_repl);
th_hd.snapshot = snapshot;
Expand Down
2 changes: 2 additions & 0 deletions dbsync/pgsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ typedef struct Thread_hd
const char *snapshot;
char *src;
int src_version;
bool src_is_greenplum;

char *slot_name;

mysql_conn_info *mysql_src;
Expand Down

0 comments on commit 48a902b

Please sign in to comment.