Skip to content

Commit

Permalink
feature: Adds S3 transport (OpenLineage#3129)
Browse files Browse the repository at this point in the history
* Adds S3 transport as additional dependency

Signed-off-by: Artur Owczarek <[email protected]>
  • Loading branch information
arturowczarek authored Oct 4, 2024
1 parent 90034f2 commit f34e8ab
Show file tree
Hide file tree
Showing 10 changed files with 489 additions and 2 deletions.
17 changes: 15 additions & 2 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,9 @@ jobs:
release:
default: false
type: boolean
docker:
- image: cimg/openjdk:17.0
machine:
image: ubuntu-2404:current
resource_class: medium
steps:
- *checkout_project_root
- run:
Expand All @@ -448,6 +449,7 @@ jobs:
- restore_cache:
keys:
- v1-client-java-{{ checksum "/tmp/checksum.txt" }}
- set_java_version
- run: ./gradlew --no-daemon --console=plain --stacktrace build check jacocoTestReport
- run: bash <(curl -s https://codecov.io/bash)
- publish_shadow_jar_local
Expand All @@ -463,6 +465,8 @@ jobs:
path: transports-dataplex/build/test-results/test
- store_test_results:
path: transports-gcs/build/test-results/test
- store_test_results:
path: transports-s3/build/test-results/test
- store_artifacts:
path: build/reports/tests/test
destination: test-report
Expand All @@ -472,6 +476,9 @@ jobs:
- store_artifacts:
path: transports-gcs/build/reports/tests/test
destination: test-report
- store_artifacts:
path: transports-s3/build/reports/tests/test
destination: test-report
- store_artifacts:
path: build/reports/tests/pmd
destination: pmd-report
Expand All @@ -484,6 +491,9 @@ jobs:
- store_artifacts:
path: transports-gcs/build/libs
destination: libs
- store_artifacts:
path: transports-s3/build/libs
destination: libs
- save_cache:
key: v1-client-java-{{ checksum "/tmp/checksum.txt" }}
paths:
Expand All @@ -510,6 +520,9 @@ jobs:
- store_artifacts:
path: ./transports-gcs/build/libs
destination: transports-gcs-artifacts
- store_artifacts:
path: ./transports-s3/build/libs
destination: transports-s3-artifacts
- save_cache:
key: v1-release-client-java-{{ checksum "/tmp/checksum.txt" }}
paths:
Expand Down
1 change: 1 addition & 0 deletions client/java/settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rootProject.name = 'openlineage-java'
include('generator')
include('transports-dataplex')
include('transports-gcs')
include('transports-s3')

buildCache {
local {
Expand Down
57 changes: 57 additions & 0 deletions client/java/transports-s3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'eclipse'
id 'jacoco'
id 'java'
id 'java-library'
id 'maven-publish'
id 'signing'
id "com.adarshr.test-logger" version "3.2.0"
// Don't bump above 6.13 - it requires Java 11 https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#changes-12
id 'com.diffplug.spotless' version '6.13.0'
id "com.gradleup.shadow" version "8.3.2"
id "pmd"
id "io.freefair.lombok" version "8.10"
id "com.github.gmazzo.buildconfig" version "5.5.0"
}

ext {
projectDescription = "S3 OpenLineage transport library"
s3MockVersion = "3.11.0"
testcontainersVersion = "1.19.3"
}

sourceSets {
test {
buildConfig {
// We should use the same version of dependency and Docker image
buildConfigField(String, "S3_MOCK_VERSION", s3MockVersion)
}
}
}


dependencies {
compileOnly("com.google.code.findbugs:jsr305:3.0.2")
implementation(platform("software.amazon.awssdk:bom:2.28.11"))
implementation("software.amazon.awssdk:auth")
implementation("software.amazon.awssdk:s3")
implementation("software.amazon.awssdk:url-connection-client")

testImplementation("com.adobe.testing:s3mock-testcontainers:${s3MockVersion}")
testImplementation(platform("org.testcontainers:testcontainers-bom:${testcontainersVersion}"))
testImplementation("org.testcontainers:junit-jupiter")
}

shadowJar {
relocate "software.amazon", "io.openlineage.client.shaded.software.amazon"
relocate "org.apache", "io.openlineage.client.shaded.org.apache"
relocate "org.reactivestreams", "io.openlineage.client.shaded.org.reactivestreams"
relocate "org.slf4j", "io.openlineage.client.shaded.org.slf4j"
}

apply from: '../transports.build.gradle'
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports.s3;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientException;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.client.transports.Transport;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.utils.AttributeMap;

@Slf4j
public class S3Transport extends Transport {
private static final String FILENAME_FORMAT_FOR_DIRECTORY_PREFIX = "%s%s.json";
private static final String FILENAME_FORMAT_FOR_FILE_PREFIX = "%s_%s.json";

private final S3Client s3Client;
private final S3TransportConfig config;

public S3Transport(S3TransportConfig config) {
this.config = Objects.requireNonNull(config, "S3TransportConfig must not be null");
this.s3Client = buildS3Client(config);
}

private static S3Client buildS3Client(S3TransportConfig config) {
S3ClientBuilder builder = S3Client.builder();
if (config.getEndpoint() != null) {
builder.endpointOverride(URI.create(config.getEndpoint()));
}
return builder
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
.httpClient(
UrlConnectionHttpClient.builder()
.buildWithDefaults(
AttributeMap.builder()
.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true)
.build()))
.build();
}

@Override
public void emit(OpenLineage.@NonNull RunEvent runEvent) {
emitEvent(runEvent);
}

@Override
public void emit(OpenLineage.@NonNull DatasetEvent datasetEvent) {
emitEvent(datasetEvent);
}

@Override
public void emit(OpenLineage.@NonNull JobEvent jobEvent) {
emitEvent(jobEvent);
}

private <T extends OpenLineage.BaseEvent> void emitEvent(T event) {
Long timestamp = event.getEventTime().toInstant().toEpochMilli();
uploadObject(getFileName(timestamp), OpenLineageClientUtils.toJson(event));
}

private String getFileName(Long timestamp) {
if (config.getFileNamePrefix() != null) {
String prefix = config.getFileNamePrefix();
return prefix.endsWith("/")
? String.format(FILENAME_FORMAT_FOR_DIRECTORY_PREFIX, prefix, timestamp)
: String.format(FILENAME_FORMAT_FOR_FILE_PREFIX, prefix, timestamp);
} else {
return timestamp + ".json";
}
}

private void uploadObject(String objectName, String contents) {
String bucketName = config.getBucketName();
log.debug("Attempting to upload event to bucket: {} with key: {}", bucketName, objectName);
// We don't want to overwrite the files
if (objectExists(bucketName, objectName)) {
String error =
String.format("File with given name s3://%s/%s, already exists!", bucketName, objectName);
throw new OpenLineageClientException(error);
}
s3Client.putObject(
PutObjectRequest.builder().bucket(bucketName).key(objectName).build(),
RequestBody.fromBytes(contents.getBytes(StandardCharsets.UTF_8)));
log.debug("Stored event: {}/{}", bucketName, objectName);
}

private boolean objectExists(String bucketName, String objectName) {
try {
// Try to retrieve the metadata from an object without returning the object itself.
s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(objectName).build());
return true;
} catch (NoSuchKeyException e) {
return false;
}
}

public void close() {
s3Client.close();
}

// Visible for testing
S3Client getS3Client() {
return s3Client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports.s3;

import io.openlineage.client.OpenLineageClientException;
import io.openlineage.client.transports.Transport;
import io.openlineage.client.transports.TransportBuilder;
import io.openlineage.client.transports.TransportConfig;

public class S3TransportBuilder implements TransportBuilder {
@Override
public String getType() {
return "s3";
}

@Override
public TransportConfig getConfig() {
return new S3TransportConfig();
}

@Override
public Transport build(TransportConfig config) {
try {
return new S3Transport((S3TransportConfig) config);
} catch (Exception e) {
throw new OpenLineageClientException(
"Failed to create S3 transport with config: " + config.toString(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports.s3;

import io.openlineage.client.MergeConfig;
import io.openlineage.client.transports.TransportConfig;
import javax.annotation.Nullable;
import lombok.*;

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public class S3TransportConfig implements TransportConfig, MergeConfig<S3TransportConfig> {

/** If not null, overrides the default endpoint. */
@Nullable private String endpoint;

private String bucketName;
private @Nullable String fileNamePrefix;

@Override
public S3TransportConfig mergeWithNonNull(S3TransportConfig s3TransportConfig) {
return new S3TransportConfig(
mergePropertyWith(endpoint, s3TransportConfig.getEndpoint()),
mergePropertyWith(bucketName, s3TransportConfig.getBucketName()),
mergePropertyWith(fileNamePrefix, s3TransportConfig.getFileNamePrefix()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.openlineage.client.transports.s3.S3TransportBuilder
Loading

0 comments on commit f34e8ab

Please sign in to comment.