Skip to content

Commit

Permalink
Merge pull request #6776 from TheThingsNetwork/fix/event-stream-recon…
Browse files Browse the repository at this point in the history
…nect-4

Fix event stream concurrency and state machine
  • Loading branch information
adriansmares authored Dec 18, 2023
2 parents df10f46 + e798ae7 commit edb7c34
Show file tree
Hide file tree
Showing 17 changed files with 515 additions and 293 deletions.
9 changes: 8 additions & 1 deletion pkg/console/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"context"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/random"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -40,6 +43,9 @@ import (
const (
authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer."
protocolV1 = "ttn.lorawan.v3.console.internal.events.v1"

pingPeriod = time.Minute
pingJitter = 0.1
)

// Component is the interface of the component to the events API handler.
Expand Down Expand Up @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close(websocket.StatusNormalClosure, "main task closed")

ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

var wg sync.WaitGroup
Expand All @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
"console_events_mux": makeMuxTask(m, cancel),
"console_events_read": makeReadTask(conn, m, rateLimit, cancel),
"console_events_write": makeWriteTask(conn, m, cancel),
"console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)),
} {
wg.Add(1)
h.component.StartTask(&task.Config{
Expand Down
7 changes: 6 additions & 1 deletion pkg/console/internal/events/eventsmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package eventsmux
import (
"context"

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
)
Expand Down Expand Up @@ -54,7 +56,7 @@ func (m *mux) Responses() <-chan protocol.Response {

// Run implements Interface.
func (m *mux) Run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer func() { cancel(err) }()
subs := m.createSubs(ctx, cancel)
defer subs.Close()
Expand All @@ -63,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return ctx.Err()
case req := <-m.requestCh:
if err := rights.RequireAuthenticated(ctx); err != nil {
return err
}
var resp protocol.Response
switch req := req.(type) {
case *protocol.SubscribeRequest:
Expand Down
4 changes: 4 additions & 0 deletions pkg/console/internal/events/eventsmux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo
unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
}),
})
ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{
UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL),
IsAdmin: true,
})

subs := &mockSubscriptions{
ctx: ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/console/internal/events/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
Expand Down Expand Up @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe(
return err
}
ch := make(chan events.Event, channelSize(tail))
ctx, cancel := context.WithCancelCause(s.ctx)
ctx, cancel := errorcontext.New(s.ctx)
defer func() {
if err != nil {
cancel(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand Down Expand Up @@ -143,7 +144,7 @@ func runTestSubscriptions(
_, historical := subscriber.(interface{ historical() })

a, ctx := test.New(t)
ctx, cancel := context.WithCancelCause(ctx)
ctx, cancel := errorcontext.New(ctx)
defer cancel(nil)

timeout := test.Delay << 3
Expand Down
19 changes: 19 additions & 0 deletions pkg/console/internal/events/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"io"
"time"

"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol"
Expand Down Expand Up @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro
}
}
}

func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error {
return func(ctx context.Context) (err error) {
ticker := time.NewTicker(period)
defer ticker.Stop()
defer func() { cancel(err) }()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := conn.Ping(ctx); err != nil {
return err
}
}
}
}
}
29 changes: 24 additions & 5 deletions pkg/webui/console/lib/events/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { ingestError } from '@ttn-lw/lib/errors/utils'

import { createNetworkErrorEvent, createUnknownErrorEvent } from './definitions'

export const defineSyntheticEvent = name => data => ({
Expand All @@ -23,13 +25,30 @@ export const defineSyntheticEvent = name => data => ({
data,
})

export const createSyntheticEventFromError = error => {
const convertError = error => {
if (error instanceof Error) {
const errorString = error.toString()
if (error.message === 'network error' || error.message === 'Error in body stream') {
return createNetworkErrorEvent({ error: errorString })
return {
...error,
message: error.message,
name: error.name,
// The stack is omitted intentionally, as it is not relevant for a user.
}
}
return error
}

return createUnknownErrorEvent({ error: errorString })
export const createSyntheticEventFromError = error => {
if (error instanceof Error) {
if (
error.name === 'ConnectionError' ||
error.name === 'ConnectionClosedError' ||
error.name === 'ConnectionTimeoutError'
) {
return createNetworkErrorEvent({ error: convertError(error) })
} else if (error.name === 'ProtocolError') {
ingestError(error.error)
return createUnknownErrorEvent({ error: convertError(error) })
}
return createUnknownErrorEvent({ error: convertError(error) })
}
}
Loading

0 comments on commit edb7c34

Please sign in to comment.