Skip to content

Commit

Permalink
parameterize closeWait
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Aug 31, 2024
1 parent 142ac6b commit 3ec90cd
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 14 deletions.
1 change: 1 addition & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Config struct {
Pprof bool `json:"pprof"`
QPP bool `json:"qpp"`
QPPCount int `json:"qpp-count"`
CloseWait int `json:"closewait"`
}

func parseJSONConfig(config *Config, path string) error {
Expand Down
12 changes: 9 additions & 3 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func main() {
Value: 10, // nat keepalive interval in seconds
Usage: "seconds between heartbeats",
},
cli.IntFlag{
Name: "closewait",
Value: 0,
Usage: "the seconds to wait before tearing down a connection",
},
cli.StringFlag{
Name: "snmplog",
Value: "",
Expand Down Expand Up @@ -270,6 +275,7 @@ func main() {
config.Pprof = c.Bool("pprof")
config.QPP = c.Bool("QPP")
config.QPPCount = c.Int("QPPCount")
config.CloseWait = c.Int("closewait")

if c.String("c") != "" {
err := parseJSONConfig(&config, c.String("c"))
Expand Down Expand Up @@ -502,15 +508,15 @@ func main() {
}
}

go handleClient(_Q_, []byte(config.Key), muxes[idx].session, p1, config.Quiet)
go handleClient(_Q_, []byte(config.Key), muxes[idx].session, p1, config.Quiet, config.CloseWait)
rr++
}
}
myApp.Run(os.Args)
}

// handleClient aggregates connection p1 on mux
func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Session, p1 net.Conn, quiet bool) {
func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Session, p1 net.Conn, quiet bool, closeWait int) {
logln := func(v ...interface{}) {
if !quiet {
log.Println(v...)
Expand All @@ -537,7 +543,7 @@ func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Ses
}

// stream layer
err1, err2 := std.Pipe(s1, s2, false)
err1, err2 := std.Pipe(s1, s2, closeWait)

// handles transport layer errors
if err1 != nil && err1 != io.EOF {
Expand Down
1 change: 1 addition & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
TCP bool `json:"tcp"`
QPP bool `json:"qpp"`
QPPCount int `json:"qpp-count"`
CloseWait int `json:"closewait"`
}

func parseJSONConfig(config *Config, path string) error {
Expand Down
14 changes: 10 additions & 4 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func main() {
Value: 10, // nat keepalive interval in seconds
Usage: "seconds between heartbeats",
},
cli.IntFlag{
Name: "closewait",
Value: 30,
Usage: "the seconds to wait before tearing down a connection",
},
cli.StringFlag{
Name: "snmplog",
Value: "",
Expand Down Expand Up @@ -257,6 +262,7 @@ func main() {
config.TCP = c.Bool("tcp")
config.QPP = c.Bool("QPP")
config.QPPCount = c.Int("QPPCount")
config.CloseWait = c.Int("closewait")

if c.String("c") != "" {
//Now only support json config file
Expand Down Expand Up @@ -484,23 +490,23 @@ func handleMux(_Q_ *qpp.QuantumPermutationPad, conn net.Conn, config *Config) {
p1.Close()
return
}
handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet)
handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet, config.CloseWait)
case TGT_UNIX:
p2, err = net.Dial("unix", config.Target)
if err != nil {
log.Println(err)
p1.Close()
return
}
handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet)
handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet, config.CloseWait)
}

}(stream)
}
}

// handleClient pipes two streams
func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream, p2 net.Conn, quiet bool) {
func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream, p2 net.Conn, quiet bool, closeWait int) {
logln := func(v ...interface{}) {
if !quiet {
log.Println(v...)
Expand All @@ -521,7 +527,7 @@ func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream,
}

// stream layer
err1, err2 := std.Pipe(s1, s2, true)
err1, err2 := std.Pipe(s1, s2, closeWait)

// handles transport layer errors
if err1 != nil && err1 != io.EOF {
Expand Down
11 changes: 4 additions & 7 deletions std/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
)

const (
bufSize = 4096
closeWait = 30 // secs
bufSize = 4096
)

// Memory optimized io.Copy function specified for this library
Expand All @@ -51,7 +50,7 @@ func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
}

// Pipe create a general bidirectional pipe between two streams
func Pipe(alice, bob io.ReadWriteCloser, isPassive bool) (errA, errB error) {
func Pipe(alice, bob io.ReadWriteCloser, closeWait int) (errA, errB error) {
var closed sync.Once

var wg sync.WaitGroup
Expand All @@ -60,10 +59,8 @@ func Pipe(alice, bob io.ReadWriteCloser, isPassive bool) (errA, errB error) {
streamCopy := func(dst io.Writer, src io.ReadCloser, err *error) {
// write error directly to the *pointer
_, *err = Copy(dst, src)

// wait for a constant time before closing the streams
if isPassive {
<-time.After(closeWait * time.Second)
if closeWait > 0 {
<-time.After(time.Duration(closeWait) * time.Second)
}

// wg.Done() called
Expand Down

0 comments on commit 3ec90cd

Please sign in to comment.