Merge pull request #328 from marklogic/release/2.4.2
Merge 2.4.2 into master
rjrudin authored Oct 17, 2024
2 parents 7ff9e0f + 8454c58 commit afd19a3
Showing 22 changed files with 306 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -9,7 +9,7 @@ plugins {
}

group 'com.marklogic'
version '2.4.1'
version '2.4.2'

java {
// To support reading RDF files, Apache Jena is used - but that requires Java 11.
4 changes: 2 additions & 2 deletions docs/getting-started/jupyter.md
@@ -32,15 +32,15 @@ connector and also to initialize Spark:

```
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/marklogic-spark-connector-2.4.1.jar" pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/marklogic-spark-connector-2.4.2.jar" pyspark-shell'
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('My Notebook').getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark
```

The path of `/path/to/marklogic-spark-connector-2.4.1.jar` should be changed to match the location of the connector
The path of `/path/to/marklogic-spark-connector-2.4.2.jar` should be changed to match the location of the connector
jar on your filesystem. You are free to customize the `spark` variable in any manner you see fit as well.
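
As an illustrative follow-up (not part of this commit's docs), a cell along these lines could confirm the connector is reachable by reading documents from the getting-started project. The connection string matches the example notebook in this repository; the `author` collection and the `spark.marklogic.read.documents.collections` option name are assumptions drawn from this commit's tests and the connector's documented `spark.marklogic.*` options.

```python
# Illustrative sketch: read documents from a collection and show a few rows.
# Adjust the connection string and collection to match your environment.
df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.read.documents.collections", "author")
    .load()
)
df.show(5)
```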

Now that you have an initialized Spark session, you can run any of the examples found in the
2 changes: 1 addition & 1 deletion docs/getting-started/pyspark.md
@@ -29,7 +29,7 @@ shell by pressing `ctrl-D`.

Run PySpark from the directory that you downloaded the connector to per the [setup instructions](setup.md):

pyspark --jars marklogic-spark-connector-2.4.1.jar
pyspark --jars marklogic-spark-connector-2.4.2.jar
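
Once the shell starts, a short read such as the sketch below can confirm the connector loaded. This is illustrative only: the connection string and the `op.fromView('example', 'employee')` Optic view are placeholders for whatever your MarkLogic application defines, with `spark.marklogic.read.opticQuery` being the connector's documented option for reading rows via Optic.

```python
# Illustrative sketch, run inside the PySpark shell started above.
df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')")
    .load()
)
df.show()
```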

The `--jars` command line option is PySpark's method for utilizing Spark connectors. Each Spark environment should have
a similar mechanism for including third party connectors; please see the documentation for your particular Spark
4 changes: 2 additions & 2 deletions docs/getting-started/setup.md
@@ -31,10 +31,10 @@ have an instance of MarkLogic running, you can skip step 4 below, but ensure tha
extracted directory contains valid connection properties for your instance of MarkLogic.

1. From [this repository's Releases page](https://github.com/marklogic/marklogic-spark-connector/releases), select
the latest release and download the `marklogic-spark-getting-started-2.4.1.zip` file.
the latest release and download the `marklogic-spark-getting-started-2.4.2.zip` file.
2. Extract the contents of the downloaded zip file.
3. Open a terminal window and go to the directory created by extracting the zip file; the directory should have a
name of "marklogic-spark-getting-started-2.4.1".
name of "marklogic-spark-getting-started-2.4.2".
4. Run `docker-compose up -d` to start an instance of MarkLogic
5. Ensure that the `./gradlew` file is executable; depending on your operating system, you may need to run
`chmod 755 gradlew` to make the file executable.
2 changes: 1 addition & 1 deletion examples/entity-aggregation/build.gradle
@@ -8,7 +8,7 @@ repositories {

dependencies {
implementation 'org.apache.spark:spark-sql_2.12:3.5.3'
implementation "com.marklogic:marklogic-spark-connector:2.4.1"
implementation "com.marklogic:marklogic-spark-connector:2.4.2"
implementation "org.postgresql:postgresql:42.6.2"
}

@@ -9,7 +9,7 @@
"source": [
"# Make the MarkLogic connector available to the underlying PySpark application.\n",
"import os\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars \"marklogic-spark-connector-2.4.1.jar\" pyspark-shell'\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars \"marklogic-spark-connector-2.4.2.jar\" pyspark-shell'\n",
"\n",
"# Define the connection details for the getting-started example application.\n",
"client_uri = \"spark-example-user:password@localhost:8003\"\n",
2 changes: 1 addition & 1 deletion examples/java-dependency/build.gradle
@@ -8,7 +8,7 @@ repositories {

dependencies {
implementation 'org.apache.spark:spark-sql_2.12:3.5.3'
implementation 'com.marklogic:marklogic-spark-connector:2.4.1'
implementation 'com.marklogic:marklogic-spark-connector:2.4.2'
}

task runApp(type: JavaExec) {
4 changes: 4 additions & 0 deletions src/main/java/com/marklogic/spark/ContextSupport.java
@@ -145,6 +145,10 @@ public final String getStringOption(String option) {
return hasOption(option) ? properties.get(option).trim() : null;
}

public final boolean getBooleanOption(String option, boolean defaultValue) {
return hasOption(option) ? Boolean.parseBoolean(getStringOption(option)) : defaultValue;
}

public final boolean isStreamingFiles() {
return "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES));
}
8 changes: 8 additions & 0 deletions src/main/java/com/marklogic/spark/Options.java
@@ -59,6 +59,14 @@ public abstract class Options {
public static final String READ_TRIPLES_FILTERED = "spark.marklogic.read.triples.filtered";
public static final String READ_TRIPLES_BASE_IRI = "spark.marklogic.read.triples.baseIri";

/**
* The connector uses a consistent snapshot by default. Setting this to false results in queries being executed
* at multiple points of time, potentially yielding inconsistent results.
*
* @since 2.4.2
*/
public static final String READ_SNAPSHOT = "spark.marklogic.read.snapshot";

// For logging progress when reading documents, rows, or items via custom code. Defines the interval at which
// progress should be logged - e.g. a value of 10,000 will result in a message being logged on every 10,000 items
// being written/processed.
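
For context on the new `spark.marklogic.read.snapshot` option above, here is a hedged PySpark sketch (not part of this commit) showing how a dirty read could be requested. The connection string and collection are placeholders reused from this repository's getting-started examples.

```python
# Sketch: opt out of the default consistent snapshot when reading documents.
# With the option set to "false", each batch of URIs is fetched at whatever point in
# time the request runs, so results may be inconsistent if the database changes mid-read.
df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.read.documents.collections", "author")
    .option("spark.marklogic.read.snapshot", "false")
    .load()
)
```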
@@ -99,6 +99,11 @@ int getPartitionsPerForest() {
return (int) getNumericOption(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, defaultPartitionsPerForest, 1);
}

boolean isConsistentSnapshot() {
// Starting in 2.2.0 and through 2.4.2, the default is a consistent snapshot. We may change this later.
return getBooleanOption(Options.READ_SNAPSHOT, true);
}

void setLimit(Integer limit) {
this.limit = limit;
}
@@ -61,16 +61,16 @@ class ForestReader implements PartitionReader<InternalRow> {
context.connectToMarkLogic(forestPartition.getHost()) :
context.connectToMarkLogic();

final boolean filtered = context.getBooleanOption(Options.READ_DOCUMENTS_FILTERED, false);
final boolean consistentSnapshot = context.isConsistentSnapshot();

if (logger.isDebugEnabled()) {
logger.debug("Will read from host {} for partition: {}", client.getHost(), forestPartition);
logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
client.getHost(), forestPartition, filtered, consistentSnapshot);
}

SearchQueryDefinition query = context.buildSearchQuery(client);
boolean filtered = false;
if (context.hasOption(Options.READ_DOCUMENTS_FILTERED)) {
filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_DOCUMENTS_FILTERED));
}
this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered);
this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);

this.documentManager = client.newDocumentManager();
this.documentManager.setReadTransform(query.getResponseTransform());
@@ -15,6 +15,8 @@
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
@@ -27,6 +29,8 @@
*/
class OpticTriplesReader implements PartitionReader<InternalRow> {

private static final Logger logger = LoggerFactory.getLogger(OpticTriplesReader.class);

private static final String DATATYPE_COLUMN = "datatype";
private static final String GRAPH_COLUMN = "graph";
private static final String OBJECT_COLUMN = "object";
@@ -54,12 +58,15 @@ public OpticTriplesReader(ForestPartition forestPartition, DocumentContext conte
this.op = this.rowManager.newPlanBuilder();

final SearchQueryDefinition query = context.buildTriplesSearchQuery(this.databaseClient);
boolean filtered = false;
if (context.hasOption(Options.READ_TRIPLES_FILTERED)) {
filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_TRIPLES_FILTERED));
final boolean filtered = context.getBooleanOption(Options.READ_TRIPLES_FILTERED, false);
final boolean consistentSnapshot = context.isConsistentSnapshot();

if (logger.isDebugEnabled()) {
logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
databaseClient.getHost(), forestPartition, filtered, consistentSnapshot);
}
this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered);

this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);
this.batchSize = context.getBatchSize();
}

@@ -25,13 +25,15 @@ class UriBatcher {
private final ForestPartition partition;
private final int pageLength;
private final boolean filtered;
private final boolean useConsistentSnapshot;

// These change as batches of URIs are retrieved.
private String lastUri;
private long offsetStart = 1;


UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength, boolean filtered) {
UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength,
boolean filtered, boolean useConsistentSnapshot) {
this.client = client;
this.queryManager = (QueryManagerImpl) this.client.newQueryManager();
this.queryManager.setPageLength(pageLength);
@@ -40,6 +42,7 @@ class UriBatcher {
this.offsetStart = this.partition.getOffsetStart();
this.pageLength = pageLength;
this.filtered = filtered;
this.useConsistentSnapshot = useConsistentSnapshot;
}

/**
@@ -53,7 +56,9 @@ List<String> nextBatchOfUris() {
}

UrisHandle urisHandle = new UrisHandle();
urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
if (useConsistentSnapshot) {
urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
}

// If we have an offsetEnd, the page length is adjusted to ensure we do not go past offsetEnd.
if (partition.getOffsetEnd() != null && (this.offsetStart + this.pageLength > partition.getOffsetEnd())) {
36 changes: 27 additions & 9 deletions src/main/java/com/marklogic/spark/reader/file/FileContext.java
@@ -12,10 +12,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.spark.util.SerializableConfiguration;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
@@ -49,22 +46,36 @@ public boolean isGzip() {
}

public InputStream openFile(String filePath) {
return openFile(filePath, false);
}

public InputStream openFile(String filePath, boolean guessIfGzipped) {
try {
Path hadoopPath = new Path(filePath);
FileSystem fileSystem = hadoopPath.getFileSystem(hadoopConfiguration.value());
FSDataInputStream inputStream = fileSystem.open(hadoopPath);
return this.isGzip() ? new GZIPInputStream(inputStream) : inputStream;
return isFileGzipped(filePath, guessIfGzipped) ? new GZIPInputStream(inputStream) : inputStream;
} catch (Exception e) {
throw new ConnectorException(String.format(
"Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
}
}

public boolean isReadAbortOnFailure() {
if (hasOption(Options.READ_FILES_ABORT_ON_FAILURE)) {
return Boolean.parseBoolean(getStringOption(Options.READ_FILES_ABORT_ON_FAILURE));
BufferedReader openFileReader(String filePath, boolean guessIfGzipped) {
try {
InputStream inputStream = openFile(filePath, guessIfGzipped);
InputStreamReader inputStreamReader = this.encoding != null ?
new InputStreamReader(inputStream, encoding) :
new InputStreamReader(inputStream);
return new BufferedReader(inputStreamReader);
} catch (Exception e) {
throw new ConnectorException(String.format(
"Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
}
return true;
}

public boolean isReadAbortOnFailure() {
return getBooleanOption(Options.READ_FILES_ABORT_ON_FAILURE, true);
}

byte[] readBytes(InputStream inputStream) throws IOException {
@@ -85,4 +96,11 @@ public String decodeFilePath(String path) {
return path;
}
}

private boolean isFileGzipped(String filePath, boolean guessIfGzipped) {
if (this.isGzip()) {
return true;
}
return guessIfGzipped && filePath != null && (filePath.endsWith(".gz") || filePath.endsWith(".gzip"));
}
}
@@ -35,6 +35,8 @@ public PartitionReader<InternalRow> createReader(InputPartition partition) {
return new MlcpArchiveFileReader(filePartition, fileContext);
} else if ("archive".equalsIgnoreCase(fileType)) {
return new ArchiveFileReader(filePartition, fileContext);
} else if ("json_lines".equalsIgnoreCase(fileType)) {
return new JsonLinesFileReader(filePartition, fileContext);
} else if (fileContext.hasOption(Options.READ_AGGREGATES_XML_ELEMENT)) {
return fileContext.isZip() ?
new ZipAggregateXmlFileReader(filePartition, fileContext) :
@@ -0,0 +1,85 @@
/*
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
*/
package com.marklogic.spark.reader.file;

import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.BufferedReader;
import java.util.Iterator;

class JsonLinesFileReader implements PartitionReader<InternalRow> {

private final FilePartition filePartition;
private final FileContext fileContext;

private BufferedReader bufferedReader;
private Iterator<String> bufferedLines;

private InternalRow nextRowToReturn;
private String currentFilePath;
private int lineCounter;
private int filePathIndex;

JsonLinesFileReader(FilePartition filePartition, FileContext fileContext) {
this.filePartition = filePartition;
this.fileContext = fileContext;
}

@Override
public boolean next() {
if (bufferedLines != null && bufferedLines.hasNext()) {
this.nextRowToReturn = createRowFromNextJsonLine();
return true;
}

if (bufferedReader != null) {
IOUtils.closeQuietly(bufferedReader);
}

if (filePathIndex >= filePartition.getPaths().size()) {
return false;
}

openNextFile();
return next();
}

@Override
public InternalRow get() {
return nextRowToReturn;
}

@Override
public void close() {
IOUtils.closeQuietly(bufferedReader);
}

private void openNextFile() {
final String originalFilePath = filePartition.getPaths().get(filePathIndex);
this.currentFilePath = fileContext.decodeFilePath(originalFilePath);
this.lineCounter = 1;
this.filePathIndex++;
// To mimic the behavior of the Spark JSON data source, this will guess if the file is gzipped based on its
// file extension. This allows for .gz/.gzip files to be supported without the user having to specify the
// compression option, which is the same behavior as Spark JSON provides.
this.bufferedReader = fileContext.openFileReader(currentFilePath, true);
this.bufferedLines = bufferedReader.lines().iterator();
}

private InternalRow createRowFromNextJsonLine() {
String line = bufferedLines.next();
String uri = String.format("%s-%d.json", UTF8String.fromString(currentFilePath), lineCounter);
lineCounter++;
return new GenericInternalRow(new Object[]{
UTF8String.fromString(uri),
ByteArray.concat(line.getBytes()),
null, null, null, null, null, null
});
}
}
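
A hedged sketch of how the new JSON Lines reader might be exercised from PySpark follows. The `spark.marklogic.read.files.type` option name is an assumption based on the `fileType` check in the reader factory above (consult the connector documentation for the exact name); the file path, connection string, write collection, and `spark.marklogic.write.*` options are likewise placeholders drawn from the connector's documented options.

```python
# Assumption: "json_lines" is selected via the connector's file-type option, alongside the
# "archive" and "mlcp_archive" types handled by the same factory. Per the reader above,
# each line of the input file becomes one row whose URI is "<file path>-<line number>.json".
lines_df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.read.files.type", "json_lines")
    .load("/path/to/data.jsonl")
)

# Write the resulting rows to MarkLogic as individual JSON documents.
(
    lines_df.write.format("marklogic")
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")
    .option("spark.marklogic.write.collections", "json-lines-example")
    .mode("append")
    .save()
)
```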
@@ -40,6 +40,20 @@ void readByCollection() {
assertEquals("Vivianne", doc.get("ForeName").asText());
}

@Test
void dirtyRead() {
Dataset<Row> rows = startRead()
.option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
.option(Options.READ_SNAPSHOT, false)
.load();

assertEquals(15, rows.count(), "This test only verifies that the snapshot option can be set to false. " +
"We don't yet have a way to verify that the query doesn't use a consistent snapshot, which would entail " +
"forcing the read to pause while an update and merge are performed in the database. Verifying the " +
"difference between a consistent snapshot and a dirty read will need to be done manually, including " +
"by inspecting the debug logs generated by this test.");
}

@Test
void logProgress() {
newWriter().save();