Skip to content

Commit

Permalink
feat(pipes-targets): add SageMaker (#30696)
Browse files Browse the repository at this point in the history
Add SageMaker pipeline as a Pipes target.
  • Loading branch information
msambol authored Oct 30, 2024
1 parent 048e753 commit a5fdf57
Show file tree
Hide file tree
Showing 20 changed files with 34,692 additions and 5 deletions.
34 changes: 34 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
* `targets.SfnStateMachine`: [Invoke a Step Functions state machine from an event source](#aws-step-functions-state-machine)
* `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)

Expand Down Expand Up @@ -217,6 +218,39 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon SageMaker Pipeline

A SageMaker pipeline can be used as a target for a pipe.
The pipeline will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetPipeline: sagemaker.IPipeline;

const pipelineTarget = new targets.SageMakerTarget(targetPipeline);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipelineTarget,
});
```

The input to the target pipeline can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetPipeline: sagemaker.IPipeline;

const pipelineTarget = new targets.SageMakerTarget(targetPipeline, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipelineTarget,
});
```

### AWS Step Functions State Machine

A Step Functions state machine can be used as a target for a pipe.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ export * from './cloudwatch-logs';
export * from './event-bridge';
export * from './kinesis';
export * from './lambda';
export * from './sagemaker';
export * from './sqs';
export * from './stepfunctions';
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface LambdaFunctionParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand Down
66 changes: 66 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/sagemaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IPipeline } from 'aws-cdk-lib/aws-sagemaker';

/**
* SageMaker target properties.
*/
export interface SageMakerTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

/**
* List of parameter names and values for SageMaker Model Building Pipeline execution.
*
* The Name/Value pairs are passed to start execution of the pipeline.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsagemakerpipelineparameters.html#cfn-pipes-pipe-pipetargetsagemakerpipelineparameters-pipelineparameterlist
* @default - none
*/
readonly pipelineParameters?: Record<string, string>;
}

/**
* An EventBridge Pipes target that sends messages to a SageMaker pipeline.
*/
export class SageMakerTarget implements ITarget {
private pipeline: IPipeline;
private sageMakerParameters?: SageMakerTargetParameters;
private pipelineParameters?: Record<string, string>;
public readonly targetArn: string;

constructor(pipeline: IPipeline, parameters?: SageMakerTargetParameters) {
this.pipeline = pipeline;
this.targetArn = pipeline.pipelineArn;
this.sageMakerParameters = parameters;
this.pipelineParameters = this.sageMakerParameters?.pipelineParameters;
}

grantPush(grantee: IRole): void {
this.pipeline.grantStartPipelineExecution(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.sageMakerParameters) {
return { targetParameters: {} };
}

return {
targetParameters: {
inputTemplate: this.sageMakerParameters.inputTransformation?.bind(pipe).inputTemplate,
sageMakerPipelineParameters: {
pipelineParameterList: this.pipelineParameters ?
Object.entries(this.pipelineParameters).map(([key, value]) => ({
name: key,
value: value,
})) : undefined,
},
},
};
}
}
6 changes: 3 additions & 3 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface SqsTargetParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand All @@ -20,15 +20,15 @@ export interface SqsTargetParameters {
* The token used for deduplication of sent messages.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
* @default none
* @default - none
*/
readonly messageDeduplicationId?: string;

/**
* The FIFO message group ID to use as the target.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagegroupid
* @default none
* @default - none
*/
readonly messageGroupId?: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface SfnStateMachineParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import * as events from 'aws-cdk-lib/aws-events';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`SageMaker should grant pipe role push access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`SageMaker should grant pipe role push access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sagemaker:StartPipelineExecution",
"Effect": "Allow",
"Resource": "MyPipeline",
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a5fdf57

Please sign in to comment.