From d2c7a1f7ed79cce502eb2250a6b9ec1510017fb5 Mon Sep 17 00:00:00 2001 From: Florian Esser Date: Mon, 26 Feb 2024 17:00:51 +0100 Subject: [PATCH] feat: implement S3 upload of transformation result Adds a simple S3 upload service that uploads the transformations result when the necessary details and credentials were provided in the transformation message. SVC-1398 --- build.gradle | 3 + .../hale/transformer/Transformer.java | 35 ++++++----- .../TransformationMessageConsumer.java | 60 +++++++++++++++++-- .../hale/transformer/io/s3/S3Service.java | 60 +++++++++++++++++++ src/main/resources/application.yml | 10 ++++ 5 files changed, 147 insertions(+), 21 deletions(-) create mode 100644 src/main/java/to/wetransform/hale/transformer/io/s3/S3Service.java create mode 100644 src/main/resources/application.yml diff --git a/build.gradle b/build.gradle index f7c2a12..b8a9488 100644 --- a/build.gradle +++ b/build.gradle @@ -40,6 +40,9 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-amqp' implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1' + // S3 API + implementation 'software.amazon.awssdk:s3:2.20.52' + // hale implementation 'eu.esdihumboldt.unpuzzled:org.eclipse.equinox.nonosgi.registry:1.0.0' implementation "to.wetransform:hale-cli:$cliVersion", { diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 262d02b..979cf56 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -45,8 +45,10 @@ public class Transformer { private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); private final CountDownLatch latch = new CountDownLatch(1); + private TargetConfig targetConfig; + private ExecContext execContext; - public void transform(String sourceDataURL, String projectURL, String targetURL) { + public void transform(String sourceDataUrl, String projectUrl, String targetFileName) { File transformationLogFile = null; try { @@ -62,22 +64,22 @@ public void transform(String sourceDataURL, String projectURL, String targetURL) Init.init(); logPlatformVersion(); - ExecContext context = new ExecContext(); + execContext = new ExecContext(); // Set up project URI - URI projectUri = new URI(projectURL); - context.setProject(projectUri); + URI projectUri = new URI(projectUrl); + execContext.setProject(projectUri); // Load project Project project = loadProject(projectUri); - Value sourceCrs = initializeSourceConfig(context, sourceDataURL); + Value sourceCrs = initializeSourceConfig(execContext, sourceDataUrl); - TargetConfig targetConfig = configureTarget(project, sourceCrs); - configureTargetContext(context, tempDirectory, targetConfig, reportFile); + targetConfig = configureTarget(project, sourceCrs, targetFileName); + configureTargetContext(execContext, tempDirectory, targetConfig, reportFile); // run the transformation LOG.info("Transforming started."); - new ExecTransformation().run(context); + new ExecTransformation().run(execContext); // evaluate results boolean success = evaluateTransformationResults(reportFile); @@ -86,11 +88,16 @@ public void transform(String sourceDataURL, String projectURL, String targetURL) LOG.error("Failed to execute transformation: {}", t.getMessage(), t); } finally { latch.countDown(); - deleteDir(transformationLogFile.getParentFile()); } } - // Extracted Methods + public TargetConfig getTargetConfig() { + return targetConfig; + } + + public ExecContext getExecContext() { + return execContext; + } private Path createTempDirectory() throws IOException { return Files.createTempDirectory("hale-transformer"); @@ -245,12 +252,11 @@ private Project loadProject(URI projectUri) { return result; } - private TargetConfig configureTarget(Project lp, Value sourceCrs) { - String filename; + private TargetConfig configureTarget(Project project, Value sourceCrs, String filename) { String preset = null; CustomTarget customTarget = null; - Map presets = getPresets(lp); + Map presets = getPresets(project); // Preset names String defaultPreset = "default"; @@ -260,12 +266,10 @@ private TargetConfig configureTarget(Project lp, Value sourceCrs) { // Project contains hale-connect preset preset = hcPreset; IOConfiguration ioConfiguration = presets.get(hcPreset); - filename = determineTargetFileName(ioConfiguration); } else if (presets.containsKey(defaultPreset)) { // Project contains default preset preset = defaultPreset; IOConfiguration ioConfiguration = presets.get(defaultPreset); - filename = determineTargetFileName(ioConfiguration); } else { // No specific presets found, creating a custom target configuration @@ -291,7 +295,6 @@ private TargetConfig configureTarget(Project lp, Value sourceCrs) { // Create a custom target configuration CustomTarget target = new CustomTarget(targetProvider, targetMap); - filename = "inspire.gml"; customTarget = target; } diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 5e713ef..164f757 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -1,17 +1,24 @@ package to.wetransform.hale.transformer.api.messaging; import java.io.Serializable; +import java.net.URI; +import java.nio.file.Paths; import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.annotation.JsonProperty; +import eu.esdihumboldt.hale.app.transform.ExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.regions.Region; +import to.wetransform.hale.transformer.TargetConfig; import to.wetransform.hale.transformer.Transformer; import to.wetransform.hale.transformer.api.TransformerApiApplication; import to.wetransform.hale.transformer.api.internal.CountdownLatchConfig; +import to.wetransform.hale.transformer.io.s3.S3Service; @Service public class TransformationMessageConsumer { @@ -19,8 +26,20 @@ public class TransformationMessageConsumer { * */ public record TransformationMessage( - @JsonProperty("projectUrl") String projectUrl, @JsonProperty("sourceDataUrl") String sourceDataUrl) - implements Serializable {} + @JsonProperty("projectUrl") String projectUrl, + @JsonProperty("sourceDataUrl") String sourceDataUrl, + @JsonProperty("targetFileName") String targetFileName, + @JsonProperty("s3Endpoint") String s3Endpoint, + @JsonProperty("s3Region") String s3Region, + @JsonProperty("s3BucketName") String s3BucketName, + @JsonProperty("s3AccessKey") String s3AccessKey, + @JsonProperty("s3SecretKey") String s3SecretKey) + implements Serializable { + + public boolean hasS3Details() { + return s3Region != null && s3BucketName != null && s3AccessKey != null && s3SecretKey != null; + } + } private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); @@ -33,16 +52,17 @@ public TransformationMessageConsumer(CountdownLatchConfig countdownLatchConfig) @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) public void receiveMessage(final TransformationMessage message) { - LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); + LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl + + " targetFileName = " + message.targetFileName); // TODO Implement mechanism to only accept a message from the queue if no // transformation is currently running - if (message.projectUrl != null && message.sourceDataUrl() != null) { + if (message.projectUrl != null && message.sourceDataUrl() != null && message.targetFileName != null) { Transformer tx = new Transformer(); try { LOG.info("Transformation started"); - tx.transform(message.sourceDataUrl(), message.projectUrl, null); + tx.transform(message.sourceDataUrl(), message.projectUrl, message.targetFileName); tx.getLatch().await(countdownLatchConfig.getWaitingTime(), TimeUnit.MINUTES); } catch (InterruptedException e) { // TODO What should be done when the transformation fails or times out? @@ -50,7 +70,37 @@ public void receiveMessage(final TransformationMessage message) { // - Send a message back so that the producer can react? Thread.currentThread().interrupt(); LOG.error("Transformation process timed out: " + e.getMessage(), e); + return; + } + + if (message.hasS3Details()) { + try (S3Service s3 = buildS3Service(message)) { + ExecContext execContext = tx.getExecContext(); + TargetConfig targetConfig = tx.getTargetConfig(); + if (execContext != null && targetConfig != null) { + URI target = execContext.getTarget(); + s3.putObject( + message.s3BucketName, + message.targetFileName, + Paths.get(target).toFile()); + } + } catch (Throwable t) { + LOG.error("Error uploading result: " + t.getMessage(), t); + // TODO What now? Should the result just be discarded? Should we send a message back? + } } } } + + private static S3Service buildS3Service(TransformationMessage message) throws IllegalArgumentException { + URI endpoint = null; + if (message.s3Endpoint != null) { + endpoint = URI.create(message.s3Endpoint); + } + + return new S3Service( + Region.of(message.s3Region), + AwsBasicCredentials.create(message.s3AccessKey, message.s3SecretKey), + endpoint); + } } diff --git a/src/main/java/to/wetransform/hale/transformer/io/s3/S3Service.java b/src/main/java/to/wetransform/hale/transformer/io/s3/S3Service.java new file mode 100644 index 0000000..cab7c15 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/io/s3/S3Service.java @@ -0,0 +1,60 @@ +package to.wetransform.hale.transformer.io.s3; + +import java.io.File; +import java.net.URI; +import java.nio.file.Path; + +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; + +/** + * Adapter for S3 object storages + */ +public class S3Service implements AutoCloseable { + private final S3Client s3Client; + + /** + * Creates an S3 service instance with the given parameters. + * + * @param region S3 region to connect to + * @param credentials Credentials for accessing S3 + * @param endpoint Optional endpoint URL. If null, AWS S3 endpoints are used. + */ + public S3Service(Region region, AwsCredentials credentials, URI endpoint) { + S3ClientBuilder builder = + S3Client.builder().region(region).credentialsProvider(StaticCredentialsProvider.create(credentials)); + + if (endpoint != null) { + builder = builder.endpointOverride(endpoint); + } + + this.s3Client = builder.build(); + } + + /** + * Upload a file to an S3 bucket + * + * @param bucketName Name of the target bucket + * @param key Key (file name) of the created object + * @param file File to upload + * @return API response + */ + public PutObjectResponse putObject(String bucketName, String key, File file) { + PutObjectRequest req = + PutObjectRequest.builder().bucket(bucketName).key(key).build(); + + return s3Client.putObject(req, Path.of(file.toURI())); + } + + /** + * Close the S3 connection + */ + public void close() { + this.s3Client.close(); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..6648c9d --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,10 @@ +spring: + rabbitmq: + host: ${RABBITMQ_HOSTNAME} + username: ${RABBITMQ_USERNAME} + password: ${RABBITMQ_PASSWORD} + +management: + health: + probes: + enabled: true