The JDBC Connector allows you to execute any SQL statement using Apache Spark.
To add the JDBC connector dependency to your sbt build:

```scala
libraryDependencies += "com.github.music-of-the-ainur" %% "jdbc-almaren" % "0.0.5-3.3"
```
To run in spark-shell:

For Scala 2.12:

```shell
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.3,com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.3"
```

For Scala 2.13:

```shell
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.8-3.3,com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.5-3.3"
```
The connector is also available from the Maven Central repository. It can be used through the `--packages` option or the `spark.jars.packages` configuration property. Use the following values:

Version | Connector Artifact |
---|---|
Spark 3.3.x and Scala 2.13 | `com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.5-3.3` |
Spark 3.3.x and Scala 2.12 | `com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.3` |
Spark 3.2.x and Scala 2.12 | `com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.2` |
Spark 3.1.x and Scala 2.12 | `com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.1` |
Spark 2.4.x and Scala 2.12 | `com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-2.4` |
Spark 2.4.x and Scala 2.11 | `com.github.music-of-the-ainur:jdbc-almaren_2.11:0.0.5-2.4` |
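
If you script your Spark submissions, the table above can be turned into a lookup. The sketch below is a hypothetical helper (`ConnectorArtifacts.forVersions` is not part of the connector) that maps a full Spark version and a Scala binary version to the artifact coordinate listed in the table, and returns `None` for combinations the table does not cover.

```scala
// Hypothetical helper, not part of jdbc-almaren: encodes the compatibility table above.
object ConnectorArtifacts {
  // One entry per table row: (Spark minor version, Scala binary version) -> artifact coordinate.
  private val artifacts: Map[(String, String), String] = Map(
    ("3.3", "2.13") -> "com.github.music-of-the-ainur:jdbc-almaren_2.13:0.0.5-3.3",
    ("3.3", "2.12") -> "com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.3",
    ("3.2", "2.12") -> "com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.2",
    ("3.1", "2.12") -> "com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-3.1",
    ("2.4", "2.12") -> "com.github.music-of-the-ainur:jdbc-almaren_2.12:0.0.5-2.4",
    ("2.4", "2.11") -> "com.github.music-of-the-ainur:jdbc-almaren_2.11:0.0.5-2.4"
  )

  /** Looks up the artifact for e.g. sparkVersion = "3.3.2" and scalaBinary = "2.12". */
  def forVersions(sparkVersion: String, scalaBinary: String): Option[String] = {
    // "3.3.2" -> "3.3": the table is keyed by Spark minor version (3.3.x, 3.2.x, ...)
    val sparkMinor = sparkVersion.split('.').take(2).mkString(".")
    artifacts.get((sparkMinor, scalaBinary))
  }
}
```

The returned coordinate can then be passed to `--packages` or `spark.jars.packages`, alongside the matching `almaren-framework` artifact.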
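The following example uses `jdbcBatch` to run a parameterized `UPDATE` for each row of a DataFrame. `__ID__` must be the first column; the `?` placeholders in the query are filled, in order, from the remaining columns (`first_name` and `last_name` here):

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.jdbc.JDBC.JDBCImplicit
import spark.implicits._ // `spark` is the SparkSession provided by spark-shell

val almaren = Almaren("jdbc-almaren")

// Input rows for the UPDATE statement
val updateSourceDf = Seq(
  ("John", "Jones"),
  ("David", "Smith"),
  ("Michael", "Lee"),
  ("Chris", "Johnson"),
  ("Mike", "Brown")
).toDF("first_name", "last_name")

// The `?` placeholders are bound, in order, to the columns that follow __ID__
val updateQuery = "UPDATE person_info SET first_name = ? WHERE last_name = ?"

almaren.builder
  .sourceDataFrame(updateSourceDf)
  // __ID__ must be the first column; it is returned in the output so results can be joined back
  .sql("SELECT monotonically_increasing_id() AS __ID__, first_name, last_name FROM __TABLE__")
  .jdbcBatch(
    "jdbc:postgresql://localhost:5432/almaren", // url
    "org.postgresql.Driver",                     // driver
    updateQuery,                                 // query
    1000,                                        // batchSize
    Some("postgres"),                            // user
    Some("postgres"),                            // password
    Map("connectionTimeoutMillis" -> "3000", "maxSize" -> "10") // params
  )
  .batch // returns the resulting DataFrame
  .count // triggers execution
```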
`jdbcBatch` parameters:

Parameter | Description | Type |
---|---|---|
url | The JDBC URL to connect to | String |
driver | The class name of the JDBC driver used to connect to this URL | String |
query | The query to execute | String |
batchSize | Number of records sent to the database in a single transaction | Int |
user | Database user | Option[String] |
password | Database password | Option[String] |
params | Additional connection parameters, such as `connectionTimeoutMillis` and `maxSize` | Map[String,String] |

Required input column:

Column | Mandatory | Description |
---|---|---|
`__ID__` | Yes (must be the first column) | Returned in the output of `jdbcBatch`; useful for joining results back to the input data |
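
To make `batchSize` and the `__ID__` rule concrete, here is a plain-Scala model with no Spark or database involved. It is a sketch under stated assumptions, not the connector's implementation: each row carries its `__ID__` plus one value per `?` placeholder, and rows are grouped into chunks of at most `batchSize`, each chunk standing for one transaction. `InputRow`, `BatchSketch`, and the placeholder-count check are hypothetical.

```scala
// Hypothetical model of one input row: the __ID__ value plus the values bound to the `?` placeholders.
final case class InputRow(id: Long, values: Seq[Any])

object BatchSketch {
  // Naive placeholder count: assumes no `?` appears inside string literals or comments.
  def placeholderCount(query: String): Int = query.count(_ == '?')

  // Groups rows into chunks of at most batchSize; each chunk stands for one transaction.
  def plan(query: String, rows: Seq[InputRow], batchSize: Int): List[Seq[InputRow]] = {
    require(batchSize > 0, "batchSize must be positive")
    val expected = placeholderCount(query)
    // Sanity check added by this sketch: every row must supply one value per placeholder.
    rows.foreach { row =>
      require(
        row.values.size == expected,
        s"row ${row.id} has ${row.values.size} values, but the query has $expected placeholders"
      )
    }
    rows.grouped(batchSize).toList
  }
}
```

With the five example rows, `plan` returns a single chunk for `batchSize = 1000` and chunks of 2, 2 and 1 rows for `batchSize = 2`.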

Output columns of `jdbcBatch`:

Column | Description |
---|---|
`__ID__` | The custom ID from the input; useful for joining data |
`__URL__` | The JDBC URL used to connect |
`__DRIVER__` | The class name of the JDBC driver used to connect to this URL |
`__QUERY__` | The query that was executed |
`__BATCHSIZE__` | Number of records sent to the database in a single transaction |
`__ELAPSED_TIME__` | Query execution time |
`__ERROR__` | Error message if the query execution fails |
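
Because `__ID__` appears in both the input and the output, failed rows can be traced back to their source. The sketch below models this with plain Scala collections rather than Spark: `BatchOutput` is a hypothetical, trimmed-down stand-in for an output row (only `__ID__` and `__ERROR__`), and it assumes `__ERROR__` is empty (modelled as `None`) when a statement succeeds. With DataFrames, the same idea is a `join` on the `__ID__` column followed by a filter on `__ERROR__`.

```scala
// Hypothetical, trimmed-down output row: only __ID__ and __ERROR__ (None when the statement succeeded).
final case class BatchOutput(id: Long, error: Option[String])

object ResultSketch {
  // Joins outputs back to the source rows on __ID__ and keeps only the rows that reported an error.
  def failedRows[A](source: Map[Long, A], outputs: Seq[BatchOutput]): Map[Long, (A, String)] =
    outputs.collect {
      case BatchOutput(id, Some(err)) if source.contains(id) => (id, (source(id), err))
    }.toMap
}
```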
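
The following example uses `jdbcQuery` to execute a single statement, here a PostgreSQL upsert that updates the rows of `person_info` whose `country` matches a row in `person_info_temp` and inserts the remaining rows:

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.jdbc.JDBC.JDBCImplicit
import spark.implicits._ // `spark` is the SparkSession provided by spark-shell

val almaren = Almaren("jdbc-almaren")

// Update person_info rows that match person_info_temp on country,
// then insert the person_info_temp rows whose country was not updated
val mergeQuery =
  """WITH upsert as(
    | update
    | public.person_info t2
    | set
    | first_name = t1.first_name,
    | last_name = t1.last_name
    | from
    | person_info_temp t1
    | where
    | t2.country = t1.country RETURNING t2.*
    |)
    |insert into
    | person_info
    |select
    | p.first_name,
    | p.last_name,
    | p.country
    |from
    | person_info_temp p
    |where
    | p.country not in (
    | select
    | q.country
    | from
    | upsert q
    | );""".stripMargin

// jdbcQuery executes a single statement; this example has no source DataFrame
almaren.builder
  .jdbcQuery(
    "jdbc:postgresql://localhost:5432/almaren", // url
    "org.postgresql.Driver",                     // driver
    mergeQuery,                                  // query
    Some("postgres"),                            // user
    Some("postgres"),                            // password
    Map("connectionTimeoutMillis" -> "3000", "maxSize" -> "10") // params
  )
  .batch
```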
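
The merge statement is easier to follow with a small in-memory model. The sketch below is a hypothetical plain-Scala rendering of the same logic, not something the connector runs: rows of `person_info` whose `country` matches a staging row take that row's names (the `upsert` CTE), and staging rows whose country matched nothing are appended (the `INSERT`). It assumes `country` is unique in `person_info_temp`; with duplicates, PostgreSQL's `UPDATE ... FROM` uses one of the matching rows, and which one is not predictable.

```scala
// Hypothetical row type mirroring the columns used by the merge query.
final case class Person(firstName: String, lastName: String, country: String)

object UpsertSketch {
  // `target` plays person_info, `staging` plays person_info_temp.
  // Assumes `country` is unique within `staging`.
  def upsertByCountry(target: Seq[Person], staging: Seq[Person]): Seq[Person] = {
    val stagingByCountry: Map[String, Person] = staging.map(p => p.country -> p).toMap

    // upsert CTE: UPDATE ... FROM person_info_temp t1 WHERE t2.country = t1.country
    val afterUpdate = target.map { t =>
      stagingByCountry.get(t.country) match {
        case Some(s) => t.copy(firstName = s.firstName, lastName = s.lastName)
        case None    => t
      }
    }

    // RETURNING t2.* : the countries of the rows that were updated
    val updatedCountries: Set[String] =
      target.map(_.country).filter(stagingByCountry.contains).toSet

    // INSERT ... WHERE p.country NOT IN (SELECT q.country FROM upsert q)
    val inserted = staging.filterNot(p => updatedCountries.contains(p.country))

    afterUpdate ++ inserted
  }
}
```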

`jdbcQuery` parameters:

Parameter | Description | Type |
---|---|---|
url | The JDBC URL to connect to | String |
driver | The class name of the JDBC driver used to connect to this URL | String |
query | The query to execute | String |
user | Database user | Option[String] |
password | Database password | Option[String] |
params | Additional connection parameters, such as `connectionTimeoutMillis` and `maxSize` | Map[String,String] |

Parameters that can be passed in `params`:

Parameter | Description |
---|---|
connectionTimeoutMillis | Time, in milliseconds, before a connection attempt times out |
maxSize | Maximum number of connections available |
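
Because `params` is a plain `Map[String,String]`, its keys and values are not checked by the compiler. One option is a small typed wrapper. `ConnectionParams` below is hypothetical: it covers only the two keys documented above, its positivity checks are this sketch's own choice, and the values in the test (3000 and 10) are simply those used in the examples, not connector defaults.

```scala
// Hypothetical typed view of the `params` map; covers only the two documented keys.
final case class ConnectionParams(connectionTimeoutMillis: Long, maxSize: Int) {
  // Validation chosen for this sketch, not taken from the connector.
  require(connectionTimeoutMillis > 0, "connectionTimeoutMillis must be positive")
  require(maxSize > 0, "maxSize must be positive")

  // Renders the settings as the Map[String, String] passed to jdbcBatch / jdbcQuery.
  def toParams: Map[String, String] = Map(
    "connectionTimeoutMillis" -> connectionTimeoutMillis.toString,
    "maxSize" -> maxSize.toString
  )
}
```

The result is passed positionally as the last argument, for example `ConnectionParams(3000L, 10).toParams` in place of the literal `Map(...)` in the examples.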