Merge pull request #230 from tigergraph/spark-conn-dev
[Automated]Spark Connector Release 0.2.1
chengjie-qin authored Sep 24, 2024
2 parents e46e69c + 3691f33 commit ed699b6
Showing 7 changed files with 76 additions and 21 deletions.
27 changes: 27 additions & 0 deletions tools/etl/tg-spark-connector/CHANGELOG.md
@@ -0,0 +1,27 @@
# v0.2.1
* Fix the NPE when `log.level` is unset. The `log.level` option is optional; if it is not provided, the connector initializes the logger using the Spark Log4j configuration
* New writer option `loading.ack` with default value "all" (see the writer sketch at the end of this section):
  * "all": loading requests return only after all GPE instances have acknowledged them
  * "none": loading requests return immediately after RESTPP has processed them
* Change the naming convention of the release packages:
  * `tigergraph-spark-connector-<version>.jar` is renamed to `original-tigergraph-spark-connector-<version>.jar` and moved into `tigergraph-spark-connector-<version>.tar.gz`: the JAR containing only the compiled classes of the connector, without any dependencies.
  * `tigergraph-spark-connector-<version>-jar-with-dependencies.jar` is renamed to `tigergraph-spark-connector-<version>.jar`: the JAR that includes the compiled classes as well as all dependencies.
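
A minimal batch-write sketch illustrating the new `loading.ack` option and the now-optional `log.level`. The data source short name `tigergraph`, the credential options, the `loading.job` option, and all graph/URL/file values are placeholders assumed for illustration, not taken from this commit:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AckWriteSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("tg-ack-write").master("local[*]").getOrCreate();
    // Placeholder source data; any DataFrame whose columns match the target loading job works.
    Dataset<Row> df = spark.read().option("header", "true").csv("/tmp/people.csv");
    df.write()
        .format("tigergraph")                       // assumed data source short name
        .option("graph", "Social_Net")              // placeholder graph name
        .option("url", "https://my-tg-host:14240")  // placeholder TigerGraph endpoint
        .option("version", "4.1.0")
        .option("username", "tigergraph")           // assumed credential options
        .option("password", "secret")
        .option("loading.job", "load_people")       // placeholder loading job name
        .option("loading.ack", "none")              // new in 0.2.1: "all" (default) or "none"
        // `log.level` is intentionally omitted: the connector falls back to Spark's Log4j config
        .save();
    spark.stop();
  }
}
```

With `loading.ack` left at its default `"all"`, each loading request returns only after every GPE instance has acknowledged it; `"none"` trades that guarantee for lower request latency.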

# v0.2.0
* Support reading from TigerGraph into a Spark DataFrame (see the read sketch at the end of this section): [documentation](https://docs.tigergraph.com/tigergraph-server/current/data-loading/read-to-spark-dataframe)
  * Built-in vertex query
  * Built-in edge query
  * Pre-installed query
  * Interpreted query
* Support the built-in logger by setting the option `log.level` to 0, 1, 2, or 3
* Support TigerGraph 4.1+ (still backward compatible with previous versions)
* Support requesting and refreshing JWT
* **NOTE**: it is mandatory to provide the TigerGraph version via the option `version`, e.g. `.option("version", "4.1.0")`
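
A minimal read sketch for a built-in vertex query. The data source short name, the `query.vertex` option name, the credential options, and the connection values are assumptions for illustration; the mandatory `version` option and optional `log.level` follow the notes above:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VertexReadSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("tg-vertex-read").master("local[*]").getOrCreate();
    Dataset<Row> persons = spark.read()
        .format("tigergraph")                       // assumed data source short name
        .option("graph", "Social_Net")              // placeholder graph name
        .option("url", "https://my-tg-host:14240")  // placeholder TigerGraph endpoint
        .option("version", "4.1.0")                 // mandatory: target TigerGraph version
        .option("username", "tigergraph")           // assumed credential options
        .option("password", "secret")
        .option("query.vertex", "Person")           // assumed option name for the built-in vertex query
        .option("log.level", "2")                   // optional built-in logger, levels 0-3
        .load();
    persons.show();
    spark.stop();
  }
}
```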

# v0.1.1
* Fix the issue that decimal data types failed to load

# v0.1.0
* Support writing a Spark DataFrame to TigerGraph (see the streaming sketch at the end of this section): [documentation](https://docs.tigergraph.com/tigergraph-server/current/data-loading/load-from-spark-dataframe)
  * Batch write
  * Write with the Spark Structured Streaming API
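
A Structured Streaming write sketch under the same assumptions as the batch examples above (data source short name, credential options, loading-job option, and connection values are placeholders); the checkpoint location is required by Spark's streaming API:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingWriteSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark =
        SparkSession.builder().appName("tg-stream-write").master("local[*]").getOrCreate();
    // Placeholder streaming source; any schema matching the target loading job works.
    Dataset<Row> stream = spark.readStream()
        .schema("name STRING, age INT")
        .option("header", "true")
        .csv("/tmp/incoming/");
    StreamingQuery query = stream.writeStream()
        .format("tigergraph")                       // assumed data source short name
        .option("graph", "Social_Net")              // placeholder graph name
        .option("url", "https://my-tg-host:14240")  // placeholder TigerGraph endpoint
        .option("version", "4.1.0")
        .option("username", "tigergraph")           // assumed credential options
        .option("password", "secret")
        .option("loading.job", "load_people")       // placeholder loading job name
        .option("checkpointLocation", "/tmp/tg-checkpoint")
        .start();
    query.awaitTermination();
  }
}
```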
4 changes: 1 addition & 3 deletions tools/etl/tg-spark-connector/pom.xml
@@ -33,7 +33,7 @@

<groupId>com.tigergraph</groupId>
<artifactId>tigergraph-spark-connector</artifactId>
<version>0.2.0</version>
<version>0.2.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -140,8 +140,6 @@
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
<filters>
<filter>
<artifact>*:*</artifact>
4 changes: 2 additions & 2 deletions tools/etl/tg-spark-connector/src/assembly/package.xml
@@ -10,8 +10,8 @@
</formats>
<files>
<file>
<source>${project.build.directory}/${project.artifactId}-${project.version}.jar</source>
<destName>${project.artifactId}-${project.version}.jar</destName>
<source>${project.build.directory}/original-${project.artifactId}-${project.version}.jar</source>
<destName>original-${project.artifactId}-${project.version}.jar</destName>
<outputDirectory>/</outputDirectory>
</file>
</files>
@@ -18,12 +18,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

public class OptionDef implements Serializable {
// A unique Java object which represents the lack of a default value.
public static final Serializable NO_DEFAULT_VALUE = UUID.randomUUID();

// Identifies whether the option has a default value
public enum DefaultVal {
NON_DEFAULT
};

// Options' definitions
private final Map<String, OptionKey> optionKeys = new HashMap<>();
@@ -60,7 +62,7 @@ public OptionDef define(String name, Type type, String group) {
}

public OptionDef define(String name, Type type, boolean required, String group) {
return define(name, type, NO_DEFAULT_VALUE, required, null, group);
return define(name, type, DefaultVal.NON_DEFAULT, required, null, group);
}

/*
@@ -91,14 +93,15 @@ public OptionKey(
String group) {
this.name = name;
this.type = type;
this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : defaultValue;
this.defaultValue =
DefaultVal.NON_DEFAULT.equals(defaultValue) ? DefaultVal.NON_DEFAULT : defaultValue;
this.required = required;
this.validator = validator;
this.group = group;
}

public boolean hasDefault() {
return !NO_DEFAULT_VALUE.equals(this.defaultValue);
return !DefaultVal.NON_DEFAULT.equals(this.defaultValue);
}
}

@@ -13,17 +13,16 @@
*/
package com.tigergraph.spark.util;

import com.tigergraph.spark.util.OptionDef.OptionKey;
import com.tigergraph.spark.util.OptionDef.Type;
import com.tigergraph.spark.util.OptionDef.ValidVersion;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.tigergraph.spark.util.OptionDef.OptionKey;
import com.tigergraph.spark.util.OptionDef.Type;
import com.tigergraph.spark.util.OptionDef.ValidVersion;

/** Validate and transform Spark DataFrame options(configurations) */
public class Options implements Serializable {

@@ -61,6 +60,7 @@ public static enum QueryType {
public static final String LOADING_EOL = "loading.eol";
public static final String LOADING_BATCH_SIZE_BYTES = "loading.batch.size.bytes";
public static final String LOADING_TIMEOUT_MS = "loading.timeout.ms";
public static final String LOADING_ACK = "loading.ack";
public static final String LOADING_MAX_PERCENT_ERROR = "loading.max.percent.error";
public static final String LOADING_MAX_NUM_ERROR = "loading.max.num.error";
public static final String LOADING_RETRY_INTERVAL_MS = "loading.retry.interval.ms";
@@ -71,6 +71,8 @@ public static enum QueryType {
public static final String LOADING_EOL_DEFAULT = "\n";
public static final int LOADING_BATCH_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2mb
public static final int LOADING_TIMEOUT_MS_DEFAULT = 0; // restpp default
public static final String LOADING_ACK_ALL = "all";
public static final String LOADING_ACK_NONE = "none";
public static final int LOADING_RETRY_INTERVAL_MS_DEFAULT = 5 * 1000; // 5s
public static final int LOADING_MAX_RETRY_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5min
public static final int LOADING_MAX_RETRY_ATTEMPTS_DEFAULT = 10;
@@ -133,8 +135,8 @@ public static enum QueryType {
public static final String GROUP_QUERY = "query";
public static final String GROUP_LOG = "log";

private final Map<String, String> originals;
private final Map<String, Serializable> transformed = new HashMap<>();
private final HashMap<String, String> originals;
private final HashMap<String, Serializable> transformed = new HashMap<>();
private final OptionDef definition;

public Options(Map<String, String> originals, boolean skipValidate) {
@@ -144,15 +146,15 @@ public Options(Map<String, String> originals, boolean skipValidate) {
} else {
this.optionType = OptionType.READ;
}
this.originals = originals != null ? originals : new HashMap<>();
this.originals = originals != null ? new HashMap<>(originals) : new HashMap<>();
this.definition =
new OptionDef()
.define(GRAPH, Type.STRING, true, GROUP_GENERAL)
.define(URL, Type.STRING, true, GROUP_GENERAL)
.define(
VERSION,
Type.STRING,
OptionDef.NO_DEFAULT_VALUE,
OptionDef.DefaultVal.NON_DEFAULT,
true,
ValidVersion.INSTANCE,
GROUP_GENERAL)
Expand Down Expand Up @@ -241,6 +243,13 @@ public Options(Map<String, String> originals, boolean skipValidate) {
true,
null,
GROUP_LOADING_JOB)
.define(
LOADING_ACK,
Type.STRING,
LOADING_ACK_ALL,
true,
OptionDef.ValidString.in(LOADING_ACK_ALL, LOADING_ACK_NONE),
GROUP_LOADING_JOB)
.define(LOADING_MAX_PERCENT_ERROR, Type.DOUBLE, GROUP_LOADING_JOB)
.define(LOADING_MAX_NUM_ERROR, Type.INT, GROUP_LOADING_JOB)
.define(
@@ -97,6 +97,7 @@ public class TigerGraphDataWriter implements DataWriter<InternalRow> {
queryMap.put("sep", opts.getString(Options.LOADING_SEPARATOR));
queryMap.put("eol", opts.getString(Options.LOADING_EOL));
queryMap.put("timeout", opts.getInt(Options.LOADING_TIMEOUT_MS));
queryMap.put("ack", opts.getString(Options.LOADING_ACK));
if (Utils.versionCmp(version, "3.9.4") >= 0) {
queryMap.put("jobid", jobId);
if (opts.containsOption(Options.LOADING_MAX_NUM_ERROR)) {
@@ -14,7 +14,6 @@
package com.tigergraph.spark.integration;

import static com.github.tomakehurst.wiremock.client.WireMock.*;

import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import com.tigergraph.spark.TigerGraphTable;
@@ -24,8 +23,12 @@
import com.tigergraph.spark.read.TigerGraphScan;
import com.tigergraph.spark.read.TigerGraphScanBuilder;
import com.tigergraph.spark.util.Options;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -76,13 +79,27 @@ static void mockSparkExecution(Map<String, String> opts, StructType schema) {
TigerGraphScan scan = scanBuilder.build();
TigerGraphBatch batchRead = scan.toBatch();
InputPartition[] partitions = batchRead.planInputPartitions();
TigerGraphPartitionReaderFactory readerFactory = batchRead.createReaderFactory();
TigerGraphPartitionReaderFactory readerFactory = serdes(batchRead.createReaderFactory());
for (int i = 0; i < partitions.length; i++) {
readerFactory.createReader(partitions[i]);
}
// after `createReader`, the RESTPP request is already sent and we can directly verify it
}

// Spark serializes this factory in the driver and deserializes it in the executors
static TigerGraphPartitionReaderFactory serdes(TigerGraphPartitionReaderFactory factory) {
SparkConf conf = new SparkConf().setAppName("TestSerialization").setMaster("local[*]");
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
JavaSerializer javaSerializer = new JavaSerializer(conf);
SerializerInstance serializer = javaSerializer.newInstance();
ByteBuffer serializedData =
serializer.serialize(
factory, scala.reflect.ClassTag.apply(TigerGraphPartitionReaderFactory.class));
return (TigerGraphPartitionReaderFactory)
serializer.deserialize(
serializedData, scala.reflect.ClassTag.apply(TigerGraphPartitionReaderFactory.class));
}

@Test
void testSendVertexQueryWithOperator(WireMockRuntimeInfo wmRuntimeInfo) {
Map<String, String> opts =
