[SPARK-50285] Metrics for commits to StagedTable instances #48830
Conversation
cc @cloud-fan Can you take a look? I saw you originally implemented the StagedTable interface.
FYI @manuzhang, this PR introduces interfaces analogous to the ones you added in Write for staged tables.
@@ -148,6 +150,8 @@ case class ReplaceTableAsSelectExec(

   val properties = CatalogV2Util.convertTableProperties(tableSpec)
+
+  override val metrics: Map[String, SQLMetric] = commitMetrics(catalog)
This is not the atomic version of RTAS; does it have commit metrics?
OK, this probably doesn't matter, as we return Map.empty if the catalog is not staged. Then maybe we can move this to V2CreateTableAsSelectBaseExec.
V2CreateTableAsSelectBaseExec does not have access to the catalog. But I changed the signature of commitMetrics() to only accept a StagingTableCatalog and removed the override here; that looks cleaner.
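For orientation, a sketch of the shape this thread converges on: only the atomic exec, which holds a StagingTableCatalog, declares commit metrics, while the non-atomic exec keeps the default empty map. The trait below is illustrative scaffolding, not the actual class hierarchy:

```scala
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative shape only: an atomic exec holds a StagingTableCatalog and
// derives its `metrics` from it; the non-atomic ReplaceTableAsSelectExec
// simply inherits Map.empty.
trait AtomicWriteExecMetricsSketch {
  def catalog: StagingTableCatalog
  protected def commitMetrics(tableCatalog: StagingTableCatalog): Map[String, SQLMetric]

  // Registered up front so the SQL UI can show them once the commit reports values.
  val metrics: Map[String, SQLMetric] = commitMetrics(catalog)
}
```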
@@ -631,7 +641,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       qe.assertCommandExecuted()

       table match {
-        case st: StagedTable => st.commitStagedChanges()
+        case st: StagedTable =>
+          st.commitStagedChanges()
Let's add a def commitStagedTable in DataSourceV2Utils which does the commit work and also reports driver metrics. Then we can reuse this method in both AtomicReplaceTableExec and here.
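Presumably something along these lines: a sketch that assumes StagedTable gains a reportDriverMetrics() hook analogous to Write.reportDriverMetrics(). The helper name comes from the comment above; the body is illustrative, not the merged implementation:

```scala
import org.apache.spark.sql.connector.catalog.StagedTable
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.execution.metric.SQLMetric

object DataSourceV2UtilsSketch {
  // Assumed hook, mirroring Write.reportDriverMetrics(); stands in for
  // whatever method SPARK-50285 actually adds to StagedTable.
  trait ReportsDriverMetrics { self: StagedTable =>
    def reportDriverMetrics(): Array[CustomTaskMetric] = Array.empty
  }

  // Commits the staged table, then copies any driver-side metric values the
  // table reports into the exec node's already-registered SQLMetrics.
  def commitStagedTable(
      stagedTable: StagedTable with ReportsDriverMetrics,
      metrics: Map[String, SQLMetric]): Unit = {
    stagedTable.commitStagedChanges()
    stagedTable.reportDriverMetrics().foreach { driverMetric =>
      metrics.get(driverMetric.name()).foreach(_.set(driverMetric.value()))
    }
  }
}
```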
@@ -612,6 +616,12 @@ case class DeltaWithMetadataWritingSparkTask(
 private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
   override def output: Seq[Attribute] = Nil

+  protected def commitMetrics(tableCatalog: StagingTableCatalog): Map[String, SQLMetric] = {
Maybe move this to DataSourceV2Utils as well, so that we can reuse it in ReplaceTableExec?
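A possible shape for that shared helper, assuming StagingTableCatalog exposes a supportedCustomMetrics() hook by analogy with Write.supportedCustomMetrics() (an assumption, not something confirmed on this page):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object CommitMetricsSketch {
  // Assumed hook, mirroring Write.supportedCustomMetrics(); stands in for
  // whatever method SPARK-50285 actually adds for staging catalogs.
  trait DeclaresCommitMetrics { self: StagingTableCatalog =>
    def supportedCustomMetrics(): Array[CustomMetric] = Array.empty
  }

  // Builds one SQLMetric per custom metric the catalog declares, so the driver
  // can register them before execution and fill them in after the commit.
  def commitMetrics(
      sparkContext: SparkContext,
      tableCatalog: StagingTableCatalog with DeclaresCommitMetrics): Map[String, SQLMetric] = {
    tableCatalog.supportedCustomMetrics()
      .map(m => m.name() -> SQLMetrics.createV2CustomMetric(sparkContext, m))
      .toMap
  }
}
```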
Thanks, merging to master
What changes were proposed in this pull request?
Commands that commit through the StagedTable interface currently report no metrics, because the interface does not support retrieving metrics after a commit. This PR adds interfaces analogous to the metrics hooks on Write, so that commits to StagedTable instances can report driver metrics, and wires them into the affected v2 commands.
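A rough sketch of the hooks' shape, modeled on Write.supportedCustomMetrics() and Write.reportDriverMetrics(); the exact names, and whether they live on StagedTable or StagingTableCatalog, are assumptions based on the review discussion above:

```scala
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

// Assumed catalog-side hook: declares which commit metrics exist, so the
// driver can register SQLMetrics for them before the query runs.
trait StagingCatalogMetricsSketch {
  def supportedCustomMetrics(): Array[CustomMetric] = Array.empty
}

// Assumed table-side hook: reports observed values once commitStagedChanges()
// has completed on the driver.
trait StagedTableMetricsSketch {
  def reportDriverMetrics(): Array[CustomTaskMetric] = Array.empty
}
```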
Why are the changes needed?
Many create table commands currently return no metrics at all.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests with a test catalog for the affected commands
Was this patch authored or co-authored using generative AI tooling?
No