Skip to content

Commit

Permalink
fixup! feat: implement exploreDirectory method
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed Dec 22, 2023
1 parent 5eabcab commit 5b5871e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
9 changes: 5 additions & 4 deletions lib/src/core/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,18 @@ class ThingDiscoveryProcess extends Stream<ThingDescription>
}) {
final streamSubscription = _thingDescriptionStream.listen(
onData,
onError: (error) {
onError: (error, stackTrace) {
if (error is Exception) {
_error = error;
// ignore: avoid_dynamic_calls
onError?.call(error, stackTrace);
}
// ignore: avoid_dynamic_calls
onError?.call(error);
},
onDone: () async {
onDone: () {
_done = true;
onDone?.call();
},
cancelOnError: cancelOnError,
);

_streamSubscription = streamSubscription;
Expand Down
30 changes: 29 additions & 1 deletion lib/src/core/wot.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//
// SPDX-License-Identifier: BSD-3-Clause

import 'dart:async';

import '../../scripting_api.dart' as scripting_api;
import '../definitions/thing_description.dart';
import '../scripting_api/discovery/discovery_method.dart';
Expand Down Expand Up @@ -125,7 +127,33 @@ class WoT implements scripting_api.WoT {

final thingDescriptionStream = Stream.fromIterable(
rawThingDescriptions.whereType<Map<String, dynamic>>(),
).map(ThingDescription.fromJson);
).transform(
StreamTransformer<Map<String, dynamic>, ThingDescription>(
(stream, cancelOnError) {
final streamController = StreamController<ThingDescription>();

final streamSubscription = stream.listen(
(rawThingDescription) {
try {
streamController
.add(ThingDescription.fromJson(rawThingDescription));
} on Exception catch (exception) {
streamController.addError(exception);
}
},
onDone: streamController.close,
onError: streamController.addError,
cancelOnError: cancelOnError,
);
streamController
..onPause = streamSubscription.pause
..onResume = streamSubscription.resume
..onCancel = streamSubscription.cancel;

return streamController.stream.listen(null);
},
),
);

return ThingDiscoveryProcess(thingDescriptionStream, filter);
}
Expand Down

0 comments on commit 5b5871e

Please sign in to comment.