diff --git a/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp b/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp
index b2ec284..ade0971 100644
--- a/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp
+++ b/com.ibm.streamsx.dps/impl/src/RedisClusterPlusPlusDBLayer.cpp
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2011, 2020
+# Copyright IBM Corp. 2011, 2021
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
@@ -5088,9 +5088,13 @@ namespace distributed
exceptionType = REDIS_PLUS_PLUS_OTHER_ERROR;
}
+ // Under normal working conditions of the Redis server(s),
+ // the Redis command executed above shouldn't have raised
+ // any exception.
+ //
// Did we encounter a redis-cluster server connection error?
- if (exceptionType == REDIS_PLUS_PLUS_CONNECTION_ERROR) {
- SPLAPPTRC(L_ERROR, "Inside isConnected: Unable to connect to the redis-cluster server(s). Failed with an exception. Error=" << exceptionString << ". rc=" << DPS_CONNECTION_ERROR, "RedisClusterPlusPlusDBLayer");
+ if (exceptionType != REDIS_PLUS_PLUS_NO_ERROR) {
+        SPLAPPTRC(L_ERROR, "Inside isConnected: Unable to execute a get command. Possible issue connecting to the redis-cluster server(s). Failed with an exception of type " << exceptionType << ". Error=" << exceptionString << ". rc=" << DPS_CONNECTION_ERROR << ". Application code may call the DPS reconnect API and then retry the failed operation.", "RedisClusterPlusPlusDBLayer");
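+        // Note: any exception type caught above is treated here as a possible
+        // connection problem, and isConnected will report the connection as inactive.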
// Connection error.
return(false);
} else {
diff --git a/com.ibm.streamsx.dps/info.xml b/com.ibm.streamsx.dps/info.xml
index da5368f..2da0ce0 100644
--- a/com.ibm.streamsx.dps/info.xml
+++ b/com.ibm.streamsx.dps/info.xml
@@ -235,7 +235,7 @@ To specifically learn how to call the DPS APIs from SPL native functions, C++ an
# Reference information
[../../javadoc/dps/index.html| DPS Java API Reference]
- 4.1.2
+ 4.1.3
4.2.0.0
diff --git a/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm b/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm
index 479d93e..b1bdbab 100644
--- a/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm
+++ b/samples/advanced/04_all_dps_apis_at_work_in_spl/com.acme.test/Main.splmm
@@ -1,6 +1,6 @@
/*
* Licensed Materials - Property of IBM
- * Copyright IBM Corp. 2011, 2020
+ * Copyright IBM Corp. 2011, 2021
* US Government Users Restricted Rights - Use, duplication or
* disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
*/
@@ -50,27 +50,27 @@ composite Main {
// two threads will not get into the same store at the same time.
// Let us do a few tests to show how our DPS handles user defined distributed locks.
//
- // Please be aware that this particular test exercises heavy locking and unlocking of
- // the K/V store to have protected read/write operation. If a chosen back-end data store
- // provides eventual consistency (such as Cassandra, Cloudant etc.) or performs
- // put/get operations to/from the disk media (HBase, Mongo, Couchbase etc.), the technical requirements for this test
- // will not be met by such data stores and this test may not finish correctly in such
- // environments (e-g: Cassandra, Cloudant, HBase, Mongo, Couchbase etc.). That is because, data stores with eventual consistency
- // as well as storing it in disks may not return the correct value during a get that
- // immediately follows a put operation. For such data stores, it will take a while
- // (a second or two) before the actual value we put is written to
- // all the replica servers' memory/disk. Hence, LockTest with too many iterations is not a
- // suitable one for such data stores based on eventual consistency put/get as well disk based put/get (HBase, Mongo, Couchbase).
- // Before running this test for those slow data stores, please refer to the commentary at the top of
- // this composite to reduce the test count in order to obtain reasonable results.
+		// Please be aware that this particular test exercises heavy locking and unlocking of
+		// the K/V store to provide protected read/write operations. If the chosen back-end data store
+		// provides eventual consistency (e.g., Cassandra, Cloudant) or performs
+		// put/get operations to/from the disk media (e.g., HBase, Mongo, Couchbase), the technical
+		// requirements for this test will not be met by such data stores and this test may not finish
+		// correctly in those environments. That is because data stores with eventual consistency,
+		// as well as those storing data on disk, may not return the correct value during a get that
+		// immediately follows a put operation. For such data stores, it will take a while
+		// (a second or two) before the actual value we put is written to
+		// all the replica servers' memory/disk. Hence, LockTest with too many iterations is not
+		// suitable for data stores based on eventual consistency or disk based put/get (HBase, Mongo, Couchbase).
+		// Before running this test for those slow data stores, please refer to the commentary at the top of
+		// this composite to reduce the test count in order to obtain reasonable results.
() as Sink2 = LockTest() {}
// Do 100K writes and 100K reads and time it.
// Please be aware this high volume test will finish in a decent time for memcached, Redis, and Aerospike.
// However, Cassandra, Cloudant, HBase, Mongo, Couchbase etc. are not very fast due to their disk writes and
// hence it may be a long wait before this test completes when you use those data stores.
- // Before running this test for those slow data stores, please refer to the commentary at the top of
- // this composite to reduce the test count in order to obtain reasonable results.
+ // Before running this test for those slow data stores, please refer to the commentary at the top of
+ // this composite to reduce the test count in order to obtain reasonable results.
() as Sink3 = ReadWritePerformanceTest() {}
// Users can directly execute native back-end data store commands that are of
@@ -401,28 +401,22 @@ composite GeneralTest()
dpsPutTTL(myNewKey, myNewValue, 60u, err, false, false);
printStringLn("Test13-->err-->" + (rstring)err);
printStringLn("dpsPutTTL result for a key with a length of " + (rstring)length(myNewKey) + "=" + (rstring)err);
- if (dpsIsConnected() == true) {
- printStringLn("DPS connection is active. Ending the loop.");
+
+ // DPS reconnect logic is shown here.
+ mutable int32 result = 0;
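+				// checkAndReconnect (defined at the bottom of this file) will make up to
+				// 12 reconnect attempts, each preceded by a 5 second wait (roughly one minute in total).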
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // Connection to the backend K/V store is healthy and not broken.
break;
- } else {
- printStringLn("DPS connection went inactive. Attempting to reconnect.");
- // DPS Connection is inative.
- // Loop here until the connection is made.
- mutable int32 reconnectCnt = 0;
- while(dpsReconnect() == false) {
- printStringLn("DPS connection not reestablished. Trying again.");
- reconnectCnt++;
- if (reconnectCnt%4 == 0) {
- printStringLn("Calling dpsHasTTL when reconnectCnt = " + (rstring)reconnectCnt);
- dpsHasTTL(myNewKey, err, false);
- printStringLn("Result of dpsHasTTL. err=" + (rstring)err);
- }
-
- block(5.0);
- }
-
- printStringLn("DPS reconnect successful. Going through the outer loop one more time.");
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // we tried earlier before a broken connection.
continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
}
} // End of while loop.
@@ -1696,7 +1690,9 @@ composite StateUpdater() {
// Just to demonstrate that the basic locking operation works, let us use a very low number of lock tests for
// those eventual consistency data stores and the ones that use the disk media for storage.
// Memcached, Redis and Aerospike should work just fine with 1000 iterations, because they provide full data consistency during put and get operations.
- expression $LOCK_TEST_ITERATIONS_NEEDED_FOR_NON_DISK_BASED_STORES : 1000;
+	// For testing the backend K/V store reconnect logic, you may want to increase the following
+	// iterations for non-disk-based stores by 100 times, i.e., from 1K to 100K.
+ expression $LOCK_TEST_ITERATIONS_NEEDED_FOR_NON_DISK_BASED_STORES : 1 * 1000;
expression $LOCK_TEST_ITERATIONS_NEEDED_FOR_DISK_BASED_STORES : 10;
// For those data stores that only provides eventual consistency (Cassandra, Cloudant etc.), you can increase the delay value here
// from 0.0 to 3.0. That will allow the eventual consistency to work correctly so that we can read the correct data that was put earlier.
@@ -1730,6 +1726,7 @@ composite StateUpdater() {
$LOCK_TEST_ITERATIONS_NEEDED_FOR_DISK_BASED_STORES : $LOCK_TEST_ITERATIONS_NEEDED_FOR_NON_DISK_BASED_STORES;
mutable int32 cnt = 0;
Src oTuple = {a=1b};
+ printStringLn("Starting " + (rstring)loopCnt + " distributed lock operations.");
// Stay in a loop and send the lock test signals.
while(cnt++ < loopCnt) {
@@ -1756,145 +1753,462 @@ composite StateUpdater() {
onTuple Src: {
mutable boolean done = false;
mutable uint64 err = 0;
+ mutable int32 cnt = 0;
+ mutable uint32 pid = 0u;
+ mutable int32 result = 0;
lockTestCnt++;
if(first) {
- first = false;
- rstring dummyRstring = "";
- int32 dummyInt32 = 0;
-
- rstring dbName = dpsGetNoSqlDbProductName();
- iterationsNeeded = (dbName == "cassandra" || dbName == "cloudant" ||
- dbName == "hbase" || dbName == "couchbase") ?
- $LOCK_TEST_ITERATIONS_NEEDED_FOR_DISK_BASED_STORES : $LOCK_TEST_ITERATIONS_NEEDED_FOR_NON_DISK_BASED_STORES;
- // Even though HBase is a disk based data store, in my tests I observed that this delay is not needed for HBase.
- // If someone needs this delay for HBase, it can always be added here easily.
- delayBetweenPutAndGet = (dbName == "cassandra" || dbName == "cloudant") ?
- $DELAY_NEEDED_FOR_DISK_BASED_STORES : $DELAY_NEEDED_FOR_NON_DISK_BASED_STORES;
-
- // If multiple attempts are made to create the same store in parallel from different operators,
- // we may get concurrent execution exception in Cassandra. Hence, let us retry this for 5 times.
- // In general, such multiple tries are not required. We are doing it here for pedantic reasons.
- mutable int32 cnt = 0;
-
- while(cnt++ <= 4) {
- s = dpsCreateOrGetStore("Super_Duper_Store", dummyRstring, dummyInt32, err);
- if (err != 0ul) {
- printStringLn("Error in dpsCreateOrGetStore(Super_Duper_Store) rc = " +
- (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString());
- } else {
- printStringLn("My Super_Duper_Store id = " + (rstring)s + ". Obtained in " +
- (rstring)cnt + " attempt(s).");
- break;
- }
- }
-
- if (err != 0ul) {
- return;
- }
-
- assert(err==0ul);
- // Get a user defined distributed lock thar will be used to have a thread safe access into the store we created above.
- l = dlCreateOrGetLock("Super_Duper_Lock", err);
- if (err != 0ul) {
- printStringLn("Error in dlCreateOrGetLock(Super_Duper_Lock) rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
- return;
- } else {
- printStringLn("My distributed Super_Duper_Lock id = " + (rstring)l);
- }
- assert(err==0ul);
- }
+ first = false;
+ rstring dummyRstring = "";
+ int32 dummyInt32 = 0;
+
+ rstring dbName = dpsGetNoSqlDbProductName();
+ iterationsNeeded = (dbName == "cassandra" || dbName == "cloudant" ||
+ dbName == "hbase" || dbName == "couchbase") ?
+ $LOCK_TEST_ITERATIONS_NEEDED_FOR_DISK_BASED_STORES : $LOCK_TEST_ITERATIONS_NEEDED_FOR_NON_DISK_BASED_STORES;
+ // Even though HBase is a disk based data store, in my tests I observed that this delay is not needed for HBase.
+ // If someone needs this delay for HBase, it can always be added here easily.
+ delayBetweenPutAndGet = (dbName == "cassandra" || dbName == "cloudant") ?
+ $DELAY_NEEDED_FOR_DISK_BASED_STORES : $DELAY_NEEDED_FOR_NON_DISK_BASED_STORES;
+
+ // If multiple attempts are made to create the same store in parallel from different operators,
+			// we may get a concurrent execution exception in Cassandra. Hence, let us retry this up to 5 times.
+ // In general, such multiple tries are not required. We are doing it here for pedantic reasons.
+ while(++cnt <= 5) {
+ s = dpsCreateOrGetStore("Super_Duper_Store", dummyRstring, dummyInt32, err);
+
+ if (err != 0ul) {
+ printStringLn("Error in dpsCreateOrGetStore(Super_Duper_Store) rc = " +
+						(rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("My Super_Duper_Store id = " + (rstring)s + ". Obtained in " +
+ (rstring)cnt + " attempt(s).");
+ break;
+ }
+ } // End of while(++cnt <= 5) {
+
+ if (err != 0ul) {
+ printStringLn("Unable to get Super_Duper_Store id. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
+
+			// Get a user-defined distributed lock that will be used to have thread-safe access to the store we created above.
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ l = dlCreateOrGetLock("Super_Duper_Lock", err);
+
+ if (err != 0ul) {
+ printStringLn("Error in dlCreateOrGetLock(Super_Duper_Lock) rc = " +
+ (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("My distributed Super_Duper_Lock id = " + (rstring)l + ". Obtained in " + (rstring)cnt + " attempt(s).");
+ break;
+ }
+ } // End of while(++cnt <= 5) {
+
+ if (err != 0ul) {
+ printStringLn("Unable to do dlCreateOrGetLock(Super_Duper_Lock). Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
+ } // End of if(first)
// We have an utility function that will return us the process id currently owning this lock.
// Let us exercise that API.
- mutable uint32 pid = dlGetPidForLock("Super_Duper_Lock", err);
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ pid = dlGetPidForLock("Super_Duper_Lock", err);
+ if (err != 0ul) {
+ printStringLn("Error in dlGetPidForLock(Super_Duper_Lock) rc = " +
+ (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("Before lock acquisition: pid owning the Super_Duper_Lock = " + (rstring)pid);
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
if (err != 0ul) {
- printStringLn("Error in dlGetPidForLock(Super_Duper_Lock) rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
- } else {
- printStringLn("Before lock acquisition: pid owning the Super_Duper_Lock = " + (rstring)pid);
+ printStringLn("Unable to do dlCreateOrGetLock(Super_Duper_Lock). Stopping the lock Test");
+ return;
}
+
+ assert(err==0ul);
- // mutual exclusion
- // Our distributed locking works based on the assumption that every thread interested in
- // accessing the same store will play a fair game. That means every thread will get into the
- // store only if they can successfully acquire a lock. There should not be any rogue threads that
- // will bypass this gentlemanly agreement and get into the store without owning a lock.
- // It is purely a trust based cooperative locking scheme.
- //
- // printStringLn("Begin lock acquisition");
- // Acquire that lock with a lease time for 30 seconds and wait no more than 40 seconds to acquire the lock.
- // These high time values are needed for the eventual consistency based data stores to work correctly (Cassandra, Cloudant etc.)
- dlAcquireLock(l, 30.0, 40.0, err);
- // If we can't aquire the lock, let us return from here.
+ // mutual exclusion
+ // Our distributed locking works based on the assumption that every thread interested in
+ // accessing the same store will play a fair game. That means every thread will get into the
+ // store only if they can successfully acquire a lock. There should not be any rogue threads that
+ // will bypass this gentlemanly agreement and get into the store without owning a lock.
+ // It is purely a trust based cooperative locking scheme.
+ //
+ // printStringLn("Begin lock acquisition");
+
+ // Acquire that lock with a lease time for 30 seconds and wait no more than 40 seconds to acquire the lock.
+ // These high time values are needed for the eventual consistency based data stores to work correctly (Cassandra, Cloudant etc.)
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ dlAcquireLock(l, 30.0, 40.0, err);
+
+ if (err != 0ul) {
+ printStringLn("Failed to acquire a lock. rc = " +
+ (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("Successful lock acquisition for Super_Duper_Lock with id= " + (rstring)l);
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
if (err != 0ul) {
- printStringLn("Failed to acquire a lock. rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
-
- return;
+ printStringLn("Unable to do dlAcquireLock. Stopping the lock Test");
+ return;
}
+
assert(err==0ul);
- // This debug print is here to test the lock acquisition logic for data stores such as Cassandra, Cloudant etc.
+
+ // This debug print is here to test the lock acquisition logic for data stores such as Cassandra, Cloudant etc.
// printStringLn("End lock acquisition");
- pid = dlGetPidForLock("Super_Duper_Lock", err);
+
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ pid = dlGetPidForLock("Super_Duper_Lock", err);
- if (err != 0ul) {
- printStringLn("Error in dlGetPidForLock(Super_Duper_Lock) rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
- } else {
- printStringLn("After lock acquisition: pid owning the Super_Duper_Lock = " + (rstring)pid);
- }
+ if (err != 0ul) {
+ printStringLn("Error in dlGetPidForLock(Super_Duper_Lock) rc = " +
+ (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("After lock acquisition: pid owning the Super_Duper_Lock = " + (rstring)pid);
+ break;
+ }
+ } // End of while(++cnt <= 5)
+ if (err != 0ul) {
+ printStringLn("Unable to do post lock acquisition dlGetPidForLock. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
+
mutable int32 val = 0;
// For the eventual consistency based data stores (Cassandra, Cloudant etc.), let us wait for a while before
// the actual value is propagated (Eventual consistency is really odd.). We will do a get after that wait.
if (delayBetweenPutAndGet > 0.0) {
block(delayBetweenPutAndGet);
}
- dpsGet(s, "myKey", val, err);
- dpsPut(s, "myKey", val+1, err);
+
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ dpsGet(s, "myKey", val, err);
+
+ if (err != 0ul) {
+ printStringLn("Error in dpsGet. rc = " +
+ (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("dpsGet successful.");
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
+ if (err != 0ul) {
+ printStringLn("Unable to do dpsGet. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
+
+ cnt = 0;
+
+ if(lockTestCnt == 1) {
+ // For the very first iteration, dpsGet done above will not
+ // get a meaningful value for val. That is because the
+ // key may not exist in the store when we come here for the
+ // very first iteration. If the value obtained is not meaningful,
+ // we will reset it to 0.
+ if (val > iterationsNeeded * 2) {
+ val = 0;
+ }
+ }
+
+ while(++cnt <= 5) {
+ dpsPut(s, "myKey", val+1, err);
+
+ if (err != 0ul) {
+ printStringLn("Error in dpsPut. rc = " +
+ (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("dpsPut successful.");
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
+ if (err != 0ul) {
+ printStringLn("Unable to do dpsPut. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
+
printStringLn("val=" + (rstring)val + " as read from the store during lock test #" + (rstring)lockTestCnt);
// Only one of the two PEs currently in the race to get from and
// put into that same store will win in obtaining a chance to
// store the value of $LOCK_TEST_ITERATIONS_NEEDED * 2u.
if(val+1 == (iterationsNeeded * 2)) {
- done=true;
- printStringLn("'myKey' => " + (rstring)(val+1));
- dpsRemoveStore(s, err);
-
- if (err != 0ul) {
- printStringLn("Store removal error. rc = " +
- (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString());
- } else {
- printStringLn("Super_Duper_Store with a store id of " + (rstring)s + " has now been removed from the data store.");
- }
-
- assert(err==0ul);
+ done=true;
+ printStringLn("'myKey' => " + (rstring)(val+1));
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ dpsRemoveStore(s, err);
+
+ if (err != 0ul) {
+ printStringLn("Store removal error. rc = " +
+ (rstring)dpsGetLastStoreErrorCode() + ", msg=" + dpsGetLastStoreErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("Super_Duper_Store with a store id of " + (rstring)s + " has now been removed from the data store.");
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
+ if(err != 0ul) {
+ printStringLn("Unable to do dpsRemoveStore. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
}
- dlReleaseLock(l, err);
-
- if (err != 0ul) {
- printStringLn("Lock release error. rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
- }
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ dlReleaseLock(l, err);
+ if (err != 0ul) {
+ printStringLn("Lock release error. rc = " +
+ (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("dlReleaseLock successful.");
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
+ if(err != 0ul) {
+ printStringLn("Unable to do dlReleaseLock. Stopping the lock Test");
+ return;
+ }
+
assert(err==0ul);
if(done) {
- dlRemoveLock(l, err);
-
- if (err != 0ul) {
- printStringLn("Lock removal error. rc = " +
- (rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString());
- } else {
- printStringLn("Super_Duper_Lock with a lock id of " + (rstring)l + " has now been removed from the data store.");
- }
-
- assert(err==0ul);
+ cnt = 0;
+
+ while(++cnt <= 5) {
+ dlRemoveLock(l, err);
+
+ if (err != 0ul) {
+ printStringLn("Lock removal error. rc = " +
+							(rstring)dlGetLastDistributedLockErrorCode() + ", msg=" + dlGetLastDistributedLockErrorString() + ", attemptCnt = " + (rstring)cnt);
+
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing.
+ break;
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ cnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ } else {
+ printStringLn("Super_Duper_Lock with a lock id of " + (rstring)l + " has now been removed from the data store.");
+ break;
+ }
+ } // End of while(++cnt <= 5)
+
+ if (err != 0ul) {
+ printStringLn("Unable to do dlRemoveLock. Stopping the lock Test");
+ return;
+ }
+
+ assert(err==0ul);
}
}
}
@@ -1913,7 +2227,9 @@ composite ReadWritePerformanceTest() {
param
// Reduce this operations count to 100*1 when testing with relatively slow performing data stores (Cassandra, Cloudant, HBase etc.)
// Memcached, Redis and Aerospike should work just fine for 100*1000 operations, because they are somewhat faster than the others.
- expression $PUT_GET_OPERATIONS_NEEDED_FOR_IN_MEMORY_DATA_STORES : 100*1000;
+		// For testing the backend K/V store reconnect logic, you may want to increase the following
+		// operations count for in-memory data stores by 10 times, i.e., from 100K to 1000K.
+ expression $PUT_GET_OPERATIONS_NEEDED_FOR_IN_MEMORY_DATA_STORES : 100 * 1000;
expression $PUT_GET_OPERATIONS_NEEDED_FOR_DISK_BASED_DATA_STORES : 100*1;
graph
@@ -1945,18 +2261,59 @@ composite ReadWritePerformanceTest() {
printStringLn("dpsCreateOrGetStore(PerfStore1)--> Error code=" + (rstring)dpsGetLastStoreErrorCode() +
", msg = " + dpsGetLastStoreErrorString());
}
-
+
+ printStringLn("Starting " + (rstring)maxPutGetOperationsCount + " put operations.");
+
mutable int32 myCnt = 0;
+ mutable int32 consecutiveDpsApiRetryCnt = 0;
+
// Do a bulk write.
while(++myCnt <= maxPutGetOperationsCount) {
dpsPut(s, myDummyRString + (rstring)myCnt, myDummyRString + (rstring)myCnt, err);
- }
+
+ ///// if(err == 102ul || err == 107ul || err == 108ul || err == 501ul) {
+ if(err != 0ul) {
+ mutable int32 result = 0;
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing. It is possible
+ // for a broken connection error to be sensed by the
+ // client driver after several seconds. We have to make
+ // sure that the error we got above is not due to
+				// a broken connection. So, before we label this as a
+				// DPS API error, we will allow a retry of the
+				// same operation up to 1500 times. (Do your testing to
+ // arrive at an optimal retry count for your application.)
+ if(++consecutiveDpsApiRetryCnt <= 1500) {
+ myCnt--;
+ continue;
+ } else {
+ break;
+ }
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ myCnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ }
+
+ // If we are here, that means the DPS API didn't have any error.
+ // Reset the retry count.
+ consecutiveDpsApiRetryCnt = 0;
+ } // End of bulk write while loop.
if (err > 0ul) {
- printStringLn("Error in dpsPut: rc=" + (rstring)dpsGetLastStoreErrorCode() +
- ", msg = " + dpsGetLastStoreErrorString());
- abort();
+ printStringLn("Error in dpsPut: rc=" + (rstring)err + ". Stopping the R/W performance test.");
+ return;
}
_timeNow = getTimestamp();
@@ -1964,21 +2321,61 @@ composite ReadWritePerformanceTest() {
mutable int64 _totalExecutionTime_ = _timeInNanoSecondsAfterExecution - _timeInNanoSecondsBeforeExecution;
printStringLn("Time taken for " + (rstring)maxPutGetOperationsCount +
" insertions = " + (rstring)_totalExecutionTime_ + " nanosecs");
+
+ printStringLn("Starting " + (rstring)maxPutGetOperationsCount + " get operations.");
myCnt = 0;
mutable rstring myValue = 0;
_timeNow = getTimestamp();
_timeInNanoSecondsBeforeExecution = ((getSeconds(_timeNow) * (int64)1000000000) + (int64)getNanoseconds(_timeNow));
+ consecutiveDpsApiRetryCnt = 0;
// Do a bulk read.
while(++myCnt <= maxPutGetOperationsCount) {
dpsGet(s, myDummyRString + (rstring)myCnt, myValue, err);
- }
+
+ ///// if(err == 102ul || err == 107ul || err == 108ul || err == 501ul) {
+ if(err != 0ul) {
+ mutable int32 result = 0;
+ checkAndReconnect(12, result);
+
+ if(result == 0) {
+ // This error is not related to a broken connection.
+ // Treat the earlier error as a DPS API error and do
+ // application-specific error processing. It is possible
+ // for a broken connection error to be sensed by the
+ // client driver after several seconds. We have to make
+ // sure that the error we got above is not due to
+				// a broken connection. So, before we label this as a
+				// DPS API error, we will allow a retry of the
+				// same operation up to 1500 times. (Do your testing to
+ // arrive at an optimal retry count for your application.)
+ if(++consecutiveDpsApiRetryCnt <= 1500) {
+ myCnt--;
+ continue;
+ } else {
+ break;
+ }
+ } else if(result == 1) {
+ // Reconnect happened. Let us redo the same operation that
+ // failed earlier due to a broken connection.
+ myCnt--;
+ continue;
+ } else {
+ // Reconnect didn't happen within the specified reconnect attempt limit.
+ printStringLn("Reconnect attempt limit is not sufficient. Try increasing it.");
+ break;
+ }
+ }
+
+ // If we are here, that means the DPS API didn't have any error.
+ // Reset the retry count.
+ consecutiveDpsApiRetryCnt = 0;
+ } // End of bulk read while loop.
if (err > 0ul) {
- printStringLn("Error in dpsGet: rc=" + (rstring)dpsGetLastStoreErrorCode() +
- ", msg = " + dpsGetLastStoreErrorString());
- abort();
+ printStringLn("Error in dpsGet: rc=" + (rstring)err + ". Stopping the R/W performance test.");
+ return;
}
_timeNow = getTimestamp();
@@ -3452,3 +3849,89 @@ stateful public void displayHBaseTableRows(rstring jsonResponse) {
}
}
}
+
+/*
+This function can be helpful to IBM Streams applications that use the
+DPS toolkit for checking whether a non-zero DPS API return code is related to a
+broken connection to the underlying backend K/V store.
+
+This function checks whether the DPS client connection to the
+backend K/V store is healthy or broken. If it is broken, it will
+attempt to reconnect until it either succeeds or runs out of the caller-given
+reconnect attempt limit, whichever happens first. It conveys the outcome
+in the caller-given result (second) function argument.
+
+result will be set to 0 to indicate that the connection is healthy and not broken.
+result will be set to 1 to indicate that the reconnect was successful.
+result will be set to 2 to indicate that the reconnect was not successful within the caller-given reconnect attempt limit.
+
+IMPORTANT
+---------
+The reconnect logic below will work only when a single primary back-end server goes down at any given time.
+If more than one primary server is down at the same time, the logic below will not work correctly.
+So, it is important for the backend K/V store administrator to bring the failed primary server back to
+running status before any other primary server goes down.
+*/
+stateful public void checkAndReconnect(int32 reconnectAttemptLimit, mutable int32 result) {
+ mutable int32 cnt = 0;
+ mutable boolean connectionStatus = false;
+
+ /*
+	A K/V store backend connection error takes a few seconds to propagate to the client.
+	So, we will do a quick round of connection status checks a number of times.
+	We may detect the broken connection status well before completing this full loop count.
+	If this count is not found to be sufficient during testing inside a particular application,
+	please change it to a suitable number of iterations for this loop.
+	This loop will iterate 300 times in about 80 milliseconds.
+ */
+ while(++cnt <= 300) {
+ connectionStatus = dpsIsConnected();
+
+ if(connectionStatus == false) {
+ // There is a connection error.
+ break;
+ }
+ }
+
+ /*
+	If the connection status still remains true, then the DPS API
+	operation error is not related to a broken connection. In that case,
+	the application has to treat it as an API related error.
+ */
+ if (connectionStatus == true) {
+ appTrc(Trace.info, "checkAndReconnect: Connection is healthy after doing " + (rstring)(cnt-1) + " status checks. Application can treat the DPS API error as a non-connection related error.");
+ // Set it to indicate connection is healthy and not broken.
+ result = 0;
+ return;
+ } else {
+ // Broken connection error detected.
+ appTrc(Trace.error, "checkAndRecoonect: Back-end K/V store broken connection error is detected after doing " +
+ (rstring)cnt + " connection status check(s).");
+ }
+
+ mutable int32 reconnectAttempt = 0;
+
+	// Stay in this loop and attempt to reconnect up to a caller-specified number of times.
+	while(++reconnectAttempt <= reconnectAttemptLimit) {
+		appTrc(Trace.error, "checkAndReconnect: Reconnect attempt " + (rstring)reconnectAttempt);
+ // Wait 5 seconds
+ block(5.0);
+
+ // Retry to connect
+ if (dpsReconnect() == false) {
+ // If that was not successful, try again.
+ continue;
+ } else {
+ appTrc(Trace.error, "checkAndReconnect: Reconnect is successful.");
+ // Set it to indicate connection was broken and it is now reestablished.
+ result = 1;
+ return;
+ }
+ } // End of while loop.
+
+	// Set it to indicate that the connection was broken and was not
+	// reestablished within the caller-given reconnect attempt limit.
+ appTrc(Trace.error, "checkAndReconnect: Reconnect is not successful after the reconnect attempt limit of " + (rstring)reconnectAttemptLimit + ".");
+ result = 2;
+ return;
+} // End of function checkAndReconnect
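+
+/*
+A minimal usage sketch of checkAndReconnect (illustrative only; it simply mirrors the
+retry pattern already used in the composites above). The store handle s, the key name
+"myKey" and the err variable shown here are assumed to exist in the calling code.
+
+    mutable uint64 err = 0ul;
+    mutable int32 result = 0;
+
+    while(true) {
+        dpsPut(s, "myKey", 1, err);
+
+        if (err == 0ul) {
+            // The DPS API call worked. Nothing more to do.
+            break;
+        }
+
+        checkAndReconnect(12, result);
+
+        if (result == 1) {
+            // The broken connection was reestablished. Retry the failed operation.
+            continue;
+        } else if (result == 0) {
+            // The connection is healthy. Treat the earlier error as a DPS API error.
+            break;
+        } else {
+            // result == 2: reconnect didn't happen within the given attempt limit.
+            break;
+        }
+    }
+*/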
diff --git a/samples/advanced/04_all_dps_apis_at_work_in_spl/info.xml b/samples/advanced/04_all_dps_apis_at_work_in_spl/info.xml
index a51836a..6d850fa 100644
--- a/samples/advanced/04_all_dps_apis_at_work_in_spl/info.xml
+++ b/samples/advanced/04_all_dps_apis_at_work_in_spl/info.xml
@@ -4,7 +4,7 @@
04_all_dps_apis_at_work_in_spl
- 1.0.2
+ 1.0.3
4.2.0.0