From 3691f33ac201307dae6a3bb7af47670b2f1f6f8d Mon Sep 17 00:00:00 2001
From: TG infra <>
Date: Tue, 24 Sep 2024 16:41:14 +0000
Subject: [PATCH] [Automated] Spark Connector Release 0.2.1

---
 tools/etl/tg-spark-connector/CHANGELOG.md     | 27 +++++++++++++++++++
 tools/etl/tg-spark-connector/pom.xml          |  4 +--
 .../src/assembly/package.xml                  |  4 +--
 .../com/tigergraph/spark/util/OptionDef.java  | 15 ++++++-----
 .../com/tigergraph/spark/util/Options.java    | 25 +++++++++++------
 .../spark/write/TigerGraphDataWriter.java     |  1 +
 .../TigerGraphReadRequestTest.java            | 21 +++++++++++++--
 7 files changed, 76 insertions(+), 21 deletions(-)
 create mode 100644 tools/etl/tg-spark-connector/CHANGELOG.md

diff --git a/tools/etl/tg-spark-connector/CHANGELOG.md b/tools/etl/tg-spark-connector/CHANGELOG.md
new file mode 100644
index 00000000..05bab8b3
--- /dev/null
+++ b/tools/etl/tg-spark-connector/CHANGELOG.md
@@ -0,0 +1,27 @@
+# v0.2.1
+* Fix the NPE thrown when `log.level` is unset. The `log.level` option is optional; when it is not provided, the connector initializes the logger using the Spark Log4j configuration.
+* New writer option `loading.ack` with default value "all":
+  * "all": loading requests return only after all GPE instances have acknowledged them
+  * "none": loading requests return immediately after RESTPP has processed them
+* Change the naming convention of release packages:
+  * `tigergraph-spark-connector-<version>.jar` is renamed to `original-tigergraph-spark-connector-<version>.jar` and moved into `tigergraph-spark-connector-<version>.tar.gz`: the JAR containing only the connector's compiled classes, without any dependencies.
+  * `tigergraph-spark-connector-<version>-jar-with-dependencies.jar` is renamed to `tigergraph-spark-connector-<version>.jar`: the JAR that includes both the compiled classes and all dependencies.
+
+# v0.2.0
+* Support reading from TigerGraph to Spark DataFrame: [documentation](https://docs.tigergraph.com/tigergraph-server/current/data-loading/read-to-spark-dataframe)
+  * Built-in vertex query
+  * Built-in edge query
+  * Pre-installed query
+  * Interpreted query
+* Support the built-in logger by setting option `log.level` to 0, 1, 2 or 3
+* Support TigerGraph 4.1+ (still backward compatible with previous versions)
+  * Support requesting and refreshing JWT
+  * **NOTE**: it is mandatory to provide the TigerGraph version via option `version`, e.g. `.option("version", "4.1.0")`
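(Editor's illustration, not part of the patch: a minimal Java read sketch showing how the mandatory `version` option is passed alongside the other connector options. The host, graph name, and query values are placeholder assumptions, not values taken from this release.)

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TigerGraphReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("tg-read-sketch").getOrCreate();
    // Hypothetical connection settings; `version` is the option the NOTE above mandates.
    Dataset<Row> persons =
        spark.read()
            .format("tigergraph")
            .option("url", "https://tg-host:14240") // placeholder host
            .option("graph", "Social_Net")          // placeholder graph name
            .option("version", "4.1.0")             // mandatory since v0.2.0
            .option("query.vertex", "Person")       // built-in vertex query
            .load();
    persons.show();
  }
}
```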
`.option("version", "4.1.0")` + +# v0.1.1 +* Fix the issue that failed to load decimal data type + +# v0.1.0 +* Support writing from Spark DataFrame to TigerGraph: [documentation](https://docs.tigergraph.com/tigergraph-server/current/data-loading/load-from-spark-dataframe) + * Batch write + * Write with Spark Structured Streaming API \ No newline at end of file diff --git a/tools/etl/tg-spark-connector/pom.xml b/tools/etl/tg-spark-connector/pom.xml index d5dcd196..659ec7be 100644 --- a/tools/etl/tg-spark-connector/pom.xml +++ b/tools/etl/tg-spark-connector/pom.xml @@ -33,7 +33,7 @@ com.tigergraph tigergraph-spark-connector - 0.2.0 + 0.2.1 UTF-8 @@ -140,8 +140,6 @@ shade - true - jar-with-dependencies *:* diff --git a/tools/etl/tg-spark-connector/src/assembly/package.xml b/tools/etl/tg-spark-connector/src/assembly/package.xml index 09886049..d630ece4 100644 --- a/tools/etl/tg-spark-connector/src/assembly/package.xml +++ b/tools/etl/tg-spark-connector/src/assembly/package.xml @@ -10,8 +10,8 @@ - ${project.build.directory}/${project.artifactId}-${project.version}.jar - ${project.artifactId}-${project.version}.jar + ${project.build.directory}/original-${project.artifactId}-${project.version}.jar + original-${project.artifactId}-${project.version}.jar / diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java index 73a93f7c..afde1745 100644 --- a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java @@ -18,12 +18,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.regex.Pattern; public class OptionDef implements Serializable { - // A unique Java object which represents the lack of a default value. - public static final Serializable NO_DEFAULT_VALUE = UUID.randomUUID(); + + // Identify whether the option has default value + public enum DefaultVal { + NON_DEFAULT + }; // Options' definitions private final Map optionKeys = new HashMap<>(); @@ -60,7 +62,7 @@ public OptionDef define(String name, Type type, String group) { } public OptionDef define(String name, Type type, boolean required, String group) { - return define(name, type, NO_DEFAULT_VALUE, required, null, group); + return define(name, type, DefaultVal.NON_DEFAULT, required, null, group); } /* @@ -91,14 +93,15 @@ public OptionKey( String group) { this.name = name; this.type = type; - this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : defaultValue; + this.defaultValue = + DefaultVal.NON_DEFAULT.equals(defaultValue) ? 
@@ -91,14 +93,15 @@ public OptionKey(
         String group) {
       this.name = name;
       this.type = type;
-      this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : defaultValue;
+      this.defaultValue =
+          DefaultVal.NON_DEFAULT.equals(defaultValue) ? DefaultVal.NON_DEFAULT : defaultValue;
       this.required = required;
       this.validator = validator;
       this.group = group;
     }
 
     public boolean hasDefault() {
-      return !NO_DEFAULT_VALUE.equals(this.defaultValue);
+      return !DefaultVal.NON_DEFAULT.equals(this.defaultValue);
     }
   }
diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java
index 2c995930..3df730e7 100644
--- a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java
+++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java
@@ -13,6 +13,9 @@
  */
 package com.tigergraph.spark.util;
 
+import com.tigergraph.spark.util.OptionDef.OptionKey;
+import com.tigergraph.spark.util.OptionDef.Type;
+import com.tigergraph.spark.util.OptionDef.ValidVersion;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -20,10 +23,6 @@
 import java.util.List;
 import java.util.Map;
 
-import com.tigergraph.spark.util.OptionDef.OptionKey;
-import com.tigergraph.spark.util.OptionDef.Type;
-import com.tigergraph.spark.util.OptionDef.ValidVersion;
-
 /** Validate and transform Spark DataFrame options(configurations) */
 public class Options implements Serializable {
@@ -61,6 +60,7 @@ public static enum QueryType {
   public static final String LOADING_EOL = "loading.eol";
   public static final String LOADING_BATCH_SIZE_BYTES = "loading.batch.size.bytes";
   public static final String LOADING_TIMEOUT_MS = "loading.timeout.ms";
+  public static final String LOADING_ACK = "loading.ack";
   public static final String LOADING_MAX_PERCENT_ERROR = "loading.max.percent.error";
   public static final String LOADING_MAX_NUM_ERROR = "loading.max.num.error";
   public static final String LOADING_RETRY_INTERVAL_MS = "loading.retry.interval.ms";
@@ -71,6 +71,8 @@
   public static final String LOADING_EOL_DEFAULT = "\n";
   public static final int LOADING_BATCH_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2mb
   public static final int LOADING_TIMEOUT_MS_DEFAULT = 0; // restpp default
+  public static final String LOADING_ACK_ALL = "all";
+  public static final String LOADING_ACK_NONE = "none";
   public static final int LOADING_RETRY_INTERVAL_MS_DEFAULT = 5 * 1000; // 5s
   public static final int LOADING_MAX_RETRY_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5min
   public static final int LOADING_MAX_RETRY_ATTEMPTS_DEFAULT = 10;
@@ -133,8 +135,8 @@
   public static final String GROUP_QUERY = "query";
   public static final String GROUP_LOG = "log";
 
-  private final Map<String, String> originals;
-  private final Map<String, String> transformed = new HashMap<>();
+  private final HashMap<String, String> originals;
+  private final HashMap<String, String> transformed = new HashMap<>();
   private final OptionDef definition;
 
   public Options(Map<String, String> originals, boolean skipValidate) {
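(Editor's note: narrowing the field type to `HashMap` above, together with the defensive copy in the hunk that follows, reads as a serializability fix. `Options` is `Serializable` and travels from the Spark driver to executors, but the `Map` handed in by the runtime is not guaranteed to be a serializable implementation. A self-contained sketch of the failure mode and the fix, with all class names hypothetical:)

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DefensiveCopySketch {
  // Stand-in for Options: holding a concrete HashMap built by copying keeps the
  // field serializable no matter which Map implementation the caller passes in.
  static class Opts implements Serializable {
    private final HashMap<String, String> originals;

    Opts(Map<String, String> originals) {
      this.originals = originals != null ? new HashMap<>(originals) : new HashMap<>();
    }
  }

  // A Map that is deliberately NOT Serializable, standing in for whatever
  // map view the Spark runtime hands over.
  static class NonSerializableMap extends AbstractMap<String, String> {
    private final Map<String, String> delegate = new HashMap<>();

    @Override
    public String put(String key, String value) {
      return delegate.put(key, value);
    }

    @Override
    public Set<Map.Entry<String, String>> entrySet() {
      return delegate.entrySet();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> raw = new NonSerializableMap();
    raw.put("graph", "Social_Net"); // hypothetical option
    Opts opts = new Opts(raw); // the defensive copy happens here

    // Serializing opts succeeds thanks to the copy; writing `raw` itself
    // would throw java.io.NotSerializableException.
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(opts);
    }
    System.out.println("Opts serialized successfully");
  }
}
```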
@@ -144,7 +146,7 @@ public Options(Map<String, String> originals, boolean skipValidate) {
     } else {
       this.optionType = OptionType.READ;
     }
-    this.originals = originals != null ? originals : new HashMap<>();
+    this.originals = originals != null ? new HashMap<>(originals) : new HashMap<>();
     this.definition =
         new OptionDef()
             .define(GRAPH, Type.STRING, true, GROUP_GENERAL)
@@ -152,7 +154,7 @@
             .define(
                 VERSION,
                 Type.STRING,
-                OptionDef.NO_DEFAULT_VALUE,
+                OptionDef.DefaultVal.NON_DEFAULT,
                 true,
                 ValidVersion.INSTANCE,
                 GROUP_GENERAL)
@@ -241,6 +243,13 @@
                 true,
                 null,
                 GROUP_LOADING_JOB)
+            .define(
+                LOADING_ACK,
+                Type.STRING,
+                LOADING_ACK_ALL,
+                true,
+                OptionDef.ValidString.in(LOADING_ACK_ALL, LOADING_ACK_NONE),
+                GROUP_LOADING_JOB)
             .define(LOADING_MAX_PERCENT_ERROR, Type.DOUBLE, GROUP_LOADING_JOB)
             .define(LOADING_MAX_NUM_ERROR, Type.INT, GROUP_LOADING_JOB)
             .define(
diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java
index beb2d837..7da9f2d9 100644
--- a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java
+++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java
@@ -97,6 +97,7 @@ public class TigerGraphDataWriter implements DataWriter<InternalRow> {
     queryMap.put("sep", opts.getString(Options.LOADING_SEPARATOR));
     queryMap.put("eol", opts.getString(Options.LOADING_EOL));
     queryMap.put("timeout", opts.getInt(Options.LOADING_TIMEOUT_MS));
+    queryMap.put("ack", opts.getString(Options.LOADING_ACK));
     if (Utils.versionCmp(version, "3.9.4") >= 0) {
       queryMap.put("jobid", jobId);
       if (opts.containsOption(Options.LOADING_MAX_NUM_ERROR)) {
diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/integration/TigerGraphReadRequestTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/integration/TigerGraphReadRequestTest.java
index 5c751da5..1d2c078b 100644
--- a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/integration/TigerGraphReadRequestTest.java
+++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/integration/TigerGraphReadRequestTest.java
@@ -14,7 +14,6 @@
 package com.tigergraph.spark.integration;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.*;
-
 import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
 import com.github.tomakehurst.wiremock.junit5.WireMockTest;
 import com.tigergraph.spark.TigerGraphTable;
@@ -24,8 +23,12 @@
 import com.tigergraph.spark.read.TigerGraphScan;
 import com.tigergraph.spark.read.TigerGraphScanBuilder;
 import com.tigergraph.spark.util.Options;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.JavaSerializer;
+import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -76,13 +79,27 @@ static void mockSparkExecution(Map<String, String> opts, StructType schema) {
     TigerGraphScan scan = scanBuilder.build();
     TigerGraphBatch batchRead = scan.toBatch();
     InputPartition[] partitions = batchRead.planInputPartitions();
-    TigerGraphPartitionReaderFactory readerFactory = batchRead.createReaderFactory();
+    TigerGraphPartitionReaderFactory readerFactory = serdes(batchRead.createReaderFactory());
     for (int i = 0; i < partitions.length; i++) {
       readerFactory.createReader(partitions[i]);
     }
     // after `createReader`, the RESTPP request is already sent and we can directly verify it
   }
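(Editor's illustration, not part of the patch: a write-side sketch of how a user would set the new `loading.ack` option defined in `Options` and forwarded to RESTPP as the `ack` query parameter by `TigerGraphDataWriter` above. The host, graph, job, and filename values are placeholders.)

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TigerGraphWriteSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("tg-write-sketch").getOrCreate();
    Dataset<Row> df =
        spark.read().option("header", "true").csv("/tmp/person.csv"); // sample input

    // Hypothetical connection and job settings; `loading.ack` is the new option.
    df.write()
        .format("tigergraph")
        .option("url", "https://tg-host:14240")    // placeholder host
        .option("graph", "Social_Net")             // placeholder graph name
        .option("version", "4.1.0")                // TigerGraph server version
        .option("loading.job", "load_person")      // placeholder loading job
        .option("loading.filename", "file_person") // placeholder filename variable
        .option("loading.ack", "none")             // return once RESTPP has processed the batch
        .save();
  }
}
```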
 
+  // Spark serializes this in the driver and deserializes it in the executor
+  static TigerGraphPartitionReaderFactory serdes(TigerGraphPartitionReaderFactory factory) {
+    SparkConf conf = new SparkConf().setAppName("TestSerialization").setMaster("local[*]");
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
+    JavaSerializer javaSerializer = new JavaSerializer(conf);
+    SerializerInstance serializer = javaSerializer.newInstance();
+    ByteBuffer serializedData =
+        serializer.serialize(
+            factory, scala.reflect.ClassTag.apply(TigerGraphPartitionReaderFactory.class));
+    return (TigerGraphPartitionReaderFactory)
+        serializer.deserialize(
+            serializedData, scala.reflect.ClassTag.apply(TigerGraphPartitionReaderFactory.class));
+  }
+
   @Test
   void testSendVertexQueryWithOperator(WireMockRuntimeInfo wmRuntimeInfo) {
     Map<String, String> opts =