Spark batch ingestion common code abstraction. #14415

Open · wants to merge 3 commits into base: master
pom.xml
@@ -108,6 +108,12 @@
<groupId>org.codehaus.woodstox</groupId>
<artifactId>stax2-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-batch-ingestion-spark-base</artifactId>
<version>1.3.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
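
Note: the new compile-scope dependency points at pinot-batch-ingestion-spark-base, the shared module this PR presumably introduces to hold the push logic that was previously duplicated across the Spark runners. That base module is not part of this excerpt; the following is a minimal sketch of what AbstractSparkSegmentMetadataPushJobRunner is assumed to look like, reconstructed from the run() body that the diff below removes from the concrete runner. Only the class, package, field, and hook names visible in the diff are taken as given; the rest is an assumption.

package org.apache.pinot.plugin.ingestion.batch.spark.common;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

public abstract class AbstractSparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
  protected SegmentGenerationJobSpec _spec;

  public AbstractSparkSegmentMetadataPushJobRunner() {
  }

  public AbstractSparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
    init(spec);
  }

  @Override
  public void init(SegmentGenerationJobSpec spec) {
    _spec = spec;
  }

  @Override
  public void run() {
    // Register every configured PinotFS so URIs in the spec can be resolved.
    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
      PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
    }

    // Resolve the output directory, defaulting to a local file URI when no scheme is given.
    URI outputDirURI;
    try {
      outputDirURI = new URI(_spec.getOutputDirURI());
      if (outputDirURI.getScheme() == null) {
        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
      }
    } catch (URISyntaxException e) {
      throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
    }

    // Collect the segment tarballs under the output directory.
    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
    String[] files;
    try {
      files = outputDirFS.listFiles(outputDirURI, true);
    } catch (IOException e) {
      throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
    }
    List<String> segmentsToPush = new ArrayList<>();
    for (String file : files) {
      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
        segmentsToPush.add(file);
      }
    }

    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
    if (pushParallelism < 1) {
      pushParallelism = segmentsToPush.size();
    }
    if (pushParallelism == 1) {
      // Push from the driver when no fan-out is requested.
      try {
        SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
      } catch (Exception e) {
        // The removed runner code caught RetriableOperationException | AttemptsExceededException here.
        throw new RuntimeException(e);
      }
    } else {
      // Engine-specific hook, implemented by the Spark plugin below with an RDD-based fan-out.
      parallelizeMetadataPushJob(segmentsToPush, pinotFSSpecs, pushParallelism, outputDirURI);
    }
  }

  public abstract void parallelizeMetadataPushJob(List<String> segmentsToPush, List<PinotFSSpec> pinotFSSpecs,
      int pushParallelism, URI outputDirURI);
}
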
SparkSegmentMetadataPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pinot.plugin.ingestion.batch.spark.common.AbstractSparkSegmentMetadataPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,8 +36,8 @@
import org.apache.spark.api.java.function.VoidFunction;


public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
private SegmentGenerationJobSpec _spec;

public class SparkSegmentMetadataPushJobRunner extends AbstractSparkSegmentMetadataPushJobRunner {

public SparkSegmentMetadataPushJobRunner() {
}
@@ -54,80 +47,31 @@ public SparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
}

@Override
public void init(SegmentGenerationJobSpec spec) {
_spec = spec;
}

@Override
public void run() {
//init all file systems
List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}

//Get outputFS for writing output pinot segments
URI outputDirURI;
try {
outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
outputDirURI = new File(_spec.getOutputDirURI()).toURI();
}
} catch (URISyntaxException e) {
throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
}
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
//Get list of files to process
String[] files;
try {
files = outputDirFS.listFiles(outputDirURI, true);
} catch (IOException e) {
throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
}

List<String> segmentsToPush = new ArrayList<>();
for (String file : files) {
if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
segmentsToPush.add(file);
}
}

int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
if (pushParallelism == 1) {
// Push from driver
try {
SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
} else {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
try {
Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
.getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec(), new String[]{segmentTarPath});
SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
segmentUriToTarPathMap);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
public void parallelizeMetadataPushJob(List<String> segmentsToPush, List<PinotFSSpec> pinotFSSpecs,
int pushParallelism, URI outputDirURI) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
try {
Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
.getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec(), new String[]{segmentTarPath});
SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
segmentUriToTarPathMap);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
});
}
}
});
}
}
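
Note: for callers nothing changes, since the runner still follows the IngestionJobRunner contract (init with a SegmentGenerationJobSpec, then run). A hypothetical, hand-assembled driver snippet just to illustrate that contract; real jobs build the spec from the ingestion job spec YAML, and the setter shown here is an assumption, the counterpart of the getOutputDirURI() used above.

import org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

public class MetadataPushExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical spec wiring for illustration only; a real job also sets the push job spec,
    // table config/schema URIs and PinotFS specs, typically parsed from the job YAML.
    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
    spec.setOutputDirURI("hdfs://namenode/pinot/segments/myTable");  // assumed setter

    IngestionJobRunner runner = new SparkSegmentMetadataPushJobRunner();
    runner.init(spec);
    runner.run();  // pushes segment metadata; fans out over Spark when pushParallelism > 1
  }
}
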
SparkSegmentTarPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.pinot.plugin.ingestion.batch.spark.common.AbstractSparkSegmentTarPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,89 +36,41 @@
import org.apache.spark.api.java.function.VoidFunction;


public class SparkSegmentTarPushJobRunner implements IngestionJobRunner, Serializable {

public class SparkSegmentTarPushJobRunner extends AbstractSparkSegmentTarPushJobRunner {
private SegmentGenerationJobSpec _spec;

public SparkSegmentTarPushJobRunner() {
super();
}

public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
init(spec);
}

@Override
public void init(SegmentGenerationJobSpec spec) {
_spec = spec;
super(spec);
}

@Override
public void run() {
//init all file systems
List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}

//Get outputFS for writing output pinot segments
URI outputDirURI;
try {
outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
outputDirURI = new File(_spec.getOutputDirURI()).toURI();
}
} catch (URISyntaxException e) {
throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
}
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
//Get list of files to process
String[] files;
try {
files = outputDirFS.listFiles(outputDirURI, true);
} catch (IOException e) {
throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
}

List<String> segmentsToPush = new ArrayList<>();
for (String file : files) {
if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
segmentsToPush.add(file);
}
}

int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
if (pushParallelism == 1) {
// Push from driver
try {
SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
} else {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
try {
SegmentPushUtils.pushSegments(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
Arrays.asList(segmentTarPath));
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism, URI outputDirURI) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
});
}
try {
SegmentPushUtils.pushSegments(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
Arrays.asList(segmentTarPath));
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
}
});
}
}
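
Note: the tar-push runner follows the same template. Its base class is likewise not part of this excerpt; below is a minimal sketch of the assumed skeleton, taking as given only the names that appear in the diff (class name, hook signature, the _spec field, and the super(spec) call). run() is assumed to mirror the metadata-push base above: register the configured file systems, collect the .tar.gz segments under outputDirURI, push from the driver when pushParallelism is 1, and otherwise call the engine-specific hook that the Spark runner above implements with an RDD.

package org.apache.pinot.plugin.ingestion.batch.spark.common;

import java.io.Serializable;
import java.net.URI;
import java.util.List;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

// Sketch only: run() (not shown) is assumed to follow the same template as the
// metadata-push base, delegating the parallel branch to parallelizeTarPushJob.
public abstract class AbstractSparkSegmentTarPushJobRunner implements IngestionJobRunner, Serializable {
  protected SegmentGenerationJobSpec _spec;

  public AbstractSparkSegmentTarPushJobRunner() {
  }

  public AbstractSparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
    init(spec);
  }

  @Override
  public void init(SegmentGenerationJobSpec spec) {
    _spec = spec;
  }

  // Engine-specific fan-out: the Spark plugin above parallelizes the segment URIs as an RDD
  // and pushes each whole tarball with SegmentPushUtils.pushSegments.
  public abstract void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs, List<String> segmentUris,
      int pushParallelism, URI outputDirURI);
}
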