From b438268bb9c64b003d41d3bd77927da864f40d73 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 2 Apr 2024 10:47:32 +1300 Subject: [PATCH] update to quic-go v0.43.0 (unreleased) --- client.go | 20 ++++++++++---------- go.mod | 2 +- go.sum | 4 ++-- server.go | 8 ++++---- session.go | 2 +- session_manager.go | 33 +++++++++++++++++---------------- session_test.go | 10 +++++----- webtransport_test.go | 18 +++++++++--------- 8 files changed, 49 insertions(+), 48 deletions(-) diff --git a/client.go b/client.go index 5ae17ce..92e4bd0 100644 --- a/client.go +++ b/client.go @@ -54,7 +54,7 @@ func (d *Dialer) init() { d.RoundTripper.AdditionalSettings = make(map[uint64]uint64) } d.RoundTripper.AdditionalSettings[settingsEnableWebtransport] = 1 - d.RoundTripper.StreamHijacker = func(ft http3.FrameType, conn quic.Connection, str quic.Stream, e error) (hijacked bool, err error) { + d.RoundTripper.StreamHijacker = func(ft http3.FrameType, connTracingID quic.ConnectionTracingID, str quic.Stream, e error) (hijacked bool, err error) { if isWebTransportError(e) { return true, nil } @@ -68,21 +68,21 @@ func (d *Dialer) init() { } return false, err } - d.conns.AddStream(conn, str, sessionID(id)) + d.conns.AddStream(connTracingID, str, sessionID(id)) return true, nil } - d.RoundTripper.UniStreamHijacker = func(st http3.StreamType, conn quic.Connection, str quic.ReceiveStream, err error) (hijacked bool) { + d.RoundTripper.UniStreamHijacker = func(st http3.StreamType, connTracingID quic.ConnectionTracingID, str quic.ReceiveStream, err error) (hijacked bool) { if st != webTransportUniStreamType && !isWebTransportError(err) { return false } - d.conns.AddUniStream(conn, str) + d.conns.AddUniStream(connTracingID, str) return true } - if d.QuicConfig == nil { - d.QuicConfig = &quic.Config{EnableDatagrams: true} + if d.QUICConfig == nil { + d.QUICConfig = &quic.Config{EnableDatagrams: true} } - if d.QuicConfig.MaxIncomingStreams == 0 { - d.QuicConfig.MaxIncomingStreams = 100 + if d.QUICConfig.MaxIncomingStreams == 0 { + d.QUICConfig.MaxIncomingStreams = 100 } } @@ -91,8 +91,8 @@ func (d *Dialer) Dial(ctx context.Context, urlStr string, reqHdr http.Header) (* // Technically, this is not true. DATAGRAMs could be sent using the Capsule protocol. // However, quic-go currently enforces QUIC datagram support if HTTP/3 datagrams are enabled. - if !d.QuicConfig.EnableDatagrams { - return nil, nil, errors.New("WebTransport requires DATAGRAM support, enable it via QuicConfig.EnableDatagrams") + if !d.QUICConfig.EnableDatagrams { + return nil, nil, errors.New("WebTransport requires DATAGRAM support, enable it via QUICConfig.EnableDatagrams") } u, err := url.Parse(urlStr) diff --git a/go.mod b/go.mod index 4fff1f1..336a325 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/quic-go/webtransport-go go 1.21 require ( - github.com/quic-go/quic-go v0.42.0 + github.com/quic-go/quic-go v0.42.1-0.20240401225549-0d62a15b4e06 github.com/stretchr/testify v1.8.0 go.uber.org/mock v0.4.0 ) diff --git a/go.sum b/go.sum index 3e1411b..9e8f445 100644 --- a/go.sum +++ b/go.sum @@ -92,8 +92,8 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= -github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= -github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= +github.com/quic-go/quic-go v0.42.1-0.20240401225549-0d62a15b4e06 h1:fGXQnRHJb5KcuaKUZS0RyhuHK4IC6yUxUnx9Y3lIan0= +github.com/quic-go/quic-go v0.42.1-0.20240401225549-0d62a15b4e06/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= diff --git a/server.go b/server.go index 90cf1c3..672c8fd 100644 --- a/server.go +++ b/server.go @@ -80,7 +80,7 @@ func (s *Server) init() error { if s.H3.StreamHijacker != nil { return errors.New("StreamHijacker already set") } - s.H3.StreamHijacker = func(ft http3.FrameType, qconn quic.Connection, str quic.Stream, err error) (bool /* hijacked */, error) { + s.H3.StreamHijacker = func(ft http3.FrameType, connTracingID quic.ConnectionTracingID, str quic.Stream, err error) (bool /* hijacked */, error) { if isWebTransportError(err) { return true, nil } @@ -96,14 +96,14 @@ func (s *Server) init() error { } return false, err } - s.conns.AddStream(qconn, str, sessionID(id)) + s.conns.AddStream(connTracingID, str, sessionID(id)) return true, nil } - s.H3.UniStreamHijacker = func(st http3.StreamType, qconn quic.Connection, str quic.ReceiveStream, err error) (hijacked bool) { + s.H3.UniStreamHijacker = func(st http3.StreamType, connTracingID quic.ConnectionTracingID, str quic.ReceiveStream, err error) (hijacked bool) { if st != webTransportUniStreamType && !isWebTransportError(err) { return false } - s.conns.AddUniStream(qconn, str) + s.conns.AddUniStream(connTracingID, str) return true } return nil diff --git a/session.go b/session.go index f440059..12ca016 100644 --- a/session.go +++ b/session.go @@ -83,7 +83,7 @@ type Session struct { } func newSession(sessionID sessionID, qconn http3.StreamCreator, requestStr quic.Stream) *Session { - tracingID := qconn.Context().Value(quic.ConnectionTracingKey).(uint64) + tracingID := qconn.Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID) ctx, ctxCancel := context.WithCancel(context.WithValue(context.Background(), quic.ConnectionTracingKey, tracingID)) c := &Session{ sessionID: sessionID, diff --git a/session_manager.go b/session_manager.go index 2dbb738..7026c86 100644 --- a/session_manager.go +++ b/session_manager.go @@ -25,13 +25,13 @@ type sessionManager struct { timeout time.Duration mx sync.Mutex - conns map[http3.StreamCreator]map[sessionID]*session + conns map[quic.ConnectionTracingID]map[sessionID]*session } func newSessionManager(timeout time.Duration) *sessionManager { m := &sessionManager{ timeout: timeout, - conns: make(map[http3.StreamCreator]map[sessionID]*session), + conns: make(map[quic.ConnectionTracingID]map[sessionID]*session), } m.ctx, m.ctxCancel = context.WithCancel(context.Background()) return m @@ -41,8 +41,8 @@ func newSessionManager(timeout time.Duration) *sessionManager { // If the WebTransport session has not yet been established, // it starts a new go routine and waits for establishment of the session. // If that takes longer than timeout, the stream is reset. -func (m *sessionManager) AddStream(qconn http3.StreamCreator, str quic.Stream, id sessionID) { - sess, isExisting := m.getOrCreateSession(qconn, id) +func (m *sessionManager) AddStream(connTracingID quic.ConnectionTracingID, str quic.Stream, id sessionID) { + sess, isExisting := m.getOrCreateSession(connTracingID, id) if isExisting { sess.conn.addIncomingStream(str) return @@ -60,19 +60,19 @@ func (m *sessionManager) AddStream(qconn http3.StreamCreator, str quic.Stream, i // Once no more streams are waiting for this session to be established, // and this session is still outstanding, delete it from the map. if sess.counter == 0 && sess.conn == nil { - m.maybeDelete(qconn, id) + m.maybeDelete(connTracingID, id) } }() } -func (m *sessionManager) maybeDelete(qconn http3.StreamCreator, id sessionID) { - sessions, ok := m.conns[qconn] +func (m *sessionManager) maybeDelete(connTracingID quic.ConnectionTracingID, id sessionID) { + sessions, ok := m.conns[connTracingID] if !ok { // should never happen return } delete(sessions, id) if len(sessions) == 0 { - delete(m.conns, qconn) + delete(m.conns, connTracingID) } } @@ -80,14 +80,14 @@ func (m *sessionManager) maybeDelete(qconn http3.StreamCreator, id sessionID) { // If the WebTransport session has not yet been established, // it starts a new go routine and waits for establishment of the session. // If that takes longer than timeout, the stream is reset. -func (m *sessionManager) AddUniStream(qconn http3.StreamCreator, str quic.ReceiveStream) { +func (m *sessionManager) AddUniStream(connTracingID quic.ConnectionTracingID, str quic.ReceiveStream) { idv, err := quicvarint.Read(quicvarint.NewReader(str)) if err != nil { str.CancelRead(1337) } id := sessionID(idv) - sess, isExisting := m.getOrCreateSession(qconn, id) + sess, isExisting := m.getOrCreateSession(connTracingID, id) if isExisting { sess.conn.addIncomingUniStream(str) return @@ -105,19 +105,19 @@ func (m *sessionManager) AddUniStream(qconn http3.StreamCreator, str quic.Receiv // Once no more streams are waiting for this session to be established, // and this session is still outstanding, delete it from the map. if sess.counter == 0 && sess.conn == nil { - m.maybeDelete(qconn, id) + m.maybeDelete(connTracingID, id) } }() } -func (m *sessionManager) getOrCreateSession(qconn http3.StreamCreator, id sessionID) (sess *session, existed bool) { +func (m *sessionManager) getOrCreateSession(connTracingID quic.ConnectionTracingID, id sessionID) (sess *session, existed bool) { m.mx.Lock() defer m.mx.Unlock() - sessions, ok := m.conns[qconn] + sessions, ok := m.conns[connTracingID] if !ok { sessions = make(map[sessionID]*session) - m.conns[qconn] = sessions + m.conns[connTracingID] = sessions } sess, ok = sessions[id] @@ -166,14 +166,15 @@ func (m *sessionManager) handleUniStream(str quic.ReceiveStream, sess *session) // AddSession adds a new WebTransport session. func (m *sessionManager) AddSession(qconn http3.StreamCreator, id sessionID, requestStr quic.Stream) *Session { conn := newSession(id, qconn, requestStr) + connTracingID := qconn.Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID) m.mx.Lock() defer m.mx.Unlock() - sessions, ok := m.conns[qconn] + sessions, ok := m.conns[connTracingID] if !ok { sessions = make(map[sessionID]*session) - m.conns[qconn] = sessions + m.conns[connTracingID] = sessions } if sess, ok := sessions[id]; ok { // We might already have an entry of this session. diff --git a/session_test.go b/session_test.go index 8071e48..4392477 100644 --- a/session_test.go +++ b/session_test.go @@ -42,7 +42,7 @@ func TestCloseStreamsOnClose(t *testing.T) { ctrl := gomock.NewController(t) mockSess := NewMockStreamCreator(ctrl) - mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, uint64(1337))) + mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, quic.ConnectionTracingID(1337))) sess := newSession(42, mockSess, newMockRequestStream(ctrl)) str := NewMockStream(ctrl) @@ -67,7 +67,7 @@ func TestOpenStreamSyncCancel(t *testing.T) { defer ctrl.Finish() mockSess := NewMockStreamCreator(ctrl) - mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, uint64(1337))) + mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, quic.ConnectionTracingID(1337))) sess := newSession(42, mockSess, newMockRequestStream(ctrl)) defer sess.CloseWithError(0, "") @@ -102,7 +102,7 @@ func TestAddStreamAfterSessionClose(t *testing.T) { defer ctrl.Finish() mockSess := NewMockStreamCreator(ctrl) - mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, uint64(1337))) + mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, quic.ConnectionTracingID(1337))) sess := newSession(42, mockSess, newMockRequestStream(ctrl)) require.NoError(t, sess.CloseWithError(0, "")) @@ -122,7 +122,7 @@ func TestOpenStreamAfterSessionClose(t *testing.T) { defer ctrl.Finish() mockSess := NewMockStreamCreator(ctrl) - mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, uint64(1337))) + mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, quic.ConnectionTracingID(1337))) wait := make(chan struct{}) streamOpen := make(chan struct{}) mockSess.EXPECT().OpenStreamSync(gomock.Any()).DoAndReturn(func(context.Context) (quic.Stream, error) { @@ -154,7 +154,7 @@ func TestOpenUniStreamAfterSessionClose(t *testing.T) { defer ctrl.Finish() mockSess := NewMockStreamCreator(ctrl) - mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, uint64(1337))) + mockSess.EXPECT().Context().Return(context.WithValue(context.Background(), quic.ConnectionTracingKey, quic.ConnectionTracingID(1337))) wait := make(chan struct{}) streamOpen := make(chan struct{}) mockSess.EXPECT().OpenUniStreamSync(gomock.Any()).DoAndReturn(func(context.Context) (quic.SendStream, error) { diff --git a/webtransport_test.go b/webtransport_test.go index 41188c4..008c677 100644 --- a/webtransport_test.go +++ b/webtransport_test.go @@ -71,7 +71,7 @@ func establishSession(t *testing.T, handler func(*webtransport.Session)) (sess * s := &webtransport.Server{ H3: http3.Server{ TLSConfig: tlsConf, - QuicConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, + QUICConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, }, } addHandler(t, s, handler) @@ -80,7 +80,7 @@ func establishSession(t *testing.T, handler func(*webtransport.Session)) (sess * d := webtransport.Dialer{ RoundTripper: &http3.RoundTripper{ TLSClientConfig: &tls.Config{RootCAs: certPool}, - QuicConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, + QUICConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, }, } defer d.Close() @@ -221,9 +221,9 @@ func TestStreamsImmediateClose(t *testing.T) { t.Run("unidirectional", func(t *testing.T) { t.Run("client-initiated", func(t *testing.T) { - sess, closeServer := establishSession(t, func(c *webtransport.Session) { - defer c.CloseWithError(0, "") - str, err := c.AcceptUniStream(context.Background()) + sess, closeServer := establishSession(t, func(sess *webtransport.Session) { + defer sess.CloseWithError(0, "") + str, err := sess.AcceptUniStream(context.Background()) require.NoError(t, err) n, err := str.Read([]byte{0}) require.Zero(t, n) @@ -238,8 +238,8 @@ func TestStreamsImmediateClose(t *testing.T) { }) t.Run("server-initiated", func(t *testing.T) { - sess, closeServer := establishSession(t, func(c *webtransport.Session) { - str, err := c.OpenUniStream() + sess, closeServer := establishSession(t, func(sess *webtransport.Session) { + str, err := sess.OpenUniStream() require.NoError(t, err) require.NoError(t, str.Close()) }) @@ -344,7 +344,7 @@ func TestMultipleClients(t *testing.T) { d := webtransport.Dialer{ RoundTripper: &http3.RoundTripper{ TLSClientConfig: &tls.Config{RootCAs: certPool}, - QuicConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, + QUICConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, }, } defer d.Close() @@ -524,7 +524,7 @@ func TestCheckOrigin(t *testing.T) { d := webtransport.Dialer{ RoundTripper: &http3.RoundTripper{ TLSClientConfig: &tls.Config{RootCAs: certPool}, - QuicConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, + QUICConfig: &quic.Config{Tracer: getQlogger(t), EnableDatagrams: true}, }, } defer d.Close()