Skip to content

Commit

Permalink
CustomMessageType Support for publish, subscribe, Signal, Files.
Browse files Browse the repository at this point in the history
Acceptance Tests step implementation for custom messageType
  • Loading branch information
mohitpubnub committed Nov 19, 2024
1 parent 7f4180c commit 43dee13
Show file tree
Hide file tree
Showing 30 changed files with 2,277 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/Builder/StatusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public PNStatus CreateStatusResponse<T>(PNOperationType type, PNStatusCategory c
status.Category = PNStatusCategory.PNAccessDeniedCategory;
}


status.Uuid = config.UserId;
status.Origin = config.Origin;
status.TlsEnabled = config.Secure;

Expand Down
10 changes: 10 additions & 0 deletions src/Api/PubnubApi/EndPoint/Files/PublishFileMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class PublishFileMessageOperation : PubnubCoreBase
private object publishMessageContent;
private bool storeInHistory = true;
private Dictionary<string, object> userMetadata;
private string customMessageType;
private int ttl = -1;

public PublishFileMessageOperation(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, EndPoint.TokenManager tokenManager, Pubnub instance) : base(pubnubConfig, jsonPluggableLibrary, pubnubUnit, log, tokenManager, instance)
Expand Down Expand Up @@ -76,6 +77,11 @@ public PublishFileMessageOperation FileName(string name)
this.currentFileName = name;
return this;
}
public PublishFileMessageOperation CustomMessageType(string customMessageType)
{
this.customMessageType = customMessageType;
return this;
}

public PublishFileMessageOperation QueryParam(Dictionary<string, object> customQueryParam)
{
Expand Down Expand Up @@ -250,6 +256,10 @@ private RequestParameter CreateRequestParameter()
if (!storeInHistory) {
requestQueryStringParams.Add("store", "0");
}

if (!string.IsNullOrEmpty(customMessageType)) {
requestQueryStringParams.Add("custom_message_type", customMessageType);
}

if (queryParam != null && queryParam.Count > 0) {
foreach (KeyValuePair<string, object> kvp in queryParam) {
Expand Down
41 changes: 28 additions & 13 deletions src/Api/PubnubApi/EndPoint/Files/SendFileOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class SendFileOperation : PubnubCoreBase
private bool storeInHistory = true;
private Dictionary<string, object> userMetadata;
private int ttl = -1;
private string customMessageType;

public SendFileOperation(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, TokenManager tokenManager, Pubnub instance) : base(pubnubConfig, jsonPluggableLibrary, pubnubUnit, log, tokenManager, instance)
{
Expand Down Expand Up @@ -67,6 +68,12 @@ public SendFileOperation Ttl(int ttl)
this.ttl = ttl;
return this;
}

public SendFileOperation CustomMessageType(string customMessageType)
{
this.customMessageType = customMessageType;
return this;
}

public SendFileOperation File(string fileNameWithFullPath)
{
Expand Down Expand Up @@ -180,11 +187,12 @@ private void ProcessFileUpload(PNCallback<PNFileUploadResult> callback)
int publishFileRetryLimit = config.FileMessagePublishRetryLimit;
int currentFileRetryCount = 0;
bool publishFailed;
PNStatus publishFileMessageStatus;
do {
currentFileRetryCount += 1;
PNResult<PNPublishFileMessageResult> publishFileMessageResponse = PublishFileMessage(publishPayload, queryParam).Result;
PNPublishFileMessageResult publishFileMessage = publishFileMessageResponse.Result;
PNStatus publishFileMessageStatus = publishFileMessageResponse.Status;
publishFileMessageStatus = publishFileMessageResponse.Status;
if (publishFileMessageStatus != null && !publishFileMessageStatus.Error && publishFileMessage != null) {
publishFailed = false;
PNFileUploadResult result = new PNFileUploadResult();
Expand All @@ -202,11 +210,11 @@ private void ProcessFileUpload(PNCallback<PNFileUploadResult> callback)
LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, "DateTime {0} PublishFileMessage Failed. currentFileRetryCount={1}", DateTime.Now.ToString(CultureInfo.InvariantCulture), currentFileRetryCount), config.LogVerbosity);
}
}
while (publishFailed && currentFileRetryCount <= publishFileRetryLimit);
while (publishFailed && currentFileRetryCount <= publishFileRetryLimit && !(publishFileMessageStatus?.StatusCode != 400 || publishFileMessageStatus.StatusCode != 403));
} else {
int statusCode = PNStatusCodeHelper.GetHttpStatusCode(transportResponse.Error.Message);
PNStatusCategory category = PNStatusCategoryHelper.GetPNStatusCategory(statusCode, transportResponse.Error.Message);
PNStatus status = new StatusBuilder(config, jsonLibrary).CreateStatusResponse(PNOperationType.PNFileUploadOperation, category, requestState, statusCode, new PNException(transportResponse.Error.Message, transportResponse.Error));
int statusCode = PNStatusCodeHelper.GetHttpStatusCode(transportResponse?.Error?.Message);
PNStatusCategory category = PNStatusCategoryHelper.GetPNStatusCategory(statusCode, transportResponse?.Error?.Message);
PNStatus status = new StatusBuilder(config, jsonLibrary).CreateStatusResponse(PNOperationType.PNFileUploadOperation, category, requestState, statusCode, new PNException(transportResponse?.Error?.Message, transportResponse?.Error));
requestState.PubnubCallback.OnResponse(default, status);
}
}
Expand All @@ -229,11 +237,13 @@ private async Task<PNResult<PNFileUploadResult>> ProcessFileUpload()
return returnValue;
}
LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, "DateTime {0} GenerateFileUploadUrl executed.", DateTime.Now.ToString(CultureInfo.InvariantCulture)), config.LogVerbosity);
RequestState<PNFileUploadResult> requestState = new RequestState<PNFileUploadResult>();
requestState.ResponseType = PNOperationType.PNFileUploadOperation;
requestState.Reconnect = false;
requestState.EndPointOperation = this;
requestState.UsePostMethod = true;
RequestState<PNFileUploadResult> requestState = new RequestState<PNFileUploadResult>
{
ResponseType = PNOperationType.PNFileUploadOperation,
Reconnect = false,
EndPointOperation = this,
UsePostMethod = true
};
byte[] sendFileByteArray = sendFileBytes ?? GetByteArrayFromFilePath(sendFileFullPath);
string dataBoundary = string.Format(CultureInfo.InvariantCulture, "----------{0:N}", Guid.NewGuid());
string contentType = "multipart/form-data; boundary=" + dataBoundary;
Expand Down Expand Up @@ -303,11 +313,12 @@ private async Task<PNResult<PNFileUploadResult>> ProcessFileUpload()
int publishFileRetryLimit = config.FileMessagePublishRetryLimit;
int currentFileRetryCount = 0;
bool publishFailed;
PNStatus publishFileMessageStatus;
do {
currentFileRetryCount += 1;
PNResult<PNPublishFileMessageResult> publishFileMessageResponse = await PublishFileMessage(publishPayload, queryParam).ConfigureAwait(false);
PNPublishFileMessageResult publishFileMessage = publishFileMessageResponse.Result;
PNStatus publishFileMessageStatus = publishFileMessageResponse.Status;
publishFileMessageStatus = publishFileMessageResponse.Status;
if (publishFileMessageStatus != null && !publishFileMessageStatus.Error && publishFileMessage != null) {
publishFailed = false;
PNFileUploadResult result = new PNFileUploadResult
Expand All @@ -326,7 +337,7 @@ private async Task<PNResult<PNFileUploadResult>> ProcessFileUpload()
await Task.Delay(1000);
}
}
while (publishFailed && currentFileRetryCount <= publishFileRetryLimit);
while (publishFailed && currentFileRetryCount <= publishFileRetryLimit && !(publishFileMessageStatus?.StatusCode != 400 || publishFileMessageStatus.StatusCode != 403));
}

return returnValue;
Expand Down Expand Up @@ -403,7 +414,7 @@ private async Task<PNResult<PNPublishFileMessageResult>> PublishFileMessage(obje
ResponseBuilder responseBuilder = new ResponseBuilder(config, jsonLibrary, pubnubLog);
PNPublishFileMessageResult publishResult = responseBuilder.JsonToObject<PNPublishFileMessageResult>(result, true);
StatusBuilder statusBuilder = new StatusBuilder(config, jsonLibrary);
if (publishResult != null) {
if (publishResult != null && transportResponse.StatusCode == Constants.HttpRequestSuccessStatusCode) {
returnValue.Result = publishResult;
PNStatus status = statusBuilder.CreateStatusResponse(requestState.ResponseType, PNStatusCategory.PNAcknowledgmentCategory, requestState, transportResponse.StatusCode, null);
returnValue.Status = status;
Expand Down Expand Up @@ -564,6 +575,10 @@ private RequestParameter CreatePublishFileMessageRequestParameter()
if (storeInHistory && ttl >= 0) {
requestQueryStringParams.Add("ttl", ttl.ToString(CultureInfo.InvariantCulture));
}

if (!string.IsNullOrEmpty(customMessageType)) {
requestQueryStringParams.Add("custom_message_type", customMessageType);
}

if (!storeInHistory) {
requestQueryStringParams.Add("store", "0");
Expand Down
74 changes: 50 additions & 24 deletions src/Api/PubnubApi/EndPoint/PubSub/PublishOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class PublishOperation : PubnubCoreBase
private bool httpPost;
private Dictionary<string, object> userMetadata;
private int ttl = -1;
private string customMessageType;
private PNCallback<PNPublishResult> savedCallback;
private bool syncRequest;
private Dictionary<string, object> queryParam;
Expand Down Expand Up @@ -75,6 +76,12 @@ public PublishOperation Ttl(int ttl)
this.ttl = ttl;
return this;
}

public PublishOperation CustomMessageType(string customMessageType)
{
this.customMessageType = customMessageType;
return this;
}

public PublishOperation QueryParam(Dictionary<string, object> customQueryParam)
{
Expand Down Expand Up @@ -157,12 +164,14 @@ internal void Publish(string channel, object message, bool storeInHistory, int t
callback.OnResponse(null, status);
return;
}
RequestState<PNPublishResult> requestState = new RequestState<PNPublishResult>();
requestState.Channels = new[] { channel };
requestState.ResponseType = PNOperationType.PNPublishOperation;
requestState.PubnubCallback = callback;
requestState.Reconnect = false;
requestState.EndPointOperation = this;
RequestState<PNPublishResult> requestState = new RequestState<PNPublishResult>
{
Channels = [channel],
ResponseType = PNOperationType.PNPublishOperation,
PubnubCallback = callback,
Reconnect = false,
EndPointOperation = this
};

var requestParameters = CreateRequestParameter();
var transportRequest = PubnubInstance.transportMiddleware.PreapareTransportRequest(requestParameter: requestParameters, operationType: PNOperationType.PNPublishOperation);
Expand Down Expand Up @@ -220,27 +229,30 @@ internal async Task<PNResult<PNPublishResult>> Publish(string channel, object me
returnValue.Status = errStatus;
return returnValue;
}
RequestState<PNPublishResult> requestState = new RequestState<PNPublishResult>();
Tuple<string, PNStatus> JsonAndStatusTuple;
requestState.Channels = new[] { channel };
requestState.ResponseType = PNOperationType.PNPublishOperation;
requestState.Reconnect = false;
requestState.EndPointOperation = this;
RequestState<PNPublishResult> requestState = new RequestState<PNPublishResult>
{
Channels = [channel],
ResponseType = PNOperationType.PNPublishOperation,
Reconnect = false,
EndPointOperation = this
};
var requestParameter = CreateRequestParameter();
var transportRequest = PubnubInstance.transportMiddleware.PreapareTransportRequest(requestParameter: requestParameter, operationType: PNOperationType.PNPublishOperation);
var transportResponse = await PubnubInstance.transportMiddleware.Send(transportRequest).ConfigureAwait(false);
if (transportResponse.Error == null) {
string responseString = Encoding.UTF8.GetString(transportResponse.Content);
PNStatus errorStatus = GetStatusIfError(requestState, responseString);
if (errorStatus == null) {
PNStatus errorStatus = GetStatusIfError<PNPublishResult>(requestState, responseString);
Tuple<string, PNStatus> jsonAndStatusTuple;
if (errorStatus == null && transportResponse.StatusCode == Constants.HttpRequestSuccessStatusCode) {
requestState.GotJsonResponse = true;
PNStatus status = new StatusBuilder(config, jsonLibrary).CreateStatusResponse(requestState.ResponseType, PNStatusCategory.PNAcknowledgmentCategory, requestState, Constants.HttpRequestSuccessStatusCode, null);
JsonAndStatusTuple = new Tuple<string, PNStatus>(responseString, status);
jsonAndStatusTuple = new Tuple<string, PNStatus>(responseString, status);
} else {
JsonAndStatusTuple = new Tuple<string, PNStatus>("", errorStatus);
requestState.GotJsonResponse = true;
jsonAndStatusTuple = new Tuple<string, PNStatus>(responseString??"", errorStatus);
}
returnValue.Status = JsonAndStatusTuple.Item2;
string json = JsonAndStatusTuple.Item1;
returnValue.Status = jsonAndStatusTuple.Item2;
string json = jsonAndStatusTuple.Item1;

if (!string.IsNullOrEmpty(json)) {
List<object> result = ProcessJsonResponse(requestState, json);
Expand All @@ -257,6 +269,16 @@ internal async Task<PNResult<PNPublishResult>> Publish(string channel, object me
}
}
}
else
{
PNStatusCategory category =
PNStatusCategoryHelper.GetPNStatusCategory(400, result[1].ToString());
PNStatus status =
new StatusBuilder(config, jsonLibrary).CreateStatusResponse<PNPublishResult>(
PNOperationType.PNPublishOperation, category, requestState, 400,
new PNException(responseString));
returnValue.Status = status;
}
}
}
} else {
Expand Down Expand Up @@ -296,15 +318,15 @@ private string PrepareContent(object originalMessage)

private RequestParameter CreateRequestParameter()
{
List<string> urlSegments = new List<string>
{
List<string> urlSegments =
[
"publish",
config.PublishKey?? "",
config.SubscribeKey??"",
config.PublishKey ?? "",
config.SubscribeKey ?? "",
"0",
channelName,
"0"
};
];
if (!httpPost) {
urlSegments.Add(PrepareContent(this.publishContent));
}
Expand All @@ -323,7 +345,11 @@ private RequestParameter CreateRequestParameter()
requestQueryStringParams.Add("store", "0");
}

if (queryParam != null && queryParam.Count > 0) {
if (!string.IsNullOrEmpty(customMessageType)) {
requestQueryStringParams.Add("custom_message_type", customMessageType);
}

if (queryParam is { Count: > 0 }) {
foreach (KeyValuePair<string, object> kvp in queryParam) {
if (!requestQueryStringParams.ContainsKey(kvp.Key)) {
requestQueryStringParams.Add(kvp.Key, UriUtil.EncodeUriComponent(kvp.Value.ToString(), PNOperationType.PNPublishOperation, false, false, false));
Expand Down
Loading

0 comments on commit 43dee13

Please sign in to comment.