From d294cea90f18e167ce7e30e34fce9d09d9b6bba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 7 Oct 2024 19:53:58 +0200 Subject: [PATCH 01/11] Apply sender for bind proxy --- client/iface/bind/{bind.go => ice_bind.go} | 49 +++++++++++- client/iface/device/device_darwin.go | 5 +- client/iface/device/device_netstack.go | 5 +- client/iface/device/device_usp_unix.go | 6 +- client/iface/iface.go | 34 ++++++--- client/iface/iface_darwin.go | 8 +- client/iface/iface_unix.go | 38 ++++++---- client/iface/iwginterface.go | 2 + client/iface/wgproxy/bind/proxy.go | 42 ++++++++++ .../wgproxy/ebpf/portlookup.go | 0 .../wgproxy/ebpf/portlookup_test.go | 0 .../{internal => iface}/wgproxy/ebpf/proxy.go | 2 +- .../wgproxy/ebpf/proxy_test.go | 0 .../wgproxy/ebpf/wrapper.go | 2 +- client/iface/wgproxy/factory_linux.go | 76 +++++++++++++++++++ client/iface/wgproxy/factory_nonlinux.go | 35 +++++++++ client/{internal => iface}/wgproxy/proxy.go | 2 +- .../{internal => iface}/wgproxy/proxy_test.go | 6 +- .../{internal => iface}/wgproxy/usp/proxy.go | 3 +- client/internal/engine.go | 15 +--- client/internal/peer/conn.go | 25 +++--- client/internal/wgproxy/factory_linux.go | 50 ------------ client/internal/wgproxy/factory_nonlinux.go | 21 ----- 23 files changed, 285 insertions(+), 141 deletions(-) rename client/iface/bind/{bind.go => ice_bind.go} (72%) create mode 100644 client/iface/wgproxy/bind/proxy.go rename client/{internal => iface}/wgproxy/ebpf/portlookup.go (100%) rename client/{internal => iface}/wgproxy/ebpf/portlookup_test.go (100%) rename client/{internal => iface}/wgproxy/ebpf/proxy.go (99%) rename client/{internal => iface}/wgproxy/ebpf/proxy_test.go (100%) rename client/{internal => iface}/wgproxy/ebpf/wrapper.go (95%) create mode 100644 client/iface/wgproxy/factory_linux.go create mode 100644 client/iface/wgproxy/factory_nonlinux.go rename client/{internal => iface}/wgproxy/proxy.go (72%) rename client/{internal => iface}/wgproxy/proxy_test.go (92%) rename client/{internal => iface}/wgproxy/usp/proxy.go (96%) delete mode 100644 client/internal/wgproxy/factory_linux.go delete mode 100644 client/internal/wgproxy/factory_nonlinux.go diff --git a/client/iface/bind/bind.go b/client/iface/bind/ice_bind.go similarity index 72% rename from client/iface/bind/bind.go rename to client/iface/bind/ice_bind.go index ba6153cb73..ae041f891b 100644 --- a/client/iface/bind/bind.go +++ b/client/iface/bind/ice_bind.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "runtime" + "strings" "sync" "github.com/pion/stun/v2" @@ -27,15 +28,17 @@ type ICEBind struct { muUDPMux sync.Mutex transportNet transport.Net - udpMux *UniversalUDPMuxDefault + filterFn FilterFn + endpoints map[string]net.Conn - filterFn FilterFn + udpMux *UniversalUDPMuxDefault } func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind { ib := &ICEBind{ transportNet: transportNet, filterFn: filterFn, + endpoints: make(map[string]net.Conn), } rc := receiverCreator{ @@ -56,6 +59,33 @@ func (s *ICEBind) GetICEMux() (*UniversalUDPMuxDefault, error) { return s.udpMux, nil } +func (b *ICEBind) SetEndpoint(peerAddress *net.UDPAddr, conn net.Conn) (*net.UDPAddr, error) { + fakeAddr, err := fakeAddress(peerAddress) + if err != nil { + return nil, err + } + b.endpoints[fakeAddr.String()] = conn + return fakeAddr, nil +} + +func (b *ICEBind) RemoveEndpoint(fakeAddr *net.UDPAddr) { + delete(b.endpoints, fakeAddr.String()) +} + +func (b *ICEBind) Send(bufs [][]byte, ep wgConn.Endpoint) error { + conn, ok := b.endpoints[ep.DstToString()] + if !ok { + return b.StdNetBind.Send(bufs, ep) + } + + for _, buf := range bufs { + if _, err := conn.Write(buf); err != nil { + return err + } + } + return nil +} + func (s *ICEBind) createIPv4ReceiverFn(ipv4MsgsPool *sync.Pool, pc *ipv4.PacketConn, conn *net.UDPConn) wgConn.ReceiveFunc { s.muUDPMux.Lock() defer s.muUDPMux.Unlock() @@ -90,6 +120,7 @@ func (s *ICEBind) createIPv4ReceiverFn(ipv4MsgsPool *sync.Pool, pc *ipv4.PacketC for i := 0; i < numMsgs; i++ { msg := &(*msgs)[i] + log.Infof("---- read msg: %s", msg.Addr.String()) // todo: handle err ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr) if ok { @@ -101,6 +132,7 @@ func (s *ICEBind) createIPv4ReceiverFn(ipv4MsgsPool *sync.Pool, pc *ipv4.PacketC addrPort := msg.Addr.(*net.UDPAddr).AddrPort() ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep) + log.Infof("---- ep msg: %v", ep) eps[i] = ep } return numMsgs, nil @@ -140,3 +172,16 @@ func (s *ICEBind) parseSTUNMessage(raw []byte) (*stun.Message, error) { return msg, nil } + +func fakeAddress(peerAddress *net.UDPAddr) (*net.UDPAddr, error) { + octets := strings.Split(peerAddress.IP.String(), ".") + if len(octets) != 4 { + return nil, fmt.Errorf("invalid IP format") + } + + newAddr := &net.UDPAddr{ + IP: net.ParseIP(fmt.Sprintf("127.1.%s.%s", octets[2], octets[3])), + Port: peerAddress.Port, + } + return newAddr, nil +} diff --git a/client/iface/device/device_darwin.go b/client/iface/device/device_darwin.go index 03e85a7f17..b5a128bc1c 100644 --- a/client/iface/device/device_darwin.go +++ b/client/iface/device/device_darwin.go @@ -6,7 +6,6 @@ import ( "fmt" "os/exec" - "github.com/pion/transport/v3" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/tun" @@ -29,14 +28,14 @@ type TunDevice struct { configurer WGConfigurer } -func NewTunDevice(name string, address WGAddress, port int, key string, mtu int, transportNet transport.Net, filterFn bind.FilterFn) *TunDevice { +func NewTunDevice(name string, address WGAddress, port int, key string, mtu int, iceBind *bind.ICEBind) *TunDevice { return &TunDevice{ name: name, address: address, port: port, key: key, mtu: mtu, - iceBind: bind.NewICEBind(transportNet, filterFn), + iceBind: iceBind, } } diff --git a/client/iface/device/device_netstack.go b/client/iface/device/device_netstack.go index 440a1ca191..f5d39e9e07 100644 --- a/client/iface/device/device_netstack.go +++ b/client/iface/device/device_netstack.go @@ -6,7 +6,6 @@ package device import ( "fmt" - "github.com/pion/transport/v3" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/device" @@ -31,7 +30,7 @@ type TunNetstackDevice struct { configurer WGConfigurer } -func NewNetstackDevice(name string, address WGAddress, wgPort int, key string, mtu int, transportNet transport.Net, listenAddress string, filterFn bind.FilterFn) *TunNetstackDevice { +func NewNetstackDevice(name string, address WGAddress, wgPort int, key string, mtu int, iceBind *bind.ICEBind, listenAddress string) *TunNetstackDevice { return &TunNetstackDevice{ name: name, address: address, @@ -39,7 +38,7 @@ func NewNetstackDevice(name string, address WGAddress, wgPort int, key string, m key: key, mtu: mtu, listenAddress: listenAddress, - iceBind: bind.NewICEBind(transportNet, filterFn), + iceBind: iceBind, } } diff --git a/client/iface/device/device_usp_unix.go b/client/iface/device/device_usp_unix.go index 4175f65569..643d77565c 100644 --- a/client/iface/device/device_usp_unix.go +++ b/client/iface/device/device_usp_unix.go @@ -7,7 +7,6 @@ import ( "os" "runtime" - "github.com/pion/transport/v3" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/tun" @@ -30,7 +29,7 @@ type USPDevice struct { configurer WGConfigurer } -func NewUSPDevice(name string, address WGAddress, port int, key string, mtu int, transportNet transport.Net, filterFn bind.FilterFn) *USPDevice { +func NewUSPDevice(name string, address WGAddress, port int, key string, mtu int, iceBind *bind.ICEBind) *USPDevice { log.Infof("using userspace bind mode") checkUser() @@ -41,7 +40,8 @@ func NewUSPDevice(name string, address WGAddress, port int, key string, mtu int, port: port, key: key, mtu: mtu, - iceBind: bind.NewICEBind(transportNet, filterFn)} + iceBind: iceBind, + } } func (t *USPDevice) Create() (WGConfigurer, error) { diff --git a/client/iface/iface.go b/client/iface/iface.go index accf5ce0af..b55143997e 100644 --- a/client/iface/iface.go +++ b/client/iface/iface.go @@ -6,12 +6,15 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "github.com/netbirdio/netbird/client/errors" "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/device" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) const ( @@ -28,8 +31,13 @@ type WGIface struct { userspaceBind bool mu sync.Mutex - configurer device.WGConfigurer - filter device.PacketFilter + configurer device.WGConfigurer + filter device.PacketFilter + wgProxyFactory *wgproxy.Factory +} + +func (w *WGIface) GetProxy() wgproxy.Proxy { + return w.wgProxyFactory.GetProxy() } // IsUserspaceBind indicates whether this interfaces is userspace with bind.ICEBind @@ -124,22 +132,26 @@ func (w *WGIface) Close() error { w.mu.Lock() defer w.mu.Unlock() - err := w.tun.Close() - if err != nil { - return fmt.Errorf("failed to close wireguard interface %s: %w", w.Name(), err) + var result *multierror.Error + + if err := w.wgProxyFactory.Free(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to free WireGuard proxy: %w", err)) } - err = w.waitUntilRemoved() - if err != nil { + if err := w.tun.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close wireguard interface %s: %w", w.Name(), err)) + } + + if err := w.waitUntilRemoved(); err != nil { log.Warnf("failed to remove WireGuard interface %s: %v", w.Name(), err) - err = w.Destroy() - if err != nil { - return fmt.Errorf("failed to remove WireGuard interface %s: %w", w.Name(), err) + if err := w.Destroy(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to remove WireGuard interface %s: %w", w.Name(), err)) + return errors.FormatErrorOrNil(result) } log.Infof("interface %s successfully removed", w.Name()) } - return nil + return errors.FormatErrorOrNil(result) } // SetFilter sets packet filters for the userspace implementation diff --git a/client/iface/iface_darwin.go b/client/iface/iface_darwin.go index b46ea0f806..0f0a82178f 100644 --- a/client/iface/iface_darwin.go +++ b/client/iface/iface_darwin.go @@ -12,6 +12,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/device" "github.com/netbirdio/netbird/client/iface/netstack" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) // NewWGIFace Creates a new WireGuard interface instance @@ -21,16 +22,19 @@ func NewWGIFace(iFaceName string, address string, wgPort int, wgPrivKey string, return nil, err } + iceBind := bind.NewICEBind(transportNet, filterFn) + wgIFace := &WGIface{ userspaceBind: true, + wgProxyMgr: wgproxy.NewManager(wgPort, iceBind), } if netstack.IsEnabled() { - wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet, netstack.ListenAddr(), filterFn) + wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, iceBind, netstack.ListenAddr()) return wgIFace, nil } - wgIFace.tun = device.NewTunDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet, filterFn) + wgIFace.tun = device.NewTunDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, iceBind) return wgIFace, nil } diff --git a/client/iface/iface_unix.go b/client/iface/iface_unix.go index 09dbb2c1f7..2ad1953af3 100644 --- a/client/iface/iface_unix.go +++ b/client/iface/iface_unix.go @@ -10,7 +10,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/device" - "github.com/netbirdio/netbird/client/iface/netstack" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) // NewWGIFace Creates a new WireGuard interface instance @@ -22,25 +22,31 @@ func NewWGIFace(iFaceName string, address string, wgPort int, wgPrivKey string, wgIFace := &WGIface{} - // move the kernel/usp/netstack preference evaluation to upper layer - if netstack.IsEnabled() { - wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet, netstack.ListenAddr(), filterFn) + /* + if netstack.IsEnabled() { + iceBind := bind.NewICEBind(transportNet, filterFn) + wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, iceBind, netstack.ListenAddr()) + wgIFace.userspaceBind = true + wgIFace.wgProxyFactory = wgproxy.NewFactory(wgPort, iceBind) + return wgIFace, nil + } + + if device.WireGuardModuleIsLoaded() { + wgIFace.tun = device.NewKernelDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet) + wgIFace.userspaceBind = false + wgIFace.wgProxyFactory = wgproxy.NewFactory(wgPort, nil) + return wgIFace, nil + } + */ + if device.ModuleTunIsLoaded() { + iceBind := bind.NewICEBind(transportNet, filterFn) + wgIFace.tun = device.NewUSPDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, iceBind) wgIFace.userspaceBind = true + wgIFace.wgProxyFactory = wgproxy.NewFactory(wgPort, iceBind) return wgIFace, nil } - if device.WireGuardModuleIsLoaded() { - wgIFace.tun = device.NewKernelDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet) - wgIFace.userspaceBind = false - return wgIFace, nil - } - - if !device.ModuleTunIsLoaded() { - return nil, fmt.Errorf("couldn't check or load tun module") - } - wgIFace.tun = device.NewUSPDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet, nil) - wgIFace.userspaceBind = true - return wgIFace, nil + return nil, fmt.Errorf("couldn't check or load tun module") } // CreateOnAndroid this function make sense on mobile only diff --git a/client/iface/iwginterface.go b/client/iface/iwginterface.go index cb6d7ccd9a..f5ab295390 100644 --- a/client/iface/iwginterface.go +++ b/client/iface/iwginterface.go @@ -11,6 +11,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/device" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) type IWGIface interface { @@ -22,6 +23,7 @@ type IWGIface interface { ToInterface() *net.Interface Up() (*bind.UniversalUDPMuxDefault, error) UpdateAddr(newAddr string) error + GetProxy() wgproxy.Proxy UpdatePeer(peerKey string, allowedIps string, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error RemovePeer(peerKey string) error AddAllowedIP(peerKey string, allowedIP string) error diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go new file mode 100644 index 0000000000..8fd3307798 --- /dev/null +++ b/client/iface/wgproxy/bind/proxy.go @@ -0,0 +1,42 @@ +package bind + +import ( + "context" + "net" + + "github.com/netbirdio/netbird/client/iface/bind" +) + +type ProxyBind struct { + Bind *bind.ICEBind + + addr *net.UDPAddr +} + +func (p *ProxyBind) AddTurnConn(_ context.Context, endpoint *net.UDPAddr, relayedConn net.Conn) error { + addr, err := p.Bind.SetEndpoint(endpoint, relayedConn) + if err != nil { + return err + } + + p.addr = addr + return err + +} + +func (p *ProxyBind) EndpointAddr() *net.UDPAddr { + return p.addr +} + +func (p *ProxyBind) Work() { + // todo implement me +} + +func (p *ProxyBind) Pause() { + // todo implement me +} + +func (p *ProxyBind) CloseConn() error { + p.Bind.RemoveEndpoint(p.addr) + return nil +} diff --git a/client/internal/wgproxy/ebpf/portlookup.go b/client/iface/wgproxy/ebpf/portlookup.go similarity index 100% rename from client/internal/wgproxy/ebpf/portlookup.go rename to client/iface/wgproxy/ebpf/portlookup.go diff --git a/client/internal/wgproxy/ebpf/portlookup_test.go b/client/iface/wgproxy/ebpf/portlookup_test.go similarity index 100% rename from client/internal/wgproxy/ebpf/portlookup_test.go rename to client/iface/wgproxy/ebpf/portlookup_test.go diff --git a/client/internal/wgproxy/ebpf/proxy.go b/client/iface/wgproxy/ebpf/proxy.go similarity index 99% rename from client/internal/wgproxy/ebpf/proxy.go rename to client/iface/wgproxy/ebpf/proxy.go index e850f4533c..e21fc35d4e 100644 --- a/client/internal/wgproxy/ebpf/proxy.go +++ b/client/iface/wgproxy/ebpf/proxy.go @@ -119,7 +119,7 @@ func (p *WGEBPFProxy) Free() error { p.ctxCancel() var result *multierror.Error - if p.conn != nil { // p.conn will be nil if we have failed to listen + if p.conn != nil { if err := p.conn.Close(); err != nil { result = multierror.Append(result, err) } diff --git a/client/internal/wgproxy/ebpf/proxy_test.go b/client/iface/wgproxy/ebpf/proxy_test.go similarity index 100% rename from client/internal/wgproxy/ebpf/proxy_test.go rename to client/iface/wgproxy/ebpf/proxy_test.go diff --git a/client/internal/wgproxy/ebpf/wrapper.go b/client/iface/wgproxy/ebpf/wrapper.go similarity index 95% rename from client/internal/wgproxy/ebpf/wrapper.go rename to client/iface/wgproxy/ebpf/wrapper.go index ec1162263b..e9726b1bf3 100644 --- a/client/internal/wgproxy/ebpf/wrapper.go +++ b/client/iface/wgproxy/ebpf/wrapper.go @@ -27,7 +27,7 @@ type ProxyWrapper struct { isStarted bool } -func (p *ProxyWrapper) AddTurnConn(ctx context.Context, remoteConn net.Conn) error { +func (p *ProxyWrapper) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error { addr, err := p.WgeBPFProxy.AddTurnConn(remoteConn) if err != nil { return fmt.Errorf("add turn conn: %w", err) diff --git a/client/iface/wgproxy/factory_linux.go b/client/iface/wgproxy/factory_linux.go new file mode 100644 index 0000000000..87f4ef967f --- /dev/null +++ b/client/iface/wgproxy/factory_linux.go @@ -0,0 +1,76 @@ +//go:build !android + +package wgproxy + +import ( + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/iface/bind" + proxyBind "github.com/netbirdio/netbird/client/iface/wgproxy/bind" + "github.com/netbirdio/netbird/client/iface/wgproxy/ebpf" + "github.com/netbirdio/netbird/client/iface/wgproxy/usp" +) + +type proxyMode int + +const ( + proxyModeUDP proxyMode = iota + proxyModeEBPF + proxyModeBind +) + +type Factory struct { + wgPort int + mode proxyMode + + ebpfProxy *ebpf.WGEBPFProxy + bind *bind.ICEBind +} + +func NewFactory(wgPort int, iceBind *bind.ICEBind) *Factory { + f := &Factory{ + wgPort: wgPort, + } + + if iceBind != nil { + f.bind = iceBind + f.mode = proxyModeBind + return f + } + + ebpfProxy := ebpf.NewWGEBPFProxy(wgPort) + if err := ebpfProxy.Listen(); err != nil { + log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err) + f.mode = proxyModeUDP + return f + } + f.ebpfProxy = ebpfProxy + f.mode = proxyModeEBPF + return f +} + +func (w *Factory) GetProxy() Proxy { + switch w.mode { + case proxyModeUDP: + return usp.NewWGUserSpaceProxy(w.wgPort) + case proxyModeEBPF: + p := &ebpf.ProxyWrapper{ + WgeBPFProxy: w.ebpfProxy, + } + return p + case proxyModeBind: + p := &proxyBind.ProxyBind{ + Bind: w.bind, + } + return p + default: + return nil + } +} + +func (w *Factory) Free() error { + if w.ebpfProxy == nil { + return nil + } + return w.ebpfProxy.Free() +} diff --git a/client/iface/wgproxy/factory_nonlinux.go b/client/iface/wgproxy/factory_nonlinux.go new file mode 100644 index 0000000000..8334f677f0 --- /dev/null +++ b/client/iface/wgproxy/factory_nonlinux.go @@ -0,0 +1,35 @@ +//go:build !linux || android + +package wgproxy + +import ( + "github.com/netbirdio/netbird/client/iface/bind" + "github.com/netbirdio/netbird/client/iface/wgproxy/usp" +) + +type Factory struct { + bind *bind.ICEBind + port int +} + +func NewFactory(port int, bind *bind.ICEBind) *Factory { + return &Factory{ + port: port, + bind: bind, + } +} + +func (w *Factory) GetProxy() Proxy { + /* + p := &proxyBind.ProxyBind{ + Bind: w.bind, + } + + */ + p := usp.NewWGUserSpaceProxy(w.port) + return p +} + +func (w *Factory) Free() error { + return nil +} diff --git a/client/internal/wgproxy/proxy.go b/client/iface/wgproxy/proxy.go similarity index 72% rename from client/internal/wgproxy/proxy.go rename to client/iface/wgproxy/proxy.go index 558121cdd5..4a217caa2a 100644 --- a/client/internal/wgproxy/proxy.go +++ b/client/iface/wgproxy/proxy.go @@ -7,7 +7,7 @@ import ( // Proxy is a transfer layer between the relayed connection and the WireGuard type Proxy interface { - AddTurnConn(ctx context.Context, turnConn net.Conn) error + AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, turnConn net.Conn) error EndpointAddr() *net.UDPAddr Work() Pause() diff --git a/client/internal/wgproxy/proxy_test.go b/client/iface/wgproxy/proxy_test.go similarity index 92% rename from client/internal/wgproxy/proxy_test.go rename to client/iface/wgproxy/proxy_test.go index b88ff3f83c..7a9547c56b 100644 --- a/client/internal/wgproxy/proxy_test.go +++ b/client/iface/wgproxy/proxy_test.go @@ -11,8 +11,8 @@ import ( "testing" "time" - "github.com/netbirdio/netbird/client/internal/wgproxy/ebpf" - "github.com/netbirdio/netbird/client/internal/wgproxy/usp" + "github.com/netbirdio/netbird/client/iface/wgproxy/ebpf" + "github.com/netbirdio/netbird/client/iface/wgproxy/usp" "github.com/netbirdio/netbird/util" ) @@ -114,7 +114,7 @@ func TestProxyCloseByRemoteConn(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { relayedConn := newMockConn() - err := tt.proxy.AddTurnConn(ctx, relayedConn) + err := tt.proxy.AddTurnConn(ctx, nil, relayedConn) if err != nil { t.Errorf("error: %v", err) } diff --git a/client/internal/wgproxy/usp/proxy.go b/client/iface/wgproxy/usp/proxy.go similarity index 96% rename from client/internal/wgproxy/usp/proxy.go rename to client/iface/wgproxy/usp/proxy.go index c526de73cd..56c26841b7 100644 --- a/client/internal/wgproxy/usp/proxy.go +++ b/client/iface/wgproxy/usp/proxy.go @@ -13,6 +13,7 @@ import ( ) // WGUserSpaceProxy proxies +// todo: rename to UDP proxy type WGUserSpaceProxy struct { localWGListenPort int @@ -42,7 +43,7 @@ func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { // the connection is complete, an error is returned. Once successfully // connected, any expiration of the context will not affect the // connection. -func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, remoteConn net.Conn) error { +func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error { dialer := net.Dialer{} localConn, err := dialer.DialContext(ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) if err != nil { diff --git a/client/internal/engine.go b/client/internal/engine.go index c51901a225..0986feebd3 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -35,7 +35,6 @@ import ( "github.com/netbirdio/netbird/client/internal/rosenpass" "github.com/netbirdio/netbird/client/internal/routemanager" "github.com/netbirdio/netbird/client/internal/routemanager/systemops" - "github.com/netbirdio/netbird/client/internal/wgproxy" nbssh "github.com/netbirdio/netbird/client/ssh" "github.com/netbirdio/netbird/client/system" nbdns "github.com/netbirdio/netbird/dns" @@ -141,8 +140,7 @@ type Engine struct { ctx context.Context cancel context.CancelFunc - wgInterface iface.IWGIface - wgProxyFactory *wgproxy.Factory + wgInterface iface.IWGIface udpMux *bind.UniversalUDPMuxDefault @@ -292,9 +290,6 @@ func (e *Engine) Start() error { } e.wgInterface = wgIface - userspace := e.wgInterface.IsUserspaceBind() - e.wgProxyFactory = wgproxy.NewFactory(userspace, e.config.WgPort) - if e.config.RosenpassEnabled { log.Infof("rosenpass is enabled") if e.config.RosenpassPermissive { @@ -959,7 +954,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e }, } - peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.wgProxyFactory, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager) + peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager) if err != nil { return nil, err } @@ -1110,12 +1105,6 @@ func (e *Engine) parseNATExternalIPMappings() []string { } func (e *Engine) close() { - if e.wgProxyFactory != nil { - if err := e.wgProxyFactory.Free(); err != nil { - log.Errorf("failed closing ebpf proxy: %s", err) - } - } - // stop/restore DNS first so dbus and friends don't complain because of a missing interface e.stopDNSServer() diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 50862c0c66..cad424f1a3 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -17,8 +17,8 @@ import ( "github.com/netbirdio/netbird/client/iface" "github.com/netbirdio/netbird/client/iface/configurer" + "github.com/netbirdio/netbird/client/iface/wgproxy" "github.com/netbirdio/netbird/client/internal/stdnet" - "github.com/netbirdio/netbird/client/internal/wgproxy" relayClient "github.com/netbirdio/netbird/relay/client" "github.com/netbirdio/netbird/route" nbnet "github.com/netbirdio/netbird/util/net" @@ -79,10 +79,10 @@ type Conn struct { ctxCancel context.CancelFunc config ConnConfig statusRecorder *Status - wgProxyFactory *wgproxy.Factory signaler *Signaler relayManager *relayClient.Manager - allowedIPsIP string + allowedIP net.IP + allowedNet string handshaker *Handshaker onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string) @@ -111,8 +111,8 @@ type Conn struct { // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) { - _, allowedIPsIP, err := net.ParseCIDR(config.WgConfig.AllowedIps) +func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) { + allowedIP, allowedNet, err := net.ParseCIDR(config.WgConfig.AllowedIps) if err != nil { log.Errorf("failed to parse allowedIPS: %v", err) return nil, err @@ -127,10 +127,10 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu ctxCancel: ctxCancel, config: config, statusRecorder: statusRecorder, - wgProxyFactory: wgProxyFactory, signaler: signaler, relayManager: relayManager, - allowedIPsIP: allowedIPsIP.String(), + allowedIP: allowedIP, + allowedNet: allowedNet.String(), statusRelay: NewAtomicConnStatus(), statusICE: NewAtomicConnStatus(), iCEDisconnected: make(chan bool, 1), @@ -676,7 +676,7 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd } if conn.onConnected != nil { - conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.allowedIPsIP, remoteRosenpassAddr) + conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.allowedNet, remoteRosenpassAddr) } } @@ -767,8 +767,13 @@ func (conn *Conn) freeUpConnID() { func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) { conn.log.Debugf("setup proxied WireGuard connection") - wgProxy := conn.wgProxyFactory.GetProxy() - if err := wgProxy.AddTurnConn(conn.ctx, remoteConn); err != nil { + udpAddr := &net.UDPAddr{ + IP: conn.allowedIP, + Port: conn.config.WgConfig.WgListenPort, + } + + wgProxy := conn.config.WgConfig.WgInterface.GetProxy() + if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil { conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) return nil, err } diff --git a/client/internal/wgproxy/factory_linux.go b/client/internal/wgproxy/factory_linux.go deleted file mode 100644 index 369ba99db1..0000000000 --- a/client/internal/wgproxy/factory_linux.go +++ /dev/null @@ -1,50 +0,0 @@ -//go:build !android - -package wgproxy - -import ( - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/client/internal/wgproxy/ebpf" - "github.com/netbirdio/netbird/client/internal/wgproxy/usp" -) - -type Factory struct { - wgPort int - ebpfProxy *ebpf.WGEBPFProxy -} - -func NewFactory(userspace bool, wgPort int) *Factory { - f := &Factory{wgPort: wgPort} - - if userspace { - return f - } - - ebpfProxy := ebpf.NewWGEBPFProxy(wgPort) - err := ebpfProxy.Listen() - if err != nil { - log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err) - return f - } - - f.ebpfProxy = ebpfProxy - return f -} - -func (w *Factory) GetProxy() Proxy { - if w.ebpfProxy != nil { - p := &ebpf.ProxyWrapper{ - WgeBPFProxy: w.ebpfProxy, - } - return p - } - return usp.NewWGUserSpaceProxy(w.wgPort) -} - -func (w *Factory) Free() error { - if w.ebpfProxy == nil { - return nil - } - return w.ebpfProxy.Free() -} diff --git a/client/internal/wgproxy/factory_nonlinux.go b/client/internal/wgproxy/factory_nonlinux.go deleted file mode 100644 index f930b09b3a..0000000000 --- a/client/internal/wgproxy/factory_nonlinux.go +++ /dev/null @@ -1,21 +0,0 @@ -//go:build !linux || android - -package wgproxy - -import "github.com/netbirdio/netbird/client/internal/wgproxy/usp" - -type Factory struct { - wgPort int -} - -func NewFactory(_ bool, wgPort int) *Factory { - return &Factory{wgPort: wgPort} -} - -func (w *Factory) GetProxy() Proxy { - return usp.NewWGUserSpaceProxy(w.wgPort) -} - -func (w *Factory) Free() error { - return nil -} From 2b81a68090ed8a35363ca18bfd900d4bdedde399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 09:58:49 +0200 Subject: [PATCH 02/11] Add read loop to proxy --- client/iface/bind/endpoint.go | 5 +++ client/iface/bind/ice_bind.go | 71 +++++++++++++++++++++++++++--- client/iface/wgproxy/bind/proxy.go | 52 +++++++++++++++++++--- 3 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 client/iface/bind/endpoint.go diff --git a/client/iface/bind/endpoint.go b/client/iface/bind/endpoint.go new file mode 100644 index 0000000000..1926ff88f1 --- /dev/null +++ b/client/iface/bind/endpoint.go @@ -0,0 +1,5 @@ +package bind + +import wgConn "golang.zx2c4.com/wireguard/conn" + +type Endpoint = wgConn.StdNetEndpoint diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index ae041f891b..10bf38b3f5 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -14,6 +14,12 @@ import ( wgConn "golang.zx2c4.com/wireguard/conn" ) +type RecvMessage struct { + Endpoint *Endpoint + Buffer []byte + Len int +} + type receiverCreator struct { iceBind *ICEBind } @@ -24,21 +30,28 @@ func (rc receiverCreator) CreateIPv4ReceiverFn(msgPool *sync.Pool, pc *ipv4.Pack type ICEBind struct { *wgConn.StdNetBind - - muUDPMux sync.Mutex + RecvChan chan RecvMessage transportNet transport.Net filterFn FilterFn - endpoints map[string]net.Conn + endpoints map[string]net.Conn // todo: is not thread safe + closedChan chan struct{} - udpMux *UniversalUDPMuxDefault + muUDPMux sync.Mutex + udpMux *UniversalUDPMuxDefault + closed bool } func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind { + b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind) ib := &ICEBind{ + StdNetBind: b, + RecvChan: make(chan RecvMessage, 1), transportNet: transportNet, filterFn: filterFn, endpoints: make(map[string]net.Conn), + closedChan: make(chan struct{}), + closed: true, } rc := receiverCreator{ @@ -48,6 +61,34 @@ func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind { return ib } +func (s *ICEBind) Open(uport uint16) ([]wgConn.ReceiveFunc, uint16, error) { + s.closed = false + log.Infof("------ ICEBind: Open") + fns, port, err := s.StdNetBind.Open(uport) + if err != nil { + return nil, 0, err + } + + fns = append(fns, s.receiveRelayed) + return fns, port, nil +} + +func (s *ICEBind) Close() error { + // just a quick implementation to make the tests pass + if s.closed { + return nil + } + log.Infof("------ ICEBind: Close") + s.closed = true + select { + case s.closedChan <- struct{}{}: + default: + } + err := s.StdNetBind.Close() + return err + +} + // GetICEMux returns the ICE UDPMux that was created and used by ICEBind func (s *ICEBind) GetICEMux() (*UniversalUDPMuxDefault, error) { s.muUDPMux.Lock() @@ -120,7 +161,6 @@ func (s *ICEBind) createIPv4ReceiverFn(ipv4MsgsPool *sync.Pool, pc *ipv4.PacketC for i := 0; i < numMsgs; i++ { msg := &(*msgs)[i] - log.Infof("---- read msg: %s", msg.Addr.String()) // todo: handle err ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr) if ok { @@ -132,7 +172,6 @@ func (s *ICEBind) createIPv4ReceiverFn(ipv4MsgsPool *sync.Pool, pc *ipv4.PacketC addrPort := msg.Addr.(*net.UDPAddr).AddrPort() ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep) - log.Infof("---- ep msg: %v", ep) eps[i] = ep } return numMsgs, nil @@ -173,6 +212,26 @@ func (s *ICEBind) parseSTUNMessage(raw []byte) (*stun.Message, error) { return msg, nil } +func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpoint) (int, error) { + if c.closed { + log.Infof("receiver is closed, return with closed error") + return 0, net.ErrClosed + } + + select { + case <-c.closedChan: + return 0, net.ErrClosed + case msg, ok := <-c.RecvChan: + if !ok { + return 0, net.ErrClosed + } + copy(buffs[0], msg.Buffer) + sizes[0] = msg.Len + eps[0] = wgConn.Endpoint(msg.Endpoint) + return 1, nil + } +} + func fakeAddress(peerAddress *net.UDPAddr) (*net.UDPAddr, error) { octets := strings.Split(peerAddress.IP.String(), ".") if len(octets) != 4 { diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go index 8fd3307798..b991db715d 100644 --- a/client/iface/wgproxy/bind/proxy.go +++ b/client/iface/wgproxy/bind/proxy.go @@ -3,6 +3,9 @@ package bind import ( "context" "net" + "net/netip" + + log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/client/iface/bind" ) @@ -10,22 +13,30 @@ import ( type ProxyBind struct { Bind *bind.ICEBind - addr *net.UDPAddr + wgAddr *net.UDPAddr + wgEndpoint *bind.Endpoint + remoteConn net.Conn } -func (p *ProxyBind) AddTurnConn(_ context.Context, endpoint *net.UDPAddr, relayedConn net.Conn) error { - addr, err := p.Bind.SetEndpoint(endpoint, relayedConn) +// AddTurnConn adds a new connection to the bind. +// endpoint is the NetBird address of the remote peer. The SetEndpoint return with the address what will be used in the +// WireGuard configuration. +func (p *ProxyBind) AddTurnConn(ctx context.Context, nbAddr *net.UDPAddr, remoteConn net.Conn) error { + addr, err := p.Bind.SetEndpoint(nbAddr, remoteConn) if err != nil { return err } - p.addr = addr + p.wgAddr = addr + p.wgEndpoint = addrToEndpoint(addr) + p.remoteConn = remoteConn + + go p.proxyToLocal(ctx) return err } - func (p *ProxyBind) EndpointAddr() *net.UDPAddr { - return p.addr + return p.wgAddr } func (p *ProxyBind) Work() { @@ -37,6 +48,33 @@ func (p *ProxyBind) Pause() { } func (p *ProxyBind) CloseConn() error { - p.Bind.RemoveEndpoint(p.addr) + p.Bind.RemoveEndpoint(p.wgAddr) return nil } + +func (p *ProxyBind) proxyToLocal(ctx context.Context) { + buf := make([]byte, 1500) + for { + n, err := p.remoteConn.Read(buf) + if err != nil { + if ctx.Err() != nil { + return + } + log.Errorf("failed to read from remote conn: %s, %s", p.remoteConn.RemoteAddr(), err) + return + } + + msg := bind.RecvMessage{ + Endpoint: p.wgEndpoint, + Buffer: buf, + Len: n, + } + p.Bind.RecvChan <- msg + } +} + +func addrToEndpoint(addr *net.UDPAddr) *bind.Endpoint { + ip, _ := netip.AddrFromSlice(addr.IP.To4()) + addrPort := netip.AddrPortFrom(ip, uint16(addr.Port)) + return &bind.Endpoint{AddrPort: addrPort} +} From fac8869100bba06daec0380aea201abac61e4ae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 09:59:39 +0200 Subject: [PATCH 03/11] Add comment --- client/iface/bind/ice_bind.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index 10bf38b3f5..6df2f07f23 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -225,6 +225,7 @@ func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpo if !ok { return 0, net.ErrClosed } + // todo: do not copy the full buffer copy(buffs[0], msg.Buffer) sizes[0] = msg.Len eps[0] = wgConn.Endpoint(msg.Endpoint) From c366ddfce63c13f896668ac829c2984161b0a25d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 17:13:57 +0200 Subject: [PATCH 04/11] - Add pause logic - Use endpoints in threadsafe way --- client/iface/bind/ice_bind.go | 9 +++- client/iface/wgproxy/bind/proxy.go | 68 +++++++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index 6df2f07f23..d193bdb3c1 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -34,7 +34,8 @@ type ICEBind struct { transportNet transport.Net filterFn FilterFn - endpoints map[string]net.Conn // todo: is not thread safe + endpoints map[string]net.Conn + endpointsMu sync.Mutex closedChan chan struct{} muUDPMux sync.Mutex @@ -105,16 +106,22 @@ func (b *ICEBind) SetEndpoint(peerAddress *net.UDPAddr, conn net.Conn) (*net.UDP if err != nil { return nil, err } + b.endpointsMu.Lock() b.endpoints[fakeAddr.String()] = conn + b.endpointsMu.Unlock() return fakeAddr, nil } func (b *ICEBind) RemoveEndpoint(fakeAddr *net.UDPAddr) { + b.endpointsMu.Lock() + defer b.endpointsMu.Unlock() delete(b.endpoints, fakeAddr.String()) } func (b *ICEBind) Send(bufs [][]byte, ep wgConn.Endpoint) error { + b.endpointsMu.Lock() conn, ok := b.endpoints[ep.DstToString()] + b.endpointsMu.Unlock() if !ok { return b.StdNetBind.Send(bufs, ep) } diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go index b991db715d..dc64f2a158 100644 --- a/client/iface/wgproxy/bind/proxy.go +++ b/client/iface/wgproxy/bind/proxy.go @@ -2,8 +2,10 @@ package bind import ( "context" + "fmt" "net" "net/netip" + "sync" log "github.com/sirupsen/logrus" @@ -16,6 +18,14 @@ type ProxyBind struct { wgAddr *net.UDPAddr wgEndpoint *bind.Endpoint remoteConn net.Conn + ctx context.Context + cancel context.CancelFunc + closeMu sync.Mutex + closed bool + + pausedMu sync.Mutex + paused bool + isStarted bool } // AddTurnConn adds a new connection to the bind. @@ -30,8 +40,7 @@ func (p *ProxyBind) AddTurnConn(ctx context.Context, nbAddr *net.UDPAddr, remote p.wgAddr = addr p.wgEndpoint = addrToEndpoint(addr) p.remoteConn = remoteConn - - go p.proxyToLocal(ctx) + p.ctx, p.cancel = context.WithCancel(ctx) return err } @@ -40,19 +49,61 @@ func (p *ProxyBind) EndpointAddr() *net.UDPAddr { } func (p *ProxyBind) Work() { - // todo implement me + if p.remoteConn == nil { + return + } + + p.pausedMu.Lock() + p.paused = false + p.pausedMu.Unlock() + + // Start the proxy only once + if !p.isStarted { + p.isStarted = true + go p.proxyToLocal(p.ctx) + } } func (p *ProxyBind) Pause() { - // todo implement me + if p.remoteConn == nil { + return + } + + p.pausedMu.Lock() + p.paused = true + p.pausedMu.Unlock() } func (p *ProxyBind) CloseConn() error { + if p.cancel == nil { + return fmt.Errorf("proxy not started") + } + return p.close() +} + +func (p *ProxyBind) close() error { + p.closeMu.Lock() + defer p.closeMu.Unlock() + + if p.closed { + return nil + } + p.closed = true + + p.cancel() + p.Bind.RemoveEndpoint(p.wgAddr) - return nil + + return p.remoteConn.Close() } func (p *ProxyBind) proxyToLocal(ctx context.Context) { + defer func() { + if err := p.close(); err != nil { + log.Warnf("failed to close remote conn: %s", err) + } + }() + buf := make([]byte, 1500) for { n, err := p.remoteConn.Read(buf) @@ -64,12 +115,19 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) { return } + p.pausedMu.Lock() + if p.paused { + p.pausedMu.Unlock() + continue + } + msg := bind.RecvMessage{ Endpoint: p.wgEndpoint, Buffer: buf, Len: n, } p.Bind.RecvChan <- msg + p.pausedMu.Unlock() } } From 9ea3d24c892bfcd2db5c9c5a45cbc01bae1611cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 17:49:32 +0200 Subject: [PATCH 05/11] - decrease the buffer copy - add comments - fix close logic --- client/iface/bind/ice_bind.go | 28 +++++++++++++--------------- client/iface/wgproxy/bind/proxy.go | 3 +-- client/iface/wgproxy/proxy.go | 8 ++++---- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index d193bdb3c1..6bf24b3c95 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -17,7 +17,6 @@ import ( type RecvMessage struct { Endpoint *Endpoint Buffer []byte - Len int } type receiverCreator struct { @@ -36,11 +35,14 @@ type ICEBind struct { filterFn FilterFn endpoints map[string]net.Conn endpointsMu sync.Mutex + // every time when Close() is called (i.e. BindUpdate()) we need to close exit from the receiveRelayed and create a + // new closed channel. With the closedChanMu we can safely close the channel and create a new one closedChan chan struct{} + closedChanMu sync.RWMutex + closed bool muUDPMux sync.Mutex udpMux *UniversalUDPMuxDefault - closed bool } func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind { @@ -64,12 +66,13 @@ func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind { func (s *ICEBind) Open(uport uint16) ([]wgConn.ReceiveFunc, uint16, error) { s.closed = false - log.Infof("------ ICEBind: Open") + s.closedChanMu.Lock() + s.closedChan = make(chan struct{}) + s.closedChanMu.Unlock() fns, port, err := s.StdNetBind.Open(uport) if err != nil { return nil, 0, err } - fns = append(fns, s.receiveRelayed) return fns, port, nil } @@ -79,12 +82,8 @@ func (s *ICEBind) Close() error { if s.closed { return nil } - log.Infof("------ ICEBind: Close") s.closed = true - select { - case s.closedChan <- struct{}{}: - default: - } + close(s.closedChan) err := s.StdNetBind.Close() return err @@ -219,11 +218,11 @@ func (s *ICEBind) parseSTUNMessage(raw []byte) (*stun.Message, error) { return msg, nil } +// receiveRelayed is a receive function that is used to receive packets from the relayed connection and forward to the +// WireGuard. Critical part is do not block if the Closed() has been called. func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpoint) (int, error) { - if c.closed { - log.Infof("receiver is closed, return with closed error") - return 0, net.ErrClosed - } + c.closedChanMu.RLock() + defer c.closedChanMu.RUnlock() select { case <-c.closedChan: @@ -232,9 +231,8 @@ func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpo if !ok { return 0, net.ErrClosed } - // todo: do not copy the full buffer copy(buffs[0], msg.Buffer) - sizes[0] = msg.Len + sizes[0] = len(msg.Buffer) eps[0] = wgConn.Endpoint(msg.Endpoint) return 1, nil } diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go index dc64f2a158..e986d6d7b0 100644 --- a/client/iface/wgproxy/bind/proxy.go +++ b/client/iface/wgproxy/bind/proxy.go @@ -123,8 +123,7 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) { msg := bind.RecvMessage{ Endpoint: p.wgEndpoint, - Buffer: buf, - Len: n, + Buffer: buf[:n], } p.Bind.RecvChan <- msg p.pausedMu.Unlock() diff --git a/client/iface/wgproxy/proxy.go b/client/iface/wgproxy/proxy.go index 4a217caa2a..243aa2bd2a 100644 --- a/client/iface/wgproxy/proxy.go +++ b/client/iface/wgproxy/proxy.go @@ -7,9 +7,9 @@ import ( // Proxy is a transfer layer between the relayed connection and the WireGuard type Proxy interface { - AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, turnConn net.Conn) error - EndpointAddr() *net.UDPAddr - Work() - Pause() + AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error + EndpointAddr() *net.UDPAddr // EndpointAddr returns the address of the WireGuard peer endpoint + Work() // Work start or resume the proxy + Pause() // Pause to forward the packages from remote connection to WireGuard. The opposite way still works. CloseConn() error } From 69b1e9eb000216ae53eec2c841a1cdf25cb14d7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 18:13:40 +0200 Subject: [PATCH 06/11] Rename userspace proxy to udp proxy --- client/iface/wgproxy/factory_linux.go | 4 +-- client/iface/wgproxy/factory_nonlinux.go | 12 +++------ client/iface/wgproxy/proxy_test.go | 3 +-- client/iface/wgproxy/{usp => udp}/proxy.go | 29 +++++++++++----------- 4 files changed, 20 insertions(+), 28 deletions(-) rename client/iface/wgproxy/{usp => udp}/proxy.go (82%) diff --git a/client/iface/wgproxy/factory_linux.go b/client/iface/wgproxy/factory_linux.go index 87f4ef967f..d75dd8b2d9 100644 --- a/client/iface/wgproxy/factory_linux.go +++ b/client/iface/wgproxy/factory_linux.go @@ -8,7 +8,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" proxyBind "github.com/netbirdio/netbird/client/iface/wgproxy/bind" "github.com/netbirdio/netbird/client/iface/wgproxy/ebpf" - "github.com/netbirdio/netbird/client/iface/wgproxy/usp" + udpProxy "github.com/netbirdio/netbird/client/iface/wgproxy/udp" ) type proxyMode int @@ -52,7 +52,7 @@ func NewFactory(wgPort int, iceBind *bind.ICEBind) *Factory { func (w *Factory) GetProxy() Proxy { switch w.mode { case proxyModeUDP: - return usp.NewWGUserSpaceProxy(w.wgPort) + return udpProxy.NewWGUDPProxy(w.wgPort) case proxyModeEBPF: p := &ebpf.ProxyWrapper{ WgeBPFProxy: w.ebpfProxy, diff --git a/client/iface/wgproxy/factory_nonlinux.go b/client/iface/wgproxy/factory_nonlinux.go index 8334f677f0..1742f7ae02 100644 --- a/client/iface/wgproxy/factory_nonlinux.go +++ b/client/iface/wgproxy/factory_nonlinux.go @@ -4,7 +4,6 @@ package wgproxy import ( "github.com/netbirdio/netbird/client/iface/bind" - "github.com/netbirdio/netbird/client/iface/wgproxy/usp" ) type Factory struct { @@ -20,14 +19,9 @@ func NewFactory(port int, bind *bind.ICEBind) *Factory { } func (w *Factory) GetProxy() Proxy { - /* - p := &proxyBind.ProxyBind{ - Bind: w.bind, - } - - */ - p := usp.NewWGUserSpaceProxy(w.port) - return p + return &proxyBind.ProxyBind{ + Bind: w.bind, + } } func (w *Factory) Free() error { diff --git a/client/iface/wgproxy/proxy_test.go b/client/iface/wgproxy/proxy_test.go index 7a9547c56b..5358d93339 100644 --- a/client/iface/wgproxy/proxy_test.go +++ b/client/iface/wgproxy/proxy_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/netbirdio/netbird/client/iface/wgproxy/ebpf" - "github.com/netbirdio/netbird/client/iface/wgproxy/usp" "github.com/netbirdio/netbird/util" ) @@ -84,7 +83,7 @@ func TestProxyCloseByRemoteConn(t *testing.T) { }{ { name: "userspace proxy", - proxy: usp.NewWGUserSpaceProxy(51830), + proxy: udpproxy.NewWGUserSpaceProxy(51830), }, } diff --git a/client/iface/wgproxy/usp/proxy.go b/client/iface/wgproxy/udp/proxy.go similarity index 82% rename from client/iface/wgproxy/usp/proxy.go rename to client/iface/wgproxy/udp/proxy.go index 56c26841b7..6bc6e0663c 100644 --- a/client/iface/wgproxy/usp/proxy.go +++ b/client/iface/wgproxy/udp/proxy.go @@ -1,4 +1,4 @@ -package usp +package udp import ( "context" @@ -12,9 +12,8 @@ import ( "github.com/netbirdio/netbird/client/errors" ) -// WGUserSpaceProxy proxies -// todo: rename to UDP proxy -type WGUserSpaceProxy struct { +// WGUDPProxy proxies +type WGUDPProxy struct { localWGListenPort int remoteConn net.Conn @@ -29,10 +28,10 @@ type WGUserSpaceProxy struct { isStarted bool } -// NewWGUserSpaceProxy instantiate a user space WireGuard proxy. This is not a thread safe implementation -func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { +// NewWGUDPProxy instantiate a UDP based WireGuard proxy. This is not a thread safe implementation +func NewWGUDPProxy(wgPort int) *WGUDPProxy { log.Debugf("Initializing new user space proxy with port %d", wgPort) - p := &WGUserSpaceProxy{ + p := &WGUDPProxy{ localWGListenPort: wgPort, } return p @@ -43,7 +42,7 @@ func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { // the connection is complete, an error is returned. Once successfully // connected, any expiration of the context will not affect the // connection. -func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error { +func (p *WGUDPProxy) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error { dialer := net.Dialer{} localConn, err := dialer.DialContext(ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) if err != nil { @@ -59,7 +58,7 @@ func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, endpoint *net.UDPAdd return err } -func (p *WGUserSpaceProxy) EndpointAddr() *net.UDPAddr { +func (p *WGUDPProxy) EndpointAddr() *net.UDPAddr { if p.localConn == nil { return nil } @@ -68,7 +67,7 @@ func (p *WGUserSpaceProxy) EndpointAddr() *net.UDPAddr { } // Work starts the proxy or resumes it if it was paused -func (p *WGUserSpaceProxy) Work() { +func (p *WGUDPProxy) Work() { if p.remoteConn == nil { return } @@ -85,7 +84,7 @@ func (p *WGUserSpaceProxy) Work() { } // Pause pauses the proxy from receiving data from the remote peer -func (p *WGUserSpaceProxy) Pause() { +func (p *WGUDPProxy) Pause() { if p.remoteConn == nil { return } @@ -96,14 +95,14 @@ func (p *WGUserSpaceProxy) Pause() { } // CloseConn close the localConn -func (p *WGUserSpaceProxy) CloseConn() error { +func (p *WGUDPProxy) CloseConn() error { if p.cancel == nil { return fmt.Errorf("proxy not started") } return p.close() } -func (p *WGUserSpaceProxy) close() error { +func (p *WGUDPProxy) close() error { p.closeMu.Lock() defer p.closeMu.Unlock() @@ -127,7 +126,7 @@ func (p *WGUserSpaceProxy) close() error { } // proxyToRemote proxies from Wireguard to the RemoteKey -func (p *WGUserSpaceProxy) proxyToRemote(ctx context.Context) { +func (p *WGUDPProxy) proxyToRemote(ctx context.Context) { defer func() { if err := p.close(); err != nil { log.Warnf("error in proxy to remote loop: %s", err) @@ -159,7 +158,7 @@ func (p *WGUserSpaceProxy) proxyToRemote(ctx context.Context) { // proxyToLocal proxies from the Remote peer to local WireGuard // if the proxy is paused it will drain the remote conn and drop the packets -func (p *WGUserSpaceProxy) proxyToLocal(ctx context.Context) { +func (p *WGUDPProxy) proxyToLocal(ctx context.Context) { defer func() { if err := p.close(); err != nil { log.Warnf("error in proxy to local loop: %s", err) From 021eef56c786c89313c98aa9d1a956269a730edc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 9 Oct 2024 19:19:51 +0200 Subject: [PATCH 07/11] Fix tests --- client/iface/iface_moc.go | 7 +++++++ client/iface/wgproxy/proxy_test.go | 3 ++- client/internal/peer/conn_test.go | 25 ++++--------------------- 3 files changed, 13 insertions(+), 22 deletions(-) diff --git a/client/iface/iface_moc.go b/client/iface/iface_moc.go index 703da9ce00..d91a7224ff 100644 --- a/client/iface/iface_moc.go +++ b/client/iface/iface_moc.go @@ -9,6 +9,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/device" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) type MockWGIface struct { @@ -30,6 +31,7 @@ type MockWGIface struct { GetDeviceFunc func() *device.FilteredDevice GetStatsFunc func(peerKey string) (configurer.WGStats, error) GetInterfaceGUIDStringFunc func() (string, error) + GetProxyFunc func() wgproxy.Proxy } func (m *MockWGIface) GetInterfaceGUIDString() (string, error) { @@ -103,3 +105,8 @@ func (m *MockWGIface) GetDevice() *device.FilteredDevice { func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) { return m.GetStatsFunc(peerKey) } + +func (m *MockWGIface) GetProxy() wgproxy.Proxy { + //TODO implement me + panic("implement me") +} diff --git a/client/iface/wgproxy/proxy_test.go b/client/iface/wgproxy/proxy_test.go index 5358d93339..64b6176211 100644 --- a/client/iface/wgproxy/proxy_test.go +++ b/client/iface/wgproxy/proxy_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/netbirdio/netbird/client/iface/wgproxy/ebpf" + udpProxy "github.com/netbirdio/netbird/client/iface/wgproxy/udp" "github.com/netbirdio/netbird/util" ) @@ -83,7 +84,7 @@ func TestProxyCloseByRemoteConn(t *testing.T) { }{ { name: "userspace proxy", - proxy: udpproxy.NewWGUserSpaceProxy(51830), + proxy: udpProxy.NewWGUDPProxy(51830), }, } diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index b4926a9d2e..e68861c5f0 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -11,7 +11,6 @@ import ( "github.com/netbirdio/netbird/client/iface" "github.com/netbirdio/netbird/client/internal/stdnet" - "github.com/netbirdio/netbird/client/internal/wgproxy" "github.com/netbirdio/netbird/util" ) @@ -44,11 +43,7 @@ func TestNewConn_interfaceFilter(t *testing.T) { } func TestConn_GetKey(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) - defer func() { - _ = wgProxyFactory.Free() - }() - conn, err := NewConn(context.Background(), connConf, nil, wgProxyFactory, nil, nil, nil) + conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil) if err != nil { return } @@ -59,11 +54,7 @@ func TestConn_GetKey(t *testing.T) { } func TestConn_OnRemoteOffer(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) - defer func() { - _ = wgProxyFactory.Free() - }() - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil) if err != nil { return } @@ -96,11 +87,7 @@ func TestConn_OnRemoteOffer(t *testing.T) { } func TestConn_OnRemoteAnswer(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) - defer func() { - _ = wgProxyFactory.Free() - }() - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil) if err != nil { return } @@ -132,11 +119,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) { wg.Wait() } func TestConn_Status(t *testing.T) { - wgProxyFactory := wgproxy.NewFactory(false, connConf.LocalWgPort) - defer func() { - _ = wgProxyFactory.Free() - }() - conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, nil) + conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil) if err != nil { return } From 553bd320ce959dbbe9e567ab8cdfabf2ccdfaed6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 10 Oct 2024 00:00:41 +0200 Subject: [PATCH 08/11] Fix Android build in factory --- client/iface/wgproxy/factory_nonlinux.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/iface/wgproxy/factory_nonlinux.go b/client/iface/wgproxy/factory_nonlinux.go index 1742f7ae02..f60ac8436e 100644 --- a/client/iface/wgproxy/factory_nonlinux.go +++ b/client/iface/wgproxy/factory_nonlinux.go @@ -4,6 +4,7 @@ package wgproxy import ( "github.com/netbirdio/netbird/client/iface/bind" + proxyBind "github.com/netbirdio/netbird/client/iface/wgproxy/bind" ) type Factory struct { From 4d788a815cf672f60b2d49b7181638f3d524dada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 10 Oct 2024 00:08:25 +0200 Subject: [PATCH 09/11] Fix darwin --- client/iface/iface_darwin.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/iface/iface_darwin.go b/client/iface/iface_darwin.go index 0f0a82178f..898311e11d 100644 --- a/client/iface/iface_darwin.go +++ b/client/iface/iface_darwin.go @@ -25,8 +25,8 @@ func NewWGIFace(iFaceName string, address string, wgPort int, wgPrivKey string, iceBind := bind.NewICEBind(transportNet, filterFn) wgIFace := &WGIface{ - userspaceBind: true, - wgProxyMgr: wgproxy.NewManager(wgPort, iceBind), + userspaceBind: true, + wgProxyFactory: wgproxy.NewFactory(wgPort, iceBind), } if netstack.IsEnabled() { From 20549442041a88c758776b7f5741923e06ef7e59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 10 Oct 2024 00:10:01 +0200 Subject: [PATCH 10/11] Fix windows build --- client/iface/iface_windows.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client/iface/iface_windows.go b/client/iface/iface_windows.go index 6845ef3ddd..08d5033be6 100644 --- a/client/iface/iface_windows.go +++ b/client/iface/iface_windows.go @@ -8,6 +8,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/device" "github.com/netbirdio/netbird/client/iface/netstack" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) // NewWGIFace Creates a new WireGuard interface instance @@ -17,12 +18,15 @@ func NewWGIFace(iFaceName string, address string, wgPort int, wgPrivKey string, return nil, err } + iceBind := bind.NewICEBind(transportNet, filterFn) + wgIFace := &WGIface{ - userspaceBind: true, + userspaceBind: true, + wgProxyFactory: wgproxy.NewFactory(wgPort, iceBind), } if netstack.IsEnabled() { - wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, transportNet, netstack.ListenAddr(), filterFn) + wgIFace.tun = device.NewNetstackDevice(iFaceName, wgAddress, wgPort, wgPrivKey, mtu, iceBind, netstack.ListenAddr()) return wgIFace, nil } From 25693e017be82ac8ece19334da69b5aa180ba724 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 10 Oct 2024 00:12:33 +0200 Subject: [PATCH 11/11] Lint fix --- client/iface/iwginterface_windows.go | 2 ++ client/internal/peer/conn.go | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/client/iface/iwginterface_windows.go b/client/iface/iwginterface_windows.go index 6baeb66ae0..96eec52a50 100644 --- a/client/iface/iwginterface_windows.go +++ b/client/iface/iwginterface_windows.go @@ -9,6 +9,7 @@ import ( "github.com/netbirdio/netbird/client/iface/bind" "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/device" + "github.com/netbirdio/netbird/client/iface/wgproxy" ) type IWGIface interface { @@ -20,6 +21,7 @@ type IWGIface interface { ToInterface() *net.Interface Up() (*bind.UniversalUDPMuxDefault, error) UpdateAddr(newAddr string) error + GetProxy() wgproxy.Proxy UpdatePeer(peerKey string, allowedIps string, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error RemovePeer(peerKey string) error AddAllowedIP(peerKey string, allowedIP string) error diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 700b3c3b6b..99acfde314 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -82,7 +82,6 @@ type Conn struct { config ConnConfig statusRecorder *Status signaler *Signaler - iFaceDiscover stdnet.ExternalIFaceDiscover relayManager *relayClient.Manager allowedIP net.IP allowedNet string