diff --git a/dataframe-arrow/build.gradle.kts b/dataframe-arrow/build.gradle.kts index 68bb3c311..ac8c97bbb 100644 --- a/dataframe-arrow/build.gradle.kts +++ b/dataframe-arrow/build.gradle.kts @@ -13,6 +13,7 @@ dependencies { implementation(libs.arrow.vector) implementation(libs.arrow.format) implementation(libs.arrow.memory) + implementation(libs.arrow.dataset) implementation(libs.commonsCompress) implementation(libs.kotlin.reflect) implementation(libs.kotlin.datetimeJvm) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt index dac1fe680..bf34f0c83 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt @@ -1,5 +1,6 @@ package org.jetbrains.kotlinx.dataframe.io +import org.apache.arrow.dataset.file.FileFormat import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.ipc.ArrowReader import org.apache.commons.compress.utils.SeekableInMemoryByteChannel @@ -186,3 +187,11 @@ public fun DataFrame.Companion.readArrow( public fun ArrowReader.toDataFrame( nullability: NullabilityOptions = NullabilityOptions.Infer ): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability) + +/** + * Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html) + */ +public fun DataFrame.Companion.readParquet( + url: URL, + nullability: NullabilityOptions = NullabilityOptions.Infer +): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index d7d67b6a8..e983cc89d 100644 --- 
a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -1,5 +1,10 @@ package org.jetbrains.kotlinx.dataframe.io +import org.apache.arrow.dataset.file.FileFormat +import org.apache.arrow.dataset.file.FileSystemDatasetFactory +import org.apache.arrow.dataset.jni.DirectReservationListener +import org.apache.arrow.dataset.jni.NativeMemoryPool +import org.apache.arrow.dataset.scanner.ScanOptions import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.BigIntVector import org.apache.arrow.vector.BitVector @@ -296,7 +301,7 @@ internal fun DataFrame.Companion.readArrowImpl( add(df) } } - is ArrowStreamReader -> { + else -> { val root = reader.vectorSchemaRoot val schema = root.schema while (reader.loadNextBatch()) { @@ -309,3 +314,27 @@ internal fun DataFrame.Companion.readArrowImpl( return flattened.concatKeepingSchema() } } + +internal fun DataFrame.Companion.readArrowDataset( + fileUri: String, + fileFormat: FileFormat, + nullability: NullabilityOptions = NullabilityOptions.Infer, +): AnyFrame { + val scanOptions = ScanOptions(32768) + RootAllocator().use { allocator -> + FileSystemDatasetFactory( + allocator, + NativeMemoryPool.createListenable(DirectReservationListener.instance()), + fileFormat, + fileUri + ).use { datasetFactory -> + datasetFactory.finish().use { dataset -> + dataset.newScan(scanOptions).use { scanner -> + scanner.scanBatches().use { reader -> + return readArrow(reader, nullability) + } + } + } + } + } +} diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 513a46b21..7bc29e2c8 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -583,4 +583,17 
@@ internal class ArrowKtTest { val arrowStreamReader = ArrowStreamReader(ipcInputStream, RootAllocator()) arrowStreamReader.toDataFrame() shouldBe expected } + + @Test + fun testReadParquet() { + val path = testResource("test.arrow.parquet").path + val dataFrame = DataFrame.readParquet(URL("file:$path")) + dataFrame.rowsCount() shouldBe 300 + assertEstimations( + exampleFrame = dataFrame, + expectedNullable = false, + hasNulls = false, + fromParquet = true + ) + } } diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt index 3366e3055..686270c2a 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt @@ -18,7 +18,12 @@ import kotlin.reflect.typeOf * Assert that we have got the same data that was originally saved on example creation. * Example generation project is currently located at https://github.com/Kopilov/arrow_example */ -internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) { +internal fun assertEstimations( + exampleFrame: AnyFrame, + expectedNullable: Boolean, + hasNulls: Boolean, + fromParquet: Boolean = false +) { /** * In [exampleFrame] we get two concatenated batches.
To assert the estimations, we should transform frame row number to batch row number */ @@ -129,10 +134,19 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30)) } - val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?> - datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable) - datetimeCol.forEachIndexed { i, element -> - assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)) + if (fromParquet) { + // Parquet format has only one date type (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date), without a time component + val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?> + datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30)) + } + } else { + val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?> + datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)) + } } val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?> diff --git a/dataframe-arrow/src/test/resources/test.arrow.parquet b/dataframe-arrow/src/test/resources/test.arrow.parquet new file mode 100644 index 000000000..cf78b1c25 Binary files /dev/null and b/dataframe-arrow/src/test/resources/test.arrow.parquet differ diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 577d58219..1dc7c14df 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -30,7 +30,7 @@ sl4j = "2.0.7" junit = "4.13.2" kotestAsserions = "4.6.3" jsoup = "1.14.3" -arrow = "11.0.0" +arrow = "15.0.0" docProcessor = "0.2.3" simpleGit = "2.0.3" @@ -65,6 +65,7 @@
jsoup = { module = "org.jsoup:jsoup", version.ref = "jsoup" } arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" } arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" } arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" } +arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" } kotlinpoet = { group = "com.squareup", name = "kotlinpoet", version.ref = "kotlinpoet" } swagger = { group = "io.swagger.parser.v3", name = "swagger-parser", version.ref = "openapi" }