diff --git a/README.md b/README.md index 872aaea..5c2cadb 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ User can use this library to write/persist spark dataframe to various storage op Currently, we are supporting below options: * S3 Buckets +* Redshift Tables Please see the detailed documentation [here](aws/README.md). @@ -37,7 +38,7 @@ User can use this library to write/persist spark dataframe to various storage op Currently, we are supporting below options: -* GCS -* BigQuery +* GCS Buckets +* BigQuery Tables Please see the detailed documentation [here](gcp/README.md). diff --git a/aws/README.md b/aws/README.md index 438487e..001875c 100644 --- a/aws/README.md +++ b/aws/README.md @@ -19,7 +19,7 @@ Make sure you add `GITHUB_USERNAME` and `GITHUB_TOKEN` to the environment variab `GITHUB_TOKEN` is the Personal Access Token with the permission to read packages. -## S3 Bucket +## S3 BUCKET User can use this library to write/persist spark dataframe to s3 buckets in various file formats. Supported file formats are: @@ -47,7 +47,7 @@ val csvFileFormat = CSVFileFormat( User can provide below options to the `CSVFileFormat` instance: | Parameter Name | Default Value | Description | -| :------------------------ | :-------------------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------------------------|:---------------------------:|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | charToEscapeQuoteEscaping | \ | Sets a single character used for escaping the escape for the quote character. | | compression | none | Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). | | dateFormat | yyyy-MM-dd | Sets the string that indicates a date format. | @@ -76,17 +76,18 @@ import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToCSVFileWr ```scala DataFrameToS3BucketWriter - .write( - dataFrame = df, - fileFormat = csvFileFormat, - bucketName = mybucket, - path = outputPath - ) + .write( + dataFrame = df, + fileFormat = csvFileFormat, + bucketName = mybucket, + path = outputPath + ) `````` ### JSON -Suppose user wants to write the dataframe `df` to the s3 bucket `myBucket` under the path `outputPath` in the `json` format. +Suppose user wants to write the dataframe `df` to the s3 bucket `myBucket` under the path `outputPath` in the `json` +format. Then user need to perform below steps: #### 1. 
Define file format @@ -95,14 +96,14 @@ Then user need to perform below steps: import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.JSONFileFormat val jsonFileFormat = JSONFileFormat( - ignoreNullFields = true - ) + ignoreNullFields = true +) ``` User can provide below options to the `JSONFileFormat` instance: | Parameter Name | Default Value | Description | -| :----------------- | :-------------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------ | +|:-------------------|:---------------------------:|:--------------------------------------------------------------------------------------------------------------------------------------------------------| | compression | none | Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). | | dateFormat | yyyy-MM-dd | Sets the string that indicates a date format. | | encoding | UTF-8 | Specifies encoding (charset) of saved CSV files. | @@ -122,12 +123,12 @@ import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToJSONFileW ```scala DataFrameToS3BucketWriter - .write( - dataFrame = df, - fileFormat = jsonFileFormat, - bucketName = myBucket, - path = outputPath - ) + .write( + dataFrame = df, + fileFormat = jsonFileFormat, + bucketName = myBucket, + path = outputPath + ) `````` ### XML @@ -141,14 +142,14 @@ Then user need to perform below steps: import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.XMLFileFormat val xmlFileFormat = XMLFileFormat( - attributePrefix = "attr_" - ) + attributePrefix = "attr_" +) ``` User can provide below options to the `XMLFileFormat` instance: | Parameter Name | Default Value | Description | -| :--------------- | :---------------------------------------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:-----------------|:-----------------------------------------------:|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | arrayElementName | item | Name of XML element that encloses each element of an array-valued column when writing. | | attributePrefix | _ | The prefix for attributes so that we can differentiating attributes and elements. This will be the prefix for field names. | | compression | None | Compression codec to use when saving to file.
Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of case-insensitive shorten names (bzip2, gzip, lz4, and snappy).
Defaults to no compression when a codec is not specified. | @@ -170,17 +171,18 @@ import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToXMLFileWr ```scala DataFrameToS3BucketWriter - .write( - dataFrame = df, - fileFormat = xmlFileFormat, - bucketName = myBucket, - path = outputPath - ) + .write( + dataFrame = df, + fileFormat = xmlFileFormat, + bucketName = myBucket, + path = outputPath + ) `````` ### PARQUET -Suppose user wants to write the dataframe `df` to s3 bucket `myBucket` under the path `outputPath` in the `parquet` format. +Suppose user wants to write the dataframe `df` to s3 bucket `myBucket` under the path `outputPath` in the `parquet` +format. Then user need to perform below steps: #### 1. Define file format @@ -194,7 +196,7 @@ val parquetFileFormat = ParquetFileFormat() User can provide below options to the `ParquetFileFormat` instance: | Parameter Name | Default Value | Description | -| :----------------- | :-----------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:-------------------|:-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | datetimeRebaseMode | EXCEPTION | The datetimeRebaseMode option allows to specify the rebasing mode for the values of the DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS logical types from the Julian to Proleptic Gregorian calendar.
Currently supported modes are:
EXCEPTION: fails in reads of ancient dates/timestamps that are ambiguous between the two calendars.
CORRECTED: loads dates/timestamps without rebasing.
LEGACY: performs rebasing of ancient dates/timestamps from the Julian to Proleptic Gregorian calendar. | | int96RebaseMode | EXCEPTION | The int96RebaseMode option allows to specify the rebasing mode for INT96 timestamps from the Julian to Proleptic Gregorian calendar. Currently supported modes are:
EXCEPTION: fails in reads of ancient INT96 timestamps that are ambiguous between the two calendars.
CORRECTED: loads INT96 timestamps without rebasing.
LEGACY: performs rebasing of ancient timestamps from the Julian to Proleptic Gregorian calendar. |
| mergeSchema         | false         | Sets whether we should merge schemas collected from all Parquet part-files. |
@@ -210,10 +212,255 @@ import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToParquetFi
```scala
DataFrameToS3BucketWriter
-  .write(
-    dataFrame = df,
-    fileFormat = parquetFileFormat,
-    bucketName = myBucket,
-    path = outputPath
-  )
-``````
\ No newline at end of file
+  .write(
+    dataFrame = df,
+    fileFormat = parquetFileFormat,
+    bucketName = myBucket,
+    path = outputPath
+  )
+``````
+
+## REDSHIFT
+
+User can use this library to write/persist spark dataframe to redshift tables.
+
+User needs to perform below steps in order to write the dataframe to a redshift table:
+
+### 1. Import necessary classes
+
+```scala
+import com.clairvoyant.data.scalaxy.writer.aws.redshift.*
+```
+
+### 2. Call the write API
+
+```scala
+val redshiftWriterOptions = RedshiftWriterOptions(
+  tempDirRegion = Some("ca-central-1"),
+  iamRoleARN = Some("arn:aws:iam::283220348991:role/service-role/AmazonRedshift-CommandsAccessRole-20231115T135908")
+)
+
+DataFrameToRedshiftWriter
+  .write(
+    dataFrame = df,
+    hostName = "my-host",
+    port = 5439,
+    databaseName = "dev",
+    tableName = "my_redshift_table",
+    userName = "testuser",
+    password = "testpassword",
+    tempDirS3Path = "s3a://my-tmp-redshift-bucket/redshift-tmp-dir/",
+    writerOptions = redshiftWriterOptions
+  )
+```
+
+User needs to provide below arguments to the `write` API:
+
+| Argument Name | Mandatory | Default Value                                                    | Description                                                      |
+|:--------------|:---------:|:----------------------------------------------------------------:|:-----------------------------------------------------------------|
+| dataFrame     | Yes       | -                                                                | Dataframe to write to redshift table.                            |
+| hostName      | Yes       | -                                                                | Redshift host name.                                              |
+| port          | No        | 5439                                                             | Redshift port.                                                   |
+| databaseName  | No        | dev                                                              | Redshift database name.                                          |
+| tableName     | Yes       | -                                                                | Redshift table name.                                             |
+| userName      | Yes       | -                                                                | Redshift user name.                                              |
+| password      | Yes       | -                                                                | Redshift password.                                               |
+| tempDirS3Path | Yes       | -                                                                | S3 path to store temporary files.                                |
+| writerOptions | No        | Default instance of `RedshiftWriterOptions` with default values | Redshift writer options represented by `RedshiftWriterOptions`. |
+| saveMode      | No        | overwrite                                                        | Save mode to use while writing to redshift table.                |
+
+User can pass below options to the `RedshiftWriterOptions` instance (a combined usage sketch follows the table):
+
+| Parameter Name            | Mandatory                                                             | Default Value                            | Description |
+|:--------------------------|:----------------------------------------------------------------------|:------------------------------------------|:------------|
+| tempDirRegion             | No                                                                    | -                                        | AWS region where tempdir is located. Setting this option will improve connector performance for interactions with tempdir as well as automatically supply this value as part of COPY and UNLOAD operations during connector writes and reads. If the region is not specified, the connector will attempt to use the Default Credential Provider Chain for resolving where the tempdir region is located. In some cases, such as when the connector is being used outside of an AWS environment, this resolution will fail. Therefore, this setting is highly recommended in the following situations:<br>1. When the connector is running outside of AWS, as automatic region discovery will fail and negatively affect connector performance.<br>2. When tempdir is in a different region than the Redshift cluster, as using this setting alleviates the need to supply the region manually using the extracopyoptions and extraunloadoptions parameters.<br>3. When the connector is running in a different region than tempdir, as it improves the connector's access performance of tempdir. |
+| iamRoleARN                | Only if using IAM roles to authorize Redshift COPY/UNLOAD operations | -                                        | Fully specified ARN of the IAM Role attached to the Redshift cluster, ex: arn:aws:iam::123456789000:role/redshift_iam_role. |
+| forwardSparkS3Credentials | No                                                                    | false                                    | If true, this library will automatically discover the credentials that Spark is using to connect to S3 and will forward those credentials to Redshift over JDBC. These credentials are sent as part of the JDBC query, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this option. |
+| jdbcDriver                | No                                                                    | Determined by the JDBC URL's subprotocol | The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver class name should automatically be determined by the JDBC URL's subprotocol. |
+| distStyle                 | No                                                                    | EVEN                                     | The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distKey option. |
+| distKey                   | No, unless using DISTSTYLE KEY                                        | -                                        | The name of a column in the table to use as the distribution key when creating a table. |
+| sortKeySpec               | No                                                                    | -                                        | A full Redshift Sort Key definition. Examples include:<br>• SORTKEY(my_sort_column)<br>• COMPOUND SORTKEY(sort_col_1, sort_col_2)<br>• INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
+| includeColumnList         | No                                                                    | false                                    | If true, this library will automatically extract the columns from the schema and add them to the COPY command according to the Column List docs (e.g. `COPY "PUBLIC"."tablename" ("column1" [,"column2", ...])`). |
+| description               | No                                                                    | -                                        | A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns. |
+| preActions                | No                                                                    | -                                        | This can be a list of SQL commands to be executed before the COPY command when loading data. It may be useful to have some DELETE commands or similar run here before loading new data. If the command contains %s, the table name will be formatted in before execution (in case you're using a staging table). Be warned that if these commands fail, it is treated as an error and you'll get an exception. If using a staging table, the changes will be reverted and the backup table restored if pre actions fail. |
+| postActions               | No                                                                    | No default                               | This can be a list of SQL commands to be executed after a successful COPY when loading data. It may be useful to have some GRANT commands or similar run here when loading new data. If the command contains %s, the table name will be formatted in before execution (in case you're using a staging table). Be warned that if these commands fail, it is treated as an error and you'll get an exception. If using a staging table, the changes will be reverted and the backup table restored if post actions fail. |
+| extraCopyOptions          | No                                                                    | No default                               | A list of extra options to append to the Redshift COPY command when loading data, e.g. TRUNCATECOLUMNS or MAXERROR n (see the Redshift docs for other options). Note that since these options are appended to the end of the COPY command, only options that make sense at the end of the command can be used, but that should cover most possible use cases. |
+| tempFormat                | No                                                                    | AVRO                                     | The format in which to save temporary files in S3 when writing to Redshift. Defaults to "AVRO"; the other allowed values are "CSV", "CSV GZIP", and "PARQUET" for CSV, gzipped CSV, and parquet, respectively. Redshift is significantly faster when loading CSV than when loading Avro files, so using that tempformat may provide a large performance boost when writing to Redshift. Parquet should not be used as the tempformat when using an S3 bucket (tempdir) in a region that is different from the region where the Redshift cluster you are writing to resides, because cross-region copies are not supported in Redshift when using parquet as the format. |
+| csvNullString             | No                                                                    | @NULL@                                   | The String value to write for nulls when using the CSV tempformat. This should be a value which does not appear in your actual data. |
+| autoPushDown              | No                                                                    | true                                     | Apply predicate and query pushdown by capturing and analyzing the Spark logical plans for SQL operations. The operations are translated into a SQL query and then executed in Redshift to improve performance. Once autopushdown is enabled, it is enabled for all the Redshift tables in the same Spark session. |
+| autoPushDownS3ResultCache | No                                                                    | false                                    | Cache the query SQL to unload data S3 path mapping in memory, so that the same query doesn't need to execute again in the same Spark session. |
+| copyRetryCount            | No                                                                    | 2                                        | Number of times to retry a copy operation, including dropping and creating any required table, before failing. |
+| jdbcOptions               | No                                                                    | -                                        | Additional map of parameters to pass to the underlying JDBC driver where the wildcard is the name of the JDBC parameter (e.g., jdbc.ssl). Note that the jdbc prefix will be stripped off before passing to the JDBC driver. A complete list of possible options for the Redshift JDBC driver may be seen in the Redshift docs. |
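+
+For reference, below is a sketch that combines several of the above writer options with the `write` API. The values used for the options (distribution key, sort key, pre-action SQL, S3 path and credentials) are placeholders only and need to be replaced as per the actual table and cluster:
+
+```scala
+import org.apache.spark.sql.SaveMode
+
+// All column names, SQL commands and connection details below are placeholders.
+val customWriterOptions = RedshiftWriterOptions(
+  tempDirRegion = Some("ca-central-1"),
+  forwardSparkS3Credentials = true,
+  distStyle = "KEY",
+  distKey = Some("my_dist_column"),
+  sortKeySpec = Some("COMPOUND SORTKEY(sort_col_1, sort_col_2)"),
+  preActions = List("DELETE FROM my_redshift_table WHERE load_date = CURRENT_DATE"),
+  tempFormat = "CSV GZIP",
+  copyRetryCount = 3
+)
+
+DataFrameToRedshiftWriter
+  .write(
+    dataFrame = df,
+    hostName = "my-host",
+    databaseName = "dev",
+    tableName = "my_redshift_table",
+    userName = "testuser",
+    password = "testpassword",
+    tempDirS3Path = "s3a://my-tmp-redshift-bucket/redshift-tmp-dir/",
+    writerOptions = customWriterOptions,
+    saveMode = SaveMode.Append
+  )
+```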
\ No newline at end of file
diff --git a/aws/lib/spark-redshift.jar b/aws/lib/spark-redshift.jar
new file mode 100644
index 0000000..f86fb3f
Binary files /dev/null and b/aws/lib/spark-redshift.jar differ
diff --git a/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/DataFrameToRedshiftWriter.scala b/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/DataFrameToRedshiftWriter.scala
new file mode 100644
index 0000000..868edf5
--- /dev/null
+++ b/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/DataFrameToRedshiftWriter.scala
@@ -0,0 +1,58 @@
+package com.clairvoyant.data.scalaxy.writer.aws.redshift
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+
+object DataFrameToRedshiftWriter {
+
+  def write(
+      dataFrame: DataFrame,
+      hostName: String,
+      port: Int = 5439,
+      databaseName: String = "dev",
+      tableName: String,
+      userName: String,
+      password: String,
+      tempDirS3Path: String,
+      writerOptions: RedshiftWriterOptions = RedshiftWriterOptions(),
+      saveMode: SaveMode = SaveMode.Overwrite
+  ): Unit =
+    val dataFrameWriterOptions =
+      Map(
+        "url" -> s"jdbc:redshift://$hostName:$port/$databaseName",
+        "dbtable" -> tableName,
+        "user" -> userName,
+        "password" -> password,
+        "tempdir" -> tempDirS3Path,
+        "tempformat" -> writerOptions.tempFormat,
+        "diststyle" -> writerOptions.distStyle,
+        "preactions" -> writerOptions.preActions.mkString(";"),
+        "postactions" -> writerOptions.postActions.mkString(";"),
+        "csvnullstring" -> writerOptions.csvNullString
+      ) ++
+        Map(
+          "tempdir_region" -> writerOptions.tempDirRegion,
+          "aws_iam_role" -> writerOptions.iamRoleARN,
+          "jdbcdriver" -> writerOptions.jdbcDriver,
+          "distkey" -> writerOptions.distKey,
+          "sortkeyspec" -> writerOptions.sortKeySpec,
+          "description" -> writerOptions.description,
+          "extracopyoptions" -> writerOptions.extraCopyOptions
+        ).collect { case (key, Some(value)) =>
+          key -> value
+        } ++
+        Map(
+          "forward_spark_s3_credentials" -> writerOptions.forwardSparkS3Credentials,
+          "include_column_list" -> writerOptions.includeColumnList,
+          "autopushdown" -> writerOptions.autoPushDown,
+          "autopushdown.s3_result_cache" -> writerOptions.autoPushDownS3ResultCache,
+          "copyretrycount" -> writerOptions.copyRetryCount
+        ).map((optionName, optionValue) => (optionName, optionValue.toString)) ++
+        writerOptions.jdbcOptions
+
+    dataFrame.write
+      .format(source = "io.github.spark_redshift_community.spark.redshift")
+      .options(dataFrameWriterOptions)
+      .mode(saveMode)
+      .save()
+
+}
diff --git a/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/RedshiftWriterOptions.scala b/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/RedshiftWriterOptions.scala
new file mode 100644
index 0000000..b59ed7b
--- /dev/null
+++ b/aws/src/main/scala/com/clairvoyant/data/scalaxy/writer/aws/redshift/RedshiftWriterOptions.scala
@@ -0,0 +1,22 @@
+package com.clairvoyant.data.scalaxy.writer.aws.redshift
+
+case class RedshiftWriterOptions(
+    tempDirRegion: Option[String] = None,
+    iamRoleARN: Option[String] = None,
+    forwardSparkS3Credentials: Boolean = false,
+    jdbcDriver: Option[String] = None,
+    distStyle: String = "EVEN",
+    distKey: Option[String] = None,
+    sortKeySpec: Option[String] = None,
+    includeColumnList: Boolean = false,
+    description: Option[String] = None,
+    preActions: List[String] = List.empty,
+    postActions: List[String] = List.empty,
+    extraCopyOptions: Option[String] = None,
+    tempFormat: String = "AVRO",
+    csvNullString: String = "@NULL@",
+    autoPushDown: Boolean = true,
+    autoPushDownS3ResultCache: Boolean = false,
+    copyRetryCount: Int = 2,
+    jdbcOptions: Map[String, String] = Map.empty
+)
diff --git a/build.sbt b/build.sbt
index c57d9f4..25d5ade 100644
--- a/build.sbt
+++ b/build.sbt
@@ -41,22 +41,27 @@ ThisBuild / wartremoverErrors ++= Warts.allBut(
val dataScalaxyTestUtilVersion = "1.0.0"
val gcsConnectorVersion = "hadoop3-2.2.17"
val bigqueryConnectorVersion = "0.32.2"
+val redshiftJDBCDriverVersion = "2.1.0.22"
val s3MockVersion = "0.2.6"
val scalaParserCombinatorsVersion = "2.3.0"
-val sparkVersion = "3.4.1"
+val sparkVersion = "3.5.0"
val sparkXMLVersion = "0.16.0"
val zioConfigVersion = "4.0.0-RC16"

// ----- TOOL DEPENDENCIES ----- //

+val bigqueryConnectorDependencies = Seq("com.google.cloud.spark" %% "spark-bigquery" % bigqueryConnectorVersion)
+  .map(_.cross(CrossVersion.for3Use2_13))
+
val dataScalaxyTestUtilDependencies = Seq(
  "com.clairvoyant.data.scalaxy" %% "test-util" % dataScalaxyTestUtilVersion % Test
)

val gcsConnectorDependencies = Seq("com.google.cloud.bigdataoss" % "gcs-connector" % gcsConnectorVersion)

-val bigqueryConnectorDependencies = Seq("com.google.cloud.spark" %% "spark-bigquery" % bigqueryConnectorVersion)
-  .map(_.cross(CrossVersion.for3Use2_13))
+val redshiftJDBCDriverDependencies = Seq(
+  "com.amazon.redshift" % "redshift-jdbc42" % redshiftJDBCDriverVersion
+)

val s3MockDependencies = Seq(
  "io.findify" %% "s3mock" % s3MockVersion % Test
)
@@ -75,6 +80,10 @@ val sparkDependencies = Seq(
  .map(_ excludeAll ("org.scala-lang.modules", "scala-xml"))
  .map(_.cross(CrossVersion.for3Use2_13))
+val sparkAvroDependencies = Seq(
+  "org.apache.spark" %% "spark-avro" % sparkVersion
+).map(_.cross(CrossVersion.for3Use2_13))
+

val sparkHadoopCloudDependencies = Seq(
  "org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion
).map(_.cross(CrossVersion.for3Use2_13))
@@ -97,9 +106,11 @@ val localFileSystemDependencies =

val awsDependencies =
  dataScalaxyTestUtilDependencies ++
+  redshiftJDBCDriverDependencies ++
  s3MockDependencies ++
  scalaParserCombinatorsDependencies ++
  sparkDependencies ++
+  sparkAvroDependencies ++
  sparkHadoopCloudDependencies ++
  sparkXMLDependencies ++
  zioConfigDependencies
@@ -125,13 +136,14 @@ lazy val `writer-local-file-system` = (project in file("local-file-system"))
  .settings(
    version := "1.0.0",
    libraryDependencies ++= localFileSystemDependencies,
+    Test / parallelExecution := false,
    publishConfiguration := publishConfiguration.value.withOverwrite(true),
    publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(true)
  )

lazy val `writer-aws` = (project in file("aws"))
  .settings(
-    version := "1.0.0",
+    version := "1.1.0",
    libraryDependencies ++= awsDependencies,
    Test / parallelExecution := false,
    publishConfiguration := publishConfiguration.value.withOverwrite(true),