diff --git a/com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml b/com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml
index 91e8c4c..fc4db34 100644
--- a/com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml
+++ b/com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml
@@ -435,40 +435,69 @@ it does safety checks and is therefore slower.
- This function can be called to get multiple keys present in a given store.
+ This function can be called to get i.e. read/fetch multiple keys present in a given store.
@param store The handle of the store.
@param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ @param keyStartPosition User can indicate a zero index based start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
@param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
@param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. Due to very high logic complexity, this feature is not implemented at this time.
@param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. Due to very high logic complexity, this feature is not implemented at this time.
- @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
+@param err User provided mutable uint64 typed variable in which the result of this bulk get keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
<any T1> public stateful void dpsGetKeys(uint64 store, mutable list<T1> keys, int32 keyStartPosition, int32 numberOfKeysNeeded, rstring keyExpression, rstring valueExpression, mutable uint64 err)
- This function can be called to get values for a given list of multiple keys present in a given store.
+ This function can be called to get i.e. read/fetch values for a given list of multiple keys present in a given store.
@param store The handle of the store.
-@param keys User provided list variable that contains keys for which values need to be fetched.
-@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appears in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
-@param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
-@return It returns true if value fetch worked for all given keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
+@param keys User provided list variable that contains keys for which values need to be fetched. It must be made of a given store's key data type.
+@param keyExistsOrNot User provided mutable list variable in which true or false status for every user given key will be returned. This is done to indicate to the user about if that key exists or not in the given store. This list must be made of a boolean data type. Status returned in this list can be used in combination with the items returned in the values list which is explained next.
+@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. If a user given key is not present in the store, there will still be a default value returned in that key's index. It is better to first confirm in the keyExistsOrNot list that will carry true or false status about the existence of all the user provided keys. Depending on the key existence status found in that other list, user can decide to either consider using or ignore a value in a given index of the values list.
+@param err User provided mutable uint64 typed variable in which the result of this bulk get values operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
- <any T1, any T2> public stateful boolean dpsGetValues(uint64 store, list<T1> keys, mutable list<T2> values, mutable list<uint64> errors)
+ <any T1, any T2> public stateful void dpsGetValues(uint64 store, list<T1> keys, mutable list<boolean> keyExistsOrNot, mutable list<T2> values, mutable uint64 err)
- This function can be called to get multiple Key/Value (KV) pairs present in a given store.
+ This function can be called to get i.e. read/fetch multiple Key/Value (KV) pairs present in a given store.
@param store The handle of the store.
@param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
-@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
-@param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list.
+@param keyStartPosition User can indicate a zero index based start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
@param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
-@param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
-@return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
+@param err User provided mutable uint64 typed variable in which the result of this bulk get KV pairs operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
- <any T1, any T2> public stateful boolean dpsGetKVPairs(uint64 store, mutable list<T1> keys, mutable list<T2> values, int32 keyStartPosition, int32 numberOfPairsNeeded, mutable list<uint64> errors)
+ <any T1, any T2> public stateful void dpsGetKVPairs(uint64 store, mutable list<T1> keys, mutable list<T2> values, int32 keyStartPosition, int32 numberOfPairsNeeded, mutable uint64 err)
+
+
+
+ This function can be called to put i.e. write/save multiple Key/Value (KV) pairs to a given store. Note: You must make sure that the intended store exists before you use this function. If not, you will end up creating bulk K/V pairs in an unmanageable fashion in the backend data store which will make it less useful.
+@param store The handle of the store.
+@param keys User provided list variable that contains the keys to be written i.e. saved. This list must be made of a given store's key data type.
+@param values User provided list variable that contains the values to be written i.e. saved. This list must be made of a given store's value data type. A KV pair is formed with a key and a value taken from the same index position of the keys and values lists. If the keys and values lists are not of the same size, this function will simply return back without doing any bulk put operation.
+@param err User provided mutable uint64 typed variable in which the result of this bulk put operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+
+ <any T1, any T2> public stateful void dpsPutKVPairs(uint64 store, list<T1> keys, list<T2> values, mutable uint64 err)
+
+
+
+ This function can be called to check for the existence of a given list of keys in a given store.
+@param store The handle of the store.
+@param keys User provided list variable that contains the keys to be checked for their existence in the given store. This list must be made of a given store's key data type.
+@param results User provided mutable list variable in which the key existence check true or false results will be returned. This list must be made of a boolean data type.
+@param err User provided mutable uint64 typed variable in which the result of this bulk has keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+
+ <any T1> public stateful void dpsHasKeys(uint64 store, list<T1> keys, mutable list<boolean> results, mutable uint64 err)
+
+
+
+ This function can be called to remove a given list of keys from a given store.
+@param store The handle of the store.
+@param keys User provided list variable that contains the keys to be removed from the given store. This list must be made of a given store's key data type.
+@param totalKeysRemoved User provided mutable int32 variable in which the total number of keys removed from the store will be returned.
+@param err User provided mutable uint64 typed variable in which the result of this bulk remove keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+
+ <any T1> public stateful void dpsRemoveKeys(uint64 store, list<T1> keys, mutable int32 totalKeysRemoved, mutable uint64 err)
diff --git a/com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h b/com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h
index 6d12a97..c9cdf7c 100644
--- a/com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h
@@ -159,7 +159,10 @@ namespace distributed
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h b/com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h
index e66b1d0..609e835 100644
--- a/com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h
@@ -197,8 +197,11 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h b/com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h
index 24e22e5..d5bc2fe 100644
--- a/com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h
@@ -203,8 +203,11 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/DBLayer.h b/com.ibm.streamsx.dps/impl/include/DBLayer.h
index ff51126..f5442ed 100644
--- a/com.ibm.streamsx.dps/impl/include/DBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/DBLayer.h
@@ -289,9 +289,21 @@ namespace store {
/// @return a list containing multiple keys of a given data type.
virtual void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError) = 0;
- /// Get value for a given key present in a given store.
- /// @return a value for a given key of a given data type.
- virtual void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error) = 0;
+ /// Get values for a given list of keys present in a given store.
+ /// @return a list containing multiple values for a given list of keys of a given data type.
+ virtual void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError) = 0;
+
+ /// Put (save) K/V pairs in a given store.
+ /// @return 0 if all put operations succeeded. Else, non-zero if one or more or all put failed.
+ virtual void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError) = 0;
+
+ /// Check for the existence of a given list of keys in a given store.
+ /// @return a list containing the key existence check true or false results.
+ virtual void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError) = 0;
+
+ /// Remove a given list of keys from a given store.
+ /// @return an integer value indicating the total number of keys removed.
+ virtual void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError) = 0;
/// A store iterator
class Iterator
diff --git a/com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h b/com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h
index a59f523..250aab5 100644
--- a/com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h
+++ b/com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h
@@ -10,6 +10,7 @@
#include "PersistenceError.h"
#include "DBLayer.h"
+#include "DpsConstants.h"
#include
#include
@@ -221,35 +222,62 @@ namespace distributed
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param keyStartPosition User can indicate a zero index based start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
- /// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
void getKeys(SPL::uint64 store, SPL::list & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err);
- /// This function can be called to get values for a given list of multiple keys present in a given store.
+ /// This function can be called to get i.e. read/fetch values for a given list of multiple keys present in a given store.
/// @param store The handle of the store.
- /// @param keys User provided list variable that contains keys for which values need to be fetched.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
+ /// @param keys User provided list variable that contains keys for which values need to be fetched. It must be made of a given store's key data type.
+ /// @param keyExistsOrNot User provided mutable list variable in which true or false status for every user given key will be returned. This is done to indicate to the user about if that key exists or not in the given store. This list must be made of a boolean data type. Status returned in this list can be used in combination with the items returned in the values list which is explained next.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. If a user given key is not present in the store, there will still be a default value returned in that key's index. It is better to first confirm in the keyExistsOrNot list that will carry true or false status about the existence of all the user provided keys. Depending on the key existence status found in that other list, user can decide to either consider using or ignore a value in a given index of the values list.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get values operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool getValues(SPL::uint64 store, SPL::list const & keys, SPL::list & values, SPL::list & errors);
+ void getValues(SPL::uint64 store, SPL::list const & keys, SPL::list & keyExistsOrNot, SPL::list & values, SPL::uint64 & err);
/// This function can be called to get multiple Key/Value (KV) pairs present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list.
+ /// @param keyStartPosition User can indicate a zero index based start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
- /// @return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get KV pairs operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool getKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::list & errors);
+ void getKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::uint64 & err);
+
+ /// This function can be called to put i.e. write/save multiple Key/Value (KV) pairs to a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be written i.e. saved. This list must be made of a given store's key data type.
+ /// @param values User provided list variable that contains the values to be written i.e. saved. This list must be made of a given store's value data type. A KV pair is formed with a key and a value taken from the same index position of the keys and values lists. If the keys and values lists are not of the same size, this function will simply return back without doing any bulk put operation.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk put operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void putKVPairs(SPL::uint64 store, SPL::list const & keys, SPL::list const & values, SPL::uint64 & err);
+
+ /// This function can be called to check for the existence of a given list of keys in a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be checked for their existence in the given store. This list must be made of a given store's key data type.
+ /// @param results User provided mutable list variable in which the key existence check true or false results will be returned. This list must be made of a boolean data type.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk has keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void hasKeys(SPL::uint64 store, SPL::list const & keys, SPL::list & results, SPL::uint64 & err);
+
+ /// This function can be called to remove a given list of keys from a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be removed from the given store. This list must be made of a given store's key data type.
+ /// @param totalKeysRemoved User provided mutable int32 variable in which the total number of keys removed from the store will be returned.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk remove keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void removeKeys(SPL::uint64 store, SPL::list const & keys, SPL::int32 & totalKeysRemoved, SPL::uint64 & err);
/// Serialize the items from the serialized store
/// @param store store handle
@@ -894,151 +922,347 @@ namespace distributed
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param keyStartPosition User can indicate a zero index based start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
- /// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
void DistributedProcessStore::getKeys(SPL::uint64 store, SPL::list & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err) {
- dbError_->reset();
- std::vector keysBuffer;
- std::vector keysSize;
- // Clear the user provided list now.
- keys.clear();
- // Call the underlying store implementation function to get multiple keys in a given store.
- db_->getKeys(store, keysBuffer, keysSize, keyStartPosition, numberOfKeysNeeded, *dbError_);
- err = dbError_->getErrorCode();
-
- if(err != 0) {
- // We got an error. Return now without populating anything in the
- // user provided list (vector). We will free any memory allocated
- // for the partial set of keys returned if any.
- for (unsigned int i = 0; i < keysBuffer.size(); i++) {
- if(keysSize.at(i) > 0) {
- // We must free the buffer allocated by the underlying implementation that we called above.
- free(keysBuffer.at(i));
- }
- } // End of for loop.
+ dbError_->reset();
+ std::vector keysBuffer;
+ std::vector keysSize;
+ // Clear the user provided list now.
+ keys.clear();
+ // Call the underlying store implementation function to get multiple keys in a given store.
+ db_->getKeys(store, keysBuffer, keysSize, keyStartPosition, numberOfKeysNeeded, *dbError_);
+ err = dbError_->getErrorCode();
+
+ if(err != 0) {
+ // We got an error. Return now without populating anything in the
+ // user provided list (vector). We will free any memory allocated
+ // for the partial set of keys returned if any.
+ for (unsigned int i = 0; i < keysBuffer.size(); i++) {
+ if(keysSize.at(i) > 0) {
+ // We must free the buffer allocated by the underlying implementation that we called above.
+ free(keysBuffer.at(i));
+ }
+ } // End of for loop.
- return;
- }
-
- // We got multiple keys.
- // Let us convert it to the proper key type and store them in
- // the user provided list (vector).
- //
- // Populate the user provided list (vector) with the store keys.
- for (unsigned int i = 0; i < keysBuffer.size(); i++) {
- SPL::NativeByteBuffer nbf_key(keysBuffer.at(i), keysSize.at(i));
+ return;
+ }
+
+ // We got multiple keys.
+ // Let us convert it to the proper key type and store them in
+ // the user provided list (vector).
+ //
+ // Populate the user provided list (vector) with the store keys.
+ for (unsigned int i = 0; i < keysBuffer.size(); i++) {
+ SPL::NativeByteBuffer nbf_key(keysBuffer.at(i), keysSize.at(i));
T1 tempKey;
// Deserialize the obtained keys from their NBF format to their native SPL type.
- nbf_key >> tempKey;
+ nbf_key >> tempKey;
keys.push_back(tempKey);
if(keysSize.at(i) > 0) {
// We must free the buffer allocated by the underlying implementation that we called above.
free(keysBuffer.at(i));
}
- } // End of for loop.
+ } // End of for loop.
} // End of getKeys method.
- /// This function can be called to get values for a given list of multiple keys present in a given store.
+ /// This function can be called to get i.e. read/fetch values for a given list of multiple keys present in a given store.
/// @param store The handle of the store.
- /// @param keys User provided list variable that contains keys for which values need to be fetched.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
+ /// @param keys User provided list variable that contains keys for which values need to be fetched. It must be made of a given store's key data type.
+ /// @param keyExistsOrNot User provided mutable list variable in which true or false status for every user given key will be returned. This is done to indicate to the user about if that key exists or not in the given store. This list must be made of a boolean data type. Status returned in this list can be used in combination with the items returned in the values list which is explained next.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. If a user given key is not present in the store, there will still be a default value returned in that key's index. It is better to first confirm in the keyExistsOrNot list that will carry true or false status about the existence of all the user provided keys. Depending on the key existence status found in that other list, user can decide to either consider using or ignore a value in a given index of the values list.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get values operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool DistributedProcessStore::getValues(SPL::uint64 store, SPL::list const & keys, SPL::list & values, SPL::list & errors) {
- dbError_->reset();
- bool resultStatus = true;
-
- // Clear the user provided mutable lists now.
- values.clear();
- errors.clear();
-
- // Create a string form of the store id.
- std::ostringstream storeId;
- storeId << store;
- std::string storeIdString = storeId.str();
-
- // Populate the key buffer with the serialized key.
- // i.e. Serialize it from the native SPL type to NBF format.
- // Then, call the method in the underlying DB implementation layer for every key that we have.
- for(int i=0; igetValue(storeIdString, keyData, keySize, valueData, valueSize, error);
-
- // Collect the results in the user provided list.
- T2 tempValue;
-
- if(error == 0) {
- SPL::NativeByteBuffer value_nbf(valueData, valueSize);
- value_nbf >> tempValue;
- } else {
- // At least one error seen while fetching the value for a key.
- resultStatus = false;
- }
-
- values.push_back(tempValue);
- errors.push_back(error);
+ void DistributedProcessStore::getValues(SPL::uint64 store, SPL::list const & keys, SPL::list & keyExistsOrNot, SPL::list & values, SPL::uint64 & err) {
+ dbError_->reset();
+
+ // Clear the user provided mutable lists now.
+ keyExistsOrNot.clear();
+ values.clear();
+
+ if(keys.size() == 0) {
+ // Caller sent us no keys.
+ // Set an error code so that the caller knows that there is no real get values done by us here.
+ dbError_->set(std::string("Inside the top level getValues #1, this method was called with an empty keys list."), DPS_GET_VALUES_EMPTY_KEYS_LIST_ERROR);
+ err = dbError_->getErrorCode();
+ return;
+ }
+
+ // We will create the NBF based keys and values, store them in different lists and
+ // send it to the underlying DB layer for it to do bulk writes of the K/V pairs.
+ std::vector keyData;
+ std::vector keySize;
+ std::vector valueData;
+ std::vector valueSize;
+
+ std::vector key_nbf;
+
+ // We have to have NBF objects in scope at the time of calling
+ // the underlying DB layer functions. We will keep them in a vector.
+ for(int i=0; igetValues(store, keyData, keySize, valueData, valueSize, *dbError_);
+ err = dbError_->getErrorCode();
+
+ if(err != 0) {
+ // Error occurred.
+ // We will release the allocated memory buffers if any by the underlying DB layer before we return from here.
+ for(int i=0; i 0) {
+ free(valueData.at(i));
+ }
+ }
- // We must free the memory allocated in the DB layer.
- if(valueSize > 0) {
- free(valueData);
- }
- } // End of for loop.
+ return;
+ }
+
+ // We have to now turn the value data read from the backend store into proper
+ // SPL type so that it is consumable by the caller.
+ for(int i=0; i> tempValue;
+ // We must free the memory allocated in the DB layer.
+ free(valueData.at(i));
+
+ // Indicate about the existence of the key in the user provided list.
+ keyExistsOrNot.push_back(true);
+ } else {
+ // Indicate about the non-existence of the key in the user provided list.
+ keyExistsOrNot.push_back(false);
+ }
- return(resultStatus);
+ // To the user provided values list, add either a fetched good value result for a key that exists in the
+ // store or a default value for a key that doesn't exist in the store.
+ values.push_back(tempValue);
+ } // End of for loop.
} // End of getValues method.
/// This function can be called to get multiple Key/Value (KV) pairs present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list.
+ /// @param keyStartPosition User can indicate a zero index based start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
- /// @return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get KV pairs operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool DistributedProcessStore::getKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::list & errors) {
- // The logic in this method is going to be a combination of the two previous methods shown above.
- // We have to first get the keys and then the values for those keys.
- dbError_->reset();
- bool resultStatus = true;
- uint64_t error = 0;
- std::string keyExpression = "";
- std::string valueExpression = "";
-
- // Clear the user provided mutable lists now.
- keys.clear();
- values.clear();
- errors.clear();
-
- // Let us first get the keys available in the user specified range.
- getKeys(store, keys, keyStartPosition, numberOfPairsNeeded, keyExpression, valueExpression, error);
-
- // If we encountered an error in getting the keys, we can return back from here.
- if(error != 0) {
- resultStatus = false;
- return(resultStatus);
- }
+ void DistributedProcessStore::getKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::uint64 & err) {
+ // The logic in this method is going to be a combination of the two previous methods shown above.
+ // We have to first get the keys and then the values for those keys.
+ dbError_->reset();
+ std::string keyExpression = "";
+ std::string valueExpression = "";
+
+ // Clear the user provided mutable lists now.
+ keys.clear();
+ values.clear();
+
+ // Let us first get the keys available in the user specified range.
+ getKeys(store, keys, keyStartPosition, numberOfPairsNeeded, keyExpression, valueExpression, err);
+
+ // If we encountered an error in getting the keys, we can return back from here.
+ if(err != 0) {
+ return;
+ }
+
+ SPL::list keyExistsOrNot;
- // We got the keys. Let us now get the values stored for those keys.
- resultStatus = getValues(store, keys, values, errors);
- return(resultStatus);
+ // We got the keys. Let us now get the values stored for those keys.
+ getValues(store, keys, keyExistsOrNot, values, err);
} // End of getKVPairs method.
+ /// This function can be called to put i.e. write/save multiple Key/Value (KV) pairs to a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be written i.e. saved. This list must be made of a given store's key data type.
+ /// @param values User provided list variable that contains the values to be written i.e. saved. This list must be made of a given store's value data type. A KV pair is formed with a key and a value taken from the same index position of the keys and values lists. If the keys and values lists are not of the same size, this function will simply return back without doing any bulk put operation.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk put operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void DistributedProcessStore::putKVPairs(SPL::uint64 store, SPL::list const & keys, SPL::list const & values, SPL::uint64 & err) {
+ dbError_->reset();
+
+ // Check if the two user provided keys and values lists are of the same size.
+ if(keys.size() != values.size()) {
+ // They are not of the same size. We will ignore the caller's intent and
+ // return back from here without doing any useful work.
+ // Set an error code so that the caller knows that there is no real put KV pairs done by us here.
+ dbError_->set(std::string("Inside the top level putKVPairs #1, this method was called with keys and values lists that are made of different sizes."), DPS_PUT_KV_PAIRS_KEYS_VALUES_LISTS_NOT_OF_SAME_SIZE_ERROR);
+ err = dbError_->getErrorCode();
+ return;
+ }
+
+ // We will create the NBF based keys and values, store them in different lists and
+ // send it to the underlying DB layer for it to do bulk writes of the K/V pairs.
+ std::vector keyData;
+ std::vector keySize;
+ std::vector valueData;
+ std::vector valueSize;
+
+ std::vector key_nbf;
+ std::vector value_nbf;
+
+ // We have to have NBF objects in scope at the time of calling
+ // the underlying DB layer functions. We will keep them in a vector.
+ for(int i=0; iputKVPairs(store, keyData, keySize, valueData, valueSize, *dbError_);
+ err = dbError_->getErrorCode();
+ } // End of putKVPairs method.
+
+ /// This function can be called to check for the existence of a given list of keys in a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be checked for their existence in the given store. This list must be made of a given store's key data type.
+ /// @param results User provided mutable list variable in which the key existence check true or false results will be returned. This list must be made of a boolean data type.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk has keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void DistributedProcessStore::hasKeys(SPL::uint64 store, SPL::list const & keys, SPL::list & results, SPL::uint64 & err) {
+ dbError_->reset();
+
+ // Clear the user provided mutable list now.
+ results.clear();
+
+ if(keys.size() == 0) {
+ // Caller sent us no keys.
+ // Set an error code so that the caller knows that there are no existence results sent from us here.
+ dbError_->set(std::string("Inside the top level hasKeys #1, this method was called with an empty keys list."), DPS_HAS_KEYS_EMPTY_KEYS_LIST_ERROR);
+ err = dbError_->getErrorCode();
+ return;
+ }
+
+ // We will create the NBF based keys, store them in a different list and
+ // send it to the underlying DB layer for it to do bulk existence check of those keys.
+ std::vector keyData;
+ std::vector keySize;
+
+ std::vector key_nbf;
+
+ // We have to have NBF objects in scope at the time of calling
+ // the underlying DB layer functions. We will keep them in a vector.
+ for(int i=0; i to the
+ // underlying DB implementation that expects std::vector.
+ // So, I'm having this local variable to collect the results and then
+ // transfer it to the user provided list right after the call returns back.
+ // There must be an easier way to avoid this use of two different array
+ // types to do one task. It is a waste of extra logic and memory copy.
+ // At a later time, try to simplify it.
+ std::vector myResults;
+
+ // Call the underlying DB implementation layer only once to do bulk existence check for a given list of keys.
+ db_->hasKeys(store, keyData, keySize, myResults, *dbError_);
+ err = dbError_->getErrorCode();
+
+ // Populate the user provided SPL::list
+ for(int i=0; i
+ void DistributedProcessStore::removeKeys(SPL::uint64 store, SPL::list const & keys, SPL::int32 & totalKeysRemoved, SPL::uint64 & err) {
+ dbError_->reset();
+
+ // Reset the result variable.
+ totalKeysRemoved = 0;
+
+ if(keys.size() == 0) {
+ // Caller sent us no keys.
+ // Set an error code so that the caller knows that there are no key removals done here.
+ dbError_->set(std::string("Inside the top level removeKeys #1, this method was called with an empty keys list."), DPS_REMOVE_KEYS_EMPTY_KEYS_LIST_ERROR);
+ err = dbError_->getErrorCode();
+ return;
+ }
+
+ // We will create the NBF based keys, store them in a different list and
+ // send it to the underlying DB layer for it to do bulk existence check of those keys.
+ std::vector keyData;
+ std::vector keySize;
+
+ std::vector key_nbf;
+
+ // We have to have NBF objects in scope at the time of calling
+ // the underlying DB layer functions. We will keep them in a vector.
+ for(int i=0; iremoveKeys(store, keyData, keySize, totalKeysRemoved, *dbError_);
+ err = dbError_->getErrorCode();
+ } // End of removeKeys method.
+
template
void DistributedProcessStore::serialize(SPL::uint64 store, SPL::blob & data, SPL::uint64 & err)
{
diff --git a/com.ibm.streamsx.dps/impl/include/DistributedProcessStoreWrappers.h b/com.ibm.streamsx.dps/impl/include/DistributedProcessStoreWrappers.h
index 49a7030..dfb8684 100644
--- a/com.ibm.streamsx.dps/impl/include/DistributedProcessStoreWrappers.h
+++ b/com.ibm.streamsx.dps/impl/include/DistributedProcessStoreWrappers.h
@@ -303,11 +303,11 @@ namespace distributed
/// Get multiple keys present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param keyStartPosition User can indicate a start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param keyStartPosition User can indicate a zero index based start position from where keys should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfKeysNeeded User can indicate the total number of keys to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available keys upto a maximum of 50000 keys from the given key start position will be returned.
/// @param keyExpression User can provide an expression made of the attributes from the key's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
/// @param valueExpression User can provide an expression made of the attributes from the value's data type. This expression will be evaluated in determining which matching keys to be returned. [This feature is not implemented at this time.]
- /// @param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
void dpsGetKeys(SPL::uint64 store, SPL::list & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err)
@@ -315,33 +315,69 @@ namespace distributed
return DistributedProcessStore::getGlobalStore().getKeys(store, keys, keyStartPosition, numberOfKeysNeeded, keyExpression, valueExpression, err);
}
- /// This function can be called to get values for a given list of multiple keys present in a given store.
+ /// This function can be called to get i.e. read/fetch values for a given list of multiple keys present in a given store.
/// @param store The handle of the store.
- /// @param keys User provided list variable that contains keys for which values need to be fetched.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
+ /// @param keys User provided list variable that contains keys for which values need to be fetched. It must be made of a given store's key data type.
+ /// @param keyExistsOrNot User provided mutable list variable in which true or false status for every user given key will be returned. This is done to indicate to the user about if that key exists or not in the given store. This list must be made of a boolean data type. Status returned in this list can be used in combination with the items returned in the values list which is explained next.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. If a user given key is not present in the store, there will still be a default value returned in that key's index. It is better to first confirm in the keyExistsOrNot list that will carry true or false status about the existence of all the user provided keys. Depending on the key existence status found in that other list, user can decide to either consider using or ignore a value in a given index of the values list.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get values operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool dpsGetValues(SPL::uint64 store, SPL::list const & keys, SPL::list & values, SPL::list & errors)
+ void dpsGetValues(SPL::uint64 store, SPL::list const & keys, SPL::list & keyExistsOrNot, SPL::list & values, SPL::uint64 & err)
{
- return DistributedProcessStore::getGlobalStore().getValues(store, keys, values, errors);
+ return DistributedProcessStore::getGlobalStore().getValues(store, keys, keyExistsOrNot, values, err);
}
/// This function can be called to get multiple Key/Value (KV) pairs present in a given store.
/// @param store The handle of the store.
/// @param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
- /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
- /// @param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
+ /// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list.
+ /// @param keyStartPosition User can indicate a zero index based start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
/// @param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
- /// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
- /// @return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk get KV pairs operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
///
template
- bool dpsGetKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::list & errors)
+ void dpsGetKVPairs(SPL::uint64 store, SPL::list & keys, SPL::list & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::uint64 & err)
{
- return DistributedProcessStore::getGlobalStore().getKVPairs(store, keys, values, keyStartPosition, numberOfPairsNeeded, errors);
+ return DistributedProcessStore::getGlobalStore().getKVPairs(store, keys, values, keyStartPosition, numberOfPairsNeeded, err);
}
+ /// This function can be called to put i.e. write/save multiple Key/Value (KV) pairs to a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be written i.e. saved. This list must be made of a given store's key data type.
+ /// @param values User provided list variable that contains the values to be written i.e. saved. This list must be made of a given store's value data type. A KV pair is formed with a key and a value taken from the same index position of the keys and values lists. If the keys and values lists are not of the same size, this function will simply return back without doing any bulk put operation.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk put operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void dpsPutKVPairs(SPL::uint64 store, SPL::list const & keys, SPL::list const & values, SPL::uint64 & err)
+ {
+ return DistributedProcessStore::getGlobalStore().putKVPairs(store, keys, values, err);
+ }
+
+ /// This function can be called to check for the existence of a given list of keys in a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be checked for their existence in the given store. This list must be made of a given store's key data type.
+ /// @param results User provided mutable list variable in which the key existence check true or false results will be returned. This list must be made of a boolean data type.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk has keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void dpsHasKeys(SPL::uint64 store, SPL::list const & keys, SPL::list & results, SPL::uint64 & err)
+ {
+ return DistributedProcessStore::getGlobalStore().hasKeys(store, keys, results, err);
+ }
+
+ /// This function can be called to remove a given list of keys from a given store.
+ /// @param store The handle of the store.
+ /// @param keys User provided list variable that contains the keys to be removed from the given store. This list must be made of a given store's key data type.
+ /// @param totalKeysRemoved User provided mutable int32 variable in which the total number of keys removed from the store will be returned.
+ /// @param err User provided mutable uint64 typed variable in which the result of this bulk remove keys operation will be returned. It will be 0 if no error occurs and a non-zero error code otherwise.
+ ///
+ template
+ void dpsRemoveKeys(SPL::uint64 store, SPL::list const & keys, SPL::int32 & totalKeysRemoved, SPL::uint64 & err)
+ {
+ return DistributedProcessStore::getGlobalStore().removeKeys(store, keys, totalKeysRemoved, err);
+ }
+
/// Serialize the items from the serialized store
/// @param store store handle
/// @param data blob to serialize into
diff --git a/com.ibm.streamsx.dps/impl/include/DpsConstants.h b/com.ibm.streamsx.dps/impl/include/DpsConstants.h
index 50209bb..16f4ad8 100644
--- a/com.ibm.streamsx.dps/impl/include/DpsConstants.h
+++ b/com.ibm.streamsx.dps/impl/include/DpsConstants.h
@@ -81,6 +81,9 @@ interface with many different back-end in-memory stores.
#define REDIS_ZADD_CMD "zadd "
#define REDIS_ZREM_CMD "zrem "
#define REDIS_ZRANGE_CMD "zrange "
+#define REDIS_HMSET_CMD "hmset "
+#define REDIS_HMGET_CMD "hmget "
+#define REDIS_ZMSCORE_CMD "zmscore "
#define CASSANDRA_DPS_KEYSPACE "com_ibm_streamsx_dps"
#define CASSANDRA_DPS_MAIN_TABLE "t1"
#define HBASE_DPS_MAIN_TABLE "dps_t1"
@@ -221,6 +224,24 @@ interface with many different back-end in-memory stores.
#define DPS_REDIS_REPLY_NULL_ERROR 161
#define DPS_REDIS_REPLY_NIL_ERROR 162
#define DPS_EMPTY_DATA_ITEM_VALUE_FOUND_ERROR 163
+#define DPS_BULK_PUT_HSET_ERROR 164
+#define DPS_BULK_PUT_ZADD_ERROR 165
+#define DPS_BULK_GET_HMGET_ERROR 166
+#define DPS_BULK_GET_HMGET_NO_REPLY_ARRAY_ERROR 167
+#define DPS_BULK_GET_HMGET_MALLOC_ERROR 168
+#define DPS_BULK_GET_HMGET_EMPTY_VALUE_ERROR 169
+#define DPS_BULK_GET_ZMSCORE_ERROR 170
+#define DPS_BULK_GET_ZMSCORE_NO_REPLY_ARRAY_ERROR 171
+#define DPS_BULK_GET_ZMSCORE_EMPTY_VALUE_ERROR 172
+#define DPS_BULK_REMOVE_HDEL_ERROR 173
+#define DPS_BULK_REMOVE_ZREM_ERROR 174
+#define DPS_BULK_REMOVE_CNT_MISMATCH_ERROR 175
+#define DPS_BULK_REMOVE_HDEL_NO_INTEGER_REPLY_ERROR 176
+#define DPS_BULK_REMOVE_ZREM_NO_INTEGER_REPLY_ERROR 177
+#define DPS_GET_VALUES_EMPTY_KEYS_LIST_ERROR 178
+#define DPS_PUT_KV_PAIRS_KEYS_VALUES_LISTS_NOT_OF_SAME_SIZE_ERROR 179
+#define DPS_HAS_KEYS_EMPTY_KEYS_LIST_ERROR 180
+#define DPS_REMOVE_KEYS_EMPTY_KEYS_LIST_ERROR 181
#define DL_CONNECTION_ERROR 501
#define DL_GET_LOCK_ID_ERROR 502
diff --git a/com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h b/com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h
index 4b0e966..0564df9 100644
--- a/com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h
@@ -212,8 +212,11 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h b/com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h
index 25daf29..0543ff0 100644
--- a/com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h
@@ -178,8 +178,11 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/MongoDBLayer.h b/com.ibm.streamsx.dps/impl/include/MongoDBLayer.h
index ae5ccc0..a41adc9 100644
--- a/com.ibm.streamsx.dps/impl/include/MongoDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/MongoDBLayer.h
@@ -172,8 +172,11 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
};
} } } } }
diff --git a/com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h b/com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
index a2d58f9..954f761 100644
--- a/com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
@@ -181,10 +181,13 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector const & cmdList, std::string & resultValue, PersistenceError & dbError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, int32_t & totalKeysRemoved, PersistenceError & dbError);
- // Lock related methods.
+ // Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
void releaseLock(uint64_t lock, PersistenceError & lkError);
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
diff --git a/com.ibm.streamsx.dps/impl/include/RedisClusterPlusPlusDBLayer.h b/com.ibm.streamsx.dps/impl/include/RedisClusterPlusPlusDBLayer.h
index 930a98e..cc44124 100644
--- a/com.ibm.streamsx.dps/impl/include/RedisClusterPlusPlusDBLayer.h
+++ b/com.ibm.streamsx.dps/impl/include/RedisClusterPlusPlusDBLayer.h
@@ -156,10 +156,13 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector const & cmdList, std::string & resultValue, PersistenceError & dbError);
- void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
- void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
+ void getKeys(uint64_t store, std::vector & keysBuffer, std::vector & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
+ void getValues(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & valueData, std::vector & valueSize, PersistenceError & dbError);
+ void putKVPairs(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector const & valueData, std::vector const & valueSize, PersistenceError & dbError);
+ void hasKeys(uint64_t store, std::vector const & keyData, std::vector const & keySize, std::vector & results, PersistenceError & dbError);
+ void removeKeys(uint64_t store, std::vector