diff --git a/uat/README.md b/uat/README.md index 1f897cbfc..94cd96646 100644 --- a/uat/README.md +++ b/uat/README.md @@ -177,7 +177,11 @@ mvn javadoc:javadoc ``` The main html-file will be located in each module by path **target/site/apidocs/index.html** +See individual README.md files for Python and C clients. + ## Limitations MQTT clients based on IoT Device SDK for Java v2, mosquitto C, Paho Java, Paho Python do no provide API to get information from PUBREC/PUBREL/PUBCOMP packages used when messages published with QoS 2. -Not all features of MQTT v5.0 have been implemented in clients and are supported by gRPC proto and the control as was requested, these are not bugs but designed by requirement. \ No newline at end of file +Not all features of MQTT v5.0 have been implemented in clients and are supported by gRPC proto and the control as was requested, these are not bugs but designed by requirement. + +Discovery of Core device broker feature is implemented only in the client based on AWS IoT device SDK library. diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/DiscoveryClient.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/DiscoveryClient.java new file mode 100644 index 000000000..c72ba3f8e --- /dev/null +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/DiscoveryClient.java @@ -0,0 +1,25 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.testing.mqtt5.client; + +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; +import com.aws.greengrass.testing.mqtt5.client.exceptions.DiscoveryException; + +/** + * Interface of discovery client. + */ +public interface DiscoveryClient { + + /** + * Does discovery of Core device broker. + * + * @param request the request + * @return formatted gRPC response + * @throws DiscoveryException on errors + */ + CoreDeviceDiscoveryReply discoveryCoreDevice(CoreDeviceDiscoveryRequest request) throws DiscoveryException; +} diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/GRPCLink.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/GRPCLink.java index 507c021bf..4ded3d6af 100644 --- a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/GRPCLink.java +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/GRPCLink.java @@ -16,11 +16,13 @@ public interface GRPCLink { * Handle all gRPC requests received from control. * * @param mqttLib MQTT library + * @param discoveryClient the discovery client * @return shutdown reason as received from control or null * @throws GRPCException on errors * @throws InterruptedException when thread has been interrupted */ - String handleRequests(@NonNull MqttLib mqttLib) throws GRPCException, InterruptedException; + String handleRequests(@NonNull MqttLib mqttLib, @NonNull DiscoveryClient discoveryClient) + throws GRPCException, InterruptedException; /** * Unregister agent from control. diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/Main.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/Main.java index f382beaf9..f8c4e8fa0 100644 --- a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/Main.java +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/Main.java @@ -5,6 +5,7 @@ package com.aws.greengrass.testing.mqtt5.client; +import com.aws.greengrass.testing.mqtt5.client.discover.DiscoveryClientImpl; import com.aws.greengrass.testing.mqtt5.client.exceptions.ClientException; import com.aws.greengrass.testing.mqtt5.client.grpc.GRPCLibImpl; import com.aws.greengrass.testing.mqtt5.client.sdkmqtt.MqttLibImpl; @@ -107,7 +108,7 @@ private static void doAll(String... args) throws Exception { GRPCLink link = gprcLib.makeLink(arguments.getAgentId(), arguments.getHosts(), arguments.getPort()); try (MqttLib mqttLib = new MqttLibImpl()) { - String reason = link.handleRequests(mqttLib); + String reason = link.handleRequests(mqttLib, new DiscoveryClientImpl()); link.shutdown(reason); } } diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/discover/DiscoveryClientImpl.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/discover/DiscoveryClientImpl.java new file mode 100644 index 000000000..b830477ce --- /dev/null +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/discover/DiscoveryClientImpl.java @@ -0,0 +1,94 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.testing.mqtt5.client.discover; + +import com.aws.greengrass.testing.mqtt.client.CoreDeviceConnectivityInfo; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceGroup; +import com.aws.greengrass.testing.mqtt5.client.exceptions.DiscoveryException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.iot.discovery.DiscoveryClient; +import software.amazon.awssdk.iot.discovery.DiscoveryClientConfig; +import software.amazon.awssdk.iot.discovery.model.ConnectivityInfo; +import software.amazon.awssdk.iot.discovery.model.DiscoverResponse; +import software.amazon.awssdk.iot.discovery.model.GGCore; +import software.amazon.awssdk.iot.discovery.model.GGGroup; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Implementation of discovery client. + */ +public class DiscoveryClientImpl implements com.aws.greengrass.testing.mqtt5.client.DiscoveryClient { + private static final Logger logger = LogManager.getLogger(DiscoveryClientImpl.class); + + @Override + public CoreDeviceDiscoveryReply discoveryCoreDevice(CoreDeviceDiscoveryRequest request) + throws DiscoveryException { + try (SocketOptions socketOptions = new SocketOptions(); + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtls(request.getCert(), request.getKey()) + .withCertificateAuthority(request.getCa()) + .withAlpnList(DiscoveryClient.TLS_EXT_ALPN); + DiscoveryClientConfig config = new DiscoveryClientConfig(tlsOptions, socketOptions, request.getRegion(), + 1, null); + DiscoveryClient client = new DiscoveryClient(config)) { + CompletableFuture discoverFuture = client.discover(request.getThingName()); + try { + DiscoverResponse response = discoverFuture.get(request.getTimeout(), TimeUnit.SECONDS); + return convertResponseToReply(response); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + logger.atError().withThrowable(ex).log("Failed during discover"); + throw new DiscoveryException("Could not do discovery", ex); + } + } + } + + private CoreDeviceDiscoveryReply convertResponseToReply(final DiscoverResponse response) + throws DiscoveryException { + if (response == null) { + throw new DiscoveryException("Discovery response is missing"); + } + + final List groups = response.getGGGroups(); + if (groups == null || groups.isEmpty() || groups.get(0) == null) { + throw new DiscoveryException("Groups are missing in discovery response"); + } + + CoreDeviceDiscoveryReply.Builder builder = CoreDeviceDiscoveryReply.newBuilder(); + for (final GGGroup group : groups) { + List ca = group.getCAs(); + logger.atInfo().log("Discovered groupId {} with {} CA", group.getGGGroupId(), ca.size()); + CoreDeviceGroup.Builder groupBuiler = CoreDeviceGroup.newBuilder(); + groupBuiler.addAllCaList(ca); + + for (final GGCore core : group.getCores()) { + logger.atInfo().log("Discovered Core with thing Arn {}", core.getThingArn()); + for (final ConnectivityInfo ci : core.getConnectivity()) { + logger.atInfo().log("Discovered connectivity info: id {} host {} port {}", ci.getId(), + ci.getHostAddress(), ci.getPortNumber()); + + CoreDeviceConnectivityInfo cdc = CoreDeviceConnectivityInfo.newBuilder() + .setHost(ci.getHostAddress()) + .setPort(ci.getPortNumber()) + .build(); + + groupBuiler.addConnectivityInfoList(cdc); + } + } + builder.addGroupList(groupBuiler.build()); + } + + return builder.build(); + } +} diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/exceptions/DiscoveryException.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/exceptions/DiscoveryException.java new file mode 100644 index 000000000..8ff5f63ec --- /dev/null +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/exceptions/DiscoveryException.java @@ -0,0 +1,33 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.testing.mqtt5.client.exceptions; + +/** + * Client's exception related to discovery parts. + */ +public class DiscoveryException extends ClientException { + private static final long serialVersionUID = -2081564070408021325L; + + public DiscoveryException() { + super(); + } + + public DiscoveryException(String message) { + super(message); + } + + public DiscoveryException(String message, Throwable cause) { + super(message, cause); + } + + public DiscoveryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public DiscoveryException(Throwable cause) { + super(cause); + } +} diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java index bbf5a888d..e4f26c038 100644 --- a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java @@ -5,6 +5,8 @@ package com.aws.greengrass.testing.mqtt5.client.grpc; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; import com.aws.greengrass.testing.mqtt.client.Empty; import com.aws.greengrass.testing.mqtt.client.Mqtt5ConnAck; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; @@ -23,9 +25,11 @@ import com.aws.greengrass.testing.mqtt.client.MqttUnsubscribeRequest; import com.aws.greengrass.testing.mqtt.client.ShutdownRequest; import com.aws.greengrass.testing.mqtt.client.TLSSettings; +import com.aws.greengrass.testing.mqtt5.client.DiscoveryClient; import com.aws.greengrass.testing.mqtt5.client.GRPCClient; import com.aws.greengrass.testing.mqtt5.client.MqttConnection; import com.aws.greengrass.testing.mqtt5.client.MqttLib; +import com.aws.greengrass.testing.mqtt5.client.exceptions.DiscoveryException; import com.aws.greengrass.testing.mqtt5.client.exceptions.MqttException; import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; @@ -49,6 +53,8 @@ class GRPCControlServer { private static final String CONNECTION_WITH_DOES_NOT_FOUND = "connection with id {} doesn't found"; private static final String CONNECTION_DOES_NOT_FOUND = "connection doesn't found"; + private static final String EMPTY_CERTIFICATE = "empty certificate"; + private static final String EMPTY_PRIVATE_KEY = "empty private key"; private static final int TIMEOUT_MIN = 1; @@ -77,6 +83,7 @@ class GRPCControlServer { private final int boundPort; private MqttLib mqttLib; + private DiscoveryClient discoveryClient; private String shutdownReason; @@ -201,18 +208,18 @@ public void createMqttConnection(MqttConnectRequest request, String cert = tls.getCert(); if (cert == null || cert.isEmpty()) { - logger.atWarn().log("empty certificate"); + logger.atWarn().log(EMPTY_CERTIFICATE); responseObserver.onError(Status.INVALID_ARGUMENT - .withDescription("empty certificate") + .withDescription(EMPTY_CERTIFICATE) .asRuntimeException()); return; } String key = tls.getKey(); if (key == null || key.isEmpty()) { - logger.atWarn().log("empty private key"); + logger.atWarn().log(EMPTY_PRIVATE_KEY); responseObserver.onError(Status.INVALID_ARGUMENT - .withDescription("empty private key") + .withDescription(EMPTY_PRIVATE_KEY) .asRuntimeException()); return; } @@ -589,6 +596,82 @@ public void unsubscribeMqtt(MqttUnsubscribeRequest request, responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } + + /** + * Handler of DiscoveryCoreDevice gRPC call. + * + * @param request incoming request + * @param responseObserver response control + */ + @Override + public void discoveryCoreDevice(CoreDeviceDiscoveryRequest request, + StreamObserver responseObserver) { + int timeout = request.getTimeout(); + if (timeout < TIMEOUT_MIN) { + logger.atWarn().log("invalid unsubscribe timeout {}, must be >= {}", timeout, TIMEOUT_MIN); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("invalid unsubscribe timeout, must be >= 1") + .asRuntimeException()); + return; + } + + final String ca = request.getCa(); + if (ca == null || ca.isEmpty()) { + logger.atWarn().log("empty CA"); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("empty CA") + .asRuntimeException()); + return; + } + + final String cert = request.getCert(); + if (cert == null || cert.isEmpty()) { + logger.atWarn().log(EMPTY_CERTIFICATE); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription(EMPTY_CERTIFICATE) + .asRuntimeException()); + return; + } + + final String key = request.getKey(); + if (key == null || key.isEmpty()) { + logger.atWarn().log(EMPTY_PRIVATE_KEY); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription(EMPTY_PRIVATE_KEY) + .asRuntimeException()); + return; + } + + final String thingName = request.getThingName(); + if (thingName == null || thingName.isEmpty()) { + logger.atWarn().log("empty thing name"); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("empty thing name") + .asRuntimeException()); + return; + } + + final String region = request.getRegion(); + if (region == null || region.isEmpty()) { + logger.atWarn().log("empty region"); + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("empty region") + .asRuntimeException()); + return; + } + + CoreDeviceDiscoveryReply reply; + try { + reply = discoveryClient.discoveryCoreDevice(request); + } catch (DiscoveryException ex) { + logger.atError().withThrowable(ex).log("exception during discovery"); + responseObserver.onError(ex); + return; + } + + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } } /** @@ -636,8 +719,9 @@ public String getShutdownReason() { * * @param mqttLib reference to MQTT side of the client to handler incoming requests */ - public void waiting(MqttLib mqttLib) throws InterruptedException { + public void waiting(MqttLib mqttLib, DiscoveryClient discoveryClient) throws InterruptedException { this.mqttLib = mqttLib; + this.discoveryClient = discoveryClient; logger.atInfo().log("Server awaitTermination"); server.awaitTermination(); logger.atInfo().log("Server awaitTermination done"); diff --git a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImpl.java b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImpl.java index 87338cb55..992607c5e 100644 --- a/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImpl.java +++ b/uat/custom-components/client-java-sdk/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImpl.java @@ -5,6 +5,7 @@ package com.aws.greengrass.testing.mqtt5.client.grpc; +import com.aws.greengrass.testing.mqtt5.client.DiscoveryClient; import com.aws.greengrass.testing.mqtt5.client.GRPCLink; import com.aws.greengrass.testing.mqtt5.client.MqttLib; import com.aws.greengrass.testing.mqtt5.client.exceptions.GRPCException; @@ -92,9 +93,10 @@ public GRPCControlServer newServer(@NonNull GRPCDiscoveryClient client, @NonNull } @Override - public String handleRequests(@NonNull MqttLib mqttLib) throws GRPCException, InterruptedException { + public String handleRequests(@NonNull MqttLib mqttLib, @NonNull DiscoveryClient discoveryClient) + throws GRPCException, InterruptedException { logger.atInfo().log("Handle gRPC requests"); - server.waiting(mqttLib); + server.waiting(mqttLib, discoveryClient); return "Agent shutdown by OTF request '" + server.getShutdownReason() + "'"; } diff --git a/uat/custom-components/client-java-sdk/src/test/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImplTest.java b/uat/custom-components/client-java-sdk/src/test/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImplTest.java index 8f63e1ac3..d45d00dd0 100644 --- a/uat/custom-components/client-java-sdk/src/test/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImplTest.java +++ b/uat/custom-components/client-java-sdk/src/test/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCLinkImplTest.java @@ -5,6 +5,7 @@ package com.aws.greengrass.testing.mqtt5.client.grpc; +import com.aws.greengrass.testing.mqtt5.client.DiscoveryClient; import com.aws.greengrass.testing.mqtt5.client.MqttLib; import com.aws.greengrass.testing.mqtt5.client.exceptions.GRPCException; import org.junit.jupiter.api.extension.ExtendWith; @@ -68,13 +69,14 @@ void GIVEN_link_WHEN_handle_requests_THEN_server_is_called_correct_reason_return when(server.getShutdownReason()).thenReturn(reason); final MqttLib mqttLib = mock(MqttLib.class); + final DiscoveryClient discoveryClient = mock(DiscoveryClient.class); // WHEN - String shutdownReason = gRPCLinkImpl.handleRequests(mqttLib); + String shutdownReason = gRPCLinkImpl.handleRequests(mqttLib, discoveryClient); // THEN assertEquals(expectedShutdownReason, shutdownReason); - verify(server).waiting(eq(mqttLib)); + verify(server).waiting(eq(mqttLib), eq(discoveryClient)); } @Test diff --git a/uat/custom-components/client-mosquitto-c/README.md b/uat/custom-components/client-mosquitto-c/README.md index 6fc6551f7..c4b8fe420 100644 --- a/uat/custom-components/client-mosquitto-c/README.md +++ b/uat/custom-components/client-mosquitto-c/README.md @@ -4,7 +4,7 @@ MQTT 3.1.1/5.0 client for tests based on C mosquitto library ## Install requirements for native build ```bash -sudo apt-get install -y build-essential gcc cmake git autoconf libtool pkg-config libmosquitto-dev +sudo apt-get install -y build-essential gcc cmake git autoconf libtool pkg-config libmosquitto-dev doxygen ``` Note: required version 2.0 or above of mosquitto @@ -68,3 +68,19 @@ In Mosquitto API mosquitto_unsubscribe_v5_callback_set() callback does not provi 4. Windows implementation mosquitto library can build on Windows but miss threaded interface and can works only in synchronous mode. That brokes logic of gRPC requests and client control. + + +## Generate documentation + +Install doxygen + +```bash +apt-get install -y doxygen +``` + +Run command +```bash +doxygen +``` + +Directory "docs" with the html and latex documentation will be generated. diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/AgentControl.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/AgentControl.java index c010adf06..e2e24379f 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/AgentControl.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/AgentControl.java @@ -5,6 +5,8 @@ package com.aws.greengrass.testing.mqtt.client.control.api; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; import com.aws.greengrass.testing.mqtt.client.MqttConnectRequest; @@ -94,4 +96,14 @@ ConnectionControl createMqttConnection(@NonNull MqttConnectRequest connectReques * @throws StatusRuntimeException on errors */ void shutdownAgent(String reason); + + + /** + * Do discovery of Core device broker. + * + * @param discoveryRequest the request with clients name and credentials + * @return the reply with connectivity information of IoT Core device broker + * @throws StatusRuntimeException on errors + */ + CoreDeviceDiscoveryReply discoveryCoreDevice(@NonNull CoreDeviceDiscoveryRequest discoveryRequest); } diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/AgentControlImpl.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/AgentControlImpl.java index 2b58261ce..570731c25 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/AgentControlImpl.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/AgentControlImpl.java @@ -5,6 +5,8 @@ package com.aws.greengrass.testing.mqtt.client.control.implementation; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; import com.aws.greengrass.testing.mqtt.client.Mqtt5ConnAck; import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; @@ -200,6 +202,13 @@ public ConnectionControl createMqttConnection(@NonNull MqttConnectRequest connec return connectionControl; } + @Override + public CoreDeviceDiscoveryReply discoveryCoreDevice(@NonNull CoreDeviceDiscoveryRequest discoveryRequest) { + CoreDeviceDiscoveryReply reply = blockingStub.discoveryCoreDevice(discoveryRequest); + logger.atInfo().log("discoveryCoreDevice: found {} groups", reply.getGroupListCount()); + return reply; + } + /** * Checks is agent receives request on that gRPC server address. * diff --git a/uat/proto/mqtt_client_control.proto b/uat/proto/mqtt_client_control.proto index 6a97f6079..3d87b997d 100644 --- a/uat/proto/mqtt_client_control.proto +++ b/uat/proto/mqtt_client_control.proto @@ -138,6 +138,9 @@ service MqttClientControl { // publish MQTT message rpc PublishMqtt(MqttPublishRequest) returns (MqttPublishReply) {} + + // do Core device discovery + rpc DiscoveryCoreDevice(CoreDeviceDiscoveryRequest) returns (CoreDeviceDiscoveryReply) {} } // Versions of MQTT protocol, used for compatibility, only MQTT_PROTOCOL_V50 actually supported @@ -278,4 +281,32 @@ message MqttPublishReply { repeated Mqtt5Properties properties = 3; // MQTT v5.0 PUBACK user's properties } + +// Request to discovery Core device broker +message CoreDeviceDiscoveryRequest { + int32 timeout = 1; // request timeout in seconds + string ca = 2; // PEM formatted CA to verify IoT Core + string cert = 3; // client's certificate, PEM formatted + string key = 4; // client's private key + string thingName = 5; // client's thing name + string region = 6; // AWS region +} + +// Connectivity information of Core device broker +message CoreDeviceConnectivityInfo { + string host = 1; // host name or IP address of the broker + int32 port = 2; // port number of the broker +} + +// Connectivity information of Core device group +message CoreDeviceGroup { + repeated string caList = 1; // group's CA list + repeated CoreDeviceConnectivityInfo connectivityInfoList = 2; // connectivity information of Core Device broker +} + +// Response to discovery request +message CoreDeviceDiscoveryReply { + repeated CoreDeviceGroup groupList = 1; // device groups +} + // end of MQTT client contol part diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java index 6c6331cab..af7c38144 100644 --- a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java @@ -10,6 +10,10 @@ import com.aws.greengrass.testing.model.ScenarioContext; import com.aws.greengrass.testing.model.TestContext; import com.aws.greengrass.testing.modules.model.AWSResourcesContext; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceConnectivityInfo; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryReply; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceDiscoveryRequest; +import com.aws.greengrass.testing.mqtt.client.CoreDeviceGroup; import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties; @@ -65,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -81,6 +86,7 @@ public class MqttControlSteps { private static final String DEFAULT_CLIENT_DEVICE_POLICY_CONFIG = "/configs/iot/basic_client_device_policy.yaml"; private static final int DEFAULT_MQTT_TIMEOUT_SEC = 30; + private static final int DEFAULT_DISCOVERY_TIMEOUT_SEC = 30; private static final String MQTT_VERSION_311 = "v3"; private static final String MQTT_VERSION_50 = "v5"; @@ -723,7 +729,7 @@ public void createClientDevice(String clientDeviceId) throws IOException { * Creates MQTT connection. * * @param clientDeviceId the id of the device (thing name) as defined by user in scenario - * @param componentId the componentId of MQTT client + * @param componentId the componentId of MQTT agent * @param brokerId the id of broker, before must be discovered or added */ @And("I connect device {string} on {word} to {string}") @@ -735,7 +741,7 @@ public void connect(String clientDeviceId, String componentId, String brokerId) * Creates MQTT connection. * * @param clientDeviceId the id of the device (thing name) as defined by user in scenario - * @param componentId the componentId of MQTT client + * @param componentId the componentId of MQTT agent * @param brokerId the id of broker, before must be discovered or added * @param mqttVersion the MQTT version string */ @@ -812,7 +818,7 @@ public void renameConnection(String clientDeviceId, String newConnectionName) { * Try to create MQTT connection to broker and ensure connection has been failed. * * @param clientDeviceId the id of the device (thing name) as defined by user in scenario - * @param componentId the componentId of MQTT client + * @param componentId the componentId of MQTT agent * @param brokerId the id of broker, before must be discovered or added * @param mqttVersion the MQTT version string * @throws RuntimeException throws in fail case @@ -1268,14 +1274,15 @@ public void clearAnything() { * @param clientDeviceId user defined client device id * @throws ExecutionException thrown when future completed exceptionally * @throws InterruptedException thrown when the current thread was interrupted while waiting + * @throws TimeoutException thrown when request is not finished in time limit */ @And("I discover core device broker as {string} from {string} in OTF") public void discoverCoreDeviceBroker(String brokerId, String clientDeviceId) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { final String clientDeviceThingName = getClientDeviceThingName(clientDeviceId); final IotThingSpec thingSpec = getClientDeviceThingSpec(clientDeviceThingName); - final String crt = thingSpec.resource() + final String cert = thingSpec.resource() .certificate() .certificatePem(); final String key = thingSpec.resource() @@ -1285,15 +1292,59 @@ public void discoverCoreDeviceBroker(String brokerId, String clientDeviceId) final String region = resourcesContext.region().toString(); final String ca = registrationContext.rootCA(); try (SocketOptions socketOptions = new SocketOptions(); - TlsContextOptions tlsOptions = TlsContextOptions.createWithMtls(crt, key) + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtls(cert, key) .withCertificateAuthority(ca) .withAlpnList(TLS_EXT_ALPN); DiscoveryClientConfig config = new DiscoveryClientConfig(tlsOptions, socketOptions, region, 1, null); DiscoveryClient client = new DiscoveryClient(config)) { - processDiscoveryResponse(brokerId, client.discover(clientDeviceThingName).get()); + CompletableFuture discoverFuture = client.discover(clientDeviceThingName); + processDiscoveryResponse(brokerId, discoverFuture.get(DEFAULT_DISCOVERY_TIMEOUT_SEC, TimeUnit.SECONDS)); } } + /** + * Discover IoT core device broker on device. + * Note: That feature available only in SDK-based client + * + * @param brokerId the broker name in tests + * @param clientDeviceId the user defined client device id + * @param componentId the componentId of MQTT agent + * @throws ExecutionException thrown when future completed exceptionally + * @throws InterruptedException thrown when the current thread was interrupted while waiting + */ + @And("I discover core device broker as {string} from {string} on {string}") + public void discoverCoreDeviceBrokerOnDevice(String brokerId, String clientDeviceId, String componentId) + throws ExecutionException, InterruptedException { + final String clientDeviceThingName = getClientDeviceThingName(clientDeviceId); + + final IotThingSpec thingSpec = getClientDeviceThingSpec(clientDeviceThingName); + final String cert = thingSpec.resource() + .certificate() + .certificatePem(); + final String key = thingSpec.resource() + .certificate() + .keyPair() + .privateKey(); + final String region = resourcesContext.region().toString(); + final String ca = registrationContext.rootCA(); + + CoreDeviceDiscoveryRequest request = CoreDeviceDiscoveryRequest.newBuilder() + .setTimeout(DEFAULT_DISCOVERY_TIMEOUT_SEC) + .setCa(ca) + .setCert(cert) + .setKey(key) + .setThingName(clientDeviceThingName) + .setRegion(region) + .build(); + + // get agent control by componentId + AgentControl agentControl = getAgentControl(componentId); + + // do discovery on the agent + CoreDeviceDiscoveryReply reply = agentControl.discoveryCoreDevice(request); + processDiscoveryReply(brokerId, reply); + } + /** * Set up IoT core broker. * @@ -1438,7 +1489,7 @@ private void processDiscoveryResponse(String brokerId, DiscoverResponse response log.info("Core with thing Arn {}", core.getThingArn()); core.getConnectivity().stream().forEach(ci -> { log.info("Connectivity info: id {} host {} port {}", - ci. getId(), + ci.getId(), ci.getHostAddress(), ci.getPortNumber()); }); @@ -1461,6 +1512,31 @@ ci. getId(), mqttBrokers.setConnectivityInfo(brokerId, connectionInfos); } + private void processDiscoveryReply(String brokerId, CoreDeviceDiscoveryReply reply) { + if (reply == null) { + throw new IllegalStateException("Discovery reply is missing"); + } + + if (reply.getGroupListCount() < 1) { + throw new IllegalStateException("Groups are missing in discovery reply"); + } + + List connectionInfos = new ArrayList<>(); + for (CoreDeviceGroup group : reply.getGroupListList()) { + log.info("group with {} CA", group.getCaListCount()); + for (CoreDeviceConnectivityInfo ci : group.getConnectivityInfoListList()) { + log.info("Connectivity info: host {} port {}", ci.getHost(), ci.getPort()); + connectionInfos.add(new MqttBrokers.ConnectivityInfo( + ci.getHost(), + ci.getPort(), + group.getCaListList() + )); + } + } + + mqttBrokers.setConnectivityInfo(brokerId, connectionInfos); + } + private String getAgentId(String componentName) { return componentName; } diff --git a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature index e6b0b2e8f..b3f691c15 100644 --- a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature +++ b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature @@ -2754,3 +2754,73 @@ Feature: GGMQ-1 Examples: | mqtt-v | name | agent | recipe | publish-status-nms | | v5 | paho-python | aws.greengrass.client.Mqtt5PythonPahoClient | client_python_paho.yaml | 0 | + + + @GGMQ-1-T103 + Scenario Outline: GGMQ-1-T103--: As a customer, I can discover Core device broker in AWS IoT device SDK-based client + When I create a Greengrass deployment with components + | aws.greengrass.clientdevices.Auth | LATEST | + | aws.greengrass.clientdevices.mqtt.EMQX | LATEST | + | aws.greengrass.clientdevices.IPDetector | LATEST | + | | classpath:/local-store/recipes/ | + And I create client device "clientDeviceTest" + When I associate "clientDeviceTest" with ggc + And I update my Greengrass deployment configuration, setting the component aws.greengrass.clientdevices.Auth configuration to: + """ +{ + "MERGE":{ + "deviceGroups":{ + "formatVersion":"2021-03-05", + "definitions":{ + "MyPermissiveDeviceGroup":{ + "selectionRule":"thingName: ${clientDeviceTest}", + "policyName":"MyPermissivePolicy" + } + }, + "policies":{ + "MyPermissivePolicy":{ + "AllowAll":{ + "statementDescription":"Allow client devices to perform all actions.", + "operations":[ + "*" + ], + "resources":[ + "*" + ] + } + } + } + } + } +} + """ + And I update my Greengrass deployment configuration, setting the component configuration to: + """ +{ + "MERGE":{ + "controlAddresses":"${mqttControlAddresses}", + "controlPort":"${mqttControlPort}" + } +} + """ + + And I deploy the Greengrass deployment configuration + Then the Greengrass deployment is COMPLETED on the device after 5 minutes + And the aws.greengrass.clientdevices.mqtt.EMQX log on the device contains the line "is running now!." within 1 minutes + + And I discover core device broker as "default_broker" from "clientDeviceTest" on "" + And I connect device "clientDeviceTest" on to "default_broker" using mqtt "" + + When I subscribe "clientDeviceTest" to "iot_data_0" with qos 0 and expect status "GRANTED_QOS_0" + When I publish from "clientDeviceTest" to "iot_data_0" with qos 0 and message "Test message0" + And message "Test message0" received on "clientDeviceTest" from "iot_data_0" topic within 10 seconds + + @mqtt3 @sdk-java + Examples: + | mqtt-v | name | agent | recipe | + | v3 | sdk-java | aws.greengrass.client.Mqtt5JavaSdkClient | client_java_sdk.yaml | + + @mqtt5 @sdk-java + Examples: + | mqtt-v | name | agent | recipe | + | v5 | sdk-java | aws.greengrass.client.Mqtt5JavaSdkClient | client_java_sdk.yaml |