diff --git a/README.md b/README.md index 95803c4..9467978 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ * Memcached * InfluxDB * Nats + * TCP ## Usage diff --git a/_examples/server.go b/_examples/server.go index 12cf85b..4636d54 100644 --- a/_examples/server.go +++ b/_examples/server.go @@ -10,6 +10,7 @@ import ( healthHttp "github.com/hellofresh/health-go/v5/checks/http" healthMySql "github.com/hellofresh/health-go/v5/checks/mysql" healthPg "github.com/hellofresh/health-go/v5/checks/postgres" + healthTcp "github.com/hellofresh/health-go/v5/checks/tcp" ) func main() { @@ -38,6 +39,17 @@ func main() { }), }) + // tcp health check example + h.Register(health.Config{ + Name: "tcp-check", + Timeout: time.Second * 5, + SkipOnErr: true, + Check: healthTcp.New(healthTcp.Config{ + Host: `example.com`, + Port: 1123, + }), + }) + // postgres health check example h.Register(health.Config{ Name: "postgres-check", diff --git a/checks/tcp/check.go b/checks/tcp/check.go new file mode 100644 index 0000000..8a83d2a --- /dev/null +++ b/checks/tcp/check.go @@ -0,0 +1,43 @@ +package tcp + +import ( + "context" + "fmt" + "net" + "strconv" + "time" +) + +const defaultRequestTimeout = 5 * time.Second + +// Config is the TCP checker configuration settings container. +type Config struct { + // Host is the remote service hostname health check TCP. + Host string + // Port is the remote service port health check TCP. + Port int + // RequestTimeout is the duration that health check will try to consume published test message. + // If not set - 5 seconds + RequestTimeout time.Duration +} + +// New creates new HTTP service health check that verifies the following: +// - connection establishing +// - getting response status from defined URL +// - verifying that status code is less than 500 +func New(config Config) func(ctx context.Context) error { + if config.RequestTimeout == 0 { + config.RequestTimeout = defaultRequestTimeout + } + + return func(ctx context.Context) error { + conn, err := net.DialTimeout("tcp", config.Host+":"+strconv.Itoa(config.Port), config.RequestTimeout) + if err != nil { + return fmt.Errorf("making the request for the health check failed: %w", err.Error()) + } + + conn.Close() + + return nil + } +} diff --git a/checks/tcp/check_test.go b/checks/tcp/check_test.go new file mode 100644 index 0000000..dd0554c --- /dev/null +++ b/checks/tcp/check_test.go @@ -0,0 +1,53 @@ +package tcp + +import ( + "github.com/stretchr/testify/require" + "log" + "net" + "strconv" + "testing" +) + +var tcp Server + +func init() { + // Start the new server + tcp, err := NewServer("tcp", ":1123") + if err != nil { + log.Println("error starting TCP server") + return + } + + // Run the servers in goroutines to stop blocking + go func() { + tcp.Run() + }() +} + +func TestNew(t *testing.T) { + + t.Run("service is available", func(t *testing.T) { + + check := Config{ + Host: "127.0.0.1", + Port: 1123, + RequestTimeout: defaultRequestTimeout, + } + conn, err := net.DialTimeout("tcp", check.Host+":"+strconv.Itoa(check.Port), check.RequestTimeout) + require.NoError(t, err) + defer conn.Close() + }) + + t.Run("service is not available", func(t *testing.T) { + check := Config{ + Host: "127.0.0.1", + Port: 1124, + RequestTimeout: defaultRequestTimeout, + } + + conn, err := net.DialTimeout("tcp", check.Host+":"+strconv.Itoa(check.Port), check.RequestTimeout) + require.Error(t, err) + defer conn.Close() + + }) +} diff --git a/checks/tcp/net.go b/checks/tcp/net.go new file mode 100644 index 0000000..bb95636 --- /dev/null +++ b/checks/tcp/net.go @@ -0,0 +1,152 @@ +package tcp + +import ( + "bufio" + "errors" + "fmt" + "log" + "net" + "strings" +) + +// Server defines the minimum contract our +// TCP and UDP server implementations must satisfy. +type Server interface { + Run() error + Close() error +} + +// NewServer creates a new Server using given protocol +// and addr. +func NewServer(protocol, addr string) (Server, error) { + switch strings.ToLower(protocol) { + case "tcp": + return &TCPServer{ + addr: addr, + }, nil + case "udp": + return &UDPServer{ + addr: addr, + }, nil + } + return nil, errors.New("Invalid protocol given") +} + +// TCPServer holds the structure of our TCP +// implementation. +type TCPServer struct { + addr string + server net.Listener +} + +// Run starts the TCP Server. +func (t *TCPServer) Run() (err error) { + t.server, err = net.Listen("tcp", t.addr) + if err != nil { + return err + } + defer t.Close() + + for { + conn, err := t.server.Accept() + if err != nil { + err = errors.New("could not accept connection") + break + } + if conn == nil { + err = errors.New("could not create connection") + break + } + return t.handleConnections() + } + return +} + +// Close shuts down the TCP Server +func (t *TCPServer) Close() (err error) { + return t.server.Close() +} + +// handleConnections is used to accept connections on +// the TCPServer and handle each of them in separate +// goroutines. +func (t *TCPServer) handleConnections() (err error) { + for { + conn, err := t.server.Accept() + if err != nil || conn == nil { + err = errors.New("could not accept connection") + break + } + + go t.handleConnection(conn) + } + return +} + +// handleConnections deals with the business logic of +// each connection and their requests. +func (t *TCPServer) handleConnection(conn net.Conn) { + defer conn.Close() + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + for { + req, err := rw.ReadString('\n') + if err != nil { + rw.WriteString("failed to read input") + rw.Flush() + return + } + + rw.WriteString(fmt.Sprintf("Request received: %s", req)) + rw.Flush() + } +} + +// UDPServer holds the necessary structure for our +// UDP server. +type UDPServer struct { + addr string + server *net.UDPConn +} + +// Run starts the UDP server. +func (u *UDPServer) Run() (err error) { + laddr, err := net.ResolveUDPAddr("udp", u.addr) + if err != nil { + return errors.New("could not resolve UDP addr") + } + + u.server, err = net.ListenUDP("udp", laddr) + if err != nil { + return errors.New("could not listen on UDP") + } + + return u.handleConnections() +} + +func (u *UDPServer) handleConnections() error { + var err error + for { + buf := make([]byte, 2048) + n, conn, err := u.server.ReadFromUDP(buf) + if err != nil { + log.Println(err) + break + } + if conn == nil { + continue + } + + go u.handleConnection(conn, buf[:n]) + } + return err +} + +func (u *UDPServer) handleConnection(addr *net.UDPAddr, cmd []byte) { + u.server.WriteToUDP([]byte(fmt.Sprintf("Request recieved: %s", cmd)), addr) +} + +// Close ensures that the UDPServer is shut down gracefully. +func (u *UDPServer) Close() error { + return u.server.Close() +} diff --git a/docs/index.md b/docs/index.md index 555e161..5f324b8 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,6 +13,7 @@ * gRPC * Memcached * Nats + * TCP ## Usage