Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQNET-625: Content Property of IBytesMessage should be idempotent #48

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public interface INmsBytesMessageFacade : INmsMessageFacade
BinaryWriter GetDataWriter();
void Reset();
long BodyLength { get; }
byte[] Content { get; set; }
}
}
17 changes: 12 additions & 5 deletions src/NMS.AMQP/Message/NmsBytesMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ public byte[] Content
{
get
{
byte[] buffer = new byte [BodyLength];
ReadBytes(buffer);
return buffer;
CheckWriteOnlyBody();
return this.facade.Content;
}
set
{
CheckReadOnlyBody();
this.facade.Content = value;
}
set => WriteBytes(value);
}

public byte ReadByte()
Expand Down Expand Up @@ -292,15 +295,19 @@ public int ReadBytes(byte[] value)
public int ReadBytes(byte[] value, int length)
{
InitializeReading();
return ReadBytes(dataIn, value, length);
}

private int ReadBytes(BinaryReader binaryReader, byte[] value, int length)
{
if (length < 0 || value.Length < length)
{
throw new IndexOutOfRangeException("length must not be negative or larger than the size of the provided array");
}

try
{
return dataIn.Read(value, 0, length);
return binaryReader.Read(value, 0, length);
}
catch (EndOfStreamException e)
{
Expand Down
71 changes: 37 additions & 34 deletions src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class AmqpNmsBytesMessageFacade : AmqpNmsMessageFacade, INmsBytesMessageF
private EndianBinaryReader byteIn = null;
private EndianBinaryWriter byteOut = null;

private static readonly Data EMPTY_DATA = new Data { Binary = new byte[0] };
private static readonly byte[] EMPTY_BINARY = new byte[0];
private static readonly Data EMPTY_DATA = new Data { Binary = EMPTY_BINARY };

public override sbyte? JmsMsgType => MessageSupport.JMS_TYPE_BYTE;
public long BodyLength => GetBinaryFromBody().Binary.LongLength;
public long BodyLength => Content.Length;

public BinaryReader GetDataReader()
{
Expand All @@ -45,58 +46,60 @@ public BinaryReader GetDataReader()

if (byteIn == null)
{
Data body = GetBinaryFromBody();
Stream dataStream = new MemoryStream(body.Binary, false);
byte[] body = Content;
Stream dataStream = new MemoryStream(body, false);
byteIn = new EndianBinaryReader(dataStream);
}

return byteIn;
}

private Data GetBinaryFromBody()
public byte[] Content
{
RestrictedDescribed body = Message.BodySection;
Data result = EMPTY_DATA;
if (body == null)
get
{
return result;
}
else if (body is Data)
{
byte[] binary = (body as Data).Binary;
if (binary != null && binary.Length != 0)
RestrictedDescribed body = Message.BodySection;
byte[] result = EMPTY_BINARY;
if (body == null)
{
return body as Data;
return result;
}
}
else if (body is AmqpValue)
{
object value = (body as AmqpValue).Value;
if (value == null)
else if (body is Data)
{
return result;
byte[] binary = (body as Data).Binary;
if (binary != null && binary.Length != 0)
{
return binary;
}
}

if (value is byte[])
else if (body is AmqpValue)
{
byte[] dataValue = value as byte[];
if (dataValue.Length > 0)
object value = (body as AmqpValue).Value;
if (value == null)
{
result = new Data();
result.Binary = dataValue;
return result;
}
if (value is byte[])
{
byte[] dataValue = value as byte[];
if (dataValue.Length > 0)
{
return dataValue;
}
}
else
{
throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
}
}
else
{
throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
}
}
else
{
throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
}

return result;
return result;
}
set => Message.BodySection = new Data { Binary = value };
}

public BinaryWriter GetDataWriter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ public class NmsTestBytesMessageFacade : NmsTestMessageFacade, INmsBytesMessageF
{
private BinaryWriter bytesOut = null;
private BinaryReader bytesIn = null;
private byte[] content = null;

public NmsTestBytesMessageFacade()
{
content = new byte[0];
Content = new byte[0];
}

public NmsTestBytesMessageFacade(byte[] content)
{
this.content = content;
this.Content = content;
}

public BinaryReader GetDataReader()
Expand All @@ -45,7 +44,7 @@ public BinaryReader GetDataReader()
throw new IllegalStateException("Body is being written to, cannot perform a read.");
}

return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(content)));
return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(Content)));
}

public BinaryWriter GetDataWriter()
Expand All @@ -65,7 +64,7 @@ public void Reset()
bytesOut.BaseStream.Position = 0;
bytesOut.BaseStream.CopyTo(byteStream);

content = byteStream.ToArray();
Content = byteStream.ToArray();

byteStream.Close();
bytesOut.Close();
Expand All @@ -81,9 +80,10 @@ public void Reset()
public override void ClearBody()
{
this.Reset();
content = new byte[0];
Content = new byte[0];
}

public long BodyLength => content?.LongLength ?? 0;
public long BodyLength => Content?.LongLength ?? 0;
public byte[] Content { get; set; }
}
}
38 changes: 38 additions & 0 deletions test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,5 +534,43 @@ public void TestMessageCopy()
NmsBytesMessage copy = message.Copy() as NmsBytesMessage;
Assert.IsNotNull(copy);
}

[Test]
public void TestMessageContentCanBeObtainedMultipleTimesWithoutReset()
{
byte[] bytes = Encoding.UTF8.GetBytes("myBytes");
NmsBytesMessage message = factory.CreateBytesMessage();
message.Content = bytes;
message.Reset();

CollectionAssert.AreEqual(bytes, message.Content);
CollectionAssert.AreEqual(bytes, message.Content);
}

[Test]
public void TestConsecutiveReadBytes()
{
byte[] bytes = CreateBytesArrayOfSize(24);
NmsBytesMessage message = factory.CreateBytesMessage(bytes);
message.Reset();

CollectionAssert.AreEqual(bytes.Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(bytes.Skip(8).Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(bytes.Skip(16).Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(new byte[8], ReadBytes(message, 8));
}

private static byte[] CreateBytesArrayOfSize(int size)
{
var random = new Random();
return Enumerable.Range(0, size).Select(x => (byte) random.Next(byte.MaxValue)).ToArray();
}

private static byte[] ReadBytes(IBytesMessage message, int count)
{
byte[] bytes = new byte[count];
message.ReadBytes(bytes);
return bytes;
}
}
}