Skip to content

Commit

Permalink
fetch factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed Jan 3, 2025
1 parent 03a9444 commit 6cdc1f4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 28 deletions.
65 changes: 41 additions & 24 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ var (

var _ capabilities.ActionCapability = (*Compute)(nil)

type fetcherFactory interface {
CreateFetcher(log logger.Logger, emitter custmsg.MessageEmitter, metrics *computeMetricsLabeler) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error)
}

type Compute struct {
stopCh services.StopChan
log logger.Logger
Expand All @@ -88,9 +92,9 @@ type Compute struct {

// transformer is used to transform a values.Map into a ParsedConfig struct on each execution
// of a request.
transformer *transformer
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string
transformer *transformer

fetcherFactory fetcherFactory

numWorkers int
queue chan request
Expand Down Expand Up @@ -185,7 +189,7 @@ func (c *Compute) execute(ctx context.Context, respCh chan response, req capabil
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher()
cfg.Fetch = c.fetcherFactory.CreateFetcher(c.log, c.emitter, c.metrics)
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand Down Expand Up @@ -289,7 +293,22 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
type outgoingConnectorfetcherFactory struct {
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string
}

func NewOutgoingConnectorFetcherFactory(
outgoingConnectorHandler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
) *outgoingConnectorfetcherFactory {
return &outgoingConnectorfetcherFactory{
outgoingConnectorHandler: outgoingConnectorHandler,
idGenerator: idGenerator,
}
}

func (f *outgoingConnectorfetcherFactory) CreateFetcher(log logger.Logger, emitter custmsg.MessageEmitter, metrics *computeMetricsLabeler) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
Expand All @@ -298,7 +317,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
cma := emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
Expand All @@ -309,7 +328,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
messageID := strings.Join([]string{
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
f.idGenerator(),
}, "/")

fields := req.Headers.GetFields()
Expand All @@ -318,7 +337,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
headersReq[k] = v.String()
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
resp, err := f.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Expand All @@ -329,14 +348,14 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, err
}

c.log.Debugw("received gateway response", "resp", resp)
log.Debugw("received gateway response", "resp", resp)
var response wasmpb.FetchResponse
err = json.Unmarshal(resp.Body.Payload, &response)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

c.metrics.with(
metrics.with(
"status", strconv.FormatUint(uint64(response.StatusCode), 10),
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
Expand All @@ -348,7 +367,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

Expand All @@ -369,8 +388,7 @@ func NewAction(
config Config,
log logger.Logger,
registry coretypes.CapabilitiesRegistry,
handler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
fetcherFactory fetcherFactory,
opts ...func(*Compute),
) (*Compute, error) {
if config.NumWorkers == 0 {
Expand All @@ -384,17 +402,16 @@ func NewAction(
lggr = logger.Named(log, "CustomCompute")
labeler = custmsg.NewLabeler()
compute = &Compute{
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
metrics: metricsLabeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
outgoingConnectorHandler: handler,
idGenerator: idGenerator,
queue: make(chan request),
numWorkers: defaultNumWorkers,
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
metrics: metricsLabeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
fetcherFactory: fetcherFactory,
queue: make(chan request),
numWorkers: defaultNumWorkers,
}
)

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func setup(t *testing.T, config Config) testHarness {
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute, err := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
compute, err := NewAction(config, log, registry, NewOutgoingConnectorFetcherFactory(connectorHandler, idGeneratorFn))
require.NoError(t, err)
compute.modules.clock = clockwork.NewFakeClock()

Expand Down
2 changes: 1 addition & 1 deletion core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return uuid.New().String()
}

computeSrvc, err := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn)
computeSrvc, err := compute.NewAction(cfg, log, d.registry, compute.NewOutgoingConnectorFetcherFactory(handler, idGeneratorFn))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,7 +1449,7 @@ func TestEngine_WithCustomComputeStep(t *testing.T) {
require.NoError(t, err)

idGeneratorFn := func() string { return "validRequestID" }
compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
compute, err := compute.NewAction(cfg, log, reg, compute.NewOutgoingConnectorFetcherFactory(handler, idGeneratorFn))
require.NoError(t, err)
require.NoError(t, compute.Start(ctx))
defer compute.Close()
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) {
require.NoError(t, err)

idGeneratorFn := func() string { return "validRequestID" }
compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
compute, err := compute.NewAction(cfg, log, reg, compute.NewOutgoingConnectorFetcherFactory(handler, idGeneratorFn))
require.NoError(t, err)
require.NoError(t, compute.Start(ctx))
defer compute.Close()
Expand Down

0 comments on commit 6cdc1f4

Please sign in to comment.