From a31de5ca06890641342b9928f0fdbefe7be2bec9 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Wed, 27 Nov 2024 10:26:40 +0100 Subject: [PATCH] feat: add initial support for server-sent events --- lib/src/binding_http/http_client.dart | 18 ++++- lib/src/binding_http/http_client_factory.dart | 13 +--- lib/src/binding_http/http_subscription.dart | 66 +++++++++++++++++++ .../http_client_factory_test.dart | 37 +++-------- 4 files changed, 91 insertions(+), 43 deletions(-) create mode 100644 lib/src/binding_http/http_subscription.dart diff --git a/lib/src/binding_http/http_client.dart b/lib/src/binding_http/http_client.dart index 5be7e8a6..b8327bba 100644 --- a/lib/src/binding_http/http_client.dart +++ b/lib/src/binding_http/http_client.dart @@ -16,6 +16,7 @@ import "../../core.dart"; import "http_config.dart"; import "http_request_method.dart"; import "http_security_exception.dart"; +import "http_subscription.dart"; const _authorizationHeader = "Authorization"; @@ -306,13 +307,24 @@ final class HttpClient extends ProtocolClient @override Future subscribeResource( - Form form, { + AugmentedForm form, { required void Function(Content content) next, void Function(Exception error)? error, required void Function() complete, }) async { - // TODO(JKRhb): implement subscribeResource - throw UnimplementedError(); + if (form.subprotocol != "sse") { + throw const DartWotException( + "Only server-sent events are supported at the moment by dart_wot", + ); + } + + return HttpSseSubscription( + form, + complete, + next: next, + onError: error, + complete: complete, + ); } Future _sendDiscoveryRequest( diff --git a/lib/src/binding_http/http_client_factory.dart b/lib/src/binding_http/http_client_factory.dart index 779f4140..4aa86972 100644 --- a/lib/src/binding_http/http_client_factory.dart +++ b/lib/src/binding_http/http_client_factory.dart @@ -50,18 +50,7 @@ final class HttpClientFactory implements ProtocolClientFactory { @override bool supportsOperation(OperationType operationType, String? subprotocol) { - const unsupportedOperations = [ - OperationType.observeproperty, - OperationType.unobserveproperty, - OperationType.subscribeevent, - OperationType.unsubscribeevent, - ]; - - if (unsupportedOperations.contains(operationType)) { - return false; - } - - if (subprotocol != null) { + if (subprotocol != null && !["sse"].contains(subprotocol)) { return false; } diff --git a/lib/src/binding_http/http_subscription.dart b/lib/src/binding_http/http_subscription.dart new file mode 100644 index 00000000..db52243b --- /dev/null +++ b/lib/src/binding_http/http_subscription.dart @@ -0,0 +1,66 @@ +// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// SPDX-License-Identifier: BSD-3-Clause + +import "dart:convert"; + +import "package:sse_channel/sse_channel.dart"; + +import "../../core.dart"; + +/// A [ProtocolSubscription] for supporting server-sent events. +final class HttpSseSubscription extends ProtocolSubscription { + /// Constructor + HttpSseSubscription( + AugmentedForm form, + super._complete, { + required void Function(Content content) next, + void Function(Exception error)? onError, + void Function()? complete, + }) : _active = true, + _sseChannel = SseChannel.connect(form.resolvedHref) { + _sseChannel.stream.listen( + (data) { + if (data is! String) { + return; + } + next( + Content(form.contentType, Stream.fromIterable([utf8.encode(data)])), + ); + }, + onError: (error) { + if (error is! Exception) { + return; + } + + onError?.call(error); + }, + onDone: complete, + ); + } + + final SseChannel _sseChannel; + + bool _active; + + @override + bool get active => _active; + + @override + Future stop({ + int? formIndex, + Map? uriVariables, + Object? data, + }) async { + if (!_active) { + return; + } + _active = false; + + await _sseChannel.sink.close(); + await super + .stop(formIndex: formIndex, uriVariables: uriVariables, data: data); + } +} diff --git a/test/binding_http/http_client_factory_test.dart b/test/binding_http/http_client_factory_test.dart index ce113dba..f1e3a959 100644 --- a/test/binding_http/http_client_factory_test.dart +++ b/test/binding_http/http_client_factory_test.dart @@ -13,48 +13,29 @@ void main() { test("indicate correctly whether an operation is supported", () { final httpClientFactory = HttpClientFactory(); - const observeOperations = [ - OperationType.observeproperty, - OperationType.unobserveproperty, - OperationType.subscribeevent, - OperationType.unsubscribeevent, - ]; - final otherOperations = OperationType.values - .where((operationType) => !observeOperations.contains(operationType)); - final testVector = [ ( - expectedResult: false, - operationTypes: observeOperations, + expectedResult: true, subprotocol: null, ), - ( - expectedResult: false, - operationTypes: observeOperations, - subprotocol: "foobar", - ), ( expectedResult: true, - operationTypes: otherOperations, - subprotocol: null, + subprotocol: "sse", ), ( expectedResult: false, - operationTypes: otherOperations, subprotocol: "foobar", ), ]; for (final testCase in testVector) { - for (final operationType in testCase.operationTypes) { - expect( - httpClientFactory.supportsOperation( - operationType, - testCase.subprotocol, - ), - testCase.expectedResult, - ); - } + expect( + httpClientFactory.supportsOperation( + OperationType.invokeaction, + testCase.subprotocol, + ), + testCase.expectedResult, + ); } }); });