[DOP-16941] Add example for onETL
dolfinus committed Oct 1, 2024
1 parent 976fe41 commit d27d8a8
Showing 2 changed files with 57 additions and 54 deletions.
7 changes: 3 additions & 4 deletions docs/data_type_mappings.md

#### Customized Type Mappings with Spark Dialect Extension

Primitive types:

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|----------------------|-------------------------|-------------------------------------------------|
| `Bool` | `BooleanType` | `Bool` | `Bool (Spark's default is UInt64)` |
| `DateTime64(6)` | `TimestampType` | `DateTime64(6)` | `DateTime64(6) (Spark's default is DateTime32)` |
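
With the dialect registered (see `docs/using_the_dialect.md`), these mappings apply to plain Spark JDBC reads. Below is a minimal sketch, assuming the extension and the ClickHouse JDBC driver are already on the classpath; the hostname, credentials and the `mytable` table (with `Bool` and `DateTime64(6)` columns) are placeholders, not part of the project:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Type mapping check").getOrCreate()

# Register the custom Clickhouse dialect (requires the extension jar on the classpath)
ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
ClickhouseDialectRegistry.register()

# Read a ClickHouse table over JDBC; URL, credentials and table name are placeholders
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:clickhouse://my.clickhouse.hostname.or.ip:8123/default")
    .option("dbtable", "mytable")
    .option("user", "someuser")
    .option("password", "******")
    .load()
)

# With the extension registered, a Bool column arrives as BooleanType and
# a DateTime64(6) column as TimestampType, matching the table above
df.printSchema()
```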


``Array(T)`` `->` ``ArrayType(T)`` (without this extension, Spark's generic JDBC dialect does not support arrays):

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|--------------------------------|-------------------------|--------------------------|
| unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` |
| unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` |
| unsupported | `ArrayType(DoubleType)` | unsupported | unsupported |
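
As a sketch of the write direction: with the dialect registered, a DataFrame with an `ArrayType` column can be written to ClickHouse over JDBC, while without the extension the generic JDBC dialect rejects array columns. The connection options, the target table `array_example` and the `ENGINE` clause below are placeholders, under the same assumptions as the previous example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

spark = SparkSession.builder.appName("Array mapping check").getOrCreate()

# Register the custom Clickhouse dialect (requires the extension jar on the classpath)
ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
ClickhouseDialectRegistry.register()

schema = StructType([StructField("values", ArrayType(FloatType()))])
df = spark.createDataFrame([([1.0, 2.5, 3.0],)], schema=schema)

# ArrayType(FloatType) is written and created as Array(Float32), per the table above
(
    df.write.format("jdbc")
    .option("url", "jdbc:clickhouse://my.clickhouse.hostname.or.ip:8123/default")
    .option("dbtable", "array_example")
    .option("user", "someuser")
    .option("password", "******")
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY tuple()")
    .mode("append")
    .save()
)
```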


104 changes: 54 additions & 50 deletions docs/using_the_dialect.md

This section provides instructions on how to configure Apache Spark to use the Spark Dialect Extension, enabling custom handling of JDBC data types.

### Using onETL with PySpark

See [onETL documentation](https://onetl.readthedocs.io) for installation instructions.

```python
from pyspark.sql import SparkSession
from onetl.connection import Clickhouse

# describe packages that should be loaded by Spark
maven_packages = [
    "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1",
    *Clickhouse.get_packages(),
]

# Create Spark session
spark = (
    SparkSession.builder
    .appName("My Spark App")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Register custom Clickhouse dialect
ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
ClickhouseDialectRegistry.register()

# use onETL to interact with Clickhouse
clickhouse = Clickhouse(
    host="my.clickhouse.hostname.or.ip",
    port=9000,
    user="someuser",
    password="******",
    spark=spark,
)

from onetl.db import DBReader, DBWriter

# onETL can now properly read some Clickhouse types
reader = DBReader(connection=clickhouse, source="mytable")
df = reader.run()

# onETL can now properly write some Clickhouse types
writer = DBWriter(connection=clickhouse, target="anothertable")
writer.run(df)
```

### Compile from source

Instead of using the release version, you can build the extension from source; see [CONTRIBUTING.md](../CONTRIBUTING.md) for build instructions. After the build you'll have a file `/path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar`, which can be passed to the Spark session with `.config("spark.jars", "/path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar")`, or to `spark-submit --jars /path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar`, instead of fetching the release with `spark.jars.packages`.

### Using Spark on Scala

```scala
import org.apache.spark.sql.SparkSession

// describe packages that should be loaded by Spark
val maven_packages = Array(
  "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1",
  "com.clickhouse:clickhouse-jdbc:0.6.5",
  "com.clickhouse:clickhouse-http-client:0.6.5",
  "org.apache.httpcomponents.client5:httpclient5:5.3.1"
)

val spark = SparkSession.builder()
  .appName("My Spark App")
  .config("spark.jars.packages", maven_packages.mkString(","))
  .getOrCreate()

// Register custom Clickhouse dialect
import io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry

ClickhouseDialectRegistry.register()

// now Spark can properly handle some Clickhouse types during read & write
val df = spark.read.format("jdbc").options(...).load()
df.write.format("jdbc").options(...).option("dbtable", "anothertable").save()
```

### Using Spark Submit

Start a Spark session with the downloaded packages:

```bash
spark-submit --conf spark.jars.packages=io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5,org.apache.httpcomponents.client5:httpclient5:5.3.1 ...
```

### Register a dialect

To integrate the Spark Dialect Extension into your Spark application, use the ``<DBMS>DialectRegistry`` classes, which dynamically detect the Spark version and register the corresponding dialect in the running session.

For PySpark:

```python
# Register custom Clickhouse dialect
ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
ClickhouseDialectRegistry.register()
```

For Scala:
```scala
// Register custom Clickhouse dialect
import io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry

ClickhouseDialectRegistry.register()
```
