Skip to content

Commit

Permalink
finish more migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizencc committed Dec 6, 2023
1 parent 27ba42e commit a932eff
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import * as sqs from '../../aws-sqs';
* Use an SQS queue as a hook target
*/
export class QueueHook implements autoscaling.ILifecycleHookTarget {
constructor(private readonly queue: sqs.IQueue) {
private readonly _queue: sqs.IQueue;
constructor(queue: sqs.ICfnQueue) {
this._queue = sqs.Queue.fromCfnQueue(queue);
}

/**
Expand All @@ -18,10 +20,10 @@ export class QueueHook implements autoscaling.ILifecycleHookTarget {
*/
public bind(_scope: Construct, options: autoscaling.BindHookTargetOptions): autoscaling.LifecycleHookTargetConfig {
const role = createRole(_scope, options.role);
this.queue.grantSendMessages(role);
this._queue.grantSendMessages(role);

return {
notificationTargetArn: this.queue.queueArn,
notificationTargetArn: this._queue.attrArn,
createdRole: role,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
AwsLogDriver, BaseService, CapacityProviderStrategy, Cluster, ContainerImage, DeploymentController, DeploymentCircuitBreaker,
ICluster, LogDriver, PropagatedTagSource, Secret,
} from '../../../aws-ecs';
import { IQueue, Queue } from '../../../aws-sqs';
import * as sqs from '../../../aws-sqs';
import { CfnOutput, Duration, FeatureFlags, Stack } from '../../../core';
import * as cxapi from '../../../cx-api';

Expand Down Expand Up @@ -92,7 +92,7 @@ export interface QueueProcessingServiceBaseProps {
*
* @default - SQS Queue with CloudFormation-generated name
*/
readonly queue?: IQueue;
readonly queue?: sqs.ICfnQueue;

/**
* The maximum number of times that a message can be received by consumers.
Expand Down Expand Up @@ -230,12 +230,12 @@ export abstract class QueueProcessingServiceBase extends Construct {
/**
* The SQS queue that the service will process from
*/
public readonly sqsQueue: IQueue;
public readonly sqsQueue: sqs.IQueue;

/**
* The dead letter queue for the primary SQS queue
*/
public readonly deadLetterQueue?: IQueue;
public readonly deadLetterQueue?: sqs.IQueue;

/**
* The cluster where your service will be deployed
Expand Down Expand Up @@ -297,12 +297,12 @@ export abstract class QueueProcessingServiceBase extends Construct {
}
// Create the SQS queue and it's corresponding DLQ if one is not provided
if (props.queue) {
this.sqsQueue = props.queue;
this.sqsQueue = sqs.Queue.fromCfnQueue(props.queue);
} else {
this.deadLetterQueue = new Queue(this, 'EcsProcessingDeadLetterQueue', {
this.deadLetterQueue = new sqs.Queue(this, 'EcsProcessingDeadLetterQueue', {
retentionPeriod: props.retentionPeriod || Duration.days(14),
});
this.sqsQueue = new Queue(this, 'EcsProcessingQueue', {
this.sqsQueue = new sqs.Queue(this, 'EcsProcessingQueue', {
visibilityTimeout: props.visibilityTimeout,
deadLetterQueue: {
queue: this.deadLetterQueue,
Expand Down
8 changes: 5 additions & 3 deletions packages/aws-cdk-lib/aws-lambda-destinations/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import * as sqs from '../../aws-sqs';
* Use a SQS queue as a Lambda destination
*/
export class SqsDestination implements lambda.IDestination {
constructor(private readonly queue: sqs.IQueue) {
private readonly _queue: sqs.IQueue;
constructor(queue: sqs.ICfnQueue) {
this._queue = sqs.Queue.fromCfnQueue(queue);
}

/**
* Returns a destination configuration
*/
public bind(_scope: Construct, fn: lambda.IFunction, _options?: lambda.DestinationOptions): lambda.DestinationConfig {
// deduplicated automatically
this.queue.grantSendMessages(fn);
this._queue.grantSendMessages(fn);

return {
destination: this.queue.queueArn,
destination: this._queue.attrArn,
};
}
}
8 changes: 5 additions & 3 deletions packages/aws-cdk-lib/aws-lambda-event-sources/lib/sqs-dlq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import * as sqs from '../../aws-sqs';
* An SQS dead letter queue destination configuration for a Lambda event source
*/
export class SqsDlq implements IEventSourceDlq {
constructor(private readonly queue: sqs.IQueue) {
private readonly _queue: sqs.IQueue;
constructor(queue: sqs.ICfnQueue) {
this._queue = sqs.Queue.fromCfnQueue(queue);
}

/**
* Returns a destination configuration for the DLQ
*/
public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig {
this.queue.grantSendMessages(targetHandler);
this._queue.grantSendMessages(targetHandler);

return {
destination: this.queue.queueArn,
destination: this._queue.attrArn,
};
}
}
12 changes: 7 additions & 5 deletions packages/aws-cdk-lib/aws-lambda/lib/function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export interface FunctionOptions extends EventInvokeConfigOptions {
*
* @default - SQS queue with 14 day retention period if `deadLetterQueueEnabled` is `true`
*/
readonly deadLetterQueue?: sqs.IQueue;
readonly deadLetterQueue?: sqs.ICfnQueue;

/**
* The SNS topic to use as a DLQ.
Expand Down Expand Up @@ -1490,12 +1490,14 @@ Environment variables can be marked for removal when used in Lambda@Edge by sett
resources: [deadLetterQueue.topicArn],
}));
} else {
deadLetterQueue = props.deadLetterQueue || new sqs.Queue(this, 'DeadLetterQueue', {
retentionPeriod: Duration.days(14),
});
deadLetterQueue = props.deadLetterQueue ?
sqs.Queue.fromCfnQueue(props.deadLetterQueue) :
new sqs.Queue(this, 'DeadLetterQueue', {
retentionPeriod: Duration.days(14),
});
this.addToRolePolicy(new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
resources: [deadLetterQueue.queueArn],
resources: [deadLetterQueue.attrArn],
}));
}

Expand Down
5 changes: 4 additions & 1 deletion packages/aws-cdk-lib/aws-s3-notifications/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import { Annotations } from '../../core';
* Use an SQS queue as a bucket notification destination
*/
export class SqsDestination implements s3.IBucketNotificationDestination {
constructor(private readonly queue: sqs.IQueue) {
private readonly _queue: sqs.IQueue;

constructor(queue: sqs.ICfnQueue) {
this._queue = sqs.Queue.fromCfnQueue(queue);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/aws-cdk-lib/aws-sqs/lib/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export abstract class QueueBase extends Resource implements IQueue {
/**
* The ARN of this queue
*
* @deprecated use attrArn
* Deprecated: use attrArn
*/
public abstract readonly queueArn: string;

Expand All @@ -130,7 +130,7 @@ export abstract class QueueBase extends Resource implements IQueue {
/**
* The URL of this queue
*
* @deprecated use attrQueueUrl
* Deprecated: use attrQueueUrl
*/
public abstract readonly queueUrl: string;

Expand Down Expand Up @@ -357,5 +357,5 @@ export enum QueueEncryption {
* To learn more about SSE-SQS on Amazon SQS, please visit the
* [Amazon SQS documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html).
*/
SQS_MANAGED = 'SQS_MANAGED'
}
SQS_MANAGED = 'SQS_MANAGED',
}
83 changes: 81 additions & 2 deletions packages/aws-cdk-lib/aws-sqs/lib/queue.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Construct } from 'constructs';
import { IQueue, QueueAttributes, QueueBase, QueueEncryption } from './queue-base';
import { CfnQueue } from './sqs.generated';
import { ICfnQueue, CfnQueue } from './sqs.generated';
import { validateProps } from './validate-props';
import * as iam from '../../aws-iam';
import * as kms from '../../aws-kms';
import { Duration, RemovalPolicy, Stack, Token, ArnFormat, Annotations } from '../../core';
import { Duration, RemovalPolicy, Stack, Token, ArnFormat, Annotations, Tokenization } from '../../core';
import { CfnReference } from '../../core/lib/private/cfn-reference';

/**
* Properties for creating a new Queue
Expand Down Expand Up @@ -288,6 +289,84 @@ export class Queue extends QueueBase {
});
}

/**
* Create a mutable `IQueue` out of a `ICfnQueue`.
*/
public static fromCfnQueue(cfnQueue: ICfnQueue): IQueue {
// if cfnQueue is already an IQueue, just return itself
if ((<IQueue>cfnQueue).grant !== undefined) {
return <IQueue>cfnQueue;
}

// use a "weird" id that has a higher chance of being unique
const id = '@FromCfnQueue';

// if fromCfnQueue() was already called on this cfnQueue,
// return the same L2
const existing = cfnQueue.node.tryFindChild(id);
if (existing) {
return <IQueue>existing;
}

// if cfnQueue is not a CfnResource, and thus not a CfnQueue, we are in a scenario where
// cfnQueue is an ICfnQueue but NOT a CfnQueue, which shouldn't happen
if (!CfnQueue.isCfnResource(cfnQueue)) {
throw new Error('Encountered an "ICfnQueue" that is not an "IQueue" or "CfnQueue". If you have a legitimate reason for this, please open an issue at https://github.com/aws/aws-cdk/issues');
}
const _cfnQueue = cfnQueue as CfnQueue;

let encryptionKey: kms.IKey | undefined;
if (_cfnQueue.kmsMasterKeyId) {
if (Token.isUnresolved(_cfnQueue.kmsMasterKeyId)) {
const kmsIResolvable = Tokenization.reverse(_cfnQueue.kmsMasterKeyId);
if (kmsIResolvable instanceof CfnReference) {
const cfnElement = kmsIResolvable.target;
if (cfnElement instanceof kms.CfnKey) {
encryptionKey = kms.Key.fromCfnKey(cfnElement);
}
}
}
}

return new class extends QueueBase {
public readonly attrArn = _cfnQueue.attrArn;
public readonly queueArn = this.attrArn;
public readonly queueName = _cfnQueue.attrQueueName;
public readonly attrQueueUrl = _cfnQueue.attrQueueUrl;
public readonly queueUrl = this.attrQueueUrl;
public readonly fifo = this.determineFifo(_cfnQueue.fifoQueue === true);
public readonly autoCreatePolicy = false;

public readonly encryptionMasterKey = encryptionKey;
public readonly encryptionType = encryptionKey ? QueueEncryption.KMS : undefined;

constructor() {
super(_cfnQueue, id);

this.node.defaultChild = _cfnQueue;
}

/**
* Determine fifo flag based on queueName and fifo attribute
*/
private determineFifo(fifo: boolean): boolean {
if (Token.isUnresolved(this.queueArn)) {
return fifo || false;
} else {
if (typeof fifo !== 'undefined') {
if (fifo && !this.queueName.endsWith('.fifo')) {
throw new Error("FIFO queue names must end in '.fifo'");
}
if (!fifo && this.queueName.endsWith('.fifo')) {
throw new Error("Non-FIFO queue name may not end in '.fifo'");
}
}
return this.queueName.endsWith('.fifo') ? true : false;
}
}
}();
}

/**
* The ARN of this queue
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface SqsSendMessageProps extends sfn.TaskStateBaseProps {
/**
* The SQS queue that messages will be sent to
*/
readonly queue: sqs.IQueue;
readonly queue: sqs.ICfnQueue;

/**
* The text message to send to the queue.
Expand Down Expand Up @@ -54,7 +54,6 @@ export interface SqsSendMessageProps extends sfn.TaskStateBaseProps {
*
*/
export class SqsSendMessage extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
Expand All @@ -64,6 +63,7 @@ export class SqsSendMessage extends sfn.TaskStateBase {
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;
private readonly queue: sqs.IQueue;

constructor(scope: Construct, id: string, private readonly props: SqsSendMessageProps) {
super(scope, id, props);
Expand All @@ -80,17 +80,18 @@ export class SqsSendMessage extends sfn.TaskStateBase {
this.taskPolicies = [
new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
resources: [this.props.queue.queueArn],
resources: [this.props.queue.attrArn],
}),
];
this.queue = sqs.Queue.fromCfnQueue(props.queue);

// sending to an encrypted queue requires
// permissions on the associated kms key
if (this.props.queue.encryptionMasterKey) {
if (this.queue.encryptionMasterKey) {
this.taskPolicies.push(
new iam.PolicyStatement({
actions: ['kms:Decrypt', 'kms:GenerateDataKey*'],
resources: [this.props.queue.encryptionMasterKey.keyArn],
resources: [this.queue.encryptionMasterKey.keyArn],
}));
}
}
Expand All @@ -105,7 +106,7 @@ export class SqsSendMessage extends sfn.TaskStateBase {
return {
Resource: integrationResourceArn('sqs', 'sendMessage', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueueUrl: this.props.queue.queueUrl,
QueueUrl: this.queue.attrQueueUrl,
MessageBody: this.props.messageBody.value,
DelaySeconds: this.props.delay?.toSeconds(),
MessageDeduplicationId: this.props.messageDeduplicationId,
Expand Down

0 comments on commit a932eff

Please sign in to comment.