
Commit

Merge pull request #109 from markheger/develop
v2.4.x fix #108
markheger authored Jun 24, 2020
2 parents 4c0adf6 + 6a2ba8b commit 2b4e379
Showing 10 changed files with 291 additions and 34 deletions.
4 changes: 4 additions & 0 deletions com.ibm.streamsx.eventstore/CHANGELOG.md
@@ -1,6 +1,10 @@
# Changes
==========

+## v2.4.1:

+* [#108](https://github.com/IBMStreams/streamsx.eventstore/issues/108) Improved error handling in EventStoreSink.updateInsertSpeedMetrics

## v2.4.0:

* [#106](https://github.com/IBMStreams/streamsx.eventstore/issues/106) Client library updated for Db2 EventStore 2.0.0.5
6 changes: 5 additions & 1 deletion com.ibm.streamsx.eventstore/info.xml
@@ -22,6 +22,10 @@ IBM Db2 Event Store is an in-memory database designed to rapidly ingest and anal
This is an overview of changes for major and minor version upgrades. For details, see the releases in public [https://github.com/IBMStreams/streamsx.eventstore/releases|GitHub].

+++ What is changed in version 2.4.1
+* Improved error handling in EventStoreSink.updateInsertSpeedMetrics

++ What is changed in version 2.4.0
* Client library updated for Db2 EventStore 2.0.0.5
@@ -48,7 +52,7 @@ This is an overview of changes for major and minor version upgrades. For details
* Supports globalized messages with unique message IDs
]]></info:description>
-    <info:version>2.4.0</info:version>
+    <info:version>2.4.1</info:version>
    <info:requiredProductVersion>4.3.0.0</info:requiredProductVersion>
  </info:identity>
  <info:dependencies/>
2 changes: 1 addition & 1 deletion com.ibm.streamsx.eventstore/pom.xml
@@ -13,7 +13,7 @@
  <groupId>com.ibm.streamsx.eventstore</groupId>
  <artifactId>streamsx.eventstore</artifactId>
  <packaging>jar</packaging>
-  <version>2.4.0</version>
+  <version>2.4.1</version>
  <name>com.ibm.streamsx.eventstore</name>
  <repositories>
    <repository>
@@ -191,7 +191,7 @@ public class EventStoreSink extends AbstractOperator implements StateHandler {
    public static final long CONSISTENT_REGION_DRAIN_WAIT_TIME = 180000;

    private boolean isInsertSpeedMetricSet = false;
-    ArrayList<Long> insertTimes = new ArrayList<Long>();
+    ArrayList<Long> insertTimes = null;

    // Initialize the metrics
    @CustomMetric (kind = Metric.Kind.COUNTER, name = "nActiveInserts", description = "Number of active insert requests")
@@ -307,7 +307,7 @@ public void run(){
    public synchronized void initialize(OperatorContext context)
            throws Exception {
        super.initialize(context);

+        insertTimes = new ArrayList<Long>();
        // Initialize the operator information from scratch
        startOperatorSetup(context, 0L, 0L);
    }
@@ -1089,32 +1089,40 @@ private void submitResultTuple(LinkedList</*Row*/Tuple> batch, boolean inserted)
    }

    public void updateInsertSpeedMetrics (long insertDuration) {
-        if (false == isInsertSpeedMetricSet) {
-            // set initial values after first upload
-            this.lowestInsertTime.setValue(insertDuration);
-            this.highestInsertTime.setValue(insertDuration);
-            this.averageInsertTime.setValue(insertDuration);
-            isInsertSpeedMetricSet = true;
-        }
-        else {
-            // metrics for insert duration (time to insert a batch)
-            if (insertDuration < this.lowestInsertTime.getValue()) {
-                this.lowestInsertTime.setValue(insertDuration);
-            }
-            if (insertDuration > this.highestInsertTime.getValue()) {
-                this.highestInsertTime.setValue(insertDuration);
-            }
-            insertTimes.add(insertDuration);
-            // calculate average
-            long total = 0;
-            for(int i = 0; i < insertTimes.size(); i++) {
-                total += insertTimes.get(i);
-            }
-            this.averageInsertTime.setValue(total / insertTimes.size());
-            // avoid that arrayList is growing unlimited
-            if (insertTimes.size() > 10000) {
-                insertTimes.remove(0);
-            }
-        }
+        try {
+            if ((0 != insertDuration) && (!shutdown)) {
+                if (false == isInsertSpeedMetricSet) {
+                    // set initial values after first upload
+                    this.lowestInsertTime.setValue(insertDuration);
+                    this.highestInsertTime.setValue(insertDuration);
+                    this.averageInsertTime.setValue(insertDuration);
+                    isInsertSpeedMetricSet = true;
+                }
+                else {
+                    // metrics for insert duration (time to insert a batch)
+                    if (insertDuration < this.lowestInsertTime.getValue()) {
+                        this.lowestInsertTime.setValue(insertDuration);
+                    }
+                    if (insertDuration > this.highestInsertTime.getValue()) {
+                        this.highestInsertTime.setValue(insertDuration);
+                    }
+                    insertTimes.add(insertDuration);
+                    // calculate average
+                    long total = 0;
+                    for(int i = 0; i < insertTimes.size(); i++) {
+                        total += insertTimes.get(i);
+                    }
+                    if ((0 != total) && (0 < insertTimes.size())) {
+                        this.averageInsertTime.setValue(total / insertTimes.size());
+                    }
+                    // avoid that arrayList is growing unlimited
+                    if (insertTimes.size() > 10000) {
+                        insertTimes.remove(0);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            tracer.log(TraceLevel.ERROR, "Failed to update metrics: " + e.getMessage());
+        }
    }

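For context on the metric logic above: the reworked method still derives the average by summing up to 10,000 stored batch-insert durations on every call. Below is a minimal, self-contained sketch of the same bounded-window lowest/highest/average bookkeeping, kept incremental with a running sum so the window never has to be rescanned. It is illustrative only; the class, field, and method names are not taken from the toolkit.

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only -- not part of this commit and not the toolkit's API.
// It mirrors what updateInsertSpeedMetrics tracks (lowest / highest / average
// over at most 10000 batch-insert durations), but maintains the average with a
// running sum instead of re-summing the stored list on every call.
public class BoundedInsertTimeStats {

    private static final int WINDOW_SIZE = 10000; // same cap as the operator's insertTimes list

    private final Deque<Long> window = new ArrayDeque<>();
    private long runningTotal = 0;
    private long lowest = Long.MAX_VALUE;
    private long highest = Long.MIN_VALUE;

    // Record one batch-insert duration; zero durations are ignored, mirroring the new guard.
    public synchronized void record(long insertDuration) {
        if (insertDuration == 0) {
            return;
        }
        lowest = Math.min(lowest, insertDuration);
        highest = Math.max(highest, insertDuration);
        window.addLast(insertDuration);
        runningTotal += insertDuration;
        if (window.size() > WINDOW_SIZE) {
            runningTotal -= window.removeFirst(); // drop the oldest sample instead of rescanning the list
        }
    }

    public synchronized long average() {
        return window.isEmpty() ? 0 : runningTotal / window.size();
    }

    public static void main(String[] args) {
        BoundedInsertTimeStats stats = new BoundedInsertTimeStats();
        for (long duration : new long[] { 120, 95, 130, 110 }) {
            stats.record(duration);
        }
        System.out.println("lowest=" + stats.lowest
                + " highest=" + stats.highest
                + " average=" + stats.average());
    }
}

Either way the reported values are equivalent; the running sum only changes how the average is maintained, not what it reports.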
@@ -29,7 +29,7 @@ public composite InsertTest () {
                Beacon: id=(int32)IterationCount(), id2=(int64)IterationCount(), name='SAMPLE';
        }

-       // The output streams contains an attribute "_Inserted_" to indicate the result of the insert (true or false).
+       @parallel (width=3)
        () as EventStoreSink = com.ibm.streamsx.eventstore::EventStoreSink(Beacon) {
            param
                vmArg : "-Xmx4096m";
32 changes: 28 additions & 4 deletions tests/spl-test/test_eventstore.py
@@ -96,7 +96,9 @@ def _build_launch_validate(self, name, composite_name, parameters, toolkit_name,
        topo = Topology(name)
        self._add_toolkits(topo, toolkit_name)
        if self.es_keystore is not None:
-            self._add_store_file(topo, self.es_keystore)
+            self._add_store_file(topo, self.es_keystore)

+        #streamsx.spl.toolkit.add_toolkit_dependency(topo, 'com.ibm.streamsx.eventstore', '[2.4.1,3.0.0)')

        params = parameters
        # Call the test composite
@@ -215,6 +217,26 @@ def test_insert_optional_types(self):

        self._build_launch_validate(name, "com.ibm.streamsx.eventstore.sample::OptionalTypesSampleComp", params, '../../samples/EventStoreNullableColumnSample', num_expected, True)

+    def test_parallel_single_pe(self):
+        print ('\n---------'+str(self))
+        name = 'test_parallel_single_pe'
+        if (streams_install_env_var()):
+            self._index_tk(self.samples_location)
+        num_expected = 100
+        batch_size = 50
+        params = {'connectionString': self.connection, 'databaseName': self.database, 'tableName': 'StreamsSample2', 'batchSize':batch_size, 'frontEndConnectionFlag':self.front_end_connection_flag, 'iterations': num_expected}
+        if self.es_password and self.es_user is not None:
+            params['eventStoreUser'] = self.es_user
+            params['eventStorePassword'] = self.es_password
+        if self.es_keystore_password and self.es_truststore_password is not None:
+            params['keyStore'] = 'opt/clientkeystore'
+            params['trustStore'] = 'opt/clientkeystore'
+            params['keyStorePassword'] = self.es_keystore_password
+            params['trustStorePassword'] = self.es_truststore_password

+        self._build_launch_validate(name, "com.ibm.streamsx.eventstore.sample::ParallelSampleComp", params, 'udp.test', num_expected, True)


    # rename to enable it, remove _
    def _test_insert_consistent_region(self):
        print ('\n---------'+str(self))
@@ -256,15 +278,17 @@ def _create_stream(self, topo):
        schema=StreamSchema('tuple<int32 id, rstring name>').as_tuple()
        return s.map(lambda x : (x,'X'+str(x*2)), schema=schema)

-    # rename to enable it, remove _
-    def _test_insert_udp(self):
+    # each channel isolation, each EventStoreSink in an own PE
+    def test_insert_udp(self):
        print ('\n---------'+str(self))
        topo = Topology('test_insert_udp')
        self._add_toolkits(topo, None)
+        topo.add_pip_package('streamsx.eventstore')
        s = self._create_stream(topo)
        result_schema = StreamSchema('tuple<int32 id, rstring name, boolean _Inserted_>')
+        # user-defined parallelism with two channels (two EventStoreSink operators)
        res = es.insert(s.parallel(2), table='SampleTable', database=self.database, connection=self.connection, schema=result_schema, primary_key='id', partitioning_key='id', front_end_connection_flag=self.front_end_connection_flag, user=self.es_user, password=self.es_password, truststore=self.es_truststore, truststore_password=self.es_truststore_password, keystore=self.es_keystore, keystore_password=self.es_keystore_password)
        res = res.end_parallel()
        res.print()
        #self._build_only('test_insert_udp', topo)
        tester = Tester(topo)
@@ -281,9 +305,9 @@ def _test_insert_udp(self):
    def test_insert_with_app_config(self):
        print ('\n---------'+str(self))
        self._create_app_config()
-        return
        topo = Topology('test_insert_with_app_config')
        self._add_toolkits(topo, None)
+        topo.add_pip_package('streamsx.eventstore')
        s = self._create_stream(topo)
        result_schema = StreamSchema('tuple<int32 id, rstring name, boolean _Inserted_>')
        res = es.insert(s, config='eventstore', table='SampleTable', schema=result_schema, primary_key='id', partitioning_key='id', front_end_connection_flag=self.front_end_connection_flag, truststore=self.es_truststore, keystore=self.es_keystore)
7 changes: 7 additions & 0 deletions tests/spl-test/udp.test/.gitignore
@@ -0,0 +1,7 @@
+/output/
+/.toolkitList
+/toolkit.xml
+/.apt_generated/
+/doc/
+/.classpath
+/opt/