Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reefd] HTTP server to handle launching instances with goroutine #249

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@

__pycache__
.hypothesis

*.db
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect
github.com/distribution/reference v0.6.0 // indirect
Expand All @@ -25,8 +26,10 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-sqlite3 v1.14.24 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
Expand Down Expand Up @@ -43,6 +45,9 @@ github.com/google/go-containerregistry v0.20.2 h1:B1wPJ1SN/S7pB+ZAimcciVD+r+yV/l
github.com/google/go-containerregistry v0.20.2/go.mod h1:z38EKdKh4h7IP2gSfUUqEvalZBqs6AoLeWfUy34nQC8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
Expand All @@ -51,6 +56,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
Expand Down Expand Up @@ -148,6 +155,7 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
228 changes: 228 additions & 0 deletions reefd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package main

import (
"database/sql"
"encoding/json"
"flag"
"log"
"net/http"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
_ "github.com/mattn/go-sqlite3"
)

type instanceInfo struct {
InstanceType string `json:"instance_type"`
AMI string `json:"ami"`
State string `json:"state"`
}

type launchRequest struct {
Id string `json:"id"`
DesiredState instanceInfo `json:"desired_state"`
CurrentState *instanceInfo `json:"current_state"`
InstanceId *string `json:"instance_id,omitempty"`
}

const (
region = "us-west-2"
)

func main() {
dbPath := flag.String("db", "", "Path to .db file")
flag.Parse()

if *dbPath == "" {
log.Fatal("Database path is required")
}

if _, err := os.Stat(*dbPath); err != nil {
if os.IsNotExist(err) {
log.Fatalf("File %s does not exist", *dbPath)
}
log.Fatalf("Error checking database file %s: %v", *dbPath, err)
}

db, err := sql.Open("sqlite3", *dbPath)
if err != nil {
log.Fatalf("Error connecting to database: %s", err)
}
defer db.Close()

http.HandleFunc("/launch", func(w http.ResponseWriter, r *http.Request) {
handleLaunchRequest(db, w, r)
})

log.Printf("Starting server on port 8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Server failed: %v", err)
}
}

// handleLaunchRequest retrieves the desired state from the request body and inserts it into the database
// then starts a goroutine to process the launch requests
func handleLaunchRequest(db *sql.DB, w http.ResponseWriter, r *http.Request) {
var desiredState instanceInfo
if err := json.NewDecoder(r.Body).Decode(&desiredState); err != nil {
http.Error(w, "Invalid JSON format: "+err.Error(), http.StatusBadRequest)
return
}
// marshal the desired state to a json string
desiredJSON, err := json.Marshal(desiredState)
if err != nil {
http.Error(w, "Error marshaling desired state: "+err.Error(), http.StatusInternalServerError)
return
}

// insert the desired state into the database
if _, err := db.Exec(`INSERT INTO launch_requests (desired_state) VALUES (?)`, string(desiredJSON)); err != nil {
http.Error(w, "Error inserting into database: "+err.Error(), http.StatusInternalServerError)
return
}

// start a goroutine to scan the database for launch requests with different desired and current states
go processLaunchRequests(db)
}

// launchInstance launches an instance with the given instance type and AMI
func launchInstance(instanceType, ami string) string {
sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
if err != nil {
log.Printf("Failed to create session: %v", err)
return ""
}

svc := ec2.New(sess)
log.Printf("Launching instance with type: %s, AMI: %s", instanceType, ami)

runResult, err := svc.RunInstances(&ec2.RunInstancesInput{
ImageId: aws.String(ami),
InstanceType: aws.String(instanceType),
MinCount: aws.Int64(1),
MaxCount: aws.Int64(1),
TagSpecifications: []*ec2.TagSpecification{{
ResourceType: aws.String("instance"),
Tags: []*ec2.Tag{{
Key: aws.String("Name"),
Value: aws.String("Kevin-launch"),
}},
}},
})

if err != nil {
log.Printf("Failed to launch instance: %v", err)
return ""
}
instanceID := *runResult.Instances[0].InstanceId
log.Printf("Created instance: %s", instanceID)
return instanceID
}

// getInstanceState retrieves the current state of the instance from AWS with the given instance ID
func getInstanceState(instanceID string) *instanceInfo {
sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
if err != nil {
log.Printf("Error creating session: %s", err)
return nil
}

svc := ec2.New(sess)
result, err := svc.DescribeInstances(&ec2.DescribeInstancesInput{
InstanceIds: []*string{aws.String(instanceID)},
})
if err != nil {
log.Printf("Error describing instance %s: %s", instanceID, err)
return nil
}

if len(result.Reservations) == 0 || len(result.Reservations[0].Instances) == 0 {
return nil
}

instance := result.Reservations[0].Instances[0]
return &instanceInfo{
InstanceType: *instance.InstanceType,
AMI: *instance.ImageId,
State: *instance.State.Name,
}
}

// updateCurrentState updates the current state of the existing instances in the database table
func updateCurrentState(db *sql.DB) {
rows, err := db.Query("SELECT id, instance_id FROM launch_requests WHERE instance_id IS NOT NULL")
if err != nil {
log.Printf("Error querying database: %s", err)
return
}
defer rows.Close()

currentStateMap := make(map[string]string)
for rows.Next() {
var id, instanceID string
if err := rows.Scan(&id, &instanceID); err != nil {
log.Printf("Error scanning row: %s", err)
continue
}

if state := getInstanceState(instanceID); state != nil {
if currentStateJSON, err := json.Marshal(state); err == nil {
currentStateMap[id] = string(currentStateJSON)
}
}
}

for id, currentStateJSON := range currentStateMap {
log.Printf("Updating current state for request %s: %s", id, currentStateJSON)
if _, err := db.Exec(`UPDATE launch_requests SET current_state = ? WHERE id = ?`, currentStateJSON, id); err != nil {
log.Printf("Error updating current state for request %s: %s", id, err)
}
}
}

// processLaunchRequests updates the current state of the existing instances and launches new instances for the launch requests that have not been launched yet
func processLaunchRequests(db *sql.DB) {
// update the current state of the existing instances
updateCurrentState(db)

// query all launch requests where the instance has not been launched yet
log.Printf("Scanning for launch requests with different desired and current states")
rows, err := db.Query("SELECT id, desired_state FROM launch_requests WHERE current_state IS NULL OR instance_id IS NULL")
if err != nil {
log.Printf("Error querying database: %s", err)
return
}
defer rows.Close()

instanceIDMap := make(map[string]string)

// iterate over all matching launch requests
for rows.Next() {
var id, desiredStateJSON string
if err := rows.Scan(&id, &desiredStateJSON); err != nil {
log.Printf("Error scanning row: %s", err)
continue
}

var desiredState instanceInfo
if err := json.Unmarshal([]byte(desiredStateJSON), &desiredState); err != nil {
log.Printf("Error unmarshalling desired state: %s", err)
continue
}

// launch instance
if instanceID := launchInstance(desiredState.InstanceType, desiredState.AMI); instanceID != "" {
instanceIDMap[id] = instanceID // map the id to the instance id to update on the db later
}
}

// update the launch requests with the instance id
for id, instanceID := range instanceIDMap {
log.Printf("Updating instance ID for request %s: %s", id, instanceID)
if _, err := db.Exec(`UPDATE launch_requests SET instance_id = ? WHERE id = ?`, instanceID, id); err != nil {
log.Printf("Error updating instance ID for request %s: %s", id, err)
}
}
}