diff --git a/.clang-format b/.clang-format
new file mode 100644
index 00000000..f41fe296
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,221 @@
+# .clang-format for Qt Creator
+#
+# This is for clang-format >= 5.0.
+#
+# The configuration below follows the Qt Creator Coding Rules [1] as closely as
+# possible. For documentation of the options, see [2].
+#
+# Use ../../tests/manual/clang-format-for-qtc/test.cpp for documenting problems
+# or testing changes.
+#
+# In case you update this configuration please also update the qtcStyle() in src\plugins\clangformat\clangformatutils.cpp
+#
+# [1] https://doc-snapshots.qt.io/qtcreator-extending/coding-style.html
+# [2] https://clang.llvm.org/docs/ClangFormatStyleOptions.html
+#
+---
+Language: Cpp
+AccessModifierOffset: -4
+AlignAfterOpenBracket: DontAlign
+AlignConsecutiveAssignments: false
+AlignConsecutiveDeclarations: false
+AlignEscapedNewlines: DontAlign
+AlignOperands: true
+AlignTrailingComments: false
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortLambdasOnASingleLine: None
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: None
+AllowShortIfStatementsOnASingleLine: Never
+AllowShortLoopsOnASingleLine: false
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: false
+AlwaysBreakTemplateDeclarations: true
+BinPackArguments: true
+BinPackParameters: true
+BraceWrapping:
+ AfterClass: false
+ AfterControlStatement: false
+ AfterEnum: false
+ AfterFunction: false
+ AfterNamespace: false
+ AfterObjCDeclaration: false
+ AfterStruct: false
+ AfterUnion: false
+ BeforeCatch: false
+ BeforeElse: false
+ IndentBraces: false
+ SplitEmptyFunction: true
+ SplitEmptyRecord: true
+ SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: All
+BreakBeforeBraces: Custom
+BreakBeforeInheritanceComma: false
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeComma
+BreakAfterJavaFieldAnnotations: true
+BreakStringLiterals: true
+ColumnLimit: 0
+CommentPragmas: '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: false
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 8
+Cpp11BracedListStyle: false
+DerivePointerAlignment: false
+DisableFormat: false
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+ - forever # avoids { wrapped to next line
+ - foreach
+ - Q_FOREACH
+ - BOOST_FOREACH
+IncludeCategories:
+ - Regex: '^
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+
+ This version of the GNU Lesser General Public License incorporates
+the terms and conditions of version 3 of the GNU General Public
+License, supplemented by the additional permissions listed below.
+
+ 0. Additional Definitions.
+
+ As used herein, "this License" refers to version 3 of the GNU Lesser
+General Public License, and the "GNU GPL" refers to version 3 of the GNU
+General Public License.
+
+ "The Library" refers to a covered work governed by this License,
+other than an Application or a Combined Work as defined below.
+
+ An "Application" is any work that makes use of an interface provided
+by the Library, but which is not otherwise based on the Library.
+Defining a subclass of a class defined by the Library is deemed a mode
+of using an interface provided by the Library.
+
+ A "Combined Work" is a work produced by combining or linking an
+Application with the Library. The particular version of the Library
+with which the Combined Work was made is also called the "Linked
+Version".
+
+ The "Minimal Corresponding Source" for a Combined Work means the
+Corresponding Source for the Combined Work, excluding any source code
+for portions of the Combined Work that, considered in isolation, are
+based on the Application, and not on the Linked Version.
+
+ The "Corresponding Application Code" for a Combined Work means the
+object code and/or source code for the Application, including any data
+and utility programs needed for reproducing the Combined Work from the
+Application, but excluding the System Libraries of the Combined Work.
+
+ 1. Exception to Section 3 of the GNU GPL.
+
+ You may convey a covered work under sections 3 and 4 of this License
+without being bound by section 3 of the GNU GPL.
+
+ 2. Conveying Modified Versions.
+
+ If you modify a copy of the Library, and, in your modifications, a
+facility refers to a function or data to be supplied by an Application
+that uses the facility (other than as an argument passed when the
+facility is invoked), then you may convey a copy of the modified
+version:
+
+ a) under this License, provided that you make a good faith effort to
+ ensure that, in the event an Application does not supply the
+ function or data, the facility still operates, and performs
+ whatever part of its purpose remains meaningful, or
+
+ b) under the GNU GPL, with none of the additional permissions of
+ this License applicable to that copy.
+
+ 3. Object Code Incorporating Material from Library Header Files.
+
+ The object code form of an Application may incorporate material from
+a header file that is part of the Library. You may convey such object
+code under terms of your choice, provided that, if the incorporated
+material is not limited to numerical parameters, data structure
+layouts and accessors, or small macros, inline functions and templates
+(ten or fewer lines in length), you do both of the following:
+
+ a) Give prominent notice with each copy of the object code that the
+ Library is used in it and that the Library and its use are
+ covered by this License.
+
+ b) Accompany the object code with a copy of the GNU GPL and this license
+ document.
+
+ 4. Combined Works.
+
+ You may convey a Combined Work under terms of your choice that,
+taken together, effectively do not restrict modification of the
+portions of the Library contained in the Combined Work and reverse
+engineering for debugging such modifications, if you also do each of
+the following:
+
+ a) Give prominent notice with each copy of the Combined Work that
+ the Library is used in it and that the Library and its use are
+ covered by this License.
+
+ b) Accompany the Combined Work with a copy of the GNU GPL and this license
+ document.
+
+ c) For a Combined Work that displays copyright notices during
+ execution, include the copyright notice for the Library among
+ these notices, as well as a reference directing the user to the
+ copies of the GNU GPL and this license document.
+
+ d) Do one of the following:
+
+ 0) Convey the Minimal Corresponding Source under the terms of this
+ License, and the Corresponding Application Code in a form
+ suitable for, and under terms that permit, the user to
+ recombine or relink the Application with a modified version of
+ the Linked Version to produce a modified Combined Work, in the
+ manner specified by section 6 of the GNU GPL for conveying
+ Corresponding Source.
+
+ 1) Use a suitable shared library mechanism for linking with the
+ Library. A suitable mechanism is one that (a) uses at run time
+ a copy of the Library already present on the user's computer
+ system, and (b) will operate properly with a modified version
+ of the Library that is interface-compatible with the Linked
+ Version.
+
+ e) Provide Installation Information, but only if you would otherwise
+ be required to provide such information under section 6 of the
+ GNU GPL, and only to the extent that such information is
+ necessary to install and execute a modified version of the
+ Combined Work produced by recombining or relinking the
+ Application with a modified version of the Linked Version. (If
+ you use option 4d0, the Installation Information must accompany
+ the Minimal Corresponding Source and Corresponding Application
+ Code. If you use option 4d1, you must provide the Installation
+ Information in the manner specified by section 6 of the GNU GPL
+ for conveying Corresponding Source.)
+
+ 5. Combined Libraries.
+
+ You may place library facilities that are a work based on the
+Library side by side in a single library together with other library
+facilities that are not Applications and are not covered by this
+License, and convey such a combined library under terms of your
+choice, if you do both of the following:
+
+ a) Accompany the combined library with a copy of the same work based
+ on the Library, uncombined with any other library facilities,
+ conveyed under the terms of this License.
+
+ b) Give prominent notice with the combined library that part of it
+ is a work based on the Library, and explaining where to find the
+ accompanying uncombined form of the same work.
+
+ 6. Revised Versions of the GNU Lesser General Public License.
+
+ The Free Software Foundation may publish revised and/or new versions
+of the GNU Lesser General Public License from time to time. Such new
+versions will be similar in spirit to the present version, but may
+differ in detail to address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Library as you received it specifies that a certain numbered version
+of the GNU Lesser General Public License "or any later version"
+applies to it, you have the option of following the terms and
+conditions either of that published version or of any later version
+published by the Free Software Foundation. If the Library as you
+received it does not specify a version number of the GNU Lesser
+General Public License, you may choose any version of the GNU Lesser
+General Public License ever published by the Free Software Foundation.
+
+ If the Library as you received it specifies that a proxy can decide
+whether future versions of the GNU Lesser General Public License shall
+apply, that proxy's public statement of acceptance of any version is
+permanent authorization for you to choose that version for the
+Library.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..e69de29b
diff --git a/assets/OpenCMW_logo.odg b/assets/OpenCMW_logo.odg
new file mode 100644
index 00000000..566d3706
Binary files /dev/null and b/assets/OpenCMW_logo.odg differ
diff --git a/client/pom.xml b/client/pom.xml
new file mode 100644
index 00000000..ae3a1d3b
--- /dev/null
+++ b/client/pom.xml
@@ -0,0 +1,50 @@
+
+
+ 4.0.0
+
+
+ io.opencmw
+ opencmw
+ ${revision}${sha1}${changelist}
+ ../pom.xml
+
+
+ client
+
+
+ OpenCmw, CmwLight, and RESTful client implementations.
+
+
+
+
+ io.opencmw
+ core
+ ${revision}${sha1}${changelist}
+
+
+ de.gsi.dataset
+ chartfx-dataset
+ ${version.chartfx}
+
+
+
+ com.squareup.okhttp3
+ okhttp
+ ${version.okHttp3}
+
+
+ com.squareup.okhttp3
+ okhttp-sse
+ ${version.okHttp3}
+
+
+ com.squareup.okhttp3
+ mockwebserver
+ ${version.okHttp3}
+ test
+
+
+
+
+
diff --git a/client/src/main/java/io/opencmw/client/DataSource.java b/client/src/main/java/io/opencmw/client/DataSource.java
new file mode 100644
index 00000000..822ea7b7
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/DataSource.java
@@ -0,0 +1,118 @@
+package io.opencmw.client;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+import io.opencmw.serialiser.IoSerialiser;
+import io.opencmw.utils.NoDuplicatesList;
+
+/**
+ * Interface for DataSources to be added to an EventStore by a single event loop.
+ * Should provide a static boolean matches(String address) function to determine whether
+ * it is eligible for a given address.
+ */
+public abstract class DataSource {
+ private static final List IMPLEMENTATIONS = new NoDuplicatesList<>();
+
+ private DataSource() {
+ // prevent implementers from implementing default constructor
+ }
+
+ /**
+ * Constructor
+ * @param endpoint Endpoint to subscribe to
+ */
+ public DataSource(final String endpoint) {
+ if (endpoint == null || endpoint.isBlank() || !getFactory().matches(endpoint)) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " DataSource Implementation does not support endpoint: " + endpoint);
+ }
+ }
+
+ /**
+ * Factory method to get a DataSource for a given endpoint
+ * @param endpoint endpoint address
+ * @return if there is a DataSource implementation for the protocol of the endpoint return a Factory to create a new
+ * Instance of this DataSource
+ * @throws UnsupportedOperationException in case there is no valid implementation
+ */
+ public static Factory getFactory(final String endpoint) {
+ for (Factory factory : IMPLEMENTATIONS) {
+ if (factory.matches(endpoint)) {
+ return factory;
+ }
+ }
+ throw new UnsupportedOperationException("No DataSource implementation available for endpoint: " + endpoint);
+ }
+
+ public static void register(final Factory factory) {
+ IMPLEMENTATIONS.add(0, factory); // custom added implementations are added in front to be discovered first
+ }
+
+ /**
+ * Get Socket to wait for in the event loop.
+ * The main event thread will wait for data becoming available on this socket.
+ * The socket might be used to receive the actual data or it might just be used to notify the main thread.
+ * @return a Socket for the event loop to wait upon
+ */
+ public abstract Socket getSocket();
+
+ protected abstract Factory getFactory();
+
+ /**
+ * Gets called whenever data is available on the DataSoure's socket.
+ * Should then try to receive data and return any results back to the calling event loop.
+ * @return null if there is no more data available, a Zero length Zmsg if there was data which was only used internally
+ * or a ZMsg with [reqId, endpoint, byte[] data, [byte[] optional RBAC token]]
+ */
+ public abstract ZMsg getMessage();
+
+ /**
+ * Perform housekeeping tasks like connection management, heartbeats, subscriptions, etc
+ * @return next time housekeeping duties should be performed
+ */
+ public abstract long housekeeping();
+
+ /**
+ * Subscribe to this endpoint
+ * @param reqId the id to join the result of this subscribe with
+ * @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
+ */
+ public abstract void subscribe(final String reqId, final String endpoint, final byte[] rbacToken);
+
+ /**
+ * Unsubscribe from the endpoint of this DataSource.
+ */
+ public abstract void unsubscribe(final String reqId);
+
+ /**
+ * Perform a get request on this endpoint.
+ * @param requestId request id which later allows to match the returned value to this query.
+ * This is the only mandatory parameter, all the following may be null.
+ * @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&channel=chanA"
+ * @param filters The serialised filters which will determine which data to update
+ * @param data The serialised data which can be used by the get call
+ * @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
+ */
+ public abstract void get(final String requestId, final String endpoint, final byte[] filters, final byte[] data, final byte[] rbacToken);
+
+ /**
+ * Perform a set request on this endpoint using additional filters
+ * @param requestId request id which later allows to match the returned value to this query.
+ * This is the only mandatory parameter, all the following may be null.
+ * @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&channel=chanA"
+ * @param filters The serialised filters which will determine which data to update
+ * @param data The serialised data which can be used by the get call
+ * @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
+ */
+ public abstract void set(final String requestId, final String endpoint, final byte[] filters, final byte[] data, final byte[] rbacToken);
+
+ protected interface Factory {
+ boolean matches(final String endpoint);
+ Class extends IoSerialiser> getMatchingSerialiserType(final String endpoint);
+ DataSource newInstance(final ZContext context, final String endpoint, final Duration timeout, final String clientId);
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/DataSourceFilter.java b/client/src/main/java/io/opencmw/client/DataSourceFilter.java
new file mode 100644
index 00000000..7ba7d582
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/DataSourceFilter.java
@@ -0,0 +1,63 @@
+package io.opencmw.client;
+
+import io.opencmw.Filter;
+import io.opencmw.serialiser.IoSerialiser;
+
+public class DataSourceFilter implements Filter {
+ public ReplyType eventType = ReplyType.UNKNOWN;
+ public Class extends IoSerialiser> protocolType;
+ public String device;
+ public String property;
+ public DataSourcePublisher.ThePromisedFuture> future;
+ public String context;
+
+ @Override
+ public void clear() {
+ eventType = ReplyType.UNKNOWN;
+ device = "UNKNOWN";
+ property = "UNKNOWN";
+ future = null; // NOPMD - have to clear the future because the events are reused
+ context = "";
+ }
+
+ @Override
+ public void copyTo(final Filter other) {
+ if (other instanceof DataSourceFilter) {
+ final DataSourceFilter otherDSF = (DataSourceFilter) other;
+ otherDSF.eventType = eventType;
+ otherDSF.device = device;
+ otherDSF.property = property;
+ otherDSF.future = future;
+ otherDSF.context = context;
+ }
+ }
+
+ /**
+ * internal enum to track different get/set/subscribe/... transactions
+ */
+ public enum ReplyType {
+ SUBSCRIBE(0),
+ GET(1),
+ SET(2),
+ UNSUBSCRIBE(3),
+ UNKNOWN(-1);
+
+ private final byte id;
+ ReplyType(int id) {
+ this.id = (byte) id;
+ }
+
+ public byte getID() {
+ return id;
+ }
+
+ public static ReplyType valueOf(final int id) {
+ for (ReplyType mode : ReplyType.values()) {
+ if (mode.getID() == id) {
+ return mode;
+ }
+ }
+ return UNKNOWN;
+ }
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/DataSourcePublisher.java b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java
new file mode 100644
index 00000000..55d14d37
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java
@@ -0,0 +1,463 @@
+package io.opencmw.client;
+
+import static java.util.Objects.requireNonNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.SocketType;
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+import io.opencmw.EventStore;
+import io.opencmw.RingBufferEvent;
+import io.opencmw.filter.EvtTypeFilter;
+import io.opencmw.filter.TimingCtx;
+import io.opencmw.rbac.RbacProvider;
+import io.opencmw.serialiser.IoBuffer;
+import io.opencmw.serialiser.IoClassSerialiser;
+import io.opencmw.serialiser.IoSerialiser;
+import io.opencmw.serialiser.spi.FastByteBuffer;
+import io.opencmw.utils.CustomFuture;
+import io.opencmw.utils.SharedPointer;
+
+import com.lmax.disruptor.EventHandler;
+
+/**
+ * Publishes events from different sources into a common {@link EventStore} and takes care of setting the appropriate
+ * filters and deserialisation of the domain objects.
+ *
+ * The subscribe/unsubscribe/set/get methods can be called from any thread and are decoupled from the actual
+ *
+ * @author Alexander Krimmm
+ * @author rstein
+ */
+public class DataSourcePublisher implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataSourcePublisher.class);
+ private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ private static final ZFrame EMPTY_FRAME = new ZFrame(EMPTY_BYTE_ARRAY);
+ public static final int MIN_FRAMES_INTERNAL_MSG = 3;
+ private final String inprocCtrl = "inproc://dsPublisher#" + INSTANCE_COUNT.incrementAndGet();
+ protected final Map> requestFutureMap = new ConcurrentHashMap<>(); //
+ protected final Map clientMap = new ConcurrentHashMap<>(); //
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicInteger internalReqIdGenerator = new AtomicInteger(0);
+ private final EventStore eventStore;
+ private final ZMQ.Poller poller;
+ private final ZContext context = new ZContext(1);
+ private final ZMQ.Socket controlSocket;
+ private final IoBuffer byteBuffer = new FastByteBuffer(2000); // zero length buffer to use when there is no data to deserialise
+ private final IoClassSerialiser ioClassSerialiser = new IoClassSerialiser(byteBuffer);
+
+ private final ThreadLocal perThreadControlSocket = new ThreadLocal<>() { // creates a client control socket for each calling thread
+ private ZMQ.Socket result;
+
+ @Override
+ public void remove() {
+ if (result != null) {
+ result.disconnect(inprocCtrl);
+ }
+ super.remove();
+ }
+
+ @Override
+ protected ZMQ.Socket initialValue() {
+ result = context.createSocket(SocketType.DEALER);
+ result.connect(inprocCtrl);
+ return result;
+ }
+ };
+ private final String clientId;
+ private final RbacProvider rbacProvider;
+
+ public DataSourcePublisher(final RbacProvider rbacProvider, final EventStore publicationTarget, final String... clientId) {
+ this(rbacProvider, clientId);
+ eventStore.register((event, sequence, endOfBatch) -> {
+ final DataSourceFilter dataSourceFilter = event.getFilter(DataSourceFilter.class);
+ final ThePromisedFuture> future = dataSourceFilter.future;
+ if (future.replyType == DataSourceFilter.ReplyType.SUBSCRIBE) {
+ final Class> domainClass = future.getRequestedDomainObjType();
+ final ZMsg cmwMsg = event.payload.get(ZMsg.class);
+ requireNonNull(cmwMsg.poll()).getString(Charset.defaultCharset()); // ignore header
+ final byte[] body = requireNonNull(cmwMsg.poll()).getData();
+ final String exc = requireNonNull(cmwMsg.poll()).getString(Charset.defaultCharset());
+ Object domainObj = null;
+ if (body != null && body.length != 0) {
+ ioClassSerialiser.setDataBuffer(FastByteBuffer.wrap(body));
+ domainObj = ioClassSerialiser.deserialiseObject(domainClass);
+ ioClassSerialiser.setDataBuffer(byteBuffer); // allow received byte array to be released
+ }
+ publicationTarget.getRingBuffer().publishEvent((publishEvent, seq, obj) -> {
+ final TimingCtx contextFilter = publishEvent.getFilter(TimingCtx.class);
+ final EvtTypeFilter evtTypeFilter = publishEvent.getFilter(EvtTypeFilter.class);
+ publishEvent.arrivalTimeStamp = event.arrivalTimeStamp;
+ publishEvent.payload = new SharedPointer<>();
+ publishEvent.payload.set(obj);
+ if (exc != null && !exc.isBlank()) {
+ publishEvent.throwables.add(new Exception(exc));
+ }
+ try {
+ contextFilter.setSelector(dataSourceFilter.context, 0);
+ } catch (IllegalArgumentException e) {
+ LOGGER.atError().setCause(e).addArgument(dataSourceFilter.context).log("No valid context: {}");
+ }
+ // contextFilter.acqts = msg.dataContext.acqStamp; // needs to be added?
+ // contextFilter.ctxName = // what should go here?
+ evtTypeFilter.evtType = EvtTypeFilter.DataType.DEVICE_DATA;
+ evtTypeFilter.typeName = dataSourceFilter.device + '/' + dataSourceFilter.property;
+ evtTypeFilter.updateType = EvtTypeFilter.UpdateType.COMPLETE;
+ }, domainObj);
+ } else if (future.replyType == DataSourceFilter.ReplyType.GET) {
+ // get data from socket
+ final ZMsg cmwMsg = event.payload.get(ZMsg.class);
+ requireNonNull(cmwMsg.poll()).getString(StandardCharsets.UTF_8);
+ final byte[] body = requireNonNull(cmwMsg.poll()).getData();
+ final String exc = requireNonNull(cmwMsg.poll()).getString(Charset.defaultCharset());
+ // deserialise
+ Object obj = null;
+ if (body != null && body.length != 0) {
+ ioClassSerialiser.setDataBuffer(FastByteBuffer.wrap(body));
+ obj = ioClassSerialiser.deserialiseObject(future.getRequestedDomainObjType());
+ ioClassSerialiser.setDataBuffer(byteBuffer); // allow received byte array to be released
+ }
+ // notify future
+ if (exc == null || exc.isBlank()) {
+ future.castAndSetReply(obj);
+ } else {
+ future.setException(new Exception(exc));
+ }
+ // publish to ring buffer
+ publicationTarget.getRingBuffer().publishEvent((publishEvent, seq, o) -> {
+ final TimingCtx contextFilter = publishEvent.getFilter(TimingCtx.class);
+ final EvtTypeFilter evtTypeFilter = publishEvent.getFilter(EvtTypeFilter.class);
+ publishEvent.arrivalTimeStamp = event.arrivalTimeStamp;
+ publishEvent.payload = new SharedPointer<>();
+ publishEvent.payload.set(o);
+ if (exc != null && !exc.isBlank()) {
+ publishEvent.throwables.add(new Exception(exc));
+ }
+ try {
+ contextFilter.setSelector(dataSourceFilter.context, 0);
+ } catch (IllegalArgumentException e) {
+ LOGGER.atError().setCause(e).addArgument(dataSourceFilter.context).log("No valid context: {}");
+ }
+ // contextFilter.acqts = msg.dataContext.acqStamp; // needs to be added?
+ // contextFilter.ctxName = // what should go here?
+ evtTypeFilter.evtType = EvtTypeFilter.DataType.DEVICE_DATA;
+ evtTypeFilter.typeName = dataSourceFilter.device + '/' + dataSourceFilter.property;
+ evtTypeFilter.updateType = EvtTypeFilter.UpdateType.COMPLETE;
+ }, obj);
+ } else {
+ // ignore other reply types for now
+ // todo: publish statistics, connection state and getRequests
+ LOGGER.atInfo().addArgument(event.payload.get()).log("{}");
+ }
+ });
+ }
+
+ public DataSourcePublisher(final RbacProvider rbacProvider, final EventHandler eventHandler, final String... clientId) {
+ this(rbacProvider, clientId);
+ eventStore.register(eventHandler);
+ }
+
+ public DataSourcePublisher(final RbacProvider rbacProvider, final String... clientId) {
+ poller = context.createPoller(1);
+ // control socket for adding subscriptions / triggering requests from other threads
+ controlSocket = context.createSocket(SocketType.DEALER);
+ controlSocket.bind(inprocCtrl);
+ poller.register(controlSocket, ZMQ.Poller.POLLIN);
+ // instantiate event store
+ eventStore = EventStore.getFactory().setSingleProducer(true).setFilterConfig(DataSourceFilter.class).build();
+ // register default handlers // TODO: find out how to do this without having to reference them directly
+ // DataSource.register(CmwLightClient.FACTORY);
+ // DataSource.register(RestDataSource.FACTORY);
+ this.clientId = clientId.length == 1 ? clientId[0] : DataSourcePublisher.class.getName();
+ this.rbacProvider = rbacProvider;
+ }
+
+ public ZContext getContext() {
+ return context;
+ }
+
+ public EventStore getEventStore() {
+ return eventStore;
+ }
+
+ public Future set(String endpoint, final Class requestedDomainObjType, final Object requestBody, final RbacProvider... rbacProvider) {
+ return set(endpoint, null, requestBody, requestedDomainObjType, rbacProvider);
+ }
+
+ /**
+ * Perform an asynchronous set request on the given device/property.
+ * Checks if a client for this service already exists and if it does performs the asynchronous get on it, otherwise
+ * it starts a new client and performs it there.
+ *
+ * @param endpoint endpoint address for the property e.g. 'rda3://hostname:port/property?selector&filter',
+ * file:///path/to/directory, mdp://host:port
+ * @param requestFilter optional map of optional filters e.g. Map.of("channelName", "VoltageChannel")
+ * @param requestBody optional domain object payload to be send with the request
+ * @param requestedDomainObjType the requested result domain object type
+ * @param The type of the deserialised requested result domain object
+ * @return A future which will be able to retrieve the deserialised result
+ */
+ public Future set(String endpoint, final Map requestFilter, final Object requestBody, final Class requestedDomainObjType, final RbacProvider... rbacProvider) {
+ return request(DataSourceFilter.ReplyType.SET, endpoint, requestFilter, requestBody, requestedDomainObjType, rbacProvider);
+ }
+
+ public Future get(String endpoint, final Class requestedDomainObjType, final RbacProvider... rbacProvider) {
+ return get(endpoint, null, null, requestedDomainObjType, rbacProvider);
+ }
+
+ /**
+ * Perform an asynchronous get request on the given device/property.
+ * Checks if a client for this service already exists and if it does performs the asynchronous get on it, otherwise
+ * it starts a new client and performs it there.
+ *
+ * @param endpoint endpoint address for the property e.g. 'rda3://hostname:port/property?selector&filter',
+ * file:///path/to/directory, mdp://host:port
+ * @param requestFilter optional map of optional filters e.g. Map.of("channelName", "VoltageChannel")
+ * @param requestBody optional domain object payload to be send with the request
+ * @param requestedDomainObjType the requested result domain object type
+ * @param The type of the deserialised requested result domain object
+ * @return A future which will be able to retrieve the deserialised result
+ */
+ public Future get(String endpoint, final Map requestFilter, final Object requestBody, final Class requestedDomainObjType, final RbacProvider... rbacProvider) {
+ return request(DataSourceFilter.ReplyType.GET, endpoint, requestFilter, requestBody, requestedDomainObjType, rbacProvider);
+ }
+
+ private ThePromisedFuture request(final DataSourceFilter.ReplyType replyType, final String endpoint, final Map requestFilter, final Object requestBody, final Class requestedDomainObjType, final RbacProvider... rbacProvider) {
+ final String requestId = clientId + internalReqIdGenerator.incrementAndGet();
+ final ThePromisedFuture requestFuture = newFuture(endpoint, requestFilter, requestBody, requestedDomainObjType, replyType, requestId);
+ final Class extends IoSerialiser> matchingSerialiser = DataSource.getFactory(endpoint).getMatchingSerialiserType(endpoint);
+
+ // signal socket for get with endpoint and request id
+ final ZMsg msg = new ZMsg();
+ msg.add(new byte[] { replyType.getID() });
+ msg.add(requestId);
+ msg.add(endpoint);
+ if (requestFilter == null) {
+ msg.add(EMPTY_FRAME);
+ } else {
+ ioClassSerialiser.getDataBuffer().reset();
+ ioClassSerialiser.setMatchedIoSerialiser(matchingSerialiser); // needs to be converted in DataSource impl
+ ioClassSerialiser.serialiseObject(requestFilter);
+ msg.add(Arrays.copyOfRange(ioClassSerialiser.getDataBuffer().elements(), 0, ioClassSerialiser.getDataBuffer().position()));
+ }
+ if (requestBody == null) {
+ msg.add(EMPTY_FRAME);
+ } else {
+ ioClassSerialiser.getDataBuffer().reset();
+ ioClassSerialiser.setMatchedIoSerialiser(matchingSerialiser); // needs to be converted in DataSource impl
+ ioClassSerialiser.serialiseObject(requestBody);
+ msg.add(Arrays.copyOfRange(ioClassSerialiser.getDataBuffer().elements(), 0, ioClassSerialiser.getDataBuffer().position()));
+ }
+ // RBAC
+ if (rbacProvider.length > 0 || this.rbacProvider != null) {
+ final RbacProvider rbac = rbacProvider.length > 0 ? rbacProvider[0] : this.rbacProvider; // NOPMD - future use
+ // rbac.sign(msg); // todo: sign message and add rbac token and signature
+ } else {
+ msg.add(EMPTY_FRAME);
+ }
+
+ msg.send(perThreadControlSocket.get());
+ //TODO: do we need the following 'remove()'
+ perThreadControlSocket.remove();
+ return requestFuture;
+ }
+
+ public void subscribe(final String endpoint, final Class requestedDomainObjType) {
+ subscribe(endpoint, requestedDomainObjType, null, null);
+ }
+
+ public String subscribe(final String endpoint, final Class requestedDomainObjType, final Map requestFilter, final Object requestBody, final RbacProvider... rbacProvider) {
+ ThePromisedFuture future = request(DataSourceFilter.ReplyType.SUBSCRIBE, endpoint, requestFilter, requestBody, requestedDomainObjType, rbacProvider);
+ return future.internalRequestID;
+ }
+
+ public void unsubscribe(String requestId) {
+ // signal socket for get with endpoint and request id
+ final ZMsg msg = new ZMsg();
+ msg.add(new byte[] { DataSourceFilter.ReplyType.UNSUBSCRIBE.getID() });
+ msg.add(requestId);
+ msg.add(requestFutureMap.get(requestId).endpoint);
+ msg.send(perThreadControlSocket.get());
+ //TODO: do we need the following 'remove()'
+ perThreadControlSocket.remove();
+ }
+
+ @Override
+ public void run() {
+ // start the ring buffer and its processors
+ eventStore.start();
+ // event loop polling all data sources and performing regular housekeeping jobs
+ running.set(true);
+ long nextHousekeeping = System.currentTimeMillis(); // immediately perform first housekeeping
+ long tout = 0L;
+ while (!Thread.interrupted() && running.get() && (tout <= 0 || -1 != poller.poll(tout))) {
+ // get data from clients
+ boolean dataAvailable = true;
+ while (dataAvailable && System.currentTimeMillis() < nextHousekeeping && running.get()) {
+ dataAvailable = handleDataSourceSockets();
+ // check specificaly for control socket
+ dataAvailable |= handleControlSocket();
+ }
+
+ nextHousekeeping = clientMap.values().stream().mapToLong(DataSource::housekeeping).min().orElse(System.currentTimeMillis() + 1000);
+ tout = nextHousekeeping - System.currentTimeMillis();
+ }
+ LOGGER.atDebug().addArgument(clientMap.values()).log("poller returned negative value - abort run() - clients = {}");
+ }
+
+ public void start() {
+ new Thread(this).start(); // NOPMD - not a webapp
+ }
+
+ protected boolean handleControlSocket() {
+ final ZMsg controlMsg = ZMsg.recvMsg(controlSocket, false);
+ if (controlMsg == null) {
+ return false; // no more data available on control socket
+ }
+ if (controlMsg.size() < MIN_FRAMES_INTERNAL_MSG) { // msgType, requestId and endpoint have to be always present
+ LOGGER.atDebug().log("ignoring invalid message");
+ return true; // ignore invalid partial message
+ }
+ final DataSourceFilter.ReplyType msgType = DataSourceFilter.ReplyType.valueOf(controlMsg.pollFirst().getData()[0]);
+ final String requestId = requireNonNull(controlMsg.pollFirst()).getString(Charset.defaultCharset());
+ final String endpoint = requireNonNull(controlMsg.pollFirst()).getString(Charset.defaultCharset());
+ final byte[] filters = controlMsg.isEmpty() ? EMPTY_BYTE_ARRAY : controlMsg.pollFirst().getData();
+ final byte[] data = controlMsg.isEmpty() ? EMPTY_BYTE_ARRAY : controlMsg.pollFirst().getData();
+ final byte[] rbacToken = controlMsg.isEmpty() ? EMPTY_BYTE_ARRAY : controlMsg.pollFirst().getData();
+
+ final DataSource client = getClient(endpoint); // get client for endpoint
+ switch (msgType) {
+ case SUBSCRIBE: // subscribe: 0b, requestId, addr/dev/prop?sel&filters, [filter]
+ client.subscribe(requestId, endpoint, rbacToken); // issue get request
+ break;
+ case GET: // get: 1b, reqId, addr/dev/prop?sel&filters, [filter]
+ client.get(requestId, endpoint, filters, data, rbacToken); // issue get request
+ break;
+ case SET: // set: 2b, reqId, addr/dev/prop?sel&filters, data, add data to blocking queue instead?
+ client.set(requestId, endpoint, filters, data, rbacToken);
+ break;
+ case UNSUBSCRIBE: //unsub: 3b, reqId, endpoint
+ client.unsubscribe(requestId);
+ requestFutureMap.remove(requestId);
+ break;
+ case UNKNOWN:
+ default:
+ throw new UnsupportedOperationException("Illegal operation type");
+ }
+ return true;
+ }
+
+ protected boolean handleDataSourceSockets() {
+ boolean dataAvailable = false;
+ for (DataSource entry : clientMap.values()) {
+ final ZMsg reply = entry.getMessage();
+ if (reply == null) {
+ continue; // no data received, queue empty
+ }
+ dataAvailable = true;
+ if (reply.isEmpty()) {
+ continue; // there was data received, but only used for internal state of the client
+ }
+ // the received data consists of the following frames: replyType(byte), reqId(string), endpoint(string), dataBody(byte[])
+ eventStore.getRingBuffer().publishEvent((event, sequence) -> {
+ final String reqId = requireNonNull(reply.pollFirst()).getString(Charset.defaultCharset());
+ final ThePromisedFuture> returnFuture = requestFutureMap.get(reqId);
+ if (returnFuture.getReplyType() != DataSourceFilter.ReplyType.SUBSCRIBE) { // remove entries for one time replies
+ assert returnFuture.getInternalRequestID().equals(reqId)
+ : "requestID mismatch";
+ requestFutureMap.remove(reqId);
+ }
+ final Endpoint endpoint = new Endpoint(requireNonNull(reply.pollFirst()).getString(Charset.defaultCharset())); // NOPMD - need to create new Endpoint
+ event.arrivalTimeStamp = System.currentTimeMillis();
+ event.payload = new SharedPointer<>(); // NOPMD - need to create new shared pointer instance
+ event.payload.set(reply); // ZMsg containing header, body and exception frame
+ final DataSourceFilter dataSourceFilter = event.getFilter(DataSourceFilter.class);
+ dataSourceFilter.future = returnFuture;
+ dataSourceFilter.eventType = DataSourceFilter.ReplyType.SUBSCRIBE;
+ dataSourceFilter.device = endpoint.getDevice();
+ dataSourceFilter.property = endpoint.getProperty();
+ dataSourceFilter.context = endpoint.getSelector();
+ });
+ }
+ return dataAvailable;
+ }
+
+ protected ThePromisedFuture newFuture(final String endpoint, final Map requestFilter, final Object requestBody, final Class requestedDomainObjType, final DataSourceFilter.ReplyType replyType, final String requestId) {
+ final ThePromisedFuture requestFuture = new ThePromisedFuture<>(endpoint, requestFilter, requestBody, requestedDomainObjType, replyType, requestId);
+ final Object oldEntry = requestFutureMap.put(requestId, requestFuture);
+ assert oldEntry == null : "requestID '" + requestId + "' already present in requestFutureMap";
+ return requestFuture;
+ }
+
+ private DataSource getClient(final String endpoint) {
+ return clientMap.computeIfAbsent(new Endpoint(endpoint).getAddress(), requestedEndPoint -> {
+ final DataSource dataSource = DataSource.getFactory(requestedEndPoint).newInstance(context, endpoint, Duration.ofMillis(100), Long.toString(internalReqIdGenerator.incrementAndGet()));
+ poller.register(dataSource.getSocket(), ZMQ.Poller.POLLIN);
+ return dataSource;
+ });
+ }
+
+ public static class ThePromisedFuture extends CustomFuture { // NOPMD - no need for setters/getters here
+ private final String endpoint;
+ private final Map requestFilter;
+ private final Object requestBody;
+ private final Class requestedDomainObjType;
+ private final DataSourceFilter.ReplyType replyType;
+ private final String internalRequestID;
+
+ public ThePromisedFuture(final String endpoint, final Map requestFilter, final Object requestBody, final Class requestedDomainObjType, final DataSourceFilter.ReplyType replyType, final String internalRequestID) {
+ super();
+ this.endpoint = endpoint;
+ this.requestFilter = requestFilter;
+ this.requestBody = requestBody;
+ this.requestedDomainObjType = requestedDomainObjType;
+ this.replyType = replyType;
+ this.internalRequestID = internalRequestID;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public DataSourceFilter.ReplyType getReplyType() {
+ return replyType;
+ }
+
+ public Object getRequestBody() {
+ return requestBody;
+ }
+
+ public Map getRequestFilter() {
+ return requestFilter;
+ }
+
+ public Class getRequestedDomainObjType() {
+ return requestedDomainObjType;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void castAndSetReply(final Object newValue) {
+ this.setReply((R) newValue);
+ }
+
+ public String getInternalRequestID() {
+ return internalRequestID;
+ }
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/Endpoint.java b/client/src/main/java/io/opencmw/client/Endpoint.java
new file mode 100644
index 00000000..74b97db6
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/Endpoint.java
@@ -0,0 +1,137 @@
+package io.opencmw.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Endpoint helper class to deserialise endpoint strings.
+ * Uses lazy initialisation to prevent doing unnecessary work or doing the same thing twice.
+ */
+public class Endpoint { // NOPMD data class
+ private static final String DEFAULT_SELECTOR = "";
+ public static final String FILTER_TYPE_LONG = "long:";
+ public static final String FILTER_TYPE_INT = "int:";
+ public static final String FILTER_TYPE_BOOL = "bool:";
+ public static final String FILTER_TYPE_DOUBLE = "double:";
+ public static final String FILTER_TYPE_FLOAT = "float:";
+ private final String value;
+ private String protocol;
+ private String address;
+ private String device;
+ private String property;
+ private String selector;
+ private Map filters;
+
+ public Endpoint(final String endpoint) {
+ this.value = endpoint;
+ }
+
+ public String getProtocol() {
+ if (protocol == null) {
+ parse();
+ }
+ return protocol;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public String getAddress() {
+ if (protocol == null) {
+ parse();
+ }
+ return address;
+ }
+
+ public String getDevice() {
+ if (protocol == null) {
+ parse();
+ }
+ return device;
+ }
+
+ public String getSelector() {
+ if (protocol == null) {
+ parse();
+ }
+ return selector;
+ }
+
+ public String getProperty() {
+ return property;
+ }
+
+ public Map getFilters() {
+ return filters;
+ }
+
+ public String getEndpointForContext(final String context) {
+ if (context == null || context.equals("")) {
+ return value;
+ }
+ parse();
+ final String filterString = filters.entrySet().stream() //
+ .map(e -> {
+ String val;
+ if (e.getValue() instanceof String) {
+ val = (String) e.getValue();
+ } else if (e.getValue() instanceof Integer) {
+ val = FILTER_TYPE_INT + e.getValue();
+ } else if (e.getValue() instanceof Long) {
+ val = FILTER_TYPE_LONG + e.getValue();
+ } else if (e.getValue() instanceof Boolean) {
+ val = FILTER_TYPE_BOOL + e.getValue();
+ } else if (e.getValue() instanceof Double) {
+ val = FILTER_TYPE_DOUBLE + e.getValue();
+ } else if (e.getValue() instanceof Float) {
+ val = FILTER_TYPE_FLOAT + e.getValue();
+ } else {
+ throw new UnsupportedOperationException("Data type not supported in endpoint filters");
+ }
+ return e.getKey() + '=' + val;
+ }) //
+ .collect(Collectors.joining("&"));
+ return address + '/' + device + '/' + property + "?ctx=" + context + '&' + filterString;
+ }
+
+ private void parse() {
+ final String[] tmp = value.split("\\?", 2); // split into address/dev/prop and sel+filters part
+ final String[] adp = tmp[0].split("/"); // split access point into parts
+ device = adp[adp.length - 2]; // get device name from access point
+ property = adp[adp.length - 1]; // get property name from access point
+ address = tmp[0].substring(0, tmp[0].length() - device.length() - property.length() - 2);
+ protocol = address.substring(0, address.indexOf("://") + 3);
+ filters = new HashMap<>();
+ selector = DEFAULT_SELECTOR;
+ filters = new HashMap<>();
+
+ final String paramString = tmp[1];
+ final String[] kvpairs = paramString.split("&"); // split into individual key/value pairs
+ for (final String pair : kvpairs) {
+ String[] splitpair = pair.split("=", 2); // split at first equal sign
+ if (splitpair.length != 2) {
+ continue;
+ }
+ if ("ctx".equals(splitpair[0])) {
+ selector = splitpair[1];
+ } else {
+ if (splitpair[1].startsWith(FILTER_TYPE_INT)) {
+ filters.put(splitpair[0], Integer.valueOf(splitpair[1].substring(FILTER_TYPE_INT.length())));
+ } else if (splitpair[1].startsWith(FILTER_TYPE_LONG)) {
+ filters.put(splitpair[0], Long.valueOf(splitpair[1].substring(FILTER_TYPE_LONG.length())));
+ } else if (splitpair[1].startsWith(FILTER_TYPE_BOOL)) {
+ filters.put(splitpair[0], Boolean.valueOf(splitpair[1].substring(FILTER_TYPE_BOOL.length())));
+ } else if (splitpair[1].startsWith(FILTER_TYPE_DOUBLE)) {
+ filters.put(splitpair[0], Double.valueOf(splitpair[1].substring(FILTER_TYPE_DOUBLE.length())));
+ } else if (splitpair[1].startsWith(FILTER_TYPE_FLOAT)) {
+ filters.put(splitpair[0], Float.valueOf(splitpair[1].substring(FILTER_TYPE_FLOAT.length())));
+ } else {
+ filters.put(splitpair[0], splitpair[1]);
+ }
+ }
+ }
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/cmwlight/CmwLightDataSource.java b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightDataSource.java
new file mode 100644
index 00000000..d92ff899
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightDataSource.java
@@ -0,0 +1,545 @@
+package io.opencmw.client.cmwlight;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.SocketType;
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQException;
+import org.zeromq.ZMsg;
+
+import io.opencmw.client.DataSource;
+import io.opencmw.client.Endpoint;
+import io.opencmw.serialiser.IoSerialiser;
+import io.opencmw.serialiser.spi.CmwLightSerialiser;
+
+/**
+ * A lightweight implementation of the CMW RDA3 client part.
+ * Reads all sockets from a single Thread, which can also be embedded into other event loops.
+ * Manages connection state and automatically reconnects broken connections and subscriptions.
+ */
+public class CmwLightDataSource extends DataSource { // NOPMD - class should probably be smaller
+ private static final Logger LOGGER = LoggerFactory.getLogger(CmwLightDataSource.class);
+ private static final AtomicLong CONNECTION_ID_GENERATOR = new AtomicLong(0); // global counter incremented for each connection
+ private static final AtomicInteger REQUEST_ID_GENERATOR = new AtomicInteger(0);
+ public static final String RDA_3_PROTOCOL = "rda3://";
+ public static final Factory FACTORY = new Factory() {
+ @Override
+ public boolean matches(final String endpoint) {
+ return endpoint.startsWith(RDA_3_PROTOCOL);
+ }
+
+ @Override
+ public Class extends IoSerialiser> getMatchingSerialiserType(final String endpoint) {
+ return CmwLightSerialiser.class;
+ }
+
+ @Override
+ public DataSource newInstance(final ZContext context, final String endpoint, final Duration timeout, final String clientId) {
+ return new CmwLightDataSource(context, endpoint, clientId);
+ }
+ };
+ protected static final int HEARTBEAT_INTERVAL = 1000; // time between to heartbeats in ms
+ protected static final int HEARTBEAT_ALLOWED_MISSES = 3; // number of heartbeats which can be missed before resetting the conection
+ protected static final long SUBSCRIPTION_TIMEOUT = 1000; // maximum time after which a connection should be reconnected
+ private static DirectoryLightClient directoryLightClient;
+ protected final AtomicInteger channelId = new AtomicInteger(0); // connection local counter incremented for each channel
+ protected final ZContext context;
+ protected final ZMQ.Socket socket;
+ protected final AtomicReference connectionState = new AtomicReference<>(ConnectionState.DISCONNECTED);
+ private final String address;
+ protected final String sessionId;
+ protected long connectionId;
+ protected final Map subscriptions = new HashMap<>(); // all subscriptions added to the server // NOPMD - only accessed from main thread
+ protected final Map subscriptionsByReqId = new HashMap<>(); // all subscriptions added to the server // NOPMD - only accessed from main thread
+ protected final Map replyIdMap = new HashMap<>(); // all acknowledged subscriptions by their reply id // NOPMD - only accessed from main thread
+ protected long lastHbReceived = -1;
+ protected long lastHbSent = -1;
+ protected int backOff = 20;
+ private final Queue queuedRequests = new LinkedBlockingQueue<>();
+ private final Map pendingRequests = new HashMap<>(); // NOPMD - only accessed from main thread
+ private String connectedAddress = "";
+
+ public CmwLightDataSource(final ZContext context, final String endpoint, final String clientId) {
+ super(endpoint);
+ LOGGER.atTrace().addArgument(endpoint).log("connecting to: {}");
+ this.context = context;
+ this.socket = context.createSocket(SocketType.DEALER);
+ this.sessionId = getSessionId(clientId);
+ this.address = new Endpoint(endpoint).getAddress();
+ }
+
+ public static DirectoryLightClient getDirectoryLightClient() {
+ return directoryLightClient;
+ }
+
+ public static void setDirectoryLightClient(final DirectoryLightClient directoryLightClient) {
+ CmwLightDataSource.directoryLightClient = directoryLightClient;
+ }
+
+ public CmwLightMessage receiveData() {
+ // receive data
+ try {
+ final ZMsg data = ZMsg.recvMsg(socket, ZMQ.DONTWAIT);
+ if (data == null) {
+ return null;
+ }
+ return CmwLightProtocol.parseMsg(data);
+ } catch (CmwLightProtocol.RdaLightException e) {
+ LOGGER.atDebug().setCause(e).log("error parsing cmw light reply: ");
+ return null;
+ }
+ }
+ @Override
+ public ZMsg getMessage() { // return maintenance objects instead of replies
+ final long currentTime = System.currentTimeMillis(); // NOPMD
+ CmwLightMessage reply = receiveData();
+ if (reply == null) {
+ return null;
+ }
+ switch (reply.messageType) {
+ case SERVER_CONNECT_ACK:
+ if (connectionState.get().equals(ConnectionState.CONNECTING)) {
+ LOGGER.atTrace().addArgument(connectedAddress).log("Connected to server: {}");
+ connectionState.set(ConnectionState.CONNECTED);
+ lastHbReceived = currentTime;
+ backOff = 20; // reset back-off time
+ } else {
+ LOGGER.atWarn().addArgument(reply).log("ignoring unsolicited connection acknowledgement: {}");
+ }
+ return new ZMsg();
+ case SERVER_HB:
+ if (connectionState.get() != ConnectionState.CONNECTED) {
+ LOGGER.atWarn().addArgument(reply).log("ignoring heartbeat received before connection established: {}");
+ return new ZMsg();
+ }
+ lastHbReceived = currentTime;
+ return new ZMsg();
+ case SERVER_REP:
+ if (connectionState.get() != ConnectionState.CONNECTED) {
+ LOGGER.atWarn().addArgument(reply).log("ignoring data received before connection established: {}");
+ return new ZMsg();
+ }
+ lastHbReceived = currentTime;
+ return handleServerReply(reply, currentTime);
+ case CLIENT_CONNECT:
+ case CLIENT_REQ:
+ case CLIENT_HB:
+ default:
+ LOGGER.atWarn().addArgument(reply).log("ignoring client message from server: {}");
+ return new ZMsg();
+ }
+ }
+
+ private ZMsg handleServerReply(final CmwLightMessage reply, final long currentTime) { //NOPMD
+ final ZMsg result = new ZMsg();
+ switch (reply.requestType) {
+ case REPLY:
+ Request requestForReply = pendingRequests.remove(reply.id);
+ result.add(requestForReply.requestId);
+ result.add(new Endpoint(requestForReply.endpoint).getEndpointForContext(reply.dataContext.cycleName));
+ result.add(new ZFrame(new byte[0])); // header
+ result.add(reply.bodyData); // body
+ result.add(new ZFrame(new byte[0])); // exception
+ return result;
+ case EXCEPTION:
+ final Request requestForException = pendingRequests.remove(reply.id);
+ result.add(requestForException.requestId);
+ result.add(requestForException.endpoint);
+ result.add(new ZFrame(new byte[0])); // header
+ result.add(new ZFrame(new byte[0])); // body
+ result.add(reply.exceptionMessage.message); // exception
+ return result;
+ case SUBSCRIBE:
+ final long id = reply.id;
+ final Subscription sub = subscriptions.get(id);
+ sub.updateId = (long) reply.options.get(CmwLightProtocol.FieldName.SOURCE_ID_TAG.value());
+ replyIdMap.put(sub.updateId, sub);
+ sub.subscriptionState = SubscriptionState.SUBSCRIBED;
+ LOGGER.atDebug().addArgument(sub.device).addArgument(sub.property).log("subscription successful: {}/{}");
+ sub.backOff = 20;
+ return result;
+ case UNSUBSCRIBE:
+ // successfully removed subscription
+ final Subscription subscriptionForUnsub = subscriptions.remove(reply.id);
+ subscriptionsByReqId.remove(subscriptionForUnsub.idString);
+ replyIdMap.remove(subscriptionForUnsub.updateId);
+ return result;
+ case NOTIFICATION_DATA:
+ final Subscription subscriptionForNotification = replyIdMap.get(reply.id);
+ if (subscriptionForNotification == null) {
+ LOGGER.atInfo().addArgument(reply.toString()).log("Got unsolicited subscription data: {}");
+ return result;
+ }
+ result.add(subscriptionForNotification.idString);
+ result.add(new Endpoint(subscriptionForNotification.endpoint).getEndpointForContext(reply.dataContext.cycleName));
+ result.add(new ZFrame(new byte[0])); // header
+ result.add(reply.bodyData); // body
+ result.add(new ZFrame(new byte[0])); // exception
+ return result;
+ case NOTIFICATION_EXC:
+ final Subscription subscriptionForNotifyExc = replyIdMap.get(reply.id);
+ if (subscriptionForNotifyExc == null) {
+ LOGGER.atInfo().addArgument(reply.toString()).log("Got unsolicited subscription notification error: {}");
+ return result;
+ }
+ result.add(subscriptionForNotifyExc.idString);
+ result.add(subscriptionForNotifyExc.endpoint);
+ result.add(new ZFrame(new byte[0])); // header
+ result.add(new ZFrame(new byte[0])); // body
+ result.add(reply.exceptionMessage.message); // exception
+ return result;
+ case SUBSCRIBE_EXCEPTION:
+ final Subscription subForSubExc = subscriptions.get(reply.id);
+ subForSubExc.subscriptionState = SubscriptionState.UNSUBSCRIBED;
+ subForSubExc.timeoutValue = currentTime + subForSubExc.backOff;
+ subForSubExc.backOff *= 2;
+ LOGGER.atDebug().addArgument(subForSubExc.device).addArgument(subForSubExc.property).log("exception during subscription, retrying: {}/{}");
+ result.add(subForSubExc.idString);
+ result.add(subForSubExc.endpoint);
+ result.add(new ZFrame(new byte[0])); // header
+ result.add(new ZFrame(new byte[0])); // body
+ result.add(reply.exceptionMessage.message); // exception
+ return result;
+ // unsupported or non-actionable replies
+ case GET:
+ case SET:
+ case CONNECT:
+ case EVENT:
+ case SESSION_CONFIRM:
+ default:
+ return result;
+ }
+ }
+
+ public enum ConnectionState {
+ DISCONNECTED,
+ CONNECTING,
+ CONNECTED
+ }
+
+ @Override
+ public void get(final String requestId, final String endpoint, final byte[] filters, final byte[] data, final byte[] rbacToken) {
+ final Request request = new Request(CmwLightProtocol.RequestType.GET, requestId, endpoint, filters, data, rbacToken);
+ queuedRequests.add(request);
+ }
+
+ @Override
+ public void set(final String requestId, final String endpoint, final byte[] filters, final byte[] data, final byte[] rbacToken) {
+ final Request request = new Request(CmwLightProtocol.RequestType.SET, requestId, endpoint, filters, data, rbacToken);
+ queuedRequests.add(request);
+ }
+
+ @Override
+ public void subscribe(final String reqId, final String endpoint, final byte[] rbacToken) {
+ final Endpoint ep = new Endpoint(endpoint);
+ final Subscription sub = new Subscription(endpoint, ep.getDevice(), ep.getProperty(), ep.getSelector(), ep.getFilters());
+ sub.idString = reqId;
+ subscriptions.put(sub.id, sub);
+ subscriptionsByReqId.put(reqId, sub);
+ }
+
+ @Override
+ public void unsubscribe(final String reqId) {
+ subscriptionsByReqId.get(reqId).subscriptionState = SubscriptionState.CANCELED;
+ }
+
+ public ConnectionState getConnectionState() {
+ return connectionState.get();
+ }
+
+ public ZContext getContext() {
+ return context;
+ }
+
+ @Override
+ public ZMQ.Socket getSocket() {
+ return socket;
+ }
+
+ @Override
+ protected Factory getFactory() {
+ return FACTORY;
+ }
+
+ private String getIdentity() {
+ String hostname;
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ hostname = "localhost";
+ }
+ final long processId = ProcessHandle.current().pid();
+ connectionId = CONNECTION_ID_GENERATOR.incrementAndGet();
+ final int chId = this.channelId.incrementAndGet();
+ return hostname + '/' + processId + '/' + connectionId + '/' + chId;
+ }
+
+ private String getSessionId(final String clientId) {
+ return "cmwLightClient{pid=" + ProcessHandle.current().pid() + ", conn=" + connectionId + ", clientId=" + clientId + '}'; // Todo: create identification string, cmw uses string with user/app name, pid, etc
+ }
+
+ public void connect() {
+ if (connectionState.getAndSet(ConnectionState.CONNECTING) != ConnectionState.DISCONNECTED) {
+ return; // already connected
+ }
+ String address = this.address.startsWith(RDA_3_PROTOCOL) ? this.address.substring(RDA_3_PROTOCOL.length()) : this.address;
+ if (!address.contains(":")) {
+ try {
+ DirectoryLightClient.Device device = directoryLightClient.getDeviceInfo(Collections.singletonList(address)).get(0);
+ LOGGER.atTrace().addArgument(address).addArgument(device).log("resolved address for device {}: {}");
+ address = device.servers.stream().findFirst().orElseThrow().get("Address:");
+ } catch (NullPointerException | NoSuchElementException | DirectoryLightClient.DirectoryClientException e) { // NOPMD - directory client must be refactored anyway
+ LOGGER.atDebug().addArgument(e.getMessage()).log("Error resolving device from nameserver, using address from endpoint. Error was: {}");
+ backOff = backOff * 2;
+ connectionState.set(ConnectionState.DISCONNECTED);
+ return;
+ }
+ }
+ lastHbSent = System.currentTimeMillis();
+ try {
+ final String identity = getIdentity();
+ connectedAddress = "tcp://" + address;
+ LOGGER.atDebug().addArgument(connectedAddress).addArgument(identity).log("connecting to: {} with identity {}");
+ socket.setIdentity(identity.getBytes()); // hostname/process/id/channel
+ socket.connect(connectedAddress);
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.connect(CmwLightProtocol.VERSION));
+ } catch (ZMQException | CmwLightProtocol.RdaLightException e) {
+ LOGGER.atDebug().setCause(e).log("failed to connect: ");
+ backOff = backOff * 2;
+ connectionState.set(ConnectionState.DISCONNECTED);
+ }
+ }
+
+ private void disconnect() {
+ LOGGER.atDebug().addArgument(connectedAddress).log("disconnecting {}");
+ connectionState.set(ConnectionState.DISCONNECTED);
+ try {
+ socket.disconnect(connectedAddress);
+ } catch (ZMQException e) {
+ LOGGER.atError().setCause(e).log("Failed to disconnect socket");
+ }
+ // disconnect/reset subscriptions
+ for (Subscription sub : subscriptions.values()) {
+ sub.subscriptionState = SubscriptionState.UNSUBSCRIBED;
+ }
+ }
+
+ @Override
+ public long housekeeping() {
+ final long currentTime = System.currentTimeMillis();
+ switch (connectionState.get()) {
+ case DISCONNECTED: // reconnect after adequate back off
+ if (currentTime > lastHbSent + backOff) {
+ LOGGER.atTrace().addArgument(address).log("Connecting to {}");
+ connect();
+ }
+ return lastHbSent + backOff;
+ case CONNECTING:
+ if (currentTime > lastHbSent + HEARTBEAT_INTERVAL * HEARTBEAT_ALLOWED_MISSES) { // connect timed out -> increase back of and retry
+ backOff = backOff * 2;
+ lastHbSent = currentTime;
+ LOGGER.atTrace().addArgument(connectedAddress).addArgument(backOff).log("Connection timed out for {}, retrying in {} ms");
+ disconnect();
+ }
+ return lastHbSent + HEARTBEAT_INTERVAL * HEARTBEAT_ALLOWED_MISSES;
+ case CONNECTED:
+ Request request;
+ while ((request = queuedRequests.poll()) != null) {
+ pendingRequests.put(request.id, request);
+ sendRequest(request);
+ }
+ if (currentTime > lastHbSent + HEARTBEAT_INTERVAL) { // check for heartbeat interval
+ // send Heartbeats
+ sendHeartBeat();
+ lastHbSent = currentTime;
+ // check if heartbeat was received
+ if (lastHbReceived + HEARTBEAT_INTERVAL * HEARTBEAT_ALLOWED_MISSES < currentTime) {
+ LOGGER.atDebug().addArgument(backOff).log("Connection timed out, reconnecting in {} ms");
+ disconnect();
+ return HEARTBEAT_INTERVAL;
+ }
+ // check timeouts of connection/subscription requests
+ for (Subscription sub : subscriptions.values()) {
+ updateSubscription(currentTime, sub);
+ }
+ }
+ return lastHbSent + HEARTBEAT_INTERVAL;
+ default:
+ throw new IllegalStateException("unexpected connection state: " + connectionState.get());
+ }
+ }
+
+ private void sendRequest(final Request request) {
+ // Filters and data are already serialised but the protocol saves them deserialised :/
+ // final ZFrame data = request.data == null ? new ZFrame(new byte[0]) : new ZFrame(request.data);
+ // final ZFrame filters = request.filters == null ? new ZFrame(new byte[0]) : new ZFrame(request.filters);
+ final Endpoint requestEndpoint = new Endpoint(request.endpoint);
+
+ try {
+ switch (request.requestType) {
+ case GET:
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.getRequest(
+ sessionId, request.id, requestEndpoint.getDevice(), requestEndpoint.getProperty(),
+ new CmwLightMessage.RequestContext(requestEndpoint.getSelector(), requestEndpoint.getFilters(), null)));
+ break;
+ case SET:
+ Objects.requireNonNull(request.data, "Data for set cannot be null");
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.setRequest(
+ sessionId, request.id, requestEndpoint.getDevice(), requestEndpoint.getProperty(),
+ new ZFrame(request.data),
+ new CmwLightMessage.RequestContext(requestEndpoint.getSelector(), requestEndpoint.getFilters(), null)));
+ break;
+ default:
+ throw new CmwLightProtocol.RdaLightException("Message of unknown type");
+ }
+ } catch (CmwLightProtocol.RdaLightException e) {
+ LOGGER.atDebug().setCause(e).log("Error sending get request:");
+ }
+ }
+
+ private void updateSubscription(final long currentTime, final Subscription sub) {
+ switch (sub.subscriptionState) {
+ case SUBSCRIBING:
+ // check timeout
+ if (currentTime > sub.timeoutValue) {
+ sub.subscriptionState = SubscriptionState.UNSUBSCRIBED;
+ sub.timeoutValue = currentTime + sub.backOff;
+ sub.backOff = sub.backOff * 2; // exponential back of
+ LOGGER.atDebug().addArgument(sub.device).addArgument(sub.property).log("subscription timed out, retrying: {}/{}");
+ }
+ break;
+ case UNSUBSCRIBED:
+ if (currentTime > sub.timeoutValue) {
+ LOGGER.atDebug().addArgument(sub.device).addArgument(sub.property).log("subscribing {}/{}");
+ sendSubscribe(sub);
+ }
+ break;
+ case SUBSCRIBED:
+ case UNSUBSCRIBE_SENT:
+ // do nothing
+ break;
+ case CANCELED:
+ sendUnsubscribe(sub);
+ break;
+ default:
+ throw new IllegalStateException("unexpected subscription state: " + sub.subscriptionState);
+ }
+ }
+
+ public void sendHeartBeat() {
+ try {
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.CLIENT_HB);
+ } catch (CmwLightProtocol.RdaLightException e) {
+ LOGGER.atDebug().setCause(e).log("Error sending heartbeat");
+ }
+ }
+
+ private void sendSubscribe(final Subscription sub) {
+ if (!sub.subscriptionState.equals(SubscriptionState.UNSUBSCRIBED)) {
+ return; // already subscribed/subscription in progress
+ }
+ try {
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.subscribeRequest(
+ sessionId, sub.id, sub.device, sub.property,
+ Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()),
+ new CmwLightMessage.RequestContext(sub.selector, sub.filters, null),
+ CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
+ sub.subscriptionState = SubscriptionState.SUBSCRIBING;
+ sub.timeoutValue = System.currentTimeMillis() + SUBSCRIPTION_TIMEOUT;
+ } catch (CmwLightProtocol.RdaLightException e) {
+ LOGGER.atDebug().setCause(e).log("Error subscribing to property:");
+ sub.timeoutValue = System.currentTimeMillis() + sub.backOff;
+ sub.backOff *= 2;
+ }
+ }
+
+ private void sendUnsubscribe(final Subscription sub) {
+ try {
+ CmwLightProtocol.sendMsg(socket, CmwLightMessage.unsubscribeRequest(
+ sessionId, sub.updateId, sub.device, sub.property,
+ Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()),
+ CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
+ sub.subscriptionState = SubscriptionState.UNSUBSCRIBE_SENT;
+ } catch (CmwLightProtocol.RdaLightException e) {
+ LOGGER.atError().addArgument(sub.property).log("failed to unsubscribe ");
+ }
+ }
+
+ public static class Subscription {
+ private final long id = REQUEST_ID_GENERATOR.incrementAndGet();
+ public final String property;
+ public final String device;
+ public final String selector;
+ public final Map filters;
+ public final String endpoint;
+ public SubscriptionState subscriptionState = SubscriptionState.UNSUBSCRIBED;
+ public int backOff = 20;
+ public long updateId = -1;
+ public long timeoutValue = -1;
+ public String idString = "";
+
+ public Subscription(final String endpoint, final String device, final String property, final String selector, final Map filters) {
+ this.endpoint = endpoint;
+ this.property = property;
+ this.device = device;
+ this.selector = selector;
+ this.filters = filters;
+ }
+
+ @Override
+ public String toString() {
+ return "Subscription{"
+ + "property='" + property + '\'' + ", device='" + device + '\'' + ", selector='" + selector + '\'' + ", filters=" + filters + ", subscriptionState=" + subscriptionState + ", backOff=" + backOff + ", id=" + id + ", updateId=" + updateId + ", timeoutValue=" + timeoutValue + '}';
+ }
+ }
+
+ public static class Request { // NOPMD - data class
+ public final byte[] filters;
+ public final byte[] data;
+ public final long id;
+ private final String requestId;
+ private final String endpoint;
+ private final byte[] rbacToken;
+ public final CmwLightProtocol.RequestType requestType;
+
+ public Request(final CmwLightProtocol.RequestType requestType,
+ final String requestId,
+ final String endpoint,
+ final byte[] filters, // NOPMD - zero copy contract
+ final byte[] data, // NOPMD - zero copy contract
+ final byte[] rbacToken // NOPMD - zero copy contract
+ ) {
+ this.requestType = requestType;
+ this.id = REQUEST_ID_GENERATOR.incrementAndGet();
+ this.requestId = requestId;
+ this.endpoint = endpoint;
+ this.filters = filters;
+ this.data = data;
+ this.rbacToken = rbacToken;
+ }
+ }
+
+ public enum SubscriptionState {
+ UNSUBSCRIBED,
+ SUBSCRIBING,
+ SUBSCRIBED,
+ CANCELED,
+ UNSUBSCRIBE_SENT
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/cmwlight/CmwLightMessage.java b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightMessage.java
new file mode 100644
index 00000000..640d498b
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightMessage.java
@@ -0,0 +1,427 @@
+package io.opencmw.client.cmwlight;
+
+import java.util.Map;
+import java.util.Objects;
+
+import org.zeromq.ZFrame;
+
+/**
+ * Data representation for all Messages exchanged between CMW client and server
+ */
+@SuppressWarnings({ "PMD.ExcessivePublicCount", "PMD.TooManyMethods", "PMD.TooManyFields" }) // - the nature of this class definition
+public class CmwLightMessage {
+ // general fields
+ public CmwLightProtocol.MessageType messageType;
+
+ // Connection Req/Ack
+ public String version;
+
+ // header data
+ public CmwLightProtocol.RequestType requestType;
+ public long id;
+ public String deviceName;
+ public CmwLightProtocol.UpdateType updateType;
+ public String sessionId;
+ public String propertyName;
+ public Map options;
+ public Map data;
+
+ // additional data
+ public ZFrame bodyData;
+ public ExceptionMessage exceptionMessage;
+ public RequestContext requestContext;
+ public DataContext dataContext;
+
+ // Subscription Update
+ public long notificationId;
+
+ // subscription established
+ public long sourceId;
+ public Map sessionBody;
+
+ // static instances for low level message types
+ public static final CmwLightMessage SERVER_HB = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_HB);
+ public static final CmwLightMessage CLIENT_HB = new CmwLightMessage(CmwLightProtocol.MessageType.CLIENT_HB);
+ // static functions to get certain message types
+ public static CmwLightMessage connectAck(final String version) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_CONNECT_ACK);
+ msg.version = version;
+ return msg;
+ }
+
+ public static CmwLightMessage connect(final String version) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.CLIENT_CONNECT);
+ msg.version = version;
+ return msg;
+ }
+ public static CmwLightMessage subscribeRequest(String sessionId, long id, String device, String property, final Map options, RequestContext requestContext, CmwLightProtocol.UpdateType updateType) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.CLIENT_REQ);
+ msg.requestType = CmwLightProtocol.RequestType.SUBSCRIBE;
+ msg.id = id;
+ msg.options = options;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.requestContext = requestContext;
+ msg.updateType = updateType;
+ return msg;
+ }
+ public static CmwLightMessage subscribeReply(String sessionId, long id, String device, String property, final Map options) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.SUBSCRIBE;
+ msg.id = id;
+ msg.options = options;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ return msg;
+ }
+ public static CmwLightMessage unsubscribeRequest(String sessionId, long id, String device, String property, final Map options, CmwLightProtocol.UpdateType updateType) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.CLIENT_REQ);
+ msg.requestType = CmwLightProtocol.RequestType.UNSUBSCRIBE;
+ msg.id = id;
+ msg.options = options;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = updateType;
+ return msg;
+ }
+ public static CmwLightMessage getRequest(String sessionId, long id, String device, String property, RequestContext requestContext) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.CLIENT_REQ;
+ msg.requestType = CmwLightProtocol.RequestType.GET;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.requestContext = requestContext;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ return msg;
+ }
+
+ public static CmwLightMessage setRequest(final String sessionId, final long id, final String device, final String property, final ZFrame data, final RequestContext requestContext) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.CLIENT_REQ;
+ msg.requestType = CmwLightProtocol.RequestType.SET;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.requestContext = requestContext;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ msg.bodyData = data;
+ return msg;
+ }
+
+ public static CmwLightMessage exceptionReply(final String sessionId, final long id, final String device, final String property, final String message, final long contextAcqStamp, final long contextCycleStamp, final byte type) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.EXCEPTION;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ msg.exceptionMessage = new ExceptionMessage(contextAcqStamp, contextCycleStamp, message, type);
+ return msg;
+ }
+
+ public static CmwLightMessage subscribeExceptionReply(final String sessionId, final long id, final String device, final String property, final String message, final long contextAcqStamp, final long contextCycleStamp, final byte type) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.SUBSCRIBE_EXCEPTION;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ msg.exceptionMessage = new ExceptionMessage(contextAcqStamp, contextCycleStamp, message, type);
+ return msg;
+ }
+
+ public static CmwLightMessage notificationExceptionReply(final String sessionId, final long id, final String device, final String property, final String message, final long contextAcqStamp, final long contextCycleStamp, final byte type) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.NOTIFICATION_EXC;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ msg.exceptionMessage = new ExceptionMessage(contextAcqStamp, contextCycleStamp, message, type);
+ return msg;
+ }
+
+ public static CmwLightMessage notificationReply(final String sessionId, final long id, final String device, final String property, final ZFrame data, final long notificationId, final DataContext requestContext, final CmwLightProtocol.UpdateType updateType) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.NOTIFICATION_DATA;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.notificationId = notificationId;
+ msg.options = Map.of(CmwLightProtocol.FieldName.NOTIFICATION_ID_TAG.value(), notificationId);
+ msg.dataContext = requestContext;
+ msg.updateType = updateType;
+ msg.bodyData = data;
+ return msg;
+ }
+
+ public static CmwLightMessage getReply(final String sessionId, final long id, final String device, final String property, final ZFrame data, final DataContext requestContext) {
+ final CmwLightMessage msg = new CmwLightMessage(CmwLightProtocol.MessageType.SERVER_REP);
+ msg.requestType = CmwLightProtocol.RequestType.REPLY;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.dataContext = requestContext;
+ msg.bodyData = data;
+ return msg;
+ }
+
+ public static CmwLightMessage sessionConfirmReply(final String sessionId, final long id, final String device, final String property, final Map options) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.SERVER_REP;
+ msg.requestType = CmwLightProtocol.RequestType.SESSION_CONFIRM;
+ msg.id = id;
+ msg.options = options;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ return msg;
+ }
+
+ public static CmwLightMessage eventReply(final String sessionId, final long id, final String device, final String property) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.SERVER_REP;
+ msg.requestType = CmwLightProtocol.RequestType.EVENT;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ return msg;
+ }
+
+ public static CmwLightMessage eventRequest(final String sessionId, final long id, final String device, final String property) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.CLIENT_REQ;
+ msg.requestType = CmwLightProtocol.RequestType.EVENT;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ return msg;
+ }
+
+ public static CmwLightMessage connectRequest(final String sessionId, final long id, final String device, final String property) {
+ final CmwLightMessage msg = new CmwLightMessage();
+ msg.messageType = CmwLightProtocol.MessageType.CLIENT_REQ;
+ msg.requestType = CmwLightProtocol.RequestType.CONNECT;
+ msg.id = id;
+ msg.sessionId = sessionId;
+ msg.deviceName = device;
+ msg.propertyName = property;
+ msg.updateType = CmwLightProtocol.UpdateType.NORMAL;
+ return msg;
+ }
+
+ protected CmwLightMessage() {
+ // Constructor only accessible from within serialiser and factory methods to only allow valid messages
+ }
+
+ protected CmwLightMessage(final CmwLightProtocol.MessageType messageType) {
+ this.messageType = messageType;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CmwLightMessage)) {
+ return false;
+ }
+ final CmwLightMessage that = (CmwLightMessage) o;
+ return id == that.id && notificationId == that.notificationId && sourceId == that.sourceId && messageType == that.messageType && Objects.equals(version, that.version) && requestType == that.requestType && Objects.equals(deviceName, that.deviceName) && updateType == that.updateType && Objects.equals(sessionId, that.sessionId) && Objects.equals(propertyName, that.propertyName) && Objects.equals(options, that.options) && Objects.equals(data, that.data) && Objects.equals(bodyData, that.bodyData) && Objects.equals(exceptionMessage, that.exceptionMessage) && Objects.equals(requestContext, that.requestContext) && Objects.equals(dataContext, that.dataContext) && Objects.equals(sessionBody, that.sessionBody);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(messageType, version, requestType, id, deviceName, updateType, sessionId, propertyName, options, data, bodyData, exceptionMessage, requestContext, dataContext, notificationId, sourceId, sessionBody);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("CmwMessage: ");
+ switch (messageType) {
+ case CLIENT_CONNECT:
+ sb.append("Connection request, client version='").append(version).append('\'');
+ break;
+ case SERVER_CONNECT_ACK:
+ sb.append("Connection ack, server version='").append(version).append('\'');
+ break;
+ case CLIENT_HB:
+ sb.append("client heartbeat");
+ break;
+ case SERVER_HB:
+ sb.append("server heartbeat");
+ break;
+ case SERVER_REP:
+ sb.append("server reply: ").append(requestType.name());
+ case CLIENT_REQ:
+ if (messageType == CmwLightProtocol.MessageType.CLIENT_REQ) {
+ sb.append("client request: ").append(requestType.name());
+ }
+ sb.append(" id: ").append(id).append(" deviceName=").append(deviceName).append(", updateType=").append(updateType).append(", sessionId='").append(sessionId).append("', propertyName='").append(propertyName).append("', options=").append(options).append(", data=").append(data).append(", sourceId=").append(sourceId);
+ switch (requestType) {
+ case GET:
+ case SET:
+ case SUBSCRIBE:
+ case UNSUBSCRIBE:
+ sb.append("\n requestContext=").append(requestContext);
+ break;
+ case REPLY:
+ case NOTIFICATION_DATA:
+ sb.append(", notificationId=").append(notificationId).append("\n bodyData=").append(bodyData).append("\n dataContext=").append(dataContext);
+ break;
+ case EXCEPTION:
+ case NOTIFICATION_EXC:
+ case SUBSCRIBE_EXCEPTION:
+ sb.append("\n exceptionMessage=").append(exceptionMessage);
+ break;
+ case SESSION_CONFIRM:
+ sb.append(", sessionBody='").append(sessionBody).append('\'');
+ break;
+ case CONNECT:
+ case EVENT:
+ break;
+ default:
+ throw new IllegalStateException("unknown client request message type: " + messageType);
+ }
+ break;
+ default:
+ throw new IllegalStateException("unknown message type: " + messageType);
+ }
+ return sb.toString();
+ }
+
+ public static class RequestContext {
+ public String selector;
+ public Map data;
+ public Map filters;
+
+ public RequestContext(final String selector, final Map filters, final Map data) {
+ this.selector = selector;
+ this.filters = filters;
+ this.data = data;
+ }
+
+ protected RequestContext() {
+ // default constructor only available to protocol (de)serialisers
+ }
+
+ @Override
+ public String toString() {
+ return "RequestContext{"
+ + "selector='" + selector + '\'' + ", data=" + data + ", filters=" + filters + '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof RequestContext)) {
+ return false;
+ }
+ final RequestContext that = (RequestContext) o;
+ return selector.equals(that.selector) && Objects.equals(data, that.data) && Objects.equals(filters, that.filters);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(selector, data, filters);
+ }
+ }
+
+ public static class DataContext {
+ public String cycleName;
+ public long cycleStamp;
+ public long acqStamp;
+ public Map data;
+
+ public DataContext(final String cycleName, final long cycleStamp, final long acqStamp, final Map data) {
+ this.cycleName = cycleName;
+ this.cycleStamp = cycleStamp;
+ this.acqStamp = acqStamp;
+ this.data = data;
+ }
+
+ protected DataContext() {
+ // allow only protocol serialiser to create empty object
+ }
+
+ @Override
+ public String toString() {
+ return "DataContext{cycleName='" + cycleName + '\'' + ", cycleStamp=" + cycleStamp + ", acqStamp=" + acqStamp + ", data=" + data + '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DataContext)) {
+ return false;
+ }
+ final DataContext that = (DataContext) o;
+ return cycleStamp == that.cycleStamp && acqStamp == that.acqStamp && cycleName.equals(that.cycleName) && Objects.equals(data, that.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cycleName, cycleStamp, acqStamp, data);
+ }
+ }
+
+ public static class ExceptionMessage {
+ public long contextAcqStamp;
+ public long contextCycleStamp;
+ public String message;
+ public byte type;
+
+ public ExceptionMessage(final long contextAcqStamp, final long contextCycleStamp, final String message, final byte type) {
+ this.contextAcqStamp = contextAcqStamp;
+ this.contextCycleStamp = contextCycleStamp;
+ this.message = message;
+ this.type = type;
+ }
+
+ protected ExceptionMessage() {
+ // allow only protocol serialiser to create empty object
+ }
+
+ @Override
+ public String toString() {
+ return "ExceptionMessage{contextAcqStamp=" + contextAcqStamp + ", contextCycleStamp=" + contextCycleStamp + ", message='" + message + '\'' + ", type=" + type + '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ExceptionMessage)) {
+ return false;
+ }
+ final ExceptionMessage that = (ExceptionMessage) o;
+ return contextAcqStamp == that.contextAcqStamp && contextCycleStamp == that.contextCycleStamp && type == that.type && message.equals(that.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(contextAcqStamp, contextCycleStamp, message, type);
+ }
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/cmwlight/CmwLightProtocol.java b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightProtocol.java
new file mode 100644
index 00000000..e42ba502
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/cmwlight/CmwLightProtocol.java
@@ -0,0 +1,621 @@
+package io.opencmw.client.cmwlight;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+import io.opencmw.serialiser.DataType;
+import io.opencmw.serialiser.FieldDescription;
+import io.opencmw.serialiser.IoBuffer;
+import io.opencmw.serialiser.IoClassSerialiser;
+import io.opencmw.serialiser.spi.CmwLightSerialiser;
+import io.opencmw.serialiser.spi.FastByteBuffer;
+import io.opencmw.serialiser.spi.WireDataFieldDescription;
+
+/**
+ * A lightweight implementation of the CMW RDA client protocol part.
+ * Serializes CmwLightMessage to ZeroMQ messages and vice versa.
+ */
+@SuppressWarnings("PMD.UnusedLocalVariable") // Unused variables are taken from the protocol and should be available for reference
+public class CmwLightProtocol { //NOPMD -- nomen est omen
+ private static final String CONTEXT_ACQ_STAMP = "ContextAcqStamp";
+ private static final String CONTEXT_CYCLE_STAMP = "ContextCycleStamp";
+ private static final String MESSAGE = "Message";
+ private static final String TYPE = "Type";
+ private static final String EMPTY_CONTEXT = "empty context data for request type: ";
+ private static final int MAX_MSG_SIZE = 4096 * 1024;
+ private static final IoBuffer IO_BUFFER = new FastByteBuffer(MAX_MSG_SIZE);
+ private static final CmwLightSerialiser SERIALISER = new CmwLightSerialiser(IO_BUFFER);
+ private static final IoClassSerialiser IO_CLASS_SERIALISER = new IoClassSerialiser(IO_BUFFER);
+ public static final String VERSION = "1.0.0"; // Protocol version used if msg.version is null or empty
+ private static final int SERIALISER_QUIRK = 100; // there seems to be a bug in the serialiser which does not update the buffer position correctly, so send more
+
+ private CmwLightProtocol() {
+ // utility class
+ }
+
+ /**
+ * The message specified by the byte contained in the first frame of a message defines what type of message is present
+ */
+ public enum MessageType {
+ SERVER_CONNECT_ACK(0x01),
+ SERVER_REP(0x02),
+ SERVER_HB(0x03),
+ CLIENT_CONNECT(0x20),
+ CLIENT_REQ(0x21),
+ CLIENT_HB(0x22);
+
+ private static final int CLIENT_API_RANGE = 0x4;
+ private static final int SERVER_API_RANGE = 0x20;
+ private final byte value;
+
+ MessageType(int value) {
+ this.value = (byte) value;
+ }
+
+ public byte value() {
+ return value;
+ }
+
+ public static MessageType of(int value) { // NOPMD -- nomen est omen
+ if (value < CLIENT_API_RANGE) {
+ return values()[value - 1];
+ } else {
+ return values()[value - SERVER_API_RANGE + CLIENT_CONNECT.ordinal()];
+ }
+ }
+ }
+
+ /**
+ * Frame Types in the descriptor (Last frame of a message containing the type of each sub message)
+ */
+ public enum FrameType {
+ HEADER(0),
+ BODY(1),
+ BODY_DATA_CONTEXT(2),
+ BODY_REQUEST_CONTEXT(3),
+ BODY_EXCEPTION(4);
+
+ private final byte value;
+
+ FrameType(int value) {
+ this.value = (byte) value;
+ }
+
+ public byte value() {
+ return value;
+ }
+ }
+
+ /**
+ * Field names for the Request Header
+ */
+ public enum FieldName {
+ EVENT_TYPE_TAG("eventType"),
+ MESSAGE_TAG("message"),
+ ID_TAG("0"),
+ DEVICE_NAME_TAG("1"),
+ REQ_TYPE_TAG("2"),
+ OPTIONS_TAG("3"),
+ CYCLE_NAME_TAG("4"),
+ ACQ_STAMP_TAG("5"),
+ CYCLE_STAMP_TAG("6"),
+ UPDATE_TYPE_TAG("7"),
+ SELECTOR_TAG("8"),
+ CLIENT_INFO_TAG("9"),
+ NOTIFICATION_ID_TAG("a"),
+ SOURCE_ID_TAG("b"),
+ FILTERS_TAG("c"),
+ DATA_TAG("x"),
+ SESSION_ID_TAG("d"),
+ SESSION_BODY_TAG("e"),
+ PROPERTY_NAME_TAG("f");
+
+ private final String name;
+
+ FieldName(String name) {
+ this.name = name;
+ }
+
+ public String value() {
+ return name;
+ }
+ }
+
+ /**
+ * request type used in request header REQ_TYPE_TAG
+ */
+ public enum RequestType {
+ GET(0),
+ SET(1),
+ CONNECT(2),
+ REPLY(3),
+ EXCEPTION(4),
+ SUBSCRIBE(5),
+ UNSUBSCRIBE(6),
+ NOTIFICATION_DATA(7),
+ NOTIFICATION_EXC(8),
+ SUBSCRIBE_EXCEPTION(9),
+ EVENT(10),
+ SESSION_CONFIRM(11);
+
+ private final byte value;
+
+ RequestType(int value) {
+ this.value = (byte) value;
+ }
+
+ public static RequestType of(int value) { // NOPMD - nomen est omen
+ return values()[value];
+ }
+
+ public byte value() {
+ return value;
+ }
+ }
+
+ /**
+ * UpdateType
+ */
+ public enum UpdateType {
+ NORMAL(0),
+ FIRST_UPDATE(1), // Initial update sent when the subscription is created.
+ IMMEDIATE_UPDATE(2); // Update sent after the value has been modified by a set call.
+
+ private final byte value;
+
+ UpdateType(int value) {
+ this.value = (byte) value;
+ }
+
+ public static UpdateType of(int value) { // NOPMD - nomen est omen
+ return values()[value];
+ }
+
+ public byte value() {
+ return value;
+ }
+ }
+
+ public static CmwLightMessage recvMsg(final ZMQ.Socket socket, int tout) throws RdaLightException {
+ return parseMsg(ZMsg.recvMsg(socket, tout));
+ }
+
+ public static CmwLightMessage parseMsg(final @NotNull ZMsg data) throws RdaLightException { // NOPMD - NPath complexity acceptable (complex protocol)
+ assert data != null : "data";
+ final ZFrame firstFrame = data.pollFirst();
+ if (firstFrame != null && Arrays.equals(firstFrame.getData(), new byte[] { MessageType.SERVER_CONNECT_ACK.value() })) {
+ final CmwLightMessage reply = new CmwLightMessage(MessageType.SERVER_CONNECT_ACK);
+ final ZFrame versionData = data.pollFirst();
+ assert versionData != null : "version data in connection acknowledgement frame";
+ reply.version = versionData.getString(Charset.defaultCharset());
+ return reply;
+ }
+ if (firstFrame != null && Arrays.equals(firstFrame.getData(), new byte[] { MessageType.CLIENT_CONNECT.value() })) {
+ final CmwLightMessage reply = new CmwLightMessage(MessageType.CLIENT_CONNECT);
+ final ZFrame versionData = data.pollFirst();
+ assert versionData != null : "version data in connection acknowledgement frame";
+ reply.version = versionData.getString(Charset.defaultCharset());
+ return reply;
+ }
+ if (firstFrame != null && Arrays.equals(firstFrame.getData(), new byte[] { MessageType.SERVER_HB.value() })) {
+ return CmwLightMessage.SERVER_HB;
+ }
+ if (firstFrame != null && Arrays.equals(firstFrame.getData(), new byte[] { MessageType.CLIENT_HB.value() })) {
+ return CmwLightMessage.CLIENT_HB;
+ }
+ byte[] descriptor = checkDescriptor(data.pollLast(), firstFrame);
+ final ZFrame headerMsg = data.poll();
+ assert headerMsg != null : "message header";
+ CmwLightMessage reply = getReplyFromHeader(firstFrame, headerMsg);
+ switch (reply.requestType) {
+ case REPLY:
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY, FrameType.BODY_DATA_CONTEXT);
+ reply.bodyData = data.pollFirst();
+ if (data.isEmpty()) {
+ throw new RdaLightException(EMPTY_CONTEXT + reply.requestType);
+ }
+ reply.dataContext = parseContextData(data.pollFirst());
+ return reply;
+ case NOTIFICATION_DATA: // notification update
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY, FrameType.BODY_DATA_CONTEXT);
+ if (reply.options != null && reply.options.containsKey(FieldName.NOTIFICATION_ID_TAG.value())) {
+ reply.notificationId = (long) reply.options.get(FieldName.NOTIFICATION_ID_TAG.value());
+ }
+ reply.bodyData = data.pollFirst();
+ if (data.isEmpty()) {
+ throw new RdaLightException(EMPTY_CONTEXT + reply.requestType);
+ }
+ reply.dataContext = parseContextData(data.pollFirst());
+ return reply;
+ case EXCEPTION: // exception on get/set request
+ case NOTIFICATION_EXC: // exception on notification, e.g null pointer in server notify code
+ case SUBSCRIBE_EXCEPTION: // exception on subscribe e.g. nonexistent property, wrong filters
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY_EXCEPTION);
+ reply.exceptionMessage = parseExceptionMessage(data.pollFirst());
+ return reply;
+ case GET:
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY_REQUEST_CONTEXT);
+ if (data.isEmpty()) {
+ throw new RdaLightException(EMPTY_CONTEXT + reply.requestType);
+ }
+ reply.requestContext = parseRequestContext(data.pollFirst());
+ return reply;
+ case SUBSCRIBE: // descriptor: [0] options: SOURCE_ID_TAG // seems to be sent after subscription is accepted
+ if (reply.messageType == MessageType.SERVER_REP) {
+ assertDescriptor(descriptor, FrameType.HEADER);
+ if (reply.options != null && reply.options.containsKey(FieldName.SOURCE_ID_TAG.value())) {
+ reply.sourceId = (long) reply.options.get(FieldName.SOURCE_ID_TAG.value());
+ }
+ } else {
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY_REQUEST_CONTEXT);
+ if (data.isEmpty()) {
+ throw new RdaLightException(EMPTY_CONTEXT + reply.requestType);
+ }
+ reply.requestContext = parseRequestContext(data.pollFirst());
+ }
+ return reply;
+ case SESSION_CONFIRM: // descriptor: [0] options: SESSION_BODY_TAG
+ assertDescriptor(descriptor, FrameType.HEADER);
+ if (reply.options != null && reply.options.containsKey(FieldName.SESSION_BODY_TAG.value())) {
+ final Object subMap = reply.options.get(FieldName.SESSION_BODY_TAG.value());
+ final String fieldName = FieldName.SESSION_BODY_TAG.value();
+ if (subMap instanceof Map) {
+ @SuppressWarnings("unchecked")
+ final Map castMap = (Map) reply.options.get(fieldName);
+ reply.sessionBody = castMap;
+ } else {
+ throw new RdaLightException("field member '" + fieldName + "' not assignable to Map: " + subMap);
+ }
+ }
+ return reply;
+ case EVENT:
+ case UNSUBSCRIBE:
+ case CONNECT:
+ assertDescriptor(descriptor, FrameType.HEADER);
+ return reply;
+ case SET:
+ assertDescriptor(descriptor, FrameType.HEADER, FrameType.BODY, FrameType.BODY_REQUEST_CONTEXT);
+ reply.bodyData = data.pollFirst();
+ if (data.isEmpty()) {
+ throw new RdaLightException(EMPTY_CONTEXT + reply.requestType);
+ }
+ reply.requestContext = parseRequestContext(data.pollFirst());
+ return reply;
+ default:
+ throw new RdaLightException("received unknown or non-client request type: " + reply.requestType);
+ }
+ }
+
+ public static void sendMsg(final ZMQ.Socket socket, final CmwLightMessage msg) throws RdaLightException {
+ serialiseMsg(msg).send(socket);
+ }
+
+ public static ZMsg serialiseMsg(final CmwLightMessage msg) throws RdaLightException {
+ final ZMsg result = new ZMsg();
+ switch (msg.messageType) {
+ case SERVER_CONNECT_ACK:
+ case CLIENT_CONNECT:
+ result.add(new ZFrame(new byte[] { msg.messageType.value() }));
+ result.add(new ZFrame(msg.version == null || msg.version.isEmpty() ? VERSION : msg.version));
+ return result;
+ case CLIENT_HB:
+ case SERVER_HB:
+ result.add(new ZFrame(new byte[] { msg.messageType.value() }));
+ return result;
+ case SERVER_REP:
+ case CLIENT_REQ:
+ result.add(new byte[] { msg.messageType.value() });
+ result.add(serialiseHeader(msg));
+ switch (msg.requestType) {
+ case CONNECT:
+ case EVENT:
+ case SESSION_CONFIRM:
+ case UNSUBSCRIBE:
+ addDescriptor(result, FrameType.HEADER);
+ break;
+ case GET:
+ case SUBSCRIBE:
+ if (msg.messageType == MessageType.CLIENT_REQ) {
+ assert msg.requestContext != null : "requestContext";
+ result.add(serialiseRequestContext(msg.requestContext));
+ addDescriptor(result, FrameType.HEADER, FrameType.BODY_REQUEST_CONTEXT);
+ } else {
+ addDescriptor(result, FrameType.HEADER);
+ }
+ break;
+ case SET:
+ assert msg.bodyData != null : "bodyData";
+ assert msg.requestContext != null : "requestContext";
+ result.add(msg.bodyData);
+ result.add(serialiseRequestContext(msg.requestContext));
+ addDescriptor(result, FrameType.HEADER, FrameType.BODY, FrameType.BODY_REQUEST_CONTEXT);
+ break;
+ case REPLY:
+ case NOTIFICATION_DATA:
+ assert msg.bodyData != null : "bodyData";
+ result.add(msg.bodyData);
+ result.add(serialiseDataContext(msg.dataContext));
+ addDescriptor(result, FrameType.HEADER, FrameType.BODY, FrameType.BODY_DATA_CONTEXT);
+ break;
+ case NOTIFICATION_EXC:
+ case EXCEPTION:
+ case SUBSCRIBE_EXCEPTION:
+ assert msg.exceptionMessage != null : "exceptionMessage";
+ result.add(serialiseExceptionMessage(msg.exceptionMessage));
+ addDescriptor(result, FrameType.HEADER, FrameType.BODY_EXCEPTION);
+ break;
+ default:
+ }
+ return result;
+ default:
+ }
+
+ throw new RdaLightException("Invalid cmwMessage: " + msg);
+ }
+
+ private static ZFrame serialiseExceptionMessage(final CmwLightMessage.ExceptionMessage exceptionMessage) {
+ IO_BUFFER.reset();
+ SERIALISER.setBuffer(IO_BUFFER);
+ SERIALISER.putHeaderInfo();
+ SERIALISER.put(CONTEXT_ACQ_STAMP, exceptionMessage.contextAcqStamp);
+ SERIALISER.put(CONTEXT_CYCLE_STAMP, exceptionMessage.contextCycleStamp);
+ SERIALISER.put(MESSAGE, exceptionMessage.message);
+ SERIALISER.put(TYPE, exceptionMessage.type);
+ IO_BUFFER.flip();
+ return new ZFrame(Arrays.copyOfRange(IO_BUFFER.elements(), 0, IO_BUFFER.limit() + SERIALISER_QUIRK));
+ }
+
+ private static void addDescriptor(final ZMsg result, final FrameType... frametypes) {
+ byte[] descriptor = new byte[frametypes.length];
+ for (int i = 0; i < descriptor.length; i++) {
+ descriptor[i] = frametypes[i].value();
+ }
+ result.add(new ZFrame(descriptor));
+ }
+
+ private static ZFrame serialiseHeader(final CmwLightMessage msg) throws RdaLightException {
+ IO_BUFFER.reset();
+ SERIALISER.setBuffer(IO_BUFFER);
+ SERIALISER.putHeaderInfo();
+ SERIALISER.put(FieldName.REQ_TYPE_TAG.value(), msg.requestType.value());
+ SERIALISER.put(FieldName.ID_TAG.value(), msg.id);
+ SERIALISER.put(FieldName.DEVICE_NAME_TAG.value(), msg.deviceName);
+ SERIALISER.put(FieldName.PROPERTY_NAME_TAG.value(), msg.propertyName);
+ if (msg.updateType != null) {
+ SERIALISER.put(FieldName.UPDATE_TYPE_TAG.value(), msg.updateType.value());
+ }
+ SERIALISER.put(FieldName.SESSION_ID_TAG.value(), msg.sessionId);
+ // StartMarker marks start of Data Object
+ putMap(SERIALISER, FieldName.OPTIONS_TAG.value(), msg.options);
+ IO_BUFFER.flip();
+ return new ZFrame(Arrays.copyOfRange(IO_BUFFER.elements(), 0, IO_BUFFER.limit() + SERIALISER_QUIRK));
+ }
+
+ private static ZFrame serialiseRequestContext(final CmwLightMessage.RequestContext requestContext) throws RdaLightException {
+ IO_BUFFER.reset();
+ SERIALISER.putHeaderInfo();
+ SERIALISER.put(FieldName.SELECTOR_TAG.value(), requestContext.selector);
+ putMap(SERIALISER, FieldName.FILTERS_TAG.value(), requestContext.filters);
+ putMap(SERIALISER, FieldName.DATA_TAG.value(), requestContext.data);
+ IO_BUFFER.flip();
+ return new ZFrame(Arrays.copyOfRange(IO_BUFFER.elements(), 0, IO_BUFFER.limit() + SERIALISER_QUIRK));
+ }
+
+ private static ZFrame serialiseDataContext(final CmwLightMessage.DataContext dataContext) throws RdaLightException {
+ IO_BUFFER.reset();
+ SERIALISER.putHeaderInfo();
+ SERIALISER.put(FieldName.CYCLE_NAME_TAG.value(), dataContext.cycleName);
+ SERIALISER.put(FieldName.CYCLE_STAMP_TAG.value(), dataContext.cycleStamp);
+ SERIALISER.put(FieldName.ACQ_STAMP_TAG.value(), dataContext.acqStamp);
+ putMap(SERIALISER, FieldName.DATA_TAG.value(), dataContext.data);
+ IO_BUFFER.flip();
+ return new ZFrame(Arrays.copyOfRange(IO_BUFFER.elements(), 0, IO_BUFFER.limit() + SERIALISER_QUIRK));
+ }
+
+ private static void putMap(final CmwLightSerialiser serialiser, final String fieldName, final Map map) throws RdaLightException {
+ if (map != null) {
+ final WireDataFieldDescription dataFieldMarker = new WireDataFieldDescription(serialiser, serialiser.getParent(), -1,
+ fieldName, DataType.START_MARKER, -1, -1, -1);
+ serialiser.putStartMarker(dataFieldMarker);
+ for (final Map.Entry entry : map.entrySet()) {
+ if (entry.getValue() instanceof String) {
+ serialiser.put(entry.getKey(), (String) entry.getValue());
+ } else if (entry.getValue() instanceof Integer) {
+ serialiser.put(entry.getKey(), (Integer) entry.getValue());
+ } else if (entry.getValue() instanceof Long) {
+ serialiser.put(entry.getKey(), (Long) entry.getValue());
+ } else if (entry.getValue() instanceof Boolean) {
+ serialiser.put(entry.getKey(), (Boolean) entry.getValue());
+ } else if (entry.getValue() instanceof Map) {
+ @SuppressWarnings("unchecked")
+ final Map subMap = (Map) entry.getValue();
+ putMap(serialiser, entry.getKey(), subMap);
+ } else {
+ throw new RdaLightException("unsupported map entry type: " + entry.getValue().getClass().getCanonicalName());
+ }
+ }
+ serialiser.putEndMarker(dataFieldMarker);
+ }
+ }
+
+ private static CmwLightMessage getReplyFromHeader(final ZFrame firstFrame, final ZFrame header) throws RdaLightException {
+ CmwLightMessage reply = new CmwLightMessage(MessageType.of(firstFrame.getData()[0]));
+ IO_CLASS_SERIALISER.setDataBuffer(FastByteBuffer.wrap(header.getData()));
+ final FieldDescription headerMap;
+ try {
+ headerMap = IO_CLASS_SERIALISER.parseWireFormat().getChildren().get(0);
+ for (FieldDescription field : headerMap.getChildren()) {
+ if (field.getFieldName().equals(FieldName.REQ_TYPE_TAG.value()) && field.getType() == byte.class) {
+ reply.requestType = RequestType.of((byte) (((WireDataFieldDescription) field).data()));
+ } else if (field.getFieldName().equals(FieldName.ID_TAG.value()) && field.getType() == long.class) {
+ reply.id = (long) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.DEVICE_NAME_TAG.value()) && field.getType() == String.class) {
+ reply.deviceName = (String) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.OPTIONS_TAG.value())) {
+ reply.options = readMap(field);
+ } else if (field.getFieldName().equals(FieldName.UPDATE_TYPE_TAG.value()) && field.getType() == byte.class) {
+ reply.updateType = UpdateType.of((byte) ((WireDataFieldDescription) field).data());
+ } else if (field.getFieldName().equals(FieldName.SESSION_ID_TAG.value()) && field.getType() == String.class) {
+ reply.sessionId = (String) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.PROPERTY_NAME_TAG.value()) && field.getType() == String.class) {
+ reply.propertyName = (String) ((WireDataFieldDescription) field).data();
+ } else {
+ throw new RdaLightException("Unknown CMW header field: " + field.getFieldName());
+ }
+ }
+ } catch (IllegalStateException e) {
+ throw new RdaLightException("unparsable header: " + Arrays.toString(header.getData()) + "(" + header.toString() + ")", e);
+ }
+ if (reply.requestType == null) {
+ throw new RdaLightException("Header does not contain request type field");
+ }
+ return reply;
+ }
+
+ private static Map readMap(final FieldDescription field) {
+ Map result = null;
+ for (FieldDescription dataField : field.getChildren()) {
+ if (result == null) {
+ result = new HashMap<>(); // NOPMD - necessary to allocate inside loop
+ }
+ //if ( 'condition' ) {
+ // find out how to see if the field is itself a map
+ // result.put(dataField.getFieldName(), readMap(dataField))
+ // } else {
+ result.put(dataField.getFieldName(), ((WireDataFieldDescription) dataField).data());
+ //}
+ }
+ return result;
+ }
+
+ private static CmwLightMessage.ExceptionMessage parseExceptionMessage(final ZFrame exceptionBody) throws RdaLightException {
+ if (exceptionBody == null) {
+ throw new RdaLightException("malformed subscription exception");
+ }
+ final CmwLightMessage.ExceptionMessage exceptionMessage = new CmwLightMessage.ExceptionMessage();
+ IO_CLASS_SERIALISER.setDataBuffer(FastByteBuffer.wrap(exceptionBody.getData()));
+ final FieldDescription exceptionFields = IO_CLASS_SERIALISER.parseWireFormat().getChildren().get(0);
+ for (FieldDescription field : exceptionFields.getChildren()) {
+ if (CONTEXT_ACQ_STAMP.equals(field.getFieldName()) && field.getType() == long.class) {
+ exceptionMessage.contextAcqStamp = (long) ((WireDataFieldDescription) field).data();
+ } else if (CONTEXT_CYCLE_STAMP.equals(field.getFieldName()) && field.getType() == long.class) {
+ exceptionMessage.contextCycleStamp = (long) ((WireDataFieldDescription) field).data();
+ } else if (MESSAGE.equals(field.getFieldName()) && field.getType() == String.class) {
+ exceptionMessage.message = (String) ((WireDataFieldDescription) field).data();
+ } else if (TYPE.equals(field.getFieldName()) && field.getType() == byte.class) {
+ exceptionMessage.type = (byte) ((WireDataFieldDescription) field).data();
+ } else {
+ throw new RdaLightException("Unsupported field in exception body: " + field.getFieldName());
+ }
+ }
+ return exceptionMessage;
+ }
+
+ private static CmwLightMessage.RequestContext parseRequestContext(final @NotNull ZFrame contextData) throws RdaLightException {
+ assert contextData != null : "contextData";
+ CmwLightMessage.RequestContext requestContext = new CmwLightMessage.RequestContext();
+ IO_CLASS_SERIALISER.setDataBuffer(FastByteBuffer.wrap(contextData.getData()));
+ final FieldDescription contextMap;
+ try {
+ contextMap = IO_CLASS_SERIALISER.parseWireFormat().getChildren().get(0);
+ for (FieldDescription field : contextMap.getChildren()) {
+ if (field.getFieldName().equals(FieldName.SELECTOR_TAG.value()) && field.getType() == String.class) {
+ requestContext.selector = (String) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.FILTERS_TAG.value())) {
+ for (FieldDescription dataField : field.getChildren()) {
+ if (requestContext.filters == null) {
+ requestContext.filters = new HashMap<>(); // NOPMD - necessary to allocate inside loop
+ }
+ requestContext.filters.put(dataField.getFieldName(), ((WireDataFieldDescription) dataField).data());
+ }
+ } else if (field.getFieldName().equals(FieldName.DATA_TAG.value())) {
+ for (FieldDescription dataField : field.getChildren()) {
+ if (requestContext.data == null) {
+ requestContext.data = new HashMap<>(); // NOPMD - necessary to allocate inside loop
+ }
+ requestContext.data.put(dataField.getFieldName(), ((WireDataFieldDescription) dataField).data());
+ }
+ } else {
+ throw new UnsupportedOperationException("Unknown field: " + field.getFieldName());
+ }
+ }
+ } catch (IllegalStateException e) {
+ throw new RdaLightException("unparsable context data: " + Arrays.toString(contextData.getData()) + "(" + new String(contextData.getData()) + ")", e);
+ }
+ return requestContext;
+ }
+
+ private static CmwLightMessage.DataContext parseContextData(final @NotNull ZFrame contextData) throws RdaLightException {
+ assert contextData != null : "contextData";
+ CmwLightMessage.DataContext dataContext = new CmwLightMessage.DataContext();
+ IO_CLASS_SERIALISER.setDataBuffer(FastByteBuffer.wrap(contextData.getData()));
+ final FieldDescription contextMap;
+ try {
+ contextMap = IO_CLASS_SERIALISER.parseWireFormat().getChildren().get(0);
+ for (FieldDescription field : contextMap.getChildren()) {
+ if (field.getFieldName().equals(FieldName.CYCLE_NAME_TAG.value()) && field.getType() == String.class) {
+ dataContext.cycleName = (String) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.ACQ_STAMP_TAG.value()) && field.getType() == long.class) {
+ dataContext.acqStamp = (long) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.CYCLE_STAMP_TAG.value()) && field.getType() == long.class) {
+ dataContext.cycleStamp = (long) ((WireDataFieldDescription) field).data();
+ } else if (field.getFieldName().equals(FieldName.DATA_TAG.value())) {
+ for (FieldDescription dataField : field.getChildren()) {
+ if (dataContext.data == null) {
+ dataContext.data = new HashMap<>(); // NOPMD - necessary to allocate inside loop
+ }
+ dataContext.data.put(dataField.getFieldName(), ((WireDataFieldDescription) dataField).data());
+ }
+ } else {
+ throw new UnsupportedOperationException("Unknown field: " + field.getFieldName());
+ }
+ }
+ } catch (IllegalStateException e) {
+ throw new RdaLightException("unparsable context data: " + Arrays.toString(contextData.getData()) + "(" + new String(contextData.getData()) + ")", e);
+ }
+ return dataContext;
+ }
+
+ private static void assertDescriptor(final byte[] descriptor, final FrameType... frameTypes) throws RdaLightException {
+ if (descriptor.length != frameTypes.length) {
+ throw new RdaLightException("descriptor does not match message type: \n " + Arrays.toString(descriptor) + "\n " + Arrays.toString(frameTypes));
+ }
+ for (int i = 1; i < descriptor.length; i++) {
+ if (descriptor[i] != frameTypes[i].value()) {
+ throw new RdaLightException("descriptor does not match message type: \n " + Arrays.toString(descriptor) + "\n " + Arrays.toString(frameTypes));
+ }
+ }
+ }
+
+ private static byte[] checkDescriptor(final ZFrame descriptorMsg, final ZFrame firstFrame) throws RdaLightException {
+ if (firstFrame == null || !(Arrays.equals(firstFrame.getData(), new byte[] { MessageType.SERVER_REP.value() }) || Arrays.equals(firstFrame.getData(), new byte[] { MessageType.CLIENT_REQ.value() }))) {
+ throw new RdaLightException("Expecting only messages of type Heartbeat or Reply but got: " + firstFrame);
+ }
+ if (descriptorMsg == null) {
+ throw new RdaLightException("Message does not contain descriptor");
+ }
+ final byte[] descriptor = descriptorMsg.getData();
+ if (descriptor[0] != FrameType.HEADER.value()) {
+ throw new RdaLightException("First message of SERVER_REP has to be of type MT_HEADER but is: " + descriptor[0]);
+ }
+ return descriptor;
+ }
+
+ public static class RdaLightException extends Exception {
+ private static final long serialVersionUID = 5197623305559702319L;
+ public RdaLightException(final String msg) {
+ super(msg);
+ }
+
+ public RdaLightException(final String msg, final Throwable e) {
+ super(msg, e);
+ }
+ }
+}
diff --git a/client/src/main/java/io/opencmw/client/cmwlight/DirectoryLightClient.java b/client/src/main/java/io/opencmw/client/cmwlight/DirectoryLightClient.java
new file mode 100644
index 00000000..5d8e0f12
--- /dev/null
+++ b/client/src/main/java/io/opencmw/client/cmwlight/DirectoryLightClient.java
@@ -0,0 +1,203 @@
+package io.opencmw.client.cmwlight;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Obtain device info from the directory server
+ */
+public class DirectoryLightClient {
+ public static final String GET_DEVICE_INFO = "get-device-info";
+ // public static final String GET_SERVER_INFO = "get-server-info";
+ // private static final String SUPPORTED_CHARACTERS = "\\.\\-\\+_a-zA-Z0-9";
+ // private static final String NAME_REGEX = "[a-zA-Z0-9][" + SUPPORTED_CHARACTERS + "]*";
+ // private static final String CLIENT_INFO_SUPPORTED_CHARACTERS = "\\x20-\\x7E"; // ASCII := {32-126}
+ private static final String ERROR_STRING = "ERROR";
+ private static final String HOST_PORT_SEPARATOR = ":";
+
+ private static final String NOT_BOUND_LOCATION = "*NOT_BOUND*";
+ // static final String UNKNOWN_SERVER = "*UNKNOWN*";
+ private static final String CLIENT_INFO = "DirectoryLightClient";
+ private static final String VERSION = "2.0.0";
+ private final String nameserver;
+ private final int nameserverPort;
+
+ public DirectoryLightClient(final String... nameservers) throws DirectoryClientException {
+ if (nameservers.length != 1) {
+ throw new DirectoryClientException("only one nameserver supported at the moment");
+ }
+ final String[] hostport = nameservers[0].split(HOST_PORT_SEPARATOR);
+ if (hostport.length != 2) {
+ throw new DirectoryClientException("nameserver address has wrong format: " + nameservers[0]);
+ }
+ nameserver = hostport[0];
+ nameserverPort = Integer.parseInt(hostport[1]);
+ }
+
+ /**
+ * Build the request message to query a number of devices
+ *
+ * @param devices The devices to query information for
+ * @return The request message to send to the server
+ **/
+ private String getDeviceMsg(final List devices) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(GET_DEVICE_INFO).append("\n@client-info ").append(CLIENT_INFO).append("\n@version ").append(VERSION).append('\n');
+ // msg.append("@prefer-proxy\n");
+ // msg.append("@direct ").append(this.properties.directServers.getValue()).append("\n");
+ // msg.append("@domain ");
+ // for (Domain domain : domains) {
+ // msg.append(domain.getName());
+ // msg.append(",");
+ // }
+ // msg.deleteCharAt(msg.length()-1);
+ // msg.append("\n");
+ for (final String dev : devices) {
+ sb.append(dev).append('\n');
+ }
+ sb.append('\n');
+ return sb.toString();
+ }
+
+ // /**
+ // * Build the request message to query a number of servers
+ // *
+ // * @param servers The servers to query information for
+ // * @return The request message to send to the server
+ // **/
+ // private String getServerMsg(final List servers) {
+ // final StringBuilder sb = new StringBuilder();
+ // sb.append(GET_SERVER_INFO).append("\n");
+ // sb.append("@client-info ").append(CLIENT_INFO).append("\n");
+ // sb.append("@version ").append(VERSION).append("\n");
+ // // msg.append("@prefer-proxy\n");
+ // // msg.append("@direct ").append(this.properties.directServers.getValue()).append("\n");
+ // // msg.append("@domain ");
+ // // for (Domain domain : domains) {
+ // // msg.append(domain.getName());
+ // // msg.append(",");
+ // // }
+ // // msg.deleteCharAt(msg.length()-1);
+ // // msg.append("\n");
+ // for (final String dev : servers) {
+ // sb.append(dev).append('\n');
+ // }
+ // sb.append('\n');
+ // return sb.toString();
+ // }
+
+ /**
+ * Query Server information for a given list of devices.
+ *
+ * @param devices The devices to query information for
+ * @return a list of device information for the queried devices
+ **/
+ public List getDeviceInfo(final List devices) throws DirectoryClientException {
+ final ArrayList result = new ArrayList<>();
+ try (Socket socket = new Socket()) {
+ socket.connect(new InetSocketAddress(nameserver, nameserverPort));
+ try (PrintWriter writer = new PrintWriter(socket.getOutputStream());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+ writer.write(getDeviceMsg(devices));
+ writer.flush();
+ // read query result, one line per requested device or ERROR followed by error message
+ while (true) {
+ final String line = bufferedReader.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line.equals(ERROR_STRING)) {
+ final String errorMsg = bufferedReader.lines().collect(Collectors.joining("\n")).strip();
+ throw new DirectoryClientException(errorMsg);
+ }
+ result.add(parseDeviceInfo(line));
+ }
+ }
+ } catch (IOException e) {
+ throw new DirectoryClientException("Nameserver error: ", e);
+ }
+ return result;
+ }
+
+ private Device parseDeviceInfo(final String line) throws DirectoryClientException {
+ String[] tokens = line.split(" ");
+ if (tokens.length < 2) {
+ throw new DirectoryClientException("Malformed reply line: " + line);
+ }
+ if (tokens[1].equals(NOT_BOUND_LOCATION)) {
+ throw new DirectoryClientException("Requested device not bound: " + tokens[0]);
+ }
+ final ArrayList