+ *
+ * Property |
+ * Description |
+ *
+ *
+ * thirdeye.flow |
+ * One of {@link com.linkedin.thirdeye.bootstrap.ThirdEyeJob.FlowSpec} |
+ *
+ *
+ * thirdeye.flow.schedule |
+ * A string describing the flow schedule (used to tag segments) |
+ *
+ *
+ * thirdeye.phase |
+ * One of {@link com.linkedin.thirdeye.bootstrap.ThirdEyeJob.PhaseSpec} |
+ *
+ *
+ * thirdeye.root |
+ * Root directory on HDFS, under which all collection data is stored |
+ *
+ *
+ * thirdeye.collection |
+ * Collection name (data stored at ${thirdeye.root}/${thirdeye.collection} |
+ *
+ *
+ * thirdeye.server.uri |
+ * URI prefix for thirdeye server (e.g. http://some-machine:10283) |
+ *
+ *
+ * thirdeye.time.path |
+ * A path to a properties file on HDFS containing thirdeye.time.min, thirdeye.time.max |
+ *
+ *
+ * thirdeye.time.min |
+ * Manually override thirdeye.time.min from thirdeye.time.path |
+ *
+ *
+ * thirdeye.time.max |
+ * Manually override thirdeye.time.max from thirdeye.time.path |
+ *
+ *
*/
public class ThirdEyeJob
{
private static final Logger LOG = LoggerFactory.getLogger(ThirdEyeJob.class);
+ private static final String ENCODING = "UTF-8";
private static final String USAGE = "usage: phase_name job.properties";
-
private static final String AVRO_SCHEMA = "schema.avsc";
-
private static final String TREE_FILE_FORMAT = ".bin";
+ private enum FlowSpec
+ {
+ BOOTSTRAP,
+ INCREMENT,
+ PATCH
+ }
+
private enum PhaseSpec
{
JOIN
@@ -106,7 +128,13 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths)
{
return inputConfig;
}
@@ -126,7 +154,13 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths)
{
Properties config = new Properties();
config.setProperty(AnalysisJobConstants.ANALYSIS_INPUT_AVRO_SCHEMA.toString(),
@@ -156,7 +190,13 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
@@ -168,7 +208,7 @@ Properties getJobProperties(Properties inputConfig, String root, String collecti
config.setProperty(AggregationJobConstants.AGG_INPUT_PATH.toString(),
inputPaths);
config.setProperty(AggregationJobConstants.AGG_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + AGGREGATION.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + AGGREGATION.getName());
return config;
}
@@ -188,16 +228,22 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(RollupPhaseOneConstants.ROLLUP_PHASE1_CONFIG_PATH.toString(),
getConfigPath(root, collection));
config.setProperty(RollupPhaseOneConstants.ROLLUP_PHASE1_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + AGGREGATION.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + AGGREGATION.getName());
config.setProperty(RollupPhaseOneConstants.ROLLUP_PHASE1_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName());
return config;
}
@@ -217,16 +263,22 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(RollupPhaseTwoConstants.ROLLUP_PHASE2_CONFIG_PATH.toString(),
getConfigPath(root, collection));
config.setProperty(RollupPhaseTwoConstants.ROLLUP_PHASE2_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName() + File.separator + "belowThreshold");
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName() + File.separator + "belowThreshold");
config.setProperty(RollupPhaseTwoConstants.ROLLUP_PHASE2_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE2.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE2.getName());
return config;
}
@@ -246,16 +298,22 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(RollupPhaseThreeConstants.ROLLUP_PHASE3_CONFIG_PATH.toString(),
getConfigPath(root, collection));
config.setProperty(RollupPhaseThreeConstants.ROLLUP_PHASE3_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE2.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE2.getName());
config.setProperty(RollupPhaseThreeConstants.ROLLUP_PHASE3_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE3.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE3.getName());
return config;
}
@@ -275,17 +333,23 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(RollupPhaseFourConstants.ROLLUP_PHASE4_CONFIG_PATH.toString(),
getConfigPath(root, collection));
config.setProperty(RollupPhaseFourConstants.ROLLUP_PHASE4_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE3.getName() + "," +
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName() + File.separator + "aboveThreshold");
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE3.getName() + "," +
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE1.getName() + File.separator + "aboveThreshold");
config.setProperty(RollupPhaseFourConstants.ROLLUP_PHASE4_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE4.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE4.getName());
return config;
}
@@ -305,16 +369,22 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(StarTreeGenerationConstants.STAR_TREE_GEN_CONFIG_PATH.toString(),
getConfigPath(root, collection));
config.setProperty(StarTreeGenerationConstants.STAR_TREE_GEN_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + ROLLUP_PHASE4.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + ROLLUP_PHASE4.getName());
config.setProperty(StarTreeGenerationConstants.STAR_TREE_GEN_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + STARTREE_GENERATION.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + STARTREE_GENERATION.getName());
return config;
}
@@ -334,7 +404,13 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths) throws IOException
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
config.setProperty(StarTreeBootstrapPhaseOneConstants.STAR_TREE_BOOTSTRAP_CONFIG_PATH.toString(),
@@ -346,7 +422,7 @@ Properties getJobProperties(Properties inputConfig, String root, String collecti
config.setProperty(StarTreeBootstrapPhaseOneConstants.STAR_TREE_BOOTSTRAP_INPUT_PATH.toString(),
inputPaths);
config.setProperty(StarTreeBootstrapPhaseOneConstants.STAR_TREE_BOOTSTRAP_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE1.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE1.getName());
return config;
}
@@ -366,7 +442,13 @@ String getDescription()
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths) throws IOException
+ Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception
{
Properties config = new Properties();
@@ -375,14 +457,14 @@ Properties getJobProperties(Properties inputConfig, String root, String collecti
config.setProperty(StarTreeBootstrapPhaseTwoConstants.STAR_TREE_GENERATION_OUTPUT_PATH.toString(),
getLatestTreeDirPath(root, collection) + File.separator + STARTREE_GENERATION.getName());
config.setProperty(StarTreeBootstrapPhaseTwoConstants.STAR_TREE_BOOTSTRAP_PHASE2_INPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE1.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE1.getName());
config.setProperty(StarTreeBootstrapPhaseTwoConstants.STAR_TREE_BOOTSTRAP_PHASE2_OUTPUT_PATH.toString(),
- getTimeDir(root, collection, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE2.getName());
+ getTimeDir(root, collection, flowSpec, minTime, maxTime) + File.separator + STARTREE_BOOTSTRAP_PHASE2.getName());
return config;
}
},
- SERVER_UPDATE
+ SERVER_PUSH
{
@Override
Class> getKlazz()
@@ -393,60 +475,49 @@ Class> getKlazz()
@Override
String getDescription()
{
- return "Pushes metric data from startree_bootstrap_phase2 to thirdeye.server.uri";
+ return "Pushes data to thirdeye.server.uri";
}
@Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
+ Properties getJobProperties(Properties inputConfig, String root, String collection, FlowSpec flowSpec, DateTime minTime, DateTime maxTime, String inputPaths) throws Exception
{
return null; // unused
}
- },
- SERVER_BOOTSTRAP
- {
- @Override
- Class> getKlazz()
- {
- return null;
- }
-
- @Override
- String getDescription()
- {
- return "Pushes star tree, dimension, and metric data from startree_bootstrap_phase2 to thirdeye.server.uri";
- }
-
- @Override
- Properties getJobProperties(Properties inputConfig, String root, String collection, long minTime, long maxTime, String inputPaths)
- {
- return null;
- }
};
abstract Class> getKlazz();
abstract String getDescription();
- abstract Properties getJobProperties(Properties inputConfig,String root, String collection, long minTime, long maxTime, String inputPaths) throws Exception;
+ abstract Properties getJobProperties(Properties inputConfig,
+ String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime,
+ String inputPaths) throws Exception;
String getName()
{
return this.name().toLowerCase();
}
- String getCollectionDir(String root, String collection)
- {
- return root == null ? collection : root + File.separator + collection;
- }
-
String getAnalysisPath(String root, String collection)
{
return getCollectionDir(root, collection) + File.separator + "analysis";
}
- String getTimeDir(String root, String collection, long minTime, long maxTime)
+ String getTimeDir(String root,
+ String collection,
+ FlowSpec flowSpec,
+ DateTime minTime,
+ DateTime maxTime) throws IOException
{
- return getCollectionDir(root, collection) + File.separator + "data_" + minTime + "-" + maxTime;
+ return getCollectionDir(root, collection)
+ + File.separator + flowSpec.name()
+ + File.separator + "data_"
+ + StarTreeConstants.DATE_TIME_FORMATTER.print(minTime) + "_"
+ + StarTreeConstants.DATE_TIME_FORMATTER.print(maxTime);
}
String getConfigPath(String root, String collection)
@@ -458,52 +529,6 @@ String getSchemaPath(String root, String collection)
{
return getCollectionDir(root, collection) + File.separator + AVRO_SCHEMA;
}
-
- /*
- * Iterates in the data dir's generated in reverse order and returns the path
- * of the latest dir which contains tree.bin file.
- */
- String getLatestTreeDirPath(String root, String collection) throws IOException
- {
- FileSystem fs = FileSystem.get(new Configuration());
- Path collectionDir = new Path(getCollectionDir(root, collection));
-
- PathFilter dataDirFilter = new PathFilter() {
- public boolean accept(Path path) {
- return path.getName().startsWith("data_");
- }
- };
-
- Comparator