From 29a91e3fd9524563f4086921087f6a5093a2b149 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Mon, 28 Aug 2023 11:08:53 -0700 Subject: [PATCH] Add a java based shared table reader that uses Kernel APIs --- .../client/DeltaSharingRestClientSuite.scala | 58 +++++++ .../java/kernel-based-java-client/pom.xml | 99 ++++++++++++ .../sharing/examples/BaseTableReader.java | 142 ++++++++++++++++++ .../KernelBasedJavaSharingClient.java | 127 ++++++++++++++++ .../sharing/examples/KernelUtilities.java | 36 +++++ .../src/main/resources/test-server-profile | 5 + 6 files changed, 467 insertions(+) create mode 100644 examples/java/kernel-based-java-client/pom.xml create mode 100644 examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/BaseTableReader.java create mode 100644 examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelBasedJavaSharingClient.java create mode 100644 examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelUtilities.java create mode 100644 examples/java/kernel-based-java-client/src/main/resources/test-server-profile diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index be7c91b08..fe91534b7 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -1026,4 +1026,62 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } } } + + integrationTest("kernel:getFiles") { + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = DeltaSharingRestClient.RESPONSE_FORMAT_KERNEL + ) + try { + val tableFiles = + client.getFiles( + Table(name = "hackathon_dv_table", schema = "default", share = "share1"), + Nil, + None, + None, + None, + None + ) + + val scanStateJson = tableFiles.kernelStateAndScanFiles.head + val scanFilesJson = tableFiles.kernelStateAndScanFiles.drop(1) + + val hadoopConf = new Configuration() { + { + set("spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.EnvironmentVariableCredentialsProvider") + set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com") + } + } + val tableClient = DefaultTableClient.create(hadoopConf) + + val scanState = KernelUtils.deserializeRowFromJson(tableClient, scanStateJson) + val scanFiles = scanFilesJson.map { scanFileJson => + KernelUtils.deserializeRowFromJson(tableClient, scanFileJson) + } + + var readRecordCount = 0 + val maxRowCount = 100 + val data = Scan.readData( + tableClient, + scanState, + KernelUtils.convertToCloseableIterator(scanFiles), + Optional.empty()) + breakable { + try { + while (data.hasNext) { + val dataReadResult = data.next() + readRecordCount += KernelUtils.printData(dataReadResult, maxRowCount - readRecordCount) + if (readRecordCount >= maxRowCount) { + break() // This will break out of the enclosing breakable block + } + } + } finally { + data.asInstanceOf[Closeable].close() + } + } + client.close() + } + } } diff --git a/examples/java/kernel-based-java-client/pom.xml b/examples/java/kernel-based-java-client/pom.xml new file mode 100644 index 000000000..19f9ad3e5 --- /dev/null +++ b/examples/java/kernel-based-java-client/pom.xml @@ -0,0 +1,99 @@ + + + + + + 4.0.0 + + org.example + kernel-based-java-client + 0.1-SNAPSHOT + + + 1.8 + 1.8 + "" + 3.0.0-SNAPSHOT + 1.0.0-SNAPSHOT + 3.3.1 + + + + + staging-repo + ${staging.repo.url} + + + + + + io.delta + delta-kernel-api + ${delta-kernel.version} + + + + io.delta + delta-kernel-defaults + ${delta-kernel.version} + + + + io.delta + delta-sharing-client_2.12 + ${delta-sharing.version} + + + + org.apache.hadoop + hadoop-client-runtime + ${hadoop.version} + + + + org.apache.hadoop + hadoop-client-api + ${hadoop.version} + + + + commons-cli + commons-cli + 1.5.0 + + + + com.fasterxml.jackson.core + jackson-databind + 2.13.5 + + + + org.scala-lang + scala-library + 2.12.11 + + + + com.fasterxml.jackson.module + jackson-module-scala_2.12 + 2.13.5 + + + + + diff --git a/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/BaseTableReader.java b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/BaseTableReader.java new file mode 100644 index 000000000..08333cad0 --- /dev/null +++ b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/BaseTableReader.java @@ -0,0 +1,142 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.sharing.examples; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.DataReadResult; +import io.delta.kernel.defaults.internal.data.vector.VectorUtils; +import io.delta.kernel.types.StructType; + +/** + * Base class for reading Delta Lake tables using the Delta Kernel APIs. + */ +public abstract class BaseTableReader +{ + protected static int printData(DataReadResult dataReadResult, int maxRowsToPrint) + { + int printedRowCount = 0; + ColumnarBatch data = dataReadResult.getData(); + Optional selectionVector = dataReadResult.getSelectionVector(); + for (int rowId = 0; rowId < data.getSize(); rowId++) { + if (!selectionVector.isPresent() || selectionVector.get().getBoolean(rowId)) { + printRow(data, rowId); + printedRowCount++; + if (printedRowCount == maxRowsToPrint) { + break; + } + } + } + return printedRowCount; + } + + protected static void printSchema(StructType schema) + { + System.out.printf(formatter(schema.length()), schema.fieldNames().toArray(new String[0])); + } + + protected static void printRow(ColumnarBatch batch, int rowId) + { + int numCols = batch.getSchema().length(); + Object[] rowValues = IntStream.range(0, numCols).mapToObj(colOrdinal -> { + ColumnVector columnVector = batch.getColumnVector(colOrdinal); + return VectorUtils.getValueAsObject(columnVector, rowId); + }).toArray(); + + // TODO: Need to handle the Row, Map, Array, Timestamp, Date types specially to + // print them in the format they need. Copy this code from Spark CLI. + + System.out.printf(formatter(numCols), rowValues); + } + + /** + * Minimum command line options for any implementation of this reader. + */ + protected static Options baseOptions() + { + return new Options() + .addRequiredOption("t", "table", true, "Fully qualified table path") + .addOption("c", "columns", true, + "Comma separated list of columns to read from the table. " + + "Ex. --columns=id,name,address") + .addOption( + Option.builder() + .option("l") + .longOpt("limit") + .hasArg(true) + .desc("Maximum number of rows to read from the table (default 20).") + .type(Number.class) + .build() + ); + } + + /** + * Helper method to parse the command line arguments. + */ + protected static CommandLine parseArgs(String mainClassName, Options options, String[] args) + { + CommandLineParser cliParser = new DefaultParser(); + + try { + return cliParser.parse(options, args); + } + catch (ParseException parseException) { + new HelpFormatter().printHelp( + "java " + mainClassName, + options, + true + ); + } + System.exit(-1); + return null; + } + + protected static Optional> parseColumnList(CommandLine cli, String optionName) + { + return Optional.ofNullable(cli.getOptionValue(optionName)) + .map(colString -> Arrays.asList(colString.split(",[ ]*"))); + } + + protected static int parseInt(CommandLine cli, String optionName, int defaultValue) + throws ParseException + { + return Optional.ofNullable(cli.getParsedOptionValue(optionName)) + .map(Number.class::cast) + .map(Number::intValue) + .orElse(defaultValue); + } + + private static String formatter(int length) + { + return IntStream.range(0, length) + .mapToObj(i -> "%20s") + .collect(Collectors.joining("|")) + "\n"; + } +} + diff --git a/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelBasedJavaSharingClient.java b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelBasedJavaSharingClient.java new file mode 100644 index 000000000..2c855af70 --- /dev/null +++ b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelBasedJavaSharingClient.java @@ -0,0 +1,127 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.sharing.examples; + +import scala.Option; +import scala.collection.Iterator; + +import io.delta.sharing.client.DeltaSharingClient; +import io.delta.sharing.client.DeltaSharingFileProfileProvider; +import io.delta.sharing.client.DeltaSharingRestClient; +import io.delta.sharing.client.model.DeltaTableFiles; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; + +import io.delta.kernel.Scan; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.DataReadResult; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.Utils; + +/** + * Single threaded Delta sharing table reader using the Delta Kernel APIs. + * + *

+ * Usage: java io.delta.sharing.examples.KernelBasedJavaSharingClient [-c ] [-l ] -t + *

+ * -c,--columns Comma separated list of columns to read from the + * table. Ex. --columns=id,name,address + * -l,--limit Maximum number of rows to read from the table + * (default 20). + * -t,--table Fully qualified table path + *

+ */ +public class KernelBasedJavaSharingClient + extends BaseTableReader +{ + public void show() throws IOException + { + Configuration hadoopConf = new Configuration() + {{ + set("spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"); + set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com"); + }}; + TableClient tableClient = DefaultTableClient.create(hadoopConf); + + DeltaSharingFileProfileProvider shareProvider = new DeltaSharingFileProfileProvider( + hadoopConf, + // this.getClass().getResource("test-server-profile").toString() + "/Users/venkateshwar.korukanti/opensource/delta-sharing/examples/java/" + + "kernel-based-java-client/src/main/resources/test-server-profile" + ); + DeltaSharingClient sharingClient = new DeltaSharingRestClient( + shareProvider, + 120, + 10, + Long.MAX_VALUE, + false, + false, + "kernel", + "", + false, + 100000 + ); + + DeltaTableFiles files = sharingClient.getFiles( + new io.delta.sharing.client.model.Table("hackathon_dv_table", "default", "share1"), + null, + Option.empty(), + Option.empty(), + Option.empty(), + Option.empty() + ); + + String scanStateStr = files.kernelStateAndScanFiles().head(); + List scanFilesStr = scala.collection.JavaConverters.seqAsJavaListConverter( + files.kernelStateAndScanFiles().drop(1).toSeq()).asJava(); + + Row scanState = KernelUtilities.deserializeRowFromJson(tableClient, scanStateStr); + List scanFilesIter = scanFilesStr.stream().map( + scanFileStr -> KernelUtilities.deserializeRowFromJson(tableClient, scanFileStr) + ).collect(Collectors.toList()); + + boolean printedSchema = false; + try (CloseableIterator data = + Scan.readData( + tableClient, + scanState, + Utils.toCloseableIterator(scanFilesIter.iterator()), + Optional.empty())) { + while (data.hasNext()) { + DataReadResult dataReadResult = data.next(); + if (!printedSchema) { + printSchema(dataReadResult.getData().getSchema()); + printedSchema = true; + } + printData(dataReadResult, Integer.MAX_VALUE); + } + } + } + + public static void main(String[] args) + throws Exception + { + new KernelBasedJavaSharingClient().show(); + } +} diff --git a/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelUtilities.java b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelUtilities.java new file mode 100644 index 000000000..bd25198ec --- /dev/null +++ b/examples/java/kernel-based-java-client/src/main/java/io/delta/sharing/examples/KernelUtilities.java @@ -0,0 +1,36 @@ +package io.delta.sharing.examples; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.UncheckedIOException; + +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.internal.data.DefaultJsonRow; +import io.delta.kernel.types.StructType; + +import io.delta.kernel.internal.types.TableSchemaSerDe; + +public class KernelUtilities +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Utility method to deserialize a {@link Row} object from the JSON form. + */ + public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema) + { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema); + JsonNode schemaNode = jsonNode.get("schema"); + StructType schema = + TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText()); + return new DefaultJsonRow((ObjectNode) jsonNode.get("row"), schema); + } + catch (JsonProcessingException ex) { + throw new UncheckedIOException(ex); + } + } +} diff --git a/examples/java/kernel-based-java-client/src/main/resources/test-server-profile b/examples/java/kernel-based-java-client/src/main/resources/test-server-profile new file mode 100644 index 000000000..8ba3c759a --- /dev/null +++ b/examples/java/kernel-based-java-client/src/main/resources/test-server-profile @@ -0,0 +1,5 @@ +{ + "shareCredentialsVersion": 1, + "endpoint": "https://localhost:12345/delta-sharing", + "bearerToken": "dapi5e3574ec767ca1548ae5bbed1a2dc04d" +} \ No newline at end of file