diff --git a/bdb/rep.c b/bdb/rep.c index c4ef088280..949bb4cbc1 100644 --- a/bdb/rep.c +++ b/bdb/rep.c @@ -4191,13 +4191,23 @@ static int process_berkdb(bdb_state_type *bdb_state, char *host, DBT *control, D } int gbl_force_incoherent = 0; +int gbl_force_incoherent_master = 0; int gbl_ignore_coherency = 0; static int bdb_am_i_coherent_int(bdb_state_type *bdb_state) { - /*master can't be incoherent*/ - if (bdb_amimaster(bdb_state)) + if (bdb_amimaster(bdb_state)) { + if (gbl_force_incoherent_master) { + static time_t lastpr = 0; + time_t now = time(NULL); + if (now - lastpr) { + logmsg(LOGMSG_WARN, "%s returning INCOHERENT on force_incoherent_master\n", __func__); + lastpr = now; + } + return 0; + } return 1; + } /* force_incoherent overrides ignore_coherency */ if (gbl_force_incoherent) { diff --git a/berkdb/rep/rep_record.c b/berkdb/rep/rep_record.c index b55b637b10..2ef289ad5f 100644 --- a/berkdb/rep/rep_record.c +++ b/berkdb/rep/rep_record.c @@ -2987,6 +2987,7 @@ static inline void repdb_dequeue(DBT *control_dbt, DBT *rec_dbt) } __thread int disable_random_deadlocks = 0; +__thread int physrep_out_of_order = 0; /* * __rep_apply -- @@ -3125,23 +3126,26 @@ __rep_apply_int(dbenv, rp, rec, ret_lsnp, commit_gen, decoupled) * That said, I really don't want to do db operations holding the * log mutex, so the synchronization here is tricky. */ - /* TODO: return a message telling the physical replicant to go - * into matching mode */ - if (gbl_is_physical_replicant && cmp != 0) + if (gbl_is_physical_replicant) { - static uint32_t count=0; - count++; - if (gbl_physrep_debug == 1) { - logmsg(LOGMSG_USER, "%s out-of-order lsn [%d][%d] instead of [%d][%d], count %u\n", - __func__, rp->lsn.file, rp->lsn.offset, lp->ready_lsn.file, - lp->ready_lsn.offset, count); - } - /* A master node in a physical replication cluster would not - * have the ability to 'ask' for missing log records. - */ - if (F_ISSET(rep, REP_F_MASTER)) { - goto done; - } + if(cmp != 0) { + static uint32_t count=0; + count++; + physrep_out_of_order = 1; + if (gbl_physrep_debug == 1) { + logmsg(LOGMSG_USER, "%s out-of-order lsn [%d][%d] instead of [%d][%d], count %u\n", + __func__, rp->lsn.file, rp->lsn.offset, lp->ready_lsn.file, + lp->ready_lsn.offset, count); + } + /* A master node in a physical replication cluster would not + * have the ability to 'ask' for missing log records. + */ + if (F_ISSET(rep, REP_F_MASTER)) { + goto done; + } + } else { + physrep_out_of_order = 0; + } } if (cmp == 0) { diff --git a/db/comdb2.c b/db/comdb2.c index 4833f64b7e..43340a9516 100644 --- a/db/comdb2.c +++ b/db/comdb2.c @@ -1713,7 +1713,8 @@ void clean_exit(void) bdb_exiting(thedb->static_table.handle); stop_threads(thedb); - physrep_cleanup(); + if (!gbl_exit) + physrep_cleanup(); flush_db(); if (gbl_backend_opened) llmeta_dump_mapping(thedb); diff --git a/db/db_tunables.c b/db/db_tunables.c index fab86b002f..f31e4d8304 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -230,6 +230,7 @@ extern int gbl_last_locked_seqnum; extern int gbl_set_coherent_state_trace; extern int gbl_incoherent_slow_inactive_timeout; extern int gbl_force_incoherent; +extern int gbl_force_incoherent_master; extern int gbl_ignore_coherency; extern int gbl_skip_catchup_logic; extern int gbl_debug_downgrade_cluster_at_open; @@ -453,6 +454,10 @@ extern int gbl_fdb_io_error_retries_phase_2_poll; extern int gbl_fdb_auth_enabled; extern int gbl_debug_invalid_genid; +/* Tranlog */ +extern int gbl_tranlog_incoherent_timeout; +extern int gbl_tranlog_maxpoll; + /* Physical replication */ extern int gbl_blocking_physrep; extern int gbl_physrep_check_minlog_freq_sec; @@ -461,6 +466,8 @@ extern int gbl_physrep_exit_on_invalid_logstream; extern int gbl_physrep_fanout; extern int gbl_physrep_hung_replicant_check_freq_sec; extern int gbl_physrep_hung_replicant_threshold; +extern int gbl_physrep_revconn_check_interval; +extern int gbl_physrep_update_registry_interval; extern int gbl_physrep_i_am_metadb; extern int gbl_physrep_keepalive_freq_sec; extern int gbl_physrep_max_candidates; @@ -470,11 +477,18 @@ extern int gbl_physrep_register_interval; extern int gbl_physrep_shuffle_host_list; extern int gbl_physrep_ignore_queues; +/* source-name / host is from lrl */ extern char *gbl_physrep_source_dbname; extern char *gbl_physrep_source_host; + +/* meta-name / host is from lrl */ extern char *gbl_physrep_metadb_name; extern char *gbl_physrep_metadb_host; +/* repl-name / host is the active connection */ +extern char *gbl_physrep_repl_name; +extern char *gbl_physrep_repl_host; + /* Reversql connection/sql */ extern int gbl_revsql_allow_command_exec; extern int gbl_revsql_debug; diff --git a/db/db_tunables.h b/db/db_tunables.h index aa24e9324a..4851f43bf3 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -1766,13 +1766,14 @@ REGISTER_TUNABLE("blocking_physrep", "Physical replicant blocks on select. (Default: false)", TUNABLE_BOOLEAN, &gbl_blocking_physrep, 0, NULL, NULL, NULL, NULL); -REGISTER_TUNABLE("physrep_check_minlog_freq_sec", - "Check the minimum log number to keep this often. (Default: 10)", - TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, - NULL, NULL, NULL); -REGISTER_TUNABLE("physrep_debug", - "Print extended physrep trace. (Default: off)", - TUNABLE_BOOLEAN, &gbl_physrep_debug, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoherent tranlog. (Default: 10)", + TUNABLE_INTEGER, &gbl_tranlog_incoherent_timeout, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("tranlog_maxpoll", "Tranlog timeout in seconds for blocking poll. (Default: 60)", TUNABLE_INTEGER, + &gbl_tranlog_maxpoll, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 10)", + TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_debug", "Print extended physrep trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_debug, + 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_exit_on_invalid_logstream", "Exit physreps on invalid logstream. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_exit_on_invalid_logstream, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_fanout", @@ -1786,6 +1787,10 @@ REGISTER_TUNABLE("physrep_hung_replicant_threshold", "Report if the physical replicant has been inactive for this duration. (Default: 60)", TUNABLE_INTEGER, &gbl_physrep_hung_replicant_threshold, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_update_registry_interval", "Physrep update-registry interval. (Default: 60)", TUNABLE_INTEGER, + &gbl_physrep_update_registry_interval, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_revconn_check_interval", "Physrep recheck revconn interval. (Default: 60)", TUNABLE_INTEGER, + &gbl_physrep_revconn_check_interval, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_i_am_metadb", "I am physical replication metadb (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_i_am_metadb, NOARG, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_keepalive_freq_sec", @@ -1812,21 +1817,20 @@ REGISTER_TUNABLE("physrep_reconnect_penalty", "Physrep wait seconds before retry to the same node. (Default: 5)", TUNABLE_INTEGER, &gbl_physrep_reconnect_penalty, 0, NULL, NULL, NULL, NULL); -REGISTER_TUNABLE("physrep_register_interval", - "Interval for physical replicant re-registration. (Default: 3600)", - TUNABLE_INTEGER, &gbl_physrep_register_interval, 0, NULL, NULL, - NULL, NULL); +REGISTER_TUNABLE("physrep_register_interval", "Interval for physical replicant re-registration. (Default: 600)", + TUNABLE_INTEGER, &gbl_physrep_register_interval, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_shuffle_host_list", "Shuffle the host list returned by register_replicant() " "before connecting to the hosts. (Default: OFF)", - TUNABLE_BOOLEAN, &gbl_physrep_shuffle_host_list, 0, NULL, NULL, - NULL, NULL); -REGISTER_TUNABLE("physrep_source_dbname", "Physical replication source cluster dbname.", - TUNABLE_STRING, &gbl_physrep_source_dbname, READONLY, NULL, NULL, NULL, - NULL); -REGISTER_TUNABLE("physrep_source_host", "List of physical replication source cluster hosts.", - TUNABLE_STRING, &gbl_physrep_source_host, READONLY, NULL, NULL, NULL, - NULL); + TUNABLE_BOOLEAN, &gbl_physrep_shuffle_host_list, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_repl_name", "Current physrep parent.", TUNABLE_STRING, &gbl_physrep_repl_name, READONLY, NULL, + NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_repl_host", "Current physrep host.", TUNABLE_STRING, &gbl_physrep_repl_host, READONLY, NULL, + NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_source_dbname", "Physical replication source cluster dbname.", TUNABLE_STRING, + &gbl_physrep_source_dbname, READONLY, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("physrep_source_host", "List of physical replication source cluster hosts.", TUNABLE_STRING, + &gbl_physrep_source_host, READONLY, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("physrep_ignore_queues", "Don't replicate queues.", TUNABLE_BOOLEAN, &gbl_physrep_ignore_queues, READONLY, NULL, NULL, NULL, NULL); @@ -1835,8 +1839,7 @@ REGISTER_TUNABLE("revsql_allow_command_execution", "Allow processing and execution of command over the 'reverse connection' " "that has come in as part of the request. This is mostly intended for " "testing. (Default: off)", - TUNABLE_BOOLEAN, &gbl_revsql_allow_command_exec, EXPERIMENTAL | INTERNAL, - NULL, NULL, NULL, NULL); + TUNABLE_BOOLEAN, &gbl_revsql_allow_command_exec, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("revsql_cdb2_debug", "Print extended reversql-sql cdb2 related trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_revsql_cdb2_debug, EXPERIMENTAL | INTERNAL, @@ -1887,25 +1890,23 @@ REGISTER_TUNABLE("force_incoherent", TUNABLE_BOOLEAN, &gbl_force_incoherent, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); -REGISTER_TUNABLE("ignore_coherency", - "Force this node to be coherent. (Default: off)", - TUNABLE_BOOLEAN, &gbl_ignore_coherency, - EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("force_incoherent_master", "Force master node to be incoherent. (Default: off)", TUNABLE_BOOLEAN, + &gbl_force_incoherent_master, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); + +REGISTER_TUNABLE("ignore_coherency", "Force this node to be coherent. (Default: off)", TUNABLE_BOOLEAN, + &gbl_ignore_coherency, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("forbid_incoherent_writes", "Prevent writes against a node which was incoherent at " "transaction start. (Default: off)", - TUNABLE_BOOLEAN, &gbl_forbid_incoherent_writes, - EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); + TUNABLE_BOOLEAN, &gbl_forbid_incoherent_writes, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("debug_downgrade_cluster_at_open", "Sleep on open to allow testsuite to downgrade master. (Default: off)", TUNABLE_BOOLEAN, &gbl_debug_downgrade_cluster_at_open, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); -REGISTER_TUNABLE("skip_catchup_logic", - "Skip initial catchup logic. (Default: off)", TUNABLE_BOOLEAN, - &gbl_skip_catchup_logic, EXPERIMENTAL | INTERNAL, NULL, NULL, - NULL, NULL); +REGISTER_TUNABLE("skip_catchup_logic", "Skip initial catchup logic. (Default: off)", TUNABLE_BOOLEAN, + &gbl_skip_catchup_logic, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("sample_queries", "Sample queries and query plans to table comdb2_sample_queries. (Default: on)", TUNABLE_BOOLEAN, &gbl_sample_queries, 0, NULL, NULL, NULL, NULL); diff --git a/db/phys_rep.c b/db/phys_rep.c index d653c27f22..6b51f7956e 100644 --- a/db/phys_rep.c +++ b/db/phys_rep.c @@ -68,6 +68,8 @@ int gbl_physrep_keepalive_freq_sec = 10; int gbl_physrep_check_minlog_freq_sec = 10; int gbl_physrep_hung_replicant_check_freq_sec = 10; int gbl_physrep_hung_replicant_threshold = 60; +int gbl_physrep_revconn_check_interval = 60; +int gbl_physrep_update_registry_interval = 60; int gbl_physrep_shuffle_host_list = 0; int gbl_physrep_i_am_metadb = 0; int gbl_started_physrep_threads = 0; @@ -80,6 +82,9 @@ char *gbl_physrep_source_host; char *gbl_physrep_metadb_name; char *gbl_physrep_metadb_host; +char *gbl_physrep_repl_name = NULL; +char *gbl_physrep_repl_host = NULL; + static int repl_db_connected = 0; static pthread_t physrep_worker_thread; @@ -99,6 +104,7 @@ reverse_conn_handle_tp *rev_conn_hndl = NULL; static int last_register; static int add_replicant_host(char *hostname, char *dbname); +static void dump_replicant_hosts(void); static void delete_replicant_host(DB_Connection *cnct); extern struct dbenv *thedb; @@ -179,13 +185,27 @@ void cleanup_hosts() repl_dbs_sz = 0; } +static void set_repl_db_connected(char *dbname, char *host) +{ + gbl_physrep_repl_name = dbname; + gbl_physrep_repl_host = host; + repl_db_connected = 1; +} + +static void set_repl_db_disconnected() +{ + repl_db_connected = 0; + gbl_physrep_repl_name = NULL; + gbl_physrep_repl_host = NULL; +} + static void close_repl_connection(DB_Connection *cnct, cdb2_hndl_tp *repl_db, const char *func, int line) { cnct->last_failed = time(NULL); cnct->is_up = 0; + set_repl_db_disconnected(); cdb2_close(repl_db); - repl_db_connected = 0; repl_db = NULL; if (rev_conn_hndl) { // Set the 'done' flag to signal 'reversesql' plugin to perform the @@ -314,53 +334,6 @@ static int append_quoted_source_hosts(char *buf, int buf_len, int *rc) { return -1; } -static int update_registry(cdb2_hndl_tp *repl_metadb, - const char * remote_dbname, - const char * remote_host) { - const size_t nodes_list_sz = REPMAX * (255+1) + 3; - char cmd[120+nodes_list_sz]; - int bytes_written = 0; - int rc; - - char *buf = cmd; - size_t buf_len = sizeof(cmd); - - bytes_written += snprintf(buf+bytes_written, buf_len-bytes_written, - "exec procedure sys.physrep.update_registry" - "('%s', '%s', '%s', '%s', \"", - gbl_dbname, gbl_myhostname, - (remote_dbname) ? remote_dbname : "NULL", - (remote_host) ? remote_host : "NULL"); - if (bytes_written >= buf_len) { - physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); - return 1; - } - - bytes_written += append_quoted_local_hosts(buf+bytes_written, buf_len-bytes_written, " "); - if (bytes_written >= buf_len) { - physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); - return 1; - } - - bytes_written += snprintf(buf+bytes_written, buf_len-bytes_written, "\")"); - if (bytes_written >= buf_len) { - physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); - return 1; - } - - if (gbl_physrep_debug) { - physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd); - } - - if ((rc = cdb2_run_statement(repl_metadb, cmd)) == CDB2_OK) { - while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK); - } else { - physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute (rc: %d)\n", __func__, __LINE__, rc); - return 1; - } - return 0; -} - static int get_local_hndl(cdb2_hndl_tp **hndl) { int rc = cdb2_open(hndl, gbl_dbname, "local", 0); if (rc != 0) { @@ -468,7 +441,72 @@ int physrep_get_metadb_or_local_hndl(cdb2_hndl_tp **hndl) { ? get_metadb_hndl(hndl) : get_local_hndl(hndl); } -static int send_reset_nodes(const char *state) { +static int update_registry(cdb2_hndl_tp *repl_metadb, const char *remote_dbname, const char *remote_host) +{ + const size_t nodes_list_sz = REPMAX * (255 + 1) + 3; + char cmd[120 + nodes_list_sz]; + int bytes_written = 0; + int rc; + + char *buf = cmd; + size_t buf_len = sizeof(cmd); + + bytes_written += snprintf(buf + bytes_written, buf_len - bytes_written, + "exec procedure sys.physrep.update_registry" + "('%s', '%s', '%s', '%s', \"", + gbl_dbname, gbl_myhostname, (remote_dbname) ? remote_dbname : "NULL", + (remote_host) ? remote_host : "NULL"); + if (bytes_written >= buf_len) { + physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); + return 1; + } + + bytes_written += append_quoted_local_hosts(buf + bytes_written, buf_len - bytes_written, " "); + if (bytes_written >= buf_len) { + physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); + return 1; + } + + bytes_written += snprintf(buf + bytes_written, buf_len - bytes_written, "\")"); + if (bytes_written >= buf_len) { + physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__); + return 1; + } + + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd); + } + + if ((rc = cdb2_run_statement(repl_metadb, cmd)) == CDB2_OK) { + while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK) + ; + } else { + physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute (rc: %d)\n", __func__, __LINE__, rc); + return 1; + } + return 0; +} + +static int update_registry_periodic(const char *remote_dbname, const char *remote_host) +{ + cdb2_hndl_tp *repl_metadb = NULL; + int rc; + + if (gbl_physrep_debug) + logmsg(LOGMSG_USER, "%s: updating-registry %s/%s\n", __func__, remote_dbname, remote_host); + + if ((rc = physrep_get_metadb_or_local_hndl(&repl_metadb)) != 0) { + return rc; + } + + rc = update_registry(repl_metadb, remote_dbname, remote_host); + + cdb2_close(repl_metadb); + return rc; +} + +static int send_reset_nodes(const char *state) +{ const size_t nodes_list_sz = REPMAX * (255+1) + 3; char cmd[120+nodes_list_sz]; int bytes_written = 0; @@ -511,7 +549,8 @@ static int send_reset_nodes(const char *state) { } if ((rc = cdb2_run_statement(repl_metadb, cmd)) == CDB2_OK) { - while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK); + while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK) + ; if (rc == CDB2_OK_DONE) rc = 0; } else { @@ -523,14 +562,23 @@ static int send_reset_nodes(const char *state) { } char *physrep_master_cached = NULL; +int gbl_physrep_force_registration = 0; -int force_registration() { - if (!physrep_master_cached || ((strcmp(gbl_myhostname, physrep_master_cached)) != 0)) { - free(physrep_master_cached); - physrep_master_cached = strdup(gbl_myhostname); - return 1; - } - return 0; +int force_registration() +{ + int rtn = 0; + if (gbl_physrep_force_registration) { + logmsg(LOGMSG_USER, "%s: forcing registration on flag\n", __func__); + rtn = 1; + gbl_physrep_force_registration = 0; + } + + if (!physrep_master_cached || ((strcmp(gbl_myhostname, physrep_master_cached)) != 0)) { + free(physrep_master_cached); + physrep_master_cached = strdup(gbl_myhostname); + rtn = 1; + } + return rtn; } time_t gbl_physrep_last_applied_time; @@ -626,8 +674,8 @@ static LOG_INFO handle_record(cdb2_hndl_tp *repl_db, LOG_INFO prev_info) } if (rc != 0) { - physrep_logmsg(LOGMSG_FATAL, "%s:%d: Something went wrong with applying the logs (rc: %d)\n", - __func__, __LINE__, rc); + physrep_logmsg(LOGMSG_FATAL, "%s:%d: Something went wrong with applying the logs (rc: %d)\n", __func__, + __LINE__, rc); exit(1); } @@ -646,12 +694,19 @@ static int register_self(cdb2_hndl_tp *repl_metadb) int bytes_written = 0; int rc; + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Registering self\n", __func__, __LINE__); + } + // Reset all the nodes from this physical replication cluster; and mark them // 'Inactive'. // // This is required to ensure that the metadb does not return one the nodes // of this cluster as a potential source when one of the nodes tries to // re-register as a physical replicant. + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Send-reset nodes\n", __func__, __LINE__); + } rc = send_reset_nodes("Inactive"); if (rc != 0) { physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to reset info in replication metadb tables (rc: %d)\n", @@ -708,6 +763,9 @@ static int register_self(cdb2_hndl_tp *repl_metadb) last_register = time(NULL); if (candidate_leaders_count > 0) { + if (gbl_physrep_debug) { + dump_replicant_hosts(); + } return 0; } if (gbl_physrep_debug) @@ -735,7 +793,7 @@ static int seedsort(const void *arg1, const void *arg2) } static DB_Connection *find_new_repl_db(cdb2_hndl_tp *repl_metadb, cdb2_hndl_tp **repl_db) { - int rc; + int rc, count = 0; DB_Connection *cnct; assert(repl_db_connected == 0); @@ -776,52 +834,65 @@ static DB_Connection *find_new_repl_db(cdb2_hndl_tp *repl_metadb, cdb2_hndl_tp * rc = cdb2_run_statement(*repl_db, "select 1"); if (rc != CDB2_OK) { physrep_logmsg(LOGMSG_ERROR, "%s:%d: Couldn't execute 'select 1' against %s@%s (rc: %d error: %s)\n", - __func__, __LINE__, cnct->dbname, - cnct->hostname, rc, cdb2_errstr(*repl_db)); + __func__, __LINE__, cnct->dbname, cnct->hostname, rc, cdb2_errstr(*repl_db)); cnct->last_failed = time(NULL); continue; } - while (cdb2_next_record(*repl_db) == CDB2_OK) {} + while (cdb2_next_record(*repl_db) == CDB2_OK) { + } - physrep_logmsg(LOGMSG_USER, "Attached to '%s' db '%s' for replication\n", - cnct->hostname, cnct->dbname); + physrep_logmsg(LOGMSG_USER, "Attached to '%s' db '%s' for replication\n", cnct->hostname, cnct->dbname); /* Execute sys.physrep.update_registry() on the replication metadb cluster */ rc = update_registry(repl_metadb, cnct->dbname, cnct->hostname); if (rc != 0) { - physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to exec sys.physrep.update_registry() on %s:%s\n", - __func__, __LINE__, cnct->dbname, cnct->hostname); + physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to exec sys.physrep.update_registry() on %s:%s\n", __func__, + __LINE__, cnct->dbname, cnct->hostname); } cnct->last_cnct = time(NULL); cnct->is_up = 1; - repl_db_connected = 1; + set_repl_db_connected(cnct->dbname, cnct->hostname); return cnct; } - physrep_logmsg(LOGMSG_USER, "%s:%d: Couldn't connect to any of the replication source hosts, retrying in a second\n", - __func__, __LINE__); + count++; + if (count < 10) { + physrep_logmsg(LOGMSG_USER, + "%s:%d: Couldn't connect to any of the replication source hosts, retrying in a second\n", + __func__, __LINE__); - sleep(1); + sleep(1); + } else { + physrep_logmsg(LOGMSG_USER, + "%s:%d: Couldn't connect to any of the replication source hosts, break to re-register\n", + __func__, __LINE__); + return NULL; + } } physrep_logmsg(LOGMSG_WARN, "Stopping replication\n"); return NULL; } +static void dump_replicant_hosts() +{ + for (int i = 0; i < repl_dbs_sz; i++) { + DB_Connection *cnct = repl_dbs[i]; + physrep_logmsg(LOGMSG_USER, "%s:%d: %s:%s\n", __func__, __LINE__, cnct->hostname, cnct->dbname); + } +} + static int add_replicant_host(char *hostname, char *dbname) { - if (gbl_physrep_debug) - physrep_logmsg(LOGMSG_USER, "%s:%d: Adding %s:%s\n", __func__, __LINE__, - hostname, dbname); /* Don't add same machine multiple times */ for (int i = 0; i < repl_dbs_sz; i++) { DB_Connection *c = repl_dbs[i]; - if ((strcmp(c->hostname, hostname) == 0) && - strcmp(c->dbname, dbname) == 0) { - physrep_logmsg(LOGMSG_DEBUG, "%s mach %s db %s found\n", __func__, hostname, - dbname); + if ((strcmp(c->hostname, hostname) == 0) && strcmp(c->dbname, dbname) == 0) { + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s Found %s:%s not adding\n", __func__, hostname, dbname); + } return 0; } } @@ -837,6 +908,10 @@ static int add_replicant_host(char *hostname, char *dbname) repl_dbs = realloc(repl_dbs, (repl_dbs_sz + 1) * sizeof(DB_Connection *)); repl_dbs[repl_dbs_sz ++] = cnct; + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Adding %s:%s\n", __func__, __LINE__, hostname, dbname); + } + return 0; } @@ -1117,7 +1192,9 @@ static int do_wait_for_reverse_conn(cdb2_hndl_tp *repl_metadb) { This is the database/node that to replicant connects to retrieve and apply physical logs. */ -static void *physrep_worker(void *args) { +extern __thread int physrep_out_of_order; +static void *physrep_worker(void *args) +{ comdb2_name_thread(__func__); volatile int64_t gen, highest_gen = 0; @@ -1126,6 +1203,9 @@ static void *physrep_worker(void *args) { int do_truncate = 0; int rc; int now; + int is_revconn = -1; + int last_revconn_check = 0; + int last_update_registry = 0; LOG_INFO info; LOG_INFO prev_info; DB_Connection *repl_db_cnct = NULL; @@ -1155,13 +1235,32 @@ static void *physrep_worker(void *args) { goto sleep_and_retry; } - if (repl_db_connected && (force_registration() || - (((now = time(NULL)) - last_register) > - gbl_physrep_register_interval))) { + if (repl_db_connected && + (force_registration() || (((now = time(NULL)) - last_register) > gbl_physrep_register_interval))) { close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); if (gbl_physrep_debug) { - physrep_logmsg(LOGMSG_USER, "%s:%d: Forcing re-registration\n", - __func__, __LINE__); + physrep_logmsg(LOGMSG_USER, "%s:%d: Forcing re-registration\n", __func__, __LINE__); + } + } + + int revconn_ck = gbl_physrep_revconn_check_interval; + if (repl_db_connected && revconn_ck > 0 && comdb2_time_epoch() - last_revconn_check > revconn_ck) { + cdb2_hndl_tp *repl_metadb = NULL; + last_revconn_check = comdb2_time_epoch(); + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Re-checking for reverse connection\n", __func__, __LINE__); + } + if ((rc = physrep_get_metadb_or_local_hndl(&repl_metadb)) == 0) { + int do_revconn = do_wait_for_reverse_conn(repl_metadb); + cdb2_close(repl_metadb); + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n", + __func__, __LINE__, do_revconn, is_revconn); + } + if ((do_revconn && !is_revconn) || (!do_revconn && is_revconn)) { + logmsg(LOGMSG_USER, "Revconn changed, do_revconn=%d, is_revconn=%d\n", do_revconn, is_revconn); + close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); + } } } @@ -1172,7 +1271,9 @@ static void *physrep_worker(void *args) { goto sleep_and_retry; } + last_revconn_check = comdb2_time_epoch(); if (do_wait_for_reverse_conn(repl_metadb) == 1) { + is_revconn = 1; int wait_timeout_sec = 60; if (gbl_physrep_debug) @@ -1214,8 +1315,9 @@ static void *physrep_worker(void *args) { /* Perform truncation to start fresh */ do_truncate = 1; - repl_db_connected = 1; + set_repl_db_connected(rev_conn_hndl->remote_dbname, rev_conn_hndl->remote_host); } else { + is_revconn = 0; int notfound = 0; while (stop_physrep_worker == 0) { if ((rc = register_self(repl_metadb)) == 0) @@ -1246,6 +1348,8 @@ static void *physrep_worker(void *args) { goto sleep_and_retry; } + last_update_registry = comdb2_time_epoch(); + /* Perform truncation to start fresh */ do_truncate = 1; } @@ -1291,21 +1395,23 @@ static void *physrep_worker(void *args) { if (gbl_physrep_debug) physrep_logmsg(LOGMSG_USER, "%s:%d: Executing: %s\n", __func__, __LINE__, sql_cmd); if ((rc = cdb2_run_statement(repl_db, sql_cmd)) != CDB2_OK) { - physrep_logmsg(LOGMSG_ERROR, "Couldn't query the database, retrying\n"); + physrep_logmsg(LOGMSG_ERROR, "Couldn't query the database, rcode=%d '%s' retrying\n", rc, + cdb2_errstr(repl_db)); close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); goto sleep_and_retry; } if ((rc = cdb2_next_record(repl_db)) != CDB2_OK) { - if (gbl_physrep_debug) - physrep_logmsg(LOGMSG_USER, "%s:%d: Can't find the next record (rc: %d)\n", - __func__, __LINE__, rc); + if (gbl_physrep_debug) { + physrep_logmsg(LOGMSG_USER, "%s:%d: Can't find the next record (rc: %d '%s')\n", __func__, __LINE__, rc, + cdb2_errstr(repl_db)); + } close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); goto sleep_and_retry; } /* our log matches, so apply each record log received */ - while (stop_physrep_worker == 0 && !do_truncate && + while (stop_physrep_worker == 0 && thedb->master == gbl_myhostname && !do_truncate && (rc = cdb2_next_record(repl_db)) == CDB2_OK) { /* check the generation id to make sure the master hasn't * switched */ @@ -1314,10 +1420,9 @@ static void *physrep_worker(void *args) { if (rec_gen && *rec_gen > highest_gen) { int64_t new_gen = *rec_gen; if (gbl_physrep_debug) { - physrep_logmsg(LOGMSG_USER, "%s:%d: My master changed, set truncate flag\n", - __func__, __LINE__); - physrep_logmsg(LOGMSG_USER, "%s:%d: gen: %" PRId64 ", rec_gen: %" PRId64 "\n", - __func__, __LINE__, gen, *rec_gen); + physrep_logmsg(LOGMSG_USER, "%s:%d: My master changed, set truncate flag\n", __func__, __LINE__); + physrep_logmsg(LOGMSG_USER, "%s:%d: gen: %" PRId64 ", rec_gen: %" PRId64 "\n", __func__, __LINE__, + gen, *rec_gen); } close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); do_truncate = 1; @@ -1326,11 +1431,34 @@ static void *physrep_worker(void *args) { } prev_info = handle_record(repl_db, prev_info); + if (physrep_out_of_order) { + physrep_out_of_order = 0; + do_truncate = 1; + goto repl_loop; + } gbl_physrep_last_applied_time = time(NULL); + revconn_ck = gbl_physrep_revconn_check_interval; + if (revconn_ck > 0 && comdb2_time_epoch() - last_revconn_check > revconn_ck) { + if (gbl_physrep_debug) { + logmsg(LOGMSG_USER, "Checking reverse connection status\n"); + } + goto repl_loop; + } + + int update_regck = gbl_physrep_update_registry_interval; + if (repl_db_connected && update_regck > 0 && (comdb2_time_epoch() - last_update_registry) > update_regck) { + update_registry_periodic(repl_db_cnct->dbname, repl_db_cnct->hostname); + last_update_registry = comdb2_time_epoch(); + } } - if (rc != CDB2_OK_DONE || do_truncate) { + if (gbl_physrep_debug) { + logmsg(LOGMSG_USER, "%s:%d: next-record rc = %d, '%s'\n", __func__, __LINE__, rc, cdb2_errstr(repl_db)); + } + + if (thedb->master != gbl_myhostname || rc != CDB2_OK_DONE) { + close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__); do_truncate = 1; } sleep_and_retry: diff --git a/db/process_message.c b/db/process_message.c index 150245f8f8..e683897da6 100644 --- a/db/process_message.c +++ b/db/process_message.c @@ -89,6 +89,8 @@ extern int gbl_udp; extern int gbl_prefault_udp; extern int gbl_prefault_latency; extern int gbl_use_modsnap_for_snapshot; +extern int gbl_force_incoherent; +extern int gbl_force_incoherent_master; extern struct thdpool *gbl_verify_thdpool; extern int get_commit_lsn_map_switch_value(); @@ -2146,6 +2148,22 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) gbl_who = toknum(tok, ltok); gbl_debug = gbl_sdebug = 0; logmsg(LOGMSG_USER, "Set who to %d\n", gbl_who); + } else if (tokcmp(tok, ltok, "physrep_force_registration") == 0) { + extern int gbl_physrep_force_registration; + logmsg(LOGMSG_USER, "physrep forcing registration, current-value is %d\n", gbl_physrep_force_registration); + gbl_physrep_force_registration = 1; + } else if (tokcmp(tok, ltok, "force_incoherent") == 0) { + logmsg(LOGMSG_USER, "Setting force_incoherent to 1\n"); + gbl_force_incoherent = 1; + } else if (tokcmp(tok, ltok, "unforce_incoherent") == 0) { + logmsg(LOGMSG_USER, "Setting force_incoherent to 0\n"); + gbl_force_incoherent = 0; + } else if (tokcmp(tok, ltok, "force_incoherent_master") == 0) { + logmsg(LOGMSG_USER, "Setting force_incoherent_master to 1\n"); + gbl_force_incoherent_master = 1; + } else if (tokcmp(tok, ltok, "unforce_incoherent_master") == 0) { + logmsg(LOGMSG_USER, "Setting force_incoherent_master to 0\n"); + gbl_force_incoherent_master = 0; } else if (tokcmp(tok, ltok, "physrep_fanout_override") == 0) { tok = segtok(line, lline, &st, <ok); if (ltok == 0) { @@ -3071,11 +3089,11 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) logmsg(LOGMSG_USER, "Upgrade ahead enabled with size %d.\n", gbl_num_record_upgrades); } else if (tokcmp(tok, ltok, "disable_upgrade_ahead") == 0) { - gbl_num_record_upgrades = toknum(tok, ltok); + gbl_num_record_upgrades = toknum(tok, ltok); logmsg(LOGMSG_USER, "Upgrade ahead disabled.\n"); } else if (tokcmp(tok, ltok, "checkctags") == 0) { - tok = segtok(line, lline, &st, <ok); - if (ltok == 0) { + tok = segtok(line, lline, &st, <ok); + if (ltok == 0) { if (gbl_check_client_tags == 0) { logmsg(LOGMSG_USER, "currently check tag logic is off\n"); } else if (gbl_check_client_tags == 1) { @@ -3087,7 +3105,7 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) gbl_check_client_tags); } return 0; - } + } if (tokcmp(tok, ltok, "off") == 0) { logmsg(LOGMSG_USER, "check tag logic is now off\n"); gbl_check_client_tags = 0; @@ -4525,60 +4543,57 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) } logmsg(LOGMSG_USER, "\n"); } else if (tokcmp(tok, ltok, "decimal_rounding") == 0) { - tok = segtok(line, lline, &st, <ok); - if (ltok > 0 && tok[0]) { - gbl_decimal_rounding = dec_parse_rounding(tok, ltok); - logmsg(LOGMSG_USER, "Default decimal rounding is %s\n", - dec_print_mode(gbl_decimal_rounding)); - } else { - logmsg(LOGMSG_USER, - "Missing option for decimal rounding, current is %s\n", - dec_print_mode(gbl_decimal_rounding)); - } + tok = segtok(line, lline, &st, <ok); + if (ltok > 0 && tok[0]) { + gbl_decimal_rounding = dec_parse_rounding(tok, ltok); + logmsg(LOGMSG_USER, "Default decimal rounding is %s\n", dec_print_mode(gbl_decimal_rounding)); + } else { + logmsg(LOGMSG_USER, "Missing option for decimal rounding, current is %s\n", + dec_print_mode(gbl_decimal_rounding)); + } } else if (tokcmp(tok, ltok, "localrep") == 0) { - struct dbtable *db; - int i; - logmsg(LOGMSG_USER, "%-30s %10s\n", "table", "localrep?"); - for (i = 0; i < thedb->num_dbs; i++) { - db = thedb->dbs[i]; - logmsg(LOGMSG_USER, "%-30s %10s\n", db->tablename, - db->do_local_replication ? "YES" : "NO"); - } + struct dbtable *db; + int i; + logmsg(LOGMSG_USER, "%-30s %10s\n", "table", "localrep?"); + for (i = 0; i < thedb->num_dbs; i++) { + db = thedb->dbs[i]; + logmsg(LOGMSG_USER, "%-30s %10s\n", db->tablename, db->do_local_replication ? "YES" : "NO"); + } } else if (tokcmp(tok, ltok, "transtat") == 0) { - bdb_dumptrans(thedb->bdb_env); + bdb_dumptrans(thedb->bdb_env); } else if (tokcmp(tok, ltok, "ddlk") == 0) { - extern unsigned gbl_ddlk; - tok = segtok(line, lline, &st, <ok); - gbl_ddlk = toknum(tok, ltok); - if (gbl_ddlk) { - logmsg(LOGMSG_USER, "1 in every %d lock requests will deadlock\n", gbl_ddlk); - } else { - logmsg(LOGMSG_USER, "DDLK generator turned off\n"); - } + extern unsigned gbl_ddlk; + tok = segtok(line, lline, &st, <ok); + gbl_ddlk = toknum(tok, ltok); + if (gbl_ddlk) { + logmsg(LOGMSG_USER, "1 in every %d lock requests will deadlock\n", gbl_ddlk); + } else { + logmsg(LOGMSG_USER, "DDLK generator turned off\n"); + } } else if (tokcmp(tok, ltok, "berkdelay") == 0) { - uint32_t commit_delay_ms = 0; - tok = segtok(line, lline, &st, <ok); - if (ltok > 0) { - commit_delay_ms = toknum(tok, ltok); - Pthread_mutex_lock(&testguard); - bdb_berktest_commit_delay(commit_delay_ms); - Pthread_mutex_unlock(&testguard); - } else { - logmsg(LOGMSG_USER, "berkdelay requires commit-delay-ms argument\n"); - } + uint32_t commit_delay_ms = 0; + tok = segtok(line, lline, &st, <ok); + if (ltok > 0) { + commit_delay_ms = toknum(tok, ltok); + Pthread_mutex_lock(&testguard); + bdb_berktest_commit_delay(commit_delay_ms); + Pthread_mutex_unlock(&testguard); + } else { + logmsg(LOGMSG_USER, "berkdelay requires commit-delay-ms argument\n"); + } } else if (tokcmp(tok, ltok, "berktest") == 0) { - uint32_t txnsize = 0; - tok = segtok(line, lline, &st, <ok); - if (ltok > 0) - txnsize = toknum(tok, ltok); - Pthread_mutex_lock(&testguard); - if (txnsize <= 0) - bdb_berktest_multi(thedb->bdb_env); - else - bdb_berktest(thedb->bdb_env, txnsize); - Pthread_mutex_unlock(&testguard); + uint32_t txnsize = 0; + tok = segtok(line, lline, &st, <ok); + if (ltok > 0) + txnsize = toknum(tok, ltok); + Pthread_mutex_lock(&testguard); + if (txnsize <= 0) + bdb_berktest_multi(thedb->bdb_env); + else + bdb_berktest(thedb->bdb_env, txnsize); + Pthread_mutex_unlock(&testguard); } else if (tokcmp(tok, ltok, "dump_ltran_list") == 0) { - bdb_dump_logical_tranlist(thedb->bdb_env, stderr); + bdb_dump_logical_tranlist(thedb->bdb_env, stderr); # if 0 } else if (tokcmp(tok, ltok, "clear_rowlocks_stats") == 0) { rowlocks_clear_stats(); @@ -4586,63 +4601,62 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) rowlocks_print_stats(stdout); # endif } else if (tokcmp(tok, ltok, "rep_process_txn_trace") == 0) { - gbl_rep_process_txn_time = 1; - logmsg(LOGMSG_USER, "Enabled rep-collect transaction trace\n"); + gbl_rep_process_txn_time = 1; + logmsg(LOGMSG_USER, "Enabled rep-collect transaction trace\n"); } else if (tokcmp(tok, ltok, "no_rep_process_txn_trace") == 0) { - gbl_rep_process_txn_time = 0; - logmsg(LOGMSG_ERROR, "Disabled rep-collect transaction trace\n"); + gbl_rep_process_txn_time = 0; + logmsg(LOGMSG_ERROR, "Disabled rep-collect transaction trace\n"); } else if (tokcmp(tok, ltok, "ack_trace") == 0) { - enable_ack_trace(); - logmsg(LOGMSG_ERROR, "Enabled ack trace\n"); + enable_ack_trace(); + logmsg(LOGMSG_ERROR, "Enabled ack trace\n"); } else if (tokcmp(tok, ltok, "no_ack_trace") == 0) { - disable_ack_trace(); - logmsg(LOGMSG_ERROR, "Disabled ack trace\n"); + disable_ack_trace(); + logmsg(LOGMSG_ERROR, "Disabled ack trace\n"); } else if (tokcmp(tok, ltok, "rowlocks_bench_logical_rectype") == 0) { - gbl_rowlocks_bench_logical_rectype = 1; - logmsg(LOGMSG_ERROR, "I will consider rowlocks_bench record (10019) a logical " - "rectype\n"); + gbl_rowlocks_bench_logical_rectype = 1; + logmsg(LOGMSG_ERROR, "I will consider rowlocks_bench record (10019) a logical " + "rectype\n"); } else if (tokcmp(tok, ltok, "rowlocks_bench_no_logical_rectype") == 0) { - gbl_rowlocks_bench_logical_rectype = 0; - logmsg(LOGMSG_ERROR, "I will not consider rowlocks_bench record (10019) a logical " - "rectype\n"); + gbl_rowlocks_bench_logical_rectype = 0; + logmsg(LOGMSG_ERROR, "I will not consider rowlocks_bench record (10019) a logical " + "rectype\n"); } else if (tokcmp(tok, ltok, "enable_rowlock_logging") == 0) { - gbl_disable_rowlocks_logging = 0; - logmsg(LOGMSG_ERROR, "I perform all rowlocks logging\n"); + gbl_disable_rowlocks_logging = 0; + logmsg(LOGMSG_ERROR, "I perform all rowlocks logging\n"); } else if (tokcmp(tok, ltok, "disable_rowlock_logging") == 0) { - gbl_disable_rowlocks_logging = 1; - logmsg(LOGMSG_ERROR, "I disable all rowlocks logging\n"); + gbl_disable_rowlocks_logging = 1; + logmsg(LOGMSG_ERROR, "I disable all rowlocks logging\n"); } else if (tokcmp(tok, ltok, "enable_rowlock_locking") == 0) { - gbl_disable_rowlocks = 0; - logmsg(LOGMSG_ERROR, "I acquire all rowlocks\n"); + gbl_disable_rowlocks = 0; + logmsg(LOGMSG_ERROR, "I acquire all rowlocks\n"); } else if (tokcmp(tok, ltok, "disable_rowlock_locking") == 0) { - gbl_disable_rowlocks = 1; - logmsg(LOGMSG_ERROR, "I will not actually acquire any rowlocks (but will still " - "follow the codepath)\n"); + gbl_disable_rowlocks = 1; + logmsg(LOGMSG_ERROR, "I will not actually acquire any rowlocks (but will still " + "follow the codepath)\n"); } else if (tokcmp(tok, ltok, "dispatch_bench") == 0) { - gbl_dispatch_rowlocks_bench = 1; - logmsg(LOGMSG_ERROR, - "I will dispatch rowlocks_bench record (10019) to db_dispatch\n"); + gbl_dispatch_rowlocks_bench = 1; + logmsg(LOGMSG_ERROR, "I will dispatch rowlocks_bench record (10019) to db_dispatch\n"); } else if (tokcmp(tok, ltok, "dont_dispatch_bench") == 0) { - gbl_dispatch_rowlocks_bench = 0; - logmsg(LOGMSG_USER, "I will not dispatch rowlocks_bench record (10019) to " - "db_dispatch\n"); + gbl_dispatch_rowlocks_bench = 0; + logmsg(LOGMSG_USER, "I will not dispatch rowlocks_bench record (10019) to " + "db_dispatch\n"); } else if (tokcmp(tok, ltok, "disable_rowlocks_sleepns") == 0) { - tok = segtok(line, lline, &st, <ok); - if (ltok > 0) { - int sleepns = toknum(tok, ltok); - if (sleepns >= 0) + tok = segtok(line, lline, &st, <ok); + if (ltok > 0) { + int sleepns = toknum(tok, ltok); + if (sleepns >= 0) gbl_disable_rowlocks_sleepns = sleepns; - } + } logmsg(LOGMSG_USER, "disable_rowlocks_sleepns is %d\n", gbl_disable_rowlocks_sleepns); # if 0 @@ -4738,65 +4752,62 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) } # endif } else if (tokcmp(tok, ltok, "deadlock_policy_override") == 0) { - tok = segtok(line, lline, &st, <ok); - if (ltok > 0) { - gbl_deadlock_policy_override = toknum(tok, ltok); - logmsg(LOGMSG_USER, "Set deadlock policy to %s\n", - deadlock_policy_str(gbl_deadlock_policy_override)); - } else { - logmsg(LOGMSG_ERROR, "Must specify policy:\n"); - logmsg(LOGMSG_ERROR, "1 - DB_LOCK_DEFAULT\n"); - logmsg(LOGMSG_ERROR, "2 - DB_LOCK_EXPIRE\n"); - logmsg(LOGMSG_ERROR, "3 - DB_LOCK_MAXLOCKS\n"); - logmsg(LOGMSG_ERROR, "4 - DB_LOCK_MINLOCKS\n"); - logmsg(LOGMSG_ERROR, "5 - DB_LOCK_MINWRITE\n"); - logmsg(LOGMSG_ERROR, "6 - DB_LOCK_OLDEST\n"); - logmsg(LOGMSG_ERROR, "7 - DB_LOCK_RANDOM\n"); - logmsg(LOGMSG_ERROR, "8 - DB_LOCK_YOUNGEST\n"); - logmsg(LOGMSG_ERROR, "9 - DB_LOCK_MAXWRITE\n"); - logmsg(LOGMSG_ERROR, "10 - DB_LOCK_MINWRITE_NOREAD\n"); - logmsg(LOGMSG_ERROR, "11 - DB_LOCK_YOUNGEST_EVER\n"); - logmsg(LOGMSG_ERROR, "12 - DB_LOCK_MINWRITE_EVER\n"); - } + tok = segtok(line, lline, &st, <ok); + if (ltok > 0) { + gbl_deadlock_policy_override = toknum(tok, ltok); + logmsg(LOGMSG_USER, "Set deadlock policy to %s\n", deadlock_policy_str(gbl_deadlock_policy_override)); + } else { + logmsg(LOGMSG_ERROR, "Must specify policy:\n"); + logmsg(LOGMSG_ERROR, "1 - DB_LOCK_DEFAULT\n"); + logmsg(LOGMSG_ERROR, "2 - DB_LOCK_EXPIRE\n"); + logmsg(LOGMSG_ERROR, "3 - DB_LOCK_MAXLOCKS\n"); + logmsg(LOGMSG_ERROR, "4 - DB_LOCK_MINLOCKS\n"); + logmsg(LOGMSG_ERROR, "5 - DB_LOCK_MINWRITE\n"); + logmsg(LOGMSG_ERROR, "6 - DB_LOCK_OLDEST\n"); + logmsg(LOGMSG_ERROR, "7 - DB_LOCK_RANDOM\n"); + logmsg(LOGMSG_ERROR, "8 - DB_LOCK_YOUNGEST\n"); + logmsg(LOGMSG_ERROR, "9 - DB_LOCK_MAXWRITE\n"); + logmsg(LOGMSG_ERROR, "10 - DB_LOCK_MINWRITE_NOREAD\n"); + logmsg(LOGMSG_ERROR, "11 - DB_LOCK_YOUNGEST_EVER\n"); + logmsg(LOGMSG_ERROR, "12 - DB_LOCK_MINWRITE_EVER\n"); + } } else if (tokcmp(tok, ltok, "dump_mintruncate") == 0) { - bdb_dump_mintruncate_list(thedb->bdb_env); + bdb_dump_mintruncate_list(thedb->bdb_env); } else if (tokcmp(tok, ltok, "clear_mintruncate") == 0) { - bdb_clear_mintruncate_list(thedb->bdb_env); + bdb_clear_mintruncate_list(thedb->bdb_env); } else if (tokcmp(tok, ltok, "build_mintruncate") == 0) { - bdb_build_mintruncate_list(thedb->bdb_env); + bdb_build_mintruncate_list(thedb->bdb_env); } else if (tokcmp(tok, ltok, "print_mintruncate") == 0) { - bdb_print_mintruncate_min(thedb->bdb_env); + bdb_print_mintruncate_min(thedb->bdb_env); } else if (tokcmp(tok, ltok, "detect") == 0) { - bdb_detect(thedb->bdb_env); + bdb_detect(thedb->bdb_env); } else if (tokcmp(tok, ltok, "lsum") == 0) { - bdb_locker_summary(thedb->bdb_env); + bdb_locker_summary(thedb->bdb_env); } else if (tokcmp(tok, ltok, "mempget_timeout") == 0) { - extern int __gbl_max_mpalloc_sleeptime; - tok = segtok(line, lline, &st, <ok); - if (ltok == 0) { - logmsg(LOGMSG_USER, "Current mempget_timeout value: %d seconds\n", - __gbl_max_mpalloc_sleeptime); - return 1; - } - __gbl_max_mpalloc_sleeptime = toknum(tok, ltok); - logmsg(LOGMSG_USER, "mempget timeout set to %d seconds\n", - __gbl_max_mpalloc_sleeptime); + extern int __gbl_max_mpalloc_sleeptime; + tok = segtok(line, lline, &st, <ok); + if (ltok == 0) { + logmsg(LOGMSG_USER, "Current mempget_timeout value: %d seconds\n", __gbl_max_mpalloc_sleeptime); + return 1; + } + __gbl_max_mpalloc_sleeptime = toknum(tok, ltok); + logmsg(LOGMSG_USER, "mempget timeout set to %d seconds\n", __gbl_max_mpalloc_sleeptime); } else if (tokcmp(tok, ltok, "listpools") == 0) { - thdpool_list_pools(); + thdpool_list_pools(); } else if (tokcmp(tok, ltok, "pools_do_all") == 0) { - thdpool_command_to_all(line, lline, st); + thdpool_command_to_all(line, lline, st); } else if (tokcmp(tok, ltok, "berkattr") == 0) { - tok = segtok(line, lline, &st, <ok); - if (ltok == 0) { - bdb_berkdb_dump_attrs(dbenv->bdb_env, stdout); - return 1; - } else if (tokcmp(tok, ltok, "set") == 0) { - char *attr = NULL; - char *value = NULL; - int ivalue; - int optlen; - tok = segtok(line, lline, &st, <ok); - if (ltok == 0) + tok = segtok(line, lline, &st, <ok); + if (ltok == 0) { + bdb_berkdb_dump_attrs(dbenv->bdb_env, stdout); + return 1; + } else if (tokcmp(tok, ltok, "set") == 0) { + char *attr = NULL; + char *value = NULL; + int ivalue; + int optlen; + tok = segtok(line, lline, &st, <ok); + if (ltok == 0) goto bad_berkattr_set; attr = tokdup(tok, ltok); tok = segtok(line, lline, &st, <ok); diff --git a/db/reverse_conn.c b/db/reverse_conn.c index 153f7d1651..8d510ca6a9 100644 --- a/db/reverse_conn.c +++ b/db/reverse_conn.c @@ -94,6 +94,10 @@ int send_reversesql_request(const char *dbname, const char *host, struct pollfd pol; char paddr[64]; + if (gbl_revsql_debug == 1) { + revconn_logmsg(LOGMSG_USER, "%s:%d Sending reversesql request to %s@%s\n", __func__, __LINE__, dbname, host); + } + // Connect to the remote database sb = connect_remote_db(NULL, dbname, NULL, (char *)host, 0, gbl_revsql_force_rte); if (!sb) { @@ -277,6 +281,7 @@ int replace_tier_by_hostname(reverse_conn_host_list_tp *new_reverse_conn_hosts) static void *reverse_connection_worker(void *args) { reverse_conn_host_tp *host = args; time_t last_conn_attempt = 0; + backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDONLY); host->worker_state = REVERSE_CONN_WORKER_RUNNING; if (gbl_revsql_debug == 1) { @@ -291,6 +296,15 @@ static void *reverse_connection_worker(void *args) { sleep(1); continue; } + + if (!bdb_am_i_coherent(thedb->bdb_env)) { + if (gbl_revsql_debug == 1) { + revconn_logmsg(LOGMSG_USER, "%s:%d not starting connection, not coherent\n", __func__, __LINE__); + } + sleep(1); + continue; + } + last_conn_attempt = now; pthread_mutex_lock(&reverse_conn_hosts_mu); @@ -319,6 +333,7 @@ static void *reverse_connection_worker(void *args) { } sleep(1); } + backend_thread_event(thedb, COMDB2_THR_EVENT_DONE_RDONLY); return 0; } diff --git a/lua/lib/physrep_register_replicant.lua b/lua/lib/physrep_register_replicant.lua index 01a6346b49..302dbd242b 100644 --- a/lua/lib/physrep_register_replicant.lua +++ b/lua/lib/physrep_register_replicant.lua @@ -5,6 +5,12 @@ -- register themselves. The node, in return, sends back a list of "potential" -- nodes that the replicant can connect to to pull and apply physical logs. local function main(dbname, hostname, lsn, source_dbname, source_hosts) + + print( + "physrep_register_replicant: dbname = '" .. dbname .. "', hostname = '" .. hostname .. + "', lsn = '" .. lsn .. "', source_dbname = '" .. source_dbname .. "', source_hosts = '" .. + source_hosts .. "'") + db:begin() -- Retrieve physrep tunables @@ -30,40 +36,47 @@ local function main(dbname, hostname, lsn, source_dbname, source_hosts) return end - local rs, rc = db:exec("WITH RECURSIVE " .. - " tiers (dbname, host, tier) AS " .. - " (SELECT dbname, host, 0 " .. - " FROM comdb2_physreps " .. - " WHERE dbname = '" .. source_dbname .. "' AND " .. - " host IN (" .. source_hosts ..") " .. - " UNION ALL " .. - " SELECT p.dbname, p.host, t.tier+1 " .. - " FROM comdb2_physrep_connections p, tiers t " .. - " WHERE p.source_dbname = t.dbname AND p.source_host = t.host), " .. - " child_count (dbname, host, tier, cnt) AS " .. - " (SELECT t.dbname, t.host, t.tier, count (*) " .. - " FROM tiers t LEFT OUTER JOIN comdb2_physrep_connections p " .. - " ON t.dbname = p.source_dbname AND t.host = p.source_host " .. - " GROUP BY t.dbname, t.host HAVING COUNT(*) < " .. tunables["physrep_fanout"] .. " ) " .. - "SELECT c.tier, c.dbname, c.host FROM child_count c, comdb2_physreps p " .. - " WHERE c.dbname = p.dbname AND c.host = p.host AND (p.state IS NULL OR p.state NOT IN ('Pending', 'Inactive'))" .. - " ORDER BY tier, cnt " .. - " LIMIT " .. tunables["physrep_max_candidates"]) + local physrep_fanout = tunables["physrep_fanout"] + local physrep_max_candidates = tunables["physrep_max_candidates"] - if rs then - local row = rs:fetch() - while row do - db:emit(row) - row = rs:fetch() - end - end + local cte = ("WITH RECURSIVE " .. + " tiers (dbname, host, tier) AS " .. + " (SELECT dbname, host, 0 " .. + " FROM comdb2_physreps " .. + " WHERE dbname = '" .. source_dbname .. "' AND " .. + " host IN (" .. source_hosts ..") " .. + " UNION ALL " .. + " SELECT p.dbname, p.host, t.tier+1 " .. + " FROM comdb2_physrep_connections p, tiers t " .. + " WHERE p.source_dbname = t.dbname AND p.source_host = t.host), " .. + " child_count (dbname, host, tier, cnt) AS " .. + " (SELECT t.dbname, t.host, t.tier, count (*) " .. + " FROM tiers t LEFT OUTER JOIN comdb2_physrep_connections p " .. + " ON t.dbname = p.source_dbname AND t.host = p.source_host " .. + " GROUP BY t.dbname, t.host HAVING COUNT(*) < " .. physrep_fanout .. " ) " .. + "SELECT c.tier, c.dbname, c.host FROM child_count c, comdb2_physreps p " .. + " WHERE c.dbname = p.dbname AND c.host = p.host AND (p.state IS NULL OR p.state NOT IN ('Pending', 'Inactive'))" .. + " ORDER BY tier, random() " .. + " LIMIT " .. physrep_max_candidates) - -- Add this physical replicant requester to the comdb2_physreps table with - -- its state set to 'Pending'. This information will give an estimate on how - -- many replicant registrations are currently in progress. - -- We could deny further requests if there are too many pending requests. - db:exec("INSERT INTO comdb2_physreps(dbname, host, state) VALUES ('" .. dbname .. "', '" .. hostname .. "', 'Pending')" .. - " ON CONFLICT (dbname, host) DO UPDATE SET state = 'Pending'") + print("physrep_register_replicant: sql = " .. cte) + local rs, rc = db:exec(cte) + if (rc == 0) then + if rs then + local row = rs:fetch() + while row do + db:emit(row) + row = rs:fetch() + end + end + + -- Add this physical replicant requester to the comdb2_physreps table with + -- its state set to 'Pending'. This information will give an estimate on how + -- many replicant registrations are currently in progress. + -- We could deny further requests if there are too many pending requests. + db:exec("INSERT INTO comdb2_physreps(dbname, host, state) VALUES ('" .. dbname .. "', '" .. hostname .. "', 'Pending')" .. + " ON CONFLICT (dbname, host) DO UPDATE SET state = 'Pending'") + end db:commit() end diff --git a/sqlite/ext/comdb2/tranlog.c b/sqlite/ext/comdb2/tranlog.c index 821a5568e9..b639ccb312 100644 --- a/sqlite/ext/comdb2/tranlog.c +++ b/sqlite/ext/comdb2/tranlog.c @@ -25,6 +25,7 @@ #include "dbinc_auto/txn_auto.h" #include "comdb2systbl.h" #include "parse_lsn.h" +#include "epochlib.h" /* Column numbers */ #define TRANLOG_COLUMN_START 0 @@ -126,12 +127,16 @@ extern pthread_cond_t gbl_logput_cond; extern int gbl_num_logput_listeners; extern pthread_mutex_t gbl_durable_lsn_lk; extern pthread_cond_t gbl_durable_lsn_cond; +int gbl_tranlog_incoherent_timeout = 10; +int gbl_tranlog_maxpoll = 60; extern int comdb2_sql_tick(); +extern int bdb_am_i_coherent(bdb_state_type *bdb_state); /* ** Advance a tranlog cursor to the next log entry */ -static int tranlogNext(sqlite3_vtab_cursor *cur){ +static int tranlogNext(sqlite3_vtab_cursor *cur) +{ struct sql_thread *thd = NULL; tranlog_cursor *pCur = (tranlog_cursor*)cur; DB_LSN durable_lsn = {0}; @@ -219,14 +224,40 @@ static int tranlogNext(sqlite3_vtab_cursor *cur){ return SQLITE_INTERNAL; } + int incoherent_start_time = 0; + if (pCur->flags & TRANLOG_FLAGS_BLOCK && !(pCur->flags & TRANLOG_FLAGS_DESCENDING)) { + int poll_start_time = comdb2_time_epoch(); do { /* Tick up. Return an error if sql_tick() fails (peer dropped connection, max query time reached, etc.) */ if ((rc = comdb2_sql_tick()) != 0) return rc; + if (gbl_tranlog_maxpoll > 0) { + if (comdb2_time_epoch() - poll_start_time > gbl_tranlog_maxpoll) { + logmsg(LOGMSG_DEBUG, "%s: returning after poll for %d seconds\n", + __func__, gbl_tranlog_maxpoll); + pCur->hitLast = 1; + return SQLITE_OK; + } + } + if (gbl_tranlog_incoherent_timeout > 0) { + int coherent = bdb_am_i_coherent(bdb_state); + if (coherent) { + incoherent_start_time = 0; + } else if (incoherent_start_time == 0) { + incoherent_start_time = comdb2_time_epoch(); + } else if (comdb2_time_epoch() - incoherent_start_time > + gbl_tranlog_incoherent_timeout) { + logmsg(LOGMSG_DEBUG, "%s: incoherent for %d seconds\n", + __func__, gbl_tranlog_incoherent_timeout); + pCur->hitLast = 1; + return SQLITE_OK; + } + } + if (db_is_exiting() || pCur->startAppRecGen != gbl_apprec_gen) { pCur->hitLast = 1; return SQLITE_OK; diff --git a/tests/phys_rep_tiered.test/Makefile b/tests/phys_rep_tiered.test/Makefile index 8d4aaf0266..e112fd7824 100644 --- a/tests/phys_rep_tiered.test/Makefile +++ b/tests/phys_rep_tiered.test/Makefile @@ -3,6 +3,7 @@ ifeq ($(TESTSROOTDIR),) else include $(TESTSROOTDIR)/testcase.mk endif +export CHECK_DB_AT_FINISH=0 ifeq ($(TEST_TIMEOUT),) export TEST_TIMEOUT=20m endif diff --git a/tests/phys_rep_tiered.test/lrl.options b/tests/phys_rep_tiered.test/lrl.options index c4beeb51cf..f7968458be 100644 --- a/tests/phys_rep_tiered.test/lrl.options +++ b/tests/phys_rep_tiered.test/lrl.options @@ -1,4 +1,10 @@ +logmsg level debug debug_drop_nth_rep_message 10000 incoherent_slow_inactive_timeout 0 revsql_fake_connect_failure 1 revsql_debug 1 +physrep_debug 1 +tranlog_incoherent_timeout 10 +forbid_remote_admin 0 +ctrace_dbdir 1 +allow_lua_print 1 diff --git a/tests/phys_rep_tiered.test/runit b/tests/phys_rep_tiered.test/runit index fc42781235..560e0afa5c 100755 --- a/tests/phys_rep_tiered.test/runit +++ b/tests/phys_rep_tiered.test/runit @@ -11,8 +11,10 @@ export comdb2ar=${COMDB2AR_EXE} dbname=$1 dgpid=0 +first="" +cluster_count=0 NRUNS=100 -SLEEPAMOUNT=120 # 2 minutes +SLEEPAMOUNT=300 KILL_WAIT_TIME=10 SLEEP_BETWEEN_CHECKS=.5 PIDs="" @@ -27,7 +29,7 @@ fi function cleanFailExit() { - cleanup + cleanupabort failexit $@ } @@ -81,8 +83,8 @@ function wait_for_catchup() if [[ -z "$mnode" ]]; then continue; fi - c_lsn=`$CDB2SQL_EXE --tabs $CDB2_OPTIONS $dbname --host $mnode 'select lsn from comdb2_transaction_logs(NULL, NULL, 4) limit 1' | tr -d {} | cut -f2 -d":"` - r_lsn=`$CDB2SQL_EXE --tabs $CDB2_OPTIONS ${_repl_dbname} --host ${_repl_host} 'select lsn from comdb2_transaction_logs(NULL, NULL, 4) limit 1' | tr -d {} | cut -f2 -d":"` + c_lsn=`$CDB2SQL_EXE -admin --tabs $CDB2_OPTIONS $dbname --host $mnode 'select lsn from comdb2_transaction_logs(NULL, NULL, 4) limit 1' | tr -d {} | cut -f2 -d":"` + r_lsn=`$CDB2SQL_EXE -admin --tabs $CDB2_OPTIONS ${_repl_dbname} --host ${_repl_host} 'select lsn from comdb2_transaction_logs(NULL, NULL, 4) limit 1' | tr -d {} | cut -f2 -d":"` done if [[ "$c_lsn" -ne "$r_lsn" ]] ; then @@ -137,7 +139,7 @@ function create_physrep_tables() cleanFailExit "failed to create table on $repl_metadb_name@$repl_metadb_host" fi - cdb2sql ${CDB2_OPTIONS} --host $repl_metadb_host $repl_metadb_name "CREATE TABLE comdb2_physrep_sources(dbname CSTRING(60), host CSTRING(120), source_dbname CSTRING(60), source_host CSTRING(120), UNIQUE (dbname, host), UNIQUE (dbname, host, source_dbname, source_host))" + cdb2sql ${CDB2_OPTIONS} --host $repl_metadb_host $repl_metadb_name "CREATE TABLE comdb2_physrep_sources(dbname CSTRING(60), host CSTRING(120), source_dbname CSTRING(60), source_host CSTRING(120), UNIQUE (dbname, host, source_dbname, source_host))" if [ $? -ne 0 ]; then cleanFailExit "failed to create table on $repl_metadb_name@$repl_metadb_host" fi @@ -156,9 +158,9 @@ function add_to_physrep_sources() { echo "== Adding ${_source_dbname}@${_source_host} -> ${_repl_dbname}@${_repl_host} to comdb2_physrep_sources table ==" cdb2sql ${CDB2_OPTIONS} --host ${_repl_metadb_host} ${_repl_metadb_name} "INSERT INTO comdb2_physrep_sources VALUES ('${_repl_dbname}', '${_repl_host}', '${_source_dbname}', '${_source_host}')" - if [ $? -ne 0 ]; then - cleanFailExit "failed to insert record in ${_repl_metadb_name}@${_repl_metadb_host}" - fi + #if [ $? -ne 0 ]; then + # cleanFailExit "failed to insert record in ${_repl_metadb_name}@${_repl_metadb_host}" + #fi } function verify_lsn_db_node() @@ -287,6 +289,161 @@ function verify_fanout_myoverride() fi } +function count_revconn() +{ + typeset count=0 + for node in $CLUSTER ; do + x=$(cdb2sql ${CDB2_OPTIONS} --tabs --host $node ${REPL_CLUS_DBNAME} "exec procedure sys.cmd.send('stat thr')" | grep "reversesql" | wc -l) + count=$((count+x)) + done + echo $count +} + +function block_for_clustered_revconn() +{ + echo "== Blocking for single clustered revconn ==" + local count=0 + + cnt=$(count_revconn) + while [[ "$cnt" -ne 1 ]]; do + let count=count+1 + echo "Count-revcon is $cnt not 1, waiting for only 1 reversesql to start, count $count" + sleep 1 + cnt=$(count_revconn) + done +} + +function clustered_revconn() +{ + echo "== Clustered revconn ==" + + if [[ -z "$CLUSTER" ]]; then + echo "clustered-revconn test only valid for clustered test" + return + fi + + echo "Add clustered physrep to reverse-conns" + for node in $CLUSTER ; do + for node2 in $CLUSTER ; do + add_to_physrep_sources ${REPL_META_DBNAME} ${REPL_META_HOST} ${SOURCE_DBNAME} ${node} ${REPL_CLUS_DBNAME} ${node2} + done + done + + block_for_clustered_revconn + + echo "Attempt 3 downgrades" + + j=0 + while [[ $j -lt 3 ]]; do + + # Downgrade + for node in $CLUSTER ; do + cdb2sql ${CDB2_OPTIONS} $REPL_CLUS_DBNAME --host $node "exec procedure sys.cmd.send('downgrade')" + done + + echo "Sleeping for 10" + sleep 10 + block_for_clustered_revconn + + let j=j+1 +done +} + +function incoherent_test() +{ + echo "== Incoherent test ==" + + echo "Creating a simple table" + cdb2sql ${CDB2_OPTIONS} $dbname default "Create table t1(a int)" + + echo "Marking all as incoherent" + for node in $CLUSTER ; do + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $node "exec procedure sys.cmd.send('force_incoherent')" + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $node "exec procedure sys.cmd.send('force_incoherent_master')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('force_incoherent')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('force_incoherent_master')" + done + + echo "Sleeping for 10" + sleep 10 + + for node in $CLUSTER ; do + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('physrep_force_registration')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_DBNAME_PREFIX}_${node} --host $node "exec procedure sys.cmd.send('physrep_force_registration')" + done + + echo "Sleeping for 10" + sleep 10 + + have_reregistered=0 + echo "Force registration" + while [[ "$have_reregistered" -eq 0 ]]; do + for node in $CLUSTER ; do + x=$(cdb2sql --tabs --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('physrep_force_registration')") + echo "$x" + j=$(echo "$x" | awk '{print $NF}') + if [[ "$j" == "0" ]]; then + have_reregistered=1 + fi + x=$(cdb2sql --tabs --admin ${CDB2_OPTIONS} ${REPL_DBNAME_PREFIX}_${node} --host $node "exec procedure sys.cmd.send('physrep_force_registration')") + echo "$x" + j=$(echo "$x" | awk '{print $NF}') + if [[ "$j" == "0" ]]; then + have_reregistered=1 + fi + done + if [[ $have_reregistered -eq 0 ]]; then + echo "Nothing has reregistered, sleeping for 10" + sleep 10 + fi + done + + echo "Sleeping for 10" + sleep 10 + + echo "Unforcing incoherent master" + for node in $CLUSTER ; do + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $node "exec procedure sys.cmd.send('unforce_incoherent_master')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('unforce_incoherent_master')" + #cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('physrep_force_registration')" + #cdb2sql --admin ${CDB2_OPTIONS} ${REPL_DBNAME_PREFIX}_${node} --host $node "exec procedure sys.cmd.send('physrep_force_registration')" + done + + echo "Unforcing incoherent first-node" + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $first "exec procedure sys.cmd.send('unforce_incoherent')" + + echo "Sleeping for 10" + sleep 10 + + echo "Generating writes for source db" + j=0 + while [[ $j -lt 50 ]]; do + cdb2sql -admin ${CDB2_OPTIONS} $dbname default "insert into t1 select * from generate_series(1, 1000)" + let j=j+1 + done + + echo "Ask all cluster nodes to start a reverse-conn" + if [[ -n $CLUSTER ]]; then + for node in $CLUSTER ; do + add_to_physrep_sources ${REPL_META_DBNAME} ${REPL_META_HOST} ${SOURCE_DBNAME} ${node} "${REPL_DBNAME_PREFIX}_${first}" ${first} + done + fi + + echo "Compare-physrep lsns - this will timeout if broken" + compare_end_lsns + + echo "Marking all as coherent" + for node in $CLUSTER ; do + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $node "exec procedure sys.cmd.send('unforce_incoherent')" + cdb2sql --admin ${CDB2_OPTIONS} $dbname --host $node "exec procedure sys.cmd.send('unforce_incoherent_master')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('unforce_incoherent')" + cdb2sql --admin ${CDB2_OPTIONS} ${REPL_CLUS_DBNAME} --host $node "exec procedure sys.cmd.send('unforce_incoherent_master')" + done + + echo "Dropping table" + cdb2sql ${CDB2_OPTIONS} $dbname default "Drop table t1" +} + function setup_physrep_metadb() { echo "== Setting up replication metadb cluster ==" @@ -306,7 +463,15 @@ dir ${_dbdir} physrep_fanout_override fanouttest 100 physrep_fanout_override fanouttest2 50 physrep_fanout_override $DBNAME 3 +physrep_register_interval 10 +revsql_debug 1 +physrep_debug 1 +tranlog_incoherent_timeout 10 +forbid_remote_admin 0 logmsg level debug +ctrace_dbdir 1 +allow_lua_print 1 +physrep_update_registry_interval 1 END $COMDB2_EXE ${_dbname} --create --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid >> ${logFile} 2>&1 @@ -333,7 +498,7 @@ END for node in ${CLUSTER}; do logFile=$TESTDIR/logs/${_dbname}.${node}.log - ssh ${node} "mkdir ${_dbdir}" + ssh ${node} "mkdir ${_dbdir}" < /dev/null if [[ ${firstNode} = "" ]]; then cat <> ${tmpdir}/${_dbname}.lrl @@ -343,13 +508,21 @@ cluster nodes ${CLUSTER} physrep_fanout_override fanouttest 100 physrep_fanout_override fanouttest2 50 physrep_fanout_override $DBNAME 3 +tranlog_incoherent_timeout 10 +physrep_register_interval 10 +revsql_debug 1 +physrep_debug 1 +forbid_remote_admin 0 logmsg level debug +ctrace_dbdir 1 +allow_lua_print 1 +physrep_update_registry_interval 1 END scp ${tmpdir}/${_dbname}.lrl ${node}:${_dbdir}/${_dbname}.lrl - ssh ${node} "$COMDB2_EXE ${_dbname} --create --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 + ssh ${node} "$COMDB2_EXE ${_dbname} --create --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 < /dev/null firstNode=${node} else - ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} $firstNode:${_dbdir}/${_dbname}.lrl $_dbdir $_dbdir" >> ${logFile} 2>&1 + ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} $firstNode:${_dbdir}/${_dbname}.lrl $_dbdir $_dbdir" >> ${logFile} 2>&1 < /dev/null if [ ! $? -eq 0 ]; then cleanFailExit "copycomdb2 failed" fi @@ -359,7 +532,7 @@ END # 2. Start instances for node in ${CLUSTER}; do logFile=$TESTDIR/logs/${_dbname}.${node}.log - ssh ${node} "$COMDB2_EXE ${_dbname} --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 & + ssh ${node} "$COMDB2_EXE ${_dbname} --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 < /dev/null & PIDs="${PIDs} $!" done @@ -411,8 +584,8 @@ function fix_lrl_and_restart_source_nodes() else for node in $CLUSTER ; do - ssh ${node} "echo \"physrep_metadb ${_meta_dbname} ${_meta_host}\" >> ${_source_dbdir}/${_source_dbname}.lrl" - ssh ${node} "echo \"physrep_debug 1\" >> ${_source_dbdir}/${_source_dbname}.lrl" + ssh ${node} "echo \"physrep_metadb ${_meta_dbname} ${_meta_host}\" >> ${_source_dbdir}/${_source_dbname}.lrl" < /dev/null + ssh ${node} "echo \"physrep_debug 1\" >> ${_source_dbdir}/${_source_dbname}.lrl" < /dev/null #ssh ${node} "echo \"physrep_register_interval 5\" >> ${_source_dbdir}/${_source_dbname}.lrl" echo "killrestart node $node" kill_restart_node $node $KILL_WAIT_TIME & @@ -478,17 +651,17 @@ function setup_physrep_cluster() logFile=$TESTDIR/logs/${_dbname}.${node}.log if [[ ${firstNode} = "" ]]; then - ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} -H ${_dbname} -y @${_source_host} ${_source_host}:${SOURCE_DBDIR}/${SOURCE_DBNAME}.lrl ${_dbdir} ${_dbdir}" >> ${logFile} 2>&1 + ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} -H ${_dbname} -y @${_source_host} ${_source_host}:${SOURCE_DBDIR}/${SOURCE_DBNAME}.lrl ${_dbdir} ${_dbdir}" >> ${logFile} 2>&1 > ${_dbdir}/${_dbname}.lrl" + ssh $node "echo \"cluster nodes ${CLUSTER}\" >> ${_dbdir}/${_dbname}.lrl" < /dev/null firstNode=${node} else - ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} $firstNode:${_dbdir}/${_dbname}.lrl $_dbdir $_dbdir" >> ${logFile} 2>&1 + ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} $firstNode:${_dbdir}/${_dbname}.lrl $_dbdir $_dbdir" >> ${logFile} 2>&1 < /dev/null if [ ! $? -eq 0 ]; then cleanFailExit "copycomdb2 failed" fi @@ -498,7 +671,7 @@ function setup_physrep_cluster() # 2. Start instances for node in ${CLUSTER}; do logFile=$TESTDIR/logs/${_dbname}.${node}.log - ssh ${node} "$COMDB2_EXE ${_dbname} --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 & + ssh ${node} "$COMDB2_EXE ${_dbname} --lrl ${_dbdir}/${_dbname}.lrl --pidfile ${_dbdir}/${_dbname}.pid" >> ${logFile} 2>&1 < /dev/null & PIDs="${PIDs} $!" done @@ -576,12 +749,12 @@ function setup_physrep_replicants() firstNode=${node} fi - ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} -H ${_repl_dbname} -y @${_source_host} ${_source_host}:${_source_dbdir}/${_source_dbname}.lrl ${_repl_dbdir} ${_repl_dbdir}" >> ${logFile} 2>&1 + ssh $node "${COPYCOMDB2_EXE} -x ${COMDB2_EXE} -H ${_repl_dbname} -y @${_source_host} ${_source_host}:${_source_dbdir}/${_source_dbname}.lrl ${_repl_dbdir} ${_repl_dbdir}" >> ${logFile} 2>&1 < /dev/null if [ ! $? -eq 0 ]; then cleanFailExit "copycomdb2 failed" fi - ssh ${node} "$COMDB2_EXE ${_repl_dbname} --lrl ${_repl_dbdir}/${_repl_dbname}.lrl --pidfile ${_repl_dbdir}/${_repl_dbname}.pid" >> ${logFile} 2>&1 & + ssh ${node} "$COMDB2_EXE ${_repl_dbname} --lrl ${_repl_dbdir}/${_repl_dbname}.lrl --pidfile ${_repl_dbdir}/${_repl_dbname}.pid" >> ${logFile} 2>&1 < /dev/null & PIDs="${PIDs} $!" # Wait for the node to start @@ -687,8 +860,9 @@ function run_tests() done } -function cleanup() +function cleanup_internal() { + typeset sig=$1 echo "Killing ${PIDs}" kill -9 ${PIDs} [[ $dgpid != 0 ]] && kill -9 $dgpid @@ -706,13 +880,23 @@ function cleanup() for node in $CLUSTER ; do _repl_dbname=${REPL_DBNAME_PREFIX}_${node} _repl_dbdir=${REPL_DBDIR_PREFIX}_${node} - ssh ${node} "kill -9 \$(cat ${_repl_dbdir}/${_repl_dbname}.pid)" - ssh ${node} "kill -9 \$(cat ${REPL_CLUS_DBDIR}/${REPL_CLUS_DBNAME}.pid)" - ssh ${node} "kill -9 \$(cat ${REPL_META_DBDIR}/${REPL_META_DBNAME}.pid)" + ssh ${node} "kill -$sig \$(cat ${_repl_dbdir}/${_repl_dbname}.pid)" < /dev/null + ssh ${node} "kill -$sig \$(cat ${REPL_CLUS_DBDIR}/${REPL_CLUS_DBNAME}.pid)" < /dev/null + ssh ${node} "kill -$sig \$(cat ${REPL_META_DBDIR}/${REPL_META_DBNAME}.pid)" < /dev/null done fi } +function cleanup() +{ + cleanup_internal 9 +} + +function cleanupabort() +{ + cleanup_internal 15 +} + # Compare LSNs of all the replicants against the source cluster/node function compare_end_lsns() { @@ -734,11 +918,18 @@ trap - INT EXIT REPL_META_DBNAME=${TESTCASE}_META REPL_META_DBDIR=${DBDIR}/${REPL_META_DBNAME} REPL_META_HOST="" + if [[ -z "$CLUSTER" ]]; then # Standalone REPL_META_HOST=$(hostname) else # Cluster for node in ${CLUSTER}; do + let cluster_count=cluster_count+1 + done + for node in ${CLUSTER}; do + first=${node} REPL_META_HOST=${node} + # Downgrade now - we want to test incoherent later + cdb2sql ${CDB2_OPTIONS} ${dbname} --host $node "exec procedure sys.cmd.send('downgrade')" break done fi @@ -780,6 +971,8 @@ run_tests verify_blkseq compare_end_lsns verify_fanout_myoverride ${REPL_META_DBNAME} ${REPL_META_HOST} +incoherent_test +clustered_revconn cleanup exit 0 diff --git a/tests/tunables.test/t00_all_tunables.expected b/tests/tunables.test/t00_all_tunables.expected index c6778f455b..611f0394ee 100644 --- a/tests/tunables.test/t00_all_tunables.expected +++ b/tests/tunables.test/t00_all_tunables.expected @@ -707,10 +707,14 @@ (name='physrep_metadb_host', description='List of physical replication metadb cluster hosts.', type='STRING', value=NULL, read_only='Y') (name='physrep_metadb_name', description='Physical replication metadb cluster name.', type='STRING', value=NULL, read_only='Y') (name='physrep_reconnect_penalty', description='Physrep wait seconds before retry to the same node. (Default: 5)', type='INTEGER', value='0', read_only='N') -(name='physrep_register_interval', description='Interval for physical replicant re-registration. (Default: 3600)', type='INTEGER', value='600', read_only='N') +(name='physrep_register_interval', description='Interval for physical replicant re-registration. (Default: 600)', type='INTEGER', value='600', read_only='N') +(name='physrep_repl_host', description='Current physrep host.', type='STRING', value=NULL, read_only='Y') +(name='physrep_repl_name', description='Current physrep parent.', type='STRING', value=NULL, read_only='Y') +(name='physrep_revconn_check_interval', description='Physrep recheck revconn interval. (Default: 60)', type='INTEGER', value='60', read_only='N') (name='physrep_shuffle_host_list', description='Shuffle the host list returned by register_replicant() before connecting to the hosts. (Default: OFF)', type='BOOLEAN', value='OFF', read_only='N') (name='physrep_source_dbname', description='Physical replication source cluster dbname.', type='STRING', value=NULL, read_only='Y') (name='physrep_source_host', description='List of physical replication source cluster hosts.', type='STRING', value=NULL, read_only='Y') +(name='physrep_update_registry_interval', description='Physrep update-registry interval. (Default: 60)', type='INTEGER', value='60', read_only='N') (name='plannedsc', description='Use planned schema change by default', type='BOOLEAN', value='ON', read_only='N') (name='planner_effort', description='Planner effort (try harder) levels. (Default: 1)', type='INTEGER', value='1', read_only='N') (name='planner_show_scanstats', description='', type='BOOLEAN', value='OFF', read_only='N') @@ -997,6 +1001,8 @@ (name='track_replication_times', description='Track how long each replicant takes to ack all transactions.', type='BOOLEAN', value='ON', read_only='N') (name='track_replication_times_max_lsns', description='Track replication times for up to this many transactions.', type='INTEGER', value='50', read_only='N') (name='tracked_locklist_init', description='Initial allocation count for tracked locks', type='INTEGER', value='10', read_only='N') +(name='tranlog_incoherent_timeout', description='Timeout in seconds for incoherent tranlog. (Default: 10)', type='INTEGER', value='10', read_only='N') +(name='tranlog_maxpoll', description='Tranlog timeout in seconds for blocking poll. (Default: 60)', type='INTEGER', value='60', read_only='N') (name='transaction_grace_period', description='Time to wait for connections with pending transactions to go away on exit. (Default: 60)', type='INTEGER', value='60', read_only='N') (name='transient_page_reallocation', description='Orphaned pages are maintained locally', type='BOOLEAN', value='OFF', read_only='N') (name='typessql', description='Use typessql to attempt to buffer results until all columns are non-null. (Default: off)', type='BOOLEAN', value='OFF', read_only='N')