From b4d6803b1fd023c700cd839f2eef2057a1bfa006 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 10 Dec 2023 21:08:39 +0100 Subject: [PATCH] feat: implement `exploreDirectory` method --- lib/src/core/thing_discovery.dart | 61 +++++++++++++++++++ lib/src/core/wot.dart | 29 ++++++++- .../discovery/thing_discovery.dart | 20 ++++++ lib/src/scripting_api/wot.dart | 8 +++ 4 files changed, 117 insertions(+), 1 deletion(-) diff --git a/lib/src/core/thing_discovery.dart b/lib/src/core/thing_discovery.dart index 28223b44..e00bc784 100644 --- a/lib/src/core/thing_discovery.dart +++ b/lib/src/core/thing_discovery.dart @@ -375,3 +375,64 @@ extension _FlatStreamExtension on Stream> { } } } + +/// Implemention of the [scripting_api.ThingDiscoveryProcess] interface. +class ThingDiscoveryProcess extends Stream + implements scripting_api.ThingDiscoveryProcess { + /// Constructs a new [ThingDiscoveryProcess]. + /// + /// Accepts an [Iterable] of [ThingDescription]s, which are filtered by an + /// optional [thingFilter]. + ThingDiscoveryProcess( + Stream thingDescriptions, + this.thingFilter, + ) { + _streamController.addStream(thingDescriptions); + } + + final _streamController = StreamController(); + + var _done = false; + + @override + bool get done => _done; + + Exception? _error; + + @override + Exception? get error => _error; + + @override + final scripting_api.ThingFilter? thingFilter; + + @override + StreamSubscription listen( + void Function(ThingDescription event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError, + }) { + return _streamController.stream.listen( + onData, + onError: (error) { + if (error is Exception) { + _error = error; + } + // ignore: avoid_dynamic_calls + onError?.call(error); + }, + onDone: () { + _done = true; + onDone?.call(); + }, + ); + } + + @override + Future stop() async { + if (!_streamController.isClosed) { + await _streamController.close(); + } + _done = true; + } +} diff --git a/lib/src/core/wot.dart b/lib/src/core/wot.dart index 1bdf8fc0..843491c7 100644 --- a/lib/src/core/wot.dart +++ b/lib/src/core/wot.dart @@ -10,7 +10,8 @@ import '../scripting_api/discovery/discovery_method.dart'; import 'consumed_thing.dart'; import 'exposed_thing.dart'; import 'servient.dart'; -import 'thing_discovery.dart' show ThingDiscovery; +import 'thing_discovery.dart' + show DiscoveryException, ThingDiscovery, ThingDiscoveryProcess; /// This [Exception] is thrown if an error during the consumption of a /// [ThingDescription] occurs. @@ -95,4 +96,30 @@ class WoT implements scripting_api.WoT { Future requestThingDescription(Uri url) { return _servient.requestThingDescription(url); } + + @override + Future exploreDirectory( + Uri url, [ + scripting_api.ThingFilter? filter, + ]) async { + // TODO: Could also be moved to the Servient level + final directoryThingDescription = await requestThingDescription(url); + final consumedDirectoryThing = await consume(directoryThingDescription); + + final interactionOutput = + await consumedDirectoryThing.readProperty('things'); + final rawThingDescriptions = await interactionOutput.value(); + + if (rawThingDescriptions is! List) { + throw DiscoveryException( + 'Received an invalid output during Directory discovery.', + ); + } + + final thingDescriptionStream = Stream.fromIterable( + rawThingDescriptions.whereType>(), + ).map(ThingDescription.fromJson); + + return ThingDiscoveryProcess(thingDescriptionStream, filter); + } } diff --git a/lib/src/scripting_api/discovery/thing_discovery.dart b/lib/src/scripting_api/discovery/thing_discovery.dart index a50443d9..c5e75e49 100644 --- a/lib/src/scripting_api/discovery/thing_discovery.dart +++ b/lib/src/scripting_api/discovery/thing_discovery.dart @@ -22,3 +22,23 @@ abstract interface class ThingDiscovery implements Stream { /// Stops the discovery process. void stop(); } + +/// Provides the properties and methods controlling the discovery process, and +/// returning the results. +abstract interface class ThingDiscoveryProcess + implements Stream { + /// The [thingFilter] that is applied during the discovery process. + ThingFilter? get thingFilter; + + /// `true` if the discovery has been stopped or completed with no more results + /// to report. + bool get done; + + /// Represents the last error that occurred during the discovery process. + /// + /// Typically used for critical errors that stop discovery. + Exception? get error; + + /// Stops the discovery process. + void stop(); +} diff --git a/lib/src/scripting_api/wot.dart b/lib/src/scripting_api/wot.dart index 207e0723..ad51e806 100644 --- a/lib/src/scripting_api/wot.dart +++ b/lib/src/scripting_api/wot.dart @@ -34,6 +34,14 @@ abstract interface class WoT { /// Requests a [ThingDescription] from the given [url]. Future requestThingDescription(Uri url); + /// Starts the discovery process that given a TD Directory [url], will provide + /// [ThingDescription] objects for Thing Descriptions that match an optional + /// [filter] argument of type [ThingFilter]. + Future exploreDirectory( + Uri url, [ + ThingFilter? filter, + ]); + /// Discovers [ThingDescription]s from a given [url] using the specified /// [method]. ///