diff --git a/lib/src/core/thing_discovery.dart b/lib/src/core/thing_discovery.dart index 6cac1e09..e0d74dfb 100644 --- a/lib/src/core/thing_discovery.dart +++ b/lib/src/core/thing_discovery.dart @@ -414,17 +414,18 @@ class ThingDiscoveryProcess extends Stream }) { final streamSubscription = _thingDescriptionStream.listen( onData, - onError: (error) { + onError: (error, stackTrace) { if (error is Exception) { _error = error; + // ignore: avoid_dynamic_calls + onError?.call(error, stackTrace); } - // ignore: avoid_dynamic_calls - onError?.call(error); }, - onDone: () async { + onDone: () { _done = true; onDone?.call(); }, + cancelOnError: cancelOnError, ); _streamSubscription = streamSubscription; diff --git a/lib/src/core/wot.dart b/lib/src/core/wot.dart index b754e7f4..289159c6 100644 --- a/lib/src/core/wot.dart +++ b/lib/src/core/wot.dart @@ -4,6 +4,8 @@ // // SPDX-License-Identifier: BSD-3-Clause +import 'dart:async'; + import '../../scripting_api.dart' as scripting_api; import '../definitions/thing_description.dart'; import '../scripting_api/discovery/discovery_method.dart'; @@ -125,7 +127,33 @@ class WoT implements scripting_api.WoT { final thingDescriptionStream = Stream.fromIterable( rawThingDescriptions.whereType>(), - ).map(ThingDescription.fromJson); + ).transform( + StreamTransformer, ThingDescription>( + (stream, cancelOnError) { + final streamController = StreamController(); + + final streamSubscription = stream.listen( + (rawThingDescription) { + try { + streamController + .add(ThingDescription.fromJson(rawThingDescription)); + } on Exception catch (exception) { + streamController.addError(exception); + } + }, + onDone: streamController.close, + onError: streamController.addError, + cancelOnError: cancelOnError, + ); + streamController + ..onPause = streamSubscription.pause + ..onResume = streamSubscription.resume + ..onCancel = streamSubscription.cancel; + + return streamController.stream.listen(null); + }, + ), + ); return ThingDiscoveryProcess(thingDescriptionStream, filter); }