[FEATURE] Add dedup PPL Command #421

Closed
YANG-DB opened this issue Jul 11, 2024 · 2 comments
Labels
enhancement (New feature or request) · Lang:PPL (Pipe Processing Language support)

Comments

@YANG-DB
Member

YANG-DB commented Jul 11, 2024

Is your feature request related to a problem?
Add a dedup command for the PPL Spark-based execution driver.

PPL Dedup

Do you have any additional context?

YANG-DB added the enhancement (New feature or request), untriaged, and Lang:PPL (Pipe Processing Language support) labels on Jul 11, 2024
anasalkouz moved this to Todo in PPL Commands on Jul 25, 2024
@LantaoJin
Member

I'm starting to work on this.

@LantaoJin
Member

LantaoJin commented Aug 6, 2024

Design Doc

Syntax

dedup [int] <field-list> [keepempty=<bool>] [consecutive=<bool>]

int, also known as allowedDuplication, is the number of allowed duplicate events; it must be greater than 0.

We can split this feature into two parts:

  • allowedDuplication = 1
  • allowedDuplication > 1

Part 1

For allowedDuplication = 1, we translate the following three PPL dedup commands to different Spark logical plans (assuming the <field-list> is a, b; a DataFrame-level sketch follows the three cases):

  1. keepempty=false: | dedup [1] a, b [keepempty=false]
Deduplicate ['a, 'b]
+- Filter (isnotnull('a) AND isnotnull('b))
     +- Project
          +- UnresolvedRelation
  2. keepempty=true: | dedup [1] a, b keepempty=true
Union
:- Deduplicate ['a, 'b]
:   +- Filter (isnotnull('a) AND isnotnull('b))
:        +- Project
:             +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
     +- Project
          +- UnresolvedRelation
  3. consecutive=true: | dedup [1] a, b [keepempty=true] consecutive=true
UnsupportedOperationException("Consecutive deduplication is not supported")
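
For reference, a minimal DataFrame-level sketch in Scala that produces equivalent plans (df, a, and b stand in for the child plan and the <field-list>; this is an illustration, not the actual translator code):

import org.apache.spark.sql.DataFrame

def dedupOnce(df: DataFrame, keepEmpty: Boolean): DataFrame = {
  import df.sparkSession.implicits._
  // Deduplicate only the rows whose dedup keys are all non-null
  val deduped = df.filter($"a".isNotNull && $"b".isNotNull)
    .dropDuplicates("a", "b") // plans Deduplicate ['a, 'b]
  if (keepEmpty) deduped.union(df.filter($"a".isNull || $"b".isNull)) // keepempty=true: re-attach rows with null keys
  else deduped
}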

Question: Deduplicate or DeduplicateWithinWatermark?

https://issues.apache.org/jira/browse/SPARK-42931 introduced a new logical plan, DeduplicateWithinWatermark, which deduplicates events within the watermark and is available in Spark 3.5.0+.
I am not clear on the architecture of Flint, but it seems to run on Spark Streaming. Deduplication on a streaming dataset has different semantics than on a batch dataset:

We document the behavior clearly that the event time column should be a part of the subset columns for deduplication to clean up the state, but it cannot be applied to the customers as timestamps are not exactly the same for duplicated events in their use cases.

I need more guidance on the purpose of this repository and how it should be used in production going forward. For the time being, I chose Deduplicate in this PR (DeduplicateWithinWatermark requires 3.5.0+).
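
For context, here is a minimal sketch of the 3.5.0+ streaming alternative; the source, column choice, and watermark delay are illustrative assumptions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val events = spark.readStream.format("rate").load() // columns: timestamp, value
val deduped = events
  .withWatermark("timestamp", "10 minutes")   // lets Spark drop dedup state older than the watermark
  .dropDuplicatesWithinWatermark("value")     // plans a DeduplicateWithinWatermark node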

Part 2

To translate a dedup command with allowedDuplication > 1, such as | dedup 2 a, b, to a Spark plan, the solution is to translate it to a plan with a Window function (e.g. row_number) and a Filter on a new column row_number_col, as sketched after the plans below.

  • For | dedup 2 a, b keepempty=false
Project ['a, 'b] // ensures row_number_col is removed; should contain all fields except row_number_col, not only a and b
+- Filter ('row_number_col <= 2) // allowed duplication = 2
   +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
       +- Filter (isnotnull('a) AND isnotnull('b)) // keepempty=false
          +- Project
             +- UnresolvedRelation
  • For | dedup 2 a, b keepempty=true
Union
:- Project ['a, 'b]
:  +- Filter ('row_number_col <= 2)
:     +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
:        +- Filter (isnotnull('a) AND isnotnull('b))
:           +- Project
:              +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
   +- Project
      +- UnresolvedRelation
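
A DataFrame-level sketch of this translation (illustrative only; order_key is an assumed ordering column, which is exactly the open question below):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

def dedupN(df: DataFrame, allowed: Int, keepEmpty: Boolean): DataFrame = {
  val keyNonNull = col("a").isNotNull && col("b").isNotNull
  val w = Window.partitionBy("a", "b").orderBy(col("order_key").asc_nulls_first)
  val kept = df.filter(keyNonNull)
    .withColumn("row_number_col", row_number().over(w))
    .filter(col("row_number_col") <= allowed) // e.g. allowed = 2
    .drop("row_number_col")                   // project away the helper column
  if (keepEmpty) kept.union(df.filter(!keyNonNull)) else kept
}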

Question: What is the 'order_key in the Window node?

Option 1

AFAIK, there should always be a time-series field for log analysis, such as _time or @timestamp. How do we get it in the AST?

Option 2

If we cannot point to an existing time-series field as the order_key, an option is to change the syntax of the dedup command as follows (making sure the sql repo changes first):

| dedup 2 a, b order by c
| dedup 2 a, b orderkey=c

Option 3

Partition and order by the same columns:

Window.partitionBy(dedupColumns).orderBy(dedupColumns)

In that case, the results can be non-deterministic for large datasets. This non-determinism arises because the sort has no secondary tie-breaking rule to ensure a consistent order of rows within each partition: if multiple rows share the same values for the partition and order columns, their relative order can vary across runs.
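
As code, Option 3 amounts to the following (df and the column names are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.partitionBy("a", "b").orderBy("a", "b")
val ranked = df.withColumn("row_number_col", row_number().over(w))
// Rows with identical (a, b) values receive arbitrary ranks, so which rows
// survive `| dedup 2 a, b` can differ from run to run.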

A complex case for Option 3 is when there is a sorter before the dedup command:

| sort c | dedup 2 a,b

FYI: In Spark, a window function requires the window to be ordered; the query fails if the ORDER BY clause is not specified, for example SELECT <wf>(expr) OVER ([PARTITION BY window_partition] ORDER BY window_ordering) FROM table.
But in MySQL or opensearch-sql, the ORDER BY clause can be omitted.

Sub-Tasks
