Skip to content

Commit

Permalink
pkg/rpctype: prepare for not using for target communication
Browse files Browse the repository at this point in the history
Remove things that are only needed for target VM communication:
conditional compression, timeout scaling, traffic stats.
To minimize diffs when we switch target VM communication to flatrpc.
  • Loading branch information
dvyukov committed May 3, 2024
1 parent 3a81775 commit 610f2a5
Show file tree
Hide file tree
Showing 26 changed files with 62 additions and 129 deletions.
8 changes: 3 additions & 5 deletions pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,9 @@ func (inst *inst) testRepro() ([]byte, error) {
}

type OptionalFuzzerArgs struct {
Slowdown int
SandboxArg int
PprofPort int
NetCompression bool
Slowdown int
SandboxArg int
PprofPort int
}

type FuzzerCmdArgs struct {
Expand Down Expand Up @@ -496,7 +495,6 @@ func FuzzerCmd(args *FuzzerCmdArgs) string {
{Name: "slowdown", Value: fmt.Sprint(args.Optional.Slowdown)},
{Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)},
{Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)},
{Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)},
}
optionalArg = " " + tool.OptionalFlags(flags)
}
Expand Down
94 changes: 20 additions & 74 deletions pkg/rpctype/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,14 @@ import (
"time"

"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/stats"
)

type RPCServer struct {
ln net.Listener
s *rpc.Server
useCompression bool
statSent *stats.Val
statRecv *stats.Val
ln net.Listener
s *rpc.Server
}

func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) {
func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %v: %w", addr, err)
Expand All @@ -34,13 +30,8 @@ func NewRPCServer(addr, name string, receiver interface{}, useCompression bool)
return nil, err
}
serv := &RPCServer{
ln: ln,
s: s,
useCompression: useCompression,
statSent: stats.Create("go rpc sent", "Uncompressed outbound RPC traffic",
stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
statRecv: stats.Create("go rpc recv", "Uncompressed inbound RPC traffic",
stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
ln: ln,
s: s,
}
return serv, nil
}
Expand All @@ -53,7 +44,7 @@ func (serv *RPCServer) Serve() {
continue
}
setupKeepAlive(conn, time.Minute)
go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression))
go serv.s.ServeConn(newFlateConn(conn))
}
}

Expand All @@ -62,51 +53,35 @@ func (serv *RPCServer) Addr() net.Addr {
}

type RPCClient struct {
conn net.Conn
c *rpc.Client
timeScale time.Duration
useTimeouts bool
useCompression bool
conn net.Conn
c *rpc.Client
}

func Dial(addr string, timeScale time.Duration) (net.Conn, error) {
if timeScale <= 0 {
return nil, fmt.Errorf("bad rpc time scale %v", timeScale)
}
func NewRPCClient(addr string) (*RPCClient, error) {
var conn net.Conn
var err error
if addr == "stdin" {
// This is used by vm/gvisor which passes us a unix socket connection in stdin.
return net.FileConn(os.Stdin)
}
if conn, err = net.DialTimeout("tcp", addr, time.Minute*timeScale); err != nil {
return nil, err
// TODO: remove this once we switch to flatrpc for target communication.
conn, err = net.FileConn(os.Stdin)
} else {
conn, err = net.DialTimeout("tcp", addr, 3*time.Minute)
}
setupKeepAlive(conn, time.Minute*timeScale)
return conn, nil
}

func NewRPCClient(addr string, timeScale time.Duration, useTimeouts, useCompression bool) (*RPCClient, error) {
conn, err := Dial(addr, timeScale)
if err != nil {
return nil, err
}
setupKeepAlive(conn, time.Minute)
cli := &RPCClient{
conn: conn,
c: rpc.NewClient(maybeFlateConn(conn, useCompression)),
timeScale: timeScale,
useTimeouts: useTimeouts,
useCompression: useCompression,
conn: conn,
c: rpc.NewClient(newFlateConn(conn)),
}
return cli, nil
}

func (cli *RPCClient) Call(method string, args, reply interface{}) error {
if cli.useTimeouts {
// Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
cli.conn.SetDeadline(time.Now().Add(3 * time.Minute * cli.timeScale))
defer cli.conn.SetDeadline(time.Time{})
}
// Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
cli.conn.SetDeadline(time.Now().Add(10 * time.Minute))
defer cli.conn.SetDeadline(time.Time{})
return cli.c.Call(method, args, reply)
}

Expand All @@ -130,10 +105,7 @@ type flateConn struct {
c io.Closer
}

func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser {
if !useCompression {
return conn
}
func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser {
w, err := flate.NewWriter(conn, 9)
if err != nil {
panic(err)
Expand Down Expand Up @@ -173,29 +145,3 @@ func (fc *flateConn) Close() error {
}
return err0
}

// countedConn wraps net.Conn to record the transferred bytes.
type countedConn struct {
io.ReadWriteCloser
server *RPCServer
}

func newCountedConn(server *RPCServer,
conn io.ReadWriteCloser) io.ReadWriteCloser {
return &countedConn{
ReadWriteCloser: conn,
server: server,
}
}

func (cc countedConn) Read(p []byte) (n int, err error) {
n, err = cc.ReadWriteCloser.Read(p)
cc.server.statRecv.Add(n)
return
}

func (cc countedConn) Write(b []byte) (n int, err error) {
n, err = cc.ReadWriteCloser.Write(b)
cc.server.statSent.Add(n)
return
}
17 changes: 8 additions & 9 deletions syz-fuzzer/fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ func main() {
debug.SetGCPercent(50)

var (
flagName = flag.String("name", "test", "unique name for manager")
flagOS = flag.String("os", runtime.GOOS, "target OS")
flagArch = flag.String("arch", runtime.GOARCH, "target arch")
flagManager = flag.String("manager", "", "manager rpc address")
flagProcs = flag.Int("procs", 1, "number of parallel test processes")
flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci
flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)")
flagNetCompression = flag.Bool("net_compression", false, "use network compression for RPC calls")
flagName = flag.String("name", "test", "unique name for manager")
flagOS = flag.String("os", runtime.GOOS, "target OS")
flagArch = flag.String("arch", runtime.GOARCH, "target arch")
flagManager = flag.String("manager", "", "manager rpc address")
flagProcs = flag.Int("procs", 1, "number of parallel test processes")
flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci
flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)")
)
defer tool.Init()()
log.Logf(0, "fuzzer started")
Expand Down Expand Up @@ -123,7 +122,7 @@ func main() {
}

log.Logf(0, "dialing manager at %v", *flagManager)
manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale, false, *flagNetCompression)
manager, err := rpctype.NewRPCClient(*flagManager)
if err != nil {
log.SyzFatalf("failed to create an RPC client: %v ", err)
}
Expand Down
4 changes: 3 additions & 1 deletion syz-fuzzer/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"fmt"
"io"
"net"
"strings"
"time"

Expand All @@ -30,7 +31,8 @@ type checkArgs struct {

func testImage(hostAddr string, args *checkArgs) {
log.Logf(0, "connecting to host at %v", hostAddr)
conn, err := rpctype.Dial(hostAddr, args.ipcConfig.Timeouts.Scale)
timeout := time.Minute * args.ipcConfig.Timeouts.Scale
conn, err := net.DialTimeout("tcp", hostAddr, timeout)
if err != nil {
log.SyzFatalf("failed to connect to host: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion syz-hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {

hub.initHTTP(cfg.HTTP)

s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub, true)
s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions syz-manager/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
if err != nil {
return nil, err
}
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
if err != nil {
return nil, err
}
hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
Expand Down
9 changes: 3 additions & 6 deletions syz-manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type Manager struct {
vmStop chan bool
checkFeatures *host.Features
fresh bool
netCompression bool
expertMode bool
nextInstanceID atomic.Uint64

Expand Down Expand Up @@ -177,7 +176,6 @@ func RunManager(cfg *mgrconfig.Config) {
memoryLeakFrames: make(map[string]bool),
dataRaceFrames: make(map[string]bool),
fresh: true,
netCompression: vm.UseNetCompression(cfg.Type),
vmStop: make(chan bool),
externalReproQueue: make(chan *Crash, 10),
needMoreRepros: make(chan chan bool),
Expand Down Expand Up @@ -811,10 +809,9 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <
Debug: *flagDebug,
Test: false,
Optional: &instance.OptionalFuzzerArgs{
Slowdown: mgr.cfg.Timeouts.Slowdown,
SandboxArg: mgr.cfg.SandboxArg,
PprofPort: inst.PprofPort(),
NetCompression: mgr.netCompression,
Slowdown: mgr.cfg.Timeouts.Slowdown,
SandboxArg: mgr.cfg.SandboxArg,
PprofPort: inst.PprofPort(),
},
}
cmd := instance.FuzzerCmd(args)
Expand Down
2 changes: 1 addition & 1 deletion syz-manager/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) {
"End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}),
statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph),
}
s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv, mgr.netCompression)
s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions syz-runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func main() {
log.Fatalf("failed to create default ipc config: %v", err)
}

timeouts := config.Timeouts
vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale, true, true)
vrf, err := rpctype.NewRPCClient(*flagAddr)
if err != nil {
log.Fatalf("failed to connect to verifier : %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion syz-verifier/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func startRPCServer(vrf *Verifier) (*RPCServer, error) {
notChecked: len(vrf.pools),
}

s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv, true)
s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion tools/syz-hubtool/hubtool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
return
}
log.Printf("connecting to hub at %v...", *flagHubAddress)
conn, err := rpctype.NewRPCClient(*flagHubAddress, 1, true, true)
conn, err := rpctype.NewRPCClient(*flagHubAddress)
if err != nil {
log.Fatalf("failed to connect to hub: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/syz-runtest/runtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
}
mgr.checkFiles, mgr.checkProgs = mgr.checker.StartCheck()
mgr.needCheckResults = len(mgr.checkProgs)
s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr, false)
s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion vm/adb/adb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func init() {
vmimpl.Register("adb", ctor, false, true)
vmimpl.Register("adb", ctor, false)
}

type Device struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/bhyve/bhyve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func init() {
vmimpl.Register("bhyve", ctor, true, false)
vmimpl.Register("bhyve", ctor, true)
}

type Config struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/cuttlefish/cuttlefish.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
)

func init() {
vmimpl.Register("cuttlefish", ctor, true, true)
vmimpl.Register("cuttlefish", ctor, true)
}

type Pool struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

func init() {
vmimpl.Register("gce", ctor, true, true)
vmimpl.Register("gce", ctor, true)
}

type Config struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/gvisor/gvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func init() {
vmimpl.Register("gvisor", ctor, true, false)
vmimpl.Register("gvisor", ctor, true)
}

type Config struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/isolated/isolated.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
const pstoreConsoleFile = "/sys/fs/pstore/console-ramoops-0"

func init() {
vmimpl.Register("isolated", ctor, false, true)
vmimpl.Register("isolated", ctor, false)
}

type Config struct {
Expand Down
2 changes: 1 addition & 1 deletion vm/proxyapp/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func init() {
func(env *vmimpl.Env) (vmimpl.Pool, error) {
return ctor(makeDefaultParams(), env)
},
false, true)
false)
}

// Package configuration VARs are mostly needed for tests.
Expand Down
2 changes: 1 addition & 1 deletion vm/qemu/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func init() {
var _ vmimpl.Infoer = (*instance)(nil)
vmimpl.Register("qemu", ctor, true, false)
vmimpl.Register("qemu", ctor, true)
}

type Config struct {
Expand Down
Loading

0 comments on commit 610f2a5

Please sign in to comment.