From 9ff5152b89e09f80240c843f2dcdf86d9cae7a6f Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 23 Dec 2023 01:08:59 +0100 Subject: [PATCH] fixup! feat: implement `exploreDirectory` method --- lib/src/core/wot.dart | 63 ++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/lib/src/core/wot.dart b/lib/src/core/wot.dart index 289159c6..967caaa0 100644 --- a/lib/src/core/wot.dart +++ b/lib/src/core/wot.dart @@ -127,33 +127,7 @@ class WoT implements scripting_api.WoT { final thingDescriptionStream = Stream.fromIterable( rawThingDescriptions.whereType>(), - ).transform( - StreamTransformer, ThingDescription>( - (stream, cancelOnError) { - final streamController = StreamController(); - - final streamSubscription = stream.listen( - (rawThingDescription) { - try { - streamController - .add(ThingDescription.fromJson(rawThingDescription)); - } on Exception catch (exception) { - streamController.addError(exception); - } - }, - onDone: streamController.close, - onError: streamController.addError, - cancelOnError: cancelOnError, - ); - streamController - ..onPause = streamSubscription.pause - ..onResume = streamSubscription.resume - ..onCancel = streamSubscription.cancel; - - return streamController.stream.listen(null); - }, - ), - ); + ).toThingDescriptionStream(); return ThingDiscoveryProcess(thingDescriptionStream, filter); } @@ -179,3 +153,38 @@ extension _DirectoryValidationExtension on ThingDescription { atTypes.contains(type); } } + +extension _DirectoryTdDeserializationExtension on Stream> { + Stream toThingDescriptionStream() { + const streamTransformer = StreamTransformer(_transformerMethod); + + return transform(streamTransformer); + } + + static StreamSubscription _transformerMethod( + Stream> rawThingDescriptionStream, + bool cancelOnError, + ) { + final streamController = StreamController(); + + final streamSubscription = rawThingDescriptionStream.listen( + (rawThingDescription) { + try { + streamController.add(ThingDescription.fromJson(rawThingDescription)); + } on Exception catch (exception) { + streamController.addError(exception); + } + }, + onDone: streamController.close, + onError: streamController.addError, + cancelOnError: cancelOnError, + ); + + streamController + ..onPause = streamSubscription.pause + ..onResume = streamSubscription.resume + ..onCancel = streamSubscription.cancel; + + return streamController.stream.listen(null); + } +}