Skip to content

Commit

Permalink
internal/rendezvous: handle buildlet client creation and testing
Browse files Browse the repository at this point in the history
This change returns a buildlet.Client from the rendezvous instance
instead of a net.Conn. The connections will be converted into buildlet
clients by the consumers. Adding the conversion to this package
enables us to create a fake that returns a fake buildlet. It
simplifies testing for other packages. A fake rendezvous has been
added to enable testing.

Fixes golang/go#61844

Change-Id: Iab241e029df67609f9a09dc8874473cc094cd430
Reviewed-on: https://go-review.googlesource.com/c/build/+/538255
Reviewed-by: Dmitri Shuralyov <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Dmitri Shuralyov <[email protected]>
  • Loading branch information
cagedmantis committed Oct 30, 2023
1 parent ef07766 commit 6785f27
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 34 deletions.
82 changes: 82 additions & 0 deletions internal/rendezvous/fake_rendezvous.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rendezvous

import (
"context"
"log"
"net/http"
"time"

"golang.org/x/build/buildlet"
)

type rendezvousServer interface {
DeregisterInstance(ctx context.Context, id string)
HandleReverse(w http.ResponseWriter, r *http.Request)
RegisterInstance(ctx context.Context, id string, wait time.Duration)
WaitForInstance(ctx context.Context, id string) (buildlet.Client, error)
}

var _ rendezvousServer = (*FakeRendezvous)(nil)
var _ rendezvousServer = (*Rendezvous)(nil)

// FakeRendezvous is a fake rendezvous implementation intended for use in testing.
type FakeRendezvous struct {
validator TokenValidator
}

// NewFake creates a Fake Rendezvous instance.
func NewFake(ctx context.Context, validator TokenValidator) *FakeRendezvous {
rdv := &FakeRendezvous{
validator: validator,
}
return rdv
}

// RegisterInstance is a fake implementation.
func (rdv *FakeRendezvous) RegisterInstance(ctx context.Context, id string, wait time.Duration) {
// do nothing
}

// DeregisterInstance is a fake implementation.
func (rdv *FakeRendezvous) DeregisterInstance(ctx context.Context, id string) {
// do nothing
}

// WaitForInstance is a fake implementation.
func (rdv *FakeRendezvous) WaitForInstance(ctx context.Context, id string) (buildlet.Client, error) {
return &buildlet.FakeClient{}, nil
}

// HandleReverse is a fake implementation of the handler.
func (rdv *FakeRendezvous) HandleReverse(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil {
http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError)
return
}
var (
id = r.Header.Get(HeaderID)
authToken = r.Header.Get(HeaderToken)
hostname = r.Header.Get(HeaderHostname)
)
if hostname == "" {
http.Error(w, "missing X-Go-Hostname header", http.StatusBadRequest)
return
}
if id == "" {
http.Error(w, "missing X-Go-Gomote-ID header", http.StatusBadRequest)
return
}
if authToken == "" {
http.Error(w, "missing X-Go-Swarming-Auth-Token header", http.StatusBadRequest)
return
}
if !rdv.validator(r.Context(), authToken) {
log.Printf("rendezvous: Unable to validate authentication token id=%s", id)
http.Error(w, "invalid authentication Token", http.StatusPreconditionFailed)
return
}
}
137 changes: 114 additions & 23 deletions internal/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"sync"
"time"

"golang.org/x/build/buildlet"
"golang.org/x/build/internal"
"golang.org/x/build/revdial/v2"
"google.golang.org/api/idtoken"
)

// result contains the result for a waiting instance registration.
type result struct {
conn net.Conn
err error
bc buildlet.Client
err error
}

// entry contains the elements needed to process an instance registration.
Expand All @@ -28,34 +31,34 @@ type entry struct {
ch chan *result
}

// TokenVerifier verifies if a token is valid.
type TokenVerifier func(ctx context.Context, jwt string) bool
// TokenValidator verifies if a token is valid.
type TokenValidator func(ctx context.Context, jwt string) bool

// Rendezvous waits for buildlets to connect, verifies they are valid instances
// and passes the connection to the waiting caller.
type Rendezvous struct {
mu sync.Mutex

m map[string]*entry
verifier TokenVerifier
m map[string]*entry
validator TokenValidator
}

// Option is an optional configuration setting.
type Option func(*Rendezvous)

// OptionVerifier changes the verifier used by Rendezvous.
func OptionVerifier(v TokenVerifier) Option {
// OptionValidator changes the verifier used by Rendezvous.
func OptionValidator(v TokenValidator) Option {
return func(rdv *Rendezvous) {
rdv.verifier = v
rdv.validator = v
}
}

// New creates a Rendezvous element. The context that is passed in should be non-canceled
// during the lifetime of the running service.
func New(ctx context.Context, opts ...Option) *Rendezvous {
rdv := &Rendezvous{
m: make(map[string]*entry),
verifier: verifyToken,
m: make(map[string]*entry),
validator: validateLUCIIDToken,
}
for _, opt := range opts {
opt(rdv)
Expand Down Expand Up @@ -89,7 +92,6 @@ func (rdv *Rendezvous) RegisterInstance(ctx context.Context, id string, wait tim
ch: make(chan *result, 1),
}
rdv.mu.Unlock()
log.Printf("rendezvous: waiting for instance=%q", id)
}

// DeregisterInstance removes the registration for an instance which has been
Expand All @@ -98,13 +100,12 @@ func (rdv *Rendezvous) DeregisterInstance(ctx context.Context, id string) {
rdv.mu.Lock()
delete(rdv.m, id)
rdv.mu.Unlock()
log.Printf("rendezvous: stopped waiting for instance=%q", id)
}

// WaitForInstance waits for the registered instance to successfully connect. It waits for the
// lifetime of the context. If the instance is not registered or has exceeded the timeout period,
// it will immediately return an error.
func (rdv *Rendezvous) WaitForInstance(ctx context.Context, id string) (net.Conn, error) {
func (rdv *Rendezvous) WaitForInstance(ctx context.Context, id string) (buildlet.Client, error) {
rdv.mu.Lock()
e, ok := rdv.m[id]
rdv.mu.Unlock()
Expand All @@ -122,7 +123,7 @@ func (rdv *Rendezvous) WaitForInstance(ctx context.Context, id string) (net.Conn
delete(rdv.m, id)
close(e.ch)
rdv.mu.Unlock()
return res.conn, res.err
return res.bc, res.err
}
}

Expand Down Expand Up @@ -167,21 +168,111 @@ func (rdv *Rendezvous) HandleReverse(w http.ResponseWriter, r *http.Request) {
http.Error(w, "not expecting buildlet client", http.StatusPreconditionFailed)
return
}
if !rdv.verifier(r.Context(), authToken) {
if !rdv.validator(r.Context(), authToken) {
log.Printf("rendezvous: Unable to validate authentication token id=%s", id)
http.Error(w, "invalid authentication Token", http.StatusPreconditionFailed)
return
}
conn, _, err := w.(http.Hijacker).Hijack()
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "webserver does not support hijacking", http.StatusHTTPVersionNotSupported)
return
}
conn, _, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
res.ch <- &result{err: err}
return
}
bc, err := connToClient(conn, hostname, "swarming_task")
if err != nil {
log.Printf("rendezvous: unable to create buildlet client: %s", err)
conn.Close()
res.ch <- &result{err: err}
return
}
log.Printf("rendezvous instance connected %q", id)
res.ch <- &result{conn: conn}
res.ch <- &result{bc: bc}
}

func connToClient(conn net.Conn, hostname, hostType string) (buildlet.Client, error) {
if err := (&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn); err != nil {
log.Printf("gomote: error writing upgrade response to reverse buildlet %s (%s) at %s: %v", hostname, hostType, conn.RemoteAddr(), err)
conn.Close()
return nil, err
}
revDialer := revdial.NewDialer(conn, "/revdial")
revDialerDone := revDialer.Done()
dialer := revDialer.Dial

client := buildlet.NewClient(conn.RemoteAddr().String(), buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer(ctx)
},
},
})
client.SetDialer(dialer)
client.SetDescription(fmt.Sprintf("reverse peer %s/%s for host type %v", hostname, conn.RemoteAddr(), hostType))

var isDead struct {
sync.Mutex
v bool
}
client.SetOnHeartbeatFailure(func() {
isDead.Lock()
isDead.v = true
isDead.Unlock()
conn.Close()
})

// If the reverse dialer (which is always reading from the
// conn detects that the remote went away, close the buildlet
// client proactively.
go func() {
<-revDialerDone
isDead.Lock()
defer isDead.Unlock()
if !isDead.v {
client.Close()
}
}()
tstatus := time.Now()
status, err := client.Status(context.Background())
if err != nil {
log.Printf("Reverse connection %s/%s for %s did not answer status after %v: %v",
hostname, conn.RemoteAddr(), hostType, time.Since(tstatus), err)
conn.Close()
return nil, err
}
log.Printf("Buildlet %s/%s: %+v for %s", hostname, conn.RemoteAddr(), status, hostType)
return client, nil
}

// verifyToken verifies that the token is valid and contains the expected fields.
func verifyToken(ctx context.Context, jwt string) bool {
// TODO(go.dev/issue/63354) add service account verification
return false
// validateLUCIIDToken verifies that the token is valid and contains the expected fields.
func validateLUCIIDToken(ctx context.Context, jwt string) bool {
payload, err := idtoken.Validate(ctx, jwt, "https://gomote.golang.org")
if err != nil {
log.Printf("rendezvous: unable to validate authentication token: %s", err)
return false
}
if payload.Issuer != "https://accounts.google.com" {
log.Printf("rendezvous: incorrect issuer: %q", payload.Issuer)
return false
}
if payload.Expires+30 < time.Now().Unix() || payload.IssuedAt-30 > time.Now().Unix() {
log.Printf("rendezvous: Bad JWT times: expires %v, issued %v", time.Unix(payload.Expires, 0), time.Unix(payload.IssuedAt, 0))
return false
}
email, ok := payload.Claims["email"]
if !ok || email != "[email protected]" {
log.Printf("rendezvous: incorrect email=%s", email)
return false
}
emailVerified, ok := payload.Claims["email_verified"].(bool)
if !ok || !emailVerified {
log.Printf("rendezvous: email unverified email=%s", email)
return false
}
return true
}
27 changes: 16 additions & 11 deletions internal/rendezvous/rendezvous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"testing"
"time"

"golang.org/x/build/revdial/v2"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestWaitForInstanceError(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
rdv := &Rendezvous{
m: make(map[string]*entry),
verifier: func(ctx context.Context, jwt string) bool {
validator: func(ctx context.Context, jwt string) bool {
return true
},
}
Expand All @@ -83,7 +85,7 @@ func TestWaitForInstanceError(t *testing.T) {
func TestWaitForInstaceErrorNonTLS(t *testing.T) {
rdv := &Rendezvous{
m: make(map[string]*entry),
verifier: func(ctx context.Context, jwt string) bool {
validator: func(ctx context.Context, jwt string) bool {
return true
},
}
Expand All @@ -100,36 +102,39 @@ func TestWaitForInstaceErrorNonTLS(t *testing.T) {
}
}

func TestWaitForInstaceError(t *testing.T) {
func TestWaitForInstaceRevdialError(t *testing.T) {
rdv := &Rendezvous{
m: make(map[string]*entry),
verifier: func(ctx context.Context, jwt string) bool {
validator: func(ctx context.Context, jwt string) bool {
return true
},
}
instanceID := "test-id-3"
ctx := context.Background()
rdv.RegisterInstance(ctx, instanceID, 15*time.Second)
ts := httptest.NewTLSServer(http.HandlerFunc(rdv.HandleReverse))
mux := http.NewServeMux()
mux.HandleFunc("/reverse", rdv.HandleReverse)
mux.Handle("/revdial", revdial.ConnHandler())
ts := httptest.NewTLSServer(mux)
defer ts.Close()
client := ts.Client()
req, err := http.NewRequest("GET", ts.URL, nil)
req, err := http.NewRequest("GET", ts.URL+"/reverse", nil)
req.Header.Set(HeaderID, instanceID)
req.Header.Set(HeaderToken, "test-token")
req.Header.Set(HeaderHostname, "test-hostname")

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()

_, _ = client.Do(req)
}()
c, err := rdv.WaitForInstance(ctx, instanceID)
if err != nil {
t.Fatalf("WaitForInstance(): got %s, want no error", err)
_, err = rdv.WaitForInstance(ctx, instanceID)
if err == nil {
// expect a missing status endpoint
t.Fatal("WaitForInstance(): got nil, want error")
}
c.Close()
wg.Wait()
}

Expand Down

0 comments on commit 6785f27

Please sign in to comment.