Skip to content

Commit

Permalink
Read Composite support for timestamped data.
Browse files Browse the repository at this point in the history
  • Loading branch information
JaroslawLegierski committed Aug 7, 2024
1 parent b05c759 commit 020fb17
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
Expand All @@ -59,6 +62,7 @@
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNode;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.DefaultLwM2mEncoder;
import org.eclipse.leshan.core.node.codec.LwM2mEncoder;
import org.eclipse.leshan.core.observation.Observation;
Expand All @@ -67,6 +71,7 @@
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.DeregisterRequest;
import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.ReadCompositeRequest;
import org.eclipse.leshan.core.request.ReadRequest;
import org.eclipse.leshan.core.request.RegisterRequest;
import org.eclipse.leshan.core.request.UpdateRequest;
Expand All @@ -75,6 +80,7 @@
import org.eclipse.leshan.core.response.CancelObservationResponse;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.core.response.ReadCompositeResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.integration.tests.util.LeshanTestServer;
Expand Down Expand Up @@ -524,4 +530,55 @@ public void observe_timestamped(String givenServerEndpointProvider) throws Excep
ObserveResponse cancelResponse = cancelFuture.get(1, TimeUnit.SECONDS);
assertThat(cancelResponse.getTimestampedLwM2mNode()).isEqualTo(timestampedNode);
}

@TestAllTransportLayer
public void read_composite_timestamped(String givenServerEndpointProvider) throws Exception {

// register client
LockStepLwM2mClient client = new LockStepLwM2mClient(server.getEndpoint(Protocol.COAP).getURI());
Token token = client
.sendLwM2mRequest(new RegisterRequest(client.getEndpointName(), 60l, "1.1", EnumSet.of(BindingMode.U),
null, null, linkParser.parseCoreLinkFormat("</1>,</2>,</3>".getBytes()), null));
client.expectResponse().token(token).go();
server.waitForNewRegistrationOf(client.getEndpointName());

Registration registration = server.getRegistrationService().getByEndpoint(client.getEndpointName());

// create timestamped data

List<LwM2mPath> paths = new ArrayList<>();
paths.add(new LwM2mPath("/1/0/1"));
paths.add(new LwM2mPath("/3/0/15"));
// and expected Time-stamped nodes
TimestampedLwM2mNodes.Builder builder = new TimestampedLwM2mNodes.Builder();
Instant t1 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant t2 = Instant.now().truncatedTo(ChronoUnit.MILLIS).minusSeconds(200);
builder.put(t1, paths.get(0), LwM2mSingleResource.newIntegerResource(1, 3600));
builder.put(t1, paths.get(1), LwM2mSingleResource.newStringResource(15, "Europe/Belgrade"));
TimestampedLwM2mNodes timestampednodes = builder.build();

// TEST
LwM2mEncoder encoder = new DefaultLwM2mEncoder();

byte[] payload = encoder.encodeTimestampedNodes(timestampednodes, ContentFormat.SENML_JSON,
client.getLwM2mModel());

// send read request
Future<ReadCompositeResponse> future = Executors.newSingleThreadExecutor().submit(() -> {
// send a request with 1 seconds timeout
return server.send(registration,
new ReadCompositeRequest(ContentFormat.SENML_JSON, ContentFormat.SENML_JSON, "/1/0/1", "/3/0/15"),
1000);
});

// wait for request and send response
client.expectRequest().storeToken("TKN").storeMID("MID").go();
client.sendResponse(Type.ACK, ResponseCode.CONTENT).loadMID("MID").loadToken("TKN")
.payload(payload, ContentFormat.SENML_JSON_CODE).go();

// check response received at server side
ReadCompositeResponse response = future.get(1, TimeUnit.SECONDS);
assertThat(response.getTimestampedLwM2mNodes()).isEqualTo(timestampednodes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void visit(CancelObservationRequest request) {

@Override
public void visit(ReadCompositeRequest request) {
response = new ReadCompositeResponse(code, null, errorMessage, null);
response = new ReadCompositeResponse(code, null, null, errorMessage, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ObserveCompositeResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> con
TimestampedLwM2mNodes timestampedValues, CompositeObservation observation, String errorMessage,
Object coapResponse) {
super(code, timestampedValues != null && !timestampedValues.isEmpty() ? timestampedValues.getNodes() : content,
errorMessage, coapResponse);
null, errorMessage, coapResponse);
this.observation = observation;
this.timestampedValues = timestampedValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,58 @@
package org.eclipse.leshan.core.response;

import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;

public class ReadCompositeResponse extends AbstractLwM2mResponse {

protected final Map<LwM2mPath, LwM2mNode> content;

public ReadCompositeResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> content, String errorMessage,
Object coapResponse) {
protected final TimestampedLwM2mNodes timestampedValues;

public ReadCompositeResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> content,
TimestampedLwM2mNodes timestampedValues, String errorMessage, Object coapResponse) {
super(code, errorMessage, coapResponse);
this.content = content;

Map<LwM2mPath, LwM2mNode> responseContent;
TimestampedLwM2mNodes responsetimestampedValues;

if (timestampedValues != null) {
// handle if timestamped value is passed
if (content != null) {
throw new IllegalArgumentException("content OR timestampedValue should be passed but not both");
}
// store value if all timestamps in timestampedValues are null
if (!timestampedValues.getNodes().isEmpty()
&& timestampedValues.getTimestamps().stream().noneMatch(Objects::nonNull)) {

responseContent = timestampedValues.getNodes();
responsetimestampedValues = null;
} else {
// check if we have only timestamp in timestampedValues
if (timestampedValues.getTimestamps().stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting())).size() >= 2) {
throw new IllegalArgumentException("only one timestamp in the content is allowed");
}

responseContent = null;
responsetimestampedValues = timestampedValues;
}
} else {
// handle if content (not timestamped) value is passed
responsetimestampedValues = null;
responseContent = content;
}

this.content = responseContent;
this.timestampedValues = responsetimestampedValues;

}

public Map<LwM2mPath, LwM2mNode> getContent() {
Expand All @@ -39,6 +78,10 @@ public LwM2mNode getContent(String path) {
return content.get(new LwM2mPath(path));
}

public TimestampedLwM2mNodes getTimestampedLwM2mNodes() {
return timestampedValues;
}

@Override
public boolean isSuccess() {
return getCode() == ResponseCode.CONTENT;
Expand Down Expand Up @@ -71,34 +114,38 @@ public String toString() {

// Syntactic sugar static constructors :
public static ReadCompositeResponse success(Map<LwM2mPath, LwM2mNode> content) {
return new ReadCompositeResponse(ResponseCode.CONTENT, content, null, null);
return new ReadCompositeResponse(ResponseCode.CONTENT, content, null, null, null);
}

public static ReadCompositeResponse success(TimestampedLwM2mNodes timestampedValues) {
return new ReadCompositeResponse(ResponseCode.CONTENT, null, timestampedValues, null, null);
}

public static ReadCompositeResponse notFound() {
return new ReadCompositeResponse(ResponseCode.NOT_FOUND, null, null, null);
return new ReadCompositeResponse(ResponseCode.NOT_FOUND, null, null, null, null);
}

public static ReadCompositeResponse unauthorized() {
return new ReadCompositeResponse(ResponseCode.UNAUTHORIZED, null, null, null);
return new ReadCompositeResponse(ResponseCode.UNAUTHORIZED, null, null, null, null);
}

public static ReadCompositeResponse methodNotAllowed() {
return new ReadCompositeResponse(ResponseCode.METHOD_NOT_ALLOWED, null, null, null);
return new ReadCompositeResponse(ResponseCode.METHOD_NOT_ALLOWED, null, null, null, null);
}

public static ReadCompositeResponse notAcceptable() {
return new ReadCompositeResponse(ResponseCode.NOT_ACCEPTABLE, null, null, null);
return new ReadCompositeResponse(ResponseCode.NOT_ACCEPTABLE, null, null, null, null);
}

public static ReadCompositeResponse unsupportedContentFormat() {
return new ReadCompositeResponse(ResponseCode.UNSUPPORTED_CONTENT_FORMAT, null, null, null);
return new ReadCompositeResponse(ResponseCode.UNSUPPORTED_CONTENT_FORMAT, null, null, null, null);
}

public static ReadCompositeResponse badRequest(String errorMessage) {
return new ReadCompositeResponse(ResponseCode.BAD_REQUEST, null, errorMessage, null);
return new ReadCompositeResponse(ResponseCode.BAD_REQUEST, null, null, errorMessage, null);
}

public static ReadCompositeResponse internalServerError(String errorMessage) {
return new ReadCompositeResponse(ResponseCode.INTERNAL_SERVER_ERROR, null, errorMessage, null);
return new ReadCompositeResponse(ResponseCode.INTERNAL_SERVER_ERROR, null, null, errorMessage, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.TimestampedLwM2mNode;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.node.codec.LwM2mDecoder;
import org.eclipse.leshan.core.observation.CompositeObservation;
Expand Down Expand Up @@ -295,13 +296,13 @@ public void visit(CancelObservationRequest request) {
public void visit(ReadCompositeRequest request) {
if (coapResponse.isError()) {
// handle error response:
lwM2mresponse = new ReadCompositeResponse(toLwM2mResponseCode(coapResponse.getCode()), null,
lwM2mresponse = new ReadCompositeResponse(toLwM2mResponseCode(coapResponse.getCode()), null, null,
coapResponse.getPayloadString(), coapResponse);
} else if (isResponseCodeContent()) {
// handle success response:
Map<LwM2mPath, LwM2mNode> content = decodeCompositeCoapResponse(request.getPaths(), coapResponse, request,
clientEndpoint);
lwM2mresponse = new ReadCompositeResponse(ResponseCode.CONTENT, content, null, coapResponse);
TimestampedLwM2mNodes timestampedNodes = decodeTimestampedCompositeCoapResponse(request.getPaths(),
coapResponse, request, clientEndpoint);
lwM2mresponse = new ReadCompositeResponse(ResponseCode.CONTENT, null, timestampedNodes, null, coapResponse);
} else {
// handle unexpected response:
handleUnexpectedResponseCode(clientEndpoint, request, coapResponse);
Expand Down Expand Up @@ -384,6 +385,17 @@ private Map<LwM2mPath, LwM2mNode> decodeCompositeCoapResponse(List<LwM2mPath> pa
}
}

private TimestampedLwM2mNodes decodeTimestampedCompositeCoapResponse(List<LwM2mPath> paths, Response coapResponse,
LwM2mRequest<?> request, String endpoint) {
try {
return decoder.decodeTimestampedNodes(coapResponse.getPayload(), getContentFormat(coapResponse), paths,
model);
} catch (CodecException e) {
handleCodecException(e, request, coapResponse, endpoint);
return null; // should not happen as handleCodecException raise exception
}
}

private TimestampedLwM2mNode decodeCoapTimestampedResponse(LwM2mPath path, Response coapResponse,
LwM2mRequest<?> request, String endpoint) {
List<TimestampedLwM2mNode> timestampedNodes = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.TimestampedLwM2mNode;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.node.codec.LwM2mDecoder;
import org.eclipse.leshan.core.observation.CompositeObservation;
Expand Down Expand Up @@ -295,13 +296,13 @@ public void visit(CancelObservationRequest request) {
public void visit(ReadCompositeRequest request) {
if (coapResponse.getCode().getHttpCode() >= 400) {
// handle error response:
lwM2mresponse = new ReadCompositeResponse(toLwM2mResponseCode(coapResponse.getCode()), null,
lwM2mresponse = new ReadCompositeResponse(toLwM2mResponseCode(coapResponse.getCode()), null, null,
coapResponse.getPayloadString(), coapResponse);
} else if (isResponseCodeContent()) {
// handle success response:
Map<LwM2mPath, LwM2mNode> content = decodeCompositeCoapResponse(request.getPaths(), coapResponse, request,
clientEndpoint);
lwM2mresponse = new ReadCompositeResponse(ResponseCode.CONTENT, content, null, coapResponse);
TimestampedLwM2mNodes timestampedNodes = decodeTimestampedCompositeCoapResponse(request.getPaths(),
coapResponse, request, clientEndpoint);
lwM2mresponse = new ReadCompositeResponse(ResponseCode.CONTENT, null, timestampedNodes, null, coapResponse);
} else {
// handle unexpected response:
handleUnexpectedResponseCode(clientEndpoint, request, coapResponse);
Expand Down Expand Up @@ -499,6 +500,17 @@ private Map<LwM2mPath, LwM2mNode> decodeCompositeCoapResponse(List<LwM2mPath> pa
}
}

private TimestampedLwM2mNodes decodeTimestampedCompositeCoapResponse(List<LwM2mPath> paths,
CoapResponse coapResponse, LwM2mRequest<?> request, String endpoint) {
try {
return decoder.decodeTimestampedNodes(coapResponse.getPayload().getBytes(), getContentFormat(coapResponse),
paths, model);
} catch (CodecException e) {
handleCodecException(e, request, coapResponse, endpoint);
return null; // should not happen as handleCodecException raise exception
}
}

private TimestampedLwM2mNode decodeCoapTimestampedResponse(LwM2mPath path, CoapResponse coapResponse,
LwM2mRequest<?> request, String endpoint) {
List<TimestampedLwM2mNode> timestampedNodes = null;
Expand Down

0 comments on commit 020fb17

Please sign in to comment.