Skip to content
This repository has been archived by the owner on Dec 8, 2017. It is now read-only.

Commit

Permalink
Merge pull request #1 from rogeruiz/pdb-fixes
Browse files Browse the repository at this point in the history
Add ability to close connections if the master IP address changes
  • Loading branch information
LinuxBozo authored Jul 27, 2017
2 parents d51e77e + 1bdc14a commit bbae3b0
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

var (
masterAddr *net.TCPAddr
raddr *net.TCPAddr
saddr *net.TCPAddr
masterAddr *net.TCPAddr
prevMasterAddr *net.TCPAddr
raddr *net.TCPAddr
saddr *net.TCPAddr

localAddr = flag.String("listen", ":9999", "local address")
sentinelAddr = flag.String("sentinel", ":26379", "remote address")
Expand All @@ -26,14 +27,15 @@ func main() {

laddr, err := net.ResolveTCPAddr("tcp", *localAddr)
if err != nil {
log.Fatal("Failed to resolve local address: %s", err)
log.Fatalf("Failed to resolve local address: %s", err)
}
saddr, err = net.ResolveTCPAddr("tcp", *sentinelAddr)
if err != nil {
log.Fatal("Failed to resolve sentinel address: %s", err)
log.Fatalf("Failed to resolve sentinel address: %s", err)
}

go master()
stopChan := make(chan struct{})
go master(&stopChan)

listener, err := net.ListenTCP("tcp", laddr)
if err != nil {
Expand All @@ -46,28 +48,37 @@ func main() {
log.Println(err)
continue
}

go proxy(conn, masterAddr)
go proxy(conn, masterAddr, stopChan)
}
}

func master() {
func master(stopChan *chan struct{}) {
var err error
for {
// has master changed from last time?
masterAddr, err = getMasterAddr(saddr, *masterName)
if err != nil {
log.Println(err)
}
if masterAddr.String() != prevMasterAddr.String() {
fmt.Printf("Master Address changed. Closing stopChan. %s v. %s \n", masterAddr.String(), prevMasterAddr.String())
close(*stopChan)
*stopChan = make(chan struct{})
}
prevMasterAddr = masterAddr
time.Sleep(1 * time.Second)
}
}

func pipe(r io.Reader, w io.WriteCloser) {
io.Copy(w, r)
w.Close()
fmt.Println("Closing pipe")
}

func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr) {
// pass a stopChan to the go routtine
func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr, stopChan chan struct{}) {
fmt.Printf("Opening a new connection on remoteAddr, %s\n", remoteAddr)
remote, err := net.DialTCP("tcp", nil, remoteAddr)
if err != nil {
log.Println(err)
Expand All @@ -76,6 +87,9 @@ func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr) {
}
go pipe(local, remote)
go pipe(remote, local)
<-stopChan // read from stopChan
fmt.Println("Closing Proxy")
local.Close()
}

func getMasterAddr(sentinelAddress *net.TCPAddr, masterName string) (*net.TCPAddr, error) {
Expand Down

0 comments on commit bbae3b0

Please sign in to comment.