From 2713f084569f4c720ec4b954f154accb9d560938 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Mon, 16 Oct 2023 15:19:34 -0700
Subject: [PATCH] query rewrite for nexus skipping index

Signed-off-by: Sean Kao
---
 build.sbt                                    |  4 ++
 .../lib/AWSLogsDALConnectorSpark-1.0.jar     | Bin 0 -> 2193 bytes
 .../ApplyFlintSparkSkippingIndex.scala       | 54 +++++++++++++++++-
 3 files changed, 57 insertions(+), 1 deletion(-)
 create mode 100644 flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar

diff --git a/build.sbt b/build.sbt
index 0dcfb8af7..4cb825f1a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -139,6 +139,10 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
       val oldStrategy = (assembly / assemblyMergeStrategy).value
       oldStrategy(x)
     },
+    assembly / assemblyExcludedJars := {
+      val cp = (assembly / fullClasspath).value
+      cp filter { file => file.data.getName.contains("AWSLogsDALConnectorSpark") }
+    },
     assembly / test := (Test / test).value)
 
 // Test assembly package with integration test.
diff --git a/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar b/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar
new file mode 100644
index 0000000000000000000000000000000000000000..30c3fd3c40a7c71458090c4751bea6b8715f78c3
GIT binary patch
literal 2193
[base85 binary payload omitted]

[diff header and opening lines of the ApplyFlintSparkSkippingIndex.scala hunk lost in extraction; the recoverable added lines resume below]
+      val index = flint.describeIndex(getIndexName(catalog, identifier))
+      if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
+        val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
+        /*
+         * Replace original LogsTable with a new one with log file ids:
+         *   Filter(a=b)
+         *   |- DataSourceV2Relation(A)
+         *      |- LogsTable  <== replaced with a new LogsTable with log file ids
+         */
+        if (indexFilter.isDefined) {
+          val indexScan = flint.queryIndex(skippingIndex.name())
+          val selectedFiles =
+            // Non-hybrid scan
+            // TODO: refactor common logic with file-based skipping index
+            indexScan
+              .filter(new Column(indexFilter.get))
+              .select(FILE_PATH_COLUMN)
+              .collect
+              .map(_.getString(0))
+
+          // Construct LogsTable with selectedFiles as its log file ids.
+          // It will build its scan operator using these log file ids.
+          val logsTable = table.asInstanceOf[LogsTable]
+          val newTable = new LogsTable(
+            logsTable.getSchema(),
+            logsTable.getOptions(),
+            selectedFiles,
+            logsTable.getProcessedFields())
+          filter.copy(child = relation.copy(table = newTable))
+        } else {
+          filter
+        }
+      } else {
+        filter
+      }
   }
 
   private def getIndexName(table: CatalogTable): String = {
@@ -67,6 +114,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     getSkippingIndexName(qualifiedTableName)
   }
 
+  private def getIndexName(catalog: CatalogPlugin, identifier: Identifier): String = {
+    val qualifiedTableName = s"${catalog.name}.${identifier}"
+    getSkippingIndexName(qualifiedTableName)
+  }
+
   private def hasNoDisjunction(condition: Expression): Boolean = {
     condition.collectFirst { case Or(_, _) =>
       true
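
For reviewers trying the build change in isolation: a minimal sketch of the
exclusion setting, assuming the sbt-assembly plugin that build.sbt already
loads. A jar dropped under flint-spark-integration/lib/ joins sbt's unmanaged
classpath automatically, so without this setting it would be bundled into the
fat jar.

// Minimal build.sbt sketch (assumes the sbt-assembly plugin, as in this repo).
// The jar stays visible at compile time via the unmanaged classpath; excluding
// it here keeps it out of the assembled fat jar so the connector can be
// supplied on the Spark classpath at deploy time instead.
assembly / assemblyExcludedJars := {
  val cp = (assembly / fullClasspath).value
  // Drop any classpath entry whose file name mentions the connector jar.
  cp.filter(_.data.getName.contains("AWSLogsDALConnectorSpark"))
}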
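
The heart of the rewrite is the pruning step: evaluate the rewritten predicate
against the skipping index and keep only the matching source files. Below is a
self-contained sketch of just that step; the toy file_path/min_age/max_age
schema and the SkippingIndexPruning name are illustrative assumptions, not the
Flint or connector API.

// A minimal sketch of skipping-index file pruning with plain Spark SQL.
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SkippingIndexPruning {
  // Keep only files whose index row satisfies the rewritten predicate.
  // `indexScan` is a DataFrame over the skipping index (one row per source
  // file); `indexFilter` is the query filter rewritten to index columns.
  def selectFiles(indexScan: DataFrame, indexFilter: Column): Array[String] =
    indexScan
      .filter(indexFilter)
      .select("file_path")
      .collect()
      .map(_.getString(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("pruning-sketch").getOrCreate()
    import spark.implicits._
    // Toy index: per-file min/max stats for a column `age`.
    val index = Seq(
      ("s3://logs/part-0", 10, 25),
      ("s3://logs/part-1", 30, 45)).toDF("file_path", "min_age", "max_age")
    // Query filter `age = 42` rewritten to the index domain.
    val rewritten = col("min_age") <= 42 && col("max_age") >= 42
    println(selectFiles(index, rewritten).mkString(", ")) // only part-1 survives
    spark.stop()
  }
}

Note the "Non-hybrid scan" comment in the patch: this path trusts the index to
cover every source file; a hybrid-scan variant would also have to keep files
that have no index row yet.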
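
The hasNoDisjunction guard is what keeps the rule sound: the conjunct-by-conjunct
rewrite to index columns is only safe when the predicate has no OR branch, so
the rule backs off as soon as one appears anywhere in the expression tree. A
sketch with hand-built Catalyst expressions follows; the standalone object and
main method are for illustration only.

// Sketch of the disjunction guard over Catalyst expression trees.
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, Literal, Or}
import org.apache.spark.sql.types.IntegerType

object DisjunctionGuard {
  // True when no Or node appears anywhere in the predicate tree.
  // TreeNode.collectFirst traverses the whole tree, so nested
  // disjunctions such as `a = 1 AND (b = 2 OR c = 3)` are caught too.
  def hasNoDisjunction(condition: Expression): Boolean =
    condition.collectFirst { case Or(_, _) => true }.isEmpty

  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val conjunct = And(EqualTo(a, Literal(1)), EqualTo(b, Literal(2)))
    val disjunct = Or(EqualTo(a, Literal(1)), EqualTo(b, Literal(2)))
    println(hasNoDisjunction(conjunct)) // true: safe to rewrite
    println(hasNoDisjunction(disjunct)) // false: rule leaves the filter alone
  }
}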