diff --git a/.circleci/config.yml b/.circleci/config.yml
index 0116e7f3..aa45b0b6 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -20,7 +20,8 @@ jobs:
           name: Build and Push Docker tag Image
           command: |
             # Set environment variables
-            IMAGE_TAG=importer_rdbms_2
+            IMAGE_TAG=$CIRCLE_TAG
+
             # Check if the Docker image with the same tag already exists in Docker Hub
             if curl -s -f -u "$DOCKERHUB_USERNAME":"$DOCKERHUB_PASSWORD" "https://hub.docker.com/v2/repositories/openmf/ph-ee-importer-rdbms/tags/$IMAGE_TAG" > /dev/null; then
               echo "Skipping the build and push as the tag $IMAGE_TAG already exists in Docker Hub."
@@ -50,9 +51,19 @@ jobs:
       - run:
           name: Build Docker image
           command: |
-            IMAGE_TAG=importer_rdbms_2
+            # Check for PR title validity
+
+            IMAGE_TAG=$CIRCLE_TAG
             ./gradlew clean bootJar
-            docker build -t openmf/ph-ee-importer-rdbms:importer_rdbms_2 .
+            docker build -t openmf/ph-ee-importer-rdbms:latest .
+
+            if [ "$CIRCLE_BRANCH" != "develop" ]; then
+              PR_NUMBER=$(basename $CIRCLE_PULL_REQUEST)
+              PR_TITLE=$(curl -sSL "https://api.github.com/repos/openmf/$CIRCLE_PR_REPONAME/pulls/$PR_NUMBER" | jq -r '.title')
+              JIRA_STORY=$(echo $PR_TITLE | cut -d "[" -f2 | cut -d "]" -f1 | tr '[A-Z]' '[a-z]')
+              if [ -z "$JIRA_STORY" ]; then echo "Invalid PR title" && exit 1; else echo "Ticket NO: $JIRA_STORY"; fi
+              docker image tag openmf/$CIRCLE_PR_REPONAME:latest openmf/$CIRCLE_PR_REPONAME:$JIRA_STORY
+            fi
     # Log in to DockerHub using environment variables
     - run:
         name: Login to DockerHub
@@ -61,8 +72,16 @@ jobs:
     # Push the Docker image to DockerHub
     - run:
         name: Push Docker image to DockerHub
-        command: docker push openmf/ph-ee-importer-rdbms:importer_rdbms_2
-
+        command: |
+          if [ "$CIRCLE_BRANCH" = "develop" ]; then
+            docker push openmf/ph-ee-importer-rdbms:latest
+          fi
+          if [ "$CIRCLE_BRANCH" != "develop" ]; then
+            PR_NUMBER=$(basename $CIRCLE_PULL_REQUEST)
+            PR_TITLE=$(curl -sSL "https://api.github.com/repos/openmf/$CIRCLE_PR_REPONAME/pulls/$PR_NUMBER" | jq -r '.title')
+            JIRA_STORY=$(echo $PR_TITLE | cut -d "[" -f2 | cut -d "]" -f1 | tr '[A-Z]' '[a-z]')
+            docker push openmf/$CIRCLE_PR_REPONAME:${JIRA_STORY}
+          fi
 workflows:
   version: 2
   build-and-push:
diff --git a/src/main/java/hu/dpc/phee/operator/config/AwsStorageConfig.java b/src/main/java/hu/dpc/phee/operator/config/AwsStorageConfig.java
index 6e782d9a..9a9f8d4e 100644
--- a/src/main/java/hu/dpc/phee/operator/config/AwsStorageConfig.java
+++ b/src/main/java/hu/dpc/phee/operator/config/AwsStorageConfig.java
@@ -3,6 +3,7 @@
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import org.springframework.beans.factory.annotation.Value;
@@ -21,12 +22,16 @@ public class AwsStorageConfig {
     @Value("${cloud.aws.region.static}")
     private String region;

+    @Value("${cloud.aws.s3BaseUrl}")
+    private String endpoint;

     @Bean
     @ConditionalOnProperty(value = "cloud.aws.enabled", havingValue = "true")
     public AmazonS3 s3Client() {
         AWSCredentials credentials = new BasicAWSCredentials(accessKey, accessSecret);
-        return AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).withRegion(region).build();
+        return AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
+                .withPathStyleAccessEnabled(true).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
+                .build();
     }
 }
diff --git a/src/main/java/hu/dpc/phee/operator/entity/transfer/Transfer.java b/src/main/java/hu/dpc/phee/operator/entity/transfer/Transfer.java
index 01c13fbf..ab0ff068 100644
--- a/src/main/java/hu/dpc/phee/operator/entity/transfer/Transfer.java
+++ b/src/main/java/hu/dpc/phee/operator/entity/transfer/Transfer.java
@@ -80,18 +80,6 @@ public class Transfer {
     @Column(name = "BATCH_ID")
     private String batchId;

-    @Column(name = "ENDTOENDIDENTIFICATION")
-    private String endToEndIdentification;
-
-    @Column(name = "RECALL_STATUS")
-    private String recallStatus;
-
-    @Column(name = "RECALL_DIRECTION")
-    private String recallDirection;
-
-    @Column(name = "PAYMENT_STATUS")
-    private String paymentStatus;
-
     @Column(name = "CLIENTCORRELATIONID")
     private String clientCorrelationId;
diff --git a/src/main/java/hu/dpc/phee/operator/file/CsvFileService.java b/src/main/java/hu/dpc/phee/operator/file/CsvFileService.java
new file mode 100644
index 00000000..19d28ede
--- /dev/null
+++ b/src/main/java/hu/dpc/phee/operator/file/CsvFileService.java
@@ -0,0 +1,44 @@
+package hu.dpc.phee.operator.file;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import hu.dpc.phee.operator.entity.batch.Transaction;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CsvFileService {
+
+    @Autowired
+    private CsvMapper csvMapper;
+
+    public List<Transaction> getTransactionList(String filename) {
+        List<Transaction> transactionList;
+        try {
+            CsvSchema schema = CsvSchema.emptySchema().withHeader();
+            Reader reader = Files.newBufferedReader(Paths.get(filename), Charset.defaultCharset());
+            MappingIterator<Transaction> readValues = csvMapper.readerWithSchemaFor(Transaction.class).with(schema).readValues(reader);
+            transactionList = new ArrayList<>();
+            while (readValues.hasNext()) {
+                Transaction current = readValues.next();
+                transactionList.add(current);
+            }
+        } catch (IOException e) {
+            log.debug(e.getMessage());
+            log.error("Error building TransactionList for file: {}", filename);
+            return null;
+        }
+        return transactionList;
+    }
+
+}
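
For context, a minimal caller sketch for the new CsvFileService (illustration only, not part of the patch; CsvImportExample is a hypothetical class, assuming a Spring context that wires the CsvFileService bean). Note that getTransactionList returns null when the file cannot be read or parsed, so callers must guard against that before iterating:

    // Hypothetical caller of CsvFileService -- illustration only, not part of this change.
    package hu.dpc.phee.operator.file;

    import hu.dpc.phee.operator.entity.batch.Transaction;
    import java.util.List;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class CsvImportExample {

        @Autowired
        private CsvFileService csvFileService;

        public void importTransactions(String filename) {
            List<Transaction> transactions = csvFileService.getTransactionList(filename);
            if (transactions == null) { // getTransactionList returns null on a read/parse failure
                log.warn("No transactions parsed from {}", filename);
                return;
            }
            transactions.forEach(t -> log.debug("Parsed transaction: {}", t));
        }
    }
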
diff --git a/src/main/java/hu/dpc/phee/operator/streams/InflightBatchManager.java b/src/main/java/hu/dpc/phee/operator/streams/InflightBatchManager.java
index ba20926c..788b7963 100644
--- a/src/main/java/hu/dpc/phee/operator/streams/InflightBatchManager.java
+++ b/src/main/java/hu/dpc/phee/operator/streams/InflightBatchManager.java
@@ -1,8 +1,5 @@
 package hu.dpc.phee.operator.streams;

-import com.fasterxml.jackson.databind.MappingIterator;
-import com.fasterxml.jackson.dataformat.csv.CsvMapper;
-import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import com.jayway.jsonpath.DocumentContext;
 import hu.dpc.phee.operator.config.TransferTransformerConfig;
 import hu.dpc.phee.operator.entity.batch.Batch;
@@ -11,6 +8,8 @@
 import hu.dpc.phee.operator.entity.tenant.ThreadLocalContextUtil;
 import hu.dpc.phee.operator.entity.transfer.Transfer;
 import hu.dpc.phee.operator.entity.transfer.TransferRepository;
+import hu.dpc.phee.operator.entity.transfer.TransferStatus;
+import hu.dpc.phee.operator.file.CsvFileService;
 import hu.dpc.phee.operator.file.FileTransferService;
 import hu.dpc.phee.operator.util.BatchFormatToTransferMapper;
 import org.slf4j.Logger;
@@ -19,18 +18,13 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;

-import java.io.IOException;
-import java.io.Reader;
 import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;

 import static hu.dpc.phee.operator.util.OperatorUtils.strip;

@@ -47,7 +41,7 @@ public class InflightBatchManager {
     private FileTransferService fileTransferService;

     @Autowired
-    private CsvMapper csvMapper;
+    private CsvFileService csvFileService;

     @Value("${application.bucket-name}")
     private String bucketName;
@@ -93,22 +87,10 @@ private void updateTransferTableForBatch(Batch batch, Long workflowInstanceKey,
                     workflowInstanceKey, filename);
             return;
         }
-        List<Transaction> transactionList;
-        try {
-            CsvSchema schema = CsvSchema.emptySchema().withHeader();
-            Reader reader = Files.newBufferedReader(Paths.get(filename), Charset.defaultCharset());
-            MappingIterator<Transaction> readValues = csvMapper.readerWithSchemaFor(Transaction.class).with(schema).readValues(reader);
-            transactionList = new ArrayList<>();
-            while (readValues.hasNext()) {
-                Transaction current = readValues.next();
-                transactionList.add(current);
-            }
-        } catch (IOException e) {
-            logger.debug(e.getMessage());
-            logger.error("Error building TransactionList for batch with instance key {} and batch filename {}", workflowInstanceKey,
-                    filename);
-            return;
-        }
+        List<Transaction> transactionList = csvFileService.getTransactionList(filename);
+        if (transactionList == null) {
+            return;
+        }

         for (Transaction transaction : transactionList) {
             Transfer transfer = BatchFormatToTransferMapper.mapToTransferEntity(transaction);
@@ -133,6 +115,33 @@ private void updateTransferTableForBatch(Batch batch, Long workflowInstanceKey,

     }

+    public void updateTransferTableWithFailedTransaction(Long workflowInstanceKey, String filename) {
+        logger.info("Filename {}", filename);
+        if (filename == null) {
+            return;
+        }
+        filename = strip(filename);
+        String localFilePath = fileTransferService.downloadFile(filename, bucketName);
+        List<Transaction> transactionList = csvFileService.getTransactionList(localFilePath);
+        if (transactionList == null) {
+            return;
+        }
+        for (Transaction transaction : transactionList) {
+            Transfer transfer = BatchFormatToTransferMapper.mapToTransferEntity(transaction);
+            transfer.setStatus(TransferStatus.FAILED);
+            transfer.setWorkflowInstanceKey(workflowInstanceKey);
+            transfer.setBatchId(strip(getBatchId(workflowInstanceKey)));
+            transfer.setStartedAt(new Date());
+            transfer.setCompletedAt(new Date());
+            transfer.setErrorInformation(transaction.getNote());
+            transfer.setClientCorrelationId(UUID.randomUUID().toString());
+            transfer.setTransactionId(UUID.randomUUID().toString());
+            logger.debug("Inserting failed txn: {}", transfer);
+            logger.info("Inserting failed txn with note: {}", transaction.getNote());
+            transferRepository.save(transfer);
+        }
+    }
+
     public String getBatchFileName(Long workflowKey) {
         synchronized (workflowKeyBatchFileNameAssociations) {
             return workflowKeyBatchFileNameAssociations.get(workflowKey);
diff --git a/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java b/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
index 9161ccda..d4ef2fd2 100644
--- a/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
+++ b/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
@@ -1,5 +1,6 @@
 package hu.dpc.phee.operator.streams;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.DocumentContext;
@@ -33,6 +34,7 @@
 import javax.xml.xpath.XPathFactory;
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Optional;
@@ -156,7 +158,7 @@ public List<Object> processWorkflowInstance(DocumentContext recordDocument, Stri
         return List.of();
     }

-    public List<Object> processVariable(DocumentContext recordDocument, String bpmn, Long workflowInstanceKey, Long workflowKey, Long timestamp, String flowType, DocumentContext sample) {
+    public List<Object> processVariable(DocumentContext recordDocument, String bpmn, Long workflowInstanceKey, Long workflowKey, Long timestamp, String flowType, DocumentContext sample) throws JsonProcessingException {
         logger.info("Processing variable instance");
         String variableName = recordDocument.read("$.value.name", String.class);
         String variableValue = recordDocument.read("$.value.value", String.class);
@@ -170,6 +172,21 @@
                 .withTimestamp(timestamp)
                 .withValue(value));

+        if (variableName.equals("subBatchDetails")) {
+            parseSubBatchDetails(variableValue);
+        }
+
+        if (variableName.equals("failedTransactionFile")) {
+            // insert the transaction into transfer table
+            logger.info("Name {} and value: {}", variableName, value);
+            inflightBatchManager.updateTransferTableWithFailedTransaction(workflowInstanceKey, value);
+        }
+
+        if (variableName.equals("batchId")) {
+            logger.info("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
+            inflightBatchManager.storeBatchId(workflowInstanceKey, value);
+        }
+
         logger.debug("finding transformers for bpmn: {} and variable: {}", bpmn, variableName);
         List matchingTransformers = transferTransformerConfig.getFlows().stream()
                 .filter(it -> bpmn.equalsIgnoreCase(it.getName()))
@@ -338,4 +355,13 @@ private void processJsonObject(JsonNode jsonNode, TransferTransformerConfig.Tran
                 logger.error("null result when setting field {} from variable {}. Jsonpath: {}, variable value: {}", fieldName, variableName, transformer.getJsonPath(), variableValue);
         }
     }
+
+    @Transactional
+    public void parseSubBatchDetails(String jsonString) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        List<Batch> batches = Arrays.asList(objectMapper.readValue(jsonString, Batch[].class));
+        for (Batch bt : batches) {
+            batchRepository.save(bt);
+        }
+    }
 }
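
For context, the "subBatchDetails" variable handled above is expected to carry a JSON array, which parseSubBatchDetails deserializes into Batch[] and persists row by row. A standalone sketch of the same Jackson pattern follows (illustration only; BatchStub and its single batchId field are hypothetical stand-ins for the real Batch entity, which has more fields):

    // Illustration only -- mirrors the deserialization pattern of parseSubBatchDetails.
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Arrays;
    import java.util.List;

    public class SubBatchDetailsExample {

        // Hypothetical stand-in for hu.dpc.phee.operator.entity.batch.Batch.
        record BatchStub(String batchId) {}

        public static void main(String[] args) throws JsonProcessingException {
            String jsonString = "[{\"batchId\":\"b-001\"},{\"batchId\":\"b-002\"}]";
            ObjectMapper objectMapper = new ObjectMapper();
            // Same shape as parseSubBatchDetails: JSON array -> typed array -> List.
            List<BatchStub> batches = Arrays.asList(objectMapper.readValue(jsonString, BatchStub[].class));
            batches.forEach(b -> System.out.println("sub-batch: " + b.batchId())); // real code saves each Batch via batchRepository
        }
    }
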
diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml
index ceeb31dd..ed3eb0a8 100644
--- a/src/main/resources/application-local.yml
+++ b/src/main/resources/application-local.yml
@@ -50,6 +50,29 @@ transfer:
           - field: paymentMode
             variableName: transactionList
             jsonPath: $.payment_mode
+      - name: bulk_processor_account_lookup
+        direction: UNKNOWN
+        type: BATCH
+        transformers:
+          - field: batchId
+            variableName: batchId
+          - field: payerFsp
+            variableName: tenantId
+          - field: requestFile
+            variableName: filename
+          - field: registeringInstitutionId
+            variableName: registeringInstituteId
+          - field: correlationId
+            variableName: clientCorrelationId
+          - field: requestId
+            variableName: transactionList
+            jsonPath: $.request_id
+          - field: note
+            variableName: transactionList
+            jsonPath: $.note
+          - field: paymentMode
+            variableName: transactionList
+            jsonPath: $.payment_mode
       - name: slcb
         direction: UNKNOWN
         type: BATCH
diff --git a/src/main/resources/application-tenants.yml b/src/main/resources/application-tenants.yml
index 8d8d336f..66d064ec 100644
--- a/src/main/resources/application-tenants.yml
+++ b/src/main/resources/application-tenants.yml
@@ -2,7 +2,7 @@ tenants:
   connections:
     - name: gorilla
       schema_server: operationsmysql
-      schema_name: tenants
+      schema_name:
       schema_server_port: 3306
       schema_username: mifos
       schema_password: password
@@ -28,7 +28,7 @@ tenants:
       schema_connection_parameters:
     - name: rhino
       schema_server: operationsmysql
-      schema_name: tenants
+      schema_name:
       schema_server_port: 3306
       schema_username: mifos
       schema_password: password
@@ -54,7 +54,7 @@ tenants:
       schema_connection_parameters:
     - name: lion
       schema_server: operationsmysql
-      schema_name: tenants
+      schema_name:
       schema_server_port: 3306
       schema_username: mifos
       schema_password: password
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b72f5e39..8ed538c3 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -48,6 +48,7 @@ application:
 cloud:
   aws:
     enabled: true
+    s3BaseUrl: "https://s3.ap-south-1.amazonaws.com"
     credentials:
       access-key: ${AWS_ACCESS_KEY:access_key_from_aws}
       secret-key: ${AWS_SECRET_KEY:secret_key_from_aws}