diff --git a/leshan-client-core/src/main/java/org/eclipse/leshan/client/resource/SimpleInstanceEnabler.java b/leshan-client-core/src/main/java/org/eclipse/leshan/client/resource/SimpleInstanceEnabler.java index b5c5c67f24..4edb4eb768 100644 --- a/leshan-client-core/src/main/java/org/eclipse/leshan/client/resource/SimpleInstanceEnabler.java +++ b/leshan-client-core/src/main/java/org/eclipse/leshan/client/resource/SimpleInstanceEnabler.java @@ -18,14 +18,11 @@ *******************************************************************************/ package org.eclipse.leshan.client.resource; -import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import org.eclipse.leshan.client.servers.LwM2mServer; @@ -83,40 +80,66 @@ public ReadResponse read(LwM2mServer server, int resourceid) { } @Override - public WriteResponse write(LwM2mServer server, boolean replace, int resourceid, LwM2mResource value) { - // define new Value + public WriteResponse write(LwM2mServer server, boolean replace, int resourceid, LwM2mResource valueToWrite) { + // Get resource model + ResourceModel resourceModel = getModel().resources.get(resourceid); + if (resourceModel == null) { + return WriteResponse.notFound(); + } + + // Define new value LwM2mResource newValue; - List newInstances = new ArrayList<>(); - if (value instanceof LwM2mMultipleResource && !replace) { - // This is the special case of multiple instance resource where we do not replace the resource instances but - // we - // merge it. - LwM2mMultipleResource multipleResource = (LwM2mMultipleResource) resources.get(resourceid); - if (multipleResource != null) { - Map mergedInstances = new HashMap<>(multipleResource.getInstances()); - // try to detect resource instance changes - for (Entry entry : ((LwM2mMultipleResource) value).getInstances() - .entrySet()) { - LwM2mResourceInstance previous = mergedInstances.put(entry.getKey(), entry.getValue()); - if (!entry.getValue().equals(previous)) { - newInstances.add(getResourceInstancePath(resourceid, entry.getKey())); - } - } - newValue = new LwM2mMultipleResource(resourceid, value.getType(), mergedInstances.values()); + if (resourceModel.multiple && !replace) { + // This is the special case of multiple instance resource + // where we do not replace the resource instances but we merge it. + LwM2mMultipleResource previousValue = (LwM2mMultipleResource) resources.get(resourceid); + if (previousValue != null) { + Map mergedInstances = new HashMap<>(); + mergedInstances.putAll(previousValue.getInstances()); + mergedInstances.putAll(valueToWrite.getInstances()); + newValue = new LwM2mMultipleResource(resourceid, valueToWrite.getType(), mergedInstances.values()); } else { - newValue = value; + newValue = valueToWrite; } } else { - newValue = value; + newValue = valueToWrite; } + // Update value LwM2mResource previousValue = resources.put(resourceid, newValue); - // Either we raise resource instance changes or resource change. - if (!newInstances.isEmpty()) { - fireResourcesChange(newInstances.toArray(new LwM2mPath[newInstances.size()])); - } else if (!newValue.equals(previousValue)) { - fireResourceChange(resourceid); + // Detect changes + Set changedResources = new HashSet<>(); + if (resourceModel.multiple) { + previousValue.getInstances().forEach((previousInstanceId, previousInstance) -> { + LwM2mResourceInstance newInstance = newValue.getInstances().get(previousInstanceId); + if (newInstance == null) { + // deletion + changedResources.add(getResourceInstancePath(resourceid, previousInstanceId)); + } else { + if (!newInstance.equals(previousInstance)) { + // modification + changedResources.add(getResourceInstancePath(resourceid, previousInstanceId)); + } + } + }); + + newValue.getInstances().forEach((newInstanceId, newInstance) -> { + LwM2mResourceInstance previousInstance = newValue.getInstances().get(newInstanceId); + if (previousInstance == null) { + // addition + changedResources.add(getResourceInstancePath(resourceid, newInstanceId)); + } + }); + + } else { + if (!newValue.equals(previousValue)) { + changedResources.add(getResourcePath(resourceid)); + } + } + // Raise changes + if (!changedResources.isEmpty()) { + fireResourcesChange(changedResources.toArray(new LwM2mPath[changedResources.size()])); } return WriteResponse.success(); } diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/observe/ObserveTest.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/observe/ObserveTest.java index 9d731ca7c0..0d5b344464 100644 --- a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/observe/ObserveTest.java +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/observe/ObserveTest.java @@ -24,15 +24,19 @@ import static org.eclipse.leshan.core.ResponseCode.CONTENT; import static org.eclipse.leshan.integration.tests.util.LeshanTestClientBuilder.givenClientUsing; import static org.eclipse.leshan.integration.tests.util.assertion.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.params.provider.Arguments.arguments; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; +import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import org.eclipse.leshan.core.ResponseCode; import org.eclipse.leshan.core.endpoint.Protocol; +import org.eclipse.leshan.core.model.ResourceModel.Type; import org.eclipse.leshan.core.node.LwM2mObject; import org.eclipse.leshan.core.node.LwM2mObjectInstance; import org.eclipse.leshan.core.node.LwM2mResourceInstance; @@ -175,6 +179,41 @@ public void can_observe_resource_instance(Protocol givenProtocol, String givenCl assertThat(response).hasValidUnderlyingResponseFor(givenServerEndpointProvider); } + @TestAllTransportLayer + public void observe_resource_instance_then_delete_it(Protocol givenProtocol, String givenClientEndpointProvider, + String givenServerEndpointProvider) throws InterruptedException { + + // skip java-coap client because of not fixed bug : https://github.com/open-coap/java-coap/issues/76 + assumeFalse(givenClientEndpointProvider.equals("java-coap")); + assumeFalse(givenServerEndpointProvider.equals("java-coap")); + + // multi instance string + String expectedPath = "/" + TestLwM2mId.TEST_OBJECT + "/0/" + TestLwM2mId.MULTIPLE_STRING_VALUE + "/0"; + ObserveResponse observeResponse = server.send(currentRegistration, new ObserveRequest(expectedPath)); + assertThat(observeResponse) // + .hasCode(CONTENT) // + .hasValidUnderlyingResponseFor(givenServerEndpointProvider); + + // an observation response should have been sent + SingleObservation observation = observeResponse.getObservation(); + assertThat(observation.getPath()).asString().isEqualTo(expectedPath); + assertThat(observation.getRegistrationId()).isEqualTo(currentRegistration.getId()); + Set observations = server.getObservationService().getObservations(currentRegistration); + assertThat(observations).containsExactly(observation); + server.waitForNewObservation(observation); + + // Write empty resoures <=> delete all instances + LwM2mResponse writeResponse = server.send(currentRegistration, new WriteRequest(Mode.REPLACE, ContentFormat.TLV, + TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_STRING_VALUE, Collections.emptyMap(), Type.STRING)); + assertThat(writeResponse).hasCode(CHANGED); + + // verify result + ObserveResponse response = server.waitForNotificationThenCancelled(observation); + assertThat(response).hasCode(ResponseCode.NOT_FOUND); + assertThat(response).hasValidUnderlyingResponseFor(givenServerEndpointProvider); + + } + @TestAllTransportLayer public void can_observe_resource_instance_then_passive_cancel(Protocol givenProtocol, String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException { diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestServer.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestServer.java index 891f2728a2..6697bfb712 100644 --- a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestServer.java +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestServer.java @@ -316,6 +316,21 @@ public ObserveResponse waitForNotificationOf(SingleObservation obs, int timeout, return c.getValue(); } + public ObserveResponse waitForNotificationThenCancelled(SingleObservation obs) { + return waitForNotificationThenCancelled(obs, 1, TimeUnit.SECONDS); + } + + public ObserveResponse waitForNotificationThenCancelled(SingleObservation obs, int timeout, TimeUnit unit) { + final ArgumentCaptor c = ArgumentCaptor.forClass(ObserveResponse.class); + notificationEventInOrder.verify(notificationListener, timeout(unit.toMillis(timeout)).times(1)).onResponse( + assertArg(o -> assertThat(o).isEqualTo(obs)), // + assertArg(reg -> assertThat(reg.getId()).isEqualTo(obs.getRegistrationId())), // + c.capture()); + notificationEventInOrder.verify(notificationListener, timeout(unit.toMillis(timeout)).times(1)).cancelled(obs); + notificationEventInOrder.verifyNoMoreInteractions(); + return c.getValue(); + } + public ObserveCompositeResponse waitForNotificationOf(CompositeObservation obs) { return waitForNotificationOf(obs, 1, TimeUnit.SECONDS); } diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/write/WriteSingleValueTest.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/write/WriteSingleValueTest.java index cf5168f4c4..dbe7aabf06 100644 --- a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/write/WriteSingleValueTest.java +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/write/WriteSingleValueTest.java @@ -38,7 +38,6 @@ import org.eclipse.leshan.core.node.LwM2mPath; import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.node.LwM2mResourceInstance; -import org.eclipse.leshan.core.node.LwM2mSingleResource; import org.eclipse.leshan.core.node.ObjectLink; import org.eclipse.leshan.core.node.codec.CodecException; import org.eclipse.leshan.core.request.ContentFormat; @@ -303,6 +302,26 @@ public void write_unsigned_integer_resource(ContentFormat contentFormat, Protoco assertThat(resource.getValue()).isEqualTo(expectedValue); } + @TestAllCases + public void write_objlnk_resource(ContentFormat contentFormat, Protocol givenProtocol, + String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException { + // write resource + ObjectLink expectedValue = new ObjectLink(10245, 1); + WriteResponse response = server.send(currentRegistration, + new WriteRequest(contentFormat, TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.OBJLINK_VALUE, expectedValue)); + + // verify result + assertThat(response) // + .hasCode(CHANGED) // + .hasValidUnderlyingResponseFor(givenServerEndpointProvider); + + // read resource to check the value changed + ReadResponse readResponse = server.send(currentRegistration, + new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.OBJLINK_VALUE)); + LwM2mResource resource = (LwM2mResource) readResponse.getContent(); + assertThat(resource.getValue()).isEqualTo(expectedValue); + } + @TestAllCases public void can_write_single_instance_objlnk_resource(ContentFormat contentFormat, Protocol givenProtocol, String givenClientEndpointProvider, String givenServerEndpointProvider) throws InterruptedException { @@ -311,7 +330,7 @@ public void can_write_single_instance_objlnk_resource(ContentFormat contentForma // Write objlnk resource WriteResponse response = server.send(currentRegistration, new WriteRequest(contentFormat, - TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, expectedValue)); + TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, 0, expectedValue, Type.OBJLNK)); // Verify Write result assertThat(response) // @@ -320,8 +339,8 @@ public void can_write_single_instance_objlnk_resource(ContentFormat contentForma // Reading back the written OBJLNK value ReadResponse readResponse = server.send(currentRegistration, - new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE)); - LwM2mSingleResource resource = (LwM2mSingleResource) readResponse.getContent(); + new ReadRequest(TestLwM2mId.TEST_OBJECT, 0, TestLwM2mId.MULTIPLE_OBJLINK_VALUE, 0)); + LwM2mResourceInstance resource = (LwM2mResourceInstance) readResponse.getContent(); assertThat(resource.getValue()).isEqualTo(expectedValue); } diff --git a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/CaliforniumServerEndpointsProvider.java b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/CaliforniumServerEndpointsProvider.java index 28425c3c3d..b559a903a6 100644 --- a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/CaliforniumServerEndpointsProvider.java +++ b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/CaliforniumServerEndpointsProvider.java @@ -165,7 +165,7 @@ public void onNotification(Request coapRequest, Response coapResponse) { // create Observe Response try { - AbstractLwM2mResponse response = messagetranslator.createObservation(observation, + AbstractLwM2mResponse response = messagetranslator.createObserveResponse(observation, coapResponse, toolbox, profile); if (observation instanceof SingleObservation) { notificatonReceiver.onNotification((SingleObservation) observation, client, profile, diff --git a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/ServerCoapMessageTranslator.java b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/ServerCoapMessageTranslator.java index c96e539689..f40f695e8b 100644 --- a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/ServerCoapMessageTranslator.java +++ b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/endpoint/ServerCoapMessageTranslator.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.server.resources.Resource; @@ -84,15 +83,8 @@ public List createResources(UplinkRequestReceiver receiver, ServerEndp identityHandlerProvider)); } - public AbstractLwM2mResponse createObservation(Observation observation, Response coapResponse, + public AbstractLwM2mResponse createObserveResponse(Observation observation, Response coapResponse, ServerEndpointToolbox toolbox, ClientProfile profile) { - // CHANGED response is supported for backward compatibility with old spec. - if (coapResponse.getCode() != CoAP.ResponseCode.CHANGED - && coapResponse.getCode() != CoAP.ResponseCode.CONTENT) { - throw new InvalidResponseException("Unexpected response code [%s] for %s", coapResponse.getCode(), - observation); - } - // get content format ContentFormat contentFormat = null; if (coapResponse.getOptions().hasContentFormat()) { @@ -107,25 +99,34 @@ public AbstractLwM2mResponse createObservation(Observation observation, Response if (observation instanceof SingleObservation) { SingleObservation singleObservation = (SingleObservation) observation; - List timestampedNodes = toolbox.getDecoder().decodeTimestampedData( - coapResponse.getPayload(), contentFormat, singleObservation.getPath(), profile.getModel()); - - // create lwm2m response - if (timestampedNodes.size() == 1 && !timestampedNodes.get(0).isTimestamped()) { - return new ObserveResponse(responseCode, timestampedNodes.get(0).getNode(), null, singleObservation, - null, coapResponse); - } else { - return new ObserveResponse(responseCode, null, timestampedNodes, singleObservation, null, + if (responseCode.isError()) { + return new ObserveResponse(responseCode, null, null, null, coapResponse.getPayloadString(), coapResponse); + } else { + List timestampedNodes = toolbox.getDecoder().decodeTimestampedData( + coapResponse.getPayload(), contentFormat, singleObservation.getPath(), profile.getModel()); + + // create lwm2m response + if (timestampedNodes.size() == 1 && !timestampedNodes.get(0).isTimestamped()) { + return new ObserveResponse(responseCode, timestampedNodes.get(0).getNode(), null, + singleObservation, null, coapResponse); + } else { + return new ObserveResponse(responseCode, null, timestampedNodes, singleObservation, null, + coapResponse); + } } } else if (observation instanceof CompositeObservation) { CompositeObservation compositeObservation = (CompositeObservation) observation; - Map nodes = toolbox.getDecoder().decodeNodes(coapResponse.getPayload(), - contentFormat, compositeObservation.getPaths(), profile.getModel()); - - return new ObserveCompositeResponse(responseCode, nodes, null, coapResponse, compositeObservation); + if (responseCode.isError()) { + return new ObserveCompositeResponse(responseCode, null, coapResponse.getPayloadString(), + coapResponse, null); + } else { + Map nodes = toolbox.getDecoder().decodeNodes(coapResponse.getPayload(), + contentFormat, compositeObservation.getPaths(), profile.getModel()); + return new ObserveCompositeResponse(responseCode, nodes, null, coapResponse, compositeObservation); + } } throw new IllegalStateException( diff --git a/leshan-tl-javacoap-client/src/main/java/org/eclipse/leshan/transport/javacoap/client/observe/ObserversManager.java b/leshan-tl-javacoap-client/src/main/java/org/eclipse/leshan/transport/javacoap/client/observe/ObserversManager.java index 491a874815..b45965e5d2 100644 --- a/leshan-tl-javacoap-client/src/main/java/org/eclipse/leshan/transport/javacoap/client/observe/ObserversManager.java +++ b/leshan-tl-javacoap-client/src/main/java/org/eclipse/leshan/transport/javacoap/client/observe/ObserversManager.java @@ -98,27 +98,35 @@ private void sendObservation(CoapRequest observeRequest, Service { - if (exception != null) { - observersStore.remove(observeRequest); - LOGGER.warn("[{}#{}] Removed observation relation, got exception: {}", peerAddress, - separateResponse.getToken(), exception.toString()); - } else if (!result) { + try { + notificationSender.apply(separateResponse).whenComplete((result, exception) -> { + if (exception != null) { + observersStore.remove(observeRequest); + LOGGER.warn("[{}#{}] Removed observation relation, got exception: {}", peerAddress, + separateResponse.getToken(), exception.toString()); + } else if (!result) { + observersStore.remove(observeRequest); + LOGGER.info("[{}#{}] Removed observation relation, got reset", peerAddress, + separateResponse.getToken()); + } + }); + + if (separateResponse.getCode() != Code.C205_CONTENT) { observersStore.remove(observeRequest); - LOGGER.info("[{}#{}] Removed observation relation, got reset", peerAddress, - separateResponse.getToken()); } - }); - - if (separateResponse.getCode() != Code.C205_CONTENT) { - observersStore.remove(observeRequest); + } catch (RuntimeException e) { + LOGGER.warn("Unexpected Exception when sending Notification(%s) to %s", peerAddress, + separateResponse.getToken(), e); } }