Skip to content

Commit

Permalink
Wrap runningServices map in mutex (#6)
Browse files Browse the repository at this point in the history
Fixes #5
  • Loading branch information
lun-4 authored Aug 7, 2024
1 parent 98ec444 commit f063dc4
Showing 1 changed file with 66 additions and 23 deletions.
89 changes: 66 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,50 @@ type RunningService struct {
}

type ResourceManager struct {
serviceMutex *sync.Mutex
resourcesInUse map[string]int
runningServices map[string]RunningService
}

func (rm ResourceManager) getRunningService(name string) RunningService {
rm.serviceMutex.Lock()
defer rm.serviceMutex.Unlock()
return rm.runningServices[name]
}

func (rm ResourceManager) maybeGetRunningService(name string) (RunningService, bool) {
rm.serviceMutex.Lock()
defer rm.serviceMutex.Unlock()
rs, ok := rm.runningServices[name]
return rs, ok
}

func (rm ResourceManager) storeRunningService(name string, rs RunningService) {
rm.serviceMutex.Lock()
defer rm.serviceMutex.Unlock()
rm.runningServices[name] = rs
}

func (rm ResourceManager) incrementConnection(name string, count int) {
rm.serviceMutex.Lock()
defer rm.serviceMutex.Unlock()

runningService := resourceManager.runningServices[name]
runningService.activeConnections += count
resourceManager.runningServices[name] = runningService
}

func (rm ResourceManager) createRunningService(serviceConfig ServiceConfig) RunningService {
rs := RunningService{
resourceRequirements: serviceConfig.ResourceRequirements,
activeConnections: 0,
lastUsed: time.Now(),
manageMutex: &sync.Mutex{},
}
rm.storeRunningService(serviceConfig.Name, rs)
return rs
}

var (
config Config
resourceManager ResourceManager
Expand All @@ -73,6 +113,7 @@ func main() {
resourceManager = ResourceManager{
resourcesInUse: make(map[string]int),
runningServices: make(map[string]RunningService),
serviceMutex: &sync.Mutex{},
}

for _, service := range config.Services {
Expand All @@ -81,6 +122,8 @@ func main() {
for {
receivedSignal := <-exit
log.Printf("Received %s signal, terminating all processes", signalToString(receivedSignal))
// no need to unlock as os.Exit will be called
resourceManager.serviceMutex.Lock()
for name := range resourceManager.runningServices {
stopService(name)
}
Expand Down Expand Up @@ -172,7 +215,7 @@ func handleConnection(clientConnection net.Conn, serviceConfig ServiceConfig) {

func startServiceIfNotAlreadyRunningAndConnect(serviceConfig ServiceConfig) net.Conn {
var serviceConnection net.Conn
runningService, found := resourceManager.runningServices[serviceConfig.Name]
runningService, found := resourceManager.maybeGetRunningService(serviceConfig.Name)
if !found {
serviceConn, err := startService(serviceConfig)
if err != nil {
Expand Down Expand Up @@ -209,14 +252,11 @@ func getIdleTimeout(serviceConfig ServiceConfig) time.Duration {
}

func startService(serviceConfig ServiceConfig) (net.Conn, error) {
resourceManager.runningServices[serviceConfig.Name] = RunningService{
resourceRequirements: serviceConfig.ResourceRequirements,
activeConnections: 0,
lastUsed: time.Now(),
manageMutex: &sync.Mutex{},
}
resourceManager.runningServices[serviceConfig.Name].manageMutex.Lock()
defer resourceManager.runningServices[serviceConfig.Name].manageMutex.Unlock()
runningService := resourceManager.createRunningService(serviceConfig)

runningService.manageMutex.Lock()
defer runningService.manageMutex.Unlock()

if !reserveResources(serviceConfig.ResourceRequirements, serviceConfig.Name) {
delete(resourceManager.runningServices, serviceConfig.Name)
return nil, fmt.Errorf("insufficient resources %s", serviceConfig.Name)
Expand All @@ -231,20 +271,23 @@ func startService(serviceConfig ServiceConfig) (net.Conn, error) {
var serviceConnection = connectWithWaiting(serviceConfig.ProxyTargetHost, serviceConfig.ProxyTargetPort, serviceConfig.Name, 120*time.Second)
time.Sleep(2 * time.Second) //TODO: replace with a custom callback

runningService := resourceManager.runningServices[serviceConfig.Name]
runningService.cmd = cmd

idleTimeout := getIdleTimeout(serviceConfig)
runningService.idleTimer = time.AfterFunc(idleTimeout, func() {
resourceManager.serviceMutex.Lock()
defer resourceManager.serviceMutex.Unlock()

if !canBeStopped(serviceConfig.Name) {
log.Printf("[%s] Idle timeout %s reached, but service is busy, resetting idle time", serviceConfig.Name, idleTimeout)
runningService.idleTimer.Reset(getIdleTimeout(serviceConfig))
return
}

log.Printf("[%s] Idle timeout %s reached, stopping service", serviceConfig.Name, idleTimeout)
stopService(serviceConfig.Name)
})
resourceManager.runningServices[serviceConfig.Name] = runningService
resourceManager.storeRunningService(serviceConfig.Name, runningService)
return serviceConnection, nil
}

Expand All @@ -255,7 +298,7 @@ func connectToService(serviceConfig ServiceConfig) net.Conn {
log.Printf("[%s] Error: failed to connect to %s:%s: %v", serviceConfig.Name, serviceConfig.ProxyTargetHost, serviceConfig.ProxyTargetPort, err)
if serviceConfig.RestartOnConnectionFailure {
log.Printf("[%s] Restarting service due to connection error", serviceConfig.Name)
_, isRunning := resourceManager.runningServices[serviceConfig.Name]
_, isRunning := resourceManager.maybeGetRunningService(serviceConfig.Name)
if isRunning {
stopService(serviceConfig.Name)
}
Expand Down Expand Up @@ -327,6 +370,10 @@ func reserveResources(resourceRequirements map[string]int, requestingService str
func findEarliestLastUsedServiceUsingResource(requestingService string, missingResource string) string {
earliestTime := time.Now()
var earliestLastUsedService string

resourceManager.serviceMutex.Lock()
defer resourceManager.serviceMutex.Unlock()

for serviceName, service := range resourceManager.runningServices {
if serviceName == requestingService {
continue
Expand Down Expand Up @@ -367,20 +414,21 @@ func findFirstMissingResource(resourceRequirements map[string]int, requestingSer
}

func trackServiceLastUsed(serviceConfig ServiceConfig) {
runningService := resourceManager.runningServices[serviceConfig.Name]
runningService := resourceManager.getRunningService(serviceConfig.Name)
runningService.lastUsed = time.Now()
if runningService.idleTimer != nil {
runningService.idleTimer.Reset(getIdleTimeout(serviceConfig))
}
resourceManager.runningServices[serviceConfig.Name] = runningService
resourceManager.storeRunningService(serviceConfig.Name, runningService)
}

func canBeStopped(serviceName string) bool {
runningService := resourceManager.runningServices[serviceName]
if !runningService.manageMutex.TryLock() {
return false
}
runningService.manageMutex.Unlock()
return resourceManager.runningServices[serviceName].activeConnections == 0
return runningService.activeConnections == 0
}

func releaseResources(used map[string]int) {
Expand Down Expand Up @@ -427,14 +475,9 @@ func runServiceCommand(serviceConfig ServiceConfig) *exec.Cmd {
}

func forwardConnection(clientConnection net.Conn, serviceConnection net.Conn, serviceName string) {
defer func() {
runningService := resourceManager.runningServices[serviceName]
runningService.activeConnections--
resourceManager.runningServices[serviceName] = runningService
}()
runningService := resourceManager.runningServices[serviceName]
runningService.activeConnections++
resourceManager.runningServices[serviceName] = runningService
defer resourceManager.incrementConnection(serviceName, -1)
resourceManager.incrementConnection(serviceName, 1)

go copyAndHandleErrors(
serviceConnection,
clientConnection,
Expand Down

0 comments on commit f063dc4

Please sign in to comment.