From 70a74112b8b0eb8147b10bb2d9205be46852f46d Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Wed, 18 Sep 2024 19:35:37 +0800 Subject: [PATCH 1/3] Support load CSV in PPL Signed-off-by: Lantao Jin --- ...nsearch_dashboards_sample_data_flights.csv | 22 ++ ...arch_dashboards_sample_data_flights.csv.gz | Bin 0 -> 2768 bytes .../src/integration/resources/users.parquet | Bin 0 -> 615 bytes .../integration/resources/users.parquet.gz | Bin 0 -> 417 bytes ...intSparkPPLFileSourceRelationITSuite.scala | 322 ++++++++++++++++++ ppl-spark-integration/README.md | 6 + .../src/main/antlr4/OpenSearchPPLLexer.g4 | 2 + .../src/main/antlr4/OpenSearchPPLParser.g4 | 6 + .../sql/ast/AbstractNodeVisitor.java | 6 + .../sql/ast/tree/FileSourceRelation.java | 62 ++++ .../sql/ppl/CatalystPlanContext.java | 14 +- .../sql/ppl/CatalystQueryPlanVisitor.java | 26 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 45 ++- .../sql/ppl/utils/RelationUtils.java | 2 +- .../flint/spark/FlintPPLSparkExtensions.scala | 2 +- .../flint/spark/ppl/FlintSparkPPLParser.scala | 6 +- .../src/test/resources/people.csv | 3 + .../src/test/resources/people.csv.gz | Bin 0 -> 69 bytes .../spark/ppl/LogicalPlanTestUtils.scala | 12 +- ...ileSourceRelationTranslatorTestSuite.scala | 102 ++++++ 20 files changed, 629 insertions(+), 9 deletions(-) create mode 100644 integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv create mode 100644 integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz create mode 100644 integ-test/src/integration/resources/users.parquet create mode 100644 integ-test/src/integration/resources/users.parquet.gz create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java create mode 100644 ppl-spark-integration/src/test/resources/people.csv create mode 100644 ppl-spark-integration/src/test/resources/people.csv.gz create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala diff --git a/integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv b/integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv new file mode 100644 index 000000000..2e3591a06 --- /dev/null +++ b/integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv @@ -0,0 +1,22 @@ +FlightNum,Origin,FlightDelay,DistanceMiles,FlightTimeMin,OriginWeather,dayOfWeek,AvgTicketPrice,Carrier,FlightDelayMin,OriginRegion,FlightDelayType,DestAirportID,Dest,FlightTimeHour,Cancelled,DistanceKilometers,OriginCityName,DestWeather,OriginCountry,DestCountry,DestRegion,OriginAirportID,DestCityName,timestamp +RGXY9H5,Chubu Centrair International Airport,false,1619.970725161303,124.1471507959044,Heavy Fog,0,626.1297405910661,OpenSearch Dashboards Airlines,0,SE-BD,No Delay,CAN,Guangzhou Baiyun International Airport,2.06911917993174,true,2607.0901667139924,Tokoname,Clear,JP,CN,SE-BD,NGO,Guangzhou,2019-12-23 11:19:32 +WOPNZEP,Munich Airport,true,198.57903689856937,34.9738738474057,Sunny,0,681.9911763989377,OpenSearch Dashboards Airlines,15,DE-BY,Carrier Delay,VE05,Venice Marco Polo Airport,0.5828978974567617,false,319.58198155849124,Munich,Cloudy,DE,IT,IT-34,MUC,Venice,2019-12-23 12:32:26 +G9J5O2V,Frankfurt am Main Airport,false,4857.154739888458,651.402736475921,Clear,0,868.0507463122127,OpenSearch 
Dashboards Airlines,0,DE-HE,No Delay,XIY,Xi'an Xianyang International Airport,10.856712274598683,false,7816.832837711051,Frankfurt am Main,Thunder & Lightning,DE,CN,SE-BD,FRA,Xi'an,2019-12-23 03:48:33 +HM80A5V,Itami Airport,false,5862.6666599206,555.0027890084269,Heavy Fog,0,765.0413127727119,Logstash Airways,0,SE-BD,No Delay,TV01,Treviso-Sant'Angelo Airport,9.250046483473783,true,9435.047413143258,Osaka,Clear,JP,IT,IT-34,ITM,Treviso,2019-12-23 19:50:48 +84B0Y32,Charles de Gaulle International Airport,false,4397.926660603864,372.51457282541395,Thunder & Lightning,0,913.1638984616233,OpenSearch Dashboards Airlines,0,FR-J,No Delay,STL,St Louis Lambert International Airport,6.208576213756899,false,7077.776883682865,Paris,Thunder & Lightning,FR,US,US-MO,CDG,St Louis,2019-12-23 11:30:48 +2AZWPJX,Rajiv Gandhi International Airport,true,0,180,Sunny,0,103.25307304704197,Logstash Airways,180,SE-BD,Security Delay,HYD,Rajiv Gandhi International Airport,3,false,0,Hyderabad,Hail,IN,IN,SE-BD,HYD,Hyderabad,2019-12-23 19:52:54 +SFLRI9O,Erie International Tom Ridge Field,false,6961.655654280931,622.4277087379495,Clear,0,775.1109173747694,OpenSearch Dashboards Airlines,0,US-PA,No Delay,CJU,Jeju International Airport,10.373795145632492,false,11203.698757283091,Erie,Clear,US,KR,SE-BD,ERI,Jeju City,2019-12-23 07:32:32 +QDQMOD6,Brisbane International Airport,false,8013.330880747018,716.4558873858294,Thunder & Lightning,0,832.082612870741,OpenSearch-Air,0,SE-BD,No Delay,DEN,Denver International Airport,11.94093145643049,false,12896.20597294493,Brisbane,Cloudy,AU,US,US-CO,BNE,Denver,2019-12-23 10:59:26 +XTGFN9A,Jorge Chavez International Airport,false,3946.924514217792,396.99745533808243,Thunder & Lightning,0,348.23579123315324,OpenSearch Dashboards Airlines,0,SE-BD,No Delay,YOW,Ottawa Macdonald-Cartier International Airport,6.616624255634707,false,6351.959285409319,Lima,Rain,PE,CA,CA-ON,LIM,Ottawa,2019-12-23 21:10:09 +USRQ3KK,Stockholm-Arlanda Airport,false,996.8381561540818,94.36797091633146,Clear,0,661.3465606549407,OpenSearch-Air,0,SE-AB,No Delay,TV01,Treviso-Sant'Angelo Airport,1.572799515272191,false,1604.2555055776347,Stockholm,Clear,SE,IT,IT-34,ARN,Treviso,2019-12-23 04:33:56 +PK46NHH,Milano Linate Airport,false,5261.396351845886,604.8140464617903,Rain,0,600.4401843290168,JetBeats,0,IT-25,No Delay,GEG,Spokane International Airport,10.080234107696505,false,8467.396650465065,Milan,Clear,IT,US,US-WA,MI11,Spokane,2019-12-23 20:35:25 +G80VHCJ,Bari Karol Wojty__a Airport,false,5630.111629019724,604.0524246328747,Sunny,0,738.260189539631,Logstash Airways,0,IT-75,No Delay,CJU,Jeju International Airport,10.067540410547911,false,9060.78636949312,Bari,Rain,IT,KR,SE-BD,BA02,Jeju City,2019-12-23 10:59:56 +PDS4U17,El Dorado International Airport,false,5591.079567130033,499.887241937962,Thunder & Lightning,0,437.9253204442997,OpenSearch-Air,0,SE-BD,No Delay,TO11,Turin Airport,8.331454032299368,false,8997.970354883317,Bogota,Hail,CO,IT,IT-21,BOG,Torino,2019-12-23 10:33:53 +2MXRGRK,Abu Dhabi International Airport,false,8160.7690090650885,656.6742320062424,Cloudy,0,825.9174161826418,JetBeats,0,SE-BD,No Delay,ABQ,Albuquerque International Sunport Airport,10.944570533437373,false,13133.484640124847,Abu Dhabi,Thunder & Lightning,AE,US,US-NM,AUH,Albuquerque,2019-12-23 19:27:11 +57CZEDA,London Heathrow Airport,true,4757.876231054233,720.4152685405732,Damaging Wind,0,836.1010286937247,OpenSearch-Air,270,GB-ENG,Carrier Delay,XHBU,Ukrainka Air 
Base,12.006921142342886,false,7657.059565189745,London,Sunny,GB,RU,RU-AMU,LHR,Belogorsk,2019-12-23 18:48:49 +5FYALP0,Malpensa International Airport,false,5812.230334559898,492.30936923905085,Damaging Wind,0,417.34744554513884,JetBeats,0,IT-25,No Delay,LAS,McCarran International Airport,8.20515615398418,false,9353.878015541966,Milan,Clear,IT,US,US-NV,MI12,Las Vegas,2019-12-23 10:37:54 +HVWAL6J,Comodoro Arturo Merino Benitez International Airport,false,7292.7292896018525,617.7110592550002,Cloudy,0,946.888426456834,Logstash Airways,0,SE-BD,No Delay,PA03,Falcone Borsellino Airport,10.29518432091667,false,11736.510125845005,Santiago,Cloudy,CL,IT,IT-82,SCL,Palermo,2019-12-23 03:54:12 +7ORM12S,Leonardo da Vinci___Fiumicino Airport,false,160.39074208529965,23.46580713004768,Sunny,0,118.37483602607261,OpenSearch Dashboards Airlines,0,IT-62,No Delay,PI05,Pisa International Airport,0.39109678550079463,false,258.1238784305245,Rome,Sunny,IT,IT,IT-52,RM11,Pisa,2019-12-23 03:54:12 +2P36OEP,New Chitose Airport,true,5340.290617241973,941.1970552595557,Cloudy,0,705.7149863531135,OpenSearch Dashboards Airlines,225,SE-BD,Late Aircraft Delay,VIE,Vienna International Airport,15.686617587659262,false,8594.364663114668,Chitose / Tomakomai,Rain,JP,AT,AT-9,CTS,Vienna,2019-12-23 09:41:52 +HLNZHCX,Verona Villafranca Airport,false,0,0,Sunny,0,172.3790782673846,OpenSearch-Air,0,IT-34,No Delay,VR10,Verona Villafranca Airport,0,false,0,Verona,Sunny,IT,IT,IT-34,VR10,Verona,2019-12-23 19:34:51 +HLNNULL,Verona Villafranca Airport,,,0,,0,172.3790782673846,OpenSearch-Air,0,IT-34,No Delay,VR10,Verona Villafranca Airport,0,false,0,Verona,Sunny,IT,IT,IT-34,VR10,Verona, diff --git a/integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz b/integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..2b59979a504172a4eaa0a6c8f51816ab45a95b62 GIT binary patch literal 2768 zcmV;>3NQ5^iwFn`s_JF{18;C;ZgXW}a${&;WMOk?VsBw`WOHA0VQp}1WnW}rbYWj+ zY-wj`baO6ab9MmDSjl!8$r9c36|7!wlIolXc1!|EOa?T0%?#f(-IxmZaoj^M__U_phXxELT2#ihIdCUKY)KVYCeyr`xLK}dA5jGalS0hZX~PMzF%xl+of4N zm95iN{?Tg+fus>~u6cBOoqcm#YKe&z}jpy~_y4;9yj8F3NWr`$Hzrsm= zUM@}{-~D_U6Q-GqoS}v*{3xg(jLV2g!xS}6IVvUT#Qx`N`68PUN;KCI2L;A9jc=3Uu3|irmIXCNB5z=Bzx-=Gi_k8Y#=IGk8d) zjb^53r9dZ@MaE1iD;@e+TnSal)>8ngMJ^|JR0N*6X%sQu)P$oTRjx~IccljWmR#gM zefhr5S7pCIGD&&@A8D0|n34upwpJ=jv|^E@+z2g=a-0<{p~Pw%QGiNn!8m99)AG8QKV}u(~%Y2qC1RH#eTmanLMx4 zDVp)0ayJZknxwN(xjmpRMzw7g$AzP!Ds5GWQm1EWP2G}93}kXrEM+y1H2x`omTIDu ziYQ8D9i>)s?cPk9(Rfxe6w4TnY#r%lGtBvN1JPgp^_|(8+)_p+dHge3W|gr|SAW#f zS$qT@7jZ?Y)Y1y{%%CtEm^&##FcZQ_!O_OS(!cn_IPGzBIqB^JyaA4@DvA=6tgO=q z!9hVj2Sb;qak=f+;KdJFND5~n$59%psjylSVR)pNREArwkd{;LHii<%M8vefcuLJQ z7vghRU5qMMhnkHiT{2#kyV*Kfmb-rO7y}INC#xe)0eH=sFba&|cCezxM8;@q!4uqS zMTS04mhWbKF(Nl(JSx2bX*ApW&?nR+Hr3>{uXn?%douEWB|lNfbo!i}$!*g|N|>d4 zLSj^)-GUl{)&fe-{0^bb>o*`8$4~1Vno^Xd^U(Y!VRw^7N#_+;^^bl^IzE|`%YJxl zj}B)Yo<4yMuPRv@Uvx*8Za`Y#!IManY*8L1(^*`;NaFdlh^L)qkyc78$t{ICK$&EI1Py@If=kDD>dH8J zpq(`V$O0*BZdhan99)lz617H`+Xz9{-!O{_lnj~r()`jJG&QLMS&u&bc%UrBSO`I_ z1>qXXEHPj{31JAr1pz@3-k&G1V?-_2j9b839_xDrF@EQqW~&cBlKza(84U(ml%Yi^ zjsz5UQ-cUDOq6m4=}0H`SKA3z?WUNO#(>oOts=}D0MV*)L9p&8?Tfyvk*h2RQouex z@ZsdXP8W$-Wj)TsSpu)uzhOiDcn1yU?;BbgPC(WzZ+x z%U%)kxPTlkh*qg9-Hbze!)vMgoesg8;nNJ=M?2zEIp^VeF4R9bAFD~o!7>^26}Akn zaoYtHg3?Gzj3LA|7*~r~U)8Yz1ZW^5t_~$=w;&2XvX}EC4!WV1av>Q7n`vBmH}z7R zkP-fbs-x)J{EPeJ^4n>;tC8L%W4kDC;6STFRk zeLDpN&`3eA&|cgOqbxUNpbAKZ)dP#jgGC1SM6mD7Iu#1X-m^3)hf)8AdZ{)*B8mYU 
z3j4joDV#)PtQO#VOaDflaV-Ne&|VlzAru+;#%dKDcrlg$YDC1ji9@ zBRgccV6Y4oO2VXImm>9SmaY6&t3j{Z$$(w2587C#aAtN=j4&9Y#C!Ln_UM|_uwXWy z{o^@bP=sZn2^#(u?NCr7h5Z0TW+cZwX<#$CBYQ{>R}n;7ViAP^N%m$+PVQc-e<8K` zhthoFQF5FPku2)+9_Q`&E->E}aUviAX%VwCdgm2^+%ZSFt z20;`sRp7J2-D+e9m4?@1o29e3Ofl8%IvL&IQK|KANVhX0b@25p%a<>23Sz^8Bwa~e zJk+{FN_u{de3$;?VY0}I3rrDI8B4XrF!2aJ3RBI6!#W8>omNINkRWy$yannmu$F$n zw%yv8^qxY2{JBpdZ15->y~C0lP+1U+Py$VZ(?T2u(tV$7`?rB?IqCXk`8J;U<>^?N z;D0-}ceSp*B8_a3O|u+&m#^0N){Da=mh0HmSDzRfm?$2F|KO^?unMS%C0pNHg*hKxJ4(YRm*wAq_r1FR~5 z6^o@r9*P8IPy8&~<BcBR4mO5GxlPii z~UaeYz6kPHZicM!G^49_^65hlh3%2 zI*9#Hs!LpQn4Cp-$Oa3EHa7GH+lt;15=s=oRsvVB%Fs|nM%f}Ra@*7>#8RA~2y7GL z-6HX!(1X^H^y6PJ!^tXJ#&0P{2^rc&0g)g~AVeowgdgCaIA$F#>Hrq}5jc_#YN`Zd zLVZ>c4)AT!(=B|{Q|`a7b{}wEw#aP~r|G%!FcoPF^Jo-E5#ZIkeafnh*_0aI0Y(!| z+gu{#~uNPIHfm+YzH5#i08r+ZUbo9A_OJaUwQCRuv03>ECp_ WA4MTJ|NkNV(*FRZ>jq-x7XSe9Fgn}- literal 0 HcmV?d00001 diff --git a/integ-test/src/integration/resources/users.parquet b/integ-test/src/integration/resources/users.parquet new file mode 100644 index 0000000000000000000000000000000000000000..aa527338c43a8400fd56e549cb28aa1e6a9ccccf GIT binary patch literal 615 zcmZuv%WA?v6dhv>skOF(GbAMx8A#|N4V6|9aiOIPms03PD`l!<8_27ZC>8M^`h9*v zzoIu$LutDhxO2}r_i<*1{f8z-m}1MuG6X7C5vuhRgizmG#W5>FbjJgL&hf>LqokaZ zYYC8|l;VQV0B_^Ajmr=y801Dh!>c8wcbamJ;GDv#!@-jNG^p_p=0_fP*iwYfW6TA} zaK%KL95A1oK&zONR-LnDDBOfUPeU&hkZvLEEKddt|AmVfOQ~2gWv#@7U@Jsq-O#(1 zYT$})sz~1z#S)RpJsDWAfHh39mWmYpcax0PB|U2hw9kS8^O``r{L^;V4CrMtA|s$8 zM79MYBi+!Bv%TW!8~2&EEv#v>ia701!Ka~^QJbb)!ad!5e~TkFO;bOe0ch@WZx++e zczw`hQu|ObPJ|o0(v6+txjmU@P-546O!ri1zVJLc`A@QUG#BNAXU0Mr-ol4zs2e17 jvzcs=rbSG=FL-k0i^dXO!wrK*)46qS&=)u|gfI3D{z{mb literal 0 HcmV?d00001 diff --git a/integ-test/src/integration/resources/users.parquet.gz b/integ-test/src/integration/resources/users.parquet.gz new file mode 100644 index 0000000000000000000000000000000000000000..d41b8884bc09d8b1bda7b3eeee518341c12bce39 GIT binary patch literal 417 zcmV;S0bc$eiwFp3paf$819fv{a&sC~b7_#&I?98{M+qpX>Kp1g7 zqT|6J20*P{0)S31OA=nh)3u0@Iv|HDO-X5XPc&qTn&PDxB78|MJyOniwA=NJ-`gs+ zt4gztRmG}f5zTtIt%-LYjswx;QyDk!6Cu$?Dh`->#6#hLYBY0i#HX?JQ}Hy2gIFSb z0p@-VMw9Sa$b@LhtU0>5#iiskpt@ySE!VI~#c~X=R+kcu>hy-*x){LiD(=F7lJ$E? 
zCTr2Q+eO(&!Nz?ZlpXyUyBdb_GVReGN&)DyKy|Qeb0^D{3|>XZ&1GTE(%D$BD<;Jt zmP6(-wGj3y`MOKSe4wE3FN4&`7r`)LSN-|MlYP9+jY%r~=!Gr3oc|doSN1nj95)F9 z_V}M=9W6KIi)UF8Mp1~{Fh5G!V)3{D(;Rd8HWBG@C>vAs`f=1aZJsyJ;1ge@i@(?x LdidmgX955KkZa9i literal 0 HcmV?d00001 diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala new file mode 100644 index 000000000..b990d79de --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala @@ -0,0 +1,322 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, EqualTo, GreaterThan, IsNotNull, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, Filter, LogicalPlan, Project} +import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} + +class FlintSparkPPLFileSourceRelationITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "flint_ppl_test" + + override def beforeAll(): Unit = { + super.beforeAll() + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test csv file source relation") { + val frame = sql(s""" + | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv ) + | | where Cancelled = false AND DistanceMiles > 0 + | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp + | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row(768.1694081139743, "Clear"), + Row(826.1736096641965, "Cloudy"), + Row(600.4401843290168, "Rain"), + Row(512.8754006548804, "Sunny"), + Row(632.8519057524543, "Thunder & Lightning"), + Row(626.7242371194318, "Damaging Wind")) + implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val userSpecifiedSchema = new StructType() + .add("FlightNum", StringType) + .add("Origin", StringType) + .add("FlightDelay", BooleanType) + .add("DistanceMiles", DoubleType) + .add("FlightTimeMin", DoubleType) + .add("OriginWeather", StringType) + .add("dayOfWeek", IntegerType) + .add("AvgTicketPrice", DoubleType) + .add("Carrier", StringType) + .add("FlightDelayMin", IntegerType) + .add("OriginRegion", StringType) + .add("FlightDelayType", StringType) + .add("DestAirportID", StringType) + .add("Dest", StringType) + .add("FlightTimeHour", DoubleType) + .add("Cancelled", BooleanType) + 
.add("DistanceKilometers", DoubleType) + .add("OriginCityName", StringType) + .add("DestWeather", StringType) + .add("OriginCountry", StringType) + .add("DestCountry", StringType) + .add("DestRegion", StringType) + .add("OriginAirportID", StringType) + .add("DestCityName", StringType) + .add("timestamp", TimestampType) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "csv", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) + val filterExpr = And( + EqualTo(UnresolvedAttribute("Cancelled"), Literal(false)), + GreaterThan(UnresolvedAttribute("DistanceMiles"), Literal(0))) + val filter = Filter(filterExpr, relation) + val project = Project( + Seq( + UnresolvedAttribute("FlightNum"), + UnresolvedAttribute("DistanceMiles"), + UnresolvedAttribute("FlightTimeMin"), + UnresolvedAttribute("OriginWeather"), + UnresolvedAttribute("AvgTicketPrice"), + UnresolvedAttribute("Carrier"), + UnresolvedAttribute("FlightDelayType"), + UnresolvedAttribute("timestamp")), + filter) + val weatherAlias = Alias(UnresolvedAttribute("OriginWeather"), "OriginWeather")() + val groupByAttributes = Seq(weatherAlias) + val aggregateExpressions = + Alias( + UnresolvedFunction( + Seq("AVG"), + Seq(UnresolvedAttribute("AvgTicketPrice")), + isDistinct = false), + "avg_price_by_weather")() + val aggregatePlan = + Aggregate(groupByAttributes, Seq(aggregateExpressions, weatherAlias), project) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregatePlan) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("test csv file source relation with compression codec") { + val frame = sql(s""" + | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz ) + | | where Cancelled = false AND DistanceMiles > 0 + | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp + | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row(768.1694081139743, "Clear"), + Row(826.1736096641965, "Cloudy"), + Row(600.4401843290168, "Rain"), + Row(512.8754006548804, "Sunny"), + Row(632.8519057524543, "Thunder & Lightning"), + Row(626.7242371194318, "Damaging Wind")) + implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val userSpecifiedSchema = new StructType() + .add("FlightNum", StringType) + .add("Origin", StringType) + .add("FlightDelay", BooleanType) + .add("DistanceMiles", DoubleType) + .add("FlightTimeMin", DoubleType) + .add("OriginWeather", StringType) + .add("dayOfWeek", IntegerType) + .add("AvgTicketPrice", DoubleType) + .add("Carrier", StringType) + .add("FlightDelayMin", IntegerType) + .add("OriginRegion", StringType) + .add("FlightDelayType", StringType) + .add("DestAirportID", StringType) + .add("Dest", StringType) + .add("FlightTimeHour", DoubleType) + .add("Cancelled", BooleanType) + .add("DistanceKilometers", DoubleType) + .add("OriginCityName", StringType) + .add("DestWeather", StringType) + .add("OriginCountry", StringType) + .add("DestCountry", StringType) + .add("DestRegion", StringType) + .add("OriginAirportID", StringType) + 
.add("DestCityName", StringType) + .add("timestamp", TimestampType) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "csv", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) + val filterExpr = And( + EqualTo(UnresolvedAttribute("Cancelled"), Literal(false)), + GreaterThan(UnresolvedAttribute("DistanceMiles"), Literal(0))) + val filter = Filter(filterExpr, relation) + val project = Project( + Seq( + UnresolvedAttribute("FlightNum"), + UnresolvedAttribute("DistanceMiles"), + UnresolvedAttribute("FlightTimeMin"), + UnresolvedAttribute("OriginWeather"), + UnresolvedAttribute("AvgTicketPrice"), + UnresolvedAttribute("Carrier"), + UnresolvedAttribute("FlightDelayType"), + UnresolvedAttribute("timestamp")), + filter) + val weatherAlias = Alias(UnresolvedAttribute("OriginWeather"), "OriginWeather")() + val groupByAttributes = Seq(weatherAlias) + val aggregateExpressions = + Alias( + UnresolvedFunction( + Seq("AVG"), + Seq(UnresolvedAttribute("AvgTicketPrice")), + isDistinct = false), + "avg_price_by_weather")() + val aggregatePlan = + Aggregate(groupByAttributes, Seq(aggregateExpressions, weatherAlias), project) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregatePlan) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("test csv file source relation with filter") { + val frame = sql(s""" + | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv ) + | Cancelled = false AND DistanceMiles > 0 + | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp + | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row(768.1694081139743, "Clear"), + Row(826.1736096641965, "Cloudy"), + Row(600.4401843290168, "Rain"), + Row(512.8754006548804, "Sunny"), + Row(632.8519057524543, "Thunder & Lightning"), + Row(626.7242371194318, "Damaging Wind")) + implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val userSpecifiedSchema = new StructType() + .add("FlightNum", StringType) + .add("Origin", StringType) + .add("FlightDelay", BooleanType) + .add("DistanceMiles", DoubleType) + .add("FlightTimeMin", DoubleType) + .add("OriginWeather", StringType) + .add("dayOfWeek", IntegerType) + .add("AvgTicketPrice", DoubleType) + .add("Carrier", StringType) + .add("FlightDelayMin", IntegerType) + .add("OriginRegion", StringType) + .add("FlightDelayType", StringType) + .add("DestAirportID", StringType) + .add("Dest", StringType) + .add("FlightTimeHour", DoubleType) + .add("Cancelled", BooleanType) + .add("DistanceKilometers", DoubleType) + .add("OriginCityName", StringType) + .add("DestWeather", StringType) + .add("OriginCountry", StringType) + .add("DestCountry", StringType) + .add("DestRegion", StringType) + .add("OriginAirportID", StringType) + .add("DestCityName", StringType) + .add("timestamp", TimestampType) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "csv", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) 
+ val filterExpr = And( + EqualTo(UnresolvedAttribute("Cancelled"), Literal(false)), + GreaterThan(UnresolvedAttribute("DistanceMiles"), Literal(0))) + val filter = Filter(filterExpr, relation) + val project = Project( + Seq( + UnresolvedAttribute("FlightNum"), + UnresolvedAttribute("DistanceMiles"), + UnresolvedAttribute("FlightTimeMin"), + UnresolvedAttribute("OriginWeather"), + UnresolvedAttribute("AvgTicketPrice"), + UnresolvedAttribute("Carrier"), + UnresolvedAttribute("FlightDelayType"), + UnresolvedAttribute("timestamp")), + filter) + val weatherAlias = Alias(UnresolvedAttribute("OriginWeather"), "OriginWeather")() + val groupByAttributes = Seq(weatherAlias) + val aggregateExpressions = + Alias( + UnresolvedFunction( + Seq("AVG"), + Seq(UnresolvedAttribute("AvgTicketPrice")), + isDistinct = false), + "avg_price_by_weather")() + val aggregatePlan = + Aggregate(groupByAttributes, Seq(aggregateExpressions, weatherAlias), project) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregatePlan) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("test parquet file source relation") { + val frame = sql(s""" + | file = $testTable ( integ-test/src/integration/resources/users.parquet ) + | | fields name, favorite_color + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array(Row("Alyssa", null), Row("Ben", "red")) + implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val userSpecifiedSchema = new StructType() + .add("name", StringType) + .add("favorite_color", StringType) + .add("favorite_numbers", new ArrayType(IntegerType, true)) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "parquet", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) + val expectedPlan = + Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("favorite_color")), relation) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } +} diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 979cb712d..0e7cc32ce 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -236,6 +236,12 @@ See the next samples of PPL queries : - `source = catalog.table | where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield` - `source = catalog.schema.table | where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield` +**Load File Source** +- `file = table1 ( s3://my_bucket/path/csv ) | fields a,b,c` +- `file = table2 ( s3://my_bucket/path/csv.gz ) | fields a,b,c` +- `file = table3 ( s3a://my_bucket/path/parquet ) | fields a,b,c` +- `file = table4 ( s3a://my_bucket/path/csv ) a > 0 | fields a,b,c` + **Filters** - `source = table | where a = 1 | fields a,b,c` - `source = table | where a >= 1 | fields a,b,c` diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index d202f5ff6..5c45d89b0 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 
@@ -49,6 +49,7 @@ AS: 'AS';
 BY: 'BY';
 SOURCE: 'SOURCE';
 INDEX: 'INDEX';
+FILE: 'FILE';
 D: 'D';
 DESC: 'DESC';
 DATASOURCES: 'DATASOURCES';
@@ -396,6 +397,7 @@ ID: ID_LITERAL;
 CLUSTER: CLUSTER_PREFIX_LITERAL;
 INTEGER_LITERAL: DEC_DIGIT+;
 DECIMAL_LITERAL: (DEC_DIGIT+)? '.' DEC_DIGIT+;
+FILE_URL: [a-zA-Z] [a-zA-Z0-9._:/\\-]*;
 
 fragment DATE_SUFFIX: ([\-.][*0-9]+)+;
 fragment ID_LITERAL: [@*A-Z]+?[*A-Z_\-0-9]*;
diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
index 3ce7cef7c..7fb72be88 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -185,12 +185,18 @@ mlArg
 fromClause
    : SOURCE EQUAL tableSourceClause
    | INDEX EQUAL tableSourceClause
+   | FILE EQUAL fileSourceClause
    ;
 
 tableSourceClause
    : tableSource (COMMA tableSource)*
    ;
 
+// support only one path URL first
+fileSourceClause
+   : tableName = ident LT_PRTHS url = FILE_URL RT_PRTHS
+   ;
+
 renameClasue
    : orignalField = wcFieldExpression AS renamedField = wcFieldExpression
    ;
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
index e3d0c6a2b..a84161667 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
@@ -44,6 +44,7 @@
 import org.opensearch.sql.ast.tree.Kmeans;
 import org.opensearch.sql.ast.tree.Limit;
 import org.opensearch.sql.ast.tree.Parse;
+import org.opensearch.sql.ast.tree.FileSourceRelation;
 import org.opensearch.sql.ast.tree.Project;
 import org.opensearch.sql.ast.tree.RareTopN;
 import org.opensearch.sql.ast.tree.Relation;
@@ -215,10 +216,15 @@ public T visitHead(Head node, C context) {
   public T visitRareTopN(RareTopN node, C context) {
     return visitChildren(node, context);
   }
+
   public T visitValues(Values node, C context) {
     return visitChildren(node, context);
   }
 
+  public T visitFileSourceRelation(FileSourceRelation node, C context) {
+    return visitChildren(node, context);
+  }
+
   public T visitAlias(Alias node, C context) {
     return visitChildren(node, context);
   }
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java
new file mode 100644
index 000000000..c8ca1441e
--- /dev/null
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.ast.tree;
+
+import com.google.common.collect.ImmutableList;
+import org.opensearch.sql.ast.AbstractNodeVisitor;
+import org.opensearch.sql.ast.Node;
+
+import java.util.List;
+
+/**
+ * AST node class for a sequence of files (such as CSV files) to read.
+ */
+public class FileSourceRelation extends UnresolvedPlan {
+    private final String tableName;
+    private final String path;
+    private final String format;
+    private final String compressionCodeName;
+
+    public FileSourceRelation(String tableName, String path, String format) {
+        this(tableName, path, format, null);
+    }
+
+    public FileSourceRelation(String tableName, String path, String format, String compressionCodeName) {
+        this.tableName = tableName;
+        this.path = path;
+        this.format = format;
+        this.compressionCodeName = compressionCodeName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public String getCompressionCodeName() {
+        return compressionCodeName;
+    }
+
+    @Override
+    public UnresolvedPlan attach(UnresolvedPlan child) {
+        throw new UnsupportedOperationException("FileSourceRelation node is supposed to have no child node");
+    }
+
+    @Override
+    public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
+        return nodeVisitor.visitFileSourceRelation(this, context);
+    }
+
+    @Override
+    public List<UnresolvedPlan> getChild() {
+        return ImmutableList.of();
+    }
+}
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java
index 42f666236..7113547d2 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java
@@ -11,6 +11,7 @@
 import org.apache.spark.sql.catalyst.expressions.NamedExpression;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.catalyst.plans.logical.Union;
+import org.apache.spark.sql.execution.datasources.LogicalRelation;
 import org.apache.spark.sql.types.Metadata;
 import org.opensearch.sql.ast.expression.UnresolvedExpression;
 import org.opensearch.sql.data.type.ExprType;
@@ -116,7 +117,7 @@ public LogicalPlan define(Expression symbol) {
     }
 
     /**
-     * append relation to relations list
+     * append unresolved relation to relations list
      *
      * @param relation
      * @return
@@ -126,6 +127,17 @@ public LogicalPlan withRelation(UnresolvedRelation relation) {
         return with(relation);
     }
 
+    /**
+     * append logical relation to relations list
+     *
+     * @param relation
+     * @return
+     */
+    public LogicalPlan withRelation(LogicalRelation relation) {
+        this.relations.add(relation);
+        return with(relation);
+    }
+
     /**
      * append projected fields
      *
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
index 46453c8a6..3166d88ec 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.sql.ppl;
 
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
@@ -21,6 +22,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.Limit;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.execution.command.DescribeTableCommand;
+import org.apache.spark.sql.execution.datasources.LogicalRelation;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -59,6 +61,7 @@
 import org.opensearch.sql.ast.tree.Head;
 import org.opensearch.sql.ast.tree.Kmeans;
 import org.opensearch.sql.ast.tree.Parse;
+import org.opensearch.sql.ast.tree.FileSourceRelation;
 import org.opensearch.sql.ast.tree.Project;
 import org.opensearch.sql.ast.tree.RareAggregation;
 import org.opensearch.sql.ast.tree.RareTopN;
@@ -75,7 +78,9 @@
 import scala.collection.Seq;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.BiFunction;
@@ -98,10 +103,15 @@
  * Utility class to traverse PPL logical plan and translate it into catalyst logical plan
  */
 public class CatalystQueryPlanVisitor extends AbstractNodeVisitor<LogicalPlan, CatalystPlanContext> {
-
+    private SparkSession sparkSession;
     private final ExpressionAnalyzer expressionAnalyzer;
 
     public CatalystQueryPlanVisitor() {
+        this(null);
+    }
+
+    public CatalystQueryPlanVisitor(SparkSession spark) {
+        this.sparkSession = spark;
         this.expressionAnalyzer = new ExpressionAnalyzer();
     }
 
@@ -151,6 +161,20 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
         return context.getPlan();
     }
 
+    @Override
+    public LogicalPlan visitFileSourceRelation(FileSourceRelation node, CatalystPlanContext context) {
+        // We have to use the DataFrame API instead of building a Catalyst plan directly, since
+        // file-based information (path, schema, format) cannot be expressed by an UnresolvedRelation.
+        Map<String, String> options = new HashMap<>();
+        options.put("header", "true");
+        options.put("inferSchema", "true");
+        if (node.getCompressionCodeName() != null) {
+            options.put("compression", node.getCompressionCodeName());
+        }
+        var df = sparkSession.read().format(node.getFormat()).options(options).load(node.getPath());
+        return context.withRelation((LogicalRelation) df.queryExecution().logical());
+    }
+
     @Override
     public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) {
         node.getChild().get(0).accept(this, context);
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
index fdb11c342..51b38f013 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
@@ -36,9 +36,9 @@
 import org.opensearch.sql.ast.tree.Head;
 import org.opensearch.sql.ast.tree.Kmeans;
 import org.opensearch.sql.ast.tree.Parse;
+import org.opensearch.sql.ast.tree.FileSourceRelation;
 import org.opensearch.sql.ast.tree.Project;
 import org.opensearch.sql.ast.tree.RareAggregation;
-import org.opensearch.sql.ast.tree.RareTopN;
 import org.opensearch.sql.ast.tree.Relation;
 import org.opensearch.sql.ast.tree.Rename;
 import org.opensearch.sql.ast.tree.Sort;
@@ -48,8 +48,10 @@
 import org.opensearch.sql.ppl.utils.ArgumentFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -356,7 +358,46 @@ public UnresolvedPlan visitRareCommand(OpenSearchPPLParser.RareCommandContext ct
     /** From clause. */
     @Override
     public UnresolvedPlan visitFromClause(OpenSearchPPLParser.FromClauseContext ctx) {
-        return visitTableSourceClause(ctx.tableSourceClause());
+        if (ctx.FILE() != null) {
+            return visitFileSourceClause(ctx.fileSourceClause());
+        } else {
+            return visitTableSourceClause(ctx.tableSourceClause());
+        }
+    }
+
+    @Override
+    public UnresolvedPlan visitFileSourceClause(OpenSearchPPLParser.FileSourceClauseContext ctx) {
+        String tableName = ctx.tableName.getText().toLowerCase(Locale.ROOT);
+        String path = ctx.url.getText().toLowerCase(Locale.ROOT);
+        // S3 path supported only
+        if (!isTesting() && (!path.startsWith("s3://") && !path.startsWith("s3a://"))) {
+            throw new UnsupportedOperationException("Path should start with 's3://' or 's3a://'");
+        }
+        String suffix = path.substring(path.lastIndexOf('.') + 1);
+        // if the suffix is a compression codec, the format is the second-to-last suffix
+        if (Arrays.asList("gz", "lzo", "snappy", "zstd").contains(suffix)) {
+            int lastDotIndex = path.lastIndexOf('.');
+            int secondLastDotIndex = path.lastIndexOf('.', lastDotIndex - 1);
+            String format = path.substring(secondLastDotIndex + 1, lastDotIndex);
+            if (Arrays.asList("csv", "parquet", "avro", "orc").contains(format)) {
+                return new FileSourceRelation(tableName, path, format, suffix.equals("gz") ? "gzip" : suffix);
+            } else {
+                throw new UnsupportedOperationException("Unsupported file format: " + format +
+                    ", the supported formats are 'csv', 'parquet', 'avro', 'orc'" +
+                    ", the supported compression codecs are 'gz', 'lzo', 'snappy', 'zstd'");
+            }
+        }
+        if (Arrays.asList("csv", "parquet", "avro", "orc").contains(suffix)) {
+            return new FileSourceRelation(tableName, path, suffix);
+        } else {
+            throw new UnsupportedOperationException("Unsupported file suffix: " + suffix +
+                ", the supported formats are 'csv', 'parquet', 'avro', 'orc'" +
+                ", the supported compression codecs are 'gz', 'lzo', 'snappy', 'zstd'");
+        }
+    }
+
+    private boolean isTesting() {
+        return System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null;
     }
 
     @Override
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
index 33cb5611d..a176dfe5c 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
@@ -15,7 +15,7 @@
      *
      * @param relations
      * @param node
-     * @param contextRelations
+     * @param tables
      * @return
      */
     static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations, QualifiedName node, List<LogicalPlan> tables) {
diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
index 26ad4b69b..c2485ab3a 100644
--- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
+++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
@@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {
 
   override def apply(extensions: SparkSessionExtensions): Unit = {
     extensions.injectParser { (spark, parser) =>
-      new FlintSparkPPLParser(parser)
+      new FlintSparkPPLParser(parser, spark)
     }
   }
 }
diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala
b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala index 51618d487..e2b482b21 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala @@ -31,6 +31,7 @@ import org.opensearch.flint.spark.ppl.PlaneUtils.plan import org.opensearch.sql.common.antlr.SyntaxCheckException import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -44,10 +45,11 @@ import org.apache.spark.sql.types.{DataType, StructType} * @param sparkParser * Spark SQL parser */ -class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface { +class FlintSparkPPLParser(sparkParser: ParserInterface, spark: SparkSession) + extends ParserInterface { /** OpenSearch (PPL) AST builder. */ - private val planTransformer = new CatalystQueryPlanVisitor() + private val planTransformer = new CatalystQueryPlanVisitor(spark) private val pplParser = new PPLSyntaxParser() diff --git a/ppl-spark-integration/src/test/resources/people.csv b/ppl-spark-integration/src/test/resources/people.csv new file mode 100644 index 000000000..4d9b27bf9 --- /dev/null +++ b/ppl-spark-integration/src/test/resources/people.csv @@ -0,0 +1,3 @@ +name,age,job +Jorge,30,Developer +Bob,32,Developer diff --git a/ppl-spark-integration/src/test/resources/people.csv.gz b/ppl-spark-integration/src/test/resources/people.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..f3cfa84667502d0f8ba4437cc19b85ad54b69722 GIT binary patch literal 69 zcmV-L0J{GliwFoyx9Vm918`+;aBO8RV{>)@%S+5n)k#cG)yc|F;_}Kb0 literal 0 HcmV?d00001 diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/LogicalPlanTestUtils.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/LogicalPlanTestUtils.scala index 0c116a728..31face92e 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/LogicalPlanTestUtils.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/LogicalPlanTestUtils.scala @@ -5,8 +5,9 @@ package org.opensearch.flint.spark.ppl -import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} +import org.apache.spark.sql.execution.datasources.LogicalRelation /** * general utility functions for ppl to spark transformation test @@ -43,6 +44,15 @@ trait LogicalPlanTestUtils { } agg.copy(groupingExpressions = newGrouping, aggregateExpressions = newAggregations) + case l @ LogicalRelation(_, output, _, _) => + // Because the exprIds of Output attributes in LogicalRelation cannot be normalized + // by PlanTest.normalizePlan(). We normalize it manually. 
+ val newOutput = output.map { a: AttributeReference => + AttributeReference(a.name, a.dataType, a.nullable, a.metadata)( + exprId = ExprId(0), + qualifier = a.qualifier) + } + l.copy(output = newOutput) case other => other } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala new file mode 100644 index 000000000..39c4d14e9 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType} + +class PPLLogicalPlanFileSourceRelationTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val spark = SparkSession.builder().master("local").getOrCreate() + private val planTransformer = new CatalystQueryPlanVisitor(spark) + private val pplParser = new PPLSyntaxParser() + + test("test csv file source relation") { + val context = new CatalystPlanContext + val scanPlan = planTransformer.visit( + plan(pplParser, "file=test1(ppl-spark-integration/src/test/resources/people.csv)", false), + context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) + val userSpecifiedSchema = new StructType() + .add("name", StringType) + .add("age", IntegerType) + .add("job", StringType) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "csv", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) + val expectedPlan = Project(projectList, relation) + assert(compareByString(expectedPlan) === compareByString(scanPlan)) + } + + test("test gz compressed csv file source relation") { + val context = new CatalystPlanContext + val scanPlan = planTransformer.visit( + plan( + pplParser, + "file=test1(ppl-spark-integration/src/test/resources/people.csv.gz)", + false), + context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) + val userSpecifiedSchema = new StructType() + .add("name", StringType) + .add("age", IntegerType) + .add("job", StringType) + val dataSource = + DataSource( + spark, + userSpecifiedSchema = Some(userSpecifiedSchema), + className = "csv", + options = Map.empty) + val relation = LogicalRelation(dataSource.resolveRelation(true), isStreaming = false) + val expectedPlan = Project(projectList, relation) + assert(compareByString(expectedPlan) === compareByString(scanPlan)) + } + + test("test unsupported compression") { + val context = new CatalystPlanContext + val thrown = 
intercept[UnsupportedOperationException] { + planTransformer.visit( + plan( + pplParser, + "file=test1(ppl-spark-integration/src/test/resources/head.csv.brotli)", + false), + context) + } + + assert(thrown.getMessage.startsWith("Unsupported file suffix: brotli")) + } + + test("test unsupported file format") { + val context = new CatalystPlanContext + val thrown = intercept[UnsupportedOperationException] { + planTransformer.visit( + plan(pplParser, "file=test1(ppl-spark-integration/src/test/resources/head.txt)", false), + context) + } + + assert(thrown.getMessage.startsWith("Unsupported file suffix: txt")) + } +} From 85806fb9bec0ea85bf467d6010c236eace6faf63 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Wed, 18 Sep 2024 23:25:41 +0800 Subject: [PATCH 2/3] fix bug Signed-off-by: Lantao Jin --- .../ppl/FlintSparkPPLFileSourceRelationITSuite.scala | 8 ++++---- ppl-spark-integration/README.md | 9 +++++---- .../src/main/antlr4/OpenSearchPPLLexer.g4 | 1 - .../src/main/antlr4/OpenSearchPPLParser.g4 | 2 +- .../java/org/opensearch/sql/ppl/parser/AstBuilder.java | 2 +- ...ogicalPlanFileSourceRelationTranslatorTestSuite.scala | 8 ++++---- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala index b990d79de..7a8530d09 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala @@ -37,7 +37,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation") { val frame = sql(s""" - | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv ) + | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" ) | | where Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -122,7 +122,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation with compression codec") { val frame = sql(s""" - | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz ) + | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz" ) | | where Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -207,7 +207,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation with filter") { val frame = sql(s""" - | file = $testTable ( integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv ) + | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" ) | Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -292,7 +292,7 @@ class FlintSparkPPLFileSourceRelationITSuite 
test("test parquet file source relation") { val frame = sql(s""" - | file = $testTable ( integ-test/src/integration/resources/users.parquet ) + | file = $testTable ( "integ-test/src/integration/resources/users.parquet" ) | | fields name, favorite_color | """.stripMargin) diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 3b01203bf..dfe1784c9 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -237,10 +237,11 @@ See the next samples of PPL queries : - `source = catalog.schema.table | where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield` **Load File Source** -- `file = table1 ( s3://my_bucket/path/csv ) | fields a,b,c` -- `file = table2 ( s3://my_bucket/path/csv.gz ) | fields a,b,c` -- `file = table3 ( s3a://my_bucket/path/parquet ) | fields a,b,c` -- `file = table4 ( s3a://my_bucket/path/csv ) a > 0 | fields a,b,c` + +- `file = table1 ( "s3://my-bucket/path/file.csv" ) | fields a,b,c` +- `file = table2 ( "s3://my-bucket/path/file.csv.gz" ) | fields a,b,c` +- `file = table3 ( "s3a://my-bucket/path/file.parquet" ) | fields a,b,c` +- `file = table4 ( "s3a://my-bucket/path/file.csv" ) a > 0 | fields a,b,c` **Filters** - `source = table | where a = 1 | fields a,b,c` diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index 420745b4a..14b8310b4 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -401,7 +401,6 @@ ID: ID_LITERAL; CLUSTER: CLUSTER_PREFIX_LITERAL; INTEGER_LITERAL: DEC_DIGIT+; DECIMAL_LITERAL: (DEC_DIGIT+)? '.' DEC_DIGIT+; -FILE_URL: [a-zA-Z] [a-zA-Z0-9._:/\\-]*; fragment DATE_SUFFIX: ([\-.][*0-9]+)+; fragment ID_LITERAL: [@*A-Z]+?[*A-Z_\-0-9]*; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 05c89ae9a..b9711cc39 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -194,7 +194,7 @@ tableSourceClause // support only one path URL first fileSourceClause - : tableName = ident LT_PRTHS url = FILE_URL RT_PRTHS + : tableName = ident LT_PRTHS url = stringLiteral RT_PRTHS ; renameClasue diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 51b38f013..29d40596c 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -368,7 +368,7 @@ public UnresolvedPlan visitFromClause(OpenSearchPPLParser.FromClauseContext ctx) @Override public UnresolvedPlan visitFileSourceClause(OpenSearchPPLParser.FileSourceClauseContext ctx) { String tableName = ctx.tableName.getText().toLowerCase(Locale.ROOT); - String path = ctx.url.getText().toLowerCase(Locale.ROOT); + String path = ((Literal) internalVisitExpression(ctx.url)).toString().toLowerCase(Locale.ROOT); // S3 path supported only if (!isTesting() && (!path.startsWith("s3://") || !path.startsWith("s3a://"))) { throw new UnsupportedOperationException("Path should start with 's3://' or 's3a://'"); diff --git 
a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala index 39c4d14e9..fb3604cab 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala @@ -31,7 +31,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite test("test csv file source relation") { val context = new CatalystPlanContext val scanPlan = planTransformer.visit( - plan(pplParser, "file=test1(ppl-spark-integration/src/test/resources/people.csv)", false), + plan(pplParser, "file=test1('ppl-spark-integration/src/test/resources/people.csv')", false), context) val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) @@ -55,7 +55,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite val scanPlan = planTransformer.visit( plan( pplParser, - "file=test1(ppl-spark-integration/src/test/resources/people.csv.gz)", + "file=test1('ppl-spark-integration/src/test/resources/people.csv.gz')", false), context) @@ -81,7 +81,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite planTransformer.visit( plan( pplParser, - "file=test1(ppl-spark-integration/src/test/resources/head.csv.brotli)", + "file=test1('ppl-spark-integration/src/test/resources/head.csv.brotli')", false), context) } @@ -93,7 +93,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite val context = new CatalystPlanContext val thrown = intercept[UnsupportedOperationException] { planTransformer.visit( - plan(pplParser, "file=test1(ppl-spark-integration/src/test/resources/head.txt)", false), + plan(pplParser, "file=test1('ppl-spark-integration/src/test/resources/head.txt')", false), context) } From b4132704b3aef31f345accfa0158ef22ae30918b Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Wed, 18 Sep 2024 23:55:04 +0800 Subject: [PATCH 3/3] remove parentheses Signed-off-by: Lantao Jin --- ...intSparkPPLFileSourceRelationITSuite.scala | 8 +++--- ppl-spark-integration/README.md | 8 +++--- .../src/main/antlr4/OpenSearchPPLParser.g4 | 2 +- .../sql/ast/tree/FileSourceRelation.java | 26 ++++++++----------- .../opensearch/sql/ppl/parser/AstBuilder.java | 2 +- ...ileSourceRelationTranslatorTestSuite.scala | 8 +++--- 6 files changed, 25 insertions(+), 29 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala index 7a8530d09..f19c78ef3 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFileSourceRelationITSuite.scala @@ -37,7 +37,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation") { val frame = sql(s""" - | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" ) + | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" | | where Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, 
timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -122,7 +122,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation with compression codec") { val frame = sql(s""" - | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz" ) + | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv.gz" | | where Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -207,7 +207,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test csv file source relation with filter") { val frame = sql(s""" - | file = $testTable ( "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" ) + | file = $testTable "integ-test/src/integration/resources/opensearch_dashboards_sample_data_flights.csv" | Cancelled = false AND DistanceMiles > 0 | | fields FlightNum, DistanceMiles, FlightTimeMin, OriginWeather, AvgTicketPrice, Carrier, FlightDelayType, timestamp | | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather @@ -292,7 +292,7 @@ class FlintSparkPPLFileSourceRelationITSuite test("test parquet file source relation") { val frame = sql(s""" - | file = $testTable ( "integ-test/src/integration/resources/users.parquet" ) + | file = $testTable "integ-test/src/integration/resources/users.parquet" | | fields name, favorite_color | """.stripMargin) diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index dfe1784c9..eaef98184 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -238,10 +238,10 @@ See the next samples of PPL queries : **Load File Source** -- `file = table1 ( "s3://my-bucket/path/file.csv" ) | fields a,b,c` -- `file = table2 ( "s3://my-bucket/path/file.csv.gz" ) | fields a,b,c` -- `file = table3 ( "s3a://my-bucket/path/file.parquet" ) | fields a,b,c` -- `file = table4 ( "s3a://my-bucket/path/file.csv" ) a > 0 | fields a,b,c` +- `file = table1 "s3://my-bucket/path/file.csv" | fields a,b,c` +- `file = table2 "s3://my-bucket/path/file.csv.gz" | fields a,b,c` +- `file = table3 "s3a://my-bucket/path/file.parquet" | fields a,b,c` +- `file = table4 "s3a://my-bucket/path/file.csv" a > 0 | fields a,b,c` **Filters** - `source = table | where a = 1 | fields a,b,c` diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index b9711cc39..4dd0734a4 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -194,7 +194,7 @@ tableSourceClause // support only one path URL first fileSourceClause - : tableName = ident LT_PRTHS url = stringLiteral RT_PRTHS + : tableName = ident url = stringLiteral ; renameClasue diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java index c8ca1441e..2f2c3efe6 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FileSourceRelation.java @@ -8,31 +8,29 @@ import com.google.common.collect.ImmutableList; import org.opensearch.sql.ast.AbstractNodeVisitor; 
import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.UnresolvedExpression; -import java.util.List; /** +import java.util.List; + +/** * AST node class for a sequence of files (such as CSV files) to read. */ -public class FileSourceRelation extends UnresolvedPlan { - private final String tableName; +public class FileSourceRelation extends Relation { private final String path; private final String format; private final String compressionCodeName; - public FileSourceRelation(String tableName, String path, String format) { + public FileSourceRelation(UnresolvedExpression tableName, String path, String format) { this(tableName, path, format, null); } - public FileSourceRelation(String tableName, String path, String format, String compressionCodeName) { - this.tableName = tableName; + public FileSourceRelation(UnresolvedExpression tableName, String path, String format, String compressionCodeName) { + super(tableName); this.path = path; this.format = format; this.compressionCodeName = compressionCodeName; } - public String getTableName() { - return tableName; - } - public String getPath() { return path; } @@ -41,6 +39,9 @@ public String getFormat() { return format; } + /** + * Return the compression code name of the relation, could be null + */ public String getCompressionCodeName() { return compressionCodeName; } @@ -54,9 +55,4 @@ public UnresolvedPlan attach(UnresolvedPlan child) { public T accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitFileSourceRelation(this, context); } - - @Override - public List getChild() { - return ImmutableList.of(); - } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 29d40596c..83deb1abe 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -367,7 +367,7 @@ public UnresolvedPlan visitFromClause(OpenSearchPPLParser.FromClauseContext ctx) @Override public UnresolvedPlan visitFileSourceClause(OpenSearchPPLParser.FileSourceClauseContext ctx) { - String tableName = ctx.tableName.getText().toLowerCase(Locale.ROOT); + UnresolvedExpression tableName = internalVisitExpression(ctx.tableName); String path = ((Literal) internalVisitExpression(ctx.url)).toString().toLowerCase(Locale.ROOT); // S3 path supported only if (!isTesting() && (!path.startsWith("s3://") || !path.startsWith("s3a://"))) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala index fb3604cab..24387f234 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFileSourceRelationTranslatorTestSuite.scala @@ -31,7 +31,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite test("test csv file source relation") { val context = new CatalystPlanContext val scanPlan = planTransformer.visit( - plan(pplParser, "file=test1('ppl-spark-integration/src/test/resources/people.csv')", false), + plan(pplParser, "file=test1 'ppl-spark-integration/src/test/resources/people.csv'", false), context) val projectList: 
Seq[NamedExpression] = Seq(UnresolvedStar(None)) @@ -55,7 +55,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite val scanPlan = planTransformer.visit( plan( pplParser, - "file=test1('ppl-spark-integration/src/test/resources/people.csv.gz')", + "file=test1 'ppl-spark-integration/src/test/resources/people.csv.gz'", false), context) @@ -81,7 +81,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite planTransformer.visit( plan( pplParser, - "file=test1('ppl-spark-integration/src/test/resources/head.csv.brotli')", + "file=test1 'ppl-spark-integration/src/test/resources/head.csv.brotli'", false), context) } @@ -93,7 +93,7 @@ class PPLLogicalPlanFileSourceRelationTranslatorTestSuite val context = new CatalystPlanContext val thrown = intercept[UnsupportedOperationException] { planTransformer.visit( - plan(pplParser, "file=test1('ppl-spark-integration/src/test/resources/head.txt')", false), + plan(pplParser, "file=test1 'ppl-spark-integration/src/test/resources/head.txt'", false), context) }
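
A rough mental model of the new clause, as a sketch only: the hunks above cover the grammar and AST, not the Catalyst translation itself, but after PATCH 3 a query such as file = t1 "s3a://my-bucket/path/file.csv.gz" Cancelled = false | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather should behave roughly like the standalone Spark job below. The header and inferSchema reader options are assumptions the diffs do not pin down; the format is taken from the file extension, and Spark's CSV reader decompresses .gz input transparently for the same reason.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

object FileSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ppl-file-source-sketch")
      .master("local[*]")
      .getOrCreate()

    // `file = t1 "s3a://my-bucket/path/file.csv.gz"` reads the file directly;
    // the format is inferred from the extension (.csv, .csv.gz, .parquet).
    val df = spark.read
      .option("header", "true")      // assumption: the first row holds column names
      .option("inferSchema", "true") // assumption: column types are inferred, not declared
      .csv("s3a://my-bucket/path/file.csv.gz") // hypothetical path from the README examples

    // `... Cancelled = false | stats avg(AvgTicketPrice) as avg_price_by_weather by OriginWeather`
    df.filter("Cancelled = false")
      .groupBy("OriginWeather")
      .agg(avg("AvgTicketPrice").as("avg_price_by_weather"))
      .show()
  }
}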