Skip to content

Commit

Permalink
Merge pull request #2680 from nirs/fix-forward-packets
Browse files Browse the repository at this point in the history
Fix packet forwarding between vz and socket_vmnet
  • Loading branch information
AkihiroSuda authored Oct 11, 2024
2 parents 5a98e62 + d49ac31 commit fa4d0d8
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ require (
google.golang.org/protobuf v1.35.1
gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473
gotest.tools/v3 v3.5.1
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
Expand Down Expand Up @@ -127,6 +126,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
Expand Down
145 changes: 93 additions & 52 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
package vz

import (
"context"
"encoding/binary"
"errors"
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"

"github.com/sirupsen/logrus"
"inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod
)

func PassFDToUnix(unixSock string) (*os.File, error) {
Expand All @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
qemuConn := &QEMUPacketConn{unixConn: unixConn}
qemuConn := &qemuPacketConn{Conn: unixConn}

server, client, err := createSockPair()
if err != nil {
Expand All @@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
vzConn := &packetConn{Conn: dgramConn}

remote := tcpproxy.DialProxy{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return dgramConn, nil
},
}
go remote.HandleConn(qemuConn)
go forwardPackets(qemuConn, vzConn)

return client, nil
}

// QEMUPacketConn converts raw network packet to a QEMU supported network packet.
type QEMUPacketConn struct {
unixConn net.Conn
func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
}

var _ net.Conn = (*QEMUPacketConn)(nil)
// qemuPacketConn converts raw network packet to a QEMU supported network packet.
type qemuPacketConn struct {
net.Conn
}

// Read gets rid of the QEMU header packet and returns the raw packet as response.
func (v *QEMUPacketConn) Read(b []byte) (n int, err error) {
header := make([]byte, 4)
_, err = io.ReadFull(v.unixConn, header)
if err != nil {
logrus.Errorln("Failed to read header", err)
// Read reads a QEMU packet and returns the contained raw packet. Returns (len,
// nil) if a packet was read, and (0, err) on error. Errors means the prorocol
// is broken and the socket must be closed.
func (c *qemuPacketConn) Read(b []byte) (n int, err error) {
var size uint32
if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil {
// Likely connection closed by peer.
return 0, err
}

size := binary.BigEndian.Uint32(header)
reader := io.LimitReader(v.unixConn, int64(size))
reader := io.LimitReader(c.Conn, int64(size))
_, err = reader.Read(b)
if err != nil {
logrus.Errorln("Failed to read packet", err)
// Likely connection closed by peer.
return 0, err
}
return int(size), nil
}

// Write puts QEMU header packet first and then writes the raw packet.
func (v *QEMUPacketConn) Write(b []byte) (n int, err error) {
header := make([]byte, 4)
binary.BigEndian.PutUint32(header, uint32(len(b)))
_, err = v.unixConn.Write(header)
if err != nil {
logrus.Errorln("Failed to write header", err)
// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil)
// if a packet was written, and (0, err) if a packet was not fully written.
// Errors means the prorocol is broken and the socket must be closed.
func (c *qemuPacketConn) Write(b []byte) (int, error) {
size := len(b)
header := uint32(size)
if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil {
return 0, err
}

write, err := v.unixConn.Write(b)
if err != nil {
logrus.Errorln("Failed to write packet", err)
start := 0
for start < size {
nw, err := c.Conn.Write(b[start:])
if err != nil {
return 0, err
}
start += nw
}
return write, nil
return size, nil
}

func (v *QEMUPacketConn) Close() error {
return v.unixConn.Close()
}

func (v *QEMUPacketConn) LocalAddr() net.Addr {
return v.unixConn.LocalAddr()
}

func (v *QEMUPacketConn) RemoteAddr() net.Addr {
return v.unixConn.RemoteAddr()
}

func (v *QEMUPacketConn) SetDeadline(t time.Time) error {
return v.unixConn.SetDeadline(t)
}
// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and
// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop
// consumes about 4% CPU on M1 Pro.
const writeRetryDelay = 100 * time.Microsecond

func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error {
return v.unixConn.SetReadDeadline(t)
// packetConn handles ENOBUFS errors when writing to unixgram socket.
type packetConn struct {
net.Conn
}

func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error {
return v.unixConn.SetWriteDeadline(t)
// Write writes a packet retrying on ENOBUFS errors.
func (c *packetConn) Write(b []byte) (int, error) {
var retries uint64
for {
n, err := c.Conn.Write(b)
if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) {
// This is an expected condition on BSD based system. The kernel
// does not support blocking until buffer space is available.
// The only way to recover is to retry the call until it
// succeeds, or drop the packet.
// Handled in a similar way in gvisor-tap-vsock:
// https://github.com/containers/gvisor-tap-vsock/issues/367
time.Sleep(writeRetryDelay)
retries++
continue
}
if err != nil {
return 0, err
}
if n < len(b) {
return n, errors.New("incomplete write to unixgram socket")
}
if retries > 0 {
logrus.Debugf("Write completed after %d retries", retries)
}
return n, nil
}
}
151 changes: 151 additions & 0 deletions pkg/vz/network_darwin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//go:build darwin && !no_vz

package vz

import (
"encoding/binary"
"fmt"
"net"
"path/filepath"
"testing"
)

const vmnetMaxPacketSize = 1514
const packetsCount = 1000

func TestDialQemu(t *testing.T) {
listener, err := listenUnix(t.TempDir())
if err != nil {
t.Fatal(err)
}
defer listener.Close()
t.Logf("Listening at %q", listener.Addr())

errc := make(chan error, 2)

// Start the fake vmnet server.
go func() {
t.Log("Fake vmnet started")
errc <- serveOneClient(listener)
t.Log("Fake vmnet finished")
}()

// Connect to the fake vmnet server.
client, err := DialQemu(listener.Addr().String())
if err != nil {
t.Fatal(err)
}
t.Log("Connected to fake vment server")

dgramConn, err := net.FileConn(client)
if err != nil {
t.Fatal(err)
}

vzConn := packetConn{Conn: dgramConn}
defer vzConn.Close()

go func() {
t.Log("Sender started")
buf := make([]byte, vmnetMaxPacketSize)
for i := 0; i < vmnetMaxPacketSize; i++ {
buf[i] = 0x55
}

// data packet format:
// 0-4 packet number
// 4-1514 0x55 ...
for i := 0; i < packetsCount; i++ {
binary.BigEndian.PutUint32(buf, uint32(i))
if _, err := vzConn.Write(buf); err != nil {
errc <- err
return
}
}
t.Logf("Sent %d data packets", packetsCount)

// quit packet format:
// 0-4: "quit"
copy(buf[:4], []byte("quit"))
if _, err := vzConn.Write(buf[:4]); err != nil {
errc <- err
return
}

errc <- nil
t.Log("Sender finished")
}()

// Read and verify packets to the server.

buf := make([]byte, vmnetMaxPacketSize)

t.Logf("Receiving and verifying data packets...")
for i := 0; i < packetsCount; i++ {
n, err := vzConn.Read(buf)
if err != nil {
t.Fatal(err)
}
if n < vmnetMaxPacketSize {
t.Fatalf("Expected %d bytes, got %d", vmnetMaxPacketSize, n)
}

number := binary.BigEndian.Uint32(buf[:4])
if number != uint32(i) {
t.Fatalf("Expected packet %d, got packet %d", i, number)
}

for j := 4; j < vmnetMaxPacketSize; j++ {
if buf[j] != 0x55 {
t.Fatalf("Expected byte 0x55 at offset %d, got 0x%02x", j, buf[j])
}
}
}
t.Logf("Recived and verified %d data packets", packetsCount)

for i := 0; i < 2; i++ {
err := <-errc
if err != nil {
t.Fatal(err)
}
}
}

// serveOneClient accepts one client and echo back received packets until a
// "quit" packet is sent.
func serveOneClient(listener *net.UnixListener) error {
conn, err := listener.Accept()
if err != nil {
return err
}
qemuConn := qemuPacketConn{Conn: conn}
defer qemuConn.Close()

buf := make([]byte, vmnetMaxPacketSize)
for {
nr, err := qemuConn.Read(buf)
if err != nil {
return err
}
if string(buf[:4]) == "quit" {
return nil
}
nw, err := qemuConn.Write(buf[:nr])
if err != nil {
return err
}
if nw != nr {
return fmt.Errorf("incomplete write: expected: %d, wrote: %d", nr, nw)
}
}
}

// listenUnix creates and listen to unix socket under dir
func listenUnix(dir string) (*net.UnixListener, error) {
sock := filepath.Join(dir, "sock")
addr, err := net.ResolveUnixAddr("unix", sock)
if err != nil {
return nil, err
}
return net.ListenUnix("unix", addr)
}

0 comments on commit fa4d0d8

Please sign in to comment.