
Commit

https://github.com/gbif/pipelines/issues/1078
adding CLI components for incremental table build using iceberg
fmendezh committed Sep 20, 2024
1 parent ac238be commit 92b2970
Showing 5 changed files with 264 additions and 5 deletions.
BeamParametersBuilder.java
@@ -14,11 +14,7 @@
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.gbif.api.model.pipelines.InterpretationType.RecordType;
-import org.gbif.common.messaging.api.messages.PipelinesEventsInterpretedMessage;
-import org.gbif.common.messaging.api.messages.PipelinesEventsMessage;
-import org.gbif.common.messaging.api.messages.PipelinesInterpretationMessage;
-import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage;
-import org.gbif.common.messaging.api.messages.PipelinesVerbatimMessage;
+import org.gbif.common.messaging.api.messages.*;
import org.gbif.pipelines.common.configs.AvroWriteConfiguration;
import org.gbif.pipelines.common.configs.ElasticsearchConfiguration;
import org.gbif.pipelines.common.configs.IndexConfiguration;
@@ -195,6 +191,23 @@ public static BeamParameters occurrenceHdfsView(
.putCondition(config.recordType == RecordType.EVENT, "coreRecordType", "EVENT");
}

public static BeamParameters occurrenceWarehouse(
HdfsViewConfiguration config, PipelinesHdfsViewMessage message) {

return BeamParameters.create()
.putRequireNonNull("datasetId", message.getDatasetUuid())
.put("attempt", message.getAttempt())
.put("runner", "SparkRunner")
.putRequireNonNull("metaFileName", config.metaFileName)
.putRequireNonNull("inputPath", config.stepConfig.repositoryPath)
.putRequireNonNull("targetPath", config.repositoryTargetPath)
.putRequireNonNull("hdfsSiteConfig", config.stepConfig.hdfsSiteConfig)
.putRequireNonNull("coreSiteConfig", config.stepConfig.coreSiteConfig)
.putRequireNonNull("properties", config.pipelinesConfig)
.put("experiments", "use_deprecated_read")
.putCondition(config.recordType == RecordType.EVENT, "coreRecordType", "EVENT");
}

public static BeamParameters eventInterpretation(
EventsInterpretationConfiguration config,
PipelinesEventsMessage message,
DataWarehouseCallback.java
@@ -0,0 +1,124 @@
package org.gbif.pipelines.tasks.occurrences.warehouse;

import com.google.common.base.Strings;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.gbif.common.messaging.AbstractMessageCallback;
import org.gbif.common.messaging.api.MessagePublisher;
import org.gbif.common.messaging.api.messages.PipelinesHdfsViewMessage;
import org.gbif.common.messaging.api.messages.PipelinesInterpretationMessage;
import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage;
import org.gbif.pipelines.common.PipelinesVariables;
import org.gbif.pipelines.common.airflow.AppName;
import org.gbif.pipelines.common.process.AirflowSparkLauncher;
import org.gbif.pipelines.common.process.BeamParametersBuilder;
import org.gbif.pipelines.common.process.RecordCountReader;
import org.gbif.pipelines.common.process.SparkDynamicSettings;
import org.gbif.pipelines.tasks.PipelinesCallback;
import org.gbif.pipelines.tasks.StepHandler;
import org.gbif.pipelines.tasks.common.hdfs.CommonHdfsViewCallback;
import org.gbif.pipelines.tasks.common.hdfs.HdfsViewConfiguration;
import org.gbif.pipelines.tasks.verbatims.dwca.DwcaToAvroConfiguration;
import org.gbif.registry.ws.client.DatasetClient;
import org.gbif.registry.ws.client.pipelines.PipelinesHistoryClient;

/** Callback which is called when an instance of {@link PipelinesInterpretationMessage} is received. */
@Slf4j
@Builder
public class DataWarehouseCallback extends AbstractMessageCallback<PipelinesHdfsViewMessage>
implements StepHandler<PipelinesHdfsViewMessage, DataWarehouseFinishMessage> {

protected final HdfsViewConfiguration config;
private final MessagePublisher publisher;
private final PipelinesHistoryClient historyClient;
private final DatasetClient datasetClient;
private final CommonHdfsViewCallback commonHdfsViewCallback;

@Override
public void handleMessage(PipelinesHdfsViewMessage message) {
PipelinesCallback.<PipelinesHdfsViewMessage, DataWarehouseFinishMessage>builder()
.historyClient(historyClient)
.datasetClient(datasetClient)
.config(config)
.stepType(config.stepType)
.publisher(publisher)
.message(message)
.handler(this)
.build()
.handleMessage();
}

@Override
public String getRouting() {
return new PipelinesInterpretedMessage().setRunner(config.processRunner).getRoutingKey();
}

/** Main message processing logic: builds the Beam parameters and submits the distributed Spark job. */
@Override
public Runnable createRunnable(PipelinesHdfsViewMessage message) {
return () -> {
BeamParametersBuilder.BeamParameters beamParameters =
BeamParametersBuilder.occurrenceWarehouse(config, message);
runDistributed(message, beamParameters);
};
}

@SneakyThrows
private void runDistributed(
PipelinesHdfsViewMessage message, BeamParametersBuilder.BeamParameters beamParameters) {

long recordsNumber =
RecordCountReader.builder()
.stepConfig(config.stepConfig)
.datasetKey(message.getDatasetUuid().toString())
.attempt(message.getAttempt().toString())
.metaFileName(new DwcaToAvroConfiguration().metaFileName)
.metricName(PipelinesVariables.Metrics.ARCHIVE_TO_OCC_COUNT)
.build()
.get();

log.info("Calculate job's settings based on {} records", recordsNumber);
boolean useMemoryExtraCoef =
config.sparkConfig.extraCoefDatasetSet.contains(message.getDatasetUuid().toString());
SparkDynamicSettings sparkDynamicSettings =
SparkDynamicSettings.create(config.sparkConfig, recordsNumber, useMemoryExtraCoef);

// App name
String sparkAppName =
AppName.get(config.stepType, message.getDatasetUuid(), message.getAttempt());

// Submit
AirflowSparkLauncher.builder()
.airflowConfiguration(config.airflowConfig)
.sparkStaticConfiguration(config.sparkConfig)
.sparkDynamicSettings(sparkDynamicSettings)
.beamParameters(beamParameters)
.sparkAppName(sparkAppName)
.build()
.submitAwaitVoid();
}

@Override
public DataWarehouseFinishMessage createOutgoingMessage(PipelinesHdfsViewMessage message) {
return new DataWarehouseFinishMessage(
message.getDatasetUuid(), message.getAttempt(), message.getPipelineSteps(), null, null);
}

/**
* Only correct messages can be handled; currently these are only messages whose runner matches the
* runner in the service config {@link HdfsViewConfiguration#processRunner}.
*/
@Override
public boolean isMessageCorrect(PipelinesHdfsViewMessage message) {
if (Strings.isNullOrEmpty(message.getRunner())) {
throw new IllegalArgumentException("Runner can't be null or empty " + message);
}

if (!config.processRunner.equals(message.getRunner())) {
log.warn("Skipping, because runner is incorrect");
return false;
}
return true;
}
}
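
The runner check in isMessageCorrect is the only gate before a job is submitted. Below is a minimal sketch of that behaviour; it assumes processRunner is a plain public configuration field, and the values "distributed" and "HDFS_VIEW" are placeholders, not taken from this commit.

package org.gbif.pipelines.tasks.occurrences.warehouse;

import java.util.Collections;
import java.util.UUID;
import org.gbif.common.messaging.api.messages.PipelinesHdfsViewMessage;
import org.gbif.pipelines.tasks.common.hdfs.HdfsViewConfiguration;

class RunnerCheckSketch {
  public static void main(String[] args) {
    HdfsViewConfiguration config = new HdfsViewConfiguration();
    config.processRunner = "distributed"; // assumption: a public CLI-parameter field

    // Other collaborators are not needed for the runner check
    DataWarehouseCallback callback = DataWarehouseCallback.builder().config(config).build();

    PipelinesHdfsViewMessage message =
        new PipelinesHdfsViewMessage(
            UUID.randomUUID(), 1, Collections.singleton("HDFS_VIEW"), "distributed", 1L);

    // Prints true: the runner matches HdfsViewConfiguration#processRunner, so the message would be handled
    System.out.println(callback.isMessageCorrect(message));
  }
}
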
DataWarehouseCommand.java
@@ -0,0 +1,31 @@
package org.gbif.pipelines.tasks.occurrences.warehouse;

import com.google.common.util.concurrent.Service;
import org.gbif.cli.Command;
import org.gbif.cli.service.ServiceCommand;
import org.gbif.pipelines.tasks.common.hdfs.HdfsViewConfiguration;
import org.kohsuke.MetaInfServices;

/**
* Entry class for the CLI command that starts the service to process the HDFS view. This command
* starts a service which listens to the {@link
* org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage}.
*/
@MetaInfServices(Command.class)
public class DataWarehouseCommand extends ServiceCommand {

private final HdfsViewConfiguration config = new HdfsViewConfiguration();

public DataWarehouseCommand() {
super("pipelines-warehouse");
}

@Override
protected Service getService() {
return new DataWarehouseService(config);
}

@Override
protected Object getConfigurationObject() {
return config;
}
}
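
The @MetaInfServices(Command.class) annotation generates the META-INF/services registration that makes the command discoverable under its name ("pipelines-warehouse"). A small sketch of that discovery using only the standard ServiceLoader mechanism; the listing class itself is illustrative and not part of the commit.

import java.util.ServiceLoader;
import org.gbif.cli.Command;

class CommandDiscoverySketch {
  public static void main(String[] args) {
    // Iterates every Command registered via META-INF/services; with this module on the
    // classpath the output will include DataWarehouseCommand.
    for (Command command : ServiceLoader.load(Command.class)) {
      System.out.println(command.getClass().getName());
    }
  }
}
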
DataWarehouseFinishMessage.java
@@ -0,0 +1,22 @@
package org.gbif.pipelines.tasks.occurrences.warehouse;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Set;
import java.util.UUID;
import org.gbif.common.messaging.api.messages.PipelinesHdfsViewMessage;

public class DataWarehouseFinishMessage extends PipelinesHdfsViewMessage {

public DataWarehouseFinishMessage() {}

@JsonCreator
public DataWarehouseFinishMessage(
@JsonProperty("datasetUuid") UUID datasetUuid,
@JsonProperty("attempt") int attempt,
@JsonProperty("pipelineSteps") Set<String> pipelineSteps,
@JsonProperty("runner") String runner,
@JsonProperty("executionId") Long executionId) {
super(datasetUuid, attempt, pipelineSteps, runner, executionId);
}
}
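
The finish message is intended to be (de)serialized with Jackson via the @JsonCreator constructor above. A minimal round-trip sketch under that assumption; the step name "HDFS_VIEW" is a placeholder, and FAIL_ON_UNKNOWN_PROPERTIES is disabled in case inherited getters expose extra derived properties.

package org.gbif.pipelines.tasks.occurrences.warehouse;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.UUID;

class FinishMessageJsonSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    DataWarehouseFinishMessage message =
        new DataWarehouseFinishMessage(
            UUID.randomUUID(), 1, Collections.singleton("HDFS_VIEW"), null, null);

    // Serialize, then read back through the @JsonCreator constructor.
    String json = mapper.writeValueAsString(message);
    DataWarehouseFinishMessage copy = mapper.readValue(json, DataWarehouseFinishMessage.class);
    System.out.println(copy.getDatasetUuid());
  }
}
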
DataWarehouseService.java
@@ -0,0 +1,69 @@
package org.gbif.pipelines.tasks.occurrences.warehouse;

import com.google.common.util.concurrent.AbstractIdleService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.gbif.common.messaging.DefaultMessagePublisher;
import org.gbif.common.messaging.MessageListener;
import org.gbif.common.messaging.api.MessagePublisher;
import org.gbif.pipelines.common.configs.StepConfiguration;
import org.gbif.pipelines.tasks.ServiceFactory;
import org.gbif.pipelines.tasks.common.hdfs.CommonHdfsViewCallback;
import org.gbif.pipelines.tasks.common.hdfs.HdfsViewConfiguration;
import org.gbif.registry.ws.client.DatasetClient;
import org.gbif.registry.ws.client.pipelines.PipelinesHistoryClient;

/**
* A service which listens to the {@link
* org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage}.
*/
@Slf4j
public class DataWarehouseService extends AbstractIdleService {

private final HdfsViewConfiguration config;
private MessageListener listener;
private MessagePublisher publisher;
private ExecutorService executor;

public DataWarehouseService(HdfsViewConfiguration config) {
this.config = config;
}

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-warehouse service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
publisher = new DefaultMessagePublisher(c.messaging.getConnectionParameters());
executor =
config.standaloneNumberThreads == null
? null
: Executors.newFixedThreadPool(config.standaloneNumberThreads);

PipelinesHistoryClient historyClient =
ServiceFactory.createPipelinesHistoryClient(config.stepConfig);

DatasetClient datasetClient = ServiceFactory.createDatasetClient(config.stepConfig);

DataWarehouseCallback callback =
DataWarehouseCallback.builder()
.config(config)
.publisher(publisher)
.historyClient(historyClient)
.datasetClient(datasetClient)
.commonHdfsViewCallback(CommonHdfsViewCallback.create(config, executor))
.build();

listener.listen(c.queueName, callback.getRouting(), c.poolSize, callback);
}

@Override
protected void shutDown() {
listener.close();
publisher.close();
// The executor is only created when standaloneNumberThreads is configured, so guard against null
if (executor != null) {
executor.shutdown();
}
log.info("Stopping pipelines-warehouse service");
}
}
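
DataWarehouseService extends Guava's AbstractIdleService, so the instance returned by DataWarehouseCommand#getService() is driven through the standard Guava Service lifecycle. A minimal sketch of that lifecycle; the hand-built configuration stands in for the values the CLI would normally inject.

package org.gbif.pipelines.tasks.occurrences.warehouse;

import org.gbif.pipelines.tasks.common.hdfs.HdfsViewConfiguration;

class ServiceLifecycleSketch {
  public static void main(String[] args) {
    HdfsViewConfiguration config = new HdfsViewConfiguration(); // normally populated from the CLI/YAML config
    DataWarehouseService service = new DataWarehouseService(config);

    // startAsync() triggers startUp(): listener, publisher and (optionally) the executor are created.
    service.startAsync().awaitRunning();

    // ... the service now consumes messages and submits warehouse jobs until stopped ...

    // stopAsync() triggers shutDown(): listener, publisher and executor are closed.
    service.stopAsync().awaitTerminated();
  }
}
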
