Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lambda.Option lambda.WithSetup #490

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lambda/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type handlerOptions struct {
jsonResponseIndentValue string
enableSIGTERM bool
sigtermCallbacks []func()
setupFuncs []func() error
}

type Option func(*handlerOptions)
Expand Down Expand Up @@ -102,6 +103,15 @@ func WithEnableSIGTERM(callbacks ...func()) Option {
})
}

// WithSetup enables capturing of errors or panics that occur before the function is ready to handle invokes.
// The provided functions will be run a single time, in order, before the runtime reports itself ready to recieve invokes.
// If any of the provided functions returns an error, or panics, the error will be serialized and reported to the Runtime API.
func WithSetup(funcs ...func() error) Option {
return Option(func(h *handlerOptions) {
h.setupFuncs = append(h.setupFuncs, funcs...)
})
}

// handlerTakesContext returns whether the handler takes a context.Context as its first argument.
func handlerTakesContext(handler reflect.Type) (bool, error) {
switch handler.NumIn() {
Expand Down
31 changes: 31 additions & 0 deletions lambda/invoke_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func unixMS(ms int64) time.Time {
func startRuntimeAPILoop(api string, handler Handler) error {
client := newRuntimeAPIClient(api)
h := newHandler(handler)

if err := handleSetup(client, h); err != nil {
return err
}
for {
invoke, err := client.next()
if err != nil {
Expand All @@ -42,6 +46,21 @@ func startRuntimeAPILoop(api string, handler Handler) error {
}
}

// handleSetup returns an error if any of the handler's optional setup functions return and error or panic
func handleSetup(client *runtimeAPIClient, handler *handlerOptions) error {
for _, setup := range handler.setupFuncs {
if setupErr := callSetupFunc(setup); setupErr != nil {
errorPayload := safeMarshal(setupErr)
log.Printf("%s", errorPayload)
if err := client.initError(bytes.NewReader(errorPayload), contentTypeJSON); err != nil {
return fmt.Errorf("unexpected error occurred when sending the setup error to the API: %v", err)
}
return fmt.Errorf("setting up the handler function resulted in an error, the process should exit")
}
}
return nil
}

// handleInvoke returns an error if the function panics, or some other non-recoverable error occurred
func handleInvoke(invoke *invoke, handler *handlerOptions) error {
// set the deadline
Expand Down Expand Up @@ -110,6 +129,18 @@ func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) err
return nil
}

func callSetupFunc(f func() error) (setupErr *messages.InvokeResponse_Error) {
defer func() {
if err := recover(); err != nil {
setupErr = lambdaPanicResponse(err)
}
}()
if err := f(); err != nil {
return lambdaErrorResponse(err)
}
return nil
}

func callBytesHandlerFunc(ctx context.Context, payload []byte, handler handlerFunc) (response io.Reader, invokeErr *messages.InvokeResponse_Error) {
defer func() {
if err := recover(); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion lambda/rpc_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@ func init() {
}

func startFunctionRPC(port string, handler Handler) error {
rpcFunction := NewFunction(handler)
if len(rpcFunction.handler.setupFuncs) > 0 {
runtimeAPIClient := newRuntimeAPIClient(os.Getenv("AWS_LAMBDA_RUNTIME_API"))
if err := handleSetup(runtimeAPIClient, rpcFunction.handler); err != nil {
return err
}
}

lis, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatal(err)
}
err = rpc.Register(NewFunction(handler))
err = rpc.Register(rpcFunction)
if err != nil {
log.Fatal("failed to register handler function")
}
Expand Down
15 changes: 11 additions & 4 deletions lambda/runtime_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ func newRuntimeAPIClient(address string) *runtimeAPIClient {
client := &http.Client{
Timeout: 0, // connections to the runtime API are never expected to time out
}
endpoint := "http://" + address + "/" + apiVersion + "/runtime/invocation/"
endpoint := "http://" + address + "/" + apiVersion + "/runtime"
userAgent := "aws-lambda-go/" + runtime.Version()
return &runtimeAPIClient{endpoint, userAgent, client, bytes.NewBuffer(nil)}
}

// initError connects to the Runtime API and reports that a failure occured during initialization.
// Note: After calling this function, the caller should call os.Exit()
func (c *runtimeAPIClient) initError(body io.Reader, contentType string) error {
url := c.baseURL + "/init/error"
return c.post(url, body, contentType)
}

type invoke struct {
id string
payload []byte
Expand All @@ -53,7 +60,7 @@ type invoke struct {
// Notes:
// - An invoke is not complete until next() is called again!
func (i *invoke) success(body io.Reader, contentType string) error {
url := i.client.baseURL + i.id + "/response"
url := i.client.baseURL + "/invocation/" + i.id + "/response"
return i.client.post(url, body, contentType)
}

Expand All @@ -63,14 +70,14 @@ func (i *invoke) success(body io.Reader, contentType string) error {
// - A Lambda Function continues to be re-used for future invokes even after a failure.
// If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure()
func (i *invoke) failure(body io.Reader, contentType string) error {
url := i.client.baseURL + i.id + "/error"
url := i.client.baseURL + "/invocation/" + i.id + "/error"
return i.client.post(url, body, contentType)
}

// next connects to the Runtime API and waits for a new invoke Request to be available.
// Note: After a call to Done() or Error() has been made, a call to next() will complete the in-flight invoke.
func (c *runtimeAPIClient) next() (*invoke, error) {
url := c.baseURL + "next"
url := c.baseURL + "/invocation/next"
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to construct GET request to %s: %v", url, err)
Expand Down