Skip to content

Commit

Permalink
Raw Vpc schema integration (1.0.0 parquet ) (opensearch-project#1853)
Browse files Browse the repository at this point in the history
* revert default vpc flow logs integration into standard vpc schema

Signed-off-by: YANGDB <[email protected]>

* update sample queries

---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored and RyanL1997 committed Jun 5, 2024
1 parent 441168e commit 60c6464
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"license": "Apache-2.0",
"type": "logs_vpc",
"labels": ["Observability", "Logs", "AWS", "Cloud", "Flint S3"],
"author": "Haidong Wang",
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/aws_vpc_flow/info",
"workflows": [
{
Expand All @@ -26,18 +26,6 @@
"label": "Dashboards & Visualizations For Flint Integrations using live queries",
"description": "Dashboards and visualizations aligned with Flint S3 datasource ",
"enabled_by_default": false
},
{
"name": "flint-pre-agg-dashboards",
"label": "Dashboards & Visualizations For Flint Integrations using pre-aggregated queries",
"description": "This step creates the MV pre-aggregated queries without running them, in order to actually update their data select the following `flint-pre-agg-refresh` workflow option ",
"enabled_by_default": false
},
{
"name": "flint-pre-agg-refresh",
"label": "Refreshing and populate the pre-aggregated projections ",
"description": "This step populate the pre-aggregated projections by enabling the REFRESH command to run, this step depends on selection of the previous `flint-pre-agg-dashboards` step",
"enabled_by_default": false
}
],
"statics": {
Expand Down Expand Up @@ -76,97 +64,46 @@
],
"assets": [
{
"name": "aws_vpc_flow",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["dashboards"]
},
{
"name": "aws_vpc_flow_flint-live",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["flint-live-dashboards"]
},
{
"name": "aws_vpc_flow_flint-pre_agg",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["flint-pre-agg-dashboards"]
},

{
"name": "create_table_parquet_vpc",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-live-dashboards","flint-pre-agg-dashboards"]
},
{
"name": "vpc_live_all_mv",
"name": "create_table_vpc_schema",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-live-dashboards"]
},
{
"name": "vpc_live_week_mv",
"name": "example_queries",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_agg_60min_connections_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_cardinality_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_total-bytes_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["queries"]
},

{
"name": "vpc_live_week_refresh",
"name": "create_skipping_index",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
"workflows": ["queries"]
},
{
"name": "vpc_agg_60min_connections_refresh",
"name": "aws_vpc_flow",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_cardinality_refresh",
"name": "aws_vpc_flow_flint-live",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["flint-live-dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_total-bytes_refresh",
{
"name": "aws_vpc_live_stream_mv_schema",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
"workflows": ["flint-live-dashboards"]
}
],
"sampleData": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
CREATE MATERIALIZED VIEW {table_name}__live_mview AS
SELECT
CAST(IFNULL(srcport, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(pkt_srcaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(srcaddr, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(interface_id, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(vpc_id, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(instance_id, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(subnet_id, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,
CAST(IFNULL(dstport, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(pkt_dstaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(flow_direction, 'Unknown') AS STRING) AS `aws.vpc.flow-direction`,
CAST(IFNULL(tcp_flags, '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(bytes, 0) AS LONG) AS `aws.vpc.bytes`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(`end` ) AS TIMESTAMP) AS `end_time`,
CAST(IFNULL(log_status, 'Unknown') AS STRING) AS `aws.vpc.status_code`,
CAST(IFNULL(version, 0) AS LONG) AS `aws.vpc.version`,
CAST(IFNULL(type, 'Unknown') AS STRING) AS `aws.vpc.type_name`,
CAST(IFNULL(traffic_path, 0) AS LONG) AS `aws.vpc.traffic_path`,
CAST(IFNULL(az_id, 'Unknown') AS STRING) AS `aws.vpc.az_id`,
CAST(IFNULL(action, 'Unknown') AS STRING) AS `aws.vpc.action`,
CAST(IFNULL(region, 'Unknown') AS STRING) AS `aws.vpc.region`,
CAST(IFNULL(account_id, 'Unknown') AS STRING) AS `aws.vpc.account-id`,
CAST(IFNULL(sublocation_type, 'Unknown') AS STRING) AS `aws.vpc.sublocation_type`,
CAST(IFNULL(sublocation_id, 'Unknown') AS STRING) AS `aws.vpc.sublocation_id`

FROM
{table_name}
WITH (
auto_refresh = true,
refresh_interval = '15 Minute',
checkpoint_location = '{s3_checkpoint_location}',
watermark_delay = '1 Minute',
extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE SKIPPING INDEX ON {table_name} (
account_id BLOOM_FILTER,
region VALUE_SET,
srcaddr BLOOM_FILTER,
dstaddr BLOOM_FILTER,
pkt_src_aws_service VALUE_SET,
pkt_dst_aws_service VALUE_SET,
bytes MIN_MAX
) WITH (
auto_refresh = true,
refresh_interval = '15 Minutes',
checkpoint_location = '{s3_checkpoint_location}',
watermark_delay = '1 Minute'
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
version int,
account_id string,
interface_id string,
srcaddr string,
dstaddr string,
srcport int,
dstport int,
protocol bigint,
packets bigint,
bytes bigint,
start bigint,
`end` bigint,
action string,
log_status string,
vpc_id string,
subnet_id string,
instance_id string,
tcp_flags int,
type string,
pkt_srcaddr string,
pkt_dstaddr string,
region string,
az_id string,
sublocation_type string,
sublocation_id string,
pkt_src_aws_service string,
pkt_dst_aws_service string,
flow_direction string,
traffic_path int
) USING parquet
LOCATION '{s3_bucket_location}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"TopCommonErrorServicesQuery","query":"SELECT pkt_src_aws_service AS source_service, pkt_dst_aws_service AS destination_service, COUNT(*) AS error_count FROM {table_name} where log_status IN ('SKIPDATA', 'RETIREDDATA') GROUP BY pkt_src_aws_service, pkt_dst_aws_service ORDER BY error_count DESC LIMIT 10","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top 10 pairs of errored source / destination services","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"HourAggRequestsAndBytes","query":"SELECT date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))) AS interval_start_time, CAST(IFNULL(pkt_srcaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`, CAST(IFNULL(srcaddr, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`, CAST(IFNULL(pkt_dstaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`, CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`, CAST(IFNULL(action, 'Unknown') AS STRING) AS `aws.vpc.action`, CAST(IFNULL(region, 'Unknown') AS STRING) AS `aws.vpc.region`, CAST(IFNULL(account_id, 'Unknown') AS STRING) AS `aws.vpc.account-id`, CAST(IFNULL(log_status, 'Unknown') AS STRING) AS `aws.vpc.status_code`, CAST(IFNULL(flow_direction, 'Unknown') AS STRING) AS `aws.vpc.connection.direction`, COUNT(*) AS total_connections, SUM(CAST(IFNULL(bytes, 0) AS LONG)) AS total_bytes, SUM(CAST(IFNULL(packets, 0) AS LONG)) AS total_packets FROM `zero_etl_walkthrough`.`default`.`amazon_vpc_flow` GROUP BY date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))), pkt_srcaddr, srcaddr, pkt_dstaddr, dstaddr, action, region, account_id, log_status, flow_direction","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Hour aggregation by requests and bytes sum ","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd9992c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"HourWindowTopIpByCardinality","query":" WITH hourly_buckets AS (SELECT date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))) AS interval_start_time, CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS dstaddr, SUM(CAST(IFNULL(bytes, 0) AS LONG)) AS total_bytes FROM {table_name} GROUP BY interval_start_time, dstaddr), ranked_addresses AS (SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes, RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank FROM hourly_buckets) SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes FROM ranked_addresses WHERE bytes_rank <= 50 ORDER BY interval_start_time ASC, bytes_rank ASC","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Hour window of top IP by cardinality ","version":1},"id":"9e6add40-fc1a-11ee-99c9-43e5dbd9992c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"exportedCount":7,"missingRefCount":0,"missingReferences":[]}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 60c6464

Please sign in to comment.