Skip to content

Commit

Permalink
Merge pull request #124 from apernet/wip-tcp-flush
Browse files Browse the repository at this point in the history
feat: TCP timeout flush
  • Loading branch information
tobyxdd authored Apr 9, 2024
2 parents 393c29b + 347667a commit 5f447d4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 19 deletions.
58 changes: 41 additions & 17 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/apernet/OpenGFW/analyzer"
"github.com/apernet/OpenGFW/analyzer/tcp"
Expand Down Expand Up @@ -176,11 +177,12 @@ type cliConfigIO struct {
}

type cliConfigWorkers struct {
Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"`
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"`
TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"`
TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"`
TCPTimeout time.Duration `mapstructure:"tcpTimeout"`
UDPMaxStreams int `mapstructure:"udpMaxStreams"`
}

type cliConfigRuleset struct {
Expand Down Expand Up @@ -213,6 +215,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error {
config.WorkerQueueSize = c.Workers.QueueSize
config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal
config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn
config.WorkerTCPTimeout = c.Workers.TCPTimeout
config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams
return nil
}
Expand Down Expand Up @@ -340,12 +343,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
}

func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("TCP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()),
zap.Bool("noMatch", noMatch))
if noMatch {
logger.Debug("TCP stream no match",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
} else {
logger.Info("TCP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
}

func (l *engineLogger) TCPFlush(workerID, flushed, closed int) {
logger.Debug("TCP flush",
zap.Int("workerID", workerID),
zap.Int("flushed", flushed),
zap.Int("closed", closed))
}

func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {
Expand All @@ -366,12 +383,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
}

func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
logger.Info("UDP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()),
zap.Bool("noMatch", noMatch))
if noMatch {
logger.Debug("UDP stream no match",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
} else {
logger.Info("UDP stream action",
zap.Int64("id", info.ID),
zap.String("src", info.SrcString()),
zap.String("dst", info.DstString()),
zap.String("action", action.String()))
}
}

func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) {
Expand Down
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) {
Ruleset: config.Ruleset,
TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal,
TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn,
TCPTimeout: config.WorkerTCPTimeout,
UDPMaxStreams: config.WorkerUDPMaxStreams,
})
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset"
Expand All @@ -25,6 +26,7 @@ type Config struct {
WorkerQueueSize int
WorkerTCPMaxBufferedPagesTotal int
WorkerTCPMaxBufferedPagesPerConn int
WorkerTCPTimeout time.Duration
WorkerUDPMaxStreams int
}

Expand All @@ -36,6 +38,7 @@ type Logger interface {
TCPStreamNew(workerID int, info ruleset.StreamInfo)
TCPStreamPropUpdate(info ruleset.StreamInfo, close bool)
TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool)
TCPFlush(workerID, flushed, closed int)

UDPStreamNew(workerID int, info ruleset.StreamInfo)
UDPStreamPropUpdate(info ruleset.StreamInfo, close bool)
Expand Down
25 changes: 23 additions & 2 deletions engine/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/apernet/OpenGFW/io"
"github.com/apernet/OpenGFW/ruleset"
Expand All @@ -14,9 +15,12 @@ import (

const (
defaultChanSize = 64
defaultTCPMaxBufferedPagesTotal = 4096
defaultTCPMaxBufferedPagesPerConnection = 64
defaultTCPMaxBufferedPagesTotal = 65536
defaultTCPMaxBufferedPagesPerConnection = 16
defaultTCPTimeout = 10 * time.Minute
defaultUDPMaxStreams = 4096

tcpFlushInterval = 1 * time.Minute
)

type workerPacket struct {
Expand All @@ -33,6 +37,7 @@ type worker struct {
tcpStreamFactory *tcpStreamFactory
tcpStreamPool *reassembly.StreamPool
tcpAssembler *reassembly.Assembler
tcpTimeout time.Duration

udpStreamFactory *udpStreamFactory
udpStreamManager *udpStreamManager
Expand All @@ -47,6 +52,7 @@ type workerConfig struct {
Ruleset ruleset.Ruleset
TCPMaxBufferedPagesTotal int
TCPMaxBufferedPagesPerConn int
TCPTimeout time.Duration
UDPMaxStreams int
}

Expand All @@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() {
if c.TCPMaxBufferedPagesPerConn <= 0 {
c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection
}
if c.TCPTimeout <= 0 {
c.TCPTimeout = defaultTCPTimeout
}
if c.UDPMaxStreams <= 0 {
c.UDPMaxStreams = defaultUDPMaxStreams
}
Expand Down Expand Up @@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) {
tcpStreamFactory: tcpSF,
tcpStreamPool: tcpStreamPool,
tcpAssembler: tcpAssembler,
tcpTimeout: config.TCPTimeout,
udpStreamFactory: udpSF,
udpStreamManager: udpSM,
modSerializeBuffer: gopacket.NewSerializeBuffer(),
Expand All @@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) {
func (w *worker) Run(ctx context.Context) {
w.logger.WorkerStart(w.id)
defer w.logger.WorkerStop(w.id)

tcpFlushTicker := time.NewTicker(tcpFlushInterval)
defer tcpFlushTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) {
}
v, b := w.handle(wPkt.StreamID, wPkt.Packet)
_ = wPkt.SetVerdict(v, b)
case <-tcpFlushTicker.C:
w.flushTCP(w.tcpTimeout)
}
}
}
Expand Down Expand Up @@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata,
return io.Verdict(ctx.Verdict)
}

func (w *worker) flushTCP(timeout time.Duration) {
flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout))
w.logger.TCPFlush(w.id, flushed, closed)
}

func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) {
ctx := &udpContext{
Verdict: udpVerdictAccept,
Expand Down

0 comments on commit 5f447d4

Please sign in to comment.