Skip to content

Commit

Permalink
THIRDEYE-324 HDFS data directory should use wall clock time
Browse files Browse the repository at this point in the history
This patch introduces a data directory structure backward incompatibility on
server and HDFS.

We will bump the minor version of thirdeye to 0.5.x after this in the multi
product.

RB=460983
R=kgopalak,npawar
A=kgopalak
  • Loading branch information
brandtg committed Apr 13, 2015
1 parent 73ba084 commit f695ec7
Show file tree
Hide file tree
Showing 60 changed files with 1,765 additions and 1,554 deletions.
2 changes: 1 addition & 1 deletion thirdeye/build
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
mvn clean package install
mvn -T 1C clean compile package
4 changes: 4 additions & 0 deletions thirdeye/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<module>thirdeye-realtime</module>
</modules>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<build>
<pluginManagement>
<plugins>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

public enum ThirdEyeJobConstants
{
THIRDEYE_FLOW("thirdeye.flow"),
THIRDEYE_FLOW_SCHEDULE("thirdeye.flow.schedule"),
THIRDEYE_PHASE("thirdeye.phase"),
THIRDEYE_ROOT("thirdeye.root"),
THIRDEYE_COLLECTION("thirdeye.collection"),
THIRDEYE_SERVER_URI("thirdeye.server.uri"),
THIRDEYE_TIME_PATH("thirdeye.time.path"),
THIRDEYE_TIME_MIN("thirdeye.time.min"),
THIRDEYE_TIME_MAX("thirdeye.time.max"),
INPUT_PATHS("input.paths");

private final String propertyName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
import com.linkedin.thirdeye.api.StarTreeRecord;
import com.linkedin.thirdeye.api.DimensionKey;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
Expand All @@ -19,15 +25,51 @@

public class StarTreeJobUtils
{
private static final Logger LOG = LoggerFactory.getLogger(StarTreeJobUtils.class);
private static final String ENCODING = "UTF-8";

public static String getTreeId(FileSystem fileSystem, Path treePath) throws Exception
{
ObjectInputStream inputStream = null;
try
{
inputStream = new ObjectInputStream(fileSystem.open(treePath));
StarTreeNode root = (StarTreeNode) inputStream.readObject();
return root.getId().toString();
}
finally
{
if (inputStream != null)
{
inputStream.close();
}
}
}

public static int pushConfig(InputStream configData, String thirdEyeUri, String collection) throws IOException
{
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, "UTF-8");
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, ENCODING);
return executeHttpPost(configData, url);
}

public static int pushTree(InputStream treeData, String thirdEyeUri, String collection) throws IOException
public static int pushTree(InputStream treeData,
String thirdEyeUri,
String collection,
String treeId,
DateTime minTime,
DateTime maxTime,
String schedule) throws IOException
{
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, "UTF-8") + "/starTree";
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, ENCODING)
+ "/starTree/" + URLEncoder.encode(treeId, ENCODING)
+ "/" + minTime.getMillis()
+ "/" + maxTime.getMillis();

if (schedule != null)
{
url += "?schedule=" + URLEncoder.encode(schedule, ENCODING);
}

return executeHttpPost(treeData, url);
}

Expand All @@ -37,13 +79,24 @@ public static int pushTree(InputStream treeData, String thirdEyeUri, String coll
* @return
* The status code of the HTTP response
*/
public static int pushData(InputStream leafData, String thirdEyeUri, String collection, boolean includeDimensions) throws IOException
public static int pushData(InputStream leafData,
String thirdEyeUri,
String collection,
DateTime minTime,
DateTime maxTime,
String schedule) throws IOException
{
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, "UTF-8") + "/data";
if (includeDimensions)
String url = thirdEyeUri + "/collections/" + URLEncoder.encode(collection, ENCODING) + "/data/"
+ minTime.getMillis() + "/"
+ maxTime.getMillis();

if (schedule != null)
{
url += "?includeDimensions=true";
url += "?schedule=" + URLEncoder.encode(schedule, ENCODING);
}

LOG.info("POST {}", url);

return executeHttpPost(leafData, url);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public static class BootstrapPhaseTwoReducer extends
private String localOutputDataDir = "./leaf-data-output";
private String hdfsOutputDir;
private StarTreeConfig starTreeConfig;
private Path pathToTree;

@Override
public void setup(Context context) throws IOException, InterruptedException {
Expand All @@ -210,7 +211,7 @@ public void setup(Context context) throws IOException, InterruptedException {
try {

collectionName = config.getCollectionName();
Path pathToTree = new Path(starTreeOutputPath + "/" + "star-tree-"
pathToTree = new Path(starTreeOutputPath + "/" + "star-tree-"
+ collectionName, collectionName + "-tree.bin");
InputStream is = dfs.open(pathToTree);
starTreeRootNode = StarTreePersistanceUtil.loadStarTree(is);
Expand Down Expand Up @@ -324,7 +325,8 @@ protected void cleanup(Context context) throws IOException,
LOG.info("Generating " + leafDataTarGz + " from " + localOutputDataDir);

// Combine
FixedBufferUtil.combineDataFiles(new File(localTmpDataDir), new File(localOutputDataDir, "data"));
FixedBufferUtil.combineDataFiles(
dfs.open(pathToTree), new File(localTmpDataDir), new File(localOutputDataDir, "data"));

// Create tar gz of directory
TarGzCompressionUtils.createTarGzOfDirectory(localOutputDataDir, leafDataTarGz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapred.AvroKey;
import org.apache.commons.io.FileUtils;
import org.apache.commons.math.random.RandomDataImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -23,6 +24,7 @@
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -169,6 +171,12 @@ public void setUp() throws IOException
config.set(StarTreeBootstrapPhaseOneConstants.STAR_TREE_BOOTSTRAP_CONFIG_PATH.toString(), ClassLoader.getSystemResource(CONF_FILE).toString());
}

@AfterClass
public void tearDown() throws IOException
{
FileUtils.forceDelete(new File(".leaf-data.tar.gz.crc"));
}

@Test
public void testStarTreeBootstrapPhase1() throws Exception
{
Expand Down
10 changes: 10 additions & 0 deletions thirdeye/thirdeye-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,21 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.3</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.linkedin.thirdeye.api;

import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public final class StarTreeConstants
{
public static final String STAR = "*";
Expand All @@ -12,8 +15,7 @@ public final class StarTreeConstants

public static final String CONFIG_FILE_NAME = "config.yml";
public static final String TREE_FILE_NAME = "tree.bin";
public static final String SCHEMA_FILE_NAME = "schema.avsc";
public static final String DATA_DIR_NAME = "data";
public static final String DATA_DIR_PREFIX = "data";
public static final String KAFKA_CONFIG_FILE_NAME = "kafka.yml";

public static final String INDEX_FILE_SUFFIX = ".idx";
Expand All @@ -23,4 +25,6 @@ public final class StarTreeConstants
public static final String METRIC_STORE = "metricStore";
public static final String DIMENSION_STORE = "dimensionStore";
public static final String DICT_STORE = "dictStore";

public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("YYYY-MM-dd-HHmmss");
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;

public interface StarTreeManager
Expand All @@ -14,22 +15,20 @@ public interface StarTreeManager

/**
* @return
* The StarTree for a collection.
* The StarTrees for a collection.
*/
StarTree getStarTree(String collection);
// Set<StarTree> getStarTrees(String collection);

/** @return a map of data directory to star tree index for a collection */
Map<File, StarTree> getStarTrees(String collection);

StarTreeConfig getConfig(String collection);

/**
* Restores a previously constructed tree.
*/
void restore(File rootDir, String collection) throws Exception;

/**
* Removes and closes a star tree for a collection.
*/
void remove(String collection) throws IOException;

void open(String collection) throws IOException;

/**
* Closes all star trees this manager is managing
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,4 +453,23 @@ public void getStats(StarTreeNode node, StarTreeStats stats)
getStats(node.getStarNode(), stats);
}
}

@Override
public boolean equals(Object o)
{
if (!(o instanceof StarTree))
{
return false;
}

StarTree starTree = (StarTree) o;

return root.getId().equals(starTree.getRoot().getId());
}

@Override
public int hashCode()
{
return root.hashCode();
}
}
Loading

0 comments on commit f695ec7

Please sign in to comment.