Skip to content

Commit

Permalink
feat: implement exploreDirectory method
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed Dec 19, 2023
1 parent e9ca7ed commit b4d6803
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 1 deletion.
61 changes: 61 additions & 0 deletions lib/src/core/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,64 @@ extension _FlatStreamExtension<T> on Stream<Stream<T>> {
}
}
}

/// Implemention of the [scripting_api.ThingDiscoveryProcess] interface.
class ThingDiscoveryProcess extends Stream<ThingDescription>
implements scripting_api.ThingDiscoveryProcess {
/// Constructs a new [ThingDiscoveryProcess].
///
/// Accepts an [Iterable] of [ThingDescription]s, which are filtered by an
/// optional [thingFilter].
ThingDiscoveryProcess(
Stream<ThingDescription> thingDescriptions,
this.thingFilter,
) {
_streamController.addStream(thingDescriptions);
}

final _streamController = StreamController<ThingDescription>();

var _done = false;

@override
bool get done => _done;

Exception? _error;

@override
Exception? get error => _error;

@override
final scripting_api.ThingFilter? thingFilter;

@override
StreamSubscription<ThingDescription> listen(
void Function(ThingDescription event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) {
return _streamController.stream.listen(
onData,
onError: (error) {
if (error is Exception) {
_error = error;
}
// ignore: avoid_dynamic_calls
onError?.call(error);
},
onDone: () {
_done = true;
onDone?.call();
},
);
}

@override
Future<void> stop() async {
if (!_streamController.isClosed) {
await _streamController.close();
}
_done = true;
}
}
29 changes: 28 additions & 1 deletion lib/src/core/wot.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import '../scripting_api/discovery/discovery_method.dart';
import 'consumed_thing.dart';
import 'exposed_thing.dart';
import 'servient.dart';
import 'thing_discovery.dart' show ThingDiscovery;
import 'thing_discovery.dart'
show DiscoveryException, ThingDiscovery, ThingDiscoveryProcess;

/// This [Exception] is thrown if an error during the consumption of a
/// [ThingDescription] occurs.
Expand Down Expand Up @@ -95,4 +96,30 @@ class WoT implements scripting_api.WoT {
Future<ThingDescription> requestThingDescription(Uri url) {
return _servient.requestThingDescription(url);
}

@override
Future<scripting_api.ThingDiscoveryProcess> exploreDirectory(
Uri url, [
scripting_api.ThingFilter? filter,
]) async {
// TODO: Could also be moved to the Servient level
final directoryThingDescription = await requestThingDescription(url);
final consumedDirectoryThing = await consume(directoryThingDescription);

final interactionOutput =
await consumedDirectoryThing.readProperty('things');
final rawThingDescriptions = await interactionOutput.value();

if (rawThingDescriptions is! List<dynamic>) {
throw DiscoveryException(
'Received an invalid output during Directory discovery.',
);
}

final thingDescriptionStream = Stream.fromIterable(
rawThingDescriptions.whereType<Map<String, dynamic>>(),
).map(ThingDescription.fromJson);

return ThingDiscoveryProcess(thingDescriptionStream, filter);
}
}
20 changes: 20 additions & 0 deletions lib/src/scripting_api/discovery/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,23 @@ abstract interface class ThingDiscovery implements Stream<ThingDescription> {
/// Stops the discovery process.
void stop();
}

/// Provides the properties and methods controlling the discovery process, and
/// returning the results.
abstract interface class ThingDiscoveryProcess
implements Stream<ThingDescription> {
/// The [thingFilter] that is applied during the discovery process.
ThingFilter? get thingFilter;

/// `true` if the discovery has been stopped or completed with no more results
/// to report.
bool get done;

/// Represents the last error that occurred during the discovery process.
///
/// Typically used for critical errors that stop discovery.
Exception? get error;

/// Stops the discovery process.
void stop();
}
8 changes: 8 additions & 0 deletions lib/src/scripting_api/wot.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ abstract interface class WoT {
/// Requests a [ThingDescription] from the given [url].
Future<ThingDescription> requestThingDescription(Uri url);

/// Starts the discovery process that given a TD Directory [url], will provide
/// [ThingDescription] objects for Thing Descriptions that match an optional
/// [filter] argument of type [ThingFilter].
Future<ThingDiscoveryProcess> exploreDirectory(
Uri url, [
ThingFilter? filter,
]);

/// Discovers [ThingDescription]s from a given [url] using the specified
/// [method].
///
Expand Down

0 comments on commit b4d6803

Please sign in to comment.