Merge remote-tracking branch 'upstream/main' into issues/838

LantaoJin committed Oct 31, 2024
2 parents c162259 + a2a9838 commit 119ace7
Showing 21 changed files with 491 additions and 37 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -154,6 +154,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"com.github.seancfoley" % "ipaddress" % "5.5.1",
),
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
2 changes: 2 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -58,6 +58,8 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. values outside the range ['2024-09-10', '2025-09-10']
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`

```sql
source = table | eval status_category =
1 change: 1 addition & 0 deletions docs/ppl-lang/README.md
@@ -87,6 +87,7 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).

- [`Cryptographic Functions`](functions/ppl-cryptographic.md)

- [`IP Address Functions`](functions/ppl-ip.md)

---
### PPL On Spark
35 changes: 35 additions & 0 deletions docs/ppl-lang/functions/ppl-ip.md
@@ -0,0 +1,35 @@
## PPL IP Address Functions

### `CIDRMATCH`

**Description**

`CIDRMATCH(ip, cidr)` checks whether `ip` falls within the specified `cidr` range.

**Argument type:** STRING, STRING

**Return type:** BOOLEAN

Example:

os> source=ips | where cidrmatch(ip, '192.169.1.0/24') | fields ip
fetched rows / total rows = 1/1
+--------------+
| ip |
|--------------|
| 192.169.1.5 |
+--------------+

os> source=ipsv6 | where cidrmatch(ip, '2003:db8::/32') | fields ip
fetched rows / total rows = 1/1
+-----------------------------------------+
| ip |
|-----------------------------------------|
| 2003:0db8:0000:0000:0000:0000:0000:0000 |
+-----------------------------------------+

Note:
- `ip` can be an IPv4 or an IPv6 address
- `cidr` can be an IPv4 or an IPv6 block
- `ip` and `cidr` must be either both IPv4 or both IPv6
- `ip` and `cidr` must both be valid and non-empty/non-null
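
This commit also adds `com.github.seancfoley:ipaddress:5.5.1` to `build.sbt`, which suggests how the containment check can be performed. Below is a minimal sketch of the semantics above, assuming that library; the `cidrMatch` helper is hypothetical, not the actual Flint UDF.

```scala
import inet.ipaddr.IPAddressString

// Hypothetical helper mirroring the semantics documented above; not the
// actual Flint implementation. toAddress throws on malformed input, and
// contains returns false when the address and block families differ
// (the real function rejects mixed IPv4/IPv6 arguments outright).
def cidrMatch(ip: String, cidr: String): Boolean = {
  val address = new IPAddressString(ip).toAddress
  val block   = new IPAddressString(cidr).toAddress.toPrefixBlock
  block.contains(address)
}

// cidrMatch("192.169.1.5", "192.169.1.0/24") // true, as in the first example
```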
2 changes: 2 additions & 0 deletions docs/ppl-lang/ppl-where-command.md
@@ -43,6 +43,8 @@ PPL query:
- `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. values outside the range ['2024-09-10', '2025-09-10']
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
@@ -160,6 +160,16 @@ public final class MetricConstants {
*/
public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime";

/**
* Metric prefix for tracking the index state transitions
*/
public static final String INDEX_STATE_UPDATED_TO_PREFIX = "indexState.updatedTo.";

/**
* Metric prefix for tracking failed initial condition checks in index state transitions
*/
public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";

private MetricConstants() {
// Private constructor to prevent instantiation
}
@@ -16,6 +16,8 @@
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.common.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;

/**
* Default optimistic transaction implementation that captures the basic workflow for
@@ -73,6 +75,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
// Perform initial log check
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
emitConditionCheckFailedMetric(latest);
throw new IllegalStateException(
String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
}
@@ -104,6 +107,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
metadataLog.purge();
} else {
metadataLog.add(finalLog);
emitFinalLogStateMetric(finalLog);
}
return result;
} catch (Exception e) {
@@ -119,4 +123,12 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
throw new IllegalStateException("Failed to commit transaction operation", e);
}
}

private void emitConditionCheckFailedMetric(FlintMetadataLogEntry latest) {
MetricsUtil.addHistoricGauge(MetricConstants.INITIAL_CONDITION_CHECK_FAILED_PREFIX + latest.state() + ".count", 1);
}

private void emitFinalLogStateMetric(FlintMetadataLogEntry finalLog) {
MetricsUtil.addHistoricGauge(MetricConstants.INDEX_STATE_UPDATED_TO_PREFIX + finalLog.state() + ".count", 1);
}
}
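
Taken together: a successful commit whose final log lands in the ACTIVE state emits the historic gauge `indexState.updatedTo.active.count`, and a precondition failure against a DELETED entry emits `initialConditionCheck.failed.deleted.count`; these are exactly the names asserted in the new unit test below.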
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.flint.common.metadata.log.FlintMetadataLog;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;

import java.util.Optional;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.core.metrics.MetricsTestUtil;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class DefaultOptimisticTransactionMetricTest {

@Mock
private FlintMetadataLog<FlintMetadataLogEntry> metadataLog;

@Mock
private FlintMetadataLogEntry logEntry;

@InjectMocks
private DefaultOptimisticTransaction<String> transaction;

@Test
void testCommitWithValidInitialCondition() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
when(metadataLog.getLatest()).thenReturn(Optional.of(logEntry));
when(metadataLog.add(any(FlintMetadataLogEntry.class))).thenReturn(logEntry);
when(logEntry.state()).thenReturn(IndexState$.MODULE$.ACTIVE());

transaction.initialLog(entry -> true)
.transientLog(entry -> logEntry)
.finalLog(entry -> logEntry)
.commit(entry -> "Success");

verify(metadataLog, times(2)).add(logEntry);
verifier.assertHistoricGauge("indexState.updatedTo.active.count", 1);
});
}

@Test
void testConditionCheckFailed() throws Exception {
MetricsTestUtil.withMetricEnv(verifier -> {
when(metadataLog.getLatest()).thenReturn(Optional.of(logEntry));
when(logEntry.state()).thenReturn(IndexState$.MODULE$.DELETED());

transaction.initialLog(entry -> false)
.finalLog(entry -> logEntry);

assertThrows(IllegalStateException.class, () -> {
transaction.commit(entry -> "Should Fail");
});
verifier.assertHistoricGauge("initialConditionCheck.failed.deleted.count", 1);
});
}
}
@@ -669,4 +669,30 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| (11, null, false)
| """.stripMargin)
}

protected def createIpAddressTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| id INT,
| ipAddress STRING,
| isV6 BOOLEAN,
| isValid BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, '127.0.0.1', false, true),
| (2, '192.168.1.0', false, true),
| (3, '192.168.1.1', false, true),
| (4, '192.168.2.1', false, true),
| (5, '192.168.2.', false, false),
| (6, '2001:db8::ff00:12:3455', true, true),
| (7, '2001:db8::ff00:12:3456', true, true),
| (8, '2001:db8::ff00:13:3457', true, true),
| (9, '2001:db8::ff00:12:', true, false)
| """.stripMargin)
}
}
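
Rows 5 and 9 are deliberately malformed ('192.168.2.' and '2001:db8::ff00:12:', flagged `isValid = false`) so the cidrmatch integration suite below can assert that invalid addresses fail the query rather than silently returning false.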
@@ -0,0 +1,96 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLCidrmatchITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createIpAddressTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test cidrmatch for ipv4 for 192.168.1.0/24") {
val frame = sql(s"""
| source = $testTable | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')
| """.stripMargin)

val results = frame.collect()
assert(results.length == 2)
}

test("test cidrmatch for ipv4 for 192.169.1.0/24") {
val frame = sql(s"""
| source = $testTable | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.169.1.0/24')
| """.stripMargin)

val results = frame.collect()
assert(results.length == 0)
}

test("test cidrmatch for ipv6 for 2001:db8::/32") {
val frame = sql(s"""
| source = $testTable | where isV6 = true and isValid = true and cidrmatch(ipAddress, '2001:db8::/32')
| """.stripMargin)

val results = frame.collect()
assert(results.length == 3)
}

test("test cidrmatch for ipv6 for 2003:db8::/32") {
val frame = sql(s"""
| source = $testTable | where isV6 = true and isValid = true and cidrmatch(ipAddress, '2003:db8::/32')
| """.stripMargin)

val results = frame.collect()
assert(results.length == 0)
}

test("test cidrmatch for ipv6 with ipv4 cidr") {
val frame = sql(s"""
| source = $testTable | where isV6 = true and isValid = true and cidrmatch(ipAddress, '192.169.1.0/24')
| """.stripMargin)

assertThrows[SparkException](frame.collect())
}

test("test cidrmatch for invalid ipv4 addresses") {
val frame = sql(s"""
| source = $testTable | where isV6 = false and isValid = false and cidrmatch(ipAddress, '192.169.1.0/24')
| """.stripMargin)

assertThrows[SparkException](frame.collect())
}

test("test cidrmatch for invalid ipv6 addresses") {
val frame = sql(s"""
| source = $testTable | where isV6 = true and isValid = false and cidrmatch(ipAddress, '2003:db8::/32')
| """.stripMargin)

assertThrows[SparkException](frame.collect())
}
}
ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -394,6 +394,7 @@ ISNULL: 'ISNULL';
ISNOTNULL: 'ISNOTNULL';
ISPRESENT: 'ISPRESENT';
BETWEEN: 'BETWEEN';
CIDRMATCH: 'CIDRMATCH';

// FLOWCONTROL FUNCTIONS
IFNULL: 'IFNULL';
6 changes: 6 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -441,6 +441,7 @@ booleanExpression
| isEmptyExpression # isEmptyExpr
| valueExpressionList NOT? IN LT_SQR_PRTHS subSearch RT_SQR_PRTHS # inSubqueryExpr
| EXISTS LT_SQR_PRTHS subSearch RT_SQR_PRTHS # existsSubqueryExpr
| cidrMatchFunctionCall # cidrFunctionCallExpr
;

isEmptyExpression
@@ -520,6 +521,10 @@ booleanFunctionCall
: conditionFunctionBase LT_PRTHS functionArgs RT_PRTHS
;

cidrMatchFunctionCall
: CIDRMATCH LT_PRTHS ipAddress = functionArg COMMA cidrBlock = functionArg RT_PRTHS
;
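
Both arguments of the new rule are parsed as `functionArg`, so literal and computed values are accepted; the `cidrFunctionCallExpr` alternative added to `booleanExpression` above is what lets `cidrmatch(...)` appear directly in a `where` clause.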

convertedDataType
: typeName = DATE
| typeName = TIME
@@ -1119,4 +1124,5 @@ keywordsCanBeId
| SEMI
| ANTI
| BETWEEN
| CIDRMATCH
;
@@ -13,6 +13,7 @@
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.Between;
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Cidr;
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
@@ -322,4 +323,7 @@ public T visitExistsSubquery(ExistsSubquery node, C context) {
public T visitWindow(Window node, C context) {
return visitChildren(node, context);
}

public T visitCidr(Cidr node, C context) {
return visitChildren(node, context);
}
}