Add support for Spark SQL #113

Open

wants to merge 18 commits into base: main

Conversation

jphalip commented Oct 27, 2023

No description provided.

jphalip commented Oct 28, 2023

/gcbrun

```java
this.jobDetails = jobDetails;
String taskAttemptID = UUID.randomUUID().toString();
```
Collaborator:

We rely on overwriting the task attempt file when there are multiple attempts in order to handle attempt failures (and cleanup?). If each attempt gets a different file, how are attempt failures handled? Also, preserving the task attempt info helps with troubleshooting.

Collaborator Author:

The reason I changed this is that there is no Hive query ID available in the Hadoop conf when using Spark SQL. Do you know how else we could handle this?
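For context, a minimal sketch of the fallback being discussed, assuming the standard `hive.query.id` conf key; the class and method names are illustrative, not the connector's actual API:

```java
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;

public final class TaskIdFallbackSketch {
  private TaskIdFallbackSketch() {}

  /** Returns Hive's query ID when present, otherwise a random ID (the Spark SQL path). */
  public static String resolveQueryId(Configuration conf) {
    String hiveQueryId = conf.get("hive.query.id");
    if (hiveQueryId != null && !hiveQueryId.isEmpty()) {
      return hiveQueryId;
    }
    // No Hive query ID in the conf (e.g. the job is driven by Spark SQL): fall back to a
    // random UUID. Note that a random value per attempt means retries no longer overwrite
    // the previous attempt's file, which is the concern raised above.
    return UUID.randomUUID().toString();
  }
}
```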

Collaborator Author:

@yigress Can you think of a way to reproduce this situation? It would be good to cover that in our test suite.

Collaborator:

The test would have the first few attempts fail and then a later attempt succeed. I can't think of a good way to do that. Maybe another thread could monitor the write location and switch permissions between the first and later attempts, or something similar?

Collaborator Author:

OK, so I've added some code to retrieve the task ID from Spark. Let me know what you think.

Collaborator Author:

Note: The Spark task ID now uses the Spark app ID and the partition ID. The temp output folder also uses the ".hive-staging-XXX" folder.

```java
Class<?> taskContextClass = Class.forName("org.apache.spark.TaskContext");
Method getMethod = taskContextClass.getMethod("get");
Object taskContext = getMethod.invoke(null);
Method taskAttemptIdMethod = taskContextClass.getMethod("taskAttemptId");
```
Collaborator:

Do we want the task ID here, not the task attempt ID?

Collaborator Author:

For Hive we are getting the task attempt ID. Isn't that right?

Collaborator:

We are only getting the task ID for Hive. This is a reminder that we need to note in the README that Spark/Hive speculative execution is not supported.

Collaborator:

We are writing the output file name as just the task ID (see JobUtils#getTaskWriterOutputFile). Here we can use the task attempt ID; we just need to make sure the task ID can still be derived from it.
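As a rough illustration of that suggestion (the helper names are hypothetical, not JobUtils' actual API), the writer output file could encode the attempt number after the task ID so the task ID remains recoverable:

```java
public final class TaskFileNameSketch {
  private TaskFileNameSketch() {}

  /** e.g. taskId="stage-3-partition-7", attempt=2 -> "stage-3-partition-7_2". */
  public static String toFileName(String taskId, int attemptNumber) {
    return taskId + "_" + attemptNumber;
  }

  /** Recovers the task ID by stripping the trailing "_<attempt>" suffix. */
  public static String toTaskId(String fileName) {
    int idx = fileName.lastIndexOf('_');
    return idx >= 0 ? fileName.substring(0, idx) : fileName;
  }
}
```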

Collaborator:

This works if the Spark application simply fails when the insert query fails. However, if the user chooses not to fail the application (for example by catching the query failure) or is using spark-shell, it is still the same Spark application. If the user then runs another query, inserting either into the same table or into another table, what would happen if the Spark job info file already exists?

yigress commented Nov 29, 2023

Could you add an acceptance or integration test for 'insert overwrite'? A test on dataproc-2.1 shows incorrect results:

```scala
scala> spark.sql("select * from ba").show
+---+----+
| id|name|
+---+----+
| 11| aaa|
| 22| bbb|
+---+----+

scala> spark.sql("insert overwrite table snoop select * from ba")
res8: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from snoop").show
+---+-------+
|num|str_val|
+---+-------+
| 11|    aaa|
| 22|    bbb|
|  1|    aaa|
|  2|    bbb|
| 11|    aaa|
| 22|    bbb|
|  3|    ccc|
|  4|    ddd|
| 11|    aaa|
| 22|    bbb|
+---+-------+
```

```java
Object partitionId = partitionIdMethod.invoke(taskContext);
Method stageIdMethod = taskContextClass.getMethod("stageId");
Object stageId = stageIdMethod.invoke(taskContext);
return String.format("stage-%s-partition-%s", stageId, partitionId);
```
Collaborator:

Right now all stream ref files are dumped into a single parent path, and we are assuming there is only one stage writing to the table? If there are multiple stages writing into the same table, then for each stage all the stream ref files will be picked up, which will cause a data correctness issue.

Collaborator Author:

I've updated the code to use the ".hive-staging-XXXX" folder name, when present.

Collaborator Author:

See the getQueryTempOutputPath() method. We check for the presence of the FileOutputFormat.OUTDIR (mapreduce.output.fileoutputformat.outputdir) property, which is set by Spark. If present, we extract the ".hive-staging-XXXX" part of that folder name and use it for our own temp output dir.
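For reference, the extraction described above could look roughly like the sketch below. This is only a sketch, assuming the standard `mapreduce.output.fileoutputformat.outputdir` key and a path segment starting with `.hive-staging`; the class and method names are illustrative, not the actual getQueryTempOutputPath() implementation:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public final class StagingDirSketch {
  private StagingDirSketch() {}

  /** Returns the ".hive-staging-XXXX" segment of the configured output dir, or null if absent. */
  public static String extractHiveStagingSegment(Configuration conf) {
    String outDir = conf.get("mapreduce.output.fileoutputformat.outputdir");
    if (outDir == null) {
      return null; // Property not set: we are not on the Spark SQL write path.
    }
    // Walk the path components upward and return the first ".hive-staging" segment found.
    for (Path p = new Path(outDir); p != null; p = p.getParent()) {
      if (p.getName().startsWith(".hive-staging")) {
        return p.getName();
      }
    }
    return null;
  }
}
```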

jphalip commented Dec 8, 2023

@yigress INSERT OVERWRITE should now work. I've added an integration test for it as well.
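For anyone following along, the behaviour such a test needs to pin down is that a second INSERT OVERWRITE replaces rather than appends to the previous result. A rough sketch only; the table and class names are assumptions, not the actual test added in this PR:

```java
import org.apache.spark.sql.SparkSession;

public class InsertOverwriteSmokeTest {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("insert-overwrite-smoke-test")
        .enableHiveSupport()
        .getOrCreate();

    // Run the overwrite twice: if the connector appends instead of overwriting,
    // the target row count ends up at a multiple of the source row count.
    spark.sql("INSERT OVERWRITE TABLE target_table SELECT * FROM source_table");
    spark.sql("INSERT OVERWRITE TABLE target_table SELECT * FROM source_table");

    long sourceCount = spark.table("source_table").count();
    long targetCount = spark.table("target_table").count();
    if (targetCount != sourceCount) {
      throw new AssertionError(
          "Expected " + sourceCount + " rows after INSERT OVERWRITE but found " + targetCount);
    }
    spark.stop();
  }
}
```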

yigress commented Dec 18, 2023

Can you run some tests with a Dataproc cluster? I tried this:

```
spark-shell --conf spark.sql.hive.convertMetastoreBigquery=false --conf spark.sql.extensions=com.google.cloud.hive.bigquery.connector.sparksql.HiveBigQuerySparkSQLExtension --jars gs://yigress/test/hive-2-bigquery-connector-2.0.0-SNAPSHOT.jar

scala> spark.sql("select * from region").show
+-----------+-----------+--------------------+
|r_regionkey|     r_name|           r_comment|
+-----------+-----------+--------------------+
|          0|     AFRICA|lar deposits. bli...|
|          1|    AMERICA|hs use ironic, ev...|
|          2|       ASIA|ges. thinly even ...|
|          3|     EUROPE|ly final courts c...|
|          4|MIDDLE EAST|uickly special ac...|
+-----------+-----------+--------------------+

spark.sql("insert into region2 select * from region")
spark.sql("select * from region2")

scala> spark.sql("select * from region2").show
+-----------+------+---------+
|r_regionkey|r_name|r_comment|
+-----------+------+---------+
+-----------+------+---------+
```

There is no result, nor any error message.

It looks like there is a delay in the Spark query result when inserting into an empty BQ table; the results eventually show up when querying again at a later time.

yigress commented Dec 20, 2023

There are a bunch of errors in the integration test:

```
OK
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFNamedStruct@6afcb427. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
2023-12-19 00:35:24 ERROR Unable to evaluate org.apache.hadoop.hive.ql.udf.generic.GenericUDFMap@ad168b5. Return value unrecoginizable.
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = root_20231219003523_43efc817-d4b1-4170-9400-f70966da8691
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2023-12-19 00:35:24,904 Stage-3 map = 0%, reduce = 0%
2023-12-19 00:35:24 ERROR java.lang.NoClassDefFoundError: Could not initialize class io.grpc.netty.Utils
    at io.grpc.netty.UdsNettyChannelProvider.isAvailable(UdsNettyChannelProvider.java:34)
    at io.grpc.ManagedChannelRegistry$ManagedChannelPriorityAccessor.isAvailable(ManagedChannelRegistry.java:211)
    at io.grpc.ManagedChannelRegistry$ManagedChannelPriorityAccessor.isAvailable(ManagedChannelRegistry.java:207)
    at io.grpc.ServiceProviders.loadAll(ServiceProviders.java:68)
    at io.grpc.ManagedChannelRegistry.getDefaultRegistry(ManagedChannelRegistry.java:101)
    at io.grpc.ManagedChannelProvider.provider(ManagedChannelProvider.java:43)
    at io.grpc.ManagedChannelBuil
```


```java
@Override
public void commitJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException {
  SparkSQLUtils.cleanUpSparkJobFile(jobContext.getConfiguration());
```
Collaborator:

Can this Spark job file be moved to be per query? Right now it is per Spark application, so if a previous query fails or the committer is not called, the existing file could cause problems for the next query.

Collaborator Author (jphalip, Dec 22, 2023):

This Spark job file only contains one piece of information: which tables are "INSERT" and which are "INSERT OVERWRITE". So I think it's relevant for the whole Spark application. Does that make sense?
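To make the discussion concrete, here is a hypothetical sketch of such a per-application job file; the location, file format, and names below are illustrative assumptions, not the connector's actual implementation. It simply maps each output table to its write disposition and is rewritten for every query, so a later query in the same Spark application overwrites whatever a previous one left behind:

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;

public final class SparkJobFileSketch {
  private SparkJobFileSketch() {}

  /** Rewrites the per-application job file mapping each table to INSERT or INSERT_OVERWRITE. */
  public static void writeJobFile(String sparkAppId, Map<String, Boolean> overwriteByTable)
      throws IOException {
    Properties props = new Properties();
    overwriteByTable.forEach(
        (table, overwrite) -> props.setProperty(table, overwrite ? "INSERT_OVERWRITE" : "INSERT"));
    // Illustrative location only: keyed by the Spark application ID.
    Path file =
        Paths.get(System.getProperty("java.io.tmpdir"), "spark-job-" + sparkAppId + ".properties");
    // Files.newBufferedWriter truncates any existing file, so the file is always overwritten.
    try (Writer out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
      props.store(out, "table -> write disposition for this Spark application");
    }
  }
}
```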

Collaborator:

This works if the Spark application fails when the insert query fails. But if the user chooses to handle the Spark SQL failure and continue the application (maybe with a try/catch), or runs it in spark-shell, the same Spark application can continue. If another insert query is later run (either into the same table or into a different one), what would happen?

Collaborator Author:

In that case I think the Spark job file would be overwritten when a new query is run.

BTW, if you run multiple queries as part of the same Spark shell session, would the Spark app ID be the same for all queries, or would each query get a unique one?

Collaborator:

They will have the same application ID.
The logic looks like it does a readOrElseCreate of the Spark job JSON file for each query?

Collaborator Author:

You're right, I was using some readOrElseCreate logic because somehow Spark calls the apply() method multiple times. I've just changed it to always write the file. That way it should always be overwritten. What do you think?

Collaborator:

Would that cause locking issues if there are concurrent writes to this file? Is there any difficulty in moving this under the temporary query directory (or is the temporary query directory not available at this point)?

Collaborator Author:

@yigress What do you mean by "temporary query directory"? Are you referring to the directory named after hive.query.id when using plain Hive? If so, as far as I can tell, there's no way to retrieve a unique ID for the query from the query plan object provided to the Spark extension strategy's apply() function. Or is there?

The challenge here is that we'd need some unique way of identifying the query that would be deterministically created or retrieved both from the Spark extension and from the Hive storage handler.

For now, the only thing I'm aware of is the Spark app ID. But as you mentioned, that's not enough to identify a specific query.

Another question for you: If you run multiple queries as part of the same Spark Shell session (i.e. the same Spark App ID), could multiple queries be run in parallel, or could they only be run in sequence? If in sequence, then perhaps the Spark App ID is enough. But if in parallel, then we're in trouble :)

jphalip commented Dec 28, 2023

@yigress The integration tests are now passing.
