Merge pull request #15 from teamclairvoyant/REST-155
[REST-155] Modifying type class hierarchy
rahulbhatia023 authored Dec 20, 2023
2 parents d5c9834 + 409ebc0 commit 6d3b071
Showing 33 changed files with 473 additions and 308 deletions.
125 changes: 89 additions & 36 deletions aws/README.md
@@ -12,7 +12,7 @@ ThisBuild / credentials += Credentials(
System.getenv("GITHUB_TOKEN")
)

ThisBuild / libraryDependencies += "com.clairvoyant.data.scalaxy" %% "writer-aws" % "1.0.0"
ThisBuild / libraryDependencies += "com.clairvoyant.data.scalaxy" %% "writer-aws" % "2.0.0"
```

Make sure you add `GITHUB_USERNAME` and `GITHUB_TOKEN` to the environment variables.
@@ -22,19 +22,59 @@
## S3 BUCKET

Users can use this library to write/persist a Spark dataframe to S3 buckets in various file formats.

### API

The library provides the below `write` API in the type class `DataFrameToS3BucketWriter` in order to write a Spark
dataframe into an S3 bucket.

```scala
def write(
dataFrame: DataFrame,
fileFormat: T,
bucketName: String,
path: String,
saveMode: SaveMode = SaveMode.Overwrite
): Unit
```

The `write` method takes the below arguments:

| Argument Name | Mandatory | Default Value | Description |
|:--------------|:---------:|:-------------:|:------------------------------------------------|
| dataFrame | Yes | - | Dataframe to write to s3 bucket. |
| fileFormat | Yes | - | `FileFormat` to use while writing to s3 bucket. |
| bucketName | Yes | - | S3 bucket name. |
| path | Yes | - | S3 path to write the dataframe. |
| saveMode | No | overwrite | Save mode to use while writing to s3 bucket. |


Supported file formats are:

* CSV
* JSON
* XML
* Parquet


### CSV

Suppose the user wants to write the dataframe `df` to the s3 bucket `mybucket` under the path `outputPath` in the `csv` format.
Then the user needs to perform the below steps:

#### 1. Define file format
#### 1. Import type class

```scala
import com.clairvoyant.data.scalaxy.writer.aws.DataFrameToS3BucketWriter
```

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToCSVFileWriter
```

#### 3. Define file format

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.CSVFileFormat
@@ -66,16 +106,11 @@
The user can provide the below options to the `CSVFileFormat` instance:
| timestampFormat | yyyy-MM-dd HH:mm:ss | Sets the string that indicates a timestamp format. |
| timestampNTZFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS] | Sets the string that indicates a timestamp without timezone format. |

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToCSVFileWriter
```

#### 3. Call API
#### 4. Call API

```scala
DataFrameToS3BucketWriter
DataFrameToS3BucketWriter[CSVFileFormat]
.write(
dataFrame = df,
fileFormat = csvFileFormat,
@@ -90,7 +125,19 @@
Suppose the user wants to write the dataframe `df` to the s3 bucket `myBucket` under the path `outputPath` in the `json`
format.
Then the user needs to perform the below steps:

#### 1. Define file format
#### 1. Import type class

```scala
import com.clairvoyant.data.scalaxy.writer.aws.DataFrameToS3BucketWriter
```

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToJSONFileWriter
```

#### 3. Define file format

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.JSONFileFormat
@@ -113,16 +160,10 @@
The user can provide the below options to the `JSONFileFormat` instance:
| timestampNTZFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS] | Sets the string that indicates a timestamp without timezone format. |
| timezone | UTC | Sets the string that indicates a time zone ID to be used to format timestamps in the JSON datasources or partition values. |

#### 2. Import type class instance
#### 4. Call API

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToJSONFileWriter
```

#### 3. Call API

```scala
DataFrameToS3BucketWriter
DataFrameToS3BucketWriter[JSONFileFormat]
.write(
dataFrame = df,
fileFormat = jsonFileFormat,
@@ -136,7 +177,19 @@ DataFrameToS3BucketWriter
Suppose the user wants to write the dataframe `df` to the s3 bucket `myBucket` under the path `outputPath` in the `xml` format.
Then the user needs to perform the below steps:

#### 1. Define file format
#### 1. Import type class

```scala
import com.clairvoyant.data.scalaxy.writer.aws.DataFrameToS3BucketWriter
```

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToXMLFileWriter
```

#### 3. Define file format

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.XMLFileFormat
@@ -161,16 +214,10 @@
The user can provide the below options to the `XMLFileFormat` instance:
| timestampFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | Controls the format used to write TimestampType format columns. |
| valueTag | _VALUE | The tag used for the value when there are attributes in the element having no child. |

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToXMLFileWriter
```

#### 3. Call API
#### 4. Call API

```scala
DataFrameToS3BucketWriter
DataFrameToS3BucketWriter[XMLFileFormat]
.write(
dataFrame = df,
fileFormat = xmlFileFormat,
@@ -185,7 +232,19 @@
Suppose the user wants to write the dataframe `df` to the s3 bucket `myBucket` under the path `outputPath` in the `parquet`
format.
Then the user needs to perform the below steps:

#### 1. Define file format
#### 1. Import type class

```scala
import com.clairvoyant.data.scalaxy.writer.aws.DataFrameToS3BucketWriter
```

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToParquetFileWriter
```

#### 3. Define file format

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.ParquetFileFormat
@@ -202,16 +261,10 @@
The user can provide the below options to the `ParquetFileFormat` instance:
| mergeSchema | false | Sets whether we should merge schemas collected from all Parquet part-files. |
| compression | snappy | Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). |

#### 2. Import type class instance

```scala
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToParquetFileWriter
```

#### 3. Call API
#### 4. Call API

```scala
DataFrameToS3BucketWriter
DataFrameToS3BucketWriter[ParquetFileFormat]
.write(
dataFrame = df,
fileFormat = parquetFileFormat,
@@ -1,18 +1,20 @@
package com.clairvoyant.data.scalaxy.writer.aws.s3

import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.FileFormat
import com.clairvoyant.data.scalaxy.writer.aws.s3.instances.DataFrameToS3FileWriter
import org.apache.spark.sql.{DataFrame, SaveMode}

object DataFrameToS3BucketWriter {
trait DataFrameToS3BucketWriter[T]:

def write[T <: FileFormat](
def write(
dataFrame: DataFrame,
fileFormat: T,
bucketName: String,
path: String,
saveMode: SaveMode = SaveMode.Overwrite
)(using dataFrameToS3FileWriter: DataFrameToS3FileWriter[T]): Unit =
dataFrameToS3FileWriter.write(dataFrame, fileFormat, bucketName, path, saveMode)
): Unit

}
object DataFrameToS3BucketWriter:

def apply[T <: FileFormat](
using dataFrameToS3BucketWriter: DataFrameToS3BucketWriter[T]
): DataFrameToS3BucketWriter[T] = dataFrameToS3BucketWriter
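
The change above turns `DataFrameToS3BucketWriter` from a plain object (which delegated to a separate `DataFrameToS3FileWriter` type class) into the type class itself, with a companion `apply` acting as a summoner. A minimal, Spark-free sketch of the same pattern follows; all names in it (`Format`, `Csv`, `Json`, `Writer`) are hypothetical stand-ins for illustration, not library types:

```scala
// Spark-free sketch of the type-class shape introduced by this commit.
// Format, Csv, Json, and Writer are hypothetical stand-ins, not library types.
trait Format
final case class Csv(header: Boolean = true) extends Format
final case class Json() extends Format

trait Writer[T <: Format]:
  def write(rows: String, format: T): String

object Writer:
  // Summoner, analogous to DataFrameToS3BucketWriter.apply
  def apply[T <: Format](using writer: Writer[T]): Writer[T] = writer

// Per-format instances, analogous to DataFrameToCSVFileWriter etc.
given Writer[Csv] with
  def write(rows: String, format: Csv): String =
    if format.header then s"col1,col2\n$rows" else rows

given Writer[Json] with
  def write(rows: String, format: Json): String = s"""{"rows":"$rows"}"""

// Writer[Csv] resolves the Csv instance at compile time, so call sites read
// like DataFrameToS3BucketWriter[CSVFileFormat].write(...) in the tests below.
val csvOut: String = Writer[Csv].write("a,b", Csv())
val jsonOut: String = Writer[Json].write("a,b", Json())
```

Under this shape, supporting a new file format only requires one more `given` instance; the `Writer` trait itself never changes, which matches how each per-format writer object in this commit now extends `DataFrameToS3BucketWriter` directly.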
@@ -1,9 +1,10 @@
package com.clairvoyant.data.scalaxy.writer.aws.s3.instances

import com.clairvoyant.data.scalaxy.writer.aws.s3.DataFrameToS3BucketWriter
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.CSVFileFormat
import org.apache.spark.sql.{DataFrame, SaveMode}

implicit object DataFrameToCSVFileWriter extends DataFrameToS3FileWriter[CSVFileFormat] {
implicit object DataFrameToCSVFileWriter extends DataFrameToS3BucketWriter[CSVFileFormat] {

import org.apache.spark.sql.catalyst.csv.CSVOptions.*

@@ -1,9 +1,10 @@
package com.clairvoyant.data.scalaxy.writer.aws.s3.instances

import com.clairvoyant.data.scalaxy.writer.aws.s3.DataFrameToS3BucketWriter
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.JSONFileFormat
import org.apache.spark.sql.{DataFrame, SaveMode}

implicit object DataFrameToJSONFileWriter extends DataFrameToS3FileWriter[JSONFileFormat] {
implicit object DataFrameToJSONFileWriter extends DataFrameToS3BucketWriter[JSONFileFormat] {

import org.apache.spark.sql.catalyst.json.JSONOptions.*

@@ -1,9 +1,10 @@
package com.clairvoyant.data.scalaxy.writer.aws.s3.instances

import com.clairvoyant.data.scalaxy.writer.aws.s3.DataFrameToS3BucketWriter
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.ParquetFileFormat
import org.apache.spark.sql.{DataFrame, SaveMode}

implicit object DataFrameToParquetFileWriter extends DataFrameToS3FileWriter[ParquetFileFormat] {
implicit object DataFrameToParquetFileWriter extends DataFrameToS3BucketWriter[ParquetFileFormat] {

override def write(
dataFrame: DataFrame,

This file was deleted.

@@ -1,9 +1,10 @@
package com.clairvoyant.data.scalaxy.writer.aws.s3.instances

import com.clairvoyant.data.scalaxy.writer.aws.s3.DataFrameToS3BucketWriter
import com.clairvoyant.data.scalaxy.writer.aws.s3.formats.XMLFileFormat
import org.apache.spark.sql.{DataFrame, SaveMode}

implicit object DataFrameToXMLFileWriter extends DataFrameToS3FileWriter[XMLFileFormat] {
implicit object DataFrameToXMLFileWriter extends DataFrameToS3BucketWriter[XMLFileFormat] {

import com.databricks.spark.xml.*

@@ -10,6 +10,8 @@ class DataFrameToCSVS3BucketWriterSpec extends DataFrameReader with DataFrameMat

val outputDirPath = s"/tmp/out_${System.currentTimeMillis()}"

val dataFrameToS3BucketWriter = DataFrameToS3BucketWriter[CSVFileFormat]

"write()" should "write a dataframe to the provided s3 path" in {
val df = readJSONFromText(
"""|{
@@ -25,8 +27,8 @@

s3Client.createBucket(bucketName)

DataFrameToS3BucketWriter
.write[CSVFileFormat](
dataFrameToS3BucketWriter
.write(
dataFrame = df,
fileFormat = csvFileFormat,
bucketName = bucketName,
@@ -10,6 +10,8 @@ class DataFrameToJSONS3BucketWriterSpec extends DataFrameReader with DataFrameMa

val outputDirPath = s"/tmp/out_${System.currentTimeMillis()}"

val dataFrameToS3BucketWriter = DataFrameToS3BucketWriter[JSONFileFormat]

"write()" should "write a dataframe to the provided s3 path" in {
val df = readJSONFromText(
"""|{
@@ -25,8 +27,8 @@

s3Client.createBucket(bucketName)

DataFrameToS3BucketWriter
.write[JSONFileFormat](
dataFrameToS3BucketWriter
.write(
dataFrame = df,
fileFormat = jsonFileFormat,
bucketName = bucketName,
@@ -10,6 +10,8 @@ class DataFrameToParquetS3BucketWriterSpec extends DataFrameReader with DataFram

val outputDirPath = s"/tmp/out_${System.currentTimeMillis()}"

val dataFrameToS3BucketWriter = DataFrameToS3BucketWriter[ParquetFileFormat]

"write()" should "write a dataframe to the provided s3 path" in {
val df = readJSONFromText(
"""|{
@@ -25,8 +27,8 @@

s3Client.createBucket(bucketName)

DataFrameToS3BucketWriter
.write[ParquetFileFormat](
dataFrameToS3BucketWriter
.write(
dataFrame = df,
fileFormat = parquetFileFormat,
bucketName = bucketName,
@@ -10,6 +10,8 @@ class DataFrameToXMLS3BucketWriterSpec extends DataFrameReader with DataFrameMat

val outputDirPath = s"/tmp/out_${System.currentTimeMillis()}"

val dataFrameToS3BucketWriter = DataFrameToS3BucketWriter[XMLFileFormat]

"write()" should "write a dataframe to the provided s3 path" in {
val df = readJSONFromText(
"""|{
@@ -25,8 +27,8 @@

s3Client.createBucket(bucketName)

DataFrameToS3BucketWriter
.write[XMLFileFormat](
dataFrameToS3BucketWriter
.write(
dataFrame = df,
fileFormat = xmlFileFormat,
bucketName = bucketName,