Skip to content

Commit

Permalink
http/3 support (#474)
Browse files Browse the repository at this point in the history
* move tcp tunnel code to separate file

* http2 connect support

* add log messages

* fix buffering

* go mod tidy

* http/3 support

* add test

* fix test

* add flush
  • Loading branch information
calebdoxsey authored Dec 6, 2024
1 parent 0ea07a9 commit 0789063
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 109 deletions.
1 change: 0 additions & 1 deletion api/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func tunnelAcceptLoop(ctx context.Context, id string, li net.Listener, tun Tunne
if err != nil {
log.Printf("error serving local connection %s: %v\n", id, err)
}
cEvt.OnDisconnected(ctx, err)
}(c)
}
}
Expand Down
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ require (
github.com/google/uuid v1.6.0
github.com/martinlindhe/base36 v1.1.1
github.com/pomerium/pomerium v0.28.0
github.com/quic-go/quic-go v0.48.2
github.com/rs/zerolog v1.33.0
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0
golang.org/x/crypto v0.29.0
golang.org/x/net v0.30.0
golang.org/x/sync v0.9.0
golang.org/x/sys v0.27.0
google.golang.org/grpc v1.68.0
Expand All @@ -37,6 +39,8 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/envoyproxy/go-control-plane v0.13.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-set/v3 v3.0.0 // indirect
Expand All @@ -49,15 +53,18 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mholt/acmez/v2 v2.0.3 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/onsi/ginkgo/v2 v2.19.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/pomerium/protoutil v0.0.0-20240813175624-47b7ac43ff46 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/zeebo/blake3 v0.2.4 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand All @@ -111,6 +113,8 @@ github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl76
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
Expand Down Expand Up @@ -192,6 +196,10 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
github.com/open-policy-agent/opa v0.70.0 h1:B3cqCN2iQAyKxK6+GI+N40uqkin+wzIrM7YA60t9x1U=
github.com/open-policy-agent/opa v0.70.0/go.mod h1:Y/nm5NY0BX0BqjBriKUiV81sCl8XOjjvqQG7dXrggtI=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down Expand Up @@ -231,6 +239,10 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0=
github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE=
github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
Expand Down Expand Up @@ -368,6 +380,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
23 changes: 23 additions & 0 deletions internal/testutil/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package testutil

import (
"net"
"testing"

"github.com/stretchr/testify/require"
)

// GetPort gets a free port.
func GetPort(t *testing.T) string {
t.Helper()

li, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

_, port, err := net.SplitHostPort(li.Addr().String())
require.NoError(t, err)

_ = li.Close()

return port
}
146 changes: 39 additions & 107 deletions tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
package tunnel

import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"sync"
"time"

backoff "github.com/cenkalti/backoff/v4"
Expand All @@ -20,10 +18,19 @@ import (
"github.com/pomerium/cli/jwt"
)

var (
errUnavailable = errors.New("unavailable")
errUnauthenticated = errors.New("unauthenticated")
errUnsupported = errors.New("unsupported")
)

// A Tunnel represents a TCP tunnel over HTTP Connect.
type Tunnel struct {
cfg *config
auth *authclient.AuthClient

mu sync.Mutex
tcpTunneler TCPTunneler
}

// New creates a new Tunnel.
Expand Down Expand Up @@ -106,121 +113,46 @@ func (tun *Tunnel) Run(ctx context.Context, local io.ReadWriter, eventSink Event
}

func (tun *Tunnel) run(ctx context.Context, eventSink EventSink, local io.ReadWriter, rawJWT string, retryCount int) error {
eventSink.OnConnecting(ctx)

hdr := http.Header{}
if rawJWT != "" {
hdr.Set("Authorization", "Pomerium "+rawJWT)
}

req := (&http.Request{
Method: "CONNECT",
URL: &url.URL{Opaque: tun.cfg.dstHost},
Host: tun.cfg.dstHost,
Header: hdr,
}).WithContext(ctx)

var remote net.Conn
var err error
if tun.cfg.tlsConfig != nil {
remote, err = (&tls.Dialer{Config: tun.cfg.tlsConfig}).DialContext(ctx, "tcp", tun.cfg.proxyHost)
} else {
remote, err = (&net.Dialer{}).DialContext(ctx, "tcp", tun.cfg.proxyHost)
}
if err != nil {
return fmt.Errorf("failed to establish connection to proxy: %w", err)
}
defer func() {
_ = remote.Close()
log.Println("connection closed")
}()
if done := ctx.Done(); done != nil {
go func() {
<-done
_ = remote.Close()
}()
}

err = req.Write(remote)
if err != nil {
return err
tun.mu.Lock()
if tun.tcpTunneler == nil {
tun.tcpTunneler = tun.pickTCPTunneler(ctx)
}
tun.mu.Unlock()

br := bufio.NewReader(remote)
res, err := http.ReadResponse(br, req)
if err != nil {
return fmt.Errorf("failed to read HTTP response: %w", err)
}
defer func() {
_ = res.Body.Close()
}()
switch res.StatusCode {
case http.StatusOK:
case http.StatusServiceUnavailable:
err := tun.tcpTunneler.TunnelTCP(ctx, eventSink, local, rawJWT)
if errors.Is(err, errUnavailable) {
// don't delete the JWT if we get a service unavailable
return fmt.Errorf("invalid http response code: %s", res.Status)
case http.StatusMovedPermanently,
http.StatusFound,
http.StatusTemporaryRedirect,
http.StatusPermanentRedirect:
if retryCount == 0 {
_ = remote.Close()

serverURL := &url.URL{
Scheme: "http",
Host: tun.cfg.proxyHost,
}
if tun.cfg.tlsConfig != nil {
serverURL.Scheme = "https"
}

rawJWT, err = tun.auth.GetJWT(ctx, serverURL, func(authURL string) { eventSink.OnAuthRequired(ctx, authURL) })
if err != nil {
return fmt.Errorf("failed to get authentication JWT: %w", err)
}

err = tun.cfg.jwtCache.StoreJWT(tun.jwtCacheKey(), rawJWT)
if err != nil {
return fmt.Errorf("failed to store JWT: %w", err)
}

return tun.run(ctx, eventSink, local, rawJWT, retryCount+1)
return err
} else if errors.Is(err, errUnauthenticated) && retryCount == 0 {
serverURL := &url.URL{
Scheme: "http",
Host: tun.cfg.proxyHost,
}
if tun.cfg.tlsConfig != nil {
serverURL.Scheme = "https"
}
fallthrough
default:
_ = tun.cfg.jwtCache.DeleteJWT(tun.jwtCacheKey())
return fmt.Errorf("invalid http response code: %d", res.StatusCode)
}

log.Println("connection established")
eventSink.OnConnected(ctx)
rawJWT, err = tun.auth.GetJWT(ctx, serverURL, func(authURL string) {
eventSink.OnAuthRequired(ctx, authURL)
})
if err != nil {
return fmt.Errorf("failed to get authentication JWT: %w", err)
}

errc := make(chan error, 2)
go func() {
_, err := io.Copy(remote, local)
errc <- err
}()
remoteReader := deBuffer(br, remote)
go func() {
_, err := io.Copy(local, remoteReader)
errc <- err
}()
err = tun.cfg.jwtCache.StoreJWT(tun.jwtCacheKey(), rawJWT)
if err != nil {
return fmt.Errorf("failed to store JWT: %w", err)
}

select {
case err := <-errc:
return tun.run(ctx, eventSink, local, rawJWT, retryCount+1)
} else if err != nil {
_ = tun.cfg.jwtCache.DeleteJWT(tun.jwtCacheKey())
return err
case <-ctx.Done():
return nil
}

return nil
}

func (tun *Tunnel) jwtCacheKey() string {
return fmt.Sprintf("%s|%v", tun.cfg.proxyHost, tun.cfg.tlsConfig != nil)
}

func deBuffer(br *bufio.Reader, underlying io.Reader) io.Reader {
if br.Buffered() == 0 {
return underlying
}
return io.MultiReader(io.LimitReader(br, int64(br.Buffered())), underlying)
}
Loading

0 comments on commit 0789063

Please sign in to comment.