Skip to content

Commit

Permalink
fix ValidationFunction (#19)
Browse files Browse the repository at this point in the history
* fix(CHUNK_SIZE): Update enqueue method

* Bump to version 0.0.14-1-fix-queue-visibility [skip ci]

* Bump to version 0.0.14-2-fix-queue-visibility [skip ci]

---------

Co-authored-by: pagopa-github-bot <[email protected]>
  • Loading branch information
cap-ang and pagopa-github-bot authored May 28, 2024
1 parent 2652e69 commit 157dd9a
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 28 deletions.
4 changes: 2 additions & 2 deletions helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: pagopa-gpd-upload-function
description: Microservice description
type: application
version: 0.89.0
appVersion: 0.0.14-1-fix-queue-visibility
version: 0.90.0
appVersion: 0.0.14-2-fix-queue-visibility
dependencies:
- name: microservice-chart
version: 2.4.0
Expand Down
2 changes: 1 addition & 1 deletion helm/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.14-1-fix-queue-visibility"
tag: "0.0.14-2-fix-queue-visibility"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
6 changes: 3 additions & 3 deletions helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.14-1-fix-queue-visibility"
tag: "0.0.14-2-fix-queue-visibility"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down Expand Up @@ -89,8 +89,8 @@ microservice-chart:
GPD_CONTAINER_NAME: "gpd_upload_status"
BLOB_EVENTS_QUEUE: "gpd-blob-events-queue"
VALID_POSITIONS_QUEUE: "gpd-valid-debt-position-queue"
CHUNK_SIZE: "50"
MAX_RETRY: "1"
CHUNK_SIZE: "20"
MAX_RETRY: "2"
RETRY_DELAY_IN_SECONDS: "300"
envFieldRef:
APP_NAME: "metadata.labels['app.kubernetes.io/instance']"
Expand Down
6 changes: 3 additions & 3 deletions helm/values-uat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.14-1-fix-queue-visibility"
tag: "0.0.14-2-fix-queue-visibility"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down Expand Up @@ -89,8 +89,8 @@ microservice-chart:
GPD_CONTAINER_NAME: "gpd_upload_status"
BLOB_EVENTS_QUEUE: "gpd-blob-events-queue"
VALID_POSITIONS_QUEUE: "gpd-valid-debt-position-queue"
CHUNK_SIZE: "50"
MAX_RETRY: "1"
CHUNK_SIZE: "20"
MAX_RETRY: "2"
RETRY_DELAY_IN_SECONDS: "300"
envFieldRef:
APP_NAME: "metadata.labels['app.kubernetes.io/instance']"
Expand Down
3 changes: 2 additions & 1 deletion host.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"logLevel": {
"default": "None",
"Host.Results": "Error",
"Function": "Error",
"Function.BlobQueueEventFunction": "Information",
"Function.PaymentPositionDequeueFunction": "Information",
"Microsoft": "Information",
"Worker": "Information",
"Host.Aggregator": "Error",
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>it.gov.pagopa.gpd.upload</groupId>
<artifactId>gpd-upload-function</artifactId>
<version>0.0.14-1-fix-queue-visibility</version>
<version>0.0.14-2-fix-queue-visibility</version>
<packaging>jar</packaging>

<name>GPD-Upload-Function</name>
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/it/gov/pagopa/gpd/upload/ValidationFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ else if(iupds != null)
}

// enqueue chunk and other input to form message
enqueue(ctx, om, input.getOperation(), pps, iupds, uploadKey, fiscalCode, broker);

return true;
return enqueue(ctx, om, input.getOperation(), pps, iupds, uploadKey, fiscalCode, broker);
} catch (JsonMappingException e) {
// todo: in this case is a BAD_REQUEST -> update status
ctx.getLogger().log(Level.SEVERE, () -> String.format("[id=%s][ValidationFunction] Processing function exception: %s, caused by: %s", ctx.getInvocationId(), e.getMessage(), e.getCause()));
Expand All @@ -138,16 +136,16 @@ public BinaryData downloadBlob(ExecutionContext ctx, String broker, String fisca
}

public Status createStatus(ExecutionContext ctx, String broker, String orgFiscalCode, String uploadKey, int size) throws AppException {
Status status = StatusService.getInstance(ctx.getLogger())
return StatusService.getInstance(ctx.getLogger())
.createStatus(ctx.getInvocationId(), broker, orgFiscalCode, uploadKey, size);
return status;
}

public boolean enqueue(ExecutionContext ctx, ObjectMapper om, CRUDOperation operation, List<PaymentPosition> paymentPositions, List<String> IUPDList, String uploadKey, String fiscalCode, String broker) {
QueueMessage.QueueMessageBuilder builder = QueueService.getInstance(ctx.getLogger()).generateMessageBuilder(operation, uploadKey, fiscalCode, broker);
QueueService queueService = QueueService.getInstance(ctx.getLogger());
QueueMessage.QueueMessageBuilder builder = queueService.generateMessageBuilder(operation, uploadKey, fiscalCode, broker);
return switch (operation) {
case CREATE, UPDATE -> QueueService.getInstance(ctx.getLogger()).enqueueUpsertMessage(ctx, om, paymentPositions, builder, 0);
case DELETE -> QueueService.getInstance(ctx.getLogger()).enqueueDeleteMessage(ctx, om, IUPDList, builder, 0);
case CREATE, UPDATE -> queueService.enqueueUpsertMessage(ctx, om, paymentPositions, builder, 0, QueueService.CHUNK_SIZE);
case DELETE -> queueService.enqueueDeleteMessage(ctx, om, IUPDList, builder, 0);
};
}
}
25 changes: 16 additions & 9 deletions src/main/java/it/gov/pagopa/gpd/upload/service/QueueService.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package it.gov.pagopa.gpd.upload.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
Expand All @@ -22,7 +22,7 @@ public class QueueService {
private static final String GPD_SA_CONNECTION_STRING = System.getenv("GPD_SA_CONNECTION_STRING");
private static final String VALID_POSITIONS_QUEUE =
System.getenv("VALID_POSITIONS_QUEUE") != null ? System.getenv("VALID_POSITIONS_QUEUE") : "VALID_POSITIONS_QUEUE";
private static final Integer CHUNK_SIZE = System.getenv("CHUNK_SIZE") != null ? Integer.parseInt(System.getenv("CHUNK_SIZE")) : 30;
public static final Integer CHUNK_SIZE = System.getenv("CHUNK_SIZE") != null ? Integer.parseInt(System.getenv("CHUNK_SIZE")) : 30;
private CloudQueue cloudQueue;
private Logger logger;

Expand Down Expand Up @@ -76,25 +76,32 @@ public boolean enqueueDeleteMessage(ExecutionContext ctx, ObjectMapper om, List<

try {
enqueue(ctx.getInvocationId(), om.writeValueAsString(message), delay);
} catch (JsonProcessingException e) {
} catch (Exception e) {
ctx.getLogger().log(Level.SEVERE, () -> String.format("[id=%s][QueueService] Processing function exception: %s, caused by: %s", ctx.getInvocationId(), e.getMessage(), e.getCause()));
return false;
}
}
return true;
}

public boolean enqueueUpsertMessage(ExecutionContext ctx, ObjectMapper om, List<PaymentPosition> paymentPositions, QueueMessage.QueueMessageBuilder builder, int delay) {
for (int i = 0; i < paymentPositions.size(); i += CHUNK_SIZE) {
public boolean enqueueUpsertMessage(ExecutionContext ctx, ObjectMapper om, List<PaymentPosition> paymentPositions, QueueMessage.QueueMessageBuilder builder, int delay, int chunk_size) {
chunk_size = chunk_size > 0 ? chunk_size : CHUNK_SIZE;
for (int i = 0; i < paymentPositions.size(); i += chunk_size) {
List<PaymentPosition> positionSubList = paymentPositions.subList(i, Math.min(i + CHUNK_SIZE, paymentPositions.size()));
QueueMessage message = builder.paymentPositions(positionSubList).build();
QueueMessage cloudMessage = builder.paymentPositions(positionSubList).build();

try {
enqueue(ctx.getInvocationId(), om.writeValueAsString(message), delay);
} catch (JsonProcessingException e) {
String message = om.writeValueAsString(cloudMessage);

if(message.length() > 64 * Constants.KB)
enqueueUpsertMessage(ctx, om, positionSubList, builder, delay, chunk_size/2);
else
enqueue(ctx.getInvocationId(), om.writeValueAsString(message), delay);
} catch (Exception e) {
ctx.getLogger().log(Level.SEVERE, () -> String.format("[id=%s][QueueService] Processing function exception: %s, caused by: %s", ctx.getInvocationId(), e.getMessage(), e.getCause()));
return false;
}
}
return true;
}
}
}

0 comments on commit 157dd9a

Please sign in to comment.