Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support FileSourceRelation to load CSV in PPL #677

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FlightNum,Origin,FlightDelay,DistanceMiles,FlightTimeMin,OriginWeather,dayOfWeek,AvgTicketPrice,Carrier,FlightDelayMin,OriginRegion,FlightDelayType,DestAirportID,Dest,FlightTimeHour,Cancelled,DistanceKilometers,OriginCityName,DestWeather,OriginCountry,DestCountry,DestRegion,OriginAirportID,DestCityName,timestamp
RGXY9H5,Chubu Centrair International Airport,false,1619.970725161303,124.1471507959044,Heavy Fog,0,626.1297405910661,OpenSearch Dashboards Airlines,0,SE-BD,No Delay,CAN,Guangzhou Baiyun International Airport,2.06911917993174,true,2607.0901667139924,Tokoname,Clear,JP,CN,SE-BD,NGO,Guangzhou,2019-12-23 11:19:32
WOPNZEP,Munich Airport,true,198.57903689856937,34.9738738474057,Sunny,0,681.9911763989377,OpenSearch Dashboards Airlines,15,DE-BY,Carrier Delay,VE05,Venice Marco Polo Airport,0.5828978974567617,false,319.58198155849124,Munich,Cloudy,DE,IT,IT-34,MUC,Venice,2019-12-23 12:32:26
G9J5O2V,Frankfurt am Main Airport,false,4857.154739888458,651.402736475921,Clear,0,868.0507463122127,OpenSearch Dashboards Airlines,0,DE-HE,No Delay,XIY,Xi'an Xianyang International Airport,10.856712274598683,false,7816.832837711051,Frankfurt am Main,Thunder & Lightning,DE,CN,SE-BD,FRA,Xi'an,2019-12-23 03:48:33
HM80A5V,Itami Airport,false,5862.6666599206,555.0027890084269,Heavy Fog,0,765.0413127727119,Logstash Airways,0,SE-BD,No Delay,TV01,Treviso-Sant'Angelo Airport,9.250046483473783,true,9435.047413143258,Osaka,Clear,JP,IT,IT-34,ITM,Treviso,2019-12-23 19:50:48
84B0Y32,Charles de Gaulle International Airport,false,4397.926660603864,372.51457282541395,Thunder & Lightning,0,913.1638984616233,OpenSearch Dashboards Airlines,0,FR-J,No Delay,STL,St Louis Lambert International Airport,6.208576213756899,false,7077.776883682865,Paris,Thunder & Lightning,FR,US,US-MO,CDG,St Louis,2019-12-23 11:30:48
2AZWPJX,Rajiv Gandhi International Airport,true,0,180,Sunny,0,103.25307304704197,Logstash Airways,180,SE-BD,Security Delay,HYD,Rajiv Gandhi International Airport,3,false,0,Hyderabad,Hail,IN,IN,SE-BD,HYD,Hyderabad,2019-12-23 19:52:54
SFLRI9O,Erie International Tom Ridge Field,false,6961.655654280931,622.4277087379495,Clear,0,775.1109173747694,OpenSearch Dashboards Airlines,0,US-PA,No Delay,CJU,Jeju International Airport,10.373795145632492,false,11203.698757283091,Erie,Clear,US,KR,SE-BD,ERI,Jeju City,2019-12-23 07:32:32
QDQMOD6,Brisbane International Airport,false,8013.330880747018,716.4558873858294,Thunder & Lightning,0,832.082612870741,OpenSearch-Air,0,SE-BD,No Delay,DEN,Denver International Airport,11.94093145643049,false,12896.20597294493,Brisbane,Cloudy,AU,US,US-CO,BNE,Denver,2019-12-23 10:59:26
XTGFN9A,Jorge Chavez International Airport,false,3946.924514217792,396.99745533808243,Thunder & Lightning,0,348.23579123315324,OpenSearch Dashboards Airlines,0,SE-BD,No Delay,YOW,Ottawa Macdonald-Cartier International Airport,6.616624255634707,false,6351.959285409319,Lima,Rain,PE,CA,CA-ON,LIM,Ottawa,2019-12-23 21:10:09
USRQ3KK,Stockholm-Arlanda Airport,false,996.8381561540818,94.36797091633146,Clear,0,661.3465606549407,OpenSearch-Air,0,SE-AB,No Delay,TV01,Treviso-Sant'Angelo Airport,1.572799515272191,false,1604.2555055776347,Stockholm,Clear,SE,IT,IT-34,ARN,Treviso,2019-12-23 04:33:56
PK46NHH,Milano Linate Airport,false,5261.396351845886,604.8140464617903,Rain,0,600.4401843290168,JetBeats,0,IT-25,No Delay,GEG,Spokane International Airport,10.080234107696505,false,8467.396650465065,Milan,Clear,IT,US,US-WA,MI11,Spokane,2019-12-23 20:35:25
G80VHCJ,Bari Karol Wojtyła Airport,false,5630.111629019724,604.0524246328747,Sunny,0,738.260189539631,Logstash Airways,0,IT-75,No Delay,CJU,Jeju International Airport,10.067540410547911,false,9060.78636949312,Bari,Rain,IT,KR,SE-BD,BA02,Jeju City,2019-12-23 10:59:56
PDS4U17,El Dorado International Airport,false,5591.079567130033,499.887241937962,Thunder & Lightning,0,437.9253204442997,OpenSearch-Air,0,SE-BD,No Delay,TO11,Turin Airport,8.331454032299368,false,8997.970354883317,Bogota,Hail,CO,IT,IT-21,BOG,Torino,2019-12-23 10:33:53
2MXRGRK,Abu Dhabi International Airport,false,8160.7690090650885,656.6742320062424,Cloudy,0,825.9174161826418,JetBeats,0,SE-BD,No Delay,ABQ,Albuquerque International Sunport Airport,10.944570533437373,false,13133.484640124847,Abu Dhabi,Thunder & Lightning,AE,US,US-NM,AUH,Albuquerque,2019-12-23 19:27:11
57CZEDA,London Heathrow Airport,true,4757.876231054233,720.4152685405732,Damaging Wind,0,836.1010286937247,OpenSearch-Air,270,GB-ENG,Carrier Delay,XHBU,Ukrainka Air Base,12.006921142342886,false,7657.059565189745,London,Sunny,GB,RU,RU-AMU,LHR,Belogorsk,2019-12-23 18:48:49
5FYALP0,Malpensa International Airport,false,5812.230334559898,492.30936923905085,Damaging Wind,0,417.34744554513884,JetBeats,0,IT-25,No Delay,LAS,McCarran International Airport,8.20515615398418,false,9353.878015541966,Milan,Clear,IT,US,US-NV,MI12,Las Vegas,2019-12-23 10:37:54
HVWAL6J,Comodoro Arturo Merino Benitez International Airport,false,7292.7292896018525,617.7110592550002,Cloudy,0,946.888426456834,Logstash Airways,0,SE-BD,No Delay,PA03,Falcone Borsellino Airport,10.29518432091667,false,11736.510125845005,Santiago,Cloudy,CL,IT,IT-82,SCL,Palermo,2019-12-23 03:54:12
7ORM12S,Leonardo da Vinci–Fiumicino Airport,false,160.39074208529965,23.46580713004768,Sunny,0,118.37483602607261,OpenSearch Dashboards Airlines,0,IT-62,No Delay,PI05,Pisa International Airport,0.39109678550079463,false,258.1238784305245,Rome,Sunny,IT,IT,IT-52,RM11,Pisa,2019-12-23 03:54:12
2P36OEP,New Chitose Airport,true,5340.290617241973,941.1970552595557,Cloudy,0,705.7149863531135,OpenSearch Dashboards Airlines,225,SE-BD,Late Aircraft Delay,VIE,Vienna International Airport,15.686617587659262,false,8594.364663114668,Chitose / Tomakomai,Rain,JP,AT,AT-9,CTS,Vienna,2019-12-23 09:41:52
HLNZHCX,Verona Villafranca Airport,false,0,0,Sunny,0,172.3790782673846,OpenSearch-Air,0,IT-34,No Delay,VR10,Verona Villafranca Airport,0,false,0,Verona,Sunny,IT,IT,IT-34,VR10,Verona,2019-12-23 19:34:51
HLNNULL,Verona Villafranca Airport,,,0,,0,172.3790782673846,OpenSearch-Air,0,IT-34,No Delay,VR10,Verona Villafranca Airport,0,false,0,Verona,Sunny,IT,IT,IT-34,VR10,Verona,
Binary file not shown.
Binary file added integ-test/src/integration/resources/users.parquet
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, EqualTo, GreaterThan, IsNotNull, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, Filter, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType}

/**
 * Integration tests for PPL queries that read directly from a file through the
 * `file = <name> "<path>"` source clause.
 *
 * Each test runs a query, compares the collected rows with a fixed expectation, and
 * then compares the query's unresolved logical plan (via `compareByString`) with a
 * hand-built plan rooted at a [[LogicalRelation]] for the same format and schema.
 */
class FlintSparkPPLFileSourceRelationITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "flint_ppl_test"

  /** Schema used to build the expected relation for the flights sample CSV. */
  private val flightsSchema: StructType = new StructType()
    .add("FlightNum", StringType)
    .add("Origin", StringType)
    .add("FlightDelay", BooleanType)
    .add("DistanceMiles", DoubleType)
    .add("FlightTimeMin", DoubleType)
    .add("OriginWeather", StringType)
    .add("dayOfWeek", IntegerType)
    .add("AvgTicketPrice", DoubleType)
    .add("Carrier", StringType)
    .add("FlightDelayMin", IntegerType)
    .add("OriginRegion", StringType)
    .add("FlightDelayType", StringType)
    .add("DestAirportID", StringType)
    .add("Dest", StringType)
    .add("FlightTimeHour", DoubleType)
    .add("Cancelled", BooleanType)
    .add("DistanceKilometers", DoubleType)
    .add("OriginCityName", StringType)
    .add("DestWeather", StringType)
    .add("OriginCountry", StringType)
    .add("DestCountry", StringType)
    .add("DestRegion", StringType)
    .add("OriginAirportID", StringType)
    .add("DestCityName", StringType)
    .add("timestamp", TimestampType)

  /** Expected `(avg_price_by_weather, OriginWeather)` rows shared by all flights tests. */
  private val expectedAvgPriceByWeather: Array[Row] = Array(
    Row(768.1694081139743, "Clear"),
    Row(826.1736096641965, "Cloudy"),
    Row(600.4401843290168, "Rain"),
    Row(512.8754006548804, "Sunny"),
    Row(632.8519057524543, "Thunder & Lightning"),
    Row(626.7242371194318, "Damaging Wind"))

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  /**
   * Builds the expected leaf relation for a file source.
   *
   * @param format
   *   data source class name passed to [[DataSource]], e.g. "csv" or "parquet"
   * @param schema
   *   user-specified schema of the relation
   * @return
   *   a non-streaming [[LogicalRelation]] wrapping the resolved data source
   */
  private def fileRelation(format: String, schema: StructType): LogicalRelation = {
    val dataSource =
      DataSource(
        spark,
        userSpecifiedSchema = Some(schema),
        className = format,
        options = Map.empty)
    LogicalRelation(dataSource.resolveRelation(true), isStreaming = false)
  }

  /**
   * Builds the expected unresolved plan for the flights query used by the CSV tests:
   * filter on `Cancelled`/`DistanceMiles`, project eight fields, then average
   * `AvgTicketPrice` grouped by `OriginWeather`.
   */
  private def expectedFlightsPlan: LogicalPlan = {
    val relation = fileRelation("csv", flightsSchema)
    val filter = Filter(
      And(
        EqualTo(UnresolvedAttribute("Cancelled"), Literal(false)),
        GreaterThan(UnresolvedAttribute("DistanceMiles"), Literal(0))),
      relation)
    val projectedFields = Seq(
      "FlightNum",
      "DistanceMiles",
      "FlightTimeMin",
      "OriginWeather",
      "AvgTicketPrice",
      "Carrier",
      "FlightDelayType",
      "timestamp").map(name => UnresolvedAttribute(name))
    val project = Project(projectedFields, filter)
    val weatherAlias = Alias(UnresolvedAttribute("OriginWeather"), "OriginWeather")()
    val avgPriceAlias =
      Alias(
        UnresolvedFunction(
          Seq("AVG"),
          Seq(UnresolvedAttribute("AvgTicketPrice")),
          isDistinct = false),
        "avg_price_by_weather")()
    val aggregatePlan = Aggregate(Seq(weatherAlias), Seq(avgPriceAlias, weatherAlias), project)
    Project(Seq(UnresolvedStar(None)), aggregatePlan)
  }

  /**
   * Runs a flights PPL query and checks both its rows and its logical plan.
   *
   * @param ppl
   *   the PPL query text to execute through `sql`
   */
  private def verifyFlightsAvgPriceByWeather(ppl: String): Unit = {
    val frame = sql(ppl)

    val results: Array[Row] = frame.collect()
    // Row order is not guaranteed, so both sides are sorted by the average price column.
    implicit val byAvgPrice: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
    assert(results.sorted.sameElements(expectedAvgPriceByWeather.sorted))

    val logicalPlan: LogicalPlan = frame.queryExecution.logical
    assert(compareByString(expectedFlightsPlan) === compareByString(logicalPlan))
  }

  test("test csv file source relation") {
    verifyFlightsAvgPriceByWeather(s"""
         | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv"
         | | where Cancelled = false AND DistanceMiles > 0
         | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp
         | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather
         | """.stripMargin)
  }

  test("test csv file source relation with compression codec") {
    verifyFlightsAvgPriceByWeather(s"""
         | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz"
         | | where Cancelled = false AND DistanceMiles > 0
         | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp
         | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather
         | """.stripMargin)
  }

  test("test csv file source relation with filter") {
    // The filter is written inline after the source clause instead of in a `where` command.
    verifyFlightsAvgPriceByWeather(s"""
         | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv"
         | Cancelled = false AND DistanceMiles > 0
         | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp
         | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather
         | """.stripMargin)
  }

  test("test parquet file source relation") {
    val frame = sql(s"""
         | file = $testTable "integ-test/src/integration/resources/users.parquet"
         | | fields name, favorite_color
         | """.stripMargin)

    val results: Array[Row] = frame.collect()
    val expectedResults: Array[Row] = Array(Row("Alyssa", null), Row("Ben", "red"))
    // Row order is not guaranteed, so both sides are sorted by the name column.
    implicit val byName: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
    assert(results.sorted.sameElements(expectedResults.sorted))

    val logicalPlan: LogicalPlan = frame.queryExecution.logical

    val usersSchema = new StructType()
      .add("name", StringType)
      .add("favorite_color", StringType)
      .add("favorite_numbers", ArrayType(IntegerType, containsNull = true))
    val expectedPlan =
      Project(
        Seq(UnresolvedAttribute("name"), UnresolvedAttribute("favorite_color")),
        fileRelation("parquet", usersSchema))
    assert(compareByString(expectedPlan) === compareByString(logicalPlan))
  }
}
Loading
Loading