diff --git a/lib/src/core/implementation/thing_discovery.dart b/lib/src/core/implementation/thing_discovery.dart index e3c5dd07..1992039c 100644 --- a/lib/src/core/implementation/thing_discovery.dart +++ b/lib/src/core/implementation/thing_discovery.dart @@ -149,19 +149,33 @@ class ThingDiscovery extends Stream Stream _discoverWithCoreLinkFormat(Uri uri) async* { // TODO: Remove additional quotes once fixed in CoAP library - yield* _performCoreLinkFormatDiscovery('"wot.thing"', uri) - .map(_discoverDirectly) - .flatten(); + yield* _performCoreLinkFormatDiscovery('"wot.thing"', uri).transform( + StreamTransformer.fromBind( + (stream) async* { + await for (final uris in stream) { + final futures = uris.map(_servient.requestThingDescription); + yield* Stream.fromFutures(futures); + } + }, + ), + ); } Stream _discoverfromCoreResourceDirectory(Uri uri) async* { // TODO: Remove additional quotes once fixed in CoAP library yield* _performCoreLinkFormatDiscovery('"core.rd-lookup-res"', uri) - .map(_discoverWithCoreLinkFormat) - .flatten(); + .transform( + StreamTransformer.fromBind((stream) async* { + await for (final uris in stream) { + for (final uri in uris) { + yield* _discoverWithCoreLinkFormat(uri); + } + } + }), + ); } - Stream _performCoreLinkFormatDiscovery( + Stream> _performCoreLinkFormatDiscovery( String resourceType, Uri uri, ) async* { @@ -171,24 +185,13 @@ class ThingDiscovery extends Stream await for (final coreWebLink in client.discoverWithCoreLinkFormat(discoveryUri)) { - final Iterable parsedUris; - try { - parsedUris = await _filterCoreWebLinks(resourceType, coreWebLink); + final parsedUris = await _filterCoreWebLinks(resourceType, coreWebLink); + yield parsedUris.where(discoveredUris.add); } on Exception catch (exception) { yield* Stream.error(exception); continue; } - - for (final parsedUri in parsedUris) { - final uriAdded = discoveredUris.add(parsedUri); - - if (!uriAdded) { - continue; - } - - yield parsedUri; - } } }