diff --git a/client/config.go b/client/config.go index fbafa30ea..cae5a7d55 100644 --- a/client/config.go +++ b/client/config.go @@ -62,6 +62,7 @@ type Config struct { Pprof bool `json:"pprof"` QPP bool `json:"qpp"` QPPCount int `json:"qpp-count"` + CloseWait int `json:"closewait"` } func parseJSONConfig(config *Config, path string) error { diff --git a/client/main.go b/client/main.go index afcd8ae83..c4027acd5 100644 --- a/client/main.go +++ b/client/main.go @@ -202,6 +202,11 @@ func main() { Value: 10, // nat keepalive interval in seconds Usage: "seconds between heartbeats", }, + cli.IntFlag{ + Name: "closewait", + Value: 0, + Usage: "the seconds to wait before tearing down a connection", + }, cli.StringFlag{ Name: "snmplog", Value: "", @@ -270,6 +275,7 @@ func main() { config.Pprof = c.Bool("pprof") config.QPP = c.Bool("QPP") config.QPPCount = c.Int("QPPCount") + config.CloseWait = c.Int("closewait") if c.String("c") != "" { err := parseJSONConfig(&config, c.String("c")) @@ -502,7 +508,7 @@ func main() { } } - go handleClient(_Q_, []byte(config.Key), muxes[idx].session, p1, config.Quiet) + go handleClient(_Q_, []byte(config.Key), muxes[idx].session, p1, config.Quiet, config.CloseWait) rr++ } } @@ -510,7 +516,7 @@ func main() { } // handleClient aggregates connection p1 on mux -func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Session, p1 net.Conn, quiet bool) { +func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Session, p1 net.Conn, quiet bool, closeWait int) { logln := func(v ...interface{}) { if !quiet { log.Println(v...) @@ -537,7 +543,7 @@ func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, session *smux.Ses } // stream layer - err1, err2 := std.Pipe(s1, s2, false) + err1, err2 := std.Pipe(s1, s2, closeWait) // handles transport layer errors if err1 != nil && err1 != io.EOF { diff --git a/server/config.go b/server/config.go index 9eca1de97..947a2488a 100644 --- a/server/config.go +++ b/server/config.go @@ -59,6 +59,7 @@ type Config struct { TCP bool `json:"tcp"` QPP bool `json:"qpp"` QPPCount int `json:"qpp-count"` + CloseWait int `json:"closewait"` } func parseJSONConfig(config *Config, path string) error { diff --git a/server/main.go b/server/main.go index b2adf25f6..8d12557f9 100644 --- a/server/main.go +++ b/server/main.go @@ -192,6 +192,11 @@ func main() { Value: 10, // nat keepalive interval in seconds Usage: "seconds between heartbeats", }, + cli.IntFlag{ + Name: "closewait", + Value: 30, + Usage: "the seconds to wait before tearing down a connection", + }, cli.StringFlag{ Name: "snmplog", Value: "", @@ -257,6 +262,7 @@ func main() { config.TCP = c.Bool("tcp") config.QPP = c.Bool("QPP") config.QPPCount = c.Int("QPPCount") + config.CloseWait = c.Int("closewait") if c.String("c") != "" { //Now only support json config file @@ -484,7 +490,7 @@ func handleMux(_Q_ *qpp.QuantumPermutationPad, conn net.Conn, config *Config) { p1.Close() return } - handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet) + handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet, config.CloseWait) case TGT_UNIX: p2, err = net.Dial("unix", config.Target) if err != nil { @@ -492,7 +498,7 @@ func handleMux(_Q_ *qpp.QuantumPermutationPad, conn net.Conn, config *Config) { p1.Close() return } - handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet) + handleClient(_Q_, []byte(config.Key), p1, p2, config.Quiet, config.CloseWait) } }(stream) @@ -500,7 +506,7 @@ func handleMux(_Q_ *qpp.QuantumPermutationPad, conn net.Conn, config *Config) { } // handleClient pipes two streams -func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream, p2 net.Conn, quiet bool) { +func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream, p2 net.Conn, quiet bool, closeWait int) { logln := func(v ...interface{}) { if !quiet { log.Println(v...) @@ -521,7 +527,7 @@ func handleClient(_Q_ *qpp.QuantumPermutationPad, seed []byte, p1 *smux.Stream, } // stream layer - err1, err2 := std.Pipe(s1, s2, true) + err1, err2 := std.Pipe(s1, s2, closeWait) // handles transport layer errors if err1 != nil && err1 != io.EOF { diff --git a/std/copy.go b/std/copy.go index f5ed66bc1..7420a5768 100644 --- a/std/copy.go +++ b/std/copy.go @@ -29,8 +29,7 @@ import ( ) const ( - bufSize = 4096 - closeWait = 30 // secs + bufSize = 4096 ) // Memory optimized io.Copy function specified for this library @@ -51,7 +50,7 @@ func Copy(dst io.Writer, src io.Reader) (written int64, err error) { } // Pipe create a general bidirectional pipe between two streams -func Pipe(alice, bob io.ReadWriteCloser, isPassive bool) (errA, errB error) { +func Pipe(alice, bob io.ReadWriteCloser, closeWait int) (errA, errB error) { var closed sync.Once var wg sync.WaitGroup @@ -60,10 +59,8 @@ func Pipe(alice, bob io.ReadWriteCloser, isPassive bool) (errA, errB error) { streamCopy := func(dst io.Writer, src io.ReadCloser, err *error) { // write error directly to the *pointer _, *err = Copy(dst, src) - - // wait for a constant time before closing the streams - if isPassive { - <-time.After(closeWait * time.Second) + if closeWait > 0 { + <-time.After(time.Duration(closeWait) * time.Second) } // wg.Done() called