diff --git a/dbsync/Makefile b/dbsync/Makefile index b947f95..ad648c1 100755 --- a/dbsync/Makefile +++ b/dbsync/Makefile @@ -1,10 +1,10 @@ #Change the following according to mysql installation path # Minimal requriement: Mysql >= 5.6, Pgsql >= 9.3 -mysql_install_dir=/usr +mysql_install_dir=/usr/local/Cellar/mysql/5.7.15 mysql_include_dir=$(mysql_install_dir)/include/mysql mysql_lib_dir=$(mysql_install_dir)/lib64/mysql -pgsql_install_dir=/usr/pgsql-9.6 +pgsql_install_dir=/usr/local/Cellar/postgresql/9.5.4_1 PGFILEDESC = "ali_recvlogical" NAME = ali_recvlogical diff --git a/dbsync/misc.c b/dbsync/misc.c index 9f85545..6613ac1 100644 --- a/dbsync/misc.c +++ b/dbsync/misc.c @@ -158,13 +158,21 @@ quote_literal_internal(char *dst, const char *src, size_t len) } int -start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version) +start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version, bool is_greenplum) { PGresult *res; - const char *setup_query = - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n"; + const char *setup_query = NULL; StringInfoData query; + if (is_greenplum == false) + { + setup_query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n"; + } + else + { + setup_query = "BEGIN"; + } + initStringInfo(&query); appendStringInfoString(&query, setup_query); @@ -181,7 +189,7 @@ start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version) PQclear(res); - setup_connection(conn, pg_version, false); + setup_connection(conn, pg_version, is_greenplum); return 0; } diff --git a/dbsync/misc.h b/dbsync/misc.h index 29270a8..c725231 100644 --- a/dbsync/misc.h +++ b/dbsync/misc.h @@ -61,7 +61,7 @@ extern int ThreadCreate(Thread *th, void *(*start)(void *arg), void *arg); extern PGconn *pglogical_connect(const char *connstring, const char *connname); extern bool is_greenplum(PGconn *conn); extern size_t quote_literal_internal(char *dst, const char *src, size_t len); -extern int start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version); +extern int start_copy_origin_tx(PGconn *conn, const char *snapshot, int pg_version, bool is_greenplum); extern int finish_copy_origin_tx(PGconn *conn); extern int start_copy_target_tx(PGconn *conn, int pg_version, bool is_greenplum); extern int finish_copy_target_tx(PGconn *conn); diff --git a/dbsync/pgsync.c b/dbsync/pgsync.c index c382f40..011b806 100644 --- a/dbsync/pgsync.c +++ b/dbsync/pgsync.c @@ -47,7 +47,7 @@ static volatile bool time_to_abort = false; #define RECONNECT_SLEEP_TIME 5 -#define ALL_DB_TABLE_SQL "select n.nspname, c.relname from pg_class c, pg_namespace n where n.oid = c.relnamespace and c.relkind = 'r' and n.nspname not in ('pg_catalog','tiger','tiger_data','topology','postgis','information_schema') order by c.relpages desc;" +#define ALL_DB_TABLE_SQL "select n.nspname, c.relname from pg_class c, pg_namespace n where n.oid = c.relnamespace and c.relkind = 'r' and n.nspname not in ('pg_catalog','tiger','tiger_data','topology','postgis','information_schema','gp_toolkit','pg_aoseg','pg_toast') order by c.relpages desc;" #define GET_NAPSHOT "SELECT pg_export_snapshot()" #define TASK_ID "1" @@ -136,7 +136,7 @@ copy_table_data(void *arg) break; } - start_copy_origin_tx(origin_conn, hd->snapshot, hd->src_version); + start_copy_origin_tx(origin_conn, hd->snapshot, hd->src_version, hd->desc_is_greenplum); start_copy_target_tx(target_conn, hd->desc_version, hd->desc_is_greenplum); nspname = curr->schemaname; @@ -255,7 +255,6 @@ db_sync_main(char *src, char *desc, char *local, int nthread) after; double elapsed_msec = 0; Decoder_handler *hander = NULL; - int src_version = 0; struct Thread *decoder = NULL; bool replication_sync = false; bool need_full_sync = false; @@ -283,6 +282,8 @@ db_sync_main(char *src, char *desc, char *local, int nthread) fprintf(stderr, "conn to src faild: %s", PQerrorMessage(origin_conn_repl)); return 1; } + th_hd.src_version = PQserverVersion(origin_conn_repl); + th_hd.src_is_greenplum = is_greenplum(origin_conn_repl); desc_conn = pglogical_connect(desc, EXTENSION_NAME "_main"); if (desc_conn == NULL) @@ -298,43 +299,43 @@ db_sync_main(char *src, char *desc, char *local, int nthread) if (local_conn == NULL) { fprintf(stderr, "init local conn failed: %s", PQerrorMessage(local_conn)); - return 1; - } - ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS sync_sqls(id bigserial, sql text)"); - ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS db_sync_status(id bigserial primary key, full_s_start timestamp DEFAULT NULL, full_s_end timestamp DEFAULT NULL, decoder_start timestamp DEFAULT NULL, apply_id bigint DEFAULT NULL)"); - ExecuteSqlStatement(local_conn, "insert into db_sync_status (id) values (" TASK_ID ");"); - get_task_status(local_conn, &full_start, &full_end, &decoder_start, &apply_id); - - if (full_start && full_end == NULL) - { - fprintf(stderr, "full sync start %s, but not finish.truncate all data and restart dbsync\n", full_start); - return 1; - } - else if(full_start == NULL && full_end == NULL) - { need_full_sync = true; - fprintf(stderr, "new dbsync task"); } - else if(full_start && full_end) + else { - fprintf(stderr, "full sync start %s, end %s restart decoder sync\n", full_start, full_end); - need_full_sync = false; - } + ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS sync_sqls(id bigserial, sql text)"); + ExecuteSqlStatement(local_conn, "CREATE TABLE IF NOT EXISTS db_sync_status(id bigserial primary key, full_s_start timestamp DEFAULT NULL, full_s_end timestamp DEFAULT NULL, decoder_start timestamp DEFAULT NULL, apply_id bigint DEFAULT NULL)"); + ExecuteSqlStatement(local_conn, "insert into db_sync_status (id) values (" TASK_ID ");"); + get_task_status(local_conn, &full_start, &full_end, &decoder_start, &apply_id); - if (decoder_start) - { - fprintf(stderr, "decoder sync start %s\n", decoder_start); - } - - if (apply_id) - { - fprintf(stderr, "decoder apply id %s\n", apply_id); + if (full_start && full_end == NULL) + { + fprintf(stderr, "full sync start %s, but not finish.truncate all data and restart dbsync\n", full_start); + return 1; + } + else if(full_start == NULL && full_end == NULL) + { + need_full_sync = true; + fprintf(stderr, "new dbsync task"); + } + else if(full_start && full_end) + { + fprintf(stderr, "full sync start %s, end %s restart decoder sync\n", full_start, full_end); + need_full_sync = false; + } + + if (decoder_start) + { + fprintf(stderr, "decoder sync start %s\n", decoder_start); + } + + if (apply_id) + { + fprintf(stderr, "decoder apply id %s\n", apply_id); + } } - src_version = PQserverVersion(origin_conn_repl); - is_greenplum(origin_conn_repl); - th_hd.src_version = src_version; - if (src_version >= 90400) + if (th_hd.src_is_greenplum == false && th_hd.src_version >= 90400) { replication_sync = true; if (!is_slot_exists(origin_conn_repl, EXTENSION_NAME "_slot")) @@ -369,10 +370,18 @@ db_sync_main(char *src, char *desc, char *local, int nthread) if (need_full_sync) { - const char *setup_query = - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n"; - PQExpBuffer query; - + const char *setup_query = NULL; + PQExpBuffer query; + + if (th_hd.src_is_greenplum == false) + { + setup_query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;\n"; + } + else + { + setup_query = "BEGIN"; + } + query = createPQExpBuffer(); appendPQExpBuffer(query, "%s", setup_query); @@ -391,7 +400,7 @@ db_sync_main(char *src, char *desc, char *local, int nthread) if (snapshot == NULL) { - if (src_version >= 90200) + if (th_hd.src_version >= 90200) { snapshot = get_synchronized_snapshot(origin_conn_repl); th_hd.snapshot = snapshot; diff --git a/dbsync/pgsync.h b/dbsync/pgsync.h index 3592e2b..9ac54a3 100644 --- a/dbsync/pgsync.h +++ b/dbsync/pgsync.h @@ -89,6 +89,8 @@ typedef struct Thread_hd const char *snapshot; char *src; int src_version; + bool src_is_greenplum; + char *slot_name; mysql_conn_info *mysql_src;