From d27d8a88495ed4936ff7e3d8a7daf3ed176b0ee0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?=
 =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?=
 =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?=
Date: Tue, 1 Oct 2024 15:27:49 +0300
Subject: [PATCH] [DOP-16941] Add example for onETL

---
 docs/data_type_mappings.md |   7 ++-
 docs/using_the_dialect.md  | 104 +++++++++++++++++++------------------
 2 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/docs/data_type_mappings.md b/docs/data_type_mappings.md
index 1f0bb9e..b94b518 100644
--- a/docs/data_type_mappings.md
+++ b/docs/data_type_mappings.md
@@ -4,6 +4,8 @@ This documentation outlines the customized mappings that the Spark Dialect Exten
 
 #### Customized Type Mappings with Spark Dialect Extension
 
+Primitive types:
+
 | ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
 |------------------------|----------------------|-------------------------|-------------------------------------------------|
 | `Bool` | `BooleanType` | `Bool` | `Bool (Spark's default is UInt64)` |
@@ -26,7 +28,7 @@ This documentation outlines the customized mappings that the Spark Dialect Exten
 | `DateTime64(6)` | `TimestampType` | `DateTime64(6)` | `DateTime64(6) (Spark's default is DateTime32)` |
 
 
-``Array(T)`` `->` ``ArrayType(T)``:
+``Array(T)`` `->` ``ArrayType(T)`` (without this extension, Spark's GenericJDBC dialect does not support arrays):
 
 | ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
 |------------------------|--------------------------------|-------------------------|--------------------------|
@@ -39,6 +41,3 @@ This documentation outlines the customized mappings that the Spark Dialect Exten
 | unsupported | `ArrayType(Date)` | `Array(Date)` | `Array(Date)` |
 | unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` |
 | unsupported | `ArrayType(DoubleType)` | unsupported | unsupported |
-
-
-#### By Default Array Type Mappings without Spark Dialect Extension is not supported
diff --git a/docs/using_the_dialect.md b/docs/using_the_dialect.md
index 09e1afd..ef2665b 100644
--- a/docs/using_the_dialect.md
+++ b/docs/using_the_dialect.md
@@ -2,95 +2,99 @@
 
 This section provides instructions on how to configure Apache Spark to use the Spark Dialect Extension, enabling custom handling of JDBC data types.
 
-### Add the JAR to Spark
+### Using onETL with PySpark
 
-#### Using release version
-
-##### Using SparkConf
-
-For PySpark:
+See the [onETL documentation](https://onetl.readthedocs.io) for installation instructions.
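+
+For example, onETL and PySpark can be installed together with pip (a sketch assuming onETL's `spark` extra; check the onETL documentation for the exact extras and version pins):
+
+```bash
+# install onETL together with PySpark; pin versions as needed
+pip install onetl[spark]
+```
+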
 ```python
 from pyspark.sql import SparkSession
+from onetl.connection import Clickhouse
 
+# describe which packages should be loaded by Spark
+maven_packages = [
+    "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1",
+    *Clickhouse.get_packages(),
+]
+
+# Create Spark session
 spark = (
     SparkSession.builder
    .appName("My Spark App")
-    .config("spark.jars.packages", "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1")
+    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
 )
-```
-
-For Spark on Scala:
-
-```scala
-import org.apache.spark.sql.SparkSession
-
-val spark = SparkSession.builder()
-.appName("My Spark App")
-.config("spark.jars.packages", "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1")
-.getOrCreate()
-```
-##### Using Spark Submit
-
-```bash
-spark-submit --conf spark.jars.packages=io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1
-```
-
-#### Compile from source
-
-##### Build .jar file
-
-See [CONTRIBUTING.md](../CONTRIBUTING.md) for build instructions.
+# Register custom Clickhouse dialect
+ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
+ClickhouseDialectRegistry.register()
 
-After build you'll have a file `/path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar`
-##### Using SparkConf
+# use onETL to interact with Clickhouse
+clickhouse = Clickhouse(
+    host="my.clickhouse.hostname.or.ip",
+    port=9000,
+    user="someuser",
+    password="******",
+    spark=spark,
+)
 
-For PySpark:
+from onetl.db import DBReader, DBWriter
 
-```python
-from pyspark.sql import SparkSession
+# onETL can now properly read some Clickhouse types
+reader = DBReader(connection=clickhouse, source="mytable")
+df = reader.run()
 
-spark = (
-    SparkSession.builder
-    .appName("My Spark App")
-    .config("spark.jars", "/path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar")
-    .getOrCreate()
-)
+# onETL can now properly write some Clickhouse types
+writer = DBWriter(connection=clickhouse, target="anothertable")
+writer.run(df)
 ```
 
-For Spark on Scala:
+### Using Spark on Scala
 
 ```scala
 import org.apache.spark.sql.SparkSession
 
+// describe which packages should be loaded by Spark
+val maven_packages = Array(
+    "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1",
+    "com.clickhouse:clickhouse-jdbc:0.6.5",
+    "com.clickhouse:clickhouse-http-client:0.6.5",
+    "org.apache.httpcomponents.client5:httpclient5:5.3.1",
+)
+
 val spark = SparkSession.builder()
 .appName("My Spark App")
-.config("spark.jars", "/path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar")
+.config("spark.jars.packages", maven_packages.mkString(","))
 .getOrCreate()
+
+// Register custom Clickhouse dialect
+import io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
+
+ClickhouseDialectRegistry.register()
+
+// now Spark can properly handle some Clickhouse types during read & write
+val df = spark.read.format("jdbc").options(...).load()
+df.write.format("jdbc").options(...).save() // target table ("anothertable") is set via the dbtable option
 ```
 
-##### Using Spark Submit
+### Using Spark Submit
+
+Start a Spark session with the required packages:
 
 ```bash
-spark-submit --jars /path/to/cloned-repo/target/scala_2.12/spark-dialect-extension_2.12-0.0.1.jar
+spark-submit --conf spark.jars.packages=io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.1,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5,org.apache.httpcomponents.client5:httpclient5:5.3.1 ...
 ```
 
-### Register a dialect
-
-To integrate the Spark Dialect Extension into your Spark application, you need to use ``DialectRegistry`` classes, which dynamically detect the Spark version and register the corresponding dialect.
+Then register the custom dialect in the started session.
 
 For PySpark:
 
-
 ```python
 # Register custom Clickhouse dialect
 ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
 ClickhouseDialectRegistry.register()
 ```
 
-For Spark on Scala:
+For Scala:
 
 ```scala
 // Register custom Clickhouse dialect
 import io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry