Skip to content

Commit

Permalink
Add exit points when program was interrupted to functions locking the…
Browse files Browse the repository at this point in the history
… service mutex

Fix #14 and potentially #10
  • Loading branch information
perk11 committed Sep 23, 2024
1 parent f8b3c52 commit b67f4b0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
40 changes: 37 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func startProxy(serviceConfig ServiceConfig) {
}(listener)

for {
if interrupted {
return
}
clientConnection, err := listener.Accept()
if err != nil {
log.Printf("[%s] Error accepting connection: %v", serviceConfig.Name, err)
Expand All @@ -191,6 +194,10 @@ func humanReadableConnection(conn net.Conn) string {
return fmt.Sprintf("%s->%s", conn.LocalAddr().String(), conn.RemoteAddr().String())
}
func handleConnection(clientConnection net.Conn, serviceConfig ServiceConfig) {
if interrupted {
_ = clientConnection.Close()
return
}
defer func(clientConnection net.Conn, serviceConfig ServiceConfig) {
log.Printf("[%s] Closing client connection %s on port %s", serviceConfig.Name, humanReadableConnection(clientConnection), serviceConfig.ListenPort)
err := clientConnection.Close()
Expand Down Expand Up @@ -218,6 +225,9 @@ func handleConnection(clientConnection net.Conn, serviceConfig ServiceConfig) {
}

func startServiceIfNotAlreadyRunningAndConnect(serviceConfig ServiceConfig) net.Conn {
if interrupted {
return nil
}
var serviceConnection net.Conn
runningService, found := resourceManager.maybeGetRunningService(serviceConfig.Name)
if !found {
Expand All @@ -229,6 +239,9 @@ func startServiceIfNotAlreadyRunningAndConnect(serviceConfig ServiceConfig) net.
serviceConnection = serviceConn
} else {
if !runningService.manageMutex.TryLock() {
if interrupted {
return nil
}
//The service could be currently starting or stopping, so let's wait for that to finish and try again
runningService.manageMutex.Lock()
runningService.manageMutex.Unlock()
Expand Down Expand Up @@ -273,12 +286,20 @@ func startService(serviceConfig ServiceConfig) (net.Conn, error) {
return nil, fmt.Errorf("failed to run command \"%s %s\"", serviceConfig.Command, serviceConfig.Args)
}
performHealthCheck(serviceConfig)
if interrupted {
return nil, fmt.Errorf("interrupt signal was received")
}
var serviceConnection = connectWithWaiting(serviceConfig.ProxyTargetHost, serviceConfig.ProxyTargetPort, serviceConfig.Name, 120*time.Second)

if interrupted {
return nil, fmt.Errorf("interrupt signal was received")
}
runningService.cmd = cmd

idleTimeout := getIdleTimeout(serviceConfig)
runningService.idleTimer = time.AfterFunc(idleTimeout, func() {
if interrupted {
return
}
resourceManager.serviceMutex.Lock()
defer resourceManager.serviceMutex.Unlock()

Expand All @@ -291,6 +312,9 @@ func startService(serviceConfig ServiceConfig) (net.Conn, error) {
log.Printf("[%s] Idle timeout %s reached, stopping service", serviceConfig.Name, idleTimeout)
stopService(serviceConfig.Name)
})
if interrupted {
return nil, fmt.Errorf("interrupt signal was received")
}
resourceManager.storeRunningService(serviceConfig.Name, runningService)
return serviceConnection, nil
}
Expand Down Expand Up @@ -350,6 +374,9 @@ func connectToService(serviceConfig ServiceConfig) net.Conn {
func connectWithWaiting(serviceHost string, servicePort string, serviceName string, timeout time.Duration) net.Conn {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if interrupted {
return nil
}
conn, err := net.DialTimeout("tcp", net.JoinHostPort(serviceHost, servicePort), time.Second)
if err == nil {
return conn
Expand Down Expand Up @@ -524,7 +551,12 @@ func forwardConnection(clientConnection net.Conn, serviceConnection net.Conn, se
}

func stopService(serviceName string) {
resourceManager.runningServices[serviceName].manageMutex.Lock()
if interrupted {
//Shouldn't be necessary, but there might be some locks causing issues
resourceManager.runningServices[serviceName].manageMutex.TryLock()
} else {
resourceManager.runningServices[serviceName].manageMutex.Lock()
}
runningService := resourceManager.runningServices[serviceName]
if runningService.idleTimer != nil {
runningService.idleTimer.Stop()
Expand Down Expand Up @@ -555,7 +587,9 @@ func stopService(serviceName string) {
}

releaseResources(runningService.resourceRequirements)
resourceManager.runningServices[serviceName].manageMutex.Unlock()
if !interrupted {
resourceManager.runningServices[serviceName].manageMutex.Unlock()
}
delete(resourceManager.runningServices, serviceName)
}

Expand Down
9 changes: 9 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func connectOnly(test *testing.T, proxyAddress string) {
test.Error(err)
return
}
//give large-model-proxy time to start the service, so that it doesn't get killed before it started it
//which can lead to false positive passing tests
time.Sleep(1 * time.Second)
}

func minimal(test *testing.T, proxyAddress string) {
Expand Down Expand Up @@ -189,6 +192,12 @@ func TestAppScenarios(test *testing.T) {
"2005",
connectOnly,
},
{
"service-stuck-no-healthcheck",
"test-server/service-stuck-no-healthcheck.json",
"2006",
connectOnly,
},
}

for _, testCase := range tests {
Expand Down
2 changes: 1 addition & 1 deletion test-server/healthcheck-stuck.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"Services": [
{
"Name": "test-server_healthcheck",
"Name": "test-server_healthcheck-stuck",
"ListenPort": "2005",
"ProxyTargetHost": "localhost",
"ProxyTargetPort": "12005",
Expand Down
12 changes: 12 additions & 0 deletions test-server/service-stuck-no-healthcheck.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Services": [
{
"Name": "test-server_service-stuck-no-healthcheck",
"ListenPort": "2006",
"ProxyTargetHost": "localhost",
"ProxyTargetPort": "12006",
"Command": "test-server/test-server",
"Args": "-p 12006 -startup-duration 1d"
}
]
}

0 comments on commit b67f4b0

Please sign in to comment.