Skip to content

Commit

Permalink
Send Message
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Apr 3, 2024
1 parent c12745d commit 224d352
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,9 @@ public void WriteSize()
_memoryStream.TryGetBuffer(out var buffer);
BinaryPrimitives.WriteInt32BigEndian(buffer, buffer.Count - sizeof(int));
}

public long ReadableBytes()
{
return _memoryStream.Length - _memoryStream.Position;
}
}
2 changes: 2 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class Message

internal void Encode(ByteBuffer buffer)
{
// TODO: Add comment why it's 17 ;-)
buffer.WriteInt(Body.Length + 17);
buffer.WriteInt(Body.Length);
buffer.WriteBytes(Body);

Expand Down
8 changes: 8 additions & 0 deletions src/ArtemisNetCoreClient/Framing/NullResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,13 @@ public override void Encode(ByteBuffer buffer)

public override void Decode(ByteBuffer buffer)
{
if (buffer.ReadableBytes() >= sizeof(long))
{
CorrelationId = buffer.ReadLong();
}
else
{
CorrelationId = -1;
}
}
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Framing/Packet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal abstract class Packet
{
public long CorrelationId { get; init; } = -1;
public long CorrelationId { get; set; } = -1;
public virtual bool IsResponse => false;

public abstract void Encode(ByteBuffer buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ await session.SendBlockingAsync<SessionSendMessageV3, NullResponse>(new SessionS
Message = message,
ProducerId = ProducerId,
RequiresResponse = true,
CorrelationId = 1000
CorrelationId = -4
}, cancellationToken);
}
}
2 changes: 1 addition & 1 deletion test/ArtemisNetCoreClient.Tests/ProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace ActiveMQ.Artemis.Core.Client.Tests;

public class ProducerTests
{
[Test, Ignore("Failing")]
[Test]
public async Task should_send_message()
{
// Arrange
Expand Down

0 comments on commit 224d352

Please sign in to comment.