Skip to content

Commit

Permalink
feat: Add job creator CLI run trace (#418)
Browse files Browse the repository at this point in the history
* feat: Add job creator run trace

* chore: Remove commented code
  • Loading branch information
bgins authored Nov 1, 2024
1 parent c2c4a11 commit f9ce8a7
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions pkg/jobcreator/run.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package jobcreator

import (
"errors"
"fmt"
"time"

"github.com/lilypad-tech/lilypad/pkg/data"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -46,20 +49,42 @@ func RunJob(
// wait a short period because we've just started the job creator service
time.Sleep(100 * time.Millisecond)

// Start run job trace
c, span := jobCreatorService.controller.tracer.Start(ctx.Ctx, "run_job",
trace.WithAttributes(
attribute.String("job_offer.job_creator", offer.JobCreator),
attribute.String("job_offer.module.repo", offer.Module.Repo),
attribute.String("job_offer.module.hash", offer.Module.Hash),
attribute.String("job_offer.mode", string(offer.Mode)),
))
ctx.Ctx = c
defer span.End()

span.AddEvent("add_job_offer.start")
jobOfferContainer, err := jobCreatorService.AddJobOffer(offer)
if err != nil {
jobCreatorService.controller.log.Error("failed to add job offer", err)
span.SetStatus(codes.Error, "failed to add job offer")
span.RecordError(err)
return nil, err
}
jobCreatorService.controller.log.Debug("job offer ID", jobOfferContainer.ID)
span.AddEvent("add_job_offer.done",
trace.WithAttributes(
attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID),
attribute.String("job_offer_container.state", data.GetAgreementStateString(jobOfferContainer.State)),
))
span.SetAttributes(attribute.String("job_offer.id", jobOfferContainer.JobOffer.ID),
attribute.String("deal.id", jobOfferContainer.DealID))

updateChan := make(chan data.JobOfferContainer)

jobCreatorService.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) {
// spew.Dump(evOffer)
if evOffer.JobOffer.ID != jobOfferContainer.ID {
return
}
span.AddEvent("job_offer_update",
trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State))))
updateChan <- evOffer
})

Expand All @@ -71,9 +96,14 @@ waitloop:
select {
// this means the job was cancelled
case err := <-jobCreatorErrors:
span.SetStatus(codes.Error, "job cancelled")
span.RecordError(err)
return nil, err
case <-ctx.Ctx.Done():
return nil, fmt.Errorf("job cancelled")
err = errors.New("job cancelled by closed context")
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return nil, err
case finalJobOffer = <-updateChan:
if data.IsTerminalAgreementState(finalJobOffer.State) {
break waitloop
Expand All @@ -83,13 +113,17 @@ waitloop:

// Check if our job was cancelled
if finalJobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") {
span.SetStatus(codes.Error, "job cancelled")
span.RecordError(err)
return nil, fmt.Errorf("job was cancelled")
}

span.AddEvent("get_result.start")
result, err := jobCreatorService.GetResult(finalJobOffer.DealID)
if err != nil {
return nil, err
}
span.AddEvent("get_result.done", trace.WithAttributes(attribute.String("result.deal_id", result.DealID)))

return &RunJobResults{
JobOffer: finalJobOffer,
Expand Down

0 comments on commit f9ce8a7

Please sign in to comment.