Refactored SparkSegmentMetadataPushJob

abhioncbr committed Nov 22, 2024
1 parent 9ce4f7f commit 121c1c8

Showing 8 changed files with 153 additions and 94 deletions.
SparkSegmentMetadataPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;

-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.plugin.ingestion.batch.spark.common.AbstractSparkSegmentMetadataPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,8 +36,8 @@
import org.apache.spark.api.java.function.VoidFunction;


-public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
-  private SegmentGenerationJobSpec _spec;
+public class SparkSegmentMetadataPushJobRunner extends AbstractSparkSegmentMetadataPushJobRunner {

public SparkSegmentMetadataPushJobRunner() {
}
Expand All @@ -54,80 +47,31 @@ public SparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
}

@Override
public void init(SegmentGenerationJobSpec spec) {
_spec = spec;
}

@Override
public void run() {
//init all file systems
List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}

//Get outputFS for writing output pinot segments
URI outputDirURI;
try {
outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
outputDirURI = new File(_spec.getOutputDirURI()).toURI();
}
} catch (URISyntaxException e) {
throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
}
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
//Get list of files to process
String[] files;
try {
files = outputDirFS.listFiles(outputDirURI, true);
} catch (IOException e) {
throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
}

List<String> segmentsToPush = new ArrayList<>();
for (String file : files) {
if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
segmentsToPush.add(file);
}
}

int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
if (pushParallelism == 1) {
// Push from driver
try {
SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
} else {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
try {
Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
.getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec(), new String[]{segmentTarPath});
SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
segmentUriToTarPathMap);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
public void parallelizeMetadataPushJob(List<String> segmentsToPush, List<PinotFSSpec> pinotFSSpecs,
int pushParallelism, URI outputDirURI) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, pushParallelism);
URI finalOutputDirURI = outputDirURI;
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
// instead.
pathRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String segmentTarPath)
throws Exception {
PluginManager.get().init();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory
.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
try {
Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
.getSegmentUriToTarPathMap(finalOutputDirURI, _spec.getPushJobSpec(), new String[]{segmentTarPath});
SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(finalOutputDirURI.getScheme()),
segmentUriToTarPathMap);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
});
}
}
});
}
}
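
For context, a minimal sketch (not part of this commit) of how the refactored runner might be driven. The class name MetadataPushExample and all spec values are illustrative, and table/controller configuration is omitted:

import java.util.Collections;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

public class MetadataPushExample {
  public static void main(String[] args)
      throws Exception {
    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
    spec.setOutputDirURI("hdfs://example-namenode/pinot/segments"); // placeholder URI

    // Register a filesystem spec matching the output URI scheme; run() registers it with PinotFSFactory.
    PinotFSSpec fsSpec = new PinotFSSpec();
    fsSpec.setScheme("hdfs");
    fsSpec.setClassName("org.apache.pinot.plugin.filesystem.HadoopPinotFS");
    spec.setPinotFSSpecs(Collections.singletonList(fsSpec));

    // pushParallelism > 1 routes run() into parallelizeMetadataPushJob(), fanning the
    // pushes out over a Spark RDD; a value of 1 pushes every segment from the driver.
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setPushParallelism(8);
    spec.setPushJobSpec(pushJobSpec);

    SparkSegmentMetadataPushJobRunner runner = new SparkSegmentMetadataPushJobRunner();
    runner.init(spec);
    runner.run();
  }
}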
SparkSegmentTarPushJobRunner.java
@@ -48,7 +48,7 @@ public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
super(spec);
}

-  public void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs,
+  public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism, URI outputDirURI) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, pushParallelism);
SparkSegmentUriPushJobRunner.java
@@ -46,7 +46,7 @@ public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
}

@Override
-  public void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs, List<String> segmentUris, int pushParallelism) {
+  public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs, List<String> segmentUris, int pushParallelism) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, pushParallelism);
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
SparkSegmentTarPushJobRunner.java
@@ -46,7 +46,7 @@ public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
super(spec);
}

-  public void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs,
+  public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism, URI outputDirURI) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, pushParallelism);
SparkSegmentUriPushJobRunner.java
@@ -46,7 +46,7 @@ public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
}

@Override
-  public void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs, List<String> segmentUris, int pushParallelism) {
+  public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs, List<String> segmentUris, int pushParallelism) {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, pushParallelism);
// Prevent using lambda expression in Spark to avoid potential serialization exceptions, use inner function
AbstractSparkSegmentMetadataPushJobRunner.java
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.ingestion.batch.spark.common;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;


public abstract class AbstractSparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
protected SegmentGenerationJobSpec _spec;

public AbstractSparkSegmentMetadataPushJobRunner() {
}

public AbstractSparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
init(spec);
}

@Override
public void init(SegmentGenerationJobSpec spec) {
_spec = spec;
}

@Override
public void run() {
//init all file systems
List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}

//Get outputFS for writing output pinot segments
URI outputDirURI;
try {
outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
outputDirURI = new File(_spec.getOutputDirURI()).toURI();
}
} catch (URISyntaxException e) {
throw new RuntimeException("outputDirURI is not valid - '" + _spec.getOutputDirURI() + "'");
}
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
//Get list of files to process
String[] files;
try {
files = outputDirFS.listFiles(outputDirURI, true);
} catch (IOException e) {
throw new RuntimeException("Unable to list all files under outputDirURI - '" + outputDirURI + "'");
}

List<String> segmentsToPush = new ArrayList<>();
for (String file : files) {
if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
segmentsToPush.add(file);
}
}

int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
if (pushParallelism == 1) {
// Push from driver
try {
SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
} else {
parallelizeMetadataPushJob(segmentsToPush, pinotFSSpecs, pushParallelism, outputDirURI);
}
}

/**
* Parallelizes the metadata push job using Spark to distribute the work across multiple nodes.
*
* @param segmentsToPush the list of segment URIs to be pushed
* @param pinotFSSpecs the list of Pinot file system specifications to be registered
* @param pushParallelism the level of parallelism for the push job
* @param outputDirURI the URI of the output directory containing the segments
*/
public abstract void parallelizeMetadataPushJob(List<String> segmentsToPush, List<PinotFSSpec> pinotFSSpecs,
int pushParallelism, URI outputDirURI);
}
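
The refactor is a template method: run() keeps the engine-agnostic work (filesystem registration, segment discovery, the parallelism decision, and the driver-side push), while subclasses supply only the Spark-specific fan-out. A hypothetical non-Spark subclass, shown here purely to illustrate the contract and not part of this commit, could satisfy the hook with a plain driver-side call:

import java.net.URI;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;

// Hypothetical subclass for local testing; fulfills the abstract hook without a Spark RDD.
public class LocalSegmentMetadataPushJobRunner extends AbstractSparkSegmentMetadataPushJobRunner {
  @Override
  public void parallelizeMetadataPushJob(List<String> segmentsToPush, List<PinotFSSpec> pinotFSSpecs,
      int pushParallelism, URI outputDirURI) {
    // run() has already registered the filesystems on the driver, so unlike the
    // Spark executors this implementation does not need to re-register them.
    try {
      Map<String, String> segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(
          outputDirURI, _spec.getPushJobSpec(), segmentsToPush.toArray(new String[0]));
      SegmentPushUtils.sendSegmentUriAndMetadata(_spec, PinotFSFactory.create(outputDirURI.getScheme()),
          segmentUriToTarPathMap);
    } catch (RetriableOperationException | AttemptsExceededException e) {
      throw new RuntimeException(e);
    }
  }
}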
AbstractSparkSegmentTarPushJobRunner.java
@@ -98,17 +98,17 @@ public void run() {
throw new RuntimeException(e);
}
} else {
-      parallelizePushJob(pinotFSSpecs, segmentsToPush, pushParallelism, outputDirURI);
+      parallelizeTarPushJob(pinotFSSpecs, segmentsToPush, pushParallelism, outputDirURI);
}
}

/**
-   * Parallelizes the push job using Spark to distribute the work across multiple nodes.
+   * Parallelizes the tar push job using Spark to distribute the work across multiple nodes.
*
* @param pinotFSSpecs the list of Pinot file system specifications to be registered
* @param segmentUris the list of segment URIs to be pushed
* @param pushParallelism the level of parallelism for the push job
*/
-  public abstract void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs,
+  public abstract void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism, URI outputDirURI);
}
AbstractSparkSegmentUriPushJobRunner.java
@@ -106,17 +106,17 @@ public void run() {
throw new RuntimeException(e);
}
} else {
-      parallelizePushJob(pinotFSSpecs, segmentUris, pushParallelism);
+      parallelizeUriPushJob(pinotFSSpecs, segmentUris, pushParallelism);
}
}

/**
-   * Parallelizes the push job using Spark to distribute the work across multiple nodes.
+   * Parallelizes the uri push job using Spark to distribute the work across multiple nodes.
*
* @param pinotFSSpecs the list of Pinot file system specifications to be registered
* @param segmentUris the list of segment URIs to be pushed
* @param pushParallelism the level of parallelism for the push job
*/
-  public abstract void parallelizePushJob(List<PinotFSSpec> pinotFSSpecs,
+  public abstract void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism);
}
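
Finally, how a concrete runner gets selected at job time: Pinot's ingestion job launcher instantiates the runner class named in the job spec's execution framework section. The sketch below assumes the standard SPI accessors (getExecutionFrameworkSpec(), getSegmentMetadataPushJobRunnerClassName(), PluginManager.createInstance()), which should be verified against the Pinot version in use:

import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;

public final class RunnerDispatchSketch {
  // Illustrative dispatch only; mirrors what the ingestion job launcher does for metadata push jobs.
  public static IngestionJobRunner createMetadataPushRunner(SegmentGenerationJobSpec spec)
      throws Exception {
    // e.g. org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner
    String runnerClassName = spec.getExecutionFrameworkSpec().getSegmentMetadataPushJobRunnerClassName();
    IngestionJobRunner runner = PluginManager.get().createInstance(runnerClassName);
    runner.init(spec);
    return runner;
  }
}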
