From 347667a2bd26cf2e4cd970bb44012ea5e5f00687 Mon Sep 17 00:00:00 2001 From: Toby Date: Mon, 8 Apr 2024 11:54:35 -0700 Subject: [PATCH] feat: TCP timeout flush --- cmd/root.go | 58 ++++++++++++++++++++++++++++++++------------- engine/engine.go | 1 + engine/interface.go | 3 +++ engine/worker.go | 25 +++++++++++++++++-- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 756513a..2ea21eb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -7,6 +7,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/analyzer/tcp" @@ -176,11 +177,12 @@ type cliConfigIO struct { } type cliConfigWorkers struct { - Count int `mapstructure:"count"` - QueueSize int `mapstructure:"queueSize"` - TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"` - TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"` - UDPMaxStreams int `mapstructure:"udpMaxStreams"` + Count int `mapstructure:"count"` + QueueSize int `mapstructure:"queueSize"` + TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"` + TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"` + TCPTimeout time.Duration `mapstructure:"tcpTimeout"` + UDPMaxStreams int `mapstructure:"udpMaxStreams"` } type cliConfigRuleset struct { @@ -213,6 +215,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error { config.WorkerQueueSize = c.Workers.QueueSize config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn + config.WorkerTCPTimeout = c.Workers.TCPTimeout config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams return nil } @@ -340,12 +343,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) } func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { - logger.Info("TCP stream action", - zap.Int64("id", info.ID), - zap.String("src", info.SrcString()), - zap.String("dst", info.DstString()), - zap.String("action", action.String()), - zap.Bool("noMatch", noMatch)) + if noMatch { + logger.Debug("TCP stream no match", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } else { + logger.Info("TCP stream action", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } +} + +func (l *engineLogger) TCPFlush(workerID, flushed, closed int) { + logger.Debug("TCP flush", + zap.Int("workerID", workerID), + zap.Int("flushed", flushed), + zap.Int("closed", closed)) } func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) { @@ -366,12 +383,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) } func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { - logger.Info("UDP stream action", - zap.Int64("id", info.ID), - zap.String("src", info.SrcString()), - zap.String("dst", info.DstString()), - zap.String("action", action.String()), - zap.Bool("noMatch", noMatch)) + if noMatch { + logger.Debug("UDP stream no match", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } else { + logger.Info("UDP stream action", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } } func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) { diff --git a/engine/engine.go b/engine/engine.go index 7c93e0a..c838e0d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) { Ruleset: config.Ruleset, TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal, TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn, + TCPTimeout: config.WorkerTCPTimeout, UDPMaxStreams: config.WorkerUDPMaxStreams, }) if err != nil { diff --git a/engine/interface.go b/engine/interface.go index fe25de5..af106f4 100644 --- a/engine/interface.go +++ b/engine/interface.go @@ -2,6 +2,7 @@ package engine import ( "context" + "time" "github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/ruleset" @@ -25,6 +26,7 @@ type Config struct { WorkerQueueSize int WorkerTCPMaxBufferedPagesTotal int WorkerTCPMaxBufferedPagesPerConn int + WorkerTCPTimeout time.Duration WorkerUDPMaxStreams int } @@ -36,6 +38,7 @@ type Logger interface { TCPStreamNew(workerID int, info ruleset.StreamInfo) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) + TCPFlush(workerID, flushed, closed int) UDPStreamNew(workerID int, info ruleset.StreamInfo) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) diff --git a/engine/worker.go b/engine/worker.go index 5177016..343bc57 100644 --- a/engine/worker.go +++ b/engine/worker.go @@ -2,6 +2,7 @@ package engine import ( "context" + "time" "github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/ruleset" @@ -14,9 +15,12 @@ import ( const ( defaultChanSize = 64 - defaultTCPMaxBufferedPagesTotal = 4096 - defaultTCPMaxBufferedPagesPerConnection = 64 + defaultTCPMaxBufferedPagesTotal = 65536 + defaultTCPMaxBufferedPagesPerConnection = 16 + defaultTCPTimeout = 10 * time.Minute defaultUDPMaxStreams = 4096 + + tcpFlushInterval = 1 * time.Minute ) type workerPacket struct { @@ -33,6 +37,7 @@ type worker struct { tcpStreamFactory *tcpStreamFactory tcpStreamPool *reassembly.StreamPool tcpAssembler *reassembly.Assembler + tcpTimeout time.Duration udpStreamFactory *udpStreamFactory udpStreamManager *udpStreamManager @@ -47,6 +52,7 @@ type workerConfig struct { Ruleset ruleset.Ruleset TCPMaxBufferedPagesTotal int TCPMaxBufferedPagesPerConn int + TCPTimeout time.Duration UDPMaxStreams int } @@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() { if c.TCPMaxBufferedPagesPerConn <= 0 { c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection } + if c.TCPTimeout <= 0 { + c.TCPTimeout = defaultTCPTimeout + } if c.UDPMaxStreams <= 0 { c.UDPMaxStreams = defaultUDPMaxStreams } @@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) { tcpStreamFactory: tcpSF, tcpStreamPool: tcpStreamPool, tcpAssembler: tcpAssembler, + tcpTimeout: config.TCPTimeout, udpStreamFactory: udpSF, udpStreamManager: udpSM, modSerializeBuffer: gopacket.NewSerializeBuffer(), @@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) { func (w *worker) Run(ctx context.Context) { w.logger.WorkerStart(w.id) defer w.logger.WorkerStop(w.id) + + tcpFlushTicker := time.NewTicker(tcpFlushInterval) + defer tcpFlushTicker.Stop() + for { select { case <-ctx.Done(): @@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) { } v, b := w.handle(wPkt.StreamID, wPkt.Packet) _ = wPkt.SetVerdict(v, b) + case <-tcpFlushTicker.C: + w.flushTCP(w.tcpTimeout) } } } @@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata, return io.Verdict(ctx.Verdict) } +func (w *worker) flushTCP(timeout time.Duration) { + flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout)) + w.logger.TCPFlush(w.id, flushed, closed) +} + func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) { ctx := &udpContext{ Verdict: udpVerdictAccept,