
Do I have to restart the Spark streaming job for a new schema to take effect? #309

Open · akshayar opened this issue Oct 3, 2022 · 8 comments

@akshayar

akshayar commented Oct 3, 2022

I was trying the Abris library to consume CDC records generated by Debezium.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

// Download the latest reader schema for the topic from Schema Registry
val abrisConfig: FromAvroConfig = (AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topicName)
  .usingSchemaRegistry(schemaRegistryURL))

// Read the raw Kafka records as a streaming DataFrame
val df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load())

// Deserialize the Avro payload and unwrap the Debezium "after" image
val deserializedAvro = (df
  .select(from_avro(col("value"), abrisConfig)
          .as("data"))
  .select(col("data.after.*")))
deserializedAvro.printSchema()

// Print every micro-batch to the console
val query = (deserializedAvro
  .writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", s"s3://$bucketName/checkpoints/$tableName")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start())

I added a column while the streaming job was running and expected the new column to be printed, but it was not. Does Abris not dynamically refresh the schema from the version information in the payload?
Do I have to restart the Spark streaming job to process/view new columns?

@kevinwallimann
Collaborator

Hi @akshayar
In Spark, it's impossible to change the schema of a DataFrame while a query is running. So, indeed, you have to restart the streaming job to consume the new field.

@akshayar
Author

Thanks @kevinwallimann. Is this a feature you are considering implementing, or is the nature of the problem such that it cannot be solved?

@kevinwallimann
Collaborator

Hi @akshayar. This is a limitation of Spark, so it's out of scope for this project.

@github-raphael-douyere

What could be done is to run the stream with Spark's foreachBatch sink and initialize the AbrisConfig inside it, but that is a really limiting approach.
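
A minimal sketch of that idea, reusing the topicName, schemaRegistryURL, and df values from the original post (untested; note the decoded schema can still differ between batches, which whatever the batch does downstream must tolerate):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val query = df.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Rebuild the config on every micro-batch so the latest reader
    // schema version is downloaded from Schema Registry each time
    val freshConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy(topicName)
      .usingSchemaRegistry(schemaRegistryURL)

    batch
      .select(from_avro(col("value"), freshConfig).as("data"))
      .select(col("data.after.*"))
      .show(truncate = false)
  }
  .start()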

I'm a bit surprised that there is no way to implement a refresh behaviour. What if AvroDataToCatalyst re-read the readerSchema from time to time and re-instantiated an AbrisAvroDeserializer based on a fresh schema? In Spark, UDFs can have side effects, and this problem seems quite similar to me.

@cerveada
Collaborator

cerveada commented Dec 1, 2022

Look at this code:

.select(from_avro(col("value"), abrisConfig)

from_avro is called here to create a Spark Expression. The expression has a dataType method that Spark calls to find out what type the column will have in the resulting DataFrame. Abris returns the type converted from the reader schema.

Even if we later decided to change the reader schema, we would have no way to change the Spark type, because Spark calls us, not the other way around.
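
To make that contract concrete, here is a simplified sketch (hypothetical class name, not the actual ABRiS source) of how an expression's output type is derived from the reader schema exactly once; Spark reads it while analyzing the plan and never asks again for the lifetime of the query:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.DataType

// Hypothetical stand-in for ABRiS's AvroDataToCatalyst expression:
// the Catalyst type is computed once from the Avro reader schema
case class AvroToCatalystSketch(readerSchemaJson: String) {
  val dataType: DataType =
    SchemaConverters
      .toSqlType(new Schema.Parser().parse(readerSchemaJson))
      .dataType
}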

As far as I know, a DataFrame's dataType cannot be changed; you can only create a new DataFrame, so that is another limitation here.

I am not an expert on Spark streaming, so if you think this can be done, please provide a code example of how it should work.

@github-raphael-douyere

Thanks for the answer. I see the issue: Spark infers the DataFrame schema only once and it cannot be changed.

An alternative would be to stop the application automatically when a schema change happens, so that a watchdog could restart it with the new schema.
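
A hedged sketch of that watchdog, assuming Confluent's schema registry client and the TopicNameStrategy subject name topicName + "-value" (the helper names and polling interval are made up; untested):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.sql.streaming.StreamingQuery

// Polls Schema Registry and stops the query when a newer schema version
// appears, so the surrounding restart loop can rebuild the plan
def watchSchema(query: StreamingQuery,
                registryUrl: String,
                subject: String,
                pollMillis: Long = 60000L): Unit = {
  val client = new CachedSchemaRegistryClient(registryUrl, 16)
  val startVersion = client.getLatestSchemaMetadata(subject).getVersion
  val watcher = new Thread(() => {
    while (query.isActive) {
      Thread.sleep(pollMillis)
      if (client.getLatestSchemaMetadata(subject).getVersion != startVersion)
        query.stop() // unblocks awaitTermination() below
    }
  })
  watcher.setDaemon(true)
  watcher.start()
}

// Driver-side restart loop: each iteration re-downloads the reader schema,
// so the rebuilt query picks up new columns. buildQuery() is a made-up
// helper wrapping the AbrisConfig + readStream code from the original post.
while (true) {
  val query = buildQuery()
  watchSchema(query, schemaRegistryURL, s"$topicName-value")
  query.awaitTermination()
}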

@kevinwallimann
Collaborator

Hi @github-raphael-douyere. Spot on. Theoretically, it may be possible to change the logical plan from one micro-batch to the next, but I don't know how this could be implemented or whether it would break something else in Spark.

Your suggestion to restart a query when a schema change has been detected can work. In case of a schema change in the middle of a micro-batch, though, some messages might still be consumed using the old schema.

Maybe these constraints aren't present in continuous mode, but I'm no expert on it.

If you find a solution to your problem, please feel free to share it here.

@talperetz1

Hi, I am facing the same issue and just wanted to know if there was ever a solution to this problem.
Thanks
@akshayar @github-raphael-douyere
