From deb5fe8ee0dd6ba2291db8c3ccb36a4d15f087be Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 14 Nov 2023 13:08:38 -0800 Subject: [PATCH] Delay index scan collect to query execution time Signed-off-by: Sean Kao --- build.sbt | 2 +- .../lib/AWSLogsDALConnectorSpark-1.0.jar | Bin 2193 -> 0 bytes .../lib/LogsConnectorSpark-1.0.jar | Bin 0 -> 2049 bytes .../ApplyFlintSparkSkippingIndex.scala | 22 ++++++++---------- 4 files changed, 11 insertions(+), 13 deletions(-) delete mode 100644 flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar create mode 100644 flint-spark-integration/lib/LogsConnectorSpark-1.0.jar diff --git a/build.sbt b/build.sbt index 4cb825f1a..938f19a64 100644 --- a/build.sbt +++ b/build.sbt @@ -141,7 +141,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) }, assembly / assemblyExcludedJars := { val cp = (assembly / fullClasspath).value - cp filter { file => file.data.getName.contains("AWSLogsDALConnectorSpark")} + cp filter { file => file.data.getName.contains("LogsConnectorSpark")} }, assembly / test := (Test / test).value) diff --git a/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar b/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar deleted file mode 100644 index 30c3fd3c40a7c71458090c4751bea6b8715f78c3..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2193 zcmWIWW@h1HVBp|j;ONQ?X8-~w5CH_7K2Rrz^%l`5esO)M|Y$xkm%Ni0cBOioTMF3w4;OfAA|f-X@eBJlUL&dg~6=R#Kh!?0NeC44)d>l>RTK(Eygp?aDVgv z3+Zx3i3x1?DjrOfXSoxr-S^h2SB*#i(%rki3*YT5fB*02cm4g0HxA5e6uBg2WZ9EA zC-IeF{VCn))x6pJzj@pcJX~?}KC-y()9i890 z?mowBn@4xHxk~KXqs(;rn{EG}AC4KVx^gz#w3d8nxE|xU&iQwMiRIy>qx@20`Grx( zvmOdx@3PVrjJm3kar+|kByXcXE_{<`BwhFu_T)$034_SM6}Kloj*9;fcJhG9x~v`3 z4r|XyG;rOU+qq1>WQ|%s-`fkT3V#=#=2v=sB~P29qP?c~^M*$)Q@%MpOiZ286SPr( zQb=&ay5|#)XY;Tv-T!z;(ssYPWp0`GP9%mo&Cg-om*R1yT;}05=jL;Hr?~c<{IFrd zgzkk)eqWq4?XhW_X&1ZHv(DGSd-L~fxI4+E<5!(GOU$Rr_tnuFNxWxHo{LD>VJj4{ ze#10h)#Z2AW?hf`vGqYI+um0uHga2;wZ6=C)Ofmi!?*W2(%Xcz34S^?2TMl39zLXXV!>ccuL*=MkP) z5w>+_`S!OvN(Bu}&Lu=P-gvg~##g6@t$t5zMDI>mWw*H3kKNUw`QR&V7me%sLLKim z9G_jXeBIm)#ueI6b{nYbD`Z#v455b)Hw_ zEYq@!Ub4B|~IDeqZNJ)3qYyG*pY;;UTT-uCctjmxzfU*)`F7y2AGxTk)a|AS(!YL98Cox*SJ zXZviAMeo;^?M#Y!JDbbVJj%@NK!B5_-(#EKR|UVt%E({5mzHFot^f72p`GW9^T$rq zY%4I|HoYQ9f1|$kZ@t4e<6Ji#`D!-TxTdR`b7D@fOOa+_YMI-u_YXF=^t_vU@2Rrz^%l`5esO)M|Y$xkm%Ni0cBOioTMF3w4;OfAA|f-X@eB4N?}3Mf=km*SM7Ib?p$#(!gQn8qWr^UI^DPLmZobzdjG-v$2(@l zi!S$TKd4Kd_nFSLbi%S@)8EZBem>{@?DuDu?*CWwoAH5|*rg7|c`i!L$}f^7A|A`{ zcv-ML%`QGtIPAoQ@J+GSF)1C-xlgkN8BFp>FA$MzoAmUmZ`||wHrJoLzVtC`(<$e3 zy51fyKky39nD%^E$Z_r^nzeF!j8BVvnY7-b$;&%C^or%-kel3DYTqq;7aRP%XLy)5 z-ea>68?Sba=eMS&_vaP58QYdTZ{POZt1TyTb6(4~WV@&P`c9=5FsV!S?cFN-@y6P& ze$npTJkPC^@2X!5YOyLcXjWvM@Riwrt#gR{twSqMnVjX-p6e!Y@7eB@J8o|azHEDN zs7NT?W6s=pdx}yIf8VPha`pK>jvYb|4!V11s9ur?T`QBk4lAn|1PjjvpT=;!?n_)q?4<;uKL{; zJumfDRM6$OO87UOcRHUBggrhnlOsGg>tE8(_-_ge`QGXJ`F1nZeZTftZ|ker74sf& zo#yGUy1V7=?MV+m|G&EIx~SZF@mnX1?qq#0-8+*#Y-i1m@4psUf8cfAuYK+6kGR8S ztCSK}$SSnA#VqQxD_dHyRQ8s94fA5mnfwp%}yB?V`d9QGr z@M-O2>*WFk^WHs?coF5!WcgL?;_7L^@0oudJJ}cL;b{SDcdJl+l)E-N{^W7V%M`5S{}{1knrTDT1tl0$Bttr zscDA77k8|RW?Hf@`O8*}Krhu{RxD{;PM!s5Wqg1)D;vm64j@bb K*7Dt~ARYh^@6faW literal 0 HcmV?d00001 diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index 5a2f97038..83f8def4d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -67,36 +67,34 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _)) if hasNoDisjunction(condition) && // Check if query plan already rewritten - table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasLogFileIds() => + table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() => val index = flint.describeIndex(getIndexName(catalog, identifier)) if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) { val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex] val indexFilter = rewriteToIndexFilter(skippingIndex, condition) /* - * Replace original LogsTable with a new one with log file idx: + * Replace original LogsTable with a new one with file index scan: * Filter(a=b) * |- DataSourceV2Relation(A) - * |- LogsTable <== replaced with a new LogsTable with log file ids + * |- LogsTable <== replaced with a new LogsTable with file index scan */ if (indexFilter.isDefined) { val indexScan = flint.queryIndex(skippingIndex.name()) - val selectedFiles = + val selectFileIndexScan = // Non hybrid scan // TODO: refactor common logic with file-based skipping index indexScan .filter(new Column(indexFilter.get)) .select(FILE_PATH_COLUMN) - .collect - .map(_.getString(0)) - // Construct LogsTable with selectedFiles as its log file ids - // It will build scan operator using these log file ids + // Construct LogsTable with file index scan + // It will build scan operator using log file ids collected from file index scan val logsTable = table.asInstanceOf[LogsTable] val newTable = new LogsTable( - logsTable.getSchema(), - logsTable.getOptions(), - selectedFiles, - logsTable.getProcessedFields()) + logsTable.schema(), + logsTable.options(), + selectFileIndexScan, + logsTable.processedFields()) filter.copy(child = relation.copy(table = newTable)) } else { filter