Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of Notification with Error Code for Californium endpoint provider #1561

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
*******************************************************************************/
package org.eclipse.leshan.client.resource;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.eclipse.leshan.client.servers.LwM2mServer;
Expand Down Expand Up @@ -83,40 +80,66 @@ public ReadResponse read(LwM2mServer server, int resourceid) {
}

@Override
public WriteResponse write(LwM2mServer server, boolean replace, int resourceid, LwM2mResource value) {
// define new Value
public WriteResponse write(LwM2mServer server, boolean replace, int resourceid, LwM2mResource valueToWrite) {
// Get resource model
ResourceModel resourceModel = getModel().resources.get(resourceid);
if (resourceModel == null) {
return WriteResponse.notFound();
}

// Define new value
LwM2mResource newValue;
List<LwM2mPath> newInstances = new ArrayList<>();
if (value instanceof LwM2mMultipleResource && !replace) {
// This is the special case of multiple instance resource where we do not replace the resource instances but
// we
// merge it.
LwM2mMultipleResource multipleResource = (LwM2mMultipleResource) resources.get(resourceid);
if (multipleResource != null) {
Map<Integer, LwM2mResourceInstance> mergedInstances = new HashMap<>(multipleResource.getInstances());
// try to detect resource instance changes
for (Entry<Integer, LwM2mResourceInstance> entry : ((LwM2mMultipleResource) value).getInstances()
.entrySet()) {
LwM2mResourceInstance previous = mergedInstances.put(entry.getKey(), entry.getValue());
if (!entry.getValue().equals(previous)) {
newInstances.add(getResourceInstancePath(resourceid, entry.getKey()));
}
}
newValue = new LwM2mMultipleResource(resourceid, value.getType(), mergedInstances.values());
if (resourceModel.multiple && !replace) {
// This is the special case of multiple instance resource
// where we do not replace the resource instances but we merge it.
LwM2mMultipleResource previousValue = (LwM2mMultipleResource) resources.get(resourceid);
if (previousValue != null) {
Map<Integer, LwM2mResourceInstance> mergedInstances = new HashMap<>();
mergedInstances.putAll(previousValue.getInstances());
mergedInstances.putAll(valueToWrite.getInstances());
newValue = new LwM2mMultipleResource(resourceid, valueToWrite.getType(), mergedInstances.values());
} else {
newValue = value;
newValue = valueToWrite;
}
} else {
newValue = value;
newValue = valueToWrite;
}

// Update value
LwM2mResource previousValue = resources.put(resourceid, newValue);

// Either we raise resource instance changes or resource change.
if (!newInstances.isEmpty()) {
fireResourcesChange(newInstances.toArray(new LwM2mPath[newInstances.size()]));
} else if (!newValue.equals(previousValue)) {
fireResourceChange(resourceid);
// Detect changes
Set<LwM2mPath> changedResources = new HashSet<>();
if (resourceModel.multiple) {
previousValue.getInstances().forEach((previousInstanceId, previousInstance) -> {
LwM2mResourceInstance newInstance = newValue.getInstances().get(previousInstanceId);
if (newInstance == null) {
// deletion
changedResources.add(getResourceInstancePath(resourceid, previousInstanceId));
} else {
if (!newInstance.equals(previousInstance)) {
// modification
changedResources.add(getResourceInstancePath(resourceid, previousInstanceId));
}
}
});

newValue.getInstances().forEach((newInstanceId, newInstance) -> {
LwM2mResourceInstance previousInstance = newValue.getInstances().get(newInstanceId);
if (previousInstance == null) {
// addition
changedResources.add(getResourceInstancePath(resourceid, newInstanceId));
}
});

} else {
if (!newValue.equals(previousValue)) {
changedResources.add(getResourcePath(resourceid));
}
}
// Raise changes
if (!changedResources.isEmpty()) {
fireResourcesChange(changedResources.toArray(new LwM2mPath[changedResources.size()]));
}
return WriteResponse.success();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import static org.eclipse.leshan.core.ResponseCode.CONTENT;
import static org.eclipse.leshan.integration.tests.util.LeshanTestClientBuilder.givenClientUsing;
import static org.eclipse.leshan.integration.tests.util.assertion.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.endpoint.Protocol;
import org.eclipse.leshan.core.model.ResourceModel.Type;
import org.eclipse.leshan.core.node.LwM2mObject;
import org.eclipse.leshan.core.node.LwM2mObjectInstance;
import org.eclipse.leshan.core.node.LwM2mResourceInstance;
Expand Down Expand Up @@ -175,6 +179,41 @@ public void can_observe_resource_instance(Protocol givenProtocol, String givenCl
assertThat(response).hasValidUnderlyingResponseFor(givenServerEndpointProvider);
}

@TestAllTransportLayer
public void observe_resource_instance_then_delete_it(Protocol givenProtocol, String givenClientEndpointProvider,
String givenServerEndpointProvider) throws InterruptedException {

// skip java-coap client because of not fixed bug : https://github.com/open-coap/java-coap/issues/76
assumeFalse(givenClientEndpointProvider.equals("java-coap"));
assumeFalse(givenServerEndpointProvider.equals("java-coap"));

// multi instance string
String expectedPath = "/" + TestLwM2mId.TEST_OBJECT + "/0/" + TestLwM2mId.MULTIPLE_STRING_VALUE + "/0";
ObserveResponse observeResponse = server.send(currentRegistration, new ObserveRequest(expectedPath));
assertThat(observeResponse) //
.hasCode(CONTENT) //
.hasValidUnderlyingResponseFor(givenServerEndpointProvider);

// an observation response should have been sent
SingleObservation observation = observeResponse.getObservation();
assertThat(observation.getPath()).asString().isEqualTo(expectedPath);
assertThat(observation.getRegistrationId()).isEqualTo(currentRegistration.getId());
Set<Observation> observations = server.getObservationService().getObservations(currentRegistration);
assertThat(observations).containsExactly(observation);
server.waitForNewObservation(observation);

// Write empty resoures <=> delete all instances
LwM2mResponse writeResponse = server.send(currentRegistration, new WriteRequest(Mode.REPLACE, ContentFormat.TLV,
TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_STRING_VALUE, Collections.emptyMap(), Type.STRING));
assertThat(writeResponse).hasCode(CHANGED);

// verify result
ObserveResponse response = server.waitForNotificationThenCancelled(observation);
assertThat(response).hasCode(ResponseCode.NOT_FOUND);
assertThat(response).hasValidUnderlyingResponseFor(givenServerEndpointProvider);

}

@TestAllTransportLayer
public void can_observe_resource_instance_then_passive_cancel(Protocol givenProtocol,
String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,21 @@ public ObserveResponse waitForNotificationOf(SingleObservation obs, int timeout,
return c.getValue();
}

public ObserveResponse waitForNotificationThenCancelled(SingleObservation obs) {
return waitForNotificationThenCancelled(obs, 1, TimeUnit.SECONDS);
}

public ObserveResponse waitForNotificationThenCancelled(SingleObservation obs, int timeout, TimeUnit unit) {
final ArgumentCaptor<ObserveResponse> c = ArgumentCaptor.forClass(ObserveResponse.class);
notificationEventInOrder.verify(notificationListener, timeout(unit.toMillis(timeout)).times(1)).onResponse(
assertArg(o -> assertThat(o).isEqualTo(obs)), //
assertArg(reg -> assertThat(reg.getId()).isEqualTo(obs.getRegistrationId())), //
c.capture());
notificationEventInOrder.verify(notificationListener, timeout(unit.toMillis(timeout)).times(1)).cancelled(obs);
notificationEventInOrder.verifyNoMoreInteractions();
return c.getValue();
}

public ObserveCompositeResponse waitForNotificationOf(CompositeObservation obs) {
return waitForNotificationOf(obs, 1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.LwM2mResourceInstance;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.ObjectLink;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.request.ContentFormat;
Expand Down Expand Up @@ -303,6 +302,26 @@ public void write_unsigned_integer_resource(ContentFormat contentFormat, Protoco
assertThat(resource.getValue()).isEqualTo(expectedValue);
}

@TestAllCases
public void write_objlnk_resource(ContentFormat contentFormat, Protocol givenProtocol,
String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException {
// write resource
ObjectLink expectedValue = new ObjectLink(10245, 1);
WriteResponse response = server.send(currentRegistration,
new WriteRequest(contentFormat, TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.OBJLINK_VALUE, expectedValue));

// verify result
assertThat(response) //
.hasCode(CHANGED) //
.hasValidUnderlyingResponseFor(givenServerEndpointProvider);

// read resource to check the value changed
ReadResponse readResponse = server.send(currentRegistration,
new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.OBJLINK_VALUE));
LwM2mResource resource = (LwM2mResource) readResponse.getContent();
assertThat(resource.getValue()).isEqualTo(expectedValue);
}

@TestAllCases
public void can_write_single_instance_objlnk_resource(ContentFormat contentFormat, Protocol givenProtocol,
String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException {
Expand All @@ -311,7 +330,7 @@ public void can_write_single_instance_objlnk_resource(ContentFormat contentForma

// Write objlnk resource
WriteResponse response = server.send(currentRegistration, new WriteRequest(contentFormat,
TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, expectedValue));
TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, 0, expectedValue, Type.OBJLNK));

// Verify Write result
assertThat(response) //
Expand All @@ -320,8 +339,8 @@ public void can_write_single_instance_objlnk_resource(ContentFormat contentForma

// Reading back the written OBJLNK value
ReadResponse readResponse = server.send(currentRegistration,
new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE));
LwM2mSingleResource resource = (LwM2mSingleResource) readResponse.getContent();
new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, 0));
LwM2mResourceInstance resource = (LwM2mResourceInstance) readResponse.getContent();
assertThat(resource.getValue()).isEqualTo(expectedValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void onNotification(Request coapRequest, Response coapResponse) {

// create Observe Response
try {
AbstractLwM2mResponse response = messagetranslator.createObservation(observation,
AbstractLwM2mResponse response = messagetranslator.createObserveResponse(observation,
coapResponse, toolbox, profile);
if (observation instanceof SingleObservation) {
notificatonReceiver.onNotification((SingleObservation) observation, client, profile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;

import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.Resource;
Expand Down Expand Up @@ -84,15 +83,8 @@ public List<Resource> createResources(UplinkRequestReceiver receiver, ServerEndp
identityHandlerProvider));
}

public AbstractLwM2mResponse createObservation(Observation observation, Response coapResponse,
public AbstractLwM2mResponse createObserveResponse(Observation observation, Response coapResponse,
ServerEndpointToolbox toolbox, ClientProfile profile) {
// CHANGED response is supported for backward compatibility with old spec.
if (coapResponse.getCode() != CoAP.ResponseCode.CHANGED
&& coapResponse.getCode() != CoAP.ResponseCode.CONTENT) {
throw new InvalidResponseException("Unexpected response code [%s] for %s", coapResponse.getCode(),
observation);
}

// get content format
ContentFormat contentFormat = null;
if (coapResponse.getOptions().hasContentFormat()) {
Expand All @@ -107,25 +99,34 @@ public AbstractLwM2mResponse createObservation(Observation observation, Response
if (observation instanceof SingleObservation) {
SingleObservation singleObservation = (SingleObservation) observation;

List<TimestampedLwM2mNode> timestampedNodes = toolbox.getDecoder().decodeTimestampedData(
coapResponse.getPayload(), contentFormat, singleObservation.getPath(), profile.getModel());

// create lwm2m response
if (timestampedNodes.size() == 1 && !timestampedNodes.get(0).isTimestamped()) {
return new ObserveResponse(responseCode, timestampedNodes.get(0).getNode(), null, singleObservation,
null, coapResponse);
} else {
return new ObserveResponse(responseCode, null, timestampedNodes, singleObservation, null,
if (responseCode.isError()) {
return new ObserveResponse(responseCode, null, null, null, coapResponse.getPayloadString(),
coapResponse);
} else {
List<TimestampedLwM2mNode> timestampedNodes = toolbox.getDecoder().decodeTimestampedData(
coapResponse.getPayload(), contentFormat, singleObservation.getPath(), profile.getModel());

// create lwm2m response
if (timestampedNodes.size() == 1 && !timestampedNodes.get(0).isTimestamped()) {
return new ObserveResponse(responseCode, timestampedNodes.get(0).getNode(), null,
singleObservation, null, coapResponse);
} else {
return new ObserveResponse(responseCode, null, timestampedNodes, singleObservation, null,
coapResponse);
}
}
} else if (observation instanceof CompositeObservation) {

CompositeObservation compositeObservation = (CompositeObservation) observation;

Map<LwM2mPath, LwM2mNode> nodes = toolbox.getDecoder().decodeNodes(coapResponse.getPayload(),
contentFormat, compositeObservation.getPaths(), profile.getModel());

return new ObserveCompositeResponse(responseCode, nodes, null, coapResponse, compositeObservation);
if (responseCode.isError()) {
return new ObserveCompositeResponse(responseCode, null, coapResponse.getPayloadString(),
coapResponse, null);
} else {
Map<LwM2mPath, LwM2mNode> nodes = toolbox.getDecoder().decodeNodes(coapResponse.getPayload(),
contentFormat, compositeObservation.getPaths(), profile.getModel());
return new ObserveCompositeResponse(responseCode, nodes, null, coapResponse, compositeObservation);
}
}

throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,35 @@ private void sendObservation(CoapRequest observeRequest, Service<CoapRequest, Co

private static SeparateResponse toSeparateResponse(CoapResponse obsResponse, int currentObserveSequence,
CoapRequest subscribeRequest) {
obsResponse.options().setObserve(currentObserveSequence);
if (obsResponse.getCode() == Code.C205_CONTENT) {
// TODO would be better to check if this is a success instead of 205 content
obsResponse.options().setObserve(currentObserveSequence);
}
return obsResponse.toSeparate(subscribeRequest.getToken(), subscribeRequest.getPeerAddress());
}

private void sendObservation(CoapRequest observeRequest, SeparateResponse separateResponse) {
InetSocketAddress peerAddress = separateResponse.getPeerAddress();

notificationSender.apply(separateResponse).whenComplete((result, exception) -> {
if (exception != null) {
observersStore.remove(observeRequest);
LOGGER.warn("[{}#{}] Removed observation relation, got exception: {}", peerAddress,
separateResponse.getToken(), exception.toString());
} else if (!result) {
try {
notificationSender.apply(separateResponse).whenComplete((result, exception) -> {
if (exception != null) {
observersStore.remove(observeRequest);
LOGGER.warn("[{}#{}] Removed observation relation, got exception: {}", peerAddress,
separateResponse.getToken(), exception.toString());
} else if (!result) {
observersStore.remove(observeRequest);
LOGGER.info("[{}#{}] Removed observation relation, got reset", peerAddress,
separateResponse.getToken());
}
});

if (separateResponse.getCode() != Code.C205_CONTENT) {
observersStore.remove(observeRequest);
LOGGER.info("[{}#{}] Removed observation relation, got reset", peerAddress,
separateResponse.getToken());
}
});

if (separateResponse.getCode() != Code.C205_CONTENT) {
observersStore.remove(observeRequest);
} catch (RuntimeException e) {
LOGGER.warn("Unexpected Exception when sending Notification(%s) to %s", peerAddress,
separateResponse.getToken(), e);
}
}

Expand Down