Support for ArrayType in es.update.script.params #2036

Open
1 of 2 tasks
vararo27 opened this issue Oct 31, 2022 · 4 comments

Comments

vararo27 commented Oct 31, 2022

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Array fields are not working with the ES update script. tags is an array in the shared example, and the write fails in the transformation stage. Is there a reference for using ArrayType with es.update.script.params?

Steps to reproduce

Code:

import java.util.Collections;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

        List<String> data = Collections.singletonList("{\"Context\":\"129\",\"MessageType\":{\"id\":\"1013\",\"content\":\"Hello World\"},\"Module\":\"1203\",\"time\":3249,\"critical\":0,\"id\":1, \"tags\":[\"user\",\"device\"]}");

        SparkConf sparkConf = new SparkConf();

        sparkConf.set("es.nodes", "localhost");
        sparkConf.set("es.port", "9200");
        sparkConf.set("es.net.ssl", "false");
        sparkConf.set("es.nodes.wan.only", "true");

        SparkSession session = SparkSession.builder().appName("SparkElasticSearchTest").master("local[*]").config(sparkConf).getOrCreate();

        Dataset<Row> df = session.createDataset(data, Encoders.STRING()).toDF();
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = session.read().json(df1.javaRDD());

        df2.printSchema();
        df2.show(false);

        String script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags";

        String ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags";

        DataFrameWriter<Row> dsWriter = df2.write()
                .format("org.elasticsearch.spark.sql")
                .option(ConfigurationOptions.ES_NODES, "localhost")
                .option(ConfigurationOptions.ES_PORT, "9200")
                .option(ConfigurationOptions.ES_NET_USE_SSL, false)
                .option(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
                .option(ConfigurationOptions.ES_MAPPING_ID, "id")
                .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja);


        dsWriter.mode("append");
        dsWriter.save("user-details");

Stack trace:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:75)
	at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:69)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
	... 12 more
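
The cast itself is easy to see in isolation: script params are extracted as plain values without the DataFrame schema, and the array writer is declared against Seq[Row] (as the patch later in this thread shows), so the first string element fails at the element access. A minimal sketch of the failure mode, with illustrative values:

val tags: Seq[AnyRef] = Seq("user", "device")
// writeArray's Seq[Row] signature means the erased Seq is cast per element;
// the first String element triggers the ClassCastException seen above.
val first = tags.head.asInstanceOf[org.apache.spark.sql.Row]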

Version Info

OS: Any
JVM: 1.8
Hadoop/Spark: 3.2.1
ES-Hadoop: 8.2.2
ES: 7.10.2

masseyke (Member) commented Nov 8, 2022

I have been able to reproduce this and it does look like a bug. It is very similar to #1838. There we added support for arrays of Rows in scripted upserts. It looks like we also need to add support for arrays of other types (Strings in this case).
Here is the Scala code I was using to reproduce this (it might be the beginning of a unit test):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

val data = Seq(Row("129", Row("1013","Hello World"), "1203", 3249, 0, 1, List("user", "device")))
val messageTypeSchema = new StructType().add("id", StringType).add("content", StringType)
val schema = new StructType().add("Context",StringType).add("MessageType", messageTypeSchema).add("Module",StringType).add("time",IntegerType).add("critical",IntegerType).add("id",IntegerType).add("tags", ArrayType(StringType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
val script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags"
val ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags"
val dsWriter = df.write.format("org.elasticsearch.spark.sql")
  .option(ConfigurationOptions.ES_MAPPING_ID, "id")
  .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja)
dsWriter.mode("append")
dsWriter.save("user-details")
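
Until non-Row arrays are supported, one possible workaround (an untested sketch, not an official fix) is to keep ArrayType columns out of es.update.script.params entirely: join the tags into a single delimited string on the Spark side and split it back inside the Painless script. The tagsJoined column name and the ',' delimiter below are illustrative assumptions:

import org.apache.spark.sql.functions.{array_join, col}

// Untested workaround sketch: pass tags as one scalar string param so the
// params writer never has to serialize a Seq[String].
val dfJoined = df.withColumn("tagsJoined", array_join(col("tags"), ","))

// Rebuild the list in Painless. splitOnToken avoids the regex setting that
// String.split would need; this assumes no tag contains the delimiter.
val script2 = "ctx._source.tags = params.tagsJoined.splitOnToken(',')"

// Reference tagsJoined instead of tags in the params mapping:
val ja2 = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tagsJoined:tagsJoined"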

masseyke (Member) commented Nov 8, 2022

Something like this seems to fix it. If I get time in the next few weeks, I'll try to support other types, test it properly, and get a PR up:

diff --git a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
index 65274b47..829db41f 100644
--- a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
+++ b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
@@ -72,12 +72,23 @@ class DataFrameValueWriter(writeUnknownTypes: Boolean = false) extends Filtering
     }
   }
 
-  private[spark] def writeArray(value: Seq[Row], generator: Generator): Result = {
+  private[spark] def writeArray(value: Seq[AnyRef], generator: Generator): Result = {
     if (value.nonEmpty) {
-      val schema = value.head.schema
-      val result = write(DataTypes.createArrayType(schema), value, generator)
-      if (!result.isSuccesful) {
-        return handleUnknown(value, generator)
+      val firstElement = value.head
+      firstElement match {
+        case r: Row =>
+          val schema = r.schema
+          val result = write(DataTypes.createArrayType(schema), value, generator)
+          if (!result.isSuccesful) {
+            return handleUnknown(value, generator)
+          }
+        case s: String =>
+          val result = write(StringType, value, generator)
+          if (!result.isSuccesful) {
+            return handleUnknown(value, generator)
+          }
+        case _ =>
+          return handleUnknown(value, generator)
       }
     } else {
       generator.writeBeginArray().writeEndArray()

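As a rough sketch of what "support other types" might look like (a direction, not the actual patch), the non-Row branch could resolve a Spark DataType from the element's runtime class, which would also cover the java.lang.Long case reported below:

import org.apache.spark.sql.types._

object ScalarArraySketch {
  // Hypothetical and untested: map the first element's runtime class to a
  // Spark DataType so primitive arrays take the same path as the String case.
  def scalarElementType(element: AnyRef): Option[DataType] = element match {
    case _: String            => Some(StringType)
    case _: java.lang.Long    => Some(LongType)
    case _: java.lang.Integer => Some(IntegerType)
    case _: java.lang.Double  => Some(DoubleType)
    case _: java.lang.Float   => Some(FloatType)
    case _: java.lang.Boolean => Some(BooleanType)
    case _                    => None // unknown: fall back to handleUnknown
  }
}

Each Some result would feed write(dataType, value, generator) exactly as the String branch does in the diff above.
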
devJackie commented

@masseyke cc. @vararo27
Hi, when will a fix for this issue be merged? I am still getting the same error.

Stack trace:
24/04/30 17:31:38 INFO FileScanRDD: Reading File path: hdfs://hadoop-cdp/cdpdev/de/local/sample/parquet/parquet_sample/part-00000-83b8108b-24e5-435d-ac15-b3589e9d1d74-c000.snappy.parquet, range: 0-708, partition values: [empty row]
24/04/30 17:31:39 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:77)
	at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:71)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)

OS: Any
JVM: 1.8
Hadoop/Spark: 3.2.4
ES-Hadoop: 8.10.4
ES: 8.8.2

masseyke (Member) commented May 3, 2024

@devJackie I have been unable to carve out the time to work on this. It is unlikely it will get merged soon.
