From 5a9ebdbeb553af115faca627aa8e188a7e4e4a9b Mon Sep 17 00:00:00 2001
From: Oleksandr Vayda
Date: Sun, 8 Sep 2024 18:22:22 +0200
Subject: [PATCH] issue #827 Elasticsearch: Cannot extract source URI

---
 .../plugin/embedded/ElasticSearchPlugin.scala | 21 ++++--
 .../za/co/absa/spline/ElasticSearchSpec.scala | 73 ++++++++++++++++++-
 2 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/ElasticSearchPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/ElasticSearchPlugin.scala
index da3d15a7..43df6837 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/ElasticSearchPlugin.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/ElasticSearchPlugin.scala
@@ -38,7 +38,7 @@ class ElasticSearchPlugin
   import za.co.absa.spline.commons.ExtractorImplicits._
 
   override def baseRelationProcessor: PartialFunction[(BaseRelation, LogicalRelation), ReadNodeInfo] = {
-    case (`_: ElasticsearchRelation`(esr), _) =>
+    case (`_: org.elasticsearch.spark.sql.ElasticsearchRelation`(esr), _) =>
       val parameters = extractValue[SparkSettings](esr, "cfg")
       val server = parameters.getProperty("es.nodes")
       val indexDocType = parameters.getProperty("es.resource")
@@ -46,18 +46,27 @@ class ElasticSearchPlugin
   }
 
   override def relationProviderProcessor: PartialFunction[(AnyRef, SaveIntoDataSourceCommand), WriteNodeInfo] = {
-    case (rp, cmd) if rp == "es" || ElasticSearchSourceExtractor.matches(rp) =>
-      val indexDocType = cmd.options("path")
-      val server = cmd.options("es.nodes")
+    case (rp, cmd) if rp == "es" || `_: org.elasticsearch.spark.sql.DefaultSource`.matches(rp) =>
+
+      val server = cmd
+        .options.getOrElse("es.nodes", sys.error(s"ElasticSearch: Cannot extract server from the options keys: ${cmd.options.keySet mkString ","}"))
+
+      val indexDocType = cmd
+        .options.get("path")
+        .orElse(cmd.options.get("es.resource"))
+        .getOrElse(sys.error(s"ElasticSearch: Cannot extract index and doc type from the options keys: ${cmd.options.keySet mkString ","}"))
+
       WriteNodeInfo(asSourceId(server, indexDocType), cmd.mode, cmd.query, cmd.options)
   }
 }
 
 object ElasticSearchPlugin {
 
-  private object `_: ElasticsearchRelation` extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.ElasticsearchRelation")
+  private object `_: org.elasticsearch.spark.sql.ElasticsearchRelation`
+    extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.ElasticsearchRelation")
 
-  private object ElasticSearchSourceExtractor extends SafeTypeMatchingExtractor(classOf[org.elasticsearch.spark.sql.DefaultSource15])
+  private object `_: org.elasticsearch.spark.sql.DefaultSource`
+    extends SafeTypeMatchingExtractor[AnyRef]("org.elasticsearch.spark.sql.DefaultSource")
 
   private def asSourceId(server: String, indexDocType: String) = SourceIdentifier(Some("elasticsearch"), s"elasticsearch://$server/$indexDocType")
 }
diff --git a/integration-tests/src/test/scala/za/co/absa/spline/ElasticSearchSpec.scala b/integration-tests/src/test/scala/za/co/absa/spline/ElasticSearchSpec.scala
index 67c843c0..934a11b7 100644
--- a/integration-tests/src/test/scala/za/co/absa/spline/ElasticSearchSpec.scala
+++ b/integration-tests/src/test/scala/za/co/absa/spline/ElasticSearchSpec.scala
@@ -21,11 +21,16 @@
 import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.scalatest.flatspec.AsyncFlatSpec
 import org.scalatest.matchers.should.Matchers
 import org.testcontainers.elasticsearch.ElasticsearchContainer
+import za.co.absa.spline.ElasticSearchSpec.Formats
 import za.co.absa.spline.commons.io.TempDirectory
 import za.co.absa.spline.test.fixture.spline.SplineFixture
 import za.co.absa.spline.test.fixture.{ReleasableResourceFixture, SparkFixture}
 
+object ElasticSearchSpec {
+  private val Formats = Seq("es", "org.elasticsearch.spark.sql")
+}
+
 class ElasticSearchSpec
   extends AsyncFlatSpec
     with Matchers
@@ -33,7 +38,7 @@ class ElasticSearchSpec
     with SplineFixture
     with ReleasableResourceFixture {
 
-  it should "support ES" in {
+  for (format <- Formats) it should s"""support `format("$format")` with `save/load("<index>/<docType>")`""" in {
     usingResource(new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.9.2")) { container =>
       container.start()
 
@@ -41,7 +46,11 @@ class ElasticSearchSpec
       val index = "test"
       val docType = "test"
       val esNodes = container.getHost
      val esPort = container.getFirstMappedPort
-      val esOptions = Map("es.nodes" -> esNodes, "es.port" -> esPort.toString, "es.nodes.wan.only" -> "true")
+      val esOptions = Map(
+        "es.nodes" -> esNodes,
+        "es.port" -> esPort.toString,
+        "es.nodes.wan.only" -> "true",
+      )
 
       withNewSparkSession(implicit spark => {
@@ -57,7 +66,7 @@ class ElasticSearchSpec
             .write
             .mode(SaveMode.Append)
             .options(esOptions)
-            .format("es")
+            .format(format)
             .save(s"$index/$docType")
           }
 
@@ -65,7 +74,7 @@ class ElasticSearchSpec
           val df = spark
             .read
             .options(esOptions)
-            .format("es")
+            .format(format)
             .load(s"$index/$docType")
 
           df.write.save(TempDirectory(pathOnly = true).deleteOnExit().path.toString)
@@ -85,4 +94,60 @@ class ElasticSearchSpec
     }
   }
 
+  for (format <- Formats) it should s"""support `format("$format")` with `option("es.resource", "<index>/<docType>")`""" in {
+    usingResource(new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.9.2")) { container =>
+      container.start()
+
+      val index = "test"
+      val docType = "test"
+      val esNodes = container.getHost
+      val esPort = container.getFirstMappedPort
+      val esOptions = Map(
+        "es.nodes" -> esNodes,
+        "es.port" -> esPort.toString,
+        "es.nodes.wan.only" -> "true",
+        "es.resource" -> s"$index/$docType",
+      )
+
+      withNewSparkSession(implicit spark => {
+        withLineageTracking { captor =>
+          val testData: DataFrame = {
+            val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
+            val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
+            spark.sqlContext.createDataFrame(rdd, schema)
+          }
+
+          for {
+            (plan1, _) <- captor.lineageOf {
+              testData
+                .write
+                .mode(SaveMode.Append)
+                .options(esOptions)
+                .format(format)
+                .save()
+            }
+
+            (plan2, _) <- captor.lineageOf {
+              val df = spark
+                .read
+                .options(esOptions)
+                .format(format)
+                .load()
+
+              df.write.save(TempDirectory(pathOnly = true).deleteOnExit().path.toString)
+            }
+          } yield {
+
+            plan1.operations.write.append shouldBe true
+            plan1.operations.write.extra("destinationType") shouldBe Some("elasticsearch")
+            plan1.operations.write.outputSource shouldBe s"elasticsearch://$esNodes/$index/$docType"
+
+            plan2.operations.reads.head.inputSources.head shouldBe plan1.operations.write.outputSource
+            plan2.operations.reads.head.extra("sourceType") shouldBe Some("elasticsearch")
+            plan2.operations.write.append shouldBe false
+          }
+        }
+      })
+    }
+  }
 }
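
Note: the sketch below is illustrative only and not part of the patch. It isolates the fallback logic that the patched relationProviderProcessor applies when deriving the index/doc-type part of the source URI. EsResourceResolutionSketch and resolveIndexDocType are hypothetical names; the sketch assumes, as the patched code does, that Spark passes the save("<index>/<docType>") argument through as the "path" option of SaveIntoDataSourceCommand.

    object EsResourceResolutionSketch extends App {
      // Mirrors the patched lookup order: prefer the "path" option (populated when
      // save("<index>/<docType>") is called), fall back to the "es.resource" option,
      // and otherwise fail with a message listing the option keys actually present.
      def resolveIndexDocType(options: Map[String, String]): String = options
        .get("path")
        .orElse(options.get("es.resource"))
        .getOrElse(sys.error(s"ElasticSearch: Cannot extract index and doc type from the options keys: ${options.keySet mkString ","}"))

      assert(resolveIndexDocType(Map("path" -> "test/test")) == "test/test")        // save("test/test")
      assert(resolveIndexDocType(Map("es.resource" -> "test/test")) == "test/test") // option("es.resource", ...) + save()
      println("both write styles resolve to the same index/doc type")
    }

With this resolution in place, both write styles exercised by the integration tests map to the same source URI, e.g. elasticsearch://<host>/test/test, which is what the assertions on outputSource and inputSources verify.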