diff --git a/lib/core.dart b/lib/core.dart index a7afb1b7..52445bb6 100644 --- a/lib/core.dart +++ b/lib/core.dart @@ -9,8 +9,10 @@ /// runtime used for consuming, exposing, and discovering Things. library core; +// TODO(JKRhb): Reorganize top-level core package into smaller packages. export "src/core/definitions.dart"; export "src/core/exceptions.dart"; +export "src/core/extensions.dart"; export "src/core/implementation.dart"; export "src/core/protocol_interfaces.dart"; export "src/core/scripting_api.dart"; diff --git a/lib/src/binding_coap/coap_client.dart b/lib/src/binding_coap/coap_client.dart index 8d677cc6..a551f2fe 100644 --- a/lib/src/binding_coap/coap_client.dart +++ b/lib/src/binding_coap/coap_client.dart @@ -73,7 +73,8 @@ coap.PskCredentialsCallback? _createPskCallback( } /// A [ProtocolClient] for the Constrained Application Protocol (CoAP). -final class CoapClient extends ProtocolClient { +final class CoapClient extends ProtocolClient + with DirectDiscoverer, MulticastDiscoverer, CoreLinkFormatDiscoverer { /// Creates a new [CoapClient] based on an optional [CoapConfig]. CoapClient({ CoapConfig? coapConfig, @@ -446,59 +447,14 @@ final class CoapClient extends ProtocolClient { @override Future stop() async {} - Stream _discoverFromMulticast( - coap.CoapClient client, - Uri uri, - ) async* { - final streamController = StreamController(); - final multicastResponseHandler = coap.CoapMulticastResponseHandler( - (data) { - streamController.add(data.determineDiscoveryContent(uri.scheme)); - }, - onError: streamController.addError, - onDone: () async { - await streamController.close(); - }, - ); - - final content = _sendDiscoveryRequest( - uri, - coap.RequestMethod.get, - form: null, - accept: coap.CoapMediaType.applicationTdJson, - multicastResponseHandler: multicastResponseHandler, - ); - unawaited(content); - yield* streamController.stream; - } - - Stream _discoverFromUnicast( - coap.CoapClient client, - Uri uri, - ) async* { - yield await _sendDiscoveryRequest( - uri, - coap.RequestMethod.get, - form: null, - accept: coap.CoapMediaType.applicationTdJson, - ); - } - @override - Stream discoverDirectly( - Uri uri, { - bool disableMulticast = false, - }) async* { - final client = coap.CoapClient(uri); - - if (uri.isMulticastAddress) { - if (!disableMulticast) { - yield* _discoverFromMulticast(client, uri); - } - } else { - yield* _discoverFromUnicast(client, uri); - } - } + Future discoverDirectly(Uri uri) async => + _sendDiscoveryRequest( + uri, + coap.RequestMethod.get, + form: null, + accept: coap.CoapMediaType.applicationTdJson, + ); @override Stream discoverWithCoreLinkFormat(Uri uri) async* { @@ -506,7 +462,7 @@ final class CoapClient extends ProtocolClient { final streamController = StreamController(); // TODO: Replace once https://github.com/shamblett/coap/pull/129 is merged - if (uri.isMulticastAddress) { + if (uri.hasMulticastAddress) { multicastResponseHandler = coap.CoapMulticastResponseHandler( (data) { streamController.add(data.determineDiscoveryContent(uri.scheme)); @@ -526,7 +482,7 @@ final class CoapClient extends ProtocolClient { multicastResponseHandler: multicastResponseHandler, ); - if (uri.isMulticastAddress) { + if (uri.hasMulticastAddress) { yield* streamController.stream; } else { yield content; @@ -534,10 +490,26 @@ final class CoapClient extends ProtocolClient { } @override - Future requestThingDescription(Uri url) async => _sendRequest( - url, - coap.RequestMethod.get, - form: null, - accept: coap.CoapMediaType.applicationTdJson, - ); + Stream discoverViaMulticast(Uri uri) async* { + final streamController = StreamController(); + final multicastResponseHandler = coap.CoapMulticastResponseHandler( + (data) { + streamController.add(data.determineDiscoveryContent(uri.scheme)); + }, + onError: streamController.addError, + onDone: () async { + await streamController.close(); + }, + ); + + final content = _sendDiscoveryRequest( + uri, + coap.RequestMethod.get, + form: null, + accept: coap.CoapMediaType.applicationTdJson, + multicastResponseHandler: multicastResponseHandler, + ); + unawaited(content); + yield* streamController.stream; + } } diff --git a/lib/src/binding_coap/coap_extensions.dart b/lib/src/binding_coap/coap_extensions.dart index edb0a345..1a463c2b 100644 --- a/lib/src/binding_coap/coap_extensions.dart +++ b/lib/src/binding_coap/coap_extensions.dart @@ -4,7 +4,6 @@ // // SPDX-License-Identifier: BSD-3-Clause -import "dart:io"; import "dart:typed_data"; import "package:cbor/cbor.dart"; @@ -16,15 +15,6 @@ import "../../core.dart" hide PskCredentials; import "coap_binding_exception.dart"; import "coap_definitions.dart"; -/// Extension which makes it easier to handle [Uri]s containing -/// [InternetAddress]es. -extension InternetAddressMethods on Uri { - /// Checks whether the host of this [Uri] is a multicast [InternetAddress]. - bool get isMulticastAddress { - return InternetAddress.tryParse(host)?.isMulticast ?? false; - } -} - /// CoAP-specific extensions for the [AugmentedForm] class. extension CoapFormExtension on AugmentedForm { T? _obtainVocabularyTerm(String vocabularyTerm) { diff --git a/lib/src/binding_http/http_client.dart b/lib/src/binding_http/http_client.dart index 4c65f1f5..ce015589 100644 --- a/lib/src/binding_http/http_client.dart +++ b/lib/src/binding_http/http_client.dart @@ -36,7 +36,8 @@ const _authorizationHeader = "Authorization"; /// [RFC 7616]: https://datatracker.ietf.org/doc/html/rfc7616 /// [RFC 6750]: https://datatracker.ietf.org/doc/html/rfc6750 /// [`ComboSecurityScheme`]: https://w3c.github.io/wot-thing-description/#combosecurityscheme -final class HttpClient extends ProtocolClient { +final class HttpClient extends ProtocolClient + with DirectDiscoverer, CoreLinkFormatDiscoverer { /// Creates a new [HttpClient]. HttpClient({ AsyncClientSecurityCallback? basicCredentialsCallback, @@ -304,13 +305,13 @@ final class HttpClient extends ProtocolClient { } @override - Stream discoverDirectly( + Future discoverDirectly( Uri uri, { bool disableMulticast = false, - }) async* { + }) async { final request = Request(HttpRequestMethod.get.methodName, uri); - yield await _sendDiscoveryRequest( + return _sendDiscoveryRequest( request, acceptHeaderValue: "application/td+json", ); @@ -327,18 +328,4 @@ final class HttpClient extends ProtocolClient { yield encodedLinks; } - - @override - Future requestThingDescription(Uri url) async { - final request = Request(HttpRequestMethod.get.methodName, url); - const tdContentType = "application/td+json"; - request.headers["Accept"] = tdContentType; - - final response = await _client.send(request); - - return Content( - response.headers["Content-Type"] ?? tdContentType, - response.stream, - ); - } } diff --git a/lib/src/binding_mqtt/constants.dart b/lib/src/binding_mqtt/constants.dart index 21202500..5b60baf0 100644 --- a/lib/src/binding_mqtt/constants.dart +++ b/lib/src/binding_mqtt/constants.dart @@ -32,8 +32,8 @@ const mqttContextUri = "http://www.example.org/mqtt-binding#"; /// The default prefix used in MQTT-related compact URIs (CURIEs) in TDs. const defaultMqttPrefix = "mqv"; -/// Default timeout length used for reading properties and discovering TDs. -const defaultTimeout = Duration(seconds: 10); +/// Default timeout length used for reading properties. +const defaultReadTimeout = Duration(seconds: 10); /// Default duration MQTT connections are kept alive in seconds. const defaultKeepAlivePeriod = 20; @@ -43,9 +43,3 @@ const defaultKeepAlivePeriod = 20; /// /// Evaluates to `'application/octet-stream'. const defaultContentType = "application/octet-stream"; - -/// Content type used for the Content objects returned by discovery using MQTT. -/// -/// Evaluates to `application/td+json`. -// TODO: Should probably be redefined globally -const discoveryContentType = "application/td+json"; diff --git a/lib/src/binding_mqtt/mqtt_client.dart b/lib/src/binding_mqtt/mqtt_client.dart index 2a00d2aa..09c54ed4 100644 --- a/lib/src/binding_mqtt/mqtt_client.dart +++ b/lib/src/binding_mqtt/mqtt_client.dart @@ -21,7 +21,7 @@ import "mqtt_subscription.dart"; /// [ProtocolClient] for supporting the MQTT protocol. /// /// Currently, only MQTT version 3.1.1 is supported. -final class MqttClient extends ProtocolClient { +final class MqttClient extends ProtocolClient with MqttDiscoverer { /// Constructor. MqttClient({ MqttConfig? mqttConfig, @@ -199,22 +199,13 @@ final class MqttClient extends ProtocolClient { } @override - Stream discoverDirectly( - Uri uri, { - bool disableMulticast = false, + Stream performMqttDiscovery( + Uri brokerUri, { + required String discoveryTopic, + required String expectedContentType, + required Duration discoveryTimeout, }) async* { - final client = await _connect(uri, null); - const discoveryTopic = "wot/td/#"; - - final streamController = StreamController(); - - Timer( - _mqttConfig.discoveryTimeout, - () async { - client.disconnect(); - await streamController.close(); - }, - ); + final client = await _connect(brokerUri, null); // TODO: Revisit QoS value and subscription check if (client.subscribe(discoveryTopic, MqttQos.atLeastOnce) == null) { @@ -223,36 +214,29 @@ final class MqttClient extends ProtocolClient { ); } - client.updates?.listen( - (messages) { - for (final message in messages) { - final publishedMessage = message.payload as MqttPublishMessage; - final payload = publishedMessage.payload.message; - - streamController.add( - DiscoveryContent( - discoveryContentType, - Stream.value(payload), - uri, - ), - ); - } + final receivedMessageStream = client.updates; + if (receivedMessageStream == null) { + throw MqttBindingException( + "Subscription to topic $discoveryTopic failed", + ); + } + + Timer( + discoveryTimeout, + () async { + client.disconnect(); }, - cancelOnError: false, ); - yield* streamController.stream; - } - - @override - Stream discoverWithCoreLinkFormat(Uri uri) { - // TODO: implement discoverWithCoreLinkFormat - throw UnimplementedError(); - } + await for (final receivedMessageList in receivedMessageStream) { + for (final receivedMessage in receivedMessageList) { + final mqttMessage = receivedMessage.payload; + if (mqttMessage is MqttPublishMessage) { + final messagePayload = mqttMessage.payload.message; - @override - Future requestThingDescription(Uri url) { - // TODO: implement requestThingDescription - throw UnimplementedError(); + yield Content(expectedContentType, Stream.value(messagePayload)); + } + } + } } } diff --git a/lib/src/binding_mqtt/mqtt_config.dart b/lib/src/binding_mqtt/mqtt_config.dart index 4af38cde..39a3c892 100644 --- a/lib/src/binding_mqtt/mqtt_config.dart +++ b/lib/src/binding_mqtt/mqtt_config.dart @@ -14,9 +14,10 @@ import "constants.dart"; /// The default [QoS] values for the different operation types will be used if /// no Quality of Service is defined in the respective form. /// -/// If no [readTimeout] or [discoveryTimeout] is defined, a [defaultTimeout] of -/// 10 seconds will be used. Furthermore, the [keepAlivePeriod] defaults to 20 -/// seconds. +/// If no [readTimeout] is defined, a [defaultReadTimeout] of +/// 10 seconds will be used. +/// Furthermore, the [keepAlivePeriod] defaults to a [defaultKeepAlivePeriod] of +/// 20 seconds. class MqttConfig { /// Creates a new [MqttConfig] object. MqttConfig({ @@ -24,8 +25,7 @@ class MqttConfig { this.defaultWriteQoS = QoS.atMostOnce, this.defaultActionQoS = QoS.atMostOnce, this.defaultSubscribeQoS = QoS.atLeastOnce, - this.readTimeout = defaultTimeout, - this.discoveryTimeout = defaultTimeout, + this.readTimeout = defaultReadTimeout, this.keepAlivePeriod = defaultKeepAlivePeriod, }); @@ -50,11 +50,6 @@ class MqttConfig { /// If no value has been read until the timeout has expired, the operation /// will be canceled. final Duration readTimeout; - - /// Timeout value used for discovery using MQTT. - /// - /// The discovery process will be aborted once the timeout has expired. - final Duration discoveryTimeout; } /// Enum for indicating the default Quality of Service (QoS) that should be used diff --git a/lib/src/core/extensions.dart b/lib/src/core/extensions.dart new file mode 100644 index 00000000..2da88ca6 --- /dev/null +++ b/lib/src/core/extensions.dart @@ -0,0 +1,10 @@ +// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// SPDX-License-Identifier: BSD-3-Clause + +/// Sub-library for extensions used by `dart_wot`. +library extensions; + +export "extensions/uri_extensions.dart"; diff --git a/lib/src/core/extensions/uri_extensions.dart b/lib/src/core/extensions/uri_extensions.dart new file mode 100644 index 00000000..3aa879fd --- /dev/null +++ b/lib/src/core/extensions/uri_extensions.dart @@ -0,0 +1,16 @@ +// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// SPDX-License-Identifier: BSD-3-Clause + +import "dart:io"; + +/// Extension that makes it easier to handle [Uri]s which potentially contain +/// [InternetAddress]es. +extension InternetAddressMethodExtension on Uri { + /// Checks whether the host of this [Uri] is a multicast [InternetAddress]. + bool get hasMulticastAddress { + return InternetAddress.tryParse(host)?.isMulticast ?? false; + } +} diff --git a/lib/src/core/implementation/discovery/discovery_configuration.dart b/lib/src/core/implementation/discovery/discovery_configuration.dart index a4829f7f..e706326d 100644 --- a/lib/src/core/implementation/discovery/discovery_configuration.dart +++ b/lib/src/core/implementation/discovery/discovery_configuration.dart @@ -138,6 +138,45 @@ final class ExploreDirectoryConfiguration extends DiscoveryConfiguration { final int? limit; } +/// Experimental [DiscoveryConfiguration] that is used to perform discovery with +/// the MQTT protocol. +@experimental +final class MqttDiscoveryConfiguration extends DiscoveryConfiguration { + /// Instantiates a new [DiscoveryConfiguration] for MQTT. + const MqttDiscoveryConfiguration( + this.brokerUri, { + this.discoveryTopic = "wot/td/#", + this.expectedContentType = "application/td+json", + this.discoveryTimeout = const Duration(seconds: 5), + }); + + /// [Uri] of the broker the + final Uri brokerUri; + + /// The topic that will be used for performing the discovery process. + /// + /// If a wildcard topic is used, then the discovery process may return more + /// than one TD. + /// + /// Defaults to `wot/td/#`. + final String discoveryTopic; + + /// The Thing Description content type that is expected during the discovery + /// process. + /// + /// Data that is received during the discovery process that is not + /// deserializable using the content type provided here will be ignored. + /// + /// Defaults to `application/td+json`. + final String expectedContentType; + + /// Time period after which the MQTT discovery process is going to be + /// cancelled. + /// + /// Defaults to five seconds. + final Duration discoveryTimeout; +} + /// Base class for configuring discovery mechanisms that involve a two-step /// approach. /// diff --git a/lib/src/core/implementation/servient.dart b/lib/src/core/implementation/servient.dart index ae7f6b4d..ab7fd9df 100644 --- a/lib/src/core/implementation/servient.dart +++ b/lib/src/core/implementation/servient.dart @@ -280,9 +280,16 @@ class InternalServient implements Servient { /// Requests a [ThingDescription] from a [url]. Future requestThingDescription(Uri url) async { - final client = clientFor(url.scheme); - final content = await client.requestThingDescription(url); + final uriScheme = url.scheme; + final client = clientFor(uriScheme); + if (client is! DirectDiscoverer) { + throw DiscoveryException( + "Client with URI scheme $uriScheme does not support direct discovery.", + ); + } + + final content = await client.discoverDirectly(url); final dataSchemaValue = await contentSerdes.contentToValue(content, null); if (dataSchemaValue diff --git a/lib/src/core/implementation/thing_discovery.dart b/lib/src/core/implementation/thing_discovery.dart index 38c1d30c..5c916daf 100644 --- a/lib/src/core/implementation/thing_discovery.dart +++ b/lib/src/core/implementation/thing_discovery.dart @@ -12,6 +12,7 @@ import "package:multicast_dns/multicast_dns.dart"; import "../definitions.dart"; import "../exceptions.dart"; +import "../extensions.dart"; import "../protocol_interfaces.dart"; import "../scripting_api.dart" as scripting_api; @@ -63,13 +64,29 @@ class ThingDiscovery extends Stream ): yield* _discoverFromCoreResourceDirectory(uri, discoveryType); case DirectConfiguration(:final uri): - yield* Stream.fromFuture(_servient.requestThingDescription(uri)); + if (!uri.hasMulticastAddress) { + yield* Stream.fromFuture(_servient.requestThingDescription(uri)); + } else { + yield* _performMulticastDiscovery(uri); + } case ExploreDirectoryConfiguration(:final uri, :final thingFilter): final thingDiscoveryProcess = await _servient.exploreDirectory( uri, thingFilter: thingFilter, ); yield* thingDiscoveryProcess; + case MqttDiscoveryConfiguration( + :final brokerUri, + :final discoveryTopic, + :final expectedContentType, + :final discoveryTimeout, + ): + yield* _performMqttDiscovery( + brokerUri, + discoveryTopic, + expectedContentType, + discoveryTimeout, + ); } } } @@ -190,7 +207,18 @@ class ThingDiscovery extends Stream Uri uri, String resourceType, ) async* { - final client = _clientForUriScheme(uri.scheme); + final uriScheme = uri.scheme; + final client = _clientForUriScheme(uriScheme); + + if (client is! CoreLinkFormatDiscoverer) { + yield* Stream.error( + DiscoveryException( + "Client for URI scheme $uriScheme does not support Core Link Format " + "Discovery.", + ), + ); + return; + } await for (final coreWebLink in client.discoverWithCoreLinkFormat(uri)) { try { @@ -234,6 +262,34 @@ class ThingDiscovery extends Stream ); } + Stream _performMqttDiscovery( + Uri brokerUri, + String discoveryTopic, + String expectedContentType, + Duration discoveryTimeout, + ) async* { + final uriScheme = brokerUri.scheme; + final client = _clientForUriScheme(uriScheme); + + if (client is! MqttDiscoverer) { + yield* Stream.error( + DiscoveryException( + "Client for URI scheme $uriScheme does not support MQTT Discovery.", + ), + ); + return; + } + + final contentStream = client.performMqttDiscovery( + brokerUri, + discoveryTopic: discoveryTopic, + expectedContentType: expectedContentType, + discoveryTimeout: discoveryTimeout, + ); + + yield* _transformContentStreamToThingDescriptions(contentStream); + } + @override Future stop() async { final stopFutures = _clients.values.map((client) => client.stop()); @@ -251,7 +307,7 @@ class ThingDiscovery extends Stream if (dataSchemaValue is! scripting_api.DataSchemaValue) { throw DiscoveryException( - "Could not parse Thing Description obtained from $sourceUri", + "Could not parse CoRE web links obtained from $sourceUri", ); } @@ -319,6 +375,45 @@ class ThingDiscovery extends Stream return Map.fromEntries(recordsList); } + + Stream _performMulticastDiscovery(Uri uri) async* { + final client = _clientForUriScheme(uri.scheme); + + if (client is MulticastDiscoverer) { + final contentStream = client.discoverViaMulticast(uri); + yield* _transformContentStreamToThingDescriptions(contentStream); + } + } + + Stream _transformContentStreamToThingDescriptions( + Stream contentStream, + ) async* { + await for (final content in contentStream) { + try { + final thingDescription = + await _convertContentToThingDescription(content); + yield thingDescription; + } on Exception catch (exception) { + yield* Stream.error(exception); + } + } + } + + Future _convertContentToThingDescription( + Content content, + ) async { + final dataSchemaValue = + await _servient.contentSerdes.contentToValue(content, null); + + if (dataSchemaValue is scripting_api.ObjectValue) { + return dataSchemaValue.value.toThingDescription(); + } + + throw ValidationException( + "Encountered wrong datatype ${dataSchemaValue.runtimeType} that cannot " + "be processed as a Thing Description.", + ); + } } extension _CoreLinkFormatExtension on String { diff --git a/lib/src/core/protocol_interfaces.dart b/lib/src/core/protocol_interfaces.dart index 652d757c..8866a08b 100644 --- a/lib/src/core/protocol_interfaces.dart +++ b/lib/src/core/protocol_interfaces.dart @@ -6,5 +6,6 @@ export "protocol_interfaces/protocol_client.dart"; export "protocol_interfaces/protocol_client_factory.dart"; +export "protocol_interfaces/protocol_discoverer.dart"; export "protocol_interfaces/protocol_server.dart"; export "protocol_interfaces/protocol_subscription.dart"; diff --git a/lib/src/core/protocol_interfaces/protocol_client.dart b/lib/src/core/protocol_interfaces/protocol_client.dart index edb9cc3f..831dabc3 100644 --- a/lib/src/core/protocol_interfaces/protocol_client.dart +++ b/lib/src/core/protocol_interfaces/protocol_client.dart @@ -15,33 +15,6 @@ abstract base class ProtocolClient { /// Stops this [ProtocolClient]. Future stop(); - /// Discovers one or more Thing Descriptions from a [uri], returning a - /// [Stream] of [Content]. - /// - /// Allows the caller to explicitly [disableMulticast], overriding the - /// multicast settings in the config of the underlying binding implementation. - Stream discoverDirectly( - Uri uri, { - bool disableMulticast = false, - }); - - /// Discovers links using the CoRE Link Format (see [RFC 6690]) from a [uri], - /// encoded as a [Stream] of [Content]. - /// - /// This method will also be used for discovery from CoRE Resource - /// Directories ([RFC 9176]). - /// - /// If the [uri]'s path is empty, then `/.well-known/core` will be set as a - /// default value. - /// - /// Certain protocols (like CoAP) might also use multicast for this discovery - /// method if the underlying binding implementation supports it and if it is - /// activated in the config. - /// - /// [RFC 6690]: https://datatracker.ietf.org/doc/html/rfc6690 - /// [RFC 9176]: https://datatracker.ietf.org/doc/html/rfc9176 - Stream discoverWithCoreLinkFormat(Uri uri); - /// Requests the client to perform a `readproperty` operation on a [form]. Future readResource(AugmentedForm form); @@ -61,7 +34,4 @@ abstract base class ProtocolClient { void Function(Exception error)? error, required void Function() complete, }); - - /// Requests a Thing Description as [Content] from a [url]. - Future requestThingDescription(Uri url); } diff --git a/lib/src/core/protocol_interfaces/protocol_discoverer.dart b/lib/src/core/protocol_interfaces/protocol_discoverer.dart new file mode 100644 index 00000000..cc4dc787 --- /dev/null +++ b/lib/src/core/protocol_interfaces/protocol_discoverer.dart @@ -0,0 +1,66 @@ +import "package:meta/meta.dart"; + +import "../implementation/content.dart"; +import "protocol_client.dart"; + +/// Interface for a client that is able to [discoverDirectly], i.e. to retrieve +/// a Thing Description from a given [Uri] via unicast. +base mixin DirectDiscoverer on ProtocolClient { + /// Discovers one Thing Descriptions from a [uri], returning a + /// [Future] of [Content]. + Future discoverDirectly(Uri uri); +} + +/// Interface for a client that is able to [discoverViaMulticast], i.e. to +/// retrieve a [Stream] of Thing Descriptions from a given [Uri] via multicast. +base mixin MulticastDiscoverer on ProtocolClient { + /// Discovers a [Stream] Thing Descriptions from a [uri], returning a + /// [Stream] of [Content]. + /// + /// The host component of the [uri] has to be a multicast IP address, while + /// the protocol referenced by its [Uri.scheme] has to indicate that the + /// protocol itself also supports multicast. + /// Otherwise, an exception will be thrown. + Stream discoverViaMulticast(Uri uri); +} + +/// Interfaces for clients that support discovery via the CoRE Link Format. +@experimental +base mixin CoreLinkFormatDiscoverer on ProtocolClient { + /// Discovers links using the CoRE Link Format (see [RFC 6690]) from a [uri], + /// encoded as a [Stream] of [Content]. + /// + /// This method will also be used for discovery from CoRE Resource + /// Directories ([RFC 9176]). + /// + /// If the [uri]'s path is empty, then `/.well-known/core` will be set as a + /// default value. + /// + /// Certain protocols (like CoAP) might also use multicast for this discovery + /// method if the underlying binding implementation supports it and if it is + /// activated in the config. + /// + /// [RFC 6690]: https://datatracker.ietf.org/doc/html/rfc6690 + /// [RFC 9176]: https://datatracker.ietf.org/doc/html/rfc9176 + @experimental + Stream discoverWithCoreLinkFormat(Uri uri); +} + +/// Interface for performing experimental discovery using the MQTT protocol. +@experimental +base mixin MqttDiscoverer on ProtocolClient { + /// Performs discovery of Thing Descriptions using the MQTT protocol via the + /// given [brokerUri]. + /// + /// By default, the [discoveryTopic] `wot/td/#` will be used as discussed in + /// [this issue]. + /// + /// [this issue]: https://github.com/w3c/wot-discovery/issues/134 + @experimental + Stream performMqttDiscovery( + Uri brokerUri, { + required String discoveryTopic, + required String expectedContentType, + required Duration discoveryTimeout, + }); +} diff --git a/test/core/discovery_test.dart b/test/core/discovery_test.dart index 3a0ca616..068a6a1a 100644 --- a/test/core/discovery_test.dart +++ b/test/core/discovery_test.dart @@ -138,13 +138,7 @@ const invalidTestThingDescription2 = ''' {"foo": "bar"} '''; -final class _MockedProtocolClient extends ProtocolClient { - @override - Stream discoverWithCoreLinkFormat(Uri uri) { - // TODO: implement discoverWithCoreLinkFormat - throw UnimplementedError(); - } - +final class _MockedProtocolClient extends ProtocolClient with DirectDiscoverer { @override Future invokeResource(Form form, Content content) { // TODO: implement invokeResource @@ -175,7 +169,7 @@ final class _MockedProtocolClient extends ProtocolClient { } @override - Future requestThingDescription(Uri url) async { + Future discoverDirectly(Uri url) async { if (url == validTestDiscoveryUri) { return validTestThingDescription.toDiscoveryContent(url); } @@ -229,15 +223,6 @@ final class _MockedProtocolClient extends ProtocolClient { // TODO: implement writeResource throw UnimplementedError(); } - - @override - Stream discoverDirectly( - Uri uri, { - bool disableMulticast = false, - }) { - // TODO: implement discoverDirectly - throw UnimplementedError(); - } } class _MockedProtocolClientFactory implements ProtocolClientFactory {