diff --git a/src/ArtemisNetCoreClient/ByteBuffer.cs b/src/ArtemisNetCoreClient/ByteBuffer.cs index 1536a7c..7d3d519 100644 --- a/src/ArtemisNetCoreClient/ByteBuffer.cs +++ b/src/ArtemisNetCoreClient/ByteBuffer.cs @@ -324,4 +324,9 @@ public void WriteSize() _memoryStream.TryGetBuffer(out var buffer); BinaryPrimitives.WriteInt32BigEndian(buffer, buffer.Count - sizeof(int)); } + + public long ReadableBytes() + { + return _memoryStream.Length - _memoryStream.Position; + } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Message.cs b/src/ArtemisNetCoreClient/Framing/Message.cs index c2b0a6e..7d6e851 100644 --- a/src/ArtemisNetCoreClient/Framing/Message.cs +++ b/src/ArtemisNetCoreClient/Framing/Message.cs @@ -21,6 +21,8 @@ public class Message internal void Encode(ByteBuffer buffer) { + // TODO: Add comment why it's 17 ;-) + buffer.WriteInt(Body.Length + 17); buffer.WriteInt(Body.Length); buffer.WriteBytes(Body); diff --git a/src/ArtemisNetCoreClient/Framing/NullResponse.cs b/src/ArtemisNetCoreClient/Framing/NullResponse.cs index b48a157..1678594 100644 --- a/src/ArtemisNetCoreClient/Framing/NullResponse.cs +++ b/src/ArtemisNetCoreClient/Framing/NullResponse.cs @@ -10,5 +10,13 @@ public override void Encode(ByteBuffer buffer) public override void Decode(ByteBuffer buffer) { + if (buffer.ReadableBytes() >= sizeof(long)) + { + CorrelationId = buffer.ReadLong(); + } + else + { + CorrelationId = -1; + } } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Packet.cs b/src/ArtemisNetCoreClient/Framing/Packet.cs index 0d50635..b89b6d7 100644 --- a/src/ArtemisNetCoreClient/Framing/Packet.cs +++ b/src/ArtemisNetCoreClient/Framing/Packet.cs @@ -2,7 +2,7 @@ namespace ActiveMQ.Artemis.Core.Client.Framing; internal abstract class Packet { - public long CorrelationId { get; init; } = -1; + public long CorrelationId { get; set; } = -1; public virtual bool IsResponse => false; public abstract void Encode(ByteBuffer buffer); diff --git a/src/ArtemisNetCoreClient/IProducer.cs b/src/ArtemisNetCoreClient/IProducer.cs index fe12aa3..1facef9 100644 --- a/src/ArtemisNetCoreClient/IProducer.cs +++ b/src/ArtemisNetCoreClient/IProducer.cs @@ -27,7 +27,7 @@ await session.SendBlockingAsync(new SessionS Message = message, ProducerId = ProducerId, RequiresResponse = true, - CorrelationId = 1000 + CorrelationId = -4 }, cancellationToken); } } \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/ProducerTests.cs b/test/ArtemisNetCoreClient.Tests/ProducerTests.cs index e3609e6..f4e1ab1 100644 --- a/test/ArtemisNetCoreClient.Tests/ProducerTests.cs +++ b/test/ArtemisNetCoreClient.Tests/ProducerTests.cs @@ -4,7 +4,7 @@ namespace ActiveMQ.Artemis.Core.Client.Tests; public class ProducerTests { - [Test, Ignore("Failing")] + [Test] public async Task should_send_message() { // Arrange