Skip to content

Commit

Permalink
Refactored hraven for feature adds:
Browse files Browse the repository at this point in the history
*ability to output to sinks other than just hbase (for job conf and history)
*added graphite sink and refactored hbase to work as a sink
*generic object model and abstraction for output records of JobFileProcessor's mapper instead of emitting Hbase puts
*changes no hraven behaviour for hbase output

note: this commit does makes hadoop2 support broken. work in progress for JobHistoryFileParserHadoop2
  • Loading branch information
Angad Singh authored and Angad Singh committed May 20, 2014
1 parent fb60917 commit 0df1646
Show file tree
Hide file tree
Showing 50 changed files with 1,391 additions and 401 deletions.
15 changes: 12 additions & 3 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ public class Constants {
// HBase constants

// separator character used between key components
public static final char SEP_CHAR = '!';
public static final String SEP = "" + SEP_CHAR;
public static final byte[] SEP_BYTES = Bytes.toBytes(SEP);
public static final char HBASE_SEP_CHAR = '!';
public static final String HBASE_SEP = "" + HBASE_SEP_CHAR;
public static final byte[] HBASE_SEP_BYTES = Bytes.toBytes(HBASE_SEP);

public static final String SEP_CHAR = ".";

// common default values
public static final byte[] EMPTY_BYTES = new byte[0];
Expand Down Expand Up @@ -424,4 +426,11 @@ public class Constants {

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

public static final String JOBCONF_GRAPHITE_HOST_KEY = "hraven.sink.graphite.host";
public static final String JOBCONF_GRAPHITE_PORT_KEY = "hraven.sink.graphite.port";
public static final String JOBCONF_GRAPHITE_PREFIX = "hraven.sink.graphite.prefix";
public static final int GRAPHITE_DEFAULT_PORT = 2003;

public static final String JOBCONF_SINKS = "hraven.sinks";
}
4 changes: 2 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/FlowKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public class FlowKey implements Comparable {
*/
protected final String userName;
/**
* Identifying an application, which can go through different versions.
* Identifying an application/flow, which can go through different versions.
*/
protected final String appId;
/**
* Identifying one single run of a version of an app. Smaller values indicate
* Identifying one single run of a version of an app/flow. Smaller values indicate
* a later run. We're using an inverted timestamp Long.MAXVALUE -
* timstampMillis (milliseconds since January 1, 1970 UTC)
*/
Expand Down
66 changes: 66 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.twitter.hraven;

import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;

/**
*
* @author angad.singh
*
* {@link JobFileTableMapper outputs this as value. It corresponds to the
* Put record which was earlier emitted
*
* @param <K> key type
* @param <V> type of dataValue object to be stored
*/

public abstract class HravenRecord<K,V> {
private K key;
private RecordCategory dataCategory;
private RecordDataKey dataKey;
private V dataValue;
private long submitTime;

public HravenRecord() {

}

public K getKey() {
return key;
}

public void setKey(K key) {
this.key = key;
}

public RecordCategory getDataCategory() {
return dataCategory;
}

public void setDataCategory(RecordCategory dataCategory) {
this.dataCategory = dataCategory;
}

public RecordDataKey getDataKey() {
return dataKey;
}

public void setDataKey(RecordDataKey dataKey) {
this.dataKey = dataKey;
}

public V getDataValue() {
return dataValue;
}

public void setDataValue(V dataValue) {
this.dataValue = dataValue;
}

public long getSubmitTime() {
return submitTime;
}

public void setSubmitTime(long submitTime) {
this.submitTime = submitTime;
}
}
32 changes: 32 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/HravenService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.twitter.hraven;

/**
*
* @author angad.singh
*
* {@link JobFileTableMapper outputs this as key. It corresponds to the
* Hbase table which was earlier emitted
*/

public enum HravenService {
JOB_HISTORY_RAW {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryRawRecord();
}
},
JOB_HISTORY {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryRecord();
}
},
JOB_HISTORY_TASK {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryRecord();
}
};

public abstract HravenRecord getNewRecord();
}
6 changes: 3 additions & 3 deletions hraven-core/src/main/java/com/twitter/hraven/JobDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ public Framework getFramework() {
* @see java.lang.Object#toString()
*/
public String toString() {
return getCluster() + Constants.SEP + this.userName + Constants.SEP
+ this.appId + Constants.SEP + this.version + Constants.SEP
+ this.runId + Constants.SEP + this.jobId + Constants.SEP + this.framework;
return getCluster() + Constants.HBASE_SEP + this.userName + Constants.HBASE_SEP
+ this.appId + Constants.HBASE_SEP + this.version + Constants.HBASE_SEP
+ this.runId + Constants.HBASE_SEP + this.jobId + Constants.HBASE_SEP + this.framework;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.twitter.hraven;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

/**
*
* @author angad.singh
*
* Store multiple {@link JobHistoryRecord}s in a 2 level HashMap
*
* Supports iteration to get individual {@link JobHistoryRecord}s
*/

public class JobHistoryMultiRecord extends HravenRecord<JobKey, Object> implements
Iterable<JobHistoryRecord> {

private Map<RecordCategory, Map<RecordDataKey, Object>> valueMap;

public JobHistoryMultiRecord() {
valueMap = new HashMap<RecordCategory, Map<RecordDataKey, Object>>();
}

public JobHistoryMultiRecord(JobKey jobKey) {
this.setKey(jobKey);
valueMap = new HashMap<RecordCategory, Map<RecordDataKey, Object>>();
}

public void add(RecordCategory category, RecordDataKey key, Object value) {
if (valueMap.containsKey(category)) {
valueMap.get(category).put(key, value);
} else {
HashMap<RecordDataKey, Object> categoryMap = new HashMap<RecordDataKey, Object>();
valueMap.put(category, categoryMap);
categoryMap.put(key, value);
}
}

public void add(HravenRecord record) {
add(record.getDataCategory(), record.getDataKey(), record.getDataValue());
}

public Map<RecordCategory, Map<RecordDataKey, Object>> getValueMap() {
return valueMap;
}

public Object getValue(RecordCategory category, RecordDataKey key) {
return valueMap.containsKey(category) ? valueMap.get(category).get(key) : null;
}

public int size() {
int size = 0;
for (Entry<RecordCategory, Map<RecordDataKey, Object>> catMap : valueMap.entrySet()) {
size += catMap.getValue().size();
}

return size;
}

/**
* Be able to iterate easily to get individual {@link JobHistoryRecord}s
*/

@Override
public Iterator<JobHistoryRecord> iterator() {

return new Iterator<JobHistoryRecord>() {

private Iterator<Entry<RecordCategory, Map<RecordDataKey, Object>>> catIterator;
private Iterator<Entry<RecordDataKey, Object>> dataIterator;
Entry<RecordCategory, Map<RecordDataKey, Object>> nextCat;
Entry<RecordDataKey, Object> nextData;

{
catIterator = valueMap.entrySet().iterator();
nextCat = catIterator.next();
dataIterator = nextCat.getValue().entrySet().iterator();
}

@Override
public boolean hasNext() {
return dataIterator.hasNext() || catIterator.hasNext();
}

@Override
public JobHistoryRecord next() {
if (!dataIterator.hasNext()) {
nextCat = catIterator.next();
dataIterator = nextCat.getValue().entrySet().iterator();
}

nextData = dataIterator.next();

return new JobHistoryRecord(nextCat.getKey(), getKey(), nextData.getKey(),
nextData.getValue(), getSubmitTime());
}

@Override
public void remove() {
}

};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.twitter.hraven;

public class JobHistoryRawRecord extends HravenRecord<String, Object> {

public JobHistoryRawRecord(RecordCategory dataCategory, String key, RecordDataKey dataKey,
Object dataValue) {
this.setKey(key);
this.setDataCategory(dataCategory);
this.setDataKey(dataKey);
this.setDataValue(dataValue);
}

public JobHistoryRawRecord() {

}

public JobHistoryRawRecord(String taskKey) {
this.setKey(taskKey);
}

public void set(RecordCategory category, RecordDataKey key, String value) {
this.setDataCategory(category);
this.setDataKey(key);
this.setDataValue(value);
}
}
40 changes: 40 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.twitter.hraven;

/**
*
* @author angad.singh
*
* Abstraction of a record to be stored in the {@link HravenService#JOB_HISTORY} service.
* Was earlier directly written as an Hbase put
*/

public class JobHistoryRecord extends HravenRecord<JobKey, Object> {

public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey,
Object dataValue) {
this.setKey(key);
this.setDataCategory(dataCategory);
this.setDataKey(dataKey);
this.setDataValue(dataValue);
}

public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey,
Object dataValue, long submitTime) {
this(dataCategory, key, dataKey, dataValue);
setSubmitTime(submitTime);
}

public JobHistoryRecord() {

}

public JobHistoryRecord(JobKey jobKey) {
this.setKey(jobKey);
}

public void set(RecordCategory category, RecordDataKey key, String value) {
this.setDataCategory(category);
this.setDataKey(key);
this.setDataValue(value);
}
}
6 changes: 3 additions & 3 deletions hraven-core/src/main/java/com/twitter/hraven/JobKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public JobId getJobId() {
* @see java.lang.Object#toString()
*/
public String toString() {
return getCluster() + Constants.SEP + this.userName + Constants.SEP
+ this.appId + Constants.SEP + this.getRunId()
+ Constants.SEP + this.jobId.getJobIdString();
return getCluster() + Constants.HBASE_SEP + this.userName + Constants.HBASE_SEP
+ this.appId + Constants.HBASE_SEP + this.getRunId()
+ Constants.HBASE_SEP + this.jobId.getJobIdString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.twitter.hraven;

public enum RecordCategory {
HISTORY_COUNTER, HISTORY_META, HISTORY_TASK_COUNTER, HISTORY_TASK_META, CONF, CONF_META, META,
INFERRED
}
37 changes: 37 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.twitter.hraven;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.util.StringUtils;

public class RecordDataKey {
private List<String> components;

public RecordDataKey(String[] components) {
this.components = Arrays.asList(components);
}

public RecordDataKey(String firstComponent) {
this.components = new ArrayList<String>();
this.components.add(firstComponent);
}

public void add(String component) {
this.components.add(component);
}

public String get(int index) {
return components.get(index);
}

public List<String> getComponents() {
return components;
}

@Override
public String toString() {
return StringUtils.join(Constants.SEP_CHAR, components);
}
}
2 changes: 1 addition & 1 deletion hraven-core/src/main/java/com/twitter/hraven/TaskKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public String getTaskId() {

public String toString() {
return new StringBuilder(super.toString())
.append(Constants.SEP).append(taskId).toString();
.append(Constants.HBASE_SEP).append(taskId).toString();
}

/**
Expand Down
Loading

0 comments on commit 0df1646

Please sign in to comment.