Skip to content

Commit

Permalink
Merge branch '622_distribution' of https://github.com/gbif/pipelines
Browse files Browse the repository at this point in the history
…into 622_distribution
  • Loading branch information
qifeng-bai committed Feb 3, 2022
2 parents 4f3a8a0 + 5e51baa commit 8e193fc
Show file tree
Hide file tree
Showing 79 changed files with 402 additions and 307 deletions.
2 changes: 1 addition & 1 deletion examples/metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>examples</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>pipelines-parent</artifactId>
<groupId>org.gbif.pipelines</groupId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/transform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>examples</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,65 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.UUID;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.gbif.common.messaging.api.messages.PipelinesCleanerMessage;
import org.gbif.pipelines.estools.EsIndex;
import org.gbif.pipelines.estools.model.IndexParams;
import org.gbif.pipelines.estools.service.EsService;
import org.gbif.pipelines.tasks.ValidationWsClientStub;
import org.gbif.pipelines.tasks.utils.EsServer;
import org.gbif.validator.ws.client.ValidationWsClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class CleanerCallbackIT {

@ClassRule public static final EsServer ES_SERVER = new EsServer();
private static CuratorFramework curator;
private static TestingServer server;

@BeforeClass
public static void setUp() throws Exception {
  // Spin up an in-process ZooKeeper server and connect a Curator client to it,
  // scoped to the "crawler" namespace that the callback under test uses.
  server = new TestingServer();
  final CuratorFrameworkFactory.Builder clientBuilder =
      CuratorFrameworkFactory.builder()
          .connectString(server.getConnectString())
          .retryPolicy(new RetryOneTime(1)) // single retry is enough for a local test server
          .namespace("crawler");
  curator = clientBuilder.build();
  curator.start();
}

@AfterClass
public static void tearDown() throws IOException {
  // Close the ZK testing server even when shutting down the Curator client
  // throws; otherwise a failed curator.close() would leak the server's port
  // and background threads for the rest of the test JVM.
  try {
    curator.close();
  } finally {
    server.close();
  }
}

// Wipes every index from the embedded ES server before each test so each
// test case starts from an empty cluster state.
@Before
public void cleanIndexes() {
EsService.deleteAllIndexes(ES_SERVER.getEsClient());
}

@Test
public void cleanerDeleteEsRecordsTest() {
public void cleanerDeleteEsRecordsTest() throws Exception {

// State
String datasetUuid = "8a4934ac-7d7f-41d4-892c-f6b71bb777a3";
Expand Down Expand Up @@ -64,7 +94,7 @@ public void cleanerDeleteEsRecordsTest() {
EsService.refreshIndex(ES_SERVER.getEsClient(), config.esAliases[0]);

// When
new CleanerCallback(config, validationClient).handleMessage(message);
new CleanerCallback(config, validationClient, curator).handleMessage(message);

// Update deleted data available
EsService.refreshIndex(ES_SERVER.getEsClient(), config.esAliases[0]);
Expand All @@ -74,10 +104,11 @@ public void cleanerDeleteEsRecordsTest() {
assertFalse(Files.exists(Paths.get(String.join("/", config.hdfsRootPath, datasetUuid))));
assertEquals(0L, EsService.countIndexDocuments(ES_SERVER.getEsClient(), config.esAliases[0]));
assertNotNull(validationClient.get(UUID.fromString(datasetUuid)).getDeleted());
assertNull(curator.checkExists().forPath("/validator/" + datasetUuid));
}

@Test
public void cleanerDeleteEsIndexTest() {
public void cleanerDeleteEsIndexTest() throws Exception {

// State
String datasetUuid = "8a4934ac-7d7f-41d4-892c-f6b71bb777a3";
Expand Down Expand Up @@ -124,14 +155,22 @@ public void cleanerDeleteEsIndexTest() {
Collections.singleton(indexName),
Collections.singleton(indexToSwap));

// Add ZK path
curator
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/validator/" + datasetUuid, "test".getBytes());

// When
new CleanerCallback(config, validationClient).handleMessage(message);
new CleanerCallback(config, validationClient, curator).handleMessage(message);

// Should
assertFalse(Files.exists(Paths.get(String.join("/", config.fsRootPath, datasetUuid))));
assertFalse(Files.exists(Paths.get(String.join("/", config.hdfsRootPath, datasetUuid))));
assertFalse(EsService.existsIndex(ES_SERVER.getEsClient(), indexName));
assertNotNull(validationClient.get(UUID.fromString(datasetUuid)).getDeleted());
assertNull(curator.checkExists().forPath("/validator/" + datasetUuid));
}

private PipelinesCleanerMessage createMessage(String datasetUuid) {
Expand Down
4 changes: 2 additions & 2 deletions gbif/coordinator/tasks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>2.10.0-SNAPSHOT</version>
<version>2.11.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -84,7 +84,7 @@
<gbif-cli.version>0.14</gbif-cli.version>
<gbif-httputils.version>0.12</gbif-httputils.version>
<gbif-wrangler.version>0.3</gbif-wrangler.version>
<gbif-crawler.version>1.10-SNAPSHOT</gbif-crawler.version>
<gbif-crawler.version>1.10</gbif-crawler.version>

<guice.version>4.2.3</guice.version>
<!-- Match the GBIF http-client! -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -109,7 +111,7 @@ public static List<FileStatus> getSubDirList(
* @param filePath to a yaml file
* @param key to value in yaml
*/
public static String getValueByKey(
public static Optional<String> getValueByKey(
String hdfsSiteConfig, String coreSiteConfig, String filePath, String key)
throws IOException {
FileSystem fs = getFileSystem(hdfsSiteConfig, coreSiteConfig, filePath);
Expand All @@ -120,11 +122,36 @@ public static String getValueByKey(
.map(x -> x.replace("\u0000", ""))
.filter(y -> y.startsWith(key))
.findFirst()
.map(z -> z.replace(key + ": ", ""))
.orElse("");
.map(z -> z.replace(key + ": ", ""));
}
}
return "";
return Optional.empty();
}

/**
 * Reads a yaml file and returns the value for the given key parsed as a long.
 *
 * @param hdfsSiteConfig path to hdfs-site.xml config file
 * @param coreSiteConfig path to core-site.xml config file
 * @param filePath path to a yaml file
 * @param key key whose value is looked up in the yaml file
 * @return the parsed value, or an empty Optional when the key is not present
 * @throws IOException if the file system or file cannot be read
 */
public static Optional<Long> getLongByKey(
String hdfsSiteConfig, String coreSiteConfig, String filePath, String key)
throws IOException {
// NOTE: a present but non-numeric value will surface as an unchecked
// NumberFormatException from Long::parseLong.
return getValueByKey(hdfsSiteConfig, coreSiteConfig, filePath, key).map(Long::parseLong);
}

/**
 * Reads a yaml file and returns the value for the given key parsed as a double.
 *
 * @param hdfsSiteConfig path to hdfs-site.xml config file
 * @param coreSiteConfig path to core-site.xml config file
 * @param filePath path to a yaml file
 * @param key key whose value is looked up in the yaml file
 * @return the parsed value, or an empty Optional when the key is not present
 */
public static Optional<Double> getDoubleByKey(
    String hdfsSiteConfig, String coreSiteConfig, String filePath, String key)
    throws IOException {
  Optional<String> rawValue = getValueByKey(hdfsSiteConfig, coreSiteConfig, filePath, key);
  return rawValue.map(Double::parseDouble);
}

/**
Expand Down Expand Up @@ -189,10 +216,15 @@ public static void deleteSubFolders(
getSubDirList(hdfsSiteConfig, coreSiteConfig, filePath).stream()
.filter(
x ->
LocalDateTime.ofEpochSecond(x.getModificationTime(), 0, ZoneOffset.UTC)
LocalDateTime.ofInstant(
Instant.ofEpochMilli(x.getModificationTime()), ZoneId.systemDefault())
.isBefore(date))
.map(y -> y.getPath().getName())
.forEach(z -> deleteDirectory(hdfsSiteConfig, coreSiteConfig, z));
.map(y -> y.getPath().toString())
.forEach(
z -> {
boolean deleted = deleteDirectory(hdfsSiteConfig, coreSiteConfig, z);
log.info("Tried to delete directory {}, is deleted? {}", z, deleted);
});
}

private static FileSystem getFileSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -125,37 +126,37 @@ private static long getRecordNumber(
String metaPath = String.join("/", repositoryPath, datasetId, attempt, metaFileName);

Long messageNumber = message.getNumberOfRecords();
String fileNumber =
HdfsUtils.getValueByKey(
Optional<Long> fileNumber =
HdfsUtils.getLongByKey(
stepConfig.hdfsSiteConfig,
stepConfig.coreSiteConfig,
metaPath,
Metrics.BASIC_RECORDS_COUNT + "Attempted");
Metrics.BASIC_RECORDS_COUNT + Metrics.ATTEMPTED);

// Fail if fileNumber is null
if (!message.isValidator()) {
Set<String> types = message.getInterpretTypes();
boolean isCorrectType = types.contains(ALL.name()) || types.contains(BASIC.name());
boolean noFileRecords = fileNumber == null || Long.parseLong(fileNumber) == 0L;
boolean noFileRecords = !fileNumber.isPresent() || fileNumber.get() == 0L;
if (isCorrectType && noFileRecords) {
throw new IllegalArgumentException(
"Basic records must be interpreted, but fileNumber is null or 0, please validate the archive!");
}
}

if (messageNumber == null && (fileNumber == null || fileNumber.isEmpty())) {
if (messageNumber == null && !fileNumber.isPresent()) {
throw new IllegalArgumentException(
"Please check archive-to-avro metadata yaml file or message records number, recordsNumber can't be null or empty!");
}

if (messageNumber == null) {
return Long.parseLong(fileNumber);
return fileNumber.get();
}

if (fileNumber == null || fileNumber.isEmpty()) {
if (!fileNumber.isPresent() || messageNumber > fileNumber.get()) {
return messageNumber;
}

return messageNumber > Long.parseLong(fileNumber) ? messageNumber : Long.parseLong(fileNumber);
return fileNumber.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -133,27 +134,27 @@ private static long getRecordNumber(
&& message.getValidationResult().getNumberOfRecords() != null
? message.getValidationResult().getNumberOfRecords()
: null;
String fileNumber =
HdfsUtils.getValueByKey(
Optional<Long> fileNumber =
HdfsUtils.getLongByKey(
stepConfig.hdfsSiteConfig,
stepConfig.coreSiteConfig,
metaPath,
Metrics.ARCHIVE_TO_ER_COUNT);

if (messageNumber == null && (fileNumber == null || fileNumber.isEmpty())) {
if (messageNumber == null && !fileNumber.isPresent()) {
throw new IllegalArgumentException(
"Please check archive-to-avro metadata yaml file or message records number, recordsNumber can't be null or empty!");
}

if (messageNumber == null) {
return Long.parseLong(fileNumber);
return fileNumber.get();
}

if (fileNumber == null || fileNumber.isEmpty()) {
if (!fileNumber.isPresent() || messageNumber > fileNumber.get()) {
return messageNumber;
}

return messageNumber > Long.parseLong(fileNumber) ? messageNumber : Long.parseLong(fileNumber);
return fileNumber.get();
}

/** Finds the latest attempt number in HDFS */
Expand Down
Loading

0 comments on commit 8e193fc

Please sign in to comment.