Skip to content

Commit

Permalink
Merge branch 'release_3.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
brandtol committed Jan 18, 2018
2 parents 605f238 + 9ab61a8 commit bcd868b
Show file tree
Hide file tree
Showing 21 changed files with 1,682 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,18 @@ The machine name, Linux OS version and the CPU architecture (x86 or PPC) will be
</function>
<function>
<description>Allows for setting a path to the configuration file. Relative paths are relative to the toolkit root directory. If not set, then 'etc/no-sql-kv-store-servers.cfg' file will be used.
@param dpsConfigFile the path to the configuration file that should be used
If a file with the given name is not found, the DPS toolkit tries to read the configuration from a Streams application configuration object with the given name.
In order to use the Streams application configuration, the user must call the dpsSetConfigFile method to specify the Streams app config name.
In the Streams application configuration, the user has to specify every configuration line as an individual property.
The names of the properties are as shown below. Following is an example of how a DPS application configuration named dps-cfg-for-app1 would be created using the streamtool:

streamtool mkappconfig --property dps.nosql.db=redis dps-cfg-for-app1
streamtool chappconfig --property dps.server1=MyMachine1:6379:MyPassword dps-cfg-for-app1
streamtool chappconfig --property dps.server2=MyMachine2:6379:MyPassword dps-cfg-for-app1
streamtool chappconfig --property dps.server3=MyMachine3:6379:MyPassword dps-cfg-for-app1
streamtool chappconfig --property dps.server4=MyMachine4:6379:MyPassword dps-cfg-for-app1

@param dpsConfigFile the path to the configuration file or the name of an application configuration that should be used
@return returns always true</description>
<prototype>public stateful boolean dpsSetConfigFile(rstring dpsConfigFile)</prototype>
</function>
Expand All @@ -634,7 +645,7 @@ The machine name, Linux OS version and the CPU architecture (x86 or PPC) will be
if (dpsReconnect() == false) {
// if that was not successful, give up
abort();
{
}
}

</description>
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ namespace distributed
Cluster<redisContext>::ptr_t redis_cluster;
redisReply *redis_cluster_reply;
RedisClusterDBLayer *redisClusterDBLayerPtr;
std::string password_for_redis_cluster;

RedisClusterDBLayerIterator();
~RedisClusterDBLayerIterator();
Expand All @@ -99,7 +100,7 @@ namespace distributed
private:
Cluster<redisContext>::ptr_t redis_cluster;
redisReply *redis_cluster_reply;

std::string password_for_redis_cluster;

bool readStoreInformation(std::string const & storeIdString, PersistenceError & dbError,
uint32_t & dataItemCnt, std::string & storeName,
Expand Down
118 changes: 103 additions & 15 deletions com.ibm.streamsx.dps/impl/src/DistributedProcessStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,32 @@ namespace distributed
//
// Read all the lines in the file.
string line;
ifstream myfile (confFile.c_str());
int32 serverCnt = 0;
bool noSqlKvStoreProductNameFound = false;

if (myfile.is_open()) {
while (myfile.good()){
getline (myfile, line);
streams_boost::algorithm::trim(line);

// If it is an empty line, skip processing it.
if (line == "") {
// If the config file is present, we will use it.
// Otherwise, user may have provided the name of an app config in Streams.
// In that case, we will try to read it from the Streams app config.
if (streams_boost::filesystem::exists(confFile)) {
SPLAPPTRC(L_ERROR, "Going to read the DPS configuration details from the DPS config file '" +
confFile + "'.", "DistributedProcessStore");
ifstream myfile (confFile.c_str());

if (myfile.is_open()) {
while (myfile.good()){
getline (myfile, line);
streams_boost::algorithm::trim(line);

// If it is an empty line, skip processing it.
if (line == "") {
continue;
}
}

// Check whether it is a comment line. i.e. very first character starting with #
size_t pos = line.find_first_of("#", 0);
// Check whether it is a comment line. i.e. very first character starting with #
size_t pos = line.find_first_of("#", 0);

if ((pos == string::npos) || (pos > 0)) {
if ((pos == string::npos) || (pos > 0)) {
if (noSqlKvStoreProductNameFound == false) {
// This must be the very first non-comment line containing the
// name of the no-sql store product being configured.
Expand All @@ -126,18 +134,98 @@ namespace distributed
dbServers.insert(line);
serverCnt++;
}
}
}
}
}

myfile.close();
myfile.close();
}
} else {
// Config file is not present.
// So, we will attempt to get the DPS configuration details from the Streams app config.
// In order to use the Streams app config, user must first call the dpsSetConfigFile API to
// specify the Streams app config name for a given PE or application. That value will be passed to
// this method via the dpsConfigFile method argument.
// In the Streams app config, we want users to specify every configuration line as an individual property.
// Names of the properties should be as shown below.
// Following is an example of how a DPS app config would like like.
//
// streamtool mkappconfig -d s1 -i s1 --property dps.nosql.db=redis dps-cfg-for-app1
// streamtool chappconfig -d s1 -i s1 --property dps.server1=MyMachine1:6379:MyPassword dps-cfg-for-app1
// streamtool chappconfig -d s1 -i s1 --property dps.server2=MyMachine2:6379:MyPassword dps-cfg-for-app1
// streamtool chappconfig -d s1 -i s1 --property dps.server3=MyMachine3:6379:MyPassword dps-cfg-for-app1
// streamtool chappconfig -d s1 -i s1 --property dps.server4=MyMachine4:6379:MyPassword dps-cfg-for-app1
//
// If you do a listing on a dps app config, it should look like the following.
// streamtool getappconfig -d s1 -i s1 dps-cfg-for-app1
// dps.nosql.db=redis
// dps.server1=MyMachine1:6379:MyPassword
// dps.server2=MyMachine2:6379:MyPassword
// dps.server3=MyMachine3:6379:MyPassword
// dps.server4=MyMachine4:6379:MyPassword
//
if (DistributedProcessStore::dpsConfigFile_ == "") {
// There is no config file present. So, next choice is to use the Streams app config.
// But, user has not given any app config name either. They must call the dpsSetConfigFile API to do that.
std::string exceptionError =
"Neither a DPS configuration file nor a Streams app config name is available to read the DPS configuration details. Please ensure that DPS is configured via a configuration file or a Streams app config. Throwing an exception now.";
SPLAPPTRC(L_ERROR, exceptionError, "DistributedProcessStore");
throw(SPL::SPLRuntimeException("fetchDBParameters", exceptionError));
}

// Before reading from the Streams app config, let us ensure that we are not going to
// treat a non-existing file path as the Streams app config name.
// If there is a forward slash character, then it refers to a file path and not to a
// Streams app config.
if (streams_boost::algorithm::contains(dpsConfigFile, "/") == true) {
// This is a non-existing DPS configuration file.
std::string exceptionError = "DPS configuration file '" + dpsConfigFile +
"' doesn't exist. Throwing an exception now.";
throw(SPL::SPLRuntimeException("fetchDBParameters", exceptionError));
}

SPLAPPTRC(L_ERROR, "Going to read the DPS configuration details from the Streams app config '" +
dpsConfigFile + "'.", "DistributedProcessStore");
// Let us first get the NoSQL db name from the Streams app config.
SPL::rstring defaultValue = "";
SPL::rstring propertyName = "dps.nosql.db";
noSqlKvStoreProductName = SPL::Functions::Utility::getApplicationConfigurationProperty(dpsConfigFile, propertyName, defaultValue).string();
std::string serverConfig = "";

if (noSqlKvStoreProductName != defaultValue.string()) {
// We got the NoSQL DB name.
// We can now get all the server names.
propertyName = "dps.server";

while(true) {
char buf[68];
sprintf(buf, "%d", serverCnt+1);
propertyName = "dps.server" + std::string(buf);
serverConfig = SPL::Functions::Utility::getApplicationConfigurationProperty(dpsConfigFile, propertyName, defaultValue).string();

if(serverConfig != defaultValue.string()) {
dbServers.insert(serverConfig);
serverCnt++;
} else {
// We didn't get any value for the given property.
// Break out of this loop.
break;
}
}
} else {
std::string error = "Either the Streams app config '" + dpsConfigFile + "' doesn't exist or " +
"it doesn't contain the required DPS configuration details.";
SPLAPPTRC(L_ERROR, error, "DistributedProcessStore");
}
}

if (serverCnt == 0) {
// SPLAPPLOG is causing it to get stuck in RHEL6/CentOS6 (RHEL7/CentOS7 is fine) when the @catch annotation is used in the calling SPL code.
// SPLAPPLOG(L_ERROR, DPSMSG_CANNOT_GET_PRODUCT_NAME(confFile), "DistributedProcessStore");
std::string error = "Cannot get NoSQL K/V store product name and/or the server names from the configuration file '"+confFile+"'";
std::string error = "Cannot get NoSQL K/V store product name and/or the server names from the DPS configuration.";
SPLAPPTRC(L_ERROR, error, "DistributedProcessStore");
throw(SPL::SPLRuntimeException("fetchDBParameters", error));
} else {
SPLAPPTRC(L_ERROR, "Completed reading the DPS configuration details.", "DistributedProcessStore");
}
}

Expand Down
Loading

0 comments on commit bcd868b

Please sign in to comment.