Skip to content

Commit

Permalink
Merge branch 'feature/sqs-instrumentation' of https://github.com/newr…
Browse files Browse the repository at this point in the history
…elic/newrelic-dotnet-agent into feature/sqs-instrumentation
  • Loading branch information
tippmar-nr committed Jul 8, 2024
2 parents df8518c + 8e90406 commit e44d44c
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using Amazon.SQS;
using Amazon.SQS.Model;
using System.Linq;
using System.Collections;
using System.Collections.Generic;

namespace AwsSdkTestApp.AwsSdkExerciser
{
Expand Down Expand Up @@ -61,21 +63,23 @@ await _amazonSqsClient.DeleteQueueAsync(new DeleteQueueRequest
QueueUrl = _sqsQueueUrl
});
}
public async Task SQS_Initialize(string queueName)
public async Task<string> SQS_Initialize(string queueName)
{
if (_sqsQueueUrl != null)
{
throw new InvalidOperationException("Queue URL is already set. Call SQS_Teardown first.");
}

_sqsQueueUrl = await SQS_CreateQueueAsync(queueName);

return _sqsQueueUrl;
}

public async Task SQS_Teardown()
{
if (_sqsQueueUrl == null)
{
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize first.");
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
}

await SQS_DeleteQueueAsync();
Expand All @@ -87,29 +91,35 @@ public async Task SQS_SendMessage(string message)
{
if (_sqsQueueUrl == null)
{
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize first.");
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
}

await _amazonSqsClient.SendMessageAsync(_sqsQueueUrl, message);
}

[MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
public async Task SQS_ReceiveMessage()
public async Task<IEnumerable<Message>> SQS_ReceiveMessage(int maxMessagesToReceive = 1)
{
if (_sqsQueueUrl == null)
{
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize first.");
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
}

var response = await _amazonSqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = _sqsQueueUrl,
MaxNumberOfMessages = 1
MaxNumberOfMessages = maxMessagesToReceive,
MessageAttributeNames = ["All"]
});

foreach (var message in response.Messages)
{
Console.WriteLine($"Message: {message.Body}");
foreach (var attr in message.MessageAttributes)
{
Console.WriteLine($"MessageAttributes: {attr.Key} = {{ DataType = {attr.Value.DataType}, StringValue = {attr.Value.StringValue}}}");
}

// delete message
await _amazonSqsClient.DeleteMessageAsync(new DeleteMessageRequest
{
Expand All @@ -118,6 +128,7 @@ await _amazonSqsClient.DeleteMessageAsync(new DeleteMessageRequest
});
}

return response.Messages;
}

// send message batch
Expand All @@ -126,7 +137,7 @@ public async Task SQS_SendMessageBatch(string[] messages)
{
if (_sqsQueueUrl == null)
{
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize first.");
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
}

var request = new SendMessageBatchRequest
Expand All @@ -149,7 +160,7 @@ public async Task SQS_PurgeQueue()
{
if (_sqsQueueUrl == null)
{
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize first.");
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
}

await _amazonSqsClient.PurgeQueueAsync(new PurgeQueueRequest
Expand All @@ -158,6 +169,11 @@ await _amazonSqsClient.PurgeQueueAsync(new PurgeQueueRequest
});
}

public void SQS_SetQueueUrl(string messageQueueUrl)
{
_sqsQueueUrl = messageQueueUrl;
}

#endregion

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Amazon.SQS.Model;
using AwsSdkTestApp.AwsSdkExerciser;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
Expand All @@ -25,10 +28,59 @@ public async Task SQS_SendReceivePurge([Required]string queueName)

await awsSdkExerciser.SQS_SendMessage("Hello World!");
await awsSdkExerciser.SQS_ReceiveMessage();
await awsSdkExerciser.SQS_SendMessageBatch(new[] { "Hello", "World" });

var messages = new[] { "Hello", "World" };
await awsSdkExerciser.SQS_SendMessageBatch(messages);
await awsSdkExerciser.SQS_ReceiveMessage(messages.Length);

await awsSdkExerciser.SQS_PurgeQueue();

await awsSdkExerciser.SQS_Teardown();
}

/// <summary>
/// Creates a queue and returns the queue URL
/// </summary>
/// <param name="queueName"></param>
/// <returns></returns>
// GET: /AwsSdk/SQS_InitializeQueue?queueName=MyQueue
[HttpGet("SQS_InitializeQueue")]
public async Task<string> SQS_InitializeQueue([Required]string queueName)
{
using var awsSdkExerciser = new AwsSdkExerciser.AwsSdkExerciser(AwsSdkTestType.SQS);
return await awsSdkExerciser.SQS_Initialize(queueName);
}

// GET: /AwsSdk/SQS_SendMessageToQueue?message=Hello&messageQueueUrl=MyQueue
[HttpGet("SQS_SendMessageToQueue")]
public async Task SQS_SendMessageToQueue([Required]string message, [Required]string messageQueueUrl)
{
using var awsSdkExerciser = new AwsSdkExerciser.AwsSdkExerciser(AwsSdkTestType.SQS);
awsSdkExerciser.SQS_SetQueueUrl(messageQueueUrl);

await awsSdkExerciser.SQS_SendMessage(message);
}

// GET: /AwsSdk/SQS_SendMessageBatchToQueue?messageQueueUrl=MyQueue
[HttpGet("SQS_ReceiveMessageFromQueue")]
public async Task<IEnumerable<Message>> SQS_ReceiveMessageFromQueue([Required]string messageQueueUrl)
{
using var awsSdkExerciser = new AwsSdkExerciser.AwsSdkExerciser(AwsSdkTestType.SQS);
awsSdkExerciser.SQS_SetQueueUrl(messageQueueUrl);

var messages = await awsSdkExerciser.SQS_ReceiveMessage();

return messages;
}

// GET: /AwsSdk/SQS_SendMessageBatchToQueue?messageQueueUrl=MyQueue
[HttpGet("SQS_DeleteQueue")]
public async Task SQS_DeleteQueue([Required]string messageQueueUrl)
{
using var awsSdkExerciser = new AwsSdkExerciser.AwsSdkExerciser(AwsSdkTestType.SQS);
awsSdkExerciser.SQS_SetQueueUrl(messageQueueUrl);

await awsSdkExerciser.SQS_Teardown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,25 @@ public class AwsSdkContainerSQSTestFixture : AwsSdkContainerTestFixtureBase

public AwsSdkContainerSQSTestFixture() : base(DistroTag, Architecture, Dockerfile) { }

public void ExerciseSQS(string queueName)
public void ExerciseSQS_SendReceivePurge(string queueName)
{
var address = $"http://localhost:{Port}/awssdk";

GetAndAssertStatusCode($"{address}/SQS_SendReceivePurge?queueName={queueName}", System.Net.HttpStatusCode.OK);
}

public string ExerciseSQS_SendAndReceiveInSeparateTransactions(string queueName)
{
var address = $"http://localhost:{Port}/awssdk";

var queueUrl = GetString($"{address}/SQS_InitializeQueue?queueName={queueName}");

GetAndAssertStatusCode($"{address}/SQS_SendMessageToQueue?message=Hello&messageQueueUrl={queueUrl}", System.Net.HttpStatusCode.OK);
var messagesJson = GetString($"{address}/SQS_ReceiveMessageFromQueue?messageQueueUrl={queueUrl}");

GetAndAssertStatusCode($"{address}/SQS_DeleteQueue?messageQueueUrl={queueUrl}", System.Net.HttpStatusCode.OK);

return messagesJson;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class AwsSdkSQSTest : NewRelicIntegrationTest<AwsSdkContainerSQSTestFixtu
private readonly AwsSdkContainerSQSTestFixture _fixture;

private readonly string _testQueueName = $"TestQueue-{Guid.NewGuid()}";
private readonly string _metricScope = "WebTransaction/MVC/AwsSdk/SQS_SendReceivePurge/{queueName}";
private readonly string _metricScope1 = "WebTransaction/MVC/AwsSdk/SQS_SendReceivePurge/{queueName}";
private string _messagesJson;

public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper output) : base(fixture)
{
Expand All @@ -27,16 +28,23 @@ public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper ou
_fixture.Actions(setupConfiguration: () =>
{
var configModifier = new NewRelicConfigModifier(_fixture.DestinationNewRelicConfigFilePath);
configModifier.SetLogLevel("debug");
configModifier.SetLogLevel("finest");
configModifier.ForceTransactionTraces();
configModifier.EnableDistributedTrace();
configModifier.ConfigureFasterMetricsHarvestCycle(15);
configModifier.ConfigureFasterSpanEventsHarvestCycle(15);
configModifier.ConfigureFasterTransactionTracesHarvestCycle(15);
configModifier.LogToConsole();
},
exerciseApplication: () =>
{
_fixture.Delay(15);
_fixture.ExerciseSQS(_testQueueName);
_fixture.ExerciseSQS_SendReceivePurge(_testQueueName);
_messagesJson = _fixture.ExerciseSQS_SendAndReceiveInSeparateTransactions(_testQueueName);
_fixture.Delay(15);
_fixture.AgentLog.WaitForLogLine(AgentLogBase.MetricDataLogLineRegex, TimeSpan.FromMinutes(2));
Expand All @@ -48,26 +56,30 @@ public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper ou
_fixture.Initialize();
}


[Fact]
public void Test()
{
var metrics = _fixture.AgentLog.GetMetrics().ToList();

var expectedMetrics = new List<Assertions.ExpectedMetric>
{
new() { metricName = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}", callCount = 2}, // SendMessage and SendMessageBatch
new() { metricName = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}", callCount = 2, metricScope = _metricScope},
new() { metricName = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}", callCount = 3}, // SendMessage and SendMessageBatch
new() { metricName = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}", callCount = 2, metricScope = _metricScope1},
new() { metricName = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}", callCount = 1, metricScope = "WebTransaction/MVC/AwsSdk/SQS_SendMessageToQueue/{message}/{messageQueueUrl}"},

new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName}", callCount = 1},
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName}", callCount = 1, metricScope = _metricScope},

new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName}", callCount = 3},
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName}", callCount = 2, metricScope = _metricScope1},
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName}", callCount = 1, metricScope = "WebTransaction/MVC/AwsSdk/SQS_ReceiveMessageFromQueue/{messageQueueUrl}"},

new() { metricName = $"MessageBroker/SQS/Queue/Purge/Named/{_testQueueName}", callCount = 1},
new() { metricName = $"MessageBroker/SQS/Queue/Purge/Named/{_testQueueName}", callCount = 1, metricScope = _metricScope},
new() { metricName = $"MessageBroker/SQS/Queue/Purge/Named/{_testQueueName}", callCount = 1, metricScope = _metricScope1},
};

var sendMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent(_metricScope);
var sendMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent(_metricScope1);

var transactionSample = _fixture.AgentLog.TryGetTransactionSample(_metricScope);
var transactionSample = _fixture.AgentLog.TryGetTransactionSample(_metricScope1);
var expectedTransactionTraceSegments = new List<string>
{
$"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName}",
Expand All @@ -81,5 +93,23 @@ public void Test()
() => Assert.True(transactionSample != null, "transactionSample should not be null"),
() => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample)
);

// verify that traceparent / tracestate / newrelic attributes were received in the messages
var jsonObject = System.Text.Json.JsonDocument.Parse(_messagesJson);
var messages = jsonObject.RootElement.EnumerateArray().ToList();
foreach (var message in messages)
{
var messageAttributes = message.GetProperty("messageAttributes").EnumerateObject().ToList();
var messageAttributesDict = messageAttributes.ToDictionary(
kvp => kvp.Name,
kvp => kvp.Value.GetProperty("stringValue").GetString()
);
NrAssert.Multiple(
() => Assert.True(messageAttributesDict.ContainsKey("traceparent"), "messageAttributesDict should contain traceparent"),
() => Assert.True(messageAttributesDict.ContainsKey("tracestate"), "messageAttributesDict should contain tracestate"),
() => Assert.True(messageAttributesDict.ContainsKey("newrelic"), "messageAttributesDict should contain newrelic")
);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ public static void MetricsExist(IEnumerable<ExpectedMetric> expectedMetrics, IEn

if (expectedMetric.callCount.HasValue && matchedMetric.Values.CallCount != expectedMetric.callCount)
{
builder.AppendFormat($"Metric named {matchedMetric.MetricSpec.Name} scoped to {matchedMetric.MetricSpec.Scope ?? "nothing"} had an unexpected count of {matchedMetric.Values.CallCount} (Expected {expectedMetric.callCount})");
builder.AppendFormat("Metric named {0} scoped to {1} had an unexpected count of {2} (Expected {3})",
matchedMetric.MetricSpec.Name, matchedMetric.MetricSpec.Scope ?? "nothing",
matchedMetric.Values.CallCount, expectedMetric.callCount);
builder.AppendLine();
builder.AppendLine();

Expand Down

0 comments on commit e44d44c

Please sign in to comment.