Skip to content

Commit

Permalink
Add proxy protocol (#363)
Browse files Browse the repository at this point in the history
* Add proxy protocol

* Deploy new connection wrapper to dev

* Longer timeout

* Update metrics listener

* Get peer address from context

* Add method as well

* Tidy up

* Revert logging change

* Fix test

* Split string
  • Loading branch information
neekolas authored Mar 21, 2024
1 parent 9428056 commit a490b32
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ require (
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pires/go-proxyproto v0.7.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,8 @@ github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0je
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
11 changes: 10 additions & 1 deletion pkg/api/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/xmtp/xmtp-node-go/pkg/logging"
Expand Down Expand Up @@ -148,7 +149,9 @@ func (wa *WalletAuthorizer) applyLimits(ctx context.Context, fullMethod string,
if len(ip) == 0 {
// requests without an IP address are bucketed together as "ip_unknown"
ip = "ip_unknown"
wa.Log.Warn("no ip found", logging.String("method", fullMethod))
}
wa.Log.Info("got client ip", logging.String("client_ip", ip), logging.String("method", fullMethod))

// with no wallet apply regular limits
var isPriority bool
Expand Down Expand Up @@ -238,7 +241,13 @@ func clientIPFromContext(ctx context.Context) string {
md, _ := metadata.FromIncomingContext(ctx)
vals := md.Get("x-forwarded-for")
if len(vals) == 0 {
return ""
p, ok := peer.FromContext(ctx)
if ok {
ipAndPort := strings.Split(p.Addr.String(), ":")
return ipAndPort[0]
} else {
return ""
}
}
// There are potentially multiple comma separated IPs bundled in that first value
ips := strings.Split(vals[0], ",")
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/pires/go-proxyproto"
messagev1 "github.com/xmtp/xmtp-node-go/pkg/api/message/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
mlsv1 "github.com/xmtp/xmtp-node-go/pkg/mls/api/v1"
Expand Down Expand Up @@ -86,7 +87,8 @@ func New(config *Config) (*Server, error) {
func (s *Server) startGRPC() error {
var err error

s.grpcListener, err = net.Listen("tcp", addrString(s.GRPCAddress, s.GRPCPort))
grpcListener, err := net.Listen("tcp", addrString(s.GRPCAddress, s.GRPCPort))
s.grpcListener = &proxyproto.Listener{Listener: grpcListener, ReadHeaderTimeout: 10 * time.Second}
if err != nil {
return errors.Wrap(err, "creating grpc listener")
}
Expand Down
18 changes: 2 additions & 16 deletions pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,14 +780,7 @@ func Test_Ratelimits_Regular(t *testing.T) {
require.NoError(t, err)
_, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[1:2]})
require.Error(t, err)
errMsg := "1 exceeds rate limit R"
if _, ok := status.FromError(err); ok {
// GRPC
errMsg += "ip_unknownPUB"
} else {
// HTTP
errMsg += "127.0.0.1PUB"
}
errMsg := "1 exceeds rate limit R127.0.0.1PUB"
requireErrorEqual(t, err, codes.ResourceExhausted, errMsg)
// check that Query is not affected by publish quota
_, err = client.Query(ctx, &messageV1.QueryRequest{ContentTopics: []string{"topic"}})
Expand Down Expand Up @@ -815,14 +808,7 @@ func Test_Ratelimits_Priority(t *testing.T) {
require.NoError(t, err)
_, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[2:3]})
require.Error(t, err)
errMsg := "1 exceeds rate limit P"
if _, ok := status.FromError(err); ok {
// GRPC
errMsg += "ip_unknownPUB"
} else {
// HTTP
errMsg += "127.0.0.1PUB"
}
errMsg := "1 exceeds rate limit P127.0.0.1PUB"
requireErrorEqual(t, err, codes.ResourceExhausted, errMsg)
// check that query is not affected by publish quota
_, err = client.Query(ctx, &messageV1.QueryRequest{ContentTopics: []string{"topic"}})
Expand Down
5 changes: 3 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/http"

"github.com/pires/go-proxyproto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
Expand All @@ -24,9 +25,9 @@ func NewMetricsServer(ctx context.Context, address string, port int, log *zap.Lo
log: log.Named("metrics"),
}

var err error
addr := fmt.Sprintf("%s:%d", address, port)
s.http, err = net.Listen("tcp", addr)
httpListener, err := net.Listen("tcp", addr)
s.http = &proxyproto.Listener{Listener: httpListener}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a490b32

Please sign in to comment.