diff --git a/main.go b/main.go index d528640..6645714 100644 --- a/main.go +++ b/main.go @@ -48,10 +48,50 @@ type RunningService struct { } type ResourceManager struct { + serviceMutex *sync.Mutex resourcesInUse map[string]int runningServices map[string]RunningService } +func (rm ResourceManager) getRunningService(name string) RunningService { + rm.serviceMutex.Lock() + defer rm.serviceMutex.Unlock() + return rm.runningServices[name] +} + +func (rm ResourceManager) maybeGetRunningService(name string) (RunningService, bool) { + rm.serviceMutex.Lock() + defer rm.serviceMutex.Unlock() + rs, ok := rm.runningServices[name] + return rs, ok +} + +func (rm ResourceManager) storeRunningService(name string, rs RunningService) { + rm.serviceMutex.Lock() + defer rm.serviceMutex.Unlock() + rm.runningServices[name] = rs +} + +func (rm ResourceManager) incrementConnection(name string, count int) { + rm.serviceMutex.Lock() + defer rm.serviceMutex.Unlock() + + runningService := resourceManager.runningServices[name] + runningService.activeConnections += count + resourceManager.runningServices[name] = runningService +} + +func (rm ResourceManager) createRunningService(serviceConfig ServiceConfig) RunningService { + rs := RunningService{ + resourceRequirements: serviceConfig.ResourceRequirements, + activeConnections: 0, + lastUsed: time.Now(), + manageMutex: &sync.Mutex{}, + } + rm.storeRunningService(serviceConfig.Name, rs) + return rs +} + var ( config Config resourceManager ResourceManager @@ -73,6 +113,7 @@ func main() { resourceManager = ResourceManager{ resourcesInUse: make(map[string]int), runningServices: make(map[string]RunningService), + serviceMutex: &sync.Mutex{}, } for _, service := range config.Services { @@ -81,6 +122,8 @@ func main() { for { receivedSignal := <-exit log.Printf("Received %s signal, terminating all processes", signalToString(receivedSignal)) + // no need to unlock as os.Exit will be called + resourceManager.serviceMutex.Lock() for name := range resourceManager.runningServices { stopService(name) } @@ -172,7 +215,7 @@ func handleConnection(clientConnection net.Conn, serviceConfig ServiceConfig) { func startServiceIfNotAlreadyRunningAndConnect(serviceConfig ServiceConfig) net.Conn { var serviceConnection net.Conn - runningService, found := resourceManager.runningServices[serviceConfig.Name] + runningService, found := resourceManager.maybeGetRunningService(serviceConfig.Name) if !found { serviceConn, err := startService(serviceConfig) if err != nil { @@ -209,14 +252,11 @@ func getIdleTimeout(serviceConfig ServiceConfig) time.Duration { } func startService(serviceConfig ServiceConfig) (net.Conn, error) { - resourceManager.runningServices[serviceConfig.Name] = RunningService{ - resourceRequirements: serviceConfig.ResourceRequirements, - activeConnections: 0, - lastUsed: time.Now(), - manageMutex: &sync.Mutex{}, - } - resourceManager.runningServices[serviceConfig.Name].manageMutex.Lock() - defer resourceManager.runningServices[serviceConfig.Name].manageMutex.Unlock() + runningService := resourceManager.createRunningService(serviceConfig) + + runningService.manageMutex.Lock() + defer runningService.manageMutex.Unlock() + if !reserveResources(serviceConfig.ResourceRequirements, serviceConfig.Name) { delete(resourceManager.runningServices, serviceConfig.Name) return nil, fmt.Errorf("insufficient resources %s", serviceConfig.Name) @@ -231,20 +271,23 @@ func startService(serviceConfig ServiceConfig) (net.Conn, error) { var serviceConnection = connectWithWaiting(serviceConfig.ProxyTargetHost, serviceConfig.ProxyTargetPort, serviceConfig.Name, 120*time.Second) time.Sleep(2 * time.Second) //TODO: replace with a custom callback - runningService := resourceManager.runningServices[serviceConfig.Name] runningService.cmd = cmd idleTimeout := getIdleTimeout(serviceConfig) runningService.idleTimer = time.AfterFunc(idleTimeout, func() { + resourceManager.serviceMutex.Lock() + defer resourceManager.serviceMutex.Unlock() + if !canBeStopped(serviceConfig.Name) { log.Printf("[%s] Idle timeout %s reached, but service is busy, resetting idle time", serviceConfig.Name, idleTimeout) runningService.idleTimer.Reset(getIdleTimeout(serviceConfig)) return } + log.Printf("[%s] Idle timeout %s reached, stopping service", serviceConfig.Name, idleTimeout) stopService(serviceConfig.Name) }) - resourceManager.runningServices[serviceConfig.Name] = runningService + resourceManager.storeRunningService(serviceConfig.Name, runningService) return serviceConnection, nil } @@ -255,7 +298,7 @@ func connectToService(serviceConfig ServiceConfig) net.Conn { log.Printf("[%s] Error: failed to connect to %s:%s: %v", serviceConfig.Name, serviceConfig.ProxyTargetHost, serviceConfig.ProxyTargetPort, err) if serviceConfig.RestartOnConnectionFailure { log.Printf("[%s] Restarting service due to connection error", serviceConfig.Name) - _, isRunning := resourceManager.runningServices[serviceConfig.Name] + _, isRunning := resourceManager.maybeGetRunningService(serviceConfig.Name) if isRunning { stopService(serviceConfig.Name) } @@ -327,6 +370,10 @@ func reserveResources(resourceRequirements map[string]int, requestingService str func findEarliestLastUsedServiceUsingResource(requestingService string, missingResource string) string { earliestTime := time.Now() var earliestLastUsedService string + + resourceManager.serviceMutex.Lock() + defer resourceManager.serviceMutex.Unlock() + for serviceName, service := range resourceManager.runningServices { if serviceName == requestingService { continue @@ -367,20 +414,21 @@ func findFirstMissingResource(resourceRequirements map[string]int, requestingSer } func trackServiceLastUsed(serviceConfig ServiceConfig) { - runningService := resourceManager.runningServices[serviceConfig.Name] + runningService := resourceManager.getRunningService(serviceConfig.Name) runningService.lastUsed = time.Now() if runningService.idleTimer != nil { runningService.idleTimer.Reset(getIdleTimeout(serviceConfig)) } - resourceManager.runningServices[serviceConfig.Name] = runningService + resourceManager.storeRunningService(serviceConfig.Name, runningService) } + func canBeStopped(serviceName string) bool { runningService := resourceManager.runningServices[serviceName] if !runningService.manageMutex.TryLock() { return false } runningService.manageMutex.Unlock() - return resourceManager.runningServices[serviceName].activeConnections == 0 + return runningService.activeConnections == 0 } func releaseResources(used map[string]int) { @@ -427,14 +475,9 @@ func runServiceCommand(serviceConfig ServiceConfig) *exec.Cmd { } func forwardConnection(clientConnection net.Conn, serviceConnection net.Conn, serviceName string) { - defer func() { - runningService := resourceManager.runningServices[serviceName] - runningService.activeConnections-- - resourceManager.runningServices[serviceName] = runningService - }() - runningService := resourceManager.runningServices[serviceName] - runningService.activeConnections++ - resourceManager.runningServices[serviceName] = runningService + defer resourceManager.incrementConnection(serviceName, -1) + resourceManager.incrementConnection(serviceName, 1) + go copyAndHandleErrors( serviceConnection, clientConnection,