Skip to content
This repository has been archived by the owner on Dec 8, 2017. It is now read-only.

Commit

Permalink
Merge pull request #3 from 18F/timeout
Browse files Browse the repository at this point in the history
Various fixes found while accepting
  • Loading branch information
LinuxBozo authored Aug 22, 2017
2 parents bbae3b0 + d5aa926 commit 51452aa
Showing 1 changed file with 91 additions and 58 deletions.
149 changes: 91 additions & 58 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"errors"
"flag"
"fmt"
"io"
Expand All @@ -12,10 +11,7 @@ import (
)

var (
masterAddr *net.TCPAddr
prevMasterAddr *net.TCPAddr
raddr *net.TCPAddr
saddr *net.TCPAddr
masterAddr *net.TCPAddr

localAddr = flag.String("listen", ":9999", "local address")
sentinelAddr = flag.String("sentinel", ":26379", "remote address")
Expand All @@ -29,12 +25,8 @@ func main() {
if err != nil {
log.Fatalf("Failed to resolve local address: %s", err)
}
saddr, err = net.ResolveTCPAddr("tcp", *sentinelAddr)
if err != nil {
log.Fatalf("Failed to resolve sentinel address: %s", err)
}

stopChan := make(chan struct{})
stopChan := make(chan string)
go master(&stopChan)

listener, err := net.ListenTCP("tcp", laddr)
Expand All @@ -52,82 +44,123 @@ func main() {
}
}

func master(stopChan *chan struct{}) {
func master(stopChan *chan string) {
var err error
var possibleMaster *net.TCPAddr
for {
// has master changed from last time?
masterAddr, err = getMasterAddr(saddr, *masterName)
possibleMaster, err = getMasterAddr(*sentinelAddr, *masterName)
if err != nil {
log.Println(err)
log.Printf("[MASTER] Error polling for new master: %s\n", err)
} else {
if possibleMaster != nil && possibleMaster.String() != masterAddr.String() {
log.Printf("[MASTER] Master Address changed from %s to %s \n", masterAddr.String(), possibleMaster.String())
masterAddr = possibleMaster
close(*stopChan)
*stopChan = make(chan string)
}
}
if masterAddr.String() != prevMasterAddr.String() {
fmt.Printf("Master Address changed. Closing stopChan. %s v. %s \n", masterAddr.String(), prevMasterAddr.String())
close(*stopChan)
*stopChan = make(chan struct{})

if masterAddr == nil {
// if we haven't discovered a master at all, then slow our roll as the cluster is
// probably still coming up
time.Sleep(1 * time.Second)
} else {
// if we've seen a master before, then it's time for beast mode
time.Sleep(250 * time.Millisecond)
}
prevMasterAddr = masterAddr
time.Sleep(1 * time.Second)

}
}

func pipe(r io.Reader, w io.WriteCloser) {
io.Copy(w, r)
w.Close()
fmt.Println("Closing pipe")
func pipe(r net.Conn, w net.Conn, proxyChan chan<- string) {
bytes, err := io.Copy(w, r)
log.Printf("[PROXY %s => %s] Shutting down stream; transferred %v bytes: %v\n", w.RemoteAddr().String(), r.RemoteAddr().String(), bytes, err)
close(proxyChan)
}

// pass a stopChan to the go routtine
func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr, stopChan chan struct{}) {
fmt.Printf("Opening a new connection on remoteAddr, %s\n", remoteAddr)
remote, err := net.DialTCP("tcp", nil, remoteAddr)
func proxy(client *net.TCPConn, redisAddr *net.TCPAddr, stopChan <-chan string) {
redis, err := net.DialTimeout("tcp4", redisAddr.String(), 50*time.Millisecond)
if err != nil {
log.Println(err)
local.Close()
log.Printf("[PROXY %s => %s] Can't establish connection: %s\n", client.RemoteAddr().String(), redisAddr.String(), err)
client.Close()
return
}
go pipe(local, remote)
go pipe(remote, local)
<-stopChan // read from stopChan
fmt.Println("Closing Proxy")
local.Close()
}

func getMasterAddr(sentinelAddress *net.TCPAddr, masterName string) (*net.TCPAddr, error) {
conn, err := net.DialTCP("tcp", nil, sentinelAddress)
if err != nil {
return nil, err
}
log.Printf("[PROXY %s => %s] New connection\n", client.RemoteAddr().String(), redisAddr.String())
defer client.Close()
defer redis.Close()

defer conn.Close()
clientChan := make(chan string)
redisChan := make(chan string)

conn.Write([]byte(fmt.Sprintf("sentinel get-master-addr-by-name %s\n", masterName)))
go pipe(client, redis, redisChan)
go pipe(redis, client, clientChan)

b := make([]byte, 256)
_, err = conn.Read(b)
if err != nil {
log.Fatal(err)
select {
case <-stopChan:
case <-clientChan:
case <-redisChan:
}

parts := strings.Split(string(b), "\r\n")
log.Printf("[PROXY %s => %s] Closing connection\n", client.RemoteAddr().String(), redisAddr.String())
}

if len(parts) < 5 {
err = errors.New("Couldn't get master address from sentinel")
return nil, err
}
func getMasterAddr(sentinelAddress string, masterName string) (*net.TCPAddr, error) {

//getting the string address for the master node
stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4])
addr, err := net.ResolveTCPAddr("tcp", stringaddr)
sentinelHost, sentinelPort, err := net.SplitHostPort(sentinelAddress)
if err != nil {
return nil, fmt.Errorf("Can't find Sentinel: %s", err)
}

sentinels, err := net.LookupIP(sentinelHost)
if err != nil {
return nil, err
return nil, fmt.Errorf("Can't lookup Sentinel: %s", err)
}

//check that there's actually someone listening on that address
conn2, err := net.DialTCP("tcp", nil, addr)
if err == nil {
for _, sentinelIP := range sentinels {
conn, err := net.DialTimeout("tcp4", sentinelIP.String()+":"+sentinelPort, 100*time.Millisecond)
if err != nil {
log.Printf("[MASTER] Unable to connect to Sentinel at %v:%v: %v", sentinelIP, sentinelPort, err)
continue
}
defer conn.Close()

conn.Write([]byte(fmt.Sprintf("sentinel get-master-addr-by-name %s\n", masterName)))

b := make([]byte, 256)
_, err = conn.Read(b)
if err != nil {
log.Printf("[MASTER] Error reading from Sentinel %v:%v: %s", sentinelIP, sentinelPort, err)
}

parts := strings.Split(string(b), "\r\n")

if len(parts) < 5 {
log.Printf("[MASTER] Unexpected response from Sentinel %v:%v: %s", sentinelIP, sentinelPort, string(b))
continue
}

//getting the string address for the master node
stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4])
addr, err := net.ResolveTCPAddr("tcp", stringaddr)
if err != nil {
log.Printf("[MASTER] Unable to resolve new master (from %s:%s) %s: %s", sentinelIP, sentinelPort, stringaddr, err)
continue
}

//check that there's actually someone listening on that address
conn2, err := net.DialTimeout("tcp", addr.String(), 50*time.Millisecond)
if err != nil {
log.Printf("[MASTER] Error checking new master (from %s:%s) %s: %s", sentinelIP, sentinelPort, stringaddr, err)
continue
}
defer conn2.Close()

return addr, err
}

return addr, err
return nil, fmt.Errorf("No Sentinels returned a valid master: %v", sentinels)

}

0 comments on commit 51452aa

Please sign in to comment.