Skip to content

Commit

Permalink
Merge pull request #453 from battlecode/jerrym-saturn
Browse files Browse the repository at this point in the history
Saturn enhancements
  • Loading branch information
j-mao authored Jan 7, 2023
2 parents 024b465 + eb8da77 commit 05d4b3b
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 17 deletions.
5 changes: 3 additions & 2 deletions backend/siarnaq/api/compete/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def enqueue_all(self):
message_id=message_id,
)
invocation.status = SaturnStatus.QUEUED
invocation.logs = f"Enqueued with ID: {message_id}"
invocation.num_failures = 0
except Exception as err:
log.error(
Expand All @@ -70,7 +69,9 @@ def enqueue_all(self):
topic=topic,
ordering_key=self._publish_ordering_key,
)
invocation.logs = f"type: {type(err)} Exception message: {err}"
invocation.logs = (
invocation.logs + f"Exception {type(err)} on enqueue!\n{err}\n"
)
self.model.objects.bulk_update(invocations, ["status", "logs", "num_failures"])


Expand Down
1 change: 1 addition & 0 deletions backend/siarnaq/api/compete/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def for_saturn(self):
for p in self.participants.all()
}
return {
"maps": [m.name for m in self.maps.all()],
"replay": {
"bucket": settings.GCLOUD_BUCKET_SECURE,
"name": self.get_replay_path(),
Expand Down
12 changes: 12 additions & 0 deletions deploy/saturn/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ resource "google_artifact_registry_repository_iam_member" "this" {
member = "serviceAccount:${google_service_account.this.email}"
}

# Project-level IAM member binding: grants the role "roles/logging.logWriter"
# in var.gcp_project to this module's service account
# (google_service_account.this), referenced by its email address.
resource "google_project_iam_member" "log" {
project = var.gcp_project
role = "roles/logging.logWriter"
member = "serviceAccount:${google_service_account.this.email}"
}

# Project-level IAM member binding: grants the role
# "roles/monitoring.metricWriter" in var.gcp_project to this module's service
# account (google_service_account.this), referenced by its email address.
resource "google_project_iam_member" "monitoring" {
project = var.gcp_project
role = "roles/monitoring.metricWriter"
member = "serviceAccount:${google_service_account.this.email}"
}

resource "google_pubsub_subscription" "queue" {
name = var.name
topic = var.pubsub_topic_name
Expand Down
2 changes: 2 additions & 0 deletions saturn/pkg/run/java8.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"

"github.com/battlecode/galaxy/saturn/pkg/saturn"
"github.com/go-git/go-git/v5"
Expand Down Expand Up @@ -168,6 +169,7 @@ func (s *Java8Scaffold) RunMatch() *Step {
fmt.Sprintf("-PclassLocationB=%s", filepath.Join("data", "B")),
fmt.Sprintf("-PpackageNameA=%s", arg.Details.(ExecuteRequest).A.Package),
fmt.Sprintf("-PpackageNameB=%s", arg.Details.(ExecuteRequest).B.Package),
fmt.Sprintf("-Pmaps=%s", strings.Join(arg.Details.(ExecuteRequest).Maps, ",")),
fmt.Sprintf("-Preplay=%s", filepath.Join("data", "replay.bin")),
fmt.Sprintf("-PoutputVerbose=%t", false),
fmt.Sprintf("-PshowIndicators=%t", false),
Expand Down
1 change: 1 addition & 0 deletions saturn/pkg/run/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type CompileRequest struct {
// ExecuteRequest groups two submissions, a list of map names, and a replay
// file specification. Each field carries a mapstructure tag naming the key it
// is decoded from.
type ExecuteRequest struct {
// A is the submission stored under the "a" key.
A Submission `mapstructure:"a"`
// B is the submission stored under the "b" key.
B Submission `mapstructure:"b"`
// Maps is the list of strings stored under the "maps" key
// (presumably map names — confirm against the producer of this payload).
Maps []string `mapstructure:"maps"`
// Replay is the file specification stored under the "replay" key.
Replay FileSpecification `mapstructure:"replay"`
}

Expand Down
2 changes: 1 addition & 1 deletion saturn/pkg/run/recipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Step struct {

// Run executes the step: it emits a debug log line announcing the start of
// desc together with the step's Name, schedules a debug log line for when the
// function returns, and then returns whatever s.Callable(ctx, arg) returns.
//
// NOTE(review): the two consecutive defer statements below look like the
// before/after versions of the single line changed in this diff hunk (they
// differ only in the trailing "\n") — confirm against the repository; the
// committed file presumably contains only one of them.
func (s *Step) Run(ctx context.Context, desc string, arg *StepArguments) error {
log.Ctx(ctx).Debug().Msgf(">>> Starting %s: %s", desc, s.Name)
defer log.Ctx(ctx).Debug().Msgf(">>> Ending %s", desc)
defer log.Ctx(ctx).Debug().Msgf(">>> Ending %s\n", desc)
return s.Callable(ctx, arg)
}

Expand Down
4 changes: 2 additions & 2 deletions saturn/pkg/run/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func (s *Scaffold) Refresh(ctx context.Context) error {
log.Ctx(ctx).Debug().Msg("Pulling scaffold.")
switch err := wt.PullContext(ctx, &git.PullOptions{Auth: s.gitAuth}); true {
case err == nil:
log.Ctx(ctx).Debug().Msg("New scaffold version downloaded.")
log.Ctx(ctx).Debug().Msg("> New scaffold version downloaded.")
case errors.Is(err, git.NoErrAlreadyUpToDate):
log.Ctx(ctx).Debug().Msg("Already up to date.")
log.Ctx(ctx).Debug().Msg("> Already up to date.")
default:
return fmt.Errorf("wt.PullContext: %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion saturn/pkg/saturn/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ func (r *GCPTokenedReporter) Report(ctx context.Context, t *Task) error {
for k, v := range t.details {
payload[k] = v
}
logs, err := ioutil.ReadAll(&t.logs)
if err != nil {
return fmt.Errorf("ioutil.ReadAll: %v", err)
}
payload["invocation"] = map[string]interface{}{
"status": t.status.String(),
"logs": t.logs.String(),
"logs": string(logs),
"interrupted": t.status == TaskInterrupted,
}

Expand Down
19 changes: 8 additions & 11 deletions saturn/pkg/saturn/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Task struct {
}

func (t *Task) Run(ctx context.Context, r Reporter) (err error) {
defer func() {
defer func(ctx context.Context) {
switch r := recover(); r {
case taskFinished{}, nil:
log.Ctx(ctx).Info().Stringer("status", t.status).Msg("Task finished.")
Expand All @@ -100,23 +100,20 @@ func (t *Task) Run(ctx context.Context, r Reporter) (err error) {
if t.status.Retryable() {
err = fmt.Errorf("task not complete: %v", err)
}
}()
}(ctx)
defer t.Finish(TaskErrored, nil)

if err = r.Report(ctx, t); err != nil {
err = fmt.Errorf("r.Report: %v", err)
return
}

hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) {
if _, err := t.logs.WriteString(message + "\n"); err != nil {
panic(err)
}
})
ctx = log.Ctx(ctx).Hook(hook).WithContext(ctx)

log.Ctx(ctx).Debug().Msg("Initializing task.")
if err = r.Report(ctx, t); err != nil {
err = fmt.Errorf("r.Report: %v", err)
return
}

log.Ctx(ctx).Debug().Msg("Running task.")
if err = t.Runner(ctx, t, t.Payload); err != nil {
err = fmt.Errorf("t.Runner: %v", err)
return
Expand All @@ -142,7 +139,7 @@ func (t *Task) FinalizeReport(ctx context.Context, r Reporter) error {
}
if ctx.Err() != nil {
t.status = TaskInterrupted
log.Ctx(ctx).Debug().Msg("This task was interrupted and will be retried soon.")
log.Ctx(ctx).Debug().Msg("System: This task was interrupted and will be retried soon.\n")
}
if err := r.Report(ctx, t); err != nil {
return fmt.Errorf("r.Report: %v", err)
Expand Down

0 comments on commit 05d4b3b

Please sign in to comment.