Skip to content

Commit

Permalink
Merge pull request #6760 from TheThingsNetwork/fix/event-stream-recon…
Browse files Browse the repository at this point in the history
…nect-3

Fix race condition in stream opening
  • Loading branch information
PavelJankoski authored Dec 12, 2023
2 parents c0687cf + c4164c6 commit 291ac27
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/webui/console/store/middleware/logics/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
}),
createLogic({
type: SET_EVENT_FILTER,
debounce: 250,
process: async ({ action }, dispatch, done) => {
if (channel) {
try {
Expand Down
16 changes: 9 additions & 7 deletions sdk/js/src/api/stream/subscribeToWebSocketStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { notify, EVENTS, MESSAGE_TYPES } from './shared'
const wsInstances = {}
let subscriptions = {}
const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {})
let closeRequested = false

/**
* Opens a new stream.
Expand Down Expand Up @@ -68,7 +69,6 @@ export default async (
type: MESSAGE_TYPES.UNSUBSCRIBE,
id: subscriptionId,
})
let closeRequested = false
const url = baseUrl + endpoint

await Promise.race([
Expand All @@ -93,11 +93,6 @@ export default async (
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// Broadcast connection errors to all listeners.
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
Expand Down Expand Up @@ -163,9 +158,16 @@ export default async (
}
}
})
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
}

if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[url].send(subscriptionPayload)
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.CONNECTING) {
// Otherwise wait for the connection to open and then add the subscription.
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})
}
} catch (error) {
const err = error instanceof Error ? error : new Error(error)
Expand Down

0 comments on commit 291ac27

Please sign in to comment.