Option es.read.field.*.include unable to take field names containing a colon #2221

Open

SebGay opened this issue Apr 23, 2024 · 0 comments
SebGay commented Apr 23, 2024

What kind of an issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Hello,

When using either of the configurations es.read.field.as.array.include or es.read.field.include, providing a value that contains a colon causes an org.elasticsearch.hadoop.EsHadoopIllegalArgumentException.

Steps to reproduce

Code:

Setup for testing:

from json import dumps

def boilerplate(additional_options, index, good_field):
    constant_options = {'es.nodes': 'xxx.xxx.xxx.xxx', 'es.port': '9200',
                        'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}

    # Restrict the read to documents where the known-good field exists.
    query = dumps({"query": {"bool": {"filter": {"exists": {"field": good_field}}}}})

    (
        spark.read
        .format("org.elasticsearch.spark.sql")
        .options(**constant_options)
        .options(**additional_options)
        .option("es.query", query)
        .load(index)
        .select(good_field)
        .display()
    )

Note that the following are placeholders in the code for real names: STRUCTNAME, NESTEDFIELDn, PREFIX, SUFFIXn, INDEXn.
INDEX1 contains no field names with a colon, while INDEX2 contains only field names with a colon.

Working with the array include setting on a nested field in INDEX1:
boilerplate({'es.read.field.as.array.include': 'STRUCTNAME.NESTEDFIELD1'}, "INDEX1", "STRUCTNAME.NESTEDFIELD2")
Working without any include setting in INDEX2:
boilerplate({}, "INDEX2", "PREFIX:SUFFIX1")
Error (1) with the array include setting:
boilerplate({'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")

results in:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]

The same error occurs when adding the suggestion from this thread:

boilerplate({'es.mapping.date.rich':'false', 'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
Error (2) with the field include setting:
boilerplate({'es.read.field.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")

results in:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX2] specified in inclusion configuration

Working with the exclude setting:
boilerplate({'es.read.field.exclude': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
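
For context, both traces below end in FieldFilter.toNumberedFilter, which appears to treat anything after a colon in an include pattern as the documented array-depth suffix (the name:depth syntax, e.g. a.b:2 in es.read.field.as.array.include). Below is a minimal Python sketch of that suspected parsing, assuming the Java code splits the pattern on the colon and parses the tail as an integer; to_numbered_filter is a stand-in for illustration, not the actual implementation:

def to_numbered_filter(includes):
    # Sketch of the suspected behavior, not the real Java source: a trailing
    # ":<int>" marks array depth, so "a.b:2" parses to ("a.b", 2). A field
    # literally named "PREFIX:SUFFIX2" takes the same path, and int("SUFFIX2")
    # fails just like java.lang.NumberFormatException does in the traces below.
    filters = []
    for value in includes:
        if ":" in value:
            name, _, depth = value.rpartition(":")
            filters.append((name, int(depth)))
        else:
            filters.append((value, None))
    return filters

to_numbered_filter(["STRUCTNAME.NESTEDFIELD1:2"])  # ok: documented depth syntax
to_numbered_filter(["PREFIX:SUFFIX2"])             # ValueError: invalid literal for int()

If that reading is right, a field name containing a colon is indistinguishable from the name:depth syntax, and there does not seem to be a way to escape the colon. It would also explain why es.read.field.exclude works: neither trace shows the exclude path going through toNumberedFilter.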

Traces

Error 1
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-2472143304110332>, line 1
----> 1 boilerplate({'es.mapping.date.rich':'false', 'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")

File <command-2472143304110327>, line 13, in boilerplate(additional_options, index, good_field)
      3 constant_options = {'es.nodes': '[REDACTED]', 'es.port': '9200', 'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}
      5 query = dumps({"query": {"bool":{"filter": {"exists": {"field" : good_field}}}}})
      7 (
      8 spark.read 
      9 .format( "org.elasticsearch.spark.sql" )
     10 .options(**constant_options)  
     11 .options(**additional_options)          
     12 .option( "es.query", query)
---> 13 .load(index)
     14 .select(good_field)
     15 .display()
     16 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:312, in DataFrameReader.load(self, path, format, schema, **options)
    310 self.options(**options)
    311 if isinstance(path, str):
--> 312     return self._df(self._jreader.load(path))
    313 elif path is not None:
    314     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o923.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]
	at org.elasticsearch.hadoop.util.SettingsUtils.getFieldArrayFilterInclude(SettingsUtils.java:228)
	at org.elasticsearch.spark.sql.SchemaUtils$.convertToStruct(SchemaUtils.scala:129)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:93)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:233)
	at scala.Option.getOrElse(Option.scala:189)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:233)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:503)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:340)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:340)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:244)
	at sun.reflect.GeneratedMethodAccessor430.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX2] specified in inclusion configuration
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:189)
	at org.elasticsearch.hadoop.util.SettingsUtils.getFieldArrayFilterInclude(SettingsUtils.java:226)
	... 24 more
Caused by: java.lang.NumberFormatException: For input string: "SUFFIX2"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:184)
	... 25 more

Error 2
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX1] specified in inclusion configuration
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-2472143304110330>, line 1
----> 1 boilerplate({'es.read.field.include': 'PREFIX:SUFFIX1'}, "INDEX2", "PREFIX:SUFFIX2")

File <command-2472143304110327>, line 13, in boilerplate(additional_options, index, good_field)
      3 constant_options = {'es.nodes': '[REDACTED]', 'es.port': '9200', 'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}
      5 query = dumps({"query": {"bool":{"filter": {"exists": {"field" : good_field}}}}})
      7 (
      8 spark.read 
      9 .format( "org.elasticsearch.spark.sql" )
     10 .options(**constant_options)  
     11 .options(**additional_options)          
     12 .option( "es.query", query)
---> 13 .load(index)
     14 .select(good_field)
     15 .display()
     16 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:312, in DataFrameReader.load(self, path, format, schema, **options)
    310 self.options(**options)
    311 if isinstance(path, str):
--> 312     return self._df(self._jreader.load(path))
    313 elif path is not None:
    314     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o898.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX1] specified in inclusion configuration
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:189)
	at org.elasticsearch.hadoop.serialization.dto.mapping.Mapping.filter(Mapping.java:86)
	at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.filterMapping(MappingUtils.java:138)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAndGeoFields(SchemaUtils.scala:109)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:92)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:233)
	at scala.Option.getOrElse(Option.scala:189)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:233)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:503)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:340)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:340)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:244)
	at sun.reflect.GeneratedMethodAccessor430.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NumberFormatException: For input string: "SUFFIX1"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:184)
	... 26 more
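
As a stop-gap, the column pruning can be done on the Spark side instead of in the connector, since plain reads of INDEX2 work (see "Working without any include setting" above). A sketch of that workaround, reusing constant_options and query from the setup code:

# Stop-gap sketch: no connector-side include filter; the colon-named column
# is selected after the DataFrame exists, which works fine.
df = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .options(**constant_options)
    .option("es.query", query)
    .load("INDEX2")
    .select("PREFIX:SUFFIX1")
)

This fetches more fields over the wire than an include filter would, so it is a mitigation rather than a fix.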

Version Info

OS: Databricks Runtime 14.2 on Azure
JVM: Unknown
Hadoop/Spark: Spark 3.5.0
ES-Hadoop: org.elasticsearch:elasticsearch-spark-30_2.12:7.17.1
ES: 7.17.1
