From 8cab86b9245763b95e9d15be7ea627dd7e216dfe Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:13:49 +0800 Subject: [PATCH 1/5] Reapply "Merge pull request #132 from eddc005/feat-pcap" This reverts commit 2ac8783eb66328e580b5958c696e82c53cb254dd. --- .github/workflows/check.yaml | 6 ++ .github/workflows/release.yaml | 3 + cmd/root.go | 36 +++++++-- engine/engine.go | 11 ++- io/interface.go | 3 + io/nfqueue.go | 5 ++ io/pcap.go | 130 +++++++++++++++++++++++++++++++++ 7 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 io/pcap.go diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index ac9e66d..a24f198 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,6 +23,9 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go vet ./... - name: staticcheck @@ -44,4 +47,7 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go test ./... diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0da1054..b227e16 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,6 +24,9 @@ jobs: with: go-version: "1.22" + - name: Install pcap + run: sudo apt install -y libpcap-dev + - name: Build env: GOOS: ${{ matrix.goos }} diff --git a/cmd/root.go b/cmd/root.go index 288e3d7..1ccf025 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,6 +43,7 @@ var logger *zap.Logger // Flags var ( cfgFile string + pcapFile string logLevel string logFormat string ) @@ -118,6 +119,7 @@ func init() { func initFlags() { rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file") + rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format") } @@ -167,6 +169,7 @@ type cliConfig struct { IO cliConfigIO `mapstructure:"io"` Workers cliConfigWorkers `mapstructure:"workers"` Ruleset cliConfigRuleset `mapstructure:"ruleset"` + Replay cliConfigReplay `mapstructure:"replay"` } type cliConfigIO struct { @@ -177,6 +180,10 @@ type cliConfigIO struct { RST bool `mapstructure:"rst"` } +type cliConfigReplay struct { + Realtime bool `mapstructure:"realtime"` +} + type cliConfigWorkers struct { Count int `mapstructure:"count"` QueueSize int `mapstructure:"queueSize"` @@ -197,17 +204,30 @@ func (c *cliConfig) fillLogger(config *engine.Config) error { } func (c *cliConfig) fillIO(config *engine.Config) error { - nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, - ReadBuffer: c.IO.ReadBuffer, - WriteBuffer: c.IO.WriteBuffer, - Local: c.IO.Local, - RST: c.IO.RST, - }) + var ioImpl io.PacketIO + var err error + if pcapFile != "" { + // Setup IO for pcap file replay + logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) + ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, + }) + } else { + // Setup IO for nfqueue + ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ + QueueSize: c.IO.QueueSize, + ReadBuffer: c.IO.ReadBuffer, + WriteBuffer: c.IO.WriteBuffer, + Local: c.IO.Local, + RST: c.IO.RST, + }) + } + if err != nil { return configError{Field: "io", Err: err} } - config.IO = nfio + config.IO = ioImpl return nil } diff --git a/engine/engine.go b/engine/engine.go index 56f5ed3..1270efb 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error { } func (e *engine) Run(ctx context.Context) error { + workerCtx, workerCancel := context.WithCancel(ctx) + defer workerCancel() // Stop workers + + // Register IO shutdown ioCtx, ioCancel := context.WithCancel(ctx) - defer ioCancel() // Stop workers & IO + e.io.SetCancelFunc(ioCancel) + defer ioCancel() // Stop IO // Start workers for _, w := range e.workers { - go w.Run(ioCtx) + go w.Run(workerCtx) } // Register IO callback @@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error { return err case <-ctx.Done(): return nil + case <-ioCtx.Done(): + return nil } } diff --git a/io/interface.go b/io/interface.go index af7e1e7..f996789 100644 --- a/io/interface.go +++ b/io/interface.go @@ -48,6 +48,9 @@ type PacketIO interface { ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) // Close closes the packet IO. Close() error + // SetCancelFunc gives packet IO access to context cancel function, enabling it to + // trigger a shutdown + SetCancelFunc(cancelFunc context.CancelFunc) error } type ErrInvalidPacket struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index e84a0bb..f1a64df 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error { return n.n.Close() } +// nfqueue IO does not issue shutdown +func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + return nil +} + func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { rules, err := generateNftRules(local, rst) if err != nil { diff --git a/io/pcap.go b/io/pcap.go new file mode 100644 index 0000000..520da17 --- /dev/null +++ b/io/pcap.go @@ -0,0 +1,130 @@ +package io + +import ( + "context" + "hash/crc32" + "net" + "sort" + "strings" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +var _ PacketIO = (*pcapPacketIO)(nil) + +type pcapPacketIO struct { + pcap *pcap.Handle + lastTime *time.Time + ioCancel context.CancelFunc + config PcapPacketIOConfig + + dialer *net.Dialer +} + +type PcapPacketIOConfig struct { + PcapFile string + Realtime bool +} + +func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { + handle, err := pcap.OpenOffline(config.PcapFile) + if err != nil { + return nil, err + } + + return &pcapPacketIO{ + pcap: handle, + lastTime: nil, + ioCancel: nil, + config: config, + dialer: &net.Dialer{}, + }, nil +} + +func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { + go func() { + packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType()) + for packet := range packetSource.Packets() { + p.wait(packet) + + networkLayer := packet.NetworkLayer() + if networkLayer != nil { + src, dst := networkLayer.NetworkFlow().Endpoints() + endpoints := []string{src.String(), dst.String()} + sort.Strings(endpoints) + id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) + + cb(&pcapPacket{ + streamID: id, + timestamp: packet.Metadata().Timestamp, + data: packet.LinkLayer().LayerPayload(), + }, nil) + } + } + // Give the workers a chance to finish everything + time.Sleep(time.Second) + // Stop the engine when all packets are finished + p.ioCancel() + }() + + return nil +} + +// A normal dialer is sufficient as pcap IO does not mess up with the networking +func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { + return p.dialer.DialContext(ctx, network, address) +} + +func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { + return nil +} + +func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + p.ioCancel = cancelFunc + return nil +} + +func (p *pcapPacketIO) Close() error { + p.pcap.Close() + return nil +} + +// Intentionally slow down the replay +// In realtime mode, this is to match the timestamps in the capture +func (p *pcapPacketIO) wait(packet gopacket.Packet) error { + if !p.config.Realtime { + return nil + } + + if p.lastTime == nil { + p.lastTime = &packet.Metadata().Timestamp + } else { + t := packet.Metadata().Timestamp.Sub(*p.lastTime) + time.Sleep(t) + p.lastTime = &packet.Metadata().Timestamp + } + + return nil +} + +var _ Packet = (*pcapPacket)(nil) + +type pcapPacket struct { + streamID uint32 + timestamp time.Time + data []byte +} + +func (p *pcapPacket) StreamID() uint32 { + return p.streamID +} + +func (p *pcapPacket) Timestamp() time.Time { + return p.timestamp +} + +func (p *pcapPacket) Data() []byte { + return p.data +} From 7456e5907e21fc214bfbcf92733355789c242e6c Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:22:17 +0800 Subject: [PATCH 2/5] refactor(pcap): switch to pcapgo --- io/pcap.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/io/pcap.go b/io/pcap.go index 520da17..1da66a7 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -3,19 +3,22 @@ package io import ( "context" "hash/crc32" + "io" "net" + "os" "sort" "strings" "time" "github.com/google/gopacket" - "github.com/google/gopacket/pcap" + "github.com/google/gopacket/pcapgo" ) var _ PacketIO = (*pcapPacketIO)(nil) type pcapPacketIO struct { - pcap *pcap.Handle + pcapFile io.ReadCloser + pcap *pcapgo.Reader lastTime *time.Time ioCancel context.CancelFunc config PcapPacketIOConfig @@ -29,12 +32,18 @@ type PcapPacketIOConfig struct { } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { - handle, err := pcap.OpenOffline(config.PcapFile) + pcapFile, err := os.Open(config.PcapFile) + if err != nil { + return nil, err + } + + handle, err := pcapgo.NewReader(pcapFile) if err != nil { return nil, err } return &pcapPacketIO{ + pcapFile: pcapFile, pcap: handle, lastTime: nil, ioCancel: nil, @@ -87,8 +96,7 @@ func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { } func (p *pcapPacketIO) Close() error { - p.pcap.Close() - return nil + return p.pcapFile.Close() } // Intentionally slow down the replay From cb0427bfbbd61cee981d1371d58315e850b00323 Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:26:15 +0800 Subject: [PATCH 3/5] Revert "ci: install pcap for build" This reverts commit 5e15fd6dd937fa040254fe368ee2b6645d3f50ff. --- .github/workflows/check.yaml | 3 --- .github/workflows/release.yaml | 3 --- 2 files changed, 6 deletions(-) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index a24f198..a1f2c4a 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,9 +23,6 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go vet ./... - name: staticcheck diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b227e16..0da1054 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,9 +24,6 @@ jobs: with: go-version: "1.22" - - name: Install pcap - run: sudo apt install -y libpcap-dev - - name: Build env: GOOS: ${{ matrix.goos }} From 301f9af3d43603a88dedd222cb2c6552df2b485a Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:26:19 +0800 Subject: [PATCH 4/5] Revert "ci: install pcap for build 2" This reverts commit 0daaa32fc6c77fa4566266986d9a6f84722e6903. --- .github/workflows/check.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index a1f2c4a..ac9e66d 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -44,7 +44,4 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go test ./... From 1934c065ecbe52692aeefa1f7171480588154be7 Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:35:17 +0800 Subject: [PATCH 5/5] feat(pcap): impl realtime wait() with time offset --- io/pcap.go | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/io/pcap.go b/io/pcap.go index 1da66a7..9801f9c 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -17,11 +17,11 @@ import ( var _ PacketIO = (*pcapPacketIO)(nil) type pcapPacketIO struct { - pcapFile io.ReadCloser - pcap *pcapgo.Reader - lastTime *time.Time - ioCancel context.CancelFunc - config PcapPacketIOConfig + pcapFile io.ReadCloser + pcap *pcapgo.Reader + timeOffset *time.Duration + ioCancel context.CancelFunc + config PcapPacketIOConfig dialer *net.Dialer } @@ -43,12 +43,12 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { } return &pcapPacketIO{ - pcapFile: pcapFile, - pcap: handle, - lastTime: nil, - ioCancel: nil, - config: config, - dialer: &net.Dialer{}, + pcapFile: pcapFile, + pcap: handle, + timeOffset: nil, + ioCancel: nil, + config: config, + dialer: &net.Dialer{}, }, nil } @@ -101,20 +101,18 @@ func (p *pcapPacketIO) Close() error { // Intentionally slow down the replay // In realtime mode, this is to match the timestamps in the capture -func (p *pcapPacketIO) wait(packet gopacket.Packet) error { +func (p *pcapPacketIO) wait(packet gopacket.Packet) { if !p.config.Realtime { - return nil + return } - if p.lastTime == nil { - p.lastTime = &packet.Metadata().Timestamp + if p.timeOffset == nil { + offset := time.Since(packet.Metadata().Timestamp) + p.timeOffset = &offset } else { - t := packet.Metadata().Timestamp.Sub(*p.lastTime) + t := time.Until(packet.Metadata().Timestamp.Add(*p.timeOffset)) time.Sleep(t) - p.lastTime = &packet.Metadata().Timestamp } - - return nil } var _ Packet = (*pcapPacket)(nil)