From 4f946640418a305e96dcd1b726ce307f930cbccc Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Mon, 25 Nov 2024 12:24:48 -0500 Subject: [PATCH] Convert remaining aws sdk 1.11 tests from groovy to java (#12777) --- .../src/test/groovy/AwsConnector.groovy | 201 ------ .../src/test/groovy/S3TracingTest.groovy | 650 ------------------ .../src/test/groovy/SnsTracingTest.groovy | 215 ------ .../awssdk/v1_11/AwsConnector.java | 190 +++++ .../awssdk/v1_11/AwsSpanAssertions.java | 112 +++ .../awssdk/v1_11/S3TracingTest.java | 231 +++++++ .../awssdk/v1_11/SnsTracingTest.java | 112 +++ 7 files changed, 645 insertions(+), 1066 deletions(-) delete mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy delete mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy delete mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsConnector.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSpanAssertions.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/S3TracingTest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SnsTracingTest.java diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy deleted file mode 100644 index 5cf8a988900c..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import com.amazonaws.auth.AWSCredentialsProvider -import com.amazonaws.auth.AWSStaticCredentialsProvider -import com.amazonaws.auth.BasicAWSCredentials -import com.amazonaws.client.builder.AwsClientBuilder -import com.amazonaws.regions.Regions -import com.amazonaws.services.s3.AmazonS3Client -import com.amazonaws.services.s3.model.BucketNotificationConfiguration -import com.amazonaws.services.s3.model.ObjectListing -import com.amazonaws.services.s3.model.QueueConfiguration -import com.amazonaws.services.s3.model.S3Event -import com.amazonaws.services.s3.model.S3ObjectSummary -import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest -import com.amazonaws.services.s3.model.TopicConfiguration -import com.amazonaws.services.sns.AmazonSNSAsyncClient -import com.amazonaws.services.sns.model.CreateTopicResult -import com.amazonaws.services.sns.model.SetTopicAttributesRequest -import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.amazonaws.services.sqs.model.GetQueueAttributesRequest -import com.amazonaws.services.sqs.model.PurgeQueueRequest -import com.amazonaws.services.sqs.model.ReceiveMessageRequest -import org.slf4j.LoggerFactory -import org.testcontainers.containers.localstack.LocalStackContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.utility.DockerImageName - -import java.time.Duration - -class AwsConnector { - - private LocalStackContainer localstack - - private AmazonSQSAsyncClient sqsClient - private AmazonS3Client s3Client - private AmazonSNSAsyncClient snsClient - - static localstack() { - AwsConnector awsConnector = new AwsConnector() - - awsConnector.localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2")) - .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS, LocalStackContainer.Service.S3) - .withEnv("DEBUG", "1") - .withEnv("SQS_PROVIDER", "elasticmq") - .withStartupTimeout(Duration.ofMinutes(2)) - awsConnector.localstack.start() - awsConnector.localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test"))) - - AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsConnector.localstack .getAccessKey(), awsConnector.localstack.getSecretKey())) - - awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder() - .withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.SQS)) - .withCredentials(credentialsProvider) - .build() - - awsConnector.s3Client = AmazonS3Client.builder() - .withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.S3)) - .withCredentials(credentialsProvider) - .build() - - awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder() - .withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.SNS)) - .withCredentials(credentialsProvider) - .build() - - return awsConnector - } - - static AwsClientBuilder.EndpointConfiguration getEndpointConfiguration(LocalStackContainer localstack, LocalStackContainer.Service service) { - return new AwsClientBuilder.EndpointConfiguration(localstack.getEndpointOverride(service).toString(), localstack.getRegion()) - } - - static liveAws() { - AwsConnector awsConnector = new AwsConnector() - - awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder() - .withRegion(Regions.US_EAST_1) - .build() - - awsConnector.s3Client = AmazonS3Client.builder() - .withRegion(Regions.US_EAST_1) - .build() - - awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder() - .withRegion(Regions.US_EAST_1) - .build() - - return awsConnector - } - - def createQueue(String queueName) { - println "Create queue ${queueName}" - return sqsClient.createQueue(queueName).getQueueUrl() - } - - def getQueueArn(String queueUrl) { - println "Get ARN for queue ${queueUrl}" - return sqsClient.getQueueAttributes( - new GetQueueAttributesRequest(queueUrl) - .withAttributeNames("QueueArn")).getAttributes() - .get("QueueArn") - } - - def setTopicPublishingPolicy(String topicArn) { - println "Set policy for topic ${topicArn}" - snsClient.setTopicAttributes(new SetTopicAttributesRequest(topicArn, "Policy", String.format(SNS_POLICY, topicArn))) - } - - private static final String SNS_POLICY = "{" + - " \"Statement\": [" + - " {" + - " \"Effect\": \"Allow\"," + - " \"Principal\": \"*\"," + - " \"Action\": \"sns:Publish\"," + - " \"Resource\": \"%s\"" + - " }]" + - "}" - - def setQueuePublishingPolicy(String queueUrl, String queueArn) { - println "Set policy for queue ${queueArn}" - sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", String.format(SQS_POLICY, queueArn))) - } - - private static final String SQS_POLICY = "{" + - " \"Statement\": [" + - " {" + - " \"Effect\": \"Allow\"," + - " \"Principal\": \"*\"," + - " \"Action\": \"sqs:SendMessage\"," + - " \"Resource\": \"%s\"" + - " }]" + - "}" - - def createBucket(String bucketName) { - println "Create bucket ${bucketName}" - s3Client.createBucket(bucketName) - } - - def deleteBucket(String bucketName) { - println "Delete bucket ${bucketName}" - ObjectListing objectListing = s3Client.listObjects(bucketName) - Iterator objIter = objectListing.getObjectSummaries().iterator() - while (objIter.hasNext()) { - s3Client.deleteObject(bucketName, objIter.next().getKey()) - } - s3Client.deleteBucket(bucketName) - } - - def enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) { - println "Enable notification for bucket ${bucketName} to queue ${sqsQueueArn}" - BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration() - notificationConfiguration.addConfiguration("sqsQueueConfig", - new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut))) - s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest( - bucketName, notificationConfiguration)) - } - - def enableS3ToSnsNotifications(String bucketName, String snsTopicArn) { - println "Enable notification for bucket ${bucketName} to topic ${snsTopicArn}" - BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration() - notificationConfiguration.addConfiguration("snsTopicConfig", - new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut))) - s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest( - bucketName, notificationConfiguration)) - } - - def createTopicAndSubscribeQueue(String topicName, String queueArn) { - println "Create topic ${topicName} and subscribe to queue ${queueArn}" - CreateTopicResult ctr = snsClient.createTopic(topicName) - snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn) - return ctr.getTopicArn() - } - - def receiveMessage(String queueUrl) { - println "Receive message from queue ${queueUrl}" - sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)) - } - - def purgeQueue(String queueUrl) { - println "Purge queue ${queueUrl}" - sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)) - } - - def putSampleData(String bucketName) { - println "Put sample data to bucket ${bucketName}" - s3Client.putObject(bucketName, "otelTestKey", "otelTestData") - } - - def publishSampleNotification(String topicArn) { - snsClient.publish(topicArn, "Hello There") - } - - def disconnect() { - if (localstack != null) { - localstack.stop() - } - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy deleted file mode 100644 index 8ba084ba0215..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy +++ /dev/null @@ -1,650 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.HttpAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.opentelemetry.semconv.UrlAttributes -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.CONSUMER - -class S3TracingTest extends AgentInstrumentationSpecification { - - @Shared - AwsConnector awsConnector = AwsConnector.localstack() - - def cleanupSpec() { - awsConnector.disconnect() - } - - def "S3 upload triggers SQS message"() { - setup: - String queueName = "s3ToSqsTestQueue" - String bucketName = "otel-s3-to-sqs-test-bucket" - - String queueUrl = awsConnector.createQueue(queueName) - awsConnector.createBucket(bucketName) - - String queueArn = awsConnector.getQueueArn(queueUrl) - awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) - awsConnector.enableS3ToSqsNotifications(bucketName, queueArn) - - when: - // test message, auto created by AWS - awsConnector.receiveMessage(queueUrl) - awsConnector.putSampleData(bucketName) - // traced message - def receiveMessageResult = awsConnector.receiveMessage(queueUrl) - receiveMessageResult.messages.each {message -> - runWithSpan("process child") {} - } - - // cleanup - awsConnector.deleteBucket(bucketName) - awsConnector.purgeQueue(queueUrl) - - then: - assertTraces(10) { - trace(0, 1) { - - span(0) { - name "SQS.CreateQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.name" queueName - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "CreateQueue" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(1, 1) { - - span(0) { - name "S3.CreateBucket" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.bucket.name" bucketName - "rpc.method" "CreateBucket" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(2, 1) { - - span(0) { - name "SQS.GetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "GetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(3, 1) { - - span(0) { - name "SQS.SetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "SetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(4, 1) { - - span(0) { - name "S3.SetBucketNotificationConfiguration" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "SetBucketNotificationConfiguration" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(5, 3) { - span(0) { - name "S3.PutObject" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "PutObject" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - span(1) { - name "s3ToSqsTestQueue process" - kind CONSUMER - childOf span(0) - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "ReceiveMessage" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - } - } - span(2) { - name "process child" - childOf span(1) - attributes { - } - } - } - trace(6, 1) { - span(0) { - name "S3.ListObjects" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ListObjects" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(7, 1) { - span(0) { - name "S3.DeleteObject" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "DeleteObject" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(8, 1) { - span(0) { - name "S3.DeleteBucket" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "DeleteBucket" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(9, 1) { - span(0) { - name "SQS.PurgeQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "PurgeQueue" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - } - } - - def "S3 upload triggers SNS topic notification, then creates SQS message"() { - setup: - String queueName = "s3ToSnsToSqsTestQueue" - String bucketName = "otel-s3-sns-sqs-test-bucket" - String topicName = "s3ToSnsToSqsTestTopic" - - String queueUrl = awsConnector.createQueue(queueName) - String queueArn = awsConnector.getQueueArn(queueUrl) - awsConnector.createBucket(bucketName) - String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn) - - awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) - awsConnector.setTopicPublishingPolicy(topicArn) - awsConnector.enableS3ToSnsNotifications(bucketName, topicArn) - - when: - // test message, auto created by AWS - awsConnector.receiveMessage(queueUrl) - awsConnector.putSampleData(bucketName) - // traced message - def receiveMessageResult = awsConnector.receiveMessage(queueUrl) - receiveMessageResult.messages.each {message -> - runWithSpan("process child") {} - } - // cleanup - awsConnector.deleteBucket(bucketName) - awsConnector.purgeQueue(queueUrl) - - then: - assertTraces(14) { - trace(0, 1) { - span(0) { - name "SQS.CreateQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.name" queueName - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "CreateQueue" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(1, 1) { - span(0) { - name "SQS.GetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "GetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(2, 1) { - span(0) { - name "S3.CreateBucket" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "CreateBucket" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(3, 1) { - span(0) { - name "SNS.CreateTopic" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "CreateTopic" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(4, 1) { - span(0) { - name "SNS.Subscribe" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "Subscribe" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn - } - } - } - trace(5, 1) { - span(0) { - name "SQS.SetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "SetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(6, 1) { - span(0) { - name "SNS.SetTopicAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "SetTopicAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn - } - } - } - trace(7, 1) { - span(0) { - name "S3.SetBucketNotificationConfiguration" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "SetBucketNotificationConfiguration" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(8, 1) { - span(0) { - name "S3.PutObject" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "PutObject" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "PUT" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(9, 2) { - span(0) { - name "s3ToSnsToSqsTestQueue process" - kind CONSUMER - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "ReceiveMessage" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - } - } - span(1) { - name "process child" - childOf span(0) - attributes { - } - } - } - trace(10, 1) { - span(0) { - name "S3.ListObjects" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "ListObjects" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "GET" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(11, 1) { - span(0) { - name "S3.DeleteObject" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "DeleteObject" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(12, 1) { - span(0) { - name "S3.DeleteBucket" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "rpc.method" "DeleteBucket" - "rpc.system" "aws-api" - "rpc.service" "Amazon S3" - "aws.bucket.name" bucketName - "$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(13, 1) { - span(0) { - name "SQS.PurgeQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "PurgeQueue" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://") } - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - } - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy deleted file mode 100644 index 0a227581227a..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.HttpAttributes -import io.opentelemetry.semconv.NetworkAttributes -import io.opentelemetry.semconv.UrlAttributes -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.CONSUMER - -class SnsTracingTest extends AgentInstrumentationSpecification { - - @Shared - AwsConnector awsConnector = AwsConnector.localstack() - - - def cleanupSpec() { - awsConnector.disconnect() - } - - def "SNS notification triggers SQS message consumed with AWS SDK"() { - setup: - String queueName = "snsToSqsTestQueue" - String topicName = "snsToSqsTestTopic" - - String queueUrl = awsConnector.createQueue(queueName) - String queueArn = awsConnector.getQueueArn(queueUrl) - awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) - String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn) - - when: - awsConnector.publishSampleNotification(topicArn) - def receiveMessageResult = awsConnector.receiveMessage(queueUrl) - receiveMessageResult.messages.each {message -> - runWithSpan("process child") {} - } - - then: - assertTraces(6) { - trace(0, 1) { - - span(0) { - name "SQS.CreateQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.name" queueName - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "CreateQueue" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(1, 1) { - - span(0) { - name "SQS.GetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "GetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(2, 1) { - - span(0) { - name "SQS.SetQueueAttributes" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "SetQueueAttributes" - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(3, 1) { - - span(0) { - name "SNS.CreateTopic" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "CreateTopic" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - } - } - } - trace(4, 1) { - - span(0) { - name "SNS.Subscribe" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "Subscribe" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn - } - } - } - trace(5, 3) { - span(0) { - name "SNS.Publish" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.method" "Publish" - "rpc.system" "aws-api" - "rpc.service" "AmazonSNS" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn - } - } - span(1) { - name "snsToSqsTestQueue process" - kind CONSUMER - childOf span(0) - attributes { - "aws.agent" "java-aws-sdk" - "aws.endpoint" String - "aws.queue.url" queueUrl - "$AwsIncubatingAttributes.AWS_REQUEST_ID" String - "rpc.system" "aws-api" - "rpc.service" "AmazonSQS" - "rpc.method" "ReceiveMessage" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" String - "$ServerAttributes.SERVER_ADDRESS" String - "$ServerAttributes.SERVER_PORT" { it == null || Number } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - "$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1" - } - } - span(2) { - name "process child" - childOf span(1) - attributes { - } - } - } - } - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsConnector.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsConnector.java new file mode 100644 index 000000000000..90e5ff90cbd8 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsConnector.java @@ -0,0 +1,190 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.BucketNotificationConfiguration; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.QueueConfiguration; +import com.amazonaws.services.s3.model.S3Event; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest; +import com.amazonaws.services.s3.model.TopicConfiguration; +import com.amazonaws.services.sns.AmazonSNSAsync; +import com.amazonaws.services.sns.AmazonSNSAsyncClient; +import com.amazonaws.services.sns.model.CreateTopicResult; +import com.amazonaws.services.sns.model.SetTopicAttributesRequest; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClient; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.PurgeQueueRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import java.time.Duration; +import java.util.Collections; +import java.util.EnumSet; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +class AwsConnector { + private final LocalStackContainer localStack; + private final AmazonSQSAsync sqsClient; + private final AmazonS3 s3Client; + private final AmazonSNSAsync snsClient; + + AwsConnector() { + localStack = + new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2")) + .withServices( + LocalStackContainer.Service.SQS, + LocalStackContainer.Service.SNS, + LocalStackContainer.Service.S3) + .withEnv("DEBUG", "1") + .withEnv("SQS_PROVIDER", "elasticmq") + .withStartupTimeout(Duration.ofMinutes(2)); + localStack.start(); + localStack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test"))); + + AWSCredentialsProvider credentialsProvider = + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(localStack.getAccessKey(), localStack.getSecretKey())); + + sqsClient = + AmazonSQSAsyncClient.asyncBuilder() + .withEndpointConfiguration( + getEndpointConfiguration(localStack, LocalStackContainer.Service.SQS)) + .withCredentials(credentialsProvider) + .build(); + + s3Client = + AmazonS3Client.builder() + .withEndpointConfiguration( + getEndpointConfiguration(localStack, LocalStackContainer.Service.S3)) + .withCredentials(credentialsProvider) + .build(); + + snsClient = + AmazonSNSAsyncClient.asyncBuilder() + .withEndpointConfiguration( + getEndpointConfiguration(localStack, LocalStackContainer.Service.SNS)) + .withCredentials(credentialsProvider) + .build(); + } + + static AwsClientBuilder.EndpointConfiguration getEndpointConfiguration( + LocalStackContainer localStack, LocalStackContainer.Service service) { + return new AwsClientBuilder.EndpointConfiguration( + localStack.getEndpointOverride(service).toString(), localStack.getRegion()); + } + + String createQueue(String queueName) { + return sqsClient.createQueue(queueName).getQueueUrl(); + } + + String getQueueArn(String queueUrl) { + return sqsClient + .getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("QueueArn")) + .getAttributes() + .get("QueueArn"); + } + + void setTopicPublishingPolicy(String topicArn) { + String snsPolicy = + "{" + + " \"Statement\": [" + + " {" + + " \"Effect\": \"Allow\"," + + " \"Principal\": \"*\"," + + " \"Action\": \"sns:Publish\"," + + " \"Resource\": \"%s\"" + + " }]" + + "}"; + snsClient.setTopicAttributes( + new SetTopicAttributesRequest(topicArn, "Policy", String.format(snsPolicy, topicArn))); + } + + void setQueuePublishingPolicy(String queueUrl, String queueArn) { + String sqsPolicy = + "{" + + " \"Statement\": [" + + " {" + + " \"Effect\": \"Allow\"," + + " \"Principal\": \"*\"," + + " \"Action\": \"sqs:SendMessage\"," + + " \"Resource\": \"%s\"" + + " }]" + + "}"; + sqsClient.setQueueAttributes( + queueUrl, Collections.singletonMap("Policy", String.format(sqsPolicy, queueArn))); + } + + void createBucket(String bucketName) { + s3Client.createBucket(bucketName); + } + + void deleteBucket(String bucketName) { + ObjectListing objectListing = s3Client.listObjects(bucketName); + for (S3ObjectSummary element : objectListing.getObjectSummaries()) { + s3Client.deleteObject(bucketName, element.getKey()); + } + s3Client.deleteBucket(bucketName); + } + + void enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) { + BucketNotificationConfiguration notificationConfiguration = + new BucketNotificationConfiguration(); + notificationConfiguration.addConfiguration( + "sqsQueueConfig", + new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut))); + s3Client.setBucketNotificationConfiguration( + new SetBucketNotificationConfigurationRequest(bucketName, notificationConfiguration)); + } + + void enableS3ToSnsNotifications(String bucketName, String snsTopicArn) { + BucketNotificationConfiguration notificationConfiguration = + new BucketNotificationConfiguration(); + notificationConfiguration.addConfiguration( + "snsTopicConfig", + new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut))); + s3Client.setBucketNotificationConfiguration( + new SetBucketNotificationConfigurationRequest(bucketName, notificationConfiguration)); + } + + String createTopicAndSubscribeQueue(String topicName, String queueArn) { + CreateTopicResult ctr = snsClient.createTopic(topicName); + snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn); + return ctr.getTopicArn(); + } + + ReceiveMessageResult receiveMessage(String queueUrl) { + return sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)); + } + + void purgeQueue(String queueUrl) { + sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)); + } + + void putSampleData(String bucketName) { + s3Client.putObject(bucketName, "otelTestKey", "otelTestData"); + } + + void publishSampleNotification(String topicArn) { + snsClient.publish(topicArn, "Hello There"); + } + + void disconnect() { + if (localStack != null) { + localStack.stop(); + } + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSpanAssertions.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSpanAssertions.java new file mode 100644 index 000000000000..483a0c5230bc --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSpanAssertions.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; + +class AwsSpanAssertions { + private AwsSpanAssertions() {} + + static SpanDataAssert sqs( + SpanDataAssert span, String queueName, String queueUrl, String rpcMethod) { + return span.hasName("SQS." + rpcMethod) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + equalTo(stringKey("aws.queue.name"), queueName), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)), + equalTo(RPC_METHOD, rpcMethod), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "AmazonSQS"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class)))); + } + + static SpanDataAssert s3( + SpanDataAssert span, + String bucketName, + String rpcMethod, + String requestMethod, + int responseStatusCode) { + + return span.hasName("S3." + rpcMethod) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + equalTo(stringKey("aws.bucket.name"), bucketName), + equalTo(RPC_METHOD, rpcMethod), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Amazon S3"), + equalTo(HTTP_REQUEST_METHOD, requestMethod), + equalTo(HTTP_RESPONSE_STATUS_CODE, responseStatusCode), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class)))); + } + + static SpanDataAssert sns(SpanDataAssert span, String topicArn, String rpcMethod) { + + return span.hasName("SNS." + rpcMethod) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(MESSAGING_DESTINATION_NAME, topicArn), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)), + equalTo(RPC_METHOD, rpcMethod), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "AmazonSNS"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class)))); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/S3TracingTest.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/S3TracingTest.java new file mode 100644 index 000000000000..56eca09f8c46 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/S3TracingTest.java @@ -0,0 +1,231 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.s3; +import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sns; +import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sqs; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; + +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +@SuppressWarnings("deprecation") // MESSAGING_OPERATION is deprecated +class S3TracingTest { + + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final AwsConnector awsConnector = new AwsConnector(); + + @AfterAll + static void cleanUp() { + awsConnector.disconnect(); + } + + @Test + void testS3UploadTriggersSqsMessage() { + String queueName = "s3ToSqsTestQueue"; + String bucketName = "otel-s3-to-sqs-test-bucket"; + + String queueUrl = awsConnector.createQueue(queueName); + awsConnector.createBucket(bucketName); + + String queueArn = awsConnector.getQueueArn(queueUrl); + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn); + awsConnector.enableS3ToSqsNotifications(bucketName, queueArn); + + // test message, auto created by AWS + awsConnector.receiveMessage(queueUrl); + awsConnector.putSampleData(bucketName); + + // traced message + ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl); + receiveMessageResult + .getMessages() + .forEach(message -> testing.runWithSpan("process child", () -> {})); + + // cleanup + awsConnector.deleteBucket(bucketName); + awsConnector.purgeQueue(queueUrl); + + testing.waitAndAssertTraces( + trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "CreateBucket", "PUT", 200)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "GetQueueAttributes")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "SetQueueAttributes")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "SetBucketNotificationConfiguration", "PUT", 200)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "PutObject", "PUT", 200), + span -> + span.hasName("s3ToSqsTestQueue process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "AmazonSQS"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class))), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "s3ToSqsTestQueue"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))), + span -> + span.hasName("process child") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty())), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "ListObjects", "GET", 200)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "DeleteObject", "DELETE", 204)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "DeleteBucket", "DELETE", 204)), + trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, null, queueUrl, "PurgeQueue"))); + } + + @Test + void testS3UploadTriggersSnsTopicNotificationThenCreatesSqsMessage() { + String queueName = "s3ToSnsToSqsTestQueue"; + String bucketName = "otel-s3-to-sns-to-sqs-test-bucket"; + String topicName = "s3ToSnsTestTopic"; + + String queueUrl = awsConnector.createQueue(queueName); + String queueArn = awsConnector.getQueueArn(queueUrl); + awsConnector.createBucket(bucketName); + String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn); + + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn); + awsConnector.setTopicPublishingPolicy(topicArn); + awsConnector.enableS3ToSnsNotifications(bucketName, topicArn); + + // test message, auto created by AWS + awsConnector.receiveMessage(queueUrl); + awsConnector.putSampleData(bucketName); + + // traced message + ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl); + receiveMessageResult + .getMessages() + .forEach(message -> testing.runWithSpan("process child", () -> {})); + + // cleanup + awsConnector.deleteBucket(bucketName); + awsConnector.purgeQueue(queueUrl); + + testing.waitAndAssertTraces( + trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "GetQueueAttributes")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "CreateBucket", "PUT", 200)), + trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, null, "CreateTopic")), + trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "Subscribe")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "SetQueueAttributes")), + trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "SetTopicAttributes")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "SetBucketNotificationConfiguration", "PUT", 200)), + trace -> + trace.hasSpansSatisfyingExactly(span -> s3(span, bucketName, "PutObject", "PUT", 200)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("s3ToSnsToSqsTestQueue process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "AmazonSQS"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class))), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "s3ToSnsToSqsTestQueue"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))), + span -> + span.hasName("process child") + .hasParent(trace.getSpan(0)) + .hasAttributes(Attributes.empty())), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "ListObjects", "GET", 200)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "DeleteObject", "DELETE", 204)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> s3(span, bucketName, "DeleteBucket", "DELETE", 204)), + trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, null, queueUrl, "PurgeQueue"))); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SnsTracingTest.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SnsTracingTest.java new file mode 100644 index 000000000000..429ca07938e6 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SnsTracingTest.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sns; +import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sqs; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; + +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SnsTracingTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final AwsConnector awsConnector = new AwsConnector(); + + @AfterAll + static void cleanUp() { + awsConnector.disconnect(); + } + + @Test + @SuppressWarnings("deprecation") // MESSAGING_OPERATION is deprecated + void testSnsNotificationTriggersSqsMessageConsumedWithAwsSdk() { + String queueName = "snsToSqsTestQueue"; + String topicName = "snsToSqsTestTopic"; + + String queueUrl = awsConnector.createQueue(queueName); + String queueArn = awsConnector.getQueueArn(queueUrl); + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn); + String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn); + + awsConnector.publishSampleNotification(topicArn); + ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl); + receiveMessageResult + .getMessages() + .forEach(message -> testing.runWithSpan("process child", () -> {})); + + testing.waitAndAssertTraces( + trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "GetQueueAttributes")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sqs(span, null, queueUrl, "SetQueueAttributes")), + trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, null, "CreateTopic")), + trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "Subscribe")), + trace -> + trace.hasSpansSatisfyingExactly( + span -> sns(span, topicArn, "Publish"), + span -> + span.hasName("snsToSqsTestQueue process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "AmazonSQS"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, val -> val.startsWith("http://")), + satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)), + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + satisfies( + SERVER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isInstanceOf(Number.class))), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "snsToSqsTestQueue"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))), + span -> + span.hasName("process child") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty()))); + } +}