Skip to content

Commit

Permalink
[fix] ID Generation
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Dec 9, 2024
1 parent f84fee1 commit 69e75a2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
19 changes: 16 additions & 3 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package syncer
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -405,6 +407,17 @@ func (h *eventHandler) workflowRegisteredEvent(
return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

sum := sha256.New()
sum.Write(binary)
sumh := sum.Sum(nil)
h.lggr.Debugf("decoding URL: %s, id %x, sha %x, workflowOwner", payload.BinaryURL, payload.WorkflowID, sumh, payload.WorkflowOwner)

// Base64 decode
decodedBinary, err := base64.StdEncoding.DecodeString(string(binary))
if err != nil {
return fmt.Errorf("failed to decode binary: %w", err)
}

var config []byte
if payload.ConfigURL != "" {
c, err := h.fetcher(ctx, payload.ConfigURL)
Expand All @@ -424,7 +437,7 @@ func (h *eventHandler) workflowRegisteredEvent(
}

// Calculate the hash of the binary and config files
hash, err := pkgworkflows.GenerateWorkflowID(payload.WorkflowOwner, binary, config, payload.SecretsURL)
hash, err := pkgworkflows.GenerateWorkflowID(payload.WorkflowOwner, decodedBinary, config, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to generate workflow id: %w", err)
}
Expand All @@ -448,7 +461,7 @@ func (h *eventHandler) workflowRegisteredEvent(

wfID := hex.EncodeToString(payload.WorkflowID[:])
entry := &job.WorkflowSpec{
Workflow: hex.EncodeToString(binary),
Workflow: hex.EncodeToString(decodedBinary),
Config: string(config),
WorkflowID: wfID,
Status: status,
Expand All @@ -474,7 +487,7 @@ func (h *eventHandler) workflowRegisteredEvent(
string(payload.WorkflowOwner),
payload.WorkflowName,
config,
binary,
decodedBinary,
)
if err != nil {
return fmt.Errorf("failed to create workflow engine", err)
Expand Down
26 changes: 23 additions & 3 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,19 @@ type GetWorkflowMetadataListByDONParams struct {
Limit uint64
}

type GetWorkflowMetadata struct {
WorkflowID [32]byte
Owner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type GetWorkflowMetadataListByDONReturnVal struct {
WorkflowMetadataList []WorkflowRegistryWorkflowRegisteredV1
WorkflowMetadataList []GetWorkflowMetadata
}

// WorkflowRegistryEvent is an event emitted by the WorkflowRegistry. Each event is typed
Expand Down Expand Up @@ -512,7 +523,6 @@ func queryEvent(
responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr))
cursor = log.Cursor
}
lggr.Debug("composed response batch", "responseBatch", responseBatch)
batchCh <- responseBatch
}
}
Expand Down Expand Up @@ -676,8 +686,18 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don

l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))
for _, workflow := range workflows.WorkflowMetadataList {
toRegisteredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: workflow.WorkflowID,
WorkflowOwner: workflow.Owner,
DonID: workflow.DonID,
Status: workflow.Status,
WorkflowName: workflow.WorkflowName,
BinaryURL: workflow.BinaryURL,
ConfigURL: workflow.ConfigURL,
SecretsURL: workflow.SecretsURL,
}
if err = l.handler.Handle(ctx, workflowAsEvent{
Data: workflow,
Data: toRegisteredEvent,
EventType: WorkflowRegisteredEvent,
}); err != nil {
l.lggr.Errorf("failed to handle workflow registration: %s", err)
Expand Down

0 comments on commit 69e75a2

Please sign in to comment.