Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TCP Check #194

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Memcached
* InfluxDB
* Nats
* TCP

## Usage

Expand Down
12 changes: 12 additions & 0 deletions _examples/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
healthHttp "github.com/hellofresh/health-go/v5/checks/http"
healthMySql "github.com/hellofresh/health-go/v5/checks/mysql"
healthPg "github.com/hellofresh/health-go/v5/checks/postgres"
healthTcp "github.com/hellofresh/health-go/v5/checks/tcp"
)

func main() {
Expand Down Expand Up @@ -38,6 +39,17 @@ func main() {
}),
})

// tcp health check example
h.Register(health.Config{
Name: "tcp-check",
Timeout: time.Second * 5,
SkipOnErr: true,
Check: healthTcp.New(healthTcp.Config{
Host: `example.com`,
Port: 1123,
}),
})

// postgres health check example
h.Register(health.Config{
Name: "postgres-check",
Expand Down
43 changes: 43 additions & 0 deletions checks/tcp/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tcp

import (
"context"
"fmt"
"net"
"strconv"
"time"
)

const defaultRequestTimeout = 5 * time.Second

// Config is the TCP checker configuration settings container.
type Config struct {
// Host is the remote service hostname health check TCP.
Host string
// Port is the remote service port health check TCP.
Port int
// RequestTimeout is the duration that health check will try to consume published test message.
// If not set - 5 seconds
RequestTimeout time.Duration
}

// New creates new HTTP service health check that verifies the following:
// - connection establishing
// - getting response status from defined URL
// - verifying that status code is less than 500
func New(config Config) func(ctx context.Context) error {
if config.RequestTimeout == 0 {
config.RequestTimeout = defaultRequestTimeout
}

return func(ctx context.Context) error {
conn, err := net.DialTimeout("tcp", config.Host+":"+strconv.Itoa(config.Port), config.RequestTimeout)
if err != nil {
return fmt.Errorf("making the request for the health check failed: %w", err.Error())
}

conn.Close()

return nil
}
}
53 changes: 53 additions & 0 deletions checks/tcp/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package tcp

import (
"github.com/stretchr/testify/require"
"log"
"net"
"strconv"
"testing"
)

var tcp Server

func init() {
// Start the new server
tcp, err := NewServer("tcp", ":1123")
if err != nil {
log.Println("error starting TCP server")
return
}

// Run the servers in goroutines to stop blocking
go func() {
tcp.Run()
}()
}

func TestNew(t *testing.T) {

t.Run("service is available", func(t *testing.T) {

check := Config{
Host: "127.0.0.1",
Port: 1123,
RequestTimeout: defaultRequestTimeout,
}
conn, err := net.DialTimeout("tcp", check.Host+":"+strconv.Itoa(check.Port), check.RequestTimeout)
require.NoError(t, err)
defer conn.Close()
})

t.Run("service is not available", func(t *testing.T) {
check := Config{
Host: "127.0.0.1",
Port: 1124,
RequestTimeout: defaultRequestTimeout,
}

conn, err := net.DialTimeout("tcp", check.Host+":"+strconv.Itoa(check.Port), check.RequestTimeout)
require.Error(t, err)
defer conn.Close()

})
}
152 changes: 152 additions & 0 deletions checks/tcp/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package tcp

import (
"bufio"
"errors"
"fmt"
"log"
"net"
"strings"
)

// Server defines the minimum contract our
// TCP and UDP server implementations must satisfy.
type Server interface {
Run() error
Close() error
}

// NewServer creates a new Server using given protocol
// and addr.
func NewServer(protocol, addr string) (Server, error) {
switch strings.ToLower(protocol) {
case "tcp":
return &TCPServer{
addr: addr,
}, nil
case "udp":
return &UDPServer{
addr: addr,
}, nil
}
return nil, errors.New("Invalid protocol given")
}

// TCPServer holds the structure of our TCP
// implementation.
type TCPServer struct {
addr string
server net.Listener
}

// Run starts the TCP Server.
func (t *TCPServer) Run() (err error) {
t.server, err = net.Listen("tcp", t.addr)
if err != nil {
return err
}
defer t.Close()

for {
conn, err := t.server.Accept()
if err != nil {
err = errors.New("could not accept connection")
break
}
if conn == nil {
err = errors.New("could not create connection")
break
}
return t.handleConnections()
}
return
}

// Close shuts down the TCP Server
func (t *TCPServer) Close() (err error) {
return t.server.Close()
}

// handleConnections is used to accept connections on
// the TCPServer and handle each of them in separate
// goroutines.
func (t *TCPServer) handleConnections() (err error) {
for {
conn, err := t.server.Accept()
if err != nil || conn == nil {
err = errors.New("could not accept connection")
break
}

go t.handleConnection(conn)
}
return
}

// handleConnections deals with the business logic of
// each connection and their requests.
func (t *TCPServer) handleConnection(conn net.Conn) {
defer conn.Close()

rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
for {
req, err := rw.ReadString('\n')
if err != nil {
rw.WriteString("failed to read input")
rw.Flush()
return
}

rw.WriteString(fmt.Sprintf("Request received: %s", req))
rw.Flush()
}
}

// UDPServer holds the necessary structure for our
// UDP server.
type UDPServer struct {
addr string
server *net.UDPConn
}

// Run starts the UDP server.
func (u *UDPServer) Run() (err error) {
laddr, err := net.ResolveUDPAddr("udp", u.addr)
if err != nil {
return errors.New("could not resolve UDP addr")
}

u.server, err = net.ListenUDP("udp", laddr)
if err != nil {
return errors.New("could not listen on UDP")
}

return u.handleConnections()
}

func (u *UDPServer) handleConnections() error {
var err error
for {
buf := make([]byte, 2048)
n, conn, err := u.server.ReadFromUDP(buf)
if err != nil {
log.Println(err)
break
}
if conn == nil {
continue
}

go u.handleConnection(conn, buf[:n])
}
return err
}

func (u *UDPServer) handleConnection(addr *net.UDPAddr, cmd []byte) {
u.server.WriteToUDP([]byte(fmt.Sprintf("Request recieved: %s", cmd)), addr)
}

// Close ensures that the UDPServer is shut down gracefully.
func (u *UDPServer) Close() error {
return u.server.Close()
}
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* gRPC
* Memcached
* Nats
* TCP

## Usage

Expand Down