From 928e6bfb83aa054e72a13e28b40900c142cba6ef Mon Sep 17 00:00:00 2001 From: Kevin Schiffer Date: Tue, 5 Dec 2023 17:15:59 +0900 Subject: [PATCH 1/2] dev: Add timeout and better error handling to streams --- .../api/stream/subscribeToWebSocketStream.js | 171 +++++++++++------- 1 file changed, 101 insertions(+), 70 deletions(-) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 6587b42a49..3b0762118a 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -28,6 +28,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, * @param {object} payload - - The body of the initial request. * @param {string} baseUrl - The stream baseUrl. * @param {string} endpoint - The stream endpoint. + * @param {number} timeout - The timeout for the stream. * * @example * (async () => { @@ -51,7 +52,12 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, * @returns {object} The stream subscription object with the `on` function for * attaching listeners and the `close` function to close the stream. */ -export default async (payload, baseUrl, endpoint = '/console/internal/events/') => { +export default async ( + payload, + baseUrl, + endpoint = '/console/internal/events/', + timeout = 10000, +) => { const subscriptionId = Date.now() const subscriptionPayload = JSON.stringify({ type: MESSAGE_TYPES.SUBSCRIBE, @@ -65,84 +71,109 @@ export default async (payload, baseUrl, endpoint = '/console/internal/events/') let closeRequested = false const url = baseUrl + endpoint - await new Promise(async resolve => { - // Add the new subscription to the subscriptions object. - // Also add the resolver function to the subscription object to be able - // to resolve the promise after the subscription confirmation message. - subscriptions = { - ...subscriptions, - [subscriptionId]: { ...initialListeners, url, _resolver: resolve }, - } - - const token = new Token().get() - const tokenParsed = typeof token === 'function' ? (await token()).access_token : token - const baseUrlParsed = baseUrl.replace('http', 'ws') - - // Open up the WebSocket connection if it doesn't exist. - if (!wsInstances[url]) { - wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [ - 'ttn.lorawan.v3.console.internal.events.v1', - `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, - ]) - - // Event listener for 'open' - wsInstances[url].addEventListener('open', () => { - wsInstances[url].send(subscriptionPayload) - }) - - // Broadcast connection errors to all listeners. - wsInstances[url].addEventListener('error', error => { - Object.values(subscriptions) - .filter(s => s.url === url) - .forEach(s => notify(s[EVENTS.ERROR], error)) - resolve() - }) + await Promise.race([ + new Promise(async (resolve, reject) => { + // Add the new subscription to the subscriptions object. + // Also add the resolver function to the subscription object to be able + // to resolve the promise after the subscription confirmation message. + subscriptions = { + ...subscriptions, + [subscriptionId]: { ...initialListeners, url, _resolver: resolve }, + } - // Event listener for 'close' - wsInstances[url].addEventListener('close', () => { - delete wsInstances[url] - }) + try { + const token = new Token().get() + const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token + const baseUrlParsed = baseUrl.replace('http', 'ws') + + // Open up the WebSocket connection if it doesn't exist. + if (!wsInstances[url]) { + wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]) + + // Event listener for 'open' + wsInstances[url].addEventListener('open', () => { + wsInstances[url].send(subscriptionPayload) + }) - // After the WebSocket connection is open, add the event listeners. - // Wait for the subscription confirmation message before resolving. - wsInstances[url].addEventListener('message', ({ data }) => { - const dataParsed = JSON.parse(data) - const listeners = subscriptions[dataParsed.id] + // Broadcast connection errors to all listeners. + wsInstances[url].addEventListener('error', error => { + Object.values(subscriptions) + .filter(s => s.url === url) + .forEach(s => notify(s[EVENTS.ERROR], new Error(error))) + // The error is an error event, but we should only throw proper errors. + // It has an optional error code that we could use to map to a proper error. + // However, the error codes are optional and not always used. + reject(new Error('Error in WebSocket connection')) + }) - if (!listeners) { - warn('Message received for closed or unknown subscription with ID', dataParsed.id) + // Event listener for 'close' + wsInstances[url].addEventListener('close', closeEvent => { + delete wsInstances[url] + Object.values(subscriptions) + .filter(s => s.url === url) + .forEach(s => notify(s[EVENTS.CLOSE], closeRequested)) - return - } + if (closeRequested) { + resolve() + } else { + reject( + new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`), + ) + } + }) - if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - notify(listeners[EVENTS.OPEN]) - // Resolve the promise after the subscription confirmation message. - listeners._resolver() - } + // After the WebSocket connection is open, add the event listeners. + // Wait for the subscription confirmation message before resolving. + wsInstances[url].addEventListener('message', ({ data }) => { + const dataParsed = JSON.parse(data) + const listeners = subscriptions[dataParsed.id] - if (dataParsed.type === MESSAGE_TYPES.ERROR) { - notify(listeners[EVENTS.ERROR], dataParsed) - } + if (!listeners) { + warn('Message received for closed or unknown subscription with ID', dataParsed.id) - if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { - notify(listeners[EVENTS.MESSAGE], dataParsed.event) - } + return + } + + if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { + notify(listeners[EVENTS.OPEN]) + // Resolve the promise after the subscription confirmation message. + listeners._resolver() + } + + if (dataParsed.type === MESSAGE_TYPES.ERROR) { + notify(listeners[EVENTS.ERROR], dataParsed) + } + + if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { + notify(listeners[EVENTS.MESSAGE], dataParsed.event) + } - if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { - notify(listeners[EVENTS.CLOSE], closeRequested) - // Remove the subscription. - delete subscriptions[dataParsed.id] - if (!Object.values(subscriptions).some(s => s.url === url)) { - wsInstances[url].close() - } + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + notify(listeners[EVENTS.CLOSE], closeRequested) + // Remove the subscription. + delete subscriptions[dataParsed.id] + if (!Object.values(subscriptions).some(s => s.url === url)) { + wsInstances[url].close() + } + } + }) + } else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) { + // If the WebSocket connection is already open, only add the subscription. + wsInstances[url].send(subscriptionPayload) } - }) - } else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) { - // If the WebSocket connection is already open, only add the subscription. - wsInstances[url].send(subscriptionPayload) - } - }) + } catch (error) { + const err = error instanceof Error ? error : new Error(error) + Object.values(subscriptions) + .filter(s => s.url === url) + .forEach(s => notify(s[EVENTS.ERROR], err)) + reject(err) + } + }), + new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), + ]) // Return an observer object with the `on` and `close` functions for // the current subscription. From 907f52f31dac2ea44c6db316e718825c1c003c44 Mon Sep 17 00:00:00 2001 From: Kevin Schiffer Date: Tue, 5 Dec 2023 17:16:32 +0900 Subject: [PATCH 2/2] console: Adapt event logic to websocket stream --- .../console/store/middleware/logics/events.js | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index ef797fbd7b..fcf45003fc 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -1,4 +1,4 @@ -// Copyright © 2019 The Things Network Foundation, The Things Industries B.V. +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -145,6 +145,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { if (isUnauthenticatedError(error)) { // The user is no longer authenticated; reinitiate the auth flow // by refreshing the page. + // NOTE: As a result of the WebSocket refactor, the error shape is + // now very unspecific and authentication errors like before are + // not thrown anymore. This should be addressed eventually. window.location.reload() } else { dispatch(startEventsFailure(id, error)) @@ -174,21 +177,16 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { allow(action) }, process: ({ action }, dispatch, done) => { - if (channel) { - try { - channel.close() - } catch (error) { - if (isNetworkError(error) || isTimeoutError(action.payload)) { - // Set the connection status to `checking` to trigger connection checks - // and detect possible offline state. - dispatch(setStatusChecking()) + if (action.error) { + if (action.error?.message === 'timeout') { + // Set the connection status to `checking` to trigger connection checks + // and detect possible offline state. + dispatch(setStatusChecking()) - // In case of a network error, the connection could not be closed - // since the network connection is disrupted. We can regard this - // as equivalent to a closed connection. - return done() - } - throw error + // In case of a network error, the connection could not be closed + // since the network connection is disrupted. We can regard this + // as equivalent to a closed connection. + return done() } } done() @@ -245,6 +243,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { type: SET_CONNECTION_STATUS, process: ({ getState, action }, dispatch, done) => { const isOnline = action.payload.onlineStatus === ONLINE_STATUS.ONLINE + const isOffline = action.payload.onlineStatus === ONLINE_STATUS.OFFLINE if (isOnline) { const state = getState() @@ -268,6 +267,8 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { dispatch(dispatch(startEvents(ids))) } } + } else if (isOffline) { + // If the app went offline, close the event stream. } done()