Merge branch 'main' into ppl-fieldsummary-command
YANG-DB committed Oct 15, 2024
2 parents 21924ae + 0b106f1 commit a9e7c6e
Showing 46 changed files with 1,983 additions and 394 deletions.
4 changes: 2 additions & 2 deletions build.sbt
@@ -88,7 +88,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "auth-crt" % "2.28.10" % "provided",
"software.amazon.awssdk" % "auth-crt" % "2.28.10",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -117,7 +117,7 @@ lazy val flintCommons = (project in file("flint-commons"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"org.projectlombok" % "lombok" % "1.18.30" % "provided",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
66 changes: 20 additions & 46 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -2,6 +2,11 @@

#### **Describe**
- `describe table` This command is equivalent to the `DESCRIBE EXTENDED table` SQL command
- `describe schema.table`
- `` describe schema.`table` ``
- `describe catalog.schema.table`
- `` describe catalog.schema.`table` ``
- `` describe `catalog`.`schema`.`table` ``

#### **Explain**
- `explain simple | source = table | where a = 1 | fields a,b,c`
@@ -268,7 +273,7 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_

**SQL Migration examples with IN-Subquery PPL:**

1. tpch q4 (in-subquery with aggregation)
tpch q4 (in-subquery with aggregation)
```sql
select
o_orderpriority,
@@ -304,52 +309,21 @@ source = orders
| fields o_orderpriority, order_count
```

2.tpch q20 (nested in-subquery)
```sql
select
s_name,
s_address
from
supplier,
nation
where
s_suppkey in (
select
ps_suppkey
from
partsupp
where
ps_partkey in (
select
p_partkey
from
part
where
p_name like 'forest%'
)
)
and s_nationkey = n_nationkey
and n_name = 'CANADA'
order by
s_name
```
#### **ExistsSubquery**
[See additional command details](ppl-subquery-command.md)

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2
- `source = outer | where exists [ source = inner | where a = c ]`
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
- `source = outer | where not exists [ source = inner | where c > 10 ]` (uncorrelated exists)
- `source = outer | where exists [ source = inner ] | eval l = "Bala" | fields l` (special uncorrelated exists)


Rewritten by PPL InSubquery query:
```sql
source = supplier
| where s_suppkey IN [
source = partsupp
| where ps_partkey IN [
source = part
| where like(p_name, "forest%")
| fields p_partkey
]
| fields ps_suppkey
]
| inner join left=l right=r on s_nationkey = n_nationkey and n_name = 'CANADA'
nation
| sort s_name
```
#### **ScalarSubquery**
[See additional command details](ppl-subquery-command.md)

90 changes: 84 additions & 6 deletions docs/ppl-lang/ppl-subquery-command.md
@@ -112,6 +112,58 @@ source = supplier
| sort s_name
```

**ExistsSubquery usage**

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

- `source = outer | where exists [ source = inner | where a = c ]`
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
- `source = outer | where not exists [ source = inner | where c > 10 ]` (uncorrelated exists)
- `source = outer | where exists [ source = inner ] | eval l = "nonEmpty" | fields l` (special uncorrelated exists)

**_SQL Migration examples with Exists-Subquery PPL:_**

tpch q4 (exists subquery with aggregation)
```sql
select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and exists (
select
l_orderkey
from
lineitem
where l_orderkey = o_orderkey
and l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority
```
Rewritten as a PPL ExistsSubquery query:
```sql
source = orders
| where o_orderdate >= "1993-07-01" and o_orderdate < "1993-10-01"
and exists [
source = lineitem
| where l_orderkey = o_orderkey and l_commitdate < l_receiptdate
]
| stats count(1) as order_count by o_orderpriority
| sort o_orderpriority
| fields o_orderpriority, order_count
```
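
To make the semantics of the correlated `exists` filter concrete, here is a minimal Java sketch that keeps an order when at least one line item matches it. The classes `ExistsSketch`, `Order` and `LineItem`, and the integer day fields, are hypothetical in-memory stand-ins for illustration only; this is not how the PPL engine or Spark evaluates the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-ins for the orders and lineitem tables.
final class ExistsSketch {
    static final class Order {
        final int orderKey;
        final String priority;

        Order(int orderKey, String priority) {
            this.orderKey = orderKey;
            this.priority = priority;
        }
    }

    static final class LineItem {
        final int orderKey;
        final int commitDay;   // assumption: dates simplified to day numbers
        final int receiptDay;

        LineItem(int orderKey, int commitDay, int receiptDay) {
            this.orderKey = orderKey;
            this.commitDay = commitDay;
            this.receiptDay = receiptDay;
        }
    }

    // Mirrors: where exists [ source = lineitem
    //   | where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ]
    static List<Order> whereExists(List<Order> orders, List<LineItem> items) {
        List<Order> kept = new ArrayList<>();
        for (Order o : orders) {
            for (LineItem l : items) {
                if (l.orderKey == o.orderKey && l.commitDay < l.receiptDay) {
                    kept.add(o);
                    break; // one match is enough; the order is never duplicated
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(new Order(1, "1-URGENT"), new Order(2, "2-HIGH"));
        List<LineItem> items = Arrays.asList(
            new LineItem(1, 5, 9), new LineItem(1, 3, 4), new LineItem(2, 9, 5));
        for (Order o : whereExists(orders, items)) {
            System.out.println(o.orderKey + " " + o.priority);
        }
    }
}
```

In this sample data order 1 has two matching line items but is kept only once, and order 2 is dropped because its only line item was received before its commit day. That is the difference between an `exists` filter and a join on the same condition.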

**ScalarSubquery usage**

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested
@@ -191,14 +243,14 @@ source = spark_catalog.default.outer

### **Additional Context**

The most cases in the description is to request a `InSubquery` expression.
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. Subquery expressions are most commonly used in the `where` clause:

The `where` command syntax is:

```
| where <boolean expression>
```
So the subquery in description is part of boolean expression, such as
So the subquery is part of the boolean expression, such as

```sql
| where orders.order_id in (subquery source=returns | where return_reason="damaged" | return order_id)
@@ -217,10 +269,11 @@ In issue description is a `ScalarSubquery`:
```sql
source=employees
| join source=sales on employees.employee_id = sales.employee_id
| where sales.sale_amount > (subquery source=targets | where target_met="true" | return target_value)
| where sales.sale_amount > [ source=targets | where target_met="true" | fields target_value ]
```

Recall the join command doc: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/PPL-Join-command.md#more-examples, the example is a subquery/subsearch **plan**, rather than a **expression**.
But `RelationSubquery` is not a subquery expression; it is a subquery plan.
Recall the [join command doc](ppl-join-command.md): the example there is a subquery/subsearch **plan** rather than an **expression**.

```sql
SEARCH source=customer
@@ -245,7 +298,32 @@ SEARCH <leftPlan>
Applying the syntax here simplifies it to

```sql
search <leftPlan> | left join on <condition> (subquery search ...)
search <leftPlan> | left join on <condition> [ search ... ]
```

The `(subquery search ...)` is not a `expression`, it's `plan`, similar to the `relation` plan
The `[ search ... ]` is not an `expression`; it is a `plan`, similar to the `relation` plan.

**Uncorrelated Subquery**

An uncorrelated subquery is independent of the outer query. It is executed once, and the result is used by the outer query.
It's **less common** when using `ExistsSubquery` because `ExistsSubquery` typically checks for the presence of rows that are dependent on the outer query’s row.

There is one special exists subquery, marked `(special uncorrelated exists)` in the usage list:
```sql
SELECT 'nonEmpty'
FROM outer
WHERE EXISTS (
SELECT *
FROM inner
);
```
Rewritten as a PPL ExistsSubquery query:
```sql
source = outer
| where exists [
source = inner
]
| eval l = "nonEmpty"
| fields l
```
This query prints "nonEmpty" (once for each row of `outer`) if the inner table is not empty, and returns no rows otherwise.
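
The "executed once" property of an uncorrelated exists can be sketched in Java as follows. The class `UncorrelatedExistsSketch` and its row-count parameters are hypothetical simplifications; the sketch only illustrates the semantics and makes no claim about the actual execution plan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: tables are reduced to their row counts.
final class UncorrelatedExistsSketch {
    // Mirrors: source = outer | where exists [ source = inner ]
    //          | eval l = "nonEmpty" | fields l
    static List<String> specialExists(int outerRowCount, int innerRowCount) {
        // The subquery does not reference the outer row, so its result
        // can be computed once and reused for every outer row.
        boolean innerNonEmpty = innerRowCount > 0;

        List<String> result = new ArrayList<>();
        for (int i = 0; i < outerRowCount; i++) {
            if (innerNonEmpty) {
                result.add("nonEmpty");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(specialExists(3, 10));
        System.out.println(specialExists(3, 0));
    }
}
```

With three outer rows, a non-empty inner table yields three "nonEmpty" values and an empty inner table yields none.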
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

@@ -33,10 +34,20 @@ private MetricsUtil() {
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* This name is used to retrieve or create the counter.
*/
public static void incrementCounter(String metricName) {
Counter counter = getOrCreateCounter(metricName);
incrementCounter(metricName, false);
}

/**
* Increments the Counter metric associated with the given metric name.
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void incrementCounter(String metricName, boolean isIndexMetric) {
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
if (counter != null) {
counter.inc();
}
@@ -48,29 +59,48 @@ public static void incrementCounter(String metricName) {
* @param metricName The name of the metric counter to be decremented.
*/
public static void decrementCounter(String metricName) {
Counter counter = getOrCreateCounter(metricName);
decrementCounter(metricName, false);
}

/**
* Decrements the value of the specified metric counter by one, if the counter exists and its current count is greater than zero.
*
* @param metricName The name of the metric counter to be decremented.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void decrementCounter(String metricName, boolean isIndexMetric) {
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
if (counter != null && counter.getCount() > 0) {
counter.dec();
}
}

/**
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
* This context can be used to measure the duration of a particular operation or event.
*
* @param metricName The name of the metric timer to retrieve the context for.
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
*/
public static Timer.Context getTimerContext(String metricName) {
Timer timer = getOrCreateTimer(metricName);
return getTimerContext(metricName, false);
}

/**
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
*
* @param metricName The name of the metric timer to retrieve the context for.
* @param isIndexMetric Whether this metric is an index-specific metric.
* @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
*/
public static Timer.Context getTimerContext(String metricName, boolean isIndexMetric) {
Timer timer = getOrCreateTimer(metricName, isIndexMetric);
return timer != null ? timer.time() : null;
}

/**
* Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started
* and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}.
* Stops the timer associated with the given {@link Timer.Context}.
*
* @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}.
* @param context The {@link Timer.Context} to stop. May be {@code null}.
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
*/
public static Long stopTimer(Timer.Context context) {
@@ -79,53 +109,61 @@ public static Long stopTimer(Timer.Context context) {

/**
* Registers a gauge metric with the provided name and value.
* The gauge will reflect the current value of the AtomicInteger provided.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
*/
public static void registerGauge(String metricName, final AtomicInteger value) {
MetricRegistry metricRegistry = getMetricRegistry();
registerGauge(metricName, value, false);
}

/**
* Registers a gauge metric with the provided name and value.
*
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void registerGauge(String metricName, final AtomicInteger value, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
if (metricRegistry == null) {
LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
return;
}
metricRegistry.register(metricName, (Gauge<Integer>) value::get);
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Timer getOrCreateTimer(String metricName, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
return metricRegistry != null ? metricRegistry.timer(metricName) : null;
}

// Retrieves the MetricRegistry from the current Spark environment.
private static MetricRegistry getMetricRegistry() {
private static MetricRegistry getMetricRegistry(boolean isIndexMetric) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
return flintMetricSource.metricRegistry();
Source metricSource = isIndexMetric ?
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(), FlintIndexMetricSource::new) :
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_METRIC_SOURCE_NAME(), FlintMetricSource::new);
return metricSource.metricRegistry();
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
private static Source getOrInitMetricSource(SparkEnv sparkEnv, String sourceName, java.util.function.Supplier<Source> sourceSupplier) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(sourceName);

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
Source metricSource = sourceSupplier.get();
sparkEnv.metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
return metricSourceSeq.head();
}
}
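
The refactoring in this hunk combines two patterns: each single-argument method delegates to a new overload with `isIndexMetric = false`, and the get-or-init helper takes the source name plus a `Supplier` so one method can serve both metric sources. The following is a minimal, dependency-free sketch of that shape. `MetricsSketch`, its nested `Source` class, the names `"Flint"` and `"FlintIndex"`, and the `count` helper are all hypothetical stand-ins; the real code uses Spark's `MetricsSystem` and the Dropwizard `MetricRegistry`, which are deliberately not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for the utility class; not the Flint implementation.
final class MetricsSketch {
    // Stand-in for a metric source: a name plus its own set of counters.
    static final class Source {
        final String name;
        final Map<String, AtomicLong> counters = new HashMap<>();

        Source(String name) {
            this.name = name;
        }
    }

    static final String GENERAL = "Flint";     // assumed name, for illustration
    static final String INDEX = "FlintIndex";  // assumed name, for illustration

    // Stand-in for the sources registered with the metrics system.
    private static final Map<String, Source> SOURCES = new HashMap<>();

    // Returns the registered source, or creates and registers it on first use.
    static synchronized Source getOrInitSource(String name, Supplier<Source> supplier) {
        Source existing = SOURCES.get(name);
        if (existing != null) {
            return existing;
        }
        Source created = supplier.get();
        SOURCES.put(name, created);
        return created;
    }

    private static Source sourceFor(boolean isIndexMetric) {
        return isIndexMetric
            ? getOrInitSource(INDEX, () -> new Source(INDEX))
            : getOrInitSource(GENERAL, () -> new Source(GENERAL));
    }

    // The original signature keeps working by delegating with a default.
    static void incrementCounter(String metricName) {
        incrementCounter(metricName, false);
    }

    static synchronized void incrementCounter(String metricName, boolean isIndexMetric) {
        sourceFor(isIndexMetric).counters
            .computeIfAbsent(metricName, k -> new AtomicLong())
            .incrementAndGet();
    }

    static void decrementCounter(String metricName) {
        decrementCounter(metricName, false);
    }

    // Decrements only while the count is positive, as in the diff above.
    static synchronized void decrementCounter(String metricName, boolean isIndexMetric) {
        AtomicLong counter = sourceFor(isIndexMetric).counters.get(metricName);
        if (counter != null && counter.get() > 0) {
            counter.decrementAndGet();
        }
    }

    static synchronized long count(String metricName, boolean isIndexMetric) {
        AtomicLong counter = sourceFor(isIndexMetric).counters.get(metricName);
        return counter == null ? 0L : counter.get();
    }

    public static void main(String[] args) {
        incrementCounter("query.count");        // goes to the general source
        incrementCounter("query.count", true);  // goes to the index source
        System.out.println(count("query.count", false) + " " + count("query.count", true));
    }
}
```

Because the two sources keep separate counters, the same metric name can be tracked independently for general and index-specific metrics, and existing callers of the one-argument methods are unaffected.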