issue #827 Elasticsearch: Cannot extract source URI
wajda committed Sep 8, 2024
1 parent 61bb536 commit 5a9ebdb
Showing 2 changed files with 84 additions and 10 deletions.
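What changed: the write-side lineage extractor used to read the target index/doc type strictly from the `path` option, i.e. it only worked for `save("<index>/<docType>")`. A write configured via `option("es.resource", "<index>/<docType>")` followed by a bare `save()` left `path` unset, so the source URI could not be extracted. The patch below falls back from `path` to `es.resource` and raises a descriptive error when neither key (or `es.nodes`) is present. A minimal standalone sketch of that fallback, with made-up values:

    // Stand-in for SaveIntoDataSourceCommand.options (illustrative only)
    val options = Map("es.nodes" -> "localhost:9200", "es.resource" -> "test/test")

    val indexDocType = options.get("path")              // set by save("<index>/<docType>")
      .orElse(options.get("es.resource"))               // set by option("es.resource", ...)
      .getOrElse(sys.error(s"Cannot extract index and doc type from: ${options.keySet mkString ","}"))
    // indexDocType == "test/test"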
@@ -38,26 +38,35 @@ class ElasticSearchPlugin
   import za.co.absa.spline.commons.ExtractorImplicits._
 
   override def baseRelationProcessor: PartialFunction[(BaseRelation, LogicalRelation), ReadNodeInfo] = {
-    case (`_: ElasticsearchRelation`(esr), _) =>
+    case (`_: org.elasticsearch.spark.sql.ElasticsearchRelation`(esr), _) =>
       val parameters = extractValue[SparkSettings](esr, "cfg")
       val server = parameters.getProperty("es.nodes")
       val indexDocType = parameters.getProperty("es.resource")
       ReadNodeInfo(asSourceId(server, indexDocType), Map.empty)
   }
 
   override def relationProviderProcessor: PartialFunction[(AnyRef, SaveIntoDataSourceCommand), WriteNodeInfo] = {
-    case (rp, cmd) if rp == "es" || ElasticSearchSourceExtractor.matches(rp) =>
-      val indexDocType = cmd.options("path")
-      val server = cmd.options("es.nodes")
+    case (rp, cmd) if rp == "es" || `_: org.elasticsearch.spark.sql.DefaultSource`.matches(rp) =>
+
+      val server = cmd
+        .options.getOrElse("es.nodes", sys.error(s"ElasticSearch: Cannot extract server from the options keys: ${cmd.options.keySet mkString ","}"))
+
+      val indexDocType = cmd
+        .options.get("path")
+        .orElse(cmd.options.get("es.resource"))
+        .getOrElse(sys.error(s"ElasticSearch: Cannot extract index and doc type from the options keys: ${cmd.options.keySet mkString ","}"))
+
       WriteNodeInfo(asSourceId(server, indexDocType), cmd.mode, cmd.query, cmd.options)
   }
 }
 
 object ElasticSearchPlugin {
 
-  private object `_: ElasticsearchRelation` extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.ElasticsearchRelation")
+  private object `_: org.elasticsearch.spark.sql.ElasticsearchRelation`
+    extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.ElasticsearchRelation")
 
-  private object ElasticSearchSourceExtractor extends SafeTypeMatchingExtractor(classOf[org.elasticsearch.spark.sql.DefaultSource15])
+  private object `_: org.elasticsearch.spark.sql.DefaultSource`
+    extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.DefaultSource")
 
   private def asSourceId(server: String, indexDocType: String) =
     SourceIdentifier(Some("elasticsearch"), s"elasticsearch://$server/$indexDocType")
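An aside on the odd backtick-quoted object names above: they are Scala extractor objects, and SafeTypeMatchingExtractor (from spline-commons) matches a value against a class referenced only by its fully-qualified name, so the plugin neither needs the Elasticsearch classes at compile time nor breaks when they are absent at run time. The second change also swaps the compile-time `classOf[DefaultSource15]` reference for the name of its base class `org.elasticsearch.spark.sql.DefaultSource`, which removes the hard dependency and covers both provider classes, since in es-hadoop `DefaultSource15` extends `DefaultSource`. A rough standalone analogue of such a matcher, under assumed semantics (this is a sketch, not Spline's actual implementation):

    // Illustrative name-based type matcher; API details are assumptions.
    class NameBasedTypeMatcher[T <: AnyRef](typeName: String) {
      // Resolve the target class lazily; None if it is not on the classpath.
      private lazy val targetClass: Option[Class[_]] =
        try Some(Class.forName(typeName))
        catch { case _: ClassNotFoundException => None }

      def matches(obj: AnyRef): Boolean =
        obj != null && targetClass.exists(_.isInstance(obj))

      // Enables pattern usage: case `_: some.Type`(x) => ...
      def unapply(obj: AnyRef): Option[T] =
        if (matches(obj)) Some(obj.asInstanceOf[T]) else None
    }

    object `_: org.elasticsearch.spark.sql.ElasticsearchRelation`
      extends NameBasedTypeMatcher[AnyRef]("org.elasticsearch.spark.sql.ElasticsearchRelation")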
@@ -21,27 +21,36 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.scalatest.flatspec.AsyncFlatSpec
 import org.scalatest.matchers.should.Matchers
 import org.testcontainers.elasticsearch.ElasticsearchContainer
+import za.co.absa.spline.ElasticSearchSpec.Formats
 import za.co.absa.spline.commons.io.TempDirectory
 import za.co.absa.spline.test.fixture.spline.SplineFixture
 import za.co.absa.spline.test.fixture.{ReleasableResourceFixture, SparkFixture}
 
 
+object ElasticSearchSpec {
+  private val Formats = Seq("es", "org.elasticsearch.spark.sql")
+}
+
 class ElasticSearchSpec
   extends AsyncFlatSpec
     with Matchers
     with SparkFixture
     with SplineFixture
     with ReleasableResourceFixture {
 
-  it should "support ES" in {
+  for (format <- Formats) it should s"""support `format("$format")` with `save/load("<index>")`""" in {
     usingResource(new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.9.2")) { container =>
       container.start()
 
       val index = "test"
       val docType = "test"
       val esNodes = container.getHost
       val esPort = container.getFirstMappedPort
-      val esOptions = Map("es.nodes" -> esNodes, "es.port" -> esPort.toString, "es.nodes.wan.only" -> "true")
+      val esOptions = Map(
+        "es.nodes" -> esNodes,
+        "es.port" -> esPort.toString,
+        "es.nodes.wan.only" -> "true",
+      )
 
       withNewSparkSession(implicit spark => {
         withLineageTracking { captor =>
@@ -57,15 +66,15 @@ class ElasticSearchSpec
                 .write
                 .mode(SaveMode.Append)
                 .options(esOptions)
-                .format("es")
+                .format(format)
                 .save(s"$index/$docType")
             }
 
             (plan2, _) <- captor.lineageOf {
               val df = spark
                 .read
                 .options(esOptions)
-                .format("es")
+                .format(format)
                 .load(s"$index/$docType")
 
               df.write.save(TempDirectory(pathOnly = true).deleteOnExit().path.toString)
@@ -85,4 +94,60 @@ class ElasticSearchSpec
     }
   }
 
+  for (format <- Formats) it should s"""support `format("$format")` with `option("es.resource", "<index>")`""" in {
+    usingResource(new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.9.2")) { container =>
+      container.start()
+
+      val index = "test"
+      val docType = "test"
+      val esNodes = container.getHost
+      val esPort = container.getFirstMappedPort
+      val esOptions = Map(
+        "es.nodes" -> esNodes,
+        "es.port" -> esPort.toString,
+        "es.nodes.wan.only" -> "true",
+        "es.resource" -> s"$index/$docType",
+      )
+
+      withNewSparkSession(implicit spark => {
+        withLineageTracking { captor =>
+          val testData: DataFrame = {
+            val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
+            val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
+            spark.sqlContext.createDataFrame(rdd, schema)
+          }
+
+          for {
+            (plan1, _) <- captor.lineageOf {
+              testData
+                .write
+                .mode(SaveMode.Append)
+                .options(esOptions)
+                .format(format)
+                .save()
+            }
+
+            (plan2, _) <- captor.lineageOf {
+              val df = spark
+                .read
+                .options(esOptions)
+                .format(format)
+                .load()
+
+              df.write.save(TempDirectory(pathOnly = true).deleteOnExit().path.toString)
+            }
+          } yield {
+
+            plan1.operations.write.append shouldBe true
+            plan1.operations.write.extra("destinationType") shouldBe Some("elasticsearch")
+            plan1.operations.write.outputSource shouldBe s"elasticsearch://$esNodes/$index/$docType"
+
+            plan2.operations.reads.head.inputSources.head shouldBe plan1.operations.write.outputSource
+            plan2.operations.reads.head.extra("sourceType") shouldBe Some("elasticsearch")
+            plan2.operations.write.append shouldBe false
+          }
+        }
+      })
+    }
+  }
 }
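A note on the test style used above: the `for (format <- Formats)` loop runs while the spec class is being constructed, and each `it should ... in { ... }` call inside it registers a separate, individually named test case, so every scenario is exercised with both the `"es"` short alias and the fully-qualified `"org.elasticsearch.spark.sql"` provider name. A tiny self-contained illustration of the registration pattern (class name and assertion are made up):

    import org.scalatest.flatspec.AnyFlatSpec
    import org.scalatest.matchers.should.Matchers

    class FormatNamesSpec extends AnyFlatSpec with Matchers {
      private val Formats = Seq("es", "org.elasticsearch.spark.sql")

      behavior of "format name handling"

      // The loop body executes at construction time, so each iteration
      // registers one named test case with ScalaTest.
      for (format <- Formats) it should s"accept format '$format'" in {
        Formats should contain(format)
      }
    }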
