Skip to content

Commit

Permalink
Merge pull request #101 from memphisdev/fix-create-remove-consumer-pr…
Browse files Browse the repository at this point in the history
…oducer

Fix create remove consumer producer
  • Loading branch information
idanasulin2706 authored Jul 18, 2023
2 parents c7479bb + 63dcbc1 commit ba082de
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/Memphis.Client.UnitTests/MemphisClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public MemphisClientTest(ITestOutputHelper testOutputHelper)
_testOutputHelper = testOutputHelper;

this._brokerOptions = ConnectionFactory.GetDefaultOptions();
this._brokerOptions.User = "root";
this._connectionId = Guid.NewGuid().ToString();
this._connectionMock = new Mock<IConnection>();
this._jetStreamMock = new Mock<IJetStream>();
Expand Down
1 change: 1 addition & 0 deletions src/Memphis.Client/Consumer/MemphisConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public async Task DestroyAsync()
StationName = _consumerOptions.StationName,
ConnectionId = _memphisClient.ConnectionId,
Username = _memphisClient.Username,
RequestVersion = 1,
};

var removeConsumerModelJson = JsonSerDes.PrepareJsonString<RemoveConsumerRequest>(removeConsumerModel);
Expand Down
6 changes: 6 additions & 0 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public MemphisClient(Options brokerConnOptions, IConnection brokerConnection,
_jetStreamContext = jetStreamContext ?? throw new ArgumentNullException(nameof(jetStreamContext));
_connectionId = connectionId ?? throw new ArgumentNullException(nameof(connectionId));
_userName = brokerConnOptions.User;
int usernameSeparatorIndex = _userName.LastIndexOf('$');
if (usernameSeparatorIndex >= 0)
{
_userName = _userName.Substring(0, usernameSeparatorIndex);
}

_cancellationTokenSource = new();

Expand Down Expand Up @@ -298,6 +303,7 @@ public async Task<MemphisConsumer> CreateConsumer(MemphisConsumerOptions consume
UserName = _userName,
StartConsumeFromSequence = consumerOptions.StartConsumeFromSequence,
LastMessages = consumerOptions.LastMessages,
RequestVersion = 1,
};

var createConsumerModelJson = JsonSerDes.PrepareJsonString<CreateConsumerRequest>(createConsumerModel);
Expand Down
3 changes: 3 additions & 0 deletions src/Memphis.Client/Models/Request/CreateConsumerRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ internal sealed class CreateConsumerRequest

[DataMember(Name = "last_messages")]
public int LastMessages { get; set; }

[DataMember(Name = "req_version")]
public int RequestVersion { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Memphis.Client/Models/Request/RemoveConsumerRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ internal sealed class RemoveConsumerRequest

[DataMember(Name = "username")]
public string Username { get; set; }

[DataMember(Name = "req_version")]
public int RequestVersion { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Memphis.Client/Models/Request/RemoveProducerRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ internal sealed class RemoveProducerRequest

[DataMember(Name = "username")]
public string Username { get; set; }

[DataMember(Name = "req_version")]
public int RequestVersion { get; set; }
}
}
3 changes: 2 additions & 1 deletion src/Memphis.Client/Producer/MemphisProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public async Task DestroyAsync()
ProducerName = _producerName,
StationName = _stationName,
ConnectionId = _memphisClient.ConnectionId,
Username = _memphisClient.Username
Username = _memphisClient.Username,
RequestVersion = 1,
};

var removeProducerModelJson = JsonSerDes.PrepareJsonString<RemoveProducerRequest>(removeProducerModel);
Expand Down

0 comments on commit ba082de

Please sign in to comment.