Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/twitter/hraven
Browse files Browse the repository at this point in the history
# By Vrushali Channapattan
# Via Joep Rottinghuis (1) and Vrushali Channapattan (1)
* 'master' of https://github.com/twitter/hraven:
  Issue twitter#87 Updating jobFileProcessor.sh with latest arguments and adding a sample cost file
  Updating class names to reflect their intention better, adding some more tests and cleaning up documentation
  Updating formatting
  Modifying to include AppService, App and AppKey classes, also making a single api call for new jobs given a cluster and making user as a query param
  Updating to move get new jobs to job history service
  Updating some more comments
  Updating java docs
  Updating to remove capacity info, ensure APIs don't mix service class calls
  Updating to add final modifiers, removing abstract in interfaces, changing to return Object instead of long
  updating to enable different schedulers via factory and interface
  Issue twitter#82 Allowing for different schedulers to be supported, presently adding for fair scheduler and Updating other things as per review comments.
  minor formatting changes
  Issue twitter#82: Add a newJobs REST API, Issue twitter#81: Correct the timestamp being stored in appVersion table, Issue twitter#80: Have queue/pool name returned at flow level

Conflicts:
	bin/etl/jobFileProcessor.sh
	hraven-core/src/main/java/com/twitter/hraven/FlowKey.java
	hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java
  • Loading branch information
Angad Singh authored and Angad Singh committed May 20, 2014
2 parents 0df1646 + d1b12f8 commit 4982b3a
Show file tree
Hide file tree
Showing 20 changed files with 848 additions and 76 deletions.
10 changes: 6 additions & 4 deletions bin/etl/jobFileProcessor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
# Run on the daemon node per specific cluster
# This script runs on the HBase cluster
# Usage ./jobFileProcessor.sh [hadoopconfdir]
# [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize]
# [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]
# a sample cost file can be found in the conf dir as sampleCostDetails.properties

if [ $# -ne 6 ]
if [ $# -ne 8 ]
then
echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize]"
echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]"
exit 1
fi

Expand All @@ -43,4 +44,5 @@ fi
create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 -d -p $3 -c $4 -t $5 -b $6 -m default -z dummy.costfile
hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8

14 changes: 14 additions & 0 deletions conf/sampleCostDetails.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# properties file to store cost data
# used in calculating cost of a job in the processing step
#
# machine type is the hardware name of node that the job runs on
#
# compute cost is the part of dollar amount of total cost of operating a machine
# allocated to compute
#
# machinememory is the max amount of memory at run time in
# megabytes available to a hadoop job
#
default.computecost=10
default.machinememory=24576
#
111 changes: 111 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/AppKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
Copyright 2014 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.hraven;

import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

public class AppKey implements Comparable<Object> {

/**
* The cluster on which the application ran
*/
protected final String cluster;
/**
* Who ran the application on Hadoop
*/
protected final String userName;

/**
* The thing that identifies an application,
* such as Pig script identifier, or Scalding identifier.
*/
protected final String appId;

@JsonCreator
public AppKey(@JsonProperty("cluster") String cluster, @JsonProperty("userName") String userName,
@JsonProperty("appId") String appId) {
this.cluster = cluster;
this.userName = (null == userName) ? Constants.UNKNOWN : userName.trim();
this.appId = (null == appId) ? Constants.UNKNOWN : appId.trim();
}

/**
* @return The cluster on which the job ran.
*/
public String getCluster() {
return cluster;
}

/**
* @return Who ran the application
*/
public String getUserName() {
return userName;
}

/**
* @return The thing that identifies an application, such as Pig script
* identifier, or Scalding identifier.
*/
public String getAppId() {
return appId;
}

/**
* Compares two AppKey objects on the basis of their cluster, userName, appId and encodedRunId
* @param other
* @return 0 if this cluster, userName, appId are equal to the other's
* cluster, userName, appId,
* 1 if this cluster or userName or appId are less than the other's
* cluster, userName, appId,
* -1 if this cluster and userName and appId are greater the other's
* cluster, userName, appId
*/
@Override
public int compareTo(Object other) {
if (other == null) {
return -1;
}
AppKey otherKey = (AppKey) other;
return new CompareToBuilder()
.append(this.cluster, otherKey.getCluster())
.append(this.userName, otherKey.getUserName())
.append(this.appId, otherKey.getAppId())
.toComparison();
}

@Override
public boolean equals(Object other) {
if (other instanceof AppKey) {
return compareTo((AppKey) other) == 0;
}
return false;
}

@Override
public int hashCode() {
return new HashCodeBuilder()
.append(this.cluster)
.append(this.userName)
.append(this.appId)
.toHashCode();
}

}
219 changes: 219 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/AppSummary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
Copyright 2014 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.hraven;

import java.util.HashSet;
import java.util.Set;

import org.codehaus.jackson.map.annotate.JsonSerialize;

import com.twitter.hraven.datasource.ProcessingException;

/**
* Represents summary information about a hadoop application
*
* cluster, user, application name identify this app via {@linkplain AppKey}
*
* Used to represent collective statistics of an app
* either over a period of time or any other summary reporting
*
* An actual instance of an app run is represented by {@linkplain Flow}
*/
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class AppSummary {

/** the key that uniquely identifies this hadoop application */
private AppKey appKey;

/** number of runs in this summary */
private long numberRuns;

/** run id of the first time this app ran in this summary */
private long firstRunId;

/** run id of the last time this app ran in this summary */
private long lastRunId;

/** how many hadoop jobs in this summary */
private long jobCount;

/** Number of map tasks in this summary */
private long totalMaps;

/** Number of reduce tasks in this summary */
private long totalReduces;

/** total cost of this app in this summary */
private double cost;

/** map slot millis it has taken up */
private long mapSlotMillis;

/** reduce slot millis it has taken up */
private long reduceSlotMillis;

/** mega byte millis it has taken up */
private long mbMillis;

/** the queue(s) this app ran in, in the context of this summary*/
private Set<String> queues;

public AppSummary(AppKey key) {
this.appKey = key;
this.queues = new HashSet<String>();
}

public AppKey getKey() {
return appKey;
}

public void setKey(AppKey key) {
this.appKey = key;
}

public double getCost() {
return cost;
}

public void setCost(double cost) {
this.cost = cost;
}

public long getMbMillis() {
return mbMillis;
}

public void setMbMillis(long mbMillis) {
this.mbMillis = mbMillis;
}

public Set<String> getQueue() {
return queues;
}

public void setQueue(Set<String> queue) {
this.queues = queue;
}

public void addQueue(String aQueue) {
if (this.queues != null) {
this.queues.add(aQueue);
} else {
throw new ProcessingException("Could not add pool to list of queue for this app "
+ this.appKey);
}
}

public long getNumberRuns() {
return numberRuns;
}

public void setNumberRuns(long numberRuns) {
this.numberRuns = numberRuns;
}

public long getJobCount() {
return jobCount;
}

public void setJobCount(long jobCount) {
this.jobCount = jobCount;
}

public long getMapSlotMillis() {
return mapSlotMillis;
}

public void setMapSlotMillis(long mapSlotMillis) {
this.mapSlotMillis = mapSlotMillis;
}

public long getReduceSlotMillis() {
return reduceSlotMillis;
}

public void setReduceSlotMillis(long reduceSlotMillis) {
this.reduceSlotMillis = reduceSlotMillis;
}

public long getTotalMaps() {
return totalMaps;
}

public void setTotalMaps(long totalMaps) {
this.totalMaps = totalMaps;
}

public long getTotalReduces() {
return totalReduces;
}

public void setTotalReduces(long totalReduces) {
this.totalReduces = totalReduces;
}

public long getFirstRunId() {
return firstRunId;
}

public void setFirstRunId(long firstRunId) {
this.firstRunId = firstRunId;
}

public long getLastRunId() {
return lastRunId;
}

public void setLastRunId(long lastRunId) {
this.lastRunId = lastRunId;
}

/**
* adds a flow (a run of the app) of the app summary
* @param flow
*/
public void addFlow(Flow flow) {

// add the flow stats to this app summary
this.numberRuns++;
this.jobCount += flow.getJobCount();
this.mapSlotMillis += flow.getMapSlotMillis();
this.reduceSlotMillis += flow.getReduceSlotMillis();
this.totalMaps += flow.getTotalMaps();
this.totalReduces += flow.getTotalReduces();
this.mbMillis += flow.getMegabyteMillis();

// add the queue of this flow to the set of queues in this app summary
this.queues.add(flow.getQueue());

/** TODO add jobcost once job cost has been added to job details and flow */

// store the latest timestamp seen for this app summary
// since these are epoch timestamps, a bigger number means more recent
if ((this.lastRunId == 0L) || (this.lastRunId < flow.getRunId())) {
this.lastRunId = flow.getRunId();
}

// store the oldest seen timestamp seen for this app summary
// since these are epoch timestamps, a smaller number means older run
if ((this.firstRunId == 0L) || (this.firstRunId > flow.getRunId())) {
this.firstRunId = flow.getRunId();
}

}

}
Loading

0 comments on commit 4982b3a

Please sign in to comment.