diff --git a/go.mod b/go.mod index eb1239251d7..accec9d3d81 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,6 @@ require ( google.golang.org/protobuf v1.35.1 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.5.1 - inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -127,6 +126,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect + inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index 1aceb8e294e..4f0c8680e42 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -3,17 +3,18 @@ package vz import ( - "context" "encoding/binary" + "errors" "io" "net" "os" + "sync" + "syscall" "time" "github.com/balajiv113/fd" "github.com/sirupsen/logrus" - "inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod ) func PassFDToUnix(unixSock string) (*os.File, error) { @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } - qemuConn := &QEMUPacketConn{unixConn: unixConn} + qemuConn := &qemuPacketConn{Conn: unixConn} server, client, err := createSockPair() if err != nil { @@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } + vzConn := &packetConn{Conn: dgramConn} - remote := tcpproxy.DialProxy{ - DialContext: func(context.Context, string, string) (net.Conn, error) { - return dgramConn, nil - }, - } - go remote.HandleConn(qemuConn) + go forwardPackets(qemuConn, vzConn) return client, nil } -// QEMUPacketConn converts raw network packet to a QEMU supported network packet. -type QEMUPacketConn struct { - unixConn net.Conn +func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) { + defer qemuConn.Close() + defer vzConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + if _, err := io.Copy(qemuConn, vzConn); err != nil { + logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) + } + }() + + go func() { + defer wg.Done() + if _, err := io.Copy(vzConn, qemuConn); err != nil { + logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) + } + }() + + wg.Wait() } -var _ net.Conn = (*QEMUPacketConn)(nil) +// qemuPacketConn converts raw network packet to a QEMU supported network packet. +type qemuPacketConn struct { + net.Conn +} -// Read gets rid of the QEMU header packet and returns the raw packet as response. -func (v *QEMUPacketConn) Read(b []byte) (n int, err error) { - header := make([]byte, 4) - _, err = io.ReadFull(v.unixConn, header) - if err != nil { - logrus.Errorln("Failed to read header", err) +// Read reads a QEMU packet and returns the contained raw packet. Returns (len, +// nil) if a packet was read, and (0, err) on error. Errors means the prorocol +// is broken and the socket must be closed. +func (c *qemuPacketConn) Read(b []byte) (n int, err error) { + var size uint32 + if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil { + // Likely connection closed by peer. + return 0, err } - size := binary.BigEndian.Uint32(header) - reader := io.LimitReader(v.unixConn, int64(size)) + reader := io.LimitReader(c.Conn, int64(size)) _, err = reader.Read(b) if err != nil { - logrus.Errorln("Failed to read packet", err) + // Likely connection closed by peer. + return 0, err } return int(size), nil } -// Write puts QEMU header packet first and then writes the raw packet. -func (v *QEMUPacketConn) Write(b []byte) (n int, err error) { - header := make([]byte, 4) - binary.BigEndian.PutUint32(header, uint32(len(b))) - _, err = v.unixConn.Write(header) - if err != nil { - logrus.Errorln("Failed to write header", err) +// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil) +// if a packet was written, and (0, err) if a packet was not fully written. +// Errors means the prorocol is broken and the socket must be closed. +func (c *qemuPacketConn) Write(b []byte) (int, error) { + size := len(b) + header := uint32(size) + if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil { + return 0, err } - write, err := v.unixConn.Write(b) - if err != nil { - logrus.Errorln("Failed to write packet", err) + start := 0 + for start < size { + nw, err := c.Conn.Write(b[start:]) + if err != nil { + return 0, err + } + start += nw } - return write, nil + return size, nil } -func (v *QEMUPacketConn) Close() error { - return v.unixConn.Close() -} - -func (v *QEMUPacketConn) LocalAddr() net.Addr { - return v.unixConn.LocalAddr() -} - -func (v *QEMUPacketConn) RemoteAddr() net.Addr { - return v.unixConn.RemoteAddr() -} - -func (v *QEMUPacketConn) SetDeadline(t time.Time) error { - return v.unixConn.SetDeadline(t) -} +// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and +// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop +// consumes about 4% CPU on M1 Pro. +const writeRetryDelay = 100 * time.Microsecond -func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error { - return v.unixConn.SetReadDeadline(t) +// packetConn handles ENOBUFS errors when writing to unixgram socket. +type packetConn struct { + net.Conn } -func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error { - return v.unixConn.SetWriteDeadline(t) +// Write writes a packet retrying on ENOBUFS errors. +func (c *packetConn) Write(b []byte) (int, error) { + var retries uint64 + for { + n, err := c.Conn.Write(b) + if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) { + // This is an expected condition on BSD based system. The kernel + // does not support blocking until buffer space is available. + // The only way to recover is to retry the call until it + // succeeds, or drop the packet. + // Handled in a similar way in gvisor-tap-vsock: + // https://github.com/containers/gvisor-tap-vsock/issues/367 + time.Sleep(writeRetryDelay) + retries++ + continue + } + if err != nil { + return 0, err + } + if n < len(b) { + return n, errors.New("incomplete write to unixgram socket") + } + if retries > 0 { + logrus.Debugf("Write completed after %d retries", retries) + } + return n, nil + } } diff --git a/pkg/vz/network_darwin_test.go b/pkg/vz/network_darwin_test.go new file mode 100644 index 00000000000..43189e33ca7 --- /dev/null +++ b/pkg/vz/network_darwin_test.go @@ -0,0 +1,151 @@ +//go:build darwin && !no_vz + +package vz + +import ( + "encoding/binary" + "fmt" + "net" + "path/filepath" + "testing" +) + +const vmnetMaxPacketSize = 1514 +const packetsCount = 1000 + +func TestDialQemu(t *testing.T) { + listener, err := listenUnix(t.TempDir()) + if err != nil { + t.Fatal(err) + } + defer listener.Close() + t.Logf("Listening at %q", listener.Addr()) + + errc := make(chan error, 2) + + // Start the fake vmnet server. + go func() { + t.Log("Fake vmnet started") + errc <- serveOneClient(listener) + t.Log("Fake vmnet finished") + }() + + // Connect to the fake vmnet server. + client, err := DialQemu(listener.Addr().String()) + if err != nil { + t.Fatal(err) + } + t.Log("Connected to fake vment server") + + dgramConn, err := net.FileConn(client) + if err != nil { + t.Fatal(err) + } + + vzConn := packetConn{Conn: dgramConn} + defer vzConn.Close() + + go func() { + t.Log("Sender started") + buf := make([]byte, vmnetMaxPacketSize) + for i := 0; i < vmnetMaxPacketSize; i++ { + buf[i] = 0x55 + } + + // data packet format: + // 0-4 packet number + // 4-1514 0x55 ... + for i := 0; i < packetsCount; i++ { + binary.BigEndian.PutUint32(buf, uint32(i)) + if _, err := vzConn.Write(buf); err != nil { + errc <- err + return + } + } + t.Logf("Sent %d data packets", packetsCount) + + // quit packet format: + // 0-4: "quit" + copy(buf[:4], []byte("quit")) + if _, err := vzConn.Write(buf[:4]); err != nil { + errc <- err + return + } + + errc <- nil + t.Log("Sender finished") + }() + + // Read and verify packets to the server. + + buf := make([]byte, vmnetMaxPacketSize) + + t.Logf("Receiving and verifying data packets...") + for i := 0; i < packetsCount; i++ { + n, err := vzConn.Read(buf) + if err != nil { + t.Fatal(err) + } + if n < vmnetMaxPacketSize { + t.Fatalf("Expected %d bytes, got %d", vmnetMaxPacketSize, n) + } + + number := binary.BigEndian.Uint32(buf[:4]) + if number != uint32(i) { + t.Fatalf("Expected packet %d, got packet %d", i, number) + } + + for j := 4; j < vmnetMaxPacketSize; j++ { + if buf[j] != 0x55 { + t.Fatalf("Expected byte 0x55 at offset %d, got 0x%02x", j, buf[j]) + } + } + } + t.Logf("Recived and verified %d data packets", packetsCount) + + for i := 0; i < 2; i++ { + err := <-errc + if err != nil { + t.Fatal(err) + } + } +} + +// serveOneClient accepts one client and echo back received packets until a +// "quit" packet is sent. +func serveOneClient(listener *net.UnixListener) error { + conn, err := listener.Accept() + if err != nil { + return err + } + qemuConn := qemuPacketConn{Conn: conn} + defer qemuConn.Close() + + buf := make([]byte, vmnetMaxPacketSize) + for { + nr, err := qemuConn.Read(buf) + if err != nil { + return err + } + if string(buf[:4]) == "quit" { + return nil + } + nw, err := qemuConn.Write(buf[:nr]) + if err != nil { + return err + } + if nw != nr { + return fmt.Errorf("incomplete write: expected: %d, wrote: %d", nr, nw) + } + } +} + +// listenUnix creates and listen to unix socket under dir +func listenUnix(dir string) (*net.UnixListener, error) { + sock := filepath.Join(dir, "sock") + addr, err := net.ResolveUnixAddr("unix", sock) + if err != nil { + return nil, err + } + return net.ListenUnix("unix", addr) +}