Skip to content

Commit

Permalink
Put config values in spark-defaults.conf
Browse files Browse the repository at this point in the history
This better reflects how a production Spark instance will run. Users
will supply Spark defaults that include our custom S3 client, etc.,
in the Spark container. Spark users shouldn't have to deal with this.
  • Loading branch information
Randgalt committed Aug 3, 2024
1 parent a00969b commit 8e04515
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.annotation.PreDestroy;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
Expand Down Expand Up @@ -86,6 +87,7 @@ private PySparkContainer(
case VERSION_3 -> findProjectClassDirectory(TrinoAwsProxyS3ClientSigil.class);
case VERSION_4 -> findProjectClassDirectory(TrinoAwsProxyS4ClientSigil.class);
};
File trinoClientIoDirectory = new File(trinoClientDirectory, "io");

File awsSdkJar = switch (version) {
case VERSION_3 -> findTestJar("aws");
Expand All @@ -107,10 +109,26 @@ private PySparkContainer(
case VERSION_4 -> "io.trino.aws.proxy.spark4.TrinoAwsProxyS4ClientFactory";
};

String s3Endpoint = asHostUrl(httpServer.getBaseUrl().resolve(trinoS3ProxyConfig.getS3Path()).toString());
String metastoreEndpoint = asHostUrl("localhost:" + metastoreContainer.port());

String sparkConfFile = """
hive.metastore.uris %s
spark.hadoop.fs.s3a.endpoint %s
spark.hadoop.fs.s3a.s3.client.factory.impl %s
spark.hadoop.fs.s3a.access.key %s
spark.hadoop.fs.s3a.secret.key %s
spark.hadoop.fs.s3a.path.style.access True
spark.hadoop.fs.s3a.connection.ssl.enabled False
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
""".formatted(metastoreEndpoint, s3Endpoint, clientFactoryClassName, testingCredentials.emulated().accessKey(), testingCredentials.emulated().secretKey());

container = new GenericContainer<>(dockerImageName)
.withFileSystemBind(hadoopJar.getAbsolutePath(), "/opt/spark/jars/hadoop.jar", BindMode.READ_ONLY)
.withFileSystemBind(awsSdkJar.getAbsolutePath(), "/opt/spark/jars/aws.jar", BindMode.READ_ONLY)
.withFileSystemBind(trinoClientDirectory.getAbsolutePath(), "/opt/spark/conf", BindMode.READ_ONLY)
.withFileSystemBind(trinoClientIoDirectory.getAbsolutePath(), "/opt/spark/conf/io", BindMode.READ_ONLY)
.withCopyToContainer(Transferable.of(sparkConfFile), "/opt/spark/conf/spark-defaults.conf")
.withCreateContainerCmdModifier(modifier -> modifier.withTty(true).withStdinOpen(true).withAttachStdin(true).withAttachStdout(true).withAttachStderr(true))
.withCommand("/opt/spark/bin/pyspark");

Expand All @@ -120,26 +138,14 @@ private PySparkContainer(

container.start();

String metastoreEndpoint = asHostUrl("localhost:" + metastoreContainer.port());
String s3Endpoint = asHostUrl(httpServer.getBaseUrl().resolve(trinoS3ProxyConfig.getS3Path()).toString());

clearInputStreamAndClose(inputToContainerStdin(container.getContainerId(), "spark.stop()"));
clearInputStreamAndClose(inputToContainerStdin(container.getContainerId(), """
spark = SparkSession\\
.builder\\
.appName("testing")\\
.config("hive.metastore.uris", "thrift://%s")\\
.enableHiveSupport()\\
.config("spark.hadoop.fs.s3a.endpoint", "%s")\\
.config("spark.hadoop.fs.s3a.access.key", "%s")\\
.config("spark.hadoop.fs.s3a.secret.key", "%s")\\
.config("spark.hadoop.fs.s3a.path.style.access", True)\\
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\\
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\\
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", False)\\
.config("spark.hadoop.fs.s3a.s3.client.factory.impl", "%s")\\
.getOrCreate()
""".formatted(metastoreEndpoint, s3Endpoint, testingCredentials.emulated().accessKey(), testingCredentials.emulated().secretKey(), clientFactoryClassName)));
"""));

log.info("PySpark container started");
}
Expand Down

0 comments on commit 8e04515

Please sign in to comment.