Merge remote-tracking branch 'upstream/main'
Swiddis committed Oct 30, 2024
2 parents b02e574 + a07f88f commit 3595c7d
Showing 71 changed files with 2,146 additions and 254 deletions.
1 change: 1 addition & 0 deletions .github/pull_request_template.md
@@ -8,6 +8,7 @@ _List any issues this PR will resolve, e.g. Resolves [...]._
- [ ] Updated documentation (docs/ppl-lang/README.md)
- [ ] Implemented unit tests
- [ ] Implemented tests for combination with other commands
- [ ] Newly added source code includes a copyright header
- [ ] Commits are signed per the DCO using `--signoff`

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 change: 1 addition & 0 deletions docs/index.md
@@ -549,6 +549,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
- `spark.flint.metadataCacheWrite.enabled`: Whether to write metadata to the index mapping's `_meta` field as a read cache for frontend users. Default value is false. Do not use in production; this setting will be removed in a later version.
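
As a rough illustration of how the defaults listed above behave, the following sketch resolves the four settings from a plain key/value map. The class `FlintMonitorSettings` is hypothetical and is not part of Flint or Spark; only the setting names and default values come from the list above.

```java
import java.util.Map;

/** Hypothetical helper (not a Flint API): resolves the documented settings with their defaults. */
final class FlintMonitorSettings {
    final int initialDelaySeconds;
    final int intervalSeconds;
    final int maxErrorCount;
    final boolean metadataCacheWriteEnabled;

    FlintMonitorSettings(Map<String, String> conf) {
        // Defaults mirror the documented values: 15, 60, 5 and false.
        this.initialDelaySeconds = intSetting(conf, "spark.flint.monitor.initialDelaySeconds", 15);
        this.intervalSeconds = intSetting(conf, "spark.flint.monitor.intervalSeconds", 60);
        this.maxErrorCount = intSetting(conf, "spark.flint.monitor.maxErrorCount", 5);
        this.metadataCacheWriteEnabled =
            Boolean.parseBoolean(conf.getOrDefault("spark.flint.metadataCacheWrite.enabled", "false"));
    }

    private static int intSetting(Map<String, String> conf, String key, int defaultValue) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }
}
```

With an empty map every field takes its documented default; the metadata cache write stays off unless the key is explicitly set to `true`.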

#### Data Type Mapping

6 changes: 4 additions & 2 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -55,6 +55,9 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where isempty(a)`
- `source = table | where isblank(a)`
- `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'`
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10'

```sql
source = table | eval status_category =
@@ -431,12 +434,11 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
### Planned Commands:

#### **fillnull**

[See additional command details](ppl-fillnull-command.md)
```sql
- `source=accounts | fillnull fields status_code=101`
- `source=accounts | fillnull fields request_path='/not_found', timestamp='*'`
- `source=accounts | fillnull using field1=101`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'`
```
[See additional command details](planning/ppl-fillnull-command.md)
2 changes: 1 addition & 1 deletion docs/ppl-lang/functions/ppl-expressions.md
@@ -127,7 +127,7 @@ OR operator :
NOT operator :
os> source=accounts | where not age in (32, 33) | fields age ;
os> source=accounts | where age not in (32, 33) | fields age ;
fetched rows / total rows = 2/2
+-------+
| age |
17 changes: 17 additions & 0 deletions docs/ppl-lang/planning/ppl-between.md
@@ -0,0 +1,17 @@
## between syntax proposal

1. **Proposed syntax**
- `... | where expr1 [NOT] BETWEEN expr2 AND expr3`
- Evaluates whether expr1 is [not] between expr2 and expr3, with both bounds inclusive
- `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10'

### New syntax definition in ANTLR

```ANTLR
logicalExpression
...
| expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between
```
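
The inclusive semantics described in the proposal can be sketched in plain Java. This is a minimal illustration, not the PPL implementation: the class `BetweenSketch` and its methods are hypothetical, and SQL-style `null` handling is not modelled.

```java
/** Hypothetical helper illustrating inclusive BETWEEN semantics; not part of the PPL code base. */
final class BetweenSketch {
    private BetweenSketch() {}

    /** expr1 BETWEEN lower AND upper: true when lower <= value <= upper. */
    static <T extends Comparable<T>> boolean between(T value, T lower, T upper) {
        return value.compareTo(lower) >= 0 && value.compareTo(upper) <= 0;
    }

    /** expr1 NOT BETWEEN lower AND upper: the logical negation, i.e. value < lower or value > upper. */
    static <T extends Comparable<T>> boolean notBetween(T value, T lower, T upper) {
        return !between(value, lower, upper);
    }
}
```

For example, `BetweenSketch.between(4, 1, 4)` is true because the upper bound is included, and `BetweenSketch.notBetween("2024-09-10", "2024-09-10", "2025-09-10")` is false for the same reason. ISO-8601 date strings compare correctly here only because their lexicographic order matches chronological order.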
2 changes: 2 additions & 0 deletions docs/ppl-lang/ppl-eval-command.md
@@ -80,6 +80,8 @@ Assumptions: `a`, `b`, `c` are existing fields in `table`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one', a = 2, 'two', a = 3, 'three', a = 4, 'four', a = 5, 'five', a = 6, 'six', a = 7, 'se7en', a = 8, 'eight', a = 9, 'nine')`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else 'unknown')`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else concat(a, ' is an incorrect binary digit'))`
- `source = table | eval f = a in ('foo', 'bar') | fields f`
- `source = table | eval f = a not in ('foo', 'bar') | fields f`
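
The two `in` / `not in` examples above assign a boolean to `f` for each row. A minimal Java sketch of that per-row evaluation follows, assuming `a` is a string column without `null` values; `EvalInSketch` is a hypothetical name and not part of the PPL code base.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical illustration of `eval f = a [not] in (...)`; null handling is not modelled. */
final class EvalInSketch {
    private EvalInSketch() {}

    /** Returns one boolean per input value: membership in candidates, inverted when negate is true. */
    static List<Boolean> evalIn(List<String> column, Set<String> candidates, boolean negate) {
        return column.stream()
            .map(a -> candidates.contains(a) != negate)
            .collect(Collectors.toList());
    }
}
```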

Eval with `case` example:
```sql
92 changes: 92 additions & 0 deletions docs/ppl-lang/ppl-fillnull-command.md
@@ -0,0 +1,92 @@
## PPL `fillnull` command

### Description
Use the `fillnull` command to replace `null` values with a provided value in one or more fields of the search result.


### Syntax
`fillnull [with <null-replacement> in <nullable-field>["," <nullable-field>]] | [using <source-field> = <null-replacement> [","<source-field> = <null-replacement>]]`

* null-replacement: mandatory. The value used to replace `null`s.
* nullable-field: mandatory. Field reference. The `null` values in the referenced field are replaced with the null-replacement value.
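
The two forms of the syntax can be sketched over in-memory rows. This is only an illustration under stated assumptions: `FillNullSketch` is a hypothetical class, rows are modelled as maps from field name to value, replacements are literal values only, and a field missing from a row is treated like `null`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of fillnull semantics; not the PPL implementation. */
final class FillNullSketch {
    private FillNullSketch() {}

    /** `fillnull with <value> in f1, f2`: one replacement value for every listed field. */
    static List<Map<String, Object>> fillWith(
            List<Map<String, Object>> rows, Object value, List<String> fields) {
        Map<String, Object> perField = new HashMap<>();
        for (String field : fields) {
            perField.put(field, value);
        }
        return fillUsing(rows, perField);
    }

    /** `fillnull using f1 = v1, f2 = v2`: a separate replacement value per field. */
    static List<Map<String, Object>> fillUsing(
            List<Map<String, Object>> rows, Map<String, Object> replacements) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            // Copy the row so the input is left untouched; HashMap permits null values.
            Map<String, Object> copy = new HashMap<>(row);
            for (Map.Entry<String, Object> entry : replacements.entrySet()) {
                if (copy.get(entry.getKey()) == null) {
                    copy.put(entry.getKey(), entry.getValue());
                }
            }
            result.add(copy);
        }
        return result;
    }
}
```

Non-null values pass through unchanged in both forms, which matches the paired input and output columns in the examples that follow.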


### Example 1: fillnull one field

This example shows `fillnull` applied to one field.

PPL query:

os> source=logs | fields status_code | eval input=status_code | fillnull with 0 in status_code;
| input | status_code |
|-------|-------------|
| 403 | 403 |
| 403 | 403 |
| NULL | 0 |
| NULL | 0 |
| 200 | 200 |
| 404 | 404 |
| 500 | 500 |
| NULL | 0 |
| 500 | 500 |
| 404 | 404 |
| 200 | 200 |
| 500 | 500 |
| NULL | 0 |
| NULL | 0 |
| 404 | 404 |


### Example 2: fillnull applied to multiple fields

This example shows `fillnull` applied to multiple fields.

PPL query:

os> source=logs | fields request_path, timestamp | eval input_request_path=request_path, input_timestamp = timestamp | fillnull with '???' in request_path, timestamp;
| input_request_path | input_timestamp | request_path | timestamp |
|--------------------|-----------------------|--------------|------------------------|
| /contact | NULL | /contact | ??? |
| /home | NULL | /home | ??? |
| /about | 2023-10-01 10:30:00 | /about | 2023-10-01 10:30:00 |
| /home | 2023-10-01 10:15:00 | /home | 2023-10-01 10:15:00 |
| NULL | 2023-10-01 10:20:00 | ??? | 2023-10-01 10:20:00 |
| NULL | 2023-10-01 11:05:00 | ??? | 2023-10-01 11:05:00 |
| /about | NULL | /about | ??? |
| /home | 2023-10-01 10:00:00 | /home | 2023-10-01 10:00:00 |
| /contact | NULL | /contact | ??? |
| NULL | 2023-10-01 10:05:00 | ??? | 2023-10-01 10:05:00 |
| NULL | 2023-10-01 10:50:00 | ??? | 2023-10-01 10:50:00 |
| /services | NULL | /services | ??? |
| /home | 2023-10-01 10:45:00 | /home | 2023-10-01 10:45:00 |
| /services | 2023-10-01 11:00:00 | /services | 2023-10-01 11:00:00 |
| NULL | 2023-10-01 10:35:00 | ??? | 2023-10-01 10:35:00 |

### Example 3: fillnull applied to multiple fields with various `null` replacement values

This example shows `fillnull` with a different replacement value for each field:
- `/error` in `request_path` field
- `1970-01-01 00:00:00` in `timestamp` field

PPL query:

os> source=logs | fields request_path, timestamp | eval input_request_path=request_path, input_timestamp = timestamp | fillnull using request_path = '/error', timestamp='1970-01-01 00:00:00';


| input_request_path | input_timestamp | request_path | timestamp |
|--------------------|-----------------------|--------------|------------------------|
| /contact | NULL | /contact | 1970-01-01 00:00:00 |
| /home | NULL | /home | 1970-01-01 00:00:00 |
| /about | 2023-10-01 10:30:00 | /about | 2023-10-01 10:30:00 |
| /home | 2023-10-01 10:15:00 | /home | 2023-10-01 10:15:00 |
| NULL | 2023-10-01 10:20:00 | /error | 2023-10-01 10:20:00 |
| NULL | 2023-10-01 11:05:00 | /error | 2023-10-01 11:05:00 |
| /about | NULL | /about | 1970-01-01 00:00:00 |
| /home | 2023-10-01 10:00:00 | /home | 2023-10-01 10:00:00 |
| /contact | NULL | /contact | 1970-01-01 00:00:00 |
| NULL | 2023-10-01 10:05:00 | /error | 2023-10-01 10:05:00 |
| NULL | 2023-10-01 10:50:00 | /error | 2023-10-01 10:50:00 |
| /services | NULL | /services | 1970-01-01 00:00:00 |
| /home | 2023-10-01 10:45:00 | /home | 2023-10-01 10:45:00 |
| /services | 2023-10-01 11:00:00 | /services | 2023-10-01 11:00:00 |
| NULL | 2023-10-01 10:35:00 | /error | 2023-10-01 10:35:00 |
2 changes: 2 additions & 0 deletions docs/ppl-lang/ppl-where-command.md
@@ -41,6 +41,8 @@ PPL query:
- `source = table | where isempty(a)`
- `source = table | where isblank(a)`
- `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10'
- `source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
@@ -18,6 +18,10 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
* log entry id
* @param state
* Flint index state
* @param lastRefreshStartTime
* timestamp when last refresh started for manual or external scheduler refresh
* @param lastRefreshCompleteTime
* timestamp when last refresh completed for manual or external scheduler refresh
* @param entryVersion
* entry version fields for consistency control
* @param error
@@ -28,10 +32,12 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
case class FlintMetadataLogEntry(
id: String,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
* This is currently used as streaming job start time for internal scheduler. In future, this
* should represent the create timestamp of the log entry
*/
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: Map[String, Any],
error: String,
@@ -40,26 +46,48 @@ case class FlintMetadataLogEntry(
def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties.asScala.toMap)
}

def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties)
}
}

object FlintMetadataLogEntry {

val EMPTY_TIMESTAMP = 0L

/**
* Flint index state enum.
*/
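
One way a consumer might read the two new timestamp fields is sketched below in Java. The assumptions are loud ones: that `EMPTY_TIMESTAMP` (0L) marks a timestamp that has not been recorded yet, and that both fields share the same time unit. Neither is stated in the diff, and `RefreshTimestamps` is a hypothetical class.

```java
import java.util.OptionalLong;

/** Hypothetical helper; assumes 0L means "not recorded" and both timestamps use the same unit. */
final class RefreshTimestamps {
    static final long EMPTY_TIMESTAMP = 0L; // same sentinel value as in the diff

    private RefreshTimestamps() {}

    /** Duration of the last completed refresh, or empty when it cannot be derived. */
    static OptionalLong lastRefreshDuration(long lastRefreshStartTime, long lastRefreshCompleteTime) {
        boolean missing = lastRefreshStartTime == EMPTY_TIMESTAMP
            || lastRefreshCompleteTime == EMPTY_TIMESTAMP;
        // A completion time before the start time suggests a newer refresh is still running.
        if (missing || lastRefreshCompleteTime < lastRefreshStartTime) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(lastRefreshCompleteTime - lastRefreshStartTime);
    }
}
```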
@@ -87,6 +87,10 @@ static void recordOperationSuccess(String metricNamePrefix) {
MetricsUtil.incrementCounter(successMetricName);
}

static void recordLatency(String metricNamePrefix, long latencyMilliseconds) {
MetricsUtil.addHistoricGauge(metricNamePrefix + ".processingTime", latencyMilliseconds);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
@@ -107,6 +111,8 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
} else if (statusCode == 429) {
MetricsUtil.incrementCounter(metricNamePrefix + ".429.count");
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
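
The naming scheme visible in this hunk can be reproduced in isolation. The sketch below only builds the metric names; it does not call `MetricsUtil`. It assumes the `Nxx` counter is incremented for every failure, which the truncated hunk suggests but does not show. `FailureMetricNames` and the prefix used in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: derives the failure metric names for an HTTP status code. */
final class FailureMetricNames {
    private FailureMetricNames() {}

    static List<String> forStatus(String metricNamePrefix, int statusCode) {
        List<String> names = new ArrayList<>();
        // 403 and 429 get a dedicated counter in addition to the status-class counter.
        if (statusCode == 403) {
            names.add(metricNamePrefix + ".403.count");
        } else if (statusCode == 429) {
            names.add(metricNamePrefix + ".429.count");
        }
        // Integer division maps 429 to "4xx", 503 to "5xx", and so on.
        names.add(metricNamePrefix + "." + (statusCode / 100) + "xx.count");
        return names;
    }
}
```

With a made-up prefix such as `opensearch.read`, a 429 response yields `opensearch.read.429.count` and `opensearch.read.4xx.count`, while a 500 response yields only `opensearch.read.5xx.count`.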