Skip to content

Commit

Permalink
Change hashing scheme to match TS SDK v3 (#23)
Browse files Browse the repository at this point in the history
* Change hashing scheme to match TS SDK v3

This updates the hashing scheme for TS SDK v3 compatibility (v1 of
our internal API).

* Suffix on > 1 iteration

* Update hashing scheme and set IDs everywhere

* Add sleep opts
  • Loading branch information
tonyhb authored Nov 2, 2023
1 parent d187c87 commit 88c1f5b
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 59 deletions.
72 changes: 72 additions & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"fmt"
"net/http"
"time"

"github.com/inngest/inngestgo"
"github.com/inngest/inngestgo/step"
)

func main() {
h := inngestgo.NewHandler("core", inngestgo.HandlerOpts{})
f := inngestgo.CreateFunction(
inngestgo.FunctionOpts{
ID: "account-created",
},
// Run on every api/account.created event.
inngestgo.EventTrigger("api/account.created", nil),
UserCreated,
)
h.Register(f)
http.ListenAndServe(":8080", h)
}

func UserCreated(ctx context.Context, input inngestgo.Input[any]) (any, error) {
// Sleep for a second
step.Sleep(ctx, "initial-delay", time.Second)

// Run a step which emails the user.
step.Run(ctx, "on-user-created", func(ctx context.Context) (string, error) {
return "", nil
})

fn, err := step.WaitForEvent[FunctionCreatedEvent](ctx, "wait-for-activity", step.WaitForEventOpts{
Name: "Wait for a function to be created",
Event: "api/function.created",
If: inngestgo.StrPtr("async.data.user_id == event.data.user_id"),
Timeout: time.Hour * 72,
})
if err == step.ErrEventNotReceived {
// A function wasn't created within 3 days.
return nil, nil
}

// The function event is fully typed :)
fmt.Println(fn.Data.FunctionID)

return nil, nil
}

// AccountCreatedEvent represents the fully defined event received when an account is created.
//
// This is shorthand for defining a new Inngest-conforming struct:
//
// type AccountCreatedEvent struct {
// Name string `json:"name"`
// Data AccountCreatedEventData `json:"data"`
// User any `json:"user"`
// Timestamp int64 `json:"ts,omitempty"`
// Version string `json:"v,omitempty"`
// }
type AccountCreatedEvent inngestgo.GenericEvent[AccountCreatedEventData, any]
type AccountCreatedEventData struct {
AccountID string
}

type FunctionCreatedEvent inngestgo.GenericEvent[FunctionCreatedEventData, any]
type FunctionCreatedEventData struct {
FunctionID string
}
18 changes: 10 additions & 8 deletions funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
)

type FunctionOpts struct {
Name string
// ID is an optional function ID. If not specified, the ID
// will be auto-generated by lowercasing and slugging the name.
ID *string
ID string
// Name represents a human-readable function name.
Name string

Priority *inngest.Priority
Concurrency []inngest.Concurrency
Idempotency *string
Expand All @@ -22,8 +24,7 @@ type FunctionOpts struct {
Debounce *Debounce

// RateLimit allows the function to be rate limited.
RateLimit *RateLimit

RateLimit *RateLimit
BatchEvents *inngest.EventBatchConfig
}

Expand Down Expand Up @@ -116,10 +117,11 @@ func CreateFunction[T any](
return sf
}

func EventTrigger(name string) inngest.Trigger {
func EventTrigger(name string, expression *string) inngest.Trigger {
return inngest.Trigger{
EventTrigger: &inngest.EventTrigger{
Event: name,
Event: name,
Expression: expression,
},
}
}
Expand Down Expand Up @@ -197,10 +199,10 @@ func (s servableFunc) Config() FunctionOpts {
}

func (s servableFunc) Slug() string {
if s.fc.ID == nil {
if s.fc.ID == "" {
return slug.Make(s.fc.Name)
}
return *s.fc.ID
return s.fc.ID
}

func (s servableFunc) Name() string {
Expand Down
22 changes: 11 additions & 11 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ func TestRegister(t *testing.T) {
FunctionOpts{
Name: "my func name",
},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, input Input[EventA]) (any, error) {
return nil, nil
},
)
b := CreateFunction(
FunctionOpts{Name: "another func"},
EventTrigger("test/event.b"),
EventTrigger("test/event.b", nil),
func(ctx context.Context, input Input[EventB]) (any, error) {
return nil, nil
},
)
c := CreateFunction(
FunctionOpts{Name: "batch func", BatchEvents: &inngest.EventBatchConfig{MaxSize: 20, Timeout: "10s"}},
EventTrigger("test/batch.a"),
EventTrigger("test/batch.a", nil),
func(ctx context.Context, input Input[EventC]) (any, error) {
return nil, nil
},
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestInvoke(t *testing.T) {
}
a := CreateFunction(
FunctionOpts{Name: "my func name"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[EventA]) (any, error) {
require.EqualValues(t, event.Event, input)
return resp, nil
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestInvoke(t *testing.T) {
}
a := CreateFunction(
FunctionOpts{Name: "my func name", BatchEvents: &inngest.EventBatchConfig{MaxSize: 5, Timeout: "10s"}},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[EventA]) (any, error) {
require.EqualValues(t, event.Event, input)
require.EqualValues(t, len(event.Events), 5)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestInvoke(t *testing.T) {
}
a := CreateFunction(
FunctionOpts{Name: "my func name"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[*EventA]) (any, error) {
require.NotNil(t, event.Event)
require.EqualValues(t, *event.Event, input)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestInvoke(t *testing.T) {
}
a := CreateFunction(
FunctionOpts{Name: "my func name"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[any]) (any, error) {
require.NotNil(t, event.Event)
val, ok := event.Event.(map[string]any)
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestInvoke(t *testing.T) {
}
a := CreateFunction(
FunctionOpts{Name: "my func name"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[map[string]any]) (any, error) {
require.NotNil(t, event.Event)
val := event.Event
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestInvoke(t *testing.T) {
// before deploying to Inngest.
CreateFunction(
FunctionOpts{Name: "my func name"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, event Input[io.Reader]) (any, error) {
return nil, nil
},
Expand All @@ -282,7 +282,7 @@ func TestServe(t *testing.T) {
var called int32
a := CreateFunction(
FunctionOpts{Name: "My servable function!"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, input Input[EventA]) (any, error) {
atomic.AddInt32(&called, 1)
require.EqualValues(t, event, input.Event)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestSteps(t *testing.T) {

a := CreateFunction(
FunctionOpts{Name: "step function"},
EventTrigger("test/event.a"),
EventTrigger("test/event.a", nil),
func(ctx context.Context, input Input[EventA]) (any, error) {
atomic.AddInt32(&fnCt, 1)
stepA := step.Run(ctx, "First step", func(ctx context.Context) (map[string]any, error) {
Expand Down
27 changes: 11 additions & 16 deletions internal/sdkrequest/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"sync"

"github.com/gowebpki/jcs"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/execution/state"
)
Expand Down Expand Up @@ -36,7 +35,7 @@ type InvocationManager interface {
Step(op UnhashedOp) (json.RawMessage, bool)
// NewOp generates a new unhashed op for creating a state.GeneratorOpcode. This
// is required for future execution of a step.
NewOp(op enums.Opcode, name string, opts map[string]any) UnhashedOp
NewOp(op enums.Opcode, id string, opts map[string]any) UnhashedOp
}

// NewManager returns an InvocationManager to manage the incoming executor request. This
Expand Down Expand Up @@ -117,47 +116,43 @@ func (r *requestCtxManager) Step(op UnhashedOp) (json.RawMessage, bool) {
return val, ok
}

func (r *requestCtxManager) NewOp(op enums.Opcode, name string, opts map[string]any) UnhashedOp {
func (r *requestCtxManager) NewOp(op enums.Opcode, id string, opts map[string]any) UnhashedOp {
r.l.Lock()
defer r.l.Unlock()

key := fmt.Sprintf("%s-%s", op, name)
n, ok := r.indexes[key]
n, ok := r.indexes[id]
if ok {
// We have an index already, so increase the counter as we're
// adding to this key.
n += 1
}

// Update indexes for each particualar key.
r.indexes[key] = n
r.indexes[id] = n

return UnhashedOp{
Name: name,
ID: id,
Op: op,
Opts: opts,
Pos: uint(n),
}
}

type UnhashedOp struct {
Name string `json:"name"`
ID string `json:"id"`
Op enums.Opcode `json:"op"`
Opts map[string]any `json:"opts"`
Pos uint `json:"pos"`
Parent *string `json:"parent,omitempty"`
}

func (u UnhashedOp) Hash() (string, error) {
j, err := json.Marshal(u)
if err != nil {
return "", err
}
byt, err := jcs.Transform(j)
if err != nil {
return "", err
input := u.ID
if u.Pos > 0 {
// We only suffix the counter if there's > 1 operation with the same ID.
input = fmt.Sprintf("%s:%d", u.ID, u.Pos)
}
sum := sha1.Sum(byt)
sum := sha1.Sum([]byte(input))
return hex.EncodeToString(sum[:]), nil
}

Expand Down
17 changes: 12 additions & 5 deletions step/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,32 @@ import (
"github.com/inngest/inngest/pkg/execution/state"
)

type RunOpts struct {
// ID represents the optional step name.
ID string
// Name represents the optional step name.
Name string
}

// StepRun runs any code reliably, with retries, returning the resulting data. If this
// fails the function stops.
//
// TODO: Allow users to catch single step errors.
func Run[T any](
ctx context.Context,
name string,
id string,
f func(ctx context.Context) (T, error),
) T {
mgr := preflight(ctx)
op := mgr.NewOp(enums.OpcodeStep, name, nil)
op := mgr.NewOp(enums.OpcodeStep, id, nil)

if val, ok := mgr.Step(op); ok {
// This step has already ran as we have state for it.
// Unmarshal the JSON into type T
ft := reflect.TypeOf(f)
v := reflect.New(ft.Out(0)).Interface()
if err := json.Unmarshal(val, v); err != nil {
mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", name, err))
mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", id, err))
panic(ControlHijack{})
}
val, _ := reflect.ValueOf(v).Elem().Interface().(T)
Expand All @@ -47,13 +54,13 @@ func Run[T any](

byt, err := json.Marshal(result)
if err != nil {
mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", name, err))
mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", id, err))
}

mgr.AppendOp(state.GeneratorOpcode{
ID: op.MustHash(),
Op: enums.OpcodeStep,
Name: name,
Name: id,
Data: byt,
})
panic(ControlHijack{})
Expand Down
Loading

0 comments on commit 88c1f5b

Please sign in to comment.