diff --git a/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java b/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java index cebb649..a9cfdbf 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java +++ b/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -*/ + */ package com.twitter.hraven.rest; import java.io.IOException; @@ -23,6 +23,7 @@ import javax.ws.rs.ext.Provider; import org.apache.hadoop.conf.Configuration; +import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.Version; import org.codehaus.jackson.map.JsonSerializer; @@ -42,7 +43,10 @@ import com.twitter.hraven.Flow; import com.twitter.hraven.HdfsStats; import com.twitter.hraven.HdfsStatsKey; +import com.twitter.hraven.JobDetails; import com.twitter.hraven.QualifiedPathKey; +import com.twitter.hraven.TaskDetails; +import com.twitter.hraven.rest.SerializationContext.DetailLevel; /** * Class that provides custom JSON bindings (where needed) for out object model. @@ -63,13 +67,23 @@ public ObjectMapper getContext(Class type) { return customMapper; } + /** + * creates a new SimpleModule for holding the serializers + * @return SimpleModule + */ + private static SimpleModule createhRavenModule() { + return new SimpleModule("hRavenModule", new Version(0, 4, 0, null)); + } + public static ObjectMapper createCustomMapper() { ObjectMapper result = new ObjectMapper(); result.configure(Feature.INDENT_OUTPUT, true); - SimpleModule module = new SimpleModule("hRavenModule", new Version(0, 4, 0, null)); + SimpleModule module = createhRavenModule(); addJobMappings(module); module.addSerializer(Flow.class, new FlowSerializer()); module.addSerializer(AppSummary.class, new AppSummarySerializer()); + module.addSerializer(TaskDetails.class, new TaskDetailsSerializer()); + module.addSerializer(JobDetails.class, new JobDetailsSerializer()); result.registerModule(module); return result; } @@ -82,14 +96,17 @@ private static SimpleModule addJobMappings(SimpleModule module) { } /** - * Custom serializer for Configuration object. We don't want to serialize the classLoader. + * Custom serializer for Configuration object. We don't want to serialize the + * classLoader. */ - public static class ConfigurationSerializer extends JsonSerializer { + public static class ConfigurationSerializer extends + JsonSerializer { @Override public void serialize(Configuration conf, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException { - SerializationContext context = RestJSONResource.serializationContext.get(); + SerializerProvider serializerProvider) throws IOException { + SerializationContext context = RestJSONResource.serializationContext + .get(); Predicate configFilter = context.getConfigurationFilter(); Iterator> keyValueIterator = conf.iterator(); @@ -107,6 +124,145 @@ public void serialize(Configuration conf, JsonGenerator jsonGenerator, } } + /** + * Custom serializer for TaskDetails object. 
+ */ + public static class TaskDetailsSerializer extends JsonSerializer { + + @Override + public void serialize(TaskDetails td, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + + SerializationContext context = RestJSONResource.serializationContext + .get(); + Predicate includeFilter = context.getTaskFilter(); + + if (includeFilter == null) { + // should generate the json for everything in the task details object + ObjectMapper om = new ObjectMapper(); + om.registerModule(addJobMappings(createhRavenModule())); + om.writeValue(jsonGenerator, td); + } else { + // should generate the json for everything in the task details object + // as per the filtering criteria + ObjectMapper om = new ObjectMapper(); + om.registerModule(addJobMappings(createhRavenModule())); + jsonGenerator.writeStartObject(); + filteredWrite("taskKey", includeFilter, td.getTaskKey(), jsonGenerator); + filteredWrite("taskId", includeFilter, td.getTaskId(), jsonGenerator); + filteredWrite("startTime", includeFilter, td.getStartTime(), + jsonGenerator); + filteredWrite("finishTime", includeFilter, td.getFinishTime(), + jsonGenerator); + filteredWrite("taskType", includeFilter, td.getType(), jsonGenerator); + filteredWrite("status", includeFilter, td.getStatus(), jsonGenerator); + filteredWrite("splits", includeFilter, td.getSplits(), jsonGenerator); + filteredWrite("counters", includeFilter, td.getCounters(), + jsonGenerator); + filteredWrite("taskAttemptId", includeFilter, td.getTaskAttemptId(), + jsonGenerator); + filteredWrite("trackerName", includeFilter, td.getTrackerName(), + jsonGenerator); + filteredWrite("hostname", includeFilter, td.getHostname(), + jsonGenerator); + filteredWrite("httpPort", includeFilter, td.getTrackerName(), + jsonGenerator); + filteredWrite("state", includeFilter, td.getState(), jsonGenerator); + filteredWrite("error", includeFilter, td.getError(), jsonGenerator); + filteredWrite("shuffleFinished", includeFilter, + td.getShuffleFinished(), jsonGenerator); + filteredWrite("sortFinished", includeFilter, td.getSortFinished(), + jsonGenerator); + jsonGenerator.writeEndObject(); + } + } + + } + + /** + * Custom serializer for JobDetails object. 
+ */ + public static class JobDetailsSerializer extends JsonSerializer { + + @Override + public void serialize(JobDetails jd, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + SerializationContext context = RestJSONResource.serializationContext + .get(); + Predicate includeFilter = context.getJobFilter(); + + if (includeFilter == null) { + ObjectMapper om = new ObjectMapper(); + om.registerModule(addJobMappings(createhRavenModule())); + om.writeValue(jsonGenerator, jd); + } else { + // should generate the json for every field in the job details object + // as per the filtering criteria + ObjectMapper om = new ObjectMapper(); + om.registerModule(addJobMappings(createhRavenModule())); + jsonGenerator.writeStartObject(); + filteredWrite("jobKey", includeFilter, jd.getJobKey(), jsonGenerator); + filteredWrite("jobId", includeFilter, jd.getJobId(), jsonGenerator); + filteredWrite("jobName", includeFilter, jd.getJobName(), jsonGenerator); + filteredWrite("user", includeFilter, jd.getUser(), jsonGenerator); + filteredWrite("priority", includeFilter, jd.getPriority(), + jsonGenerator); + filteredWrite("status", includeFilter, jd.getStatus(), jsonGenerator); + filteredWrite("version", includeFilter, jd.getVersion(), jsonGenerator); + filteredWrite("historyFileType", includeFilter, jd.getHistoryFileType(), + jsonGenerator); + filteredWrite("queue", includeFilter, jd.getQueue(), jsonGenerator); + filteredWrite("submitTime", includeFilter, jd.getSubmitTime(), + jsonGenerator); + filteredWrite("launchTime", includeFilter, jd.getLaunchTime(), + jsonGenerator); + filteredWrite("finishTime", includeFilter, jd.getFinishTime(), + jsonGenerator); + filteredWrite("totalMaps", includeFilter, jd.getTotalMaps(), + jsonGenerator); + filteredWrite("totalReduces", includeFilter, jd.getTotalReduces(), + jsonGenerator); + filteredWrite("finishedMaps", includeFilter, jd.getFinishedMaps(), + jsonGenerator); + filteredWrite("finishedReduces", includeFilter, + jd.getFinishedReduces(), jsonGenerator); + filteredWrite("failedMaps", includeFilter, jd.getFailedMaps(), + jsonGenerator); + filteredWrite("failedReduces", includeFilter, jd.getFailedReduces(), + jsonGenerator); + filteredWrite("mapFileBytesRead", includeFilter, + jd.getMapFileBytesRead(), jsonGenerator); + filteredWrite("mapFileBytesWritten", includeFilter, + jd.getMapFileBytesWritten(), jsonGenerator); + filteredWrite("reduceFileBytesRead", includeFilter, + jd.getReduceFileBytesRead(), jsonGenerator); + filteredWrite("hdfsBytesRead", includeFilter, jd.getHdfsBytesRead(), + jsonGenerator); + filteredWrite("hdfsBytesWritten", includeFilter, + jd.getHdfsBytesWritten(), jsonGenerator); + filteredWrite("mapSlotMillis", includeFilter, jd.getMapSlotMillis(), + jsonGenerator); + filteredWrite("reduceSlotMillis", includeFilter, + jd.getReduceSlotMillis(), jsonGenerator); + filteredWrite("reduceShuffleBytes", includeFilter, + jd.getReduceShuffleBytes(), jsonGenerator); + filteredWrite("megabyteMillis", includeFilter, jd.getMegabyteMillis(), + jsonGenerator); + filteredWrite("cost", includeFilter, jd.getCost(), jsonGenerator); + filteredWrite("counters", includeFilter, jd.getCounters(), + jsonGenerator); + filteredWrite("mapCounters", includeFilter, jd.getMapCounters(), + jsonGenerator); + filteredWrite("reduceCounters", includeFilter, jd.getReduceCounters(), + jsonGenerator); + filteredWrite("cost", includeFilter, jd.getCost(), jsonGenerator); + filteredWrite("configuration", includeFilter, jd.getConfiguration(), jsonGenerator); 
+ jsonGenerator.writeEndObject(); + } + } + + } + /** * Custom serializer for HdfsStats object */ @@ -114,7 +270,7 @@ public static class HdfsStatsSerializer extends JsonSerializer { @Override public void serialize(HdfsStats hdfsStats, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException { + SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); jsonGenerator.writeFieldName("hdfsStatsKey"); @@ -160,19 +316,19 @@ public void serialize(HdfsStats hdfsStats, JsonGenerator jsonGenerator, jsonGenerator.writeNumber(hdfsStats.getStorageCost()); jsonGenerator.writeFieldName("hdfsCost"); jsonGenerator.writeNumber(hdfsStats.getHdfsCost()); - jsonGenerator.writeEndObject(); } } /** - * Custom serializer for Configuration object. We don't want to serialize the classLoader. + * Custom serializer for Configuration object. We don't want to serialize the + * classLoader. */ public static class CounterSerializer extends JsonSerializer { @Override public void serialize(CounterMap counterMap, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException { + SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); for (String group : counterMap.getGroups()) { @@ -197,92 +353,14 @@ public void serialize(CounterMap counterMap, JsonGenerator jsonGenerator, * fields to include in serialized response */ public static class FlowSerializer extends JsonSerializer { - @SuppressWarnings("deprecation") @Override public void serialize(Flow aFlow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - SerializationContext.DetailLevel selectedSerialization = - RestJSONResource.serializationContext.get().getLevel(); - if (selectedSerialization == SerializationContext.DetailLevel.EVERYTHING) { - // should generate the json for everything in the flow object - ObjectMapper om = new ObjectMapper(); - om.registerModule( - addJobMappings(new SimpleModule("hRavenModule", new Version(0, 4, 0, null)))); - om.writeValue(jsonGenerator, aFlow); - } else { - if (selectedSerialization == SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_ONLY - || selectedSerialization == SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_WITH_JOB_STATS) { - jsonGenerator.writeStartObject(); - // serialize the FlowKey object - jsonGenerator.writeFieldName("flowKey"); - jsonGenerator.writeObject(aFlow.getFlowKey()); - // serialize individual members of this class - jsonGenerator.writeFieldName("flowName"); - jsonGenerator.writeString(aFlow.getFlowName()); - jsonGenerator.writeFieldName("userName"); - jsonGenerator.writeString(aFlow.getUserName()); - jsonGenerator.writeFieldName("jobCount"); - jsonGenerator.writeNumber(aFlow.getJobCount()); - jsonGenerator.writeFieldName("totalMaps"); - jsonGenerator.writeNumber(aFlow.getTotalMaps()); - jsonGenerator.writeFieldName("totalReduces"); - jsonGenerator.writeNumber(aFlow.getTotalReduces()); - jsonGenerator.writeFieldName("mapFilesBytesRead"); - jsonGenerator.writeNumber(aFlow.getMapFileBytesRead()); - jsonGenerator.writeFieldName("mapFilesBytesWritten"); - jsonGenerator.writeNumber(aFlow.getMapFileBytesWritten()); - jsonGenerator.writeFieldName("reduceFilesBytesRead"); - jsonGenerator.writeNumber(aFlow.getReduceFileBytesRead()); - jsonGenerator.writeFieldName("hdfsBytesRead"); - jsonGenerator.writeNumber(aFlow.getHdfsBytesRead()); - jsonGenerator.writeFieldName("hdfsBytesWritten"); - jsonGenerator.writeNumber(aFlow.getHdfsBytesWritten()); 
- jsonGenerator.writeFieldName("mapSlotMillis"); - jsonGenerator.writeNumber(aFlow.getMapSlotMillis()); - jsonGenerator.writeFieldName("reduceSlotMillis"); - jsonGenerator.writeNumber(aFlow.getReduceSlotMillis()); - jsonGenerator.writeFieldName("megabyteMillis"); - jsonGenerator.writeNumber(aFlow.getMegabyteMillis()); - jsonGenerator.writeFieldName("cost"); - jsonGenerator.writeNumber(aFlow.getCost()); - jsonGenerator.writeFieldName("reduceShuffleBytes"); - jsonGenerator.writeNumber(aFlow.getReduceShuffleBytes()); - jsonGenerator.writeFieldName("duration"); - jsonGenerator.writeNumber(aFlow.getDuration()); - jsonGenerator.writeFieldName("wallClockTime"); - jsonGenerator.writeNumber(aFlow.getWallClockTime()); - jsonGenerator.writeFieldName("cluster"); - jsonGenerator.writeString(aFlow.getCluster()); - jsonGenerator.writeFieldName("appId"); - jsonGenerator.writeString(aFlow.getAppId()); - jsonGenerator.writeFieldName("runId"); - jsonGenerator.writeNumber(aFlow.getRunId()); - jsonGenerator.writeFieldName("version"); - jsonGenerator.writeString(aFlow.getVersion()); - jsonGenerator.writeFieldName("historyFileType"); - /** - * unlikely that the next line with .toString - * will throw NPE since Flow class always sets - * default hadoop version in Flow#addJob - */ - jsonGenerator.writeString(aFlow.getHistoryFileType().toString()); - jsonGenerator.writeFieldName(Constants.HRAVEN_QUEUE); - jsonGenerator.writeString(aFlow.getQueue()); - jsonGenerator.writeFieldName("counters"); - jsonGenerator.writeObject(aFlow.getCounters()); - jsonGenerator.writeFieldName("mapCounters"); - jsonGenerator.writeObject(aFlow.getMapCounters()); - jsonGenerator.writeFieldName("reduceCounters"); - jsonGenerator.writeObject(aFlow.getReduceCounters()); - // if flag, include job details - if (selectedSerialization == - SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_WITH_JOB_STATS) { - jsonGenerator.writeFieldName("jobs"); - jsonGenerator.writeObject(aFlow.getJobs()); - } - jsonGenerator.writeEndObject(); - } - } + + SerializationContext context = RestJSONResource.serializationContext.get(); + SerializationContext.DetailLevel selectedSerialization = context.getLevel(); + Predicate includeFilter = context.getFlowFilter(); + writeFlowDetails(jsonGenerator, aFlow, selectedSerialization, includeFilter); } } @@ -295,20 +373,18 @@ public static class AppSummarySerializer extends JsonSerializer { @Override public void serialize(AppSummary anApp, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - SerializationContext.DetailLevel selectedSerialization = - RestJSONResource.serializationContext.get().getLevel(); + SerializationContext.DetailLevel selectedSerialization = RestJSONResource.serializationContext + .get().getLevel(); if (selectedSerialization == SerializationContext.DetailLevel.EVERYTHING) { - // should generate the json for everything in the flow object + // should generate the json for everything in the app summary object ObjectMapper om = new ObjectMapper(); - om.registerModule( - addJobMappings(new SimpleModule("hRavenModule", new Version(0, 4, 0, null)))); + om.registerModule(addJobMappings(createhRavenModule())); om.writeValue(jsonGenerator, anApp); } else { if (selectedSerialization == SerializationContext.DetailLevel.APP_SUMMARY_STATS_NEW_JOBS_ONLY) { - // should generate the json for everything in the flow object + // should generate the json for stats relevant for new jobs ObjectMapper om = new ObjectMapper(); - om.registerModule( - addJobMappings(new 
SimpleModule("hRavenModule", new Version(0, 4, 0, null)))); + om.registerModule(addJobMappings(createhRavenModule())); jsonGenerator.writeStartObject(); jsonGenerator.writeFieldName("cluster"); jsonGenerator.writeString(anApp.getKey().getCluster()); @@ -326,10 +402,9 @@ public void serialize(AppSummary anApp, JsonGenerator jsonGenerator, jsonGenerator.writeNumber(anApp.getLastRunId()); jsonGenerator.writeEndObject(); } else if (selectedSerialization == SerializationContext.DetailLevel.APP_SUMMARY_STATS_ALL_APPS) { - // should generate the json for everything in the flow object + // should generate the json for everything in the app summary object ObjectMapper om = new ObjectMapper(); - om.registerModule( - addJobMappings(new SimpleModule("hRavenModule", new Version(0, 4, 0, null)))); + om.registerModule(addJobMappings(createhRavenModule())); jsonGenerator.writeStartObject(); jsonGenerator.writeFieldName("cluster"); jsonGenerator.writeString(anApp.getKey().getCluster()); @@ -365,4 +440,91 @@ public void serialize(AppSummary anApp, JsonGenerator jsonGenerator, } } } + + /** + * checks if the member is to be filtered out or no if filter itself is + * null, writes out that member + * + * @param member + * @param includeFilter + * @param taskObject + * @param jsonGenerator + * @throws JsonGenerationException + * @throws IOException + */ + public static void filteredWrite(String member, Predicate includeFilter, + Object taskObject, JsonGenerator jsonGenerator) + throws JsonGenerationException, IOException { + if (includeFilter != null) { + if (includeFilter.apply(member)) { + jsonGenerator.writeFieldName(member); + jsonGenerator.writeObject(taskObject); + } + } else { + jsonGenerator.writeFieldName(member); + jsonGenerator.writeObject(taskObject); + } + } + + /** + * Writes out the flow object + * + * @param jsonGenerator + * @param aFlow + * @param selectedSerialization + * @param includeFilter + * @param includeJobFieldFilter + * @throws JsonGenerationException + * @throws IOException + */ + @SuppressWarnings("deprecation") + public static void writeFlowDetails(JsonGenerator jsonGenerator, Flow aFlow, + DetailLevel selectedSerialization, Predicate includeFilter) + throws JsonGenerationException, IOException { + jsonGenerator.writeStartObject(); + // serialize the FlowKey object + filteredWrite("flowKey", includeFilter, aFlow.getFlowKey(), jsonGenerator); + + // serialize individual members of this class + filteredWrite("flowName", includeFilter, aFlow.getFlowName(), jsonGenerator); + filteredWrite("userName", includeFilter, aFlow.getUserName(), jsonGenerator); + filteredWrite("jobCount", includeFilter, aFlow.getJobCount(), jsonGenerator); + filteredWrite("totalMaps", includeFilter, aFlow.getTotalMaps(), jsonGenerator); + filteredWrite("totalReduces", includeFilter, aFlow.getTotalReduces(), jsonGenerator); + filteredWrite("mapFileBytesRead", includeFilter, aFlow.getMapFileBytesRead(), jsonGenerator); + filteredWrite("mapFileBytesWritten", includeFilter, aFlow.getMapFileBytesWritten(), jsonGenerator); + filteredWrite("reduceFileBytesRead", includeFilter, aFlow.getReduceFileBytesRead(), jsonGenerator); + filteredWrite("hdfsBytesRead", includeFilter, aFlow.getHdfsBytesRead(), jsonGenerator); + filteredWrite("hdfsBytesWritten", includeFilter, aFlow.getHdfsBytesWritten(), jsonGenerator); + filteredWrite("mapSlotMillis", includeFilter, aFlow.getMapSlotMillis(), jsonGenerator); + filteredWrite("reduceSlotMillis", includeFilter, aFlow.getReduceSlotMillis(), jsonGenerator); + 
filteredWrite("megabyteMillis", includeFilter, aFlow.getMegabyteMillis(), jsonGenerator); + filteredWrite("cost", includeFilter, aFlow.getCost(), jsonGenerator); + filteredWrite("reduceShuffleBytes", includeFilter, aFlow.getReduceShuffleBytes(), jsonGenerator); + filteredWrite("duration", includeFilter, aFlow.getDuration(), jsonGenerator); + filteredWrite("wallClockTime", includeFilter, aFlow.getWallClockTime(), jsonGenerator); + filteredWrite("cluster", includeFilter, aFlow.getCluster(), jsonGenerator); + filteredWrite("appId", includeFilter, aFlow.getAppId(), jsonGenerator); + filteredWrite("runId", includeFilter, aFlow.getRunId(), jsonGenerator); + filteredWrite("version", includeFilter, aFlow.getVersion(), jsonGenerator); + filteredWrite("historyFileType", includeFilter, aFlow.getHistoryFileType(), jsonGenerator); + if (selectedSerialization == SerializationContext.DetailLevel.EVERYTHING) { + filteredWrite("submitTime", includeFilter, aFlow.getSubmitTime(), jsonGenerator); + filteredWrite("launchTime", includeFilter, aFlow.getLaunchTime(), jsonGenerator); + filteredWrite("finishTime", includeFilter, aFlow.getFinishTime(), jsonGenerator); + } + filteredWrite(Constants.HRAVEN_QUEUE, includeFilter, aFlow.getQueue(), jsonGenerator); + filteredWrite("counters", includeFilter, aFlow.getCounters(), jsonGenerator); + filteredWrite("mapCounters", includeFilter, aFlow.getMapCounters(), jsonGenerator); + filteredWrite("reduceCounters", includeFilter, aFlow.getReduceCounters(), jsonGenerator); + + // if flag, include job details + if ((selectedSerialization == SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_WITH_JOB_STATS) + || (selectedSerialization == SerializationContext.DetailLevel.EVERYTHING)) { + jsonGenerator.writeFieldName("jobs"); + jsonGenerator.writeObject(aFlow.getJobs()); + } + jsonGenerator.writeEndObject(); + + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/rest/RestJSONResource.java b/hraven-core/src/main/java/com/twitter/hraven/rest/RestJSONResource.java index 18cf27f..a86e30f 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/rest/RestJSONResource.java +++ b/hraven-core/src/main/java/com/twitter/hraven/rest/RestJSONResource.java @@ -53,6 +53,7 @@ import com.twitter.hraven.datasource.JobHistoryService; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.datasource.VersionInfo; +import com.twitter.hraven.util.StringUtil; /** * Main REST resource that handles binding the REST API to the JobHistoryService. 
@@ -63,7 +64,7 @@ @Path("/api/v1/") public class RestJSONResource { private static final Log LOG = LogFactory.getLog(RestJSONResource.class); - private static final String SLASH = "/" ; + public static final String SLASH = "/" ; private static final Configuration HBASE_CONF = HBaseConfiguration.create(); private static final ThreadLocal serviceThreadLocal = @@ -135,18 +136,29 @@ protected SerializationContext initialValue() { @Path("job/{cluster}/{jobId}") @Produces(MediaType.APPLICATION_JSON) public JobDetails getJobById(@PathParam("cluster") String cluster, - @PathParam("jobId") String jobId) throws IOException { + @PathParam("jobId") String jobId, + @QueryParam("include") List includeFields) + throws IOException { LOG.info("Fetching JobDetails for jobId=" + jobId); Stopwatch timer = new Stopwatch().start(); + Predicate includeFilter = null; + if (includeFields != null && !includeFields.isEmpty()) { + includeFilter = new SerializationContext.FieldNameFilter(includeFields); + } serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.EVERYTHING)); + SerializationContext.DetailLevel.EVERYTHING, null, null, includeFilter, null)); JobDetails jobDetails = getJobHistoryService().getJobByJobID(cluster, jobId); timer.stop(); if (jobDetails != null) { - LOG.info("For job/{cluster}/{jobId} with input query:" + " job/" + cluster + SLASH + jobId - + " fetched jobDetails for " + jobDetails.getJobName() + " in " + timer); + LOG.info("For job/{cluster}/{jobId} with input query:" + + " job/" + cluster + SLASH + jobId + "&" + + StringUtil.buildParam("include", includeFields) + + " fetched jobDetails for " + + jobDetails.getJobName() + " in " + timer); } else { - LOG.info("For job/{cluster}/{jobId} with input query:" + " job/" + cluster + SLASH + jobId + LOG.info("For job/{cluster}/{jobId} with input query:" + " job/" + + cluster + SLASH + jobId + "&" + + StringUtil.buildParam("include",includeFields) + " No jobDetails found, but spent " + timer); } // export latency metrics @@ -161,19 +173,34 @@ public JobDetails getJobById(@PathParam("cluster") String cluster, @Path("tasks/{cluster}/{jobId}") @Produces(MediaType.APPLICATION_JSON) public List getJobTasksById(@PathParam("cluster") String cluster, - @PathParam("jobId") String jobId) throws IOException { + @PathParam("jobId") String jobId, + @QueryParam("include") List includeFields) + throws IOException { LOG.info("Fetching tasks info for jobId=" + jobId); Stopwatch timer = new Stopwatch().start(); + + Predicate includeFilter = null; + if (includeFields != null && !includeFields.isEmpty()) { + includeFilter = new SerializationContext.FieldNameFilter(includeFields); + } serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.EVERYTHING)); - JobDetails jobDetails = getJobHistoryService().getJobByJobID(cluster, jobId, true); - timer.stop(); + SerializationContext.DetailLevel.EVERYTHING, null, null, null, + includeFilter)); + + JobDetails jobDetails = getJobHistoryService().getJobByJobID(cluster, + jobId, true); List tasks = jobDetails.getTasks(); + timer.stop(); + if(tasks != null && !tasks.isEmpty()) { - LOG.info("For endpoint /tasks/" + cluster + "/" + jobId + ", fetched " + LOG.info("For endpoint /tasks/" + cluster + "/" + + jobId + "?" + + StringUtil.buildParam("include", includeFields) + + " fetched " + tasks.size() + " tasks, spent time " + timer); } else { LOG.info("For endpoint /tasks/" + cluster + "/" + jobId + + "?" 
+ StringUtil.buildParam("include", includeFields) + ", found no tasks, spent time " + timer); } return tasks; @@ -183,20 +210,43 @@ public List getJobTasksById(@PathParam("cluster") String cluster, @Path("jobFlow/{cluster}/{jobId}") @Produces(MediaType.APPLICATION_JSON) public Flow getJobFlowById(@PathParam("cluster") String cluster, - @PathParam("jobId") String jobId) throws IOException { + @PathParam("jobId") String jobId, + @QueryParam("includeFlowField") List includeFlowFields, + @QueryParam("includeJobField") List includeJobFields) + throws IOException { LOG.info(String.format("Fetching Flow for cluster=%s, jobId=%s", cluster, jobId)); Stopwatch timer = new Stopwatch().start(); + Predicate jobFilter = null; + if (includeJobFields != null && !includeJobFields.isEmpty()) { + jobFilter = new SerializationContext.FieldNameFilter(includeJobFields); + } + + Predicate flowFilter = null; + if (includeFlowFields != null && !includeFlowFields.isEmpty()) { + flowFilter = new SerializationContext.FieldNameFilter(includeFlowFields); + } + serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.EVERYTHING)); + SerializationContext.DetailLevel.EVERYTHING, null, flowFilter, + jobFilter, null)); Flow flow = getJobHistoryService().getFlowByJobID(cluster, jobId, false); timer.stop(); + if (flow != null) { - LOG.info("For jobFlow/{cluster}/{jobId} with input query: " + "jobFlow/" + cluster + SLASH - + jobId + " fetched flow " + flow.getFlowName() + " with #jobs " + flow.getJobCount() - + " in " + timer); + LOG.info("For jobFlow/{cluster}/{jobId} with input query: " + "jobFlow/" + + cluster + SLASH + jobId + "&" + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + + StringUtil.buildParam("includeFlowField", includeFlowFields) + + " fetched flow " + flow.getFlowName() + + " with # " + flow.getJobCount() + " in " + timer); } else { - LOG.info("For jobFlow/{cluster}/{jobId} with input query: " + "jobFlow/" + cluster + SLASH - + jobId + " No flow found, spent " + timer); + LOG.info("For jobFlow/{cluster}/{jobId} with input query: " + "jobFlow/" + + cluster + SLASH + jobId + "&" + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + + StringUtil.buildParam("includeFlowField", includeFlowFields) + + " No flow found, spent " + timer); } // export latency metrics @@ -215,8 +265,10 @@ public List getJobFlowById(@PathParam("cluster") String cluster, @QueryParam("limit") int limit, @QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime, + @QueryParam("include") List include, @QueryParam("includeConf") List includeConfig, - @QueryParam("includeConfRegex") List includeConfigRegex) + @QueryParam("includeConfRegex") List includeConfigRegex, + @QueryParam("includeJobField") List includeJobFields) throws IOException { Stopwatch timer = new Stopwatch().start(); @@ -233,13 +285,27 @@ public List getJobFlowById(@PathParam("cluster") String cluster, Predicate configFilter = null; if (includeConfig != null && !includeConfig.isEmpty()) { - configFilter = new SerializationContext.ConfigurationFilter(includeConfig); + configFilter = new SerializationContext.FieldNameFilter(includeConfig); } else if (includeConfigRegex != null && !includeConfigRegex.isEmpty()) { - configFilter = new SerializationContext.RegexConfigurationFilter(includeConfigRegex); + configFilter = new SerializationContext.RegexConfigurationFilter( + includeConfigRegex); + } + + Predicate jobFilter = null; + if (includeJobFields != null && !includeJobFields.isEmpty()) { + 
jobFilter = new SerializationContext.FieldNameFilter(includeJobFields); + } + + Predicate flowFilter = null; + if (include != null && !include.isEmpty()) { + flowFilter = new SerializationContext.FieldNameFilter(include); } + serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.EVERYTHING, configFilter)); - List flows = getFlowList(cluster, user, appId, version, startTime, endTime, limit); + SerializationContext.DetailLevel.EVERYTHING, configFilter, flowFilter, + jobFilter, null)); + List flows = getFlowList(cluster, user, appId, version, startTime, + endTime, limit); timer.stop(); StringBuilder builderIncludeConfigs = new StringBuilder(); @@ -252,17 +318,28 @@ public List getJobFlowById(@PathParam("cluster") String cluster, } if (flows != null) { - LOG.info("For flow/{cluster}/{user}/{appId}/{version} with input query: " + "flow/" + cluster - + SLASH + user + SLASH + appId + SLASH + version + "?limit=" + limit + LOG.info("For flow/{cluster}/{user}/{appId}/{version} with input query: " + + "flow/" + cluster + + SLASH + user + SLASH + appId + SLASH + version + + "?limit=" + limit + " startTime=" + startTime + " endTime=" + endTime - + " &includeConf=" + builderIncludeConfigs + " &includeConfRegex=" - + builderIncludeConfigRegex + " fetched " + flows.size() + " flows " + " in " + timer); + + " &includeConf=" + builderIncludeConfigs + + " &includeConfRegex=" + + builderIncludeConfigRegex + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + StringUtil.buildParam("include", include) + + " fetched " + flows.size() + " flows " + " in " + timer); } else { - LOG.info("For flow/{cluster}/{user}/{appId}/{version} with input query: " + "flow/" + cluster - + SLASH + user + SLASH + appId + SLASH + version + "?limit=" + limit + LOG.info("For flow/{cluster}/{user}/{appId}/{version} with input query: " + + "flow/" + cluster + + SLASH + user + SLASH + appId + SLASH + version + + "?limit=" + limit + " startTime=" + startTime + " endTime=" + endTime - + " &includeConf=" + builderIncludeConfigs + "&includeConfRegex=" - + builderIncludeConfigRegex + " No flows fetched, spent " + timer); + + " &includeConf=" + builderIncludeConfigs + + "&includeConfRegex=" + builderIncludeConfigRegex + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + StringUtil.buildParam("include", include) + + " No flows fetched, spent " + timer); } // export latency metrics @@ -280,42 +357,69 @@ public List getJobFlowById(@PathParam("cluster") String cluster, @QueryParam("limit") int limit, @QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime, + @QueryParam("include") List include, @QueryParam("includeConf") List includeConfig, - @QueryParam("includeConfRegex") List includeConfigRegex) - throws IOException { + @QueryParam("includeConfRegex") List includeConfigRegex, + @QueryParam("includeJobField") List includeJobFields) + throws IOException { Stopwatch timer = new Stopwatch().start(); Predicate configFilter = null; if (includeConfig != null && !includeConfig.isEmpty()) { - configFilter = new SerializationContext.ConfigurationFilter(includeConfig); + configFilter = new SerializationContext.FieldNameFilter(includeConfig); } else if (includeConfigRegex != null && !includeConfigRegex.isEmpty()) { - configFilter = new SerializationContext.RegexConfigurationFilter(includeConfigRegex); - } else { - serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.EVERYTHING, configFilter)); + configFilter = new 
SerializationContext.RegexConfigurationFilter( + includeConfigRegex); } - List flows = getFlowList(cluster, user, appId, null, startTime, endTime, limit); + Predicate jobFilter = null; + if (includeJobFields != null && !includeJobFields.isEmpty()) { + jobFilter = new SerializationContext.FieldNameFilter(includeJobFields); + } + + Predicate flowFilter = null; + if (include != null && !include.isEmpty()) { + flowFilter = new SerializationContext.FieldNameFilter(include); + } + + serializationContext.set(new SerializationContext( + SerializationContext.DetailLevel.EVERYTHING, configFilter, flowFilter, + jobFilter, null)); + + List flows = getFlowList(cluster, user, appId, null, startTime, + endTime, limit); timer.stop(); StringBuilder builderIncludeConfigs = new StringBuilder(); for(String s : includeConfig) { builderIncludeConfigs.append(s); } + StringBuilder builderIncludeConfigRegex = new StringBuilder(); for(String s : includeConfig) { builderIncludeConfigRegex.append(s); } if (flows != null) { - LOG.info("For flow/{cluster}/{user}/{appId} with input query: " + "flow/" + cluster + SLASH - + user + SLASH + appId + "?limit=" + limit + "&startTime=" + startTime - + "&endTime=" + endTime + "&includeConf=" + builderIncludeConfigs - + "&includeConfRegex=" + builderIncludeConfigRegex + " fetched " + flows.size() + LOG.info("For flow/{cluster}/{user}/{appId} with input query: " + + "flow/" + cluster + SLASH + + user + SLASH + appId + "?limit=" + limit + + "&startTime=" + startTime + + "&endTime=" + endTime + + "&includeConf=" + builderIncludeConfigs + + "&includeConfRegex=" + builderIncludeConfigRegex + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + StringUtil.buildParam("include", include) + + " fetched " + flows.size() + " flows in " + timer); } else { - LOG.info("For flow/{cluster}/{user}/{appId} with input query: " + "flow/" + cluster + SLASH - + user + SLASH + appId + "?limit=" + limit + "&includeConf=" + builderIncludeConfigs - + "&includeConfRegex=" + builderIncludeConfigRegex + " No flows fetched, spent "+ timer); + LOG.info("For flow/{cluster}/{user}/{appId} with input query: " + + "flow/" + cluster + SLASH + + user + SLASH + appId + "?limit=" + limit + + "&includeConf=" + builderIncludeConfigs + + "&includeConfRegex=" + builderIncludeConfigRegex + + StringUtil.buildParam("includeJobField", includeJobFields) + + "&" + StringUtil.buildParam("include", include) + + " No flows fetched, spent "+ timer); } // export latency metrics @@ -336,12 +440,15 @@ public PaginatedResult getJobFlowStats(@PathParam("cluster") String cluste @QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime, @QueryParam("limit") @DefaultValue("100") int limit, - @QueryParam("includeJobs") boolean includeJobs + @QueryParam("include") List include, + @QueryParam("includeJobs") boolean includeJobs, + @QueryParam("includeJobField") List includeJobFields ) throws IOException { LOG.info("Fetching flowStats for flowStats/{cluster}/{user}/{appId} with input query: " - + "flowStats/" + cluster + SLASH // + user /{appId} cluster + " user " + user + + "flowStats/" + cluster + SLASH + " user " + user + appId + "?version=" + version + "&limit=" + limit - + "&startRow=" + startRowParam + "&startTime=" + startTime + "&endTime=" + endTime + + "&startRow=" + startRowParam + "&startTime=" + startTime + + "&endTime=" + endTime + "&includeJobs=" + includeJobs); Stopwatch timer = new Stopwatch().start(); @@ -350,12 +457,24 @@ public PaginatedResult getJobFlowStats(@PathParam("cluster") String 
cluste startRow = Base64.decode(startRowParam); } + Predicate flowFilter = null; + if (include != null && !include.isEmpty()) { + flowFilter = new SerializationContext.FieldNameFilter(include); + } + if (includeJobs) { + Predicate jobFilter = null; + if (includeJobFields != null && !includeJobFields.isEmpty()) { + jobFilter = new SerializationContext.FieldNameFilter(includeJobFields); + } + serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_WITH_JOB_STATS)); + SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_WITH_JOB_STATS, + null, flowFilter, jobFilter, null)); } else { serializationContext.set(new SerializationContext( - SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_ONLY)); + SerializationContext.DetailLevel.FLOW_SUMMARY_STATS_ONLY, null, + flowFilter, null, null)); } if(endTime == 0) { @@ -409,6 +528,7 @@ public PaginatedResult getJobFlowStats(@PathParam("cluster") String cluste + SLASH // + user /{appId} cluster + " user " + user + appId + "?version=" + version + "&limit=" + limit + "&startRow=" + startRow + "&startTime=" + startTime + "&endTime=" + endTime + "&includeJobs=" + includeJobs + + "&" + StringUtil.buildParam("includeJobField", includeJobFields) + " fetched " + flows.size() + " in " + timer); // export latency metrics diff --git a/hraven-core/src/main/java/com/twitter/hraven/rest/SerializationContext.java b/hraven-core/src/main/java/com/twitter/hraven/rest/SerializationContext.java index b5669c6..7933a3c 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/rest/SerializationContext.java +++ b/hraven-core/src/main/java/com/twitter/hraven/rest/SerializationContext.java @@ -66,10 +66,10 @@ public enum DetailLevel { * Restricts returned job configuration data to specific configuration * properties. 
*/ - public static class ConfigurationFilter implements Predicate { + public static class FieldNameFilter implements Predicate { private final Set allowedKeys; - public ConfigurationFilter(List keys) { + public FieldNameFilter(List keys) { if (keys != null) { this.allowedKeys = new HashSet(keys); } else { @@ -119,17 +119,36 @@ public boolean apply(String potentialKey) { } private final DetailLevel level; - private final Predicate filter; + private final Predicate configFilter; + private final Predicate flowFilter; + private final Predicate jobFilter; + private final Predicate taskFilter; public SerializationContext(DetailLevel serializationLevel) { this.level = serializationLevel; - this.filter = null; + this.configFilter = null; + this.flowFilter = null; + this.jobFilter = null; + this.taskFilter = null; } + /** + * constructor to set the config filter, job filter and task filter + * @param serializationLevel + * @param configFilter + * @param jobFilter + * @param taskFilter + */ public SerializationContext(DetailLevel serializationLevel, - Predicate filter) { + Predicate configFilter, + Predicate flowFilter, + Predicate jobFilter, + Predicate taskFilter) { this.level = serializationLevel; - this.filter = filter; + this.configFilter = configFilter; + this.flowFilter = flowFilter; + this.jobFilter = jobFilter; + this.taskFilter = taskFilter; } public DetailLevel getLevel() { @@ -137,6 +156,18 @@ public DetailLevel getLevel() { } public Predicate getConfigurationFilter() { - return this.filter; + return this.configFilter; + } + + public Predicate getFlowFilter() { + return flowFilter; + } + + public Predicate getJobFilter() { + return jobFilter; + } + + public Predicate getTaskFilter() { + return taskFilter; } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/rest/client/HRavenRestClient.java b/hraven-core/src/main/java/com/twitter/hraven/rest/client/HRavenRestClient.java index a60b459..c47ed83 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/rest/client/HRavenRestClient.java +++ b/hraven-core/src/main/java/com/twitter/hraven/rest/client/HRavenRestClient.java @@ -19,15 +19,19 @@ import java.io.InputStream; import java.net.URL; import java.net.URLConnection; -import java.net.URLEncoder; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.codehaus.jackson.Version; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.module.SimpleModule; import org.codehaus.jackson.type.TypeReference; import com.twitter.hraven.Flow; @@ -35,6 +39,9 @@ import com.twitter.hraven.TaskDetails; import com.twitter.hraven.datasource.JobHistoryService; import com.twitter.hraven.rest.ObjectMapperProvider; +import com.twitter.hraven.rest.RestJSONResource; +import com.twitter.hraven.rest.ObjectMapperProvider.FlowSerializer; +import com.twitter.hraven.rest.ObjectMapperProvider.JobDetailsSerializer; import com.twitter.hraven.util.StringUtil; /** @@ -48,6 +55,13 @@ public class HRavenRestClient { private int connectTimeout; private int readTimeout; + public static final String URL_PORTION_API_V1 = "api/v1/"; + public static final String AND = "&"; + public static final String QUESTION_MARK = "?"; + public static final String LIMIT = "limit"; + public static final String FLOW_API = "flow"; + public 
static final String EQUAL_TO = "="; + /** * Initializes with the given hostname and a default connect and read timeout of 5 seconds. * @param apiHostname the hostname to connect to @@ -106,6 +120,37 @@ public List fetchFlows(String cluster, return retrieveFlowsFromURL(urlString); } + public List fetchFlows(String cluster, + String username, + String batchDesc, + String signature, + List flowResponseFilters, + List jobResponseFilters, + int limit) throws IOException { + LOG.info(String.format( + "Fetching last %d matching jobs for cluster=%s, user.name=%s, " + + "batch.desc=%s, pig.logical.plan.signature=%s", limit, cluster, + username, batchDesc, signature)); + + StringBuilder urlStringBuilder = buildFlowURL(cluster, username, batchDesc, + signature, limit, flowResponseFilters, jobResponseFilters); + + return retrieveFlowsFromURL(urlStringBuilder.toString()); + } + + /** + * Fetches a list of flows that include jobs in that flow that include the + * specified configuration properties + * + * @param cluster + * @param username + * @param batchDesc + * @param signature + * @param limit + * @param configProps + * @return list of flows + * @throws IOException + */ public List fetchFlowsWithConfig(String cluster, String username, String batchDesc, @@ -117,7 +162,7 @@ public List fetchFlowsWithConfig(String cluster, String configParam = ""; if (configProps != null && configProps.length > 0) { - configParam = buildConfigParam("includeConf", configProps); + configParam = StringUtil.buildParam("includeConf", configProps); } String urlString = signature == null ? String.format("http://%s/api/v1/flow/%s/%s/%s?limit=%d&%s", @@ -129,6 +174,54 @@ public List fetchFlowsWithConfig(String cluster, return retrieveFlowsFromURL(urlString); } + /** + * Fetches a list of flows that include jobs in that flow that include the + * specified flow fields and job fields + * specified configuration properties + * @param cluster + * @param username + * @param batchDesc + * @param signature + * @param limit + * @param flowResponseFilters + * @param jobResponseFilters + * @param configPropertyFields + * @return list of flows + * @throws IOException + */ + public List fetchFlowsWithConfig(String cluster, + String username, + String batchDesc, + String signature, + int limit, + List flowResponseFilters, + List jobResponseFilters, + List configPropertyFields) throws IOException { + LOG.info(String.format("Fetching last %d matching jobs for cluster=%s, user.name=%s, " + + "batch.desc=%s, pig.logical.plan.signature=%s", limit, cluster, username, batchDesc, signature)); + StringBuilder urlStringBuilder = buildFlowURL(cluster, username, batchDesc, + signature, limit, flowResponseFilters, jobResponseFilters); + + if ((configPropertyFields != null) && (configPropertyFields.size() > 0)) { + urlStringBuilder.append(AND); + urlStringBuilder.append(StringUtil.buildParam("includeConf", + configPropertyFields)); + } + return retrieveFlowsFromURL(urlStringBuilder.toString()); + } + + /** + * Returns a list of flows that contain config elements + * matching a specific pattern. 
+ * @param cluster + * @param username + * @param batchDesc + * @param signature + * @param limit + * @param configPatterns + * @return + * @throws IOException + */ public List fetchFlowsWithConfigPatterns(String cluster, String username, String batchDesc, @@ -140,7 +233,7 @@ public List fetchFlowsWithConfigPatterns(String cluster, String configParam = ""; if (configPatterns != null && configPatterns.length > 0) { - configParam = buildConfigParam("includeConfRegex", configPatterns); + configParam = StringUtil.buildParam("includeConfRegex", configPatterns); } String urlString = signature == null ? String.format("http://%s/api/v1/flow/%s/%s/%s?limit=%d&%s", @@ -152,18 +245,85 @@ public List fetchFlowsWithConfigPatterns(String cluster, return retrieveFlowsFromURL(urlString); } - private String buildConfigParam(String paramName, String[] paramArgs) throws IOException { - StringBuilder sb = new StringBuilder(); - for (String arg : paramArgs) { - if (sb.length() > 0) { - sb.append("&"); - } - sb.append(paramName).append("=").append(URLEncoder.encode(arg, "UTF-8")); + public List fetchFlowsWithConfigPatterns(String cluster, + String username, + String batchDesc, + String signature, + int limit, + List flowResponseFilters, + List jobResponseFilters, + List configPatterns) + throws IOException { + LOG.info(String.format( + "Fetching last %d matching jobs for cluster=%s, user.name=%s, " + + "batch.desc=%s, pig.logical.plan.signature=%s", limit, cluster, + username, batchDesc, signature)); + + StringBuilder urlStringBuilder = buildFlowURL(cluster, username, batchDesc, + signature, limit, flowResponseFilters, jobResponseFilters); + + if ((configPatterns != null) && (configPatterns.size() > 0)) { + urlStringBuilder.append(AND); + urlStringBuilder.append(StringUtil.buildParam("includeConfRegex", + configPatterns)); + } + return retrieveFlowsFromURL(urlStringBuilder.toString()); + } + + /** + * builds up a StringBuilder with the parameters for the FLOW API + * @param cluster + * @param username + * @param batchDesc + * @param signature + * @param limit + * @param flowResponseFilters + * @param jobResponseFilters + * @return + * @throws IOException + */ + private StringBuilder buildFlowURL(String cluster, + String username, + String batchDesc, + String signature, + int limit, + List flowResponseFilters, + List jobResponseFilters) throws IOException { + StringBuilder urlStringBuilder = new StringBuilder(); + urlStringBuilder.append("http://"); + urlStringBuilder.append(apiHostname); + urlStringBuilder.append(RestJSONResource.SLASH); + urlStringBuilder.append(URL_PORTION_API_V1); + urlStringBuilder.append(FLOW_API); + urlStringBuilder.append(RestJSONResource.SLASH); + urlStringBuilder.append(cluster); + urlStringBuilder.append(RestJSONResource.SLASH); + urlStringBuilder.append(username); + urlStringBuilder.append(RestJSONResource.SLASH); + urlStringBuilder.append(StringUtil.cleanseToken(batchDesc)); + if (StringUtils.isNotEmpty(signature)) { + urlStringBuilder.append(RestJSONResource.SLASH); + urlStringBuilder.append(signature); + } + urlStringBuilder.append(QUESTION_MARK); + urlStringBuilder.append(LIMIT); + urlStringBuilder.append(EQUAL_TO); + urlStringBuilder.append(limit); + if ((flowResponseFilters != null) && (flowResponseFilters.size() > 0)) { + urlStringBuilder.append(AND); + urlStringBuilder.append(StringUtil.buildParam("include", + flowResponseFilters)); + } + + if ((jobResponseFilters != null) && (jobResponseFilters.size() > 0)) { + urlStringBuilder.append(AND); + 
urlStringBuilder.append(StringUtil.buildParam("includeJobField", + jobResponseFilters)); } - return sb.toString(); + + return urlStringBuilder; } - @SuppressWarnings("unchecked") private List retrieveFlowsFromURL(String endpointURL) throws IOException { if (LOG.isInfoEnabled()) { LOG.info("Requesting job history from " + endpointURL); @@ -186,7 +346,23 @@ public List fetchTaskDetails(String cluster, String jobId) throws I return retrieveTaskDetailsFromUrl(urlString); } - @SuppressWarnings("unchecked") + /** + * Fetch details tasks of a given job for the specified fields + * @param cluster + * @param jobId + * @param taskResponseFilters + * @return + */ + public List fetchTaskDetails(String cluster, String jobId, + List taskResponseFilters) throws IOException { + String taskFilters = StringUtil.buildParam("include", + taskResponseFilters); + String urlString = String.format("http://%s/api/v1/tasks/%s/%s?%s", + apiHostname, cluster, jobId, taskFilters); + return retrieveTaskDetailsFromUrl(urlString); + } + + private List retrieveTaskDetailsFromUrl(String endpointURL) throws IOException { if (LOG.isInfoEnabled()) { LOG.info("Requesting task history from " + endpointURL); @@ -210,6 +386,10 @@ public static void main(String[] args) throws IOException { boolean useHBaseAPI = false; boolean dumpJson = false; boolean hydrateTasks = false; + List taskResponseFilters = new ArrayList(); + List jobResponseFilters = new ArrayList(); + List flowResponseFilters = new ArrayList(); + List configFields = new ArrayList(); StringBuffer usage = new StringBuffer("Usage: java "); usage.append(HRavenRestClient.class.getName()).append(" [-options]\n"); @@ -225,6 +405,10 @@ public static void main(String[] args) throws IOException { usage.append(" -H - use HBase API, not the REST API\n"); usage.append(" -j - output json\n"); usage.append(" -t - retrieve task information as well"); + usage.append(" -w - config field to be included in job response"); + usage.append(" -z - field to be included in task response"); + usage.append(" -y - field to be included in job response"); + usage.append(" -x - field to be included in flow response"); for (int i = 0; i < args.length; i++) { if("-a".equals(args[i])) { @@ -254,6 +438,22 @@ public static void main(String[] args) throws IOException { } else if("-t".equals(args[i])) { hydrateTasks = true; continue; + } else if("-z".equals(args[i])) { + String taskFilters = args[++i]; + taskResponseFilters = Arrays.asList(taskFilters.split(",")); + continue; + } else if("-y".equals(args[i])) { + String jobFilters = args[++i]; + jobResponseFilters = Arrays.asList(jobFilters.split(",")); + continue; + } else if("-x".equals(args[i])) { + String flowFilters = args[++i]; + flowResponseFilters =Arrays.asList(flowFilters.split(",")); + continue; + } else if("-w".equals(args[i])) { + String configFilters = args[++i]; + configFields =Arrays.asList(configFilters.split(",")); + continue; } else if ("-h".equals(args[i])) { System.err.println(usage.toString()); System.exit(1); @@ -273,22 +473,39 @@ public static void main(String[] args) throws IOException { flows = jobHistoryService.getFlowSeries(cluster, username, batchDesc, signature, hydrateTasks, limit); } else { - HRavenRestClient client = new HRavenRestClient(apiHostname); - flows = client.fetchFlows(cluster, username, batchDesc, signature, limit); + HRavenRestClient client = new HRavenRestClient(apiHostname, 100000, 100000); + + // use this call to call flows without configs + flows = client.fetchFlows(cluster, username, batchDesc, signature, + 
flowResponseFilters, jobResponseFilters, limit); + // use this call to call flows with configs + flows = client.fetchFlowsWithConfig(cluster, username, batchDesc, signature, + limit, flowResponseFilters, jobResponseFilters, configFields ); + // use this call to call flows with config patterns + flows = client.fetchFlowsWithConfig(cluster, username, batchDesc, signature, + limit, flowResponseFilters, jobResponseFilters, configFields ); + if (hydrateTasks) { for (Flow flow : flows) { for (JobDetails jd : flow.getJobs()) { String jobId = jd.getJobId(); - List td = client.fetchTaskDetails(cluster, jobId); + List td = client.fetchTaskDetails(cluster, jobId, taskResponseFilters); jd.addTasks(td); } } } } - if(dumpJson) { + if (dumpJson) { ObjectMapper om = ObjectMapperProvider.createCustomMapper(); - System.out.println(om.writeValueAsString(flows)); + SimpleModule module = new SimpleModule("hRavenModule", new Version(0, 4, + 0, null)); + module.addSerializer(Flow.class, new FlowSerializer()); + module.addSerializer(JobDetails.class, new JobDetailsSerializer()); + om.registerModule(module); + if (flows.size() > 0) { + System.out.println(om.writeValueAsString(flows.get(0))); + } return; } diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java index aaf1bd1..a29f7d1 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java +++ b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java @@ -15,6 +15,10 @@ */ package com.twitter.hraven.util; +import java.io.IOException; +import java.net.URLEncoder; +import java.util.List; + import com.twitter.hraven.Constants; /** @@ -41,4 +45,43 @@ public static String cleanseToken(String token) { return cleansed; } + + /** + * builds up a String with the parameters for the filtering of fields + * @param paramName + * @param paramArgs + * @return String + * @throws IOException + */ + public static String buildParam(String paramName, List paramArgs) + throws IOException { + StringBuilder sb = new StringBuilder(); + for (String arg : paramArgs) { + if (sb.length() > 0) { + sb.append("&"); + } + sb.append(paramName).append("=").append(URLEncoder.encode(arg, "UTF-8")); + } + return sb.toString(); + } + + /** + * builds up a String with the parameters for the filtering of fields + * @param paramName + * @param paramArgs + * @return String + * @throws IOException + */ + public static String buildParam(String paramName, String[] paramArgs) + throws IOException { + StringBuilder sb = new StringBuilder(); + for (String arg : paramArgs) { + if (sb.length() > 0) { + sb.append("&"); + } + sb.append(paramName).append("=").append(URLEncoder.encode(arg, "UTF-8")); + } + return sb.toString(); + } + } diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java b/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java index 5b6f029..eed4ad4 100644 --- a/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java +++ b/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java @@ -128,9 +128,10 @@ public void testSerializationContext() throws Exception { // test serialization matching specific property keys // serialize flow into json - RestJSONResource.serializationContext.set( - new SerializationContext(SerializationContext.DetailLevel.EVERYTHING, - new SerializationContext.ConfigurationFilter(serializedKeys))); + RestJSONResource.serializationContext.set(new SerializationContext( + SerializationContext.DetailLevel.EVERYTHING, + 
new SerializationContext.FieldNameFilter(serializedKeys), null, null, + null)); ObjectMapper om = ObjectMapperProvider.createCustomMapper(); om.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); om.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); @@ -149,9 +150,10 @@ public void testSerializationContext() throws Exception { // test serialization matching property regexes List patterns = Lists.newArrayList("^.*prop$"); - RestJSONResource.serializationContext.set( - new SerializationContext(SerializationContext.DetailLevel.EVERYTHING, - new SerializationContext.RegexConfigurationFilter(patterns))); + RestJSONResource.serializationContext.set(new SerializationContext( + SerializationContext.DetailLevel.EVERYTHING, + new SerializationContext.RegexConfigurationFilter(patterns), null, + null, null)); om = ObjectMapperProvider.createCustomMapper(); om.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); om.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
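For reviewers: the sketch below shows how the field-filtering overloads introduced by this patch might be exercised from client code. It is a hypothetical usage example, not part of the patch; the hostname, cluster, user, batch description, and field lists are placeholders, and it only calls methods whose signatures appear in the diff above (`fetchFlows(cluster, username, batchDesc, signature, flowResponseFilters, jobResponseFilters, limit)` and `fetchTaskDetails(cluster, jobId, taskResponseFilters)`).

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.twitter.hraven.Flow;
import com.twitter.hraven.JobDetails;
import com.twitter.hraven.TaskDetails;
import com.twitter.hraven.rest.client.HRavenRestClient;

public class FieldFilteringExample {
  public static void main(String[] args) throws IOException {
    // Placeholder endpoint, cluster, user, and batch description; substitute real values.
    HRavenRestClient client = new HRavenRestClient("hraven.example.com", 5000, 5000);

    // Ask the server to serialize only these flow-level and job-level fields
    // (sent as the new "include" and "includeJobField" query parameters).
    List<String> flowFields = Arrays.asList("flowName", "userName", "jobCount", "cost");
    List<String> jobFields = Arrays.asList("jobId", "jobName", "status", "megabyteMillis");

    List<Flow> flows = client.fetchFlows("cluster1", "someuser",
        "daily_rollup", null /* no plan signature */, flowFields, jobFields, 10);

    // Task responses can be trimmed the same way via the "include" parameter
    // on the tasks endpoint.
    List<String> taskFields = Arrays.asList("taskId", "status", "startTime", "finishTime");
    for (Flow flow : flows) {
      for (JobDetails jd : flow.getJobs()) {
        List<TaskDetails> tasks =
            client.fetchTaskDetails("cluster1", jd.getJobId(), taskFields);
        jd.addTasks(tasks);
      }
    }
  }
}
```

The same filters map onto the REST query parameters added in `RestJSONResource` (`include`, `includeJobField`, `includeFlowField`, alongside the existing `includeConf`/`includeConfRegex`), so a direct request such as `GET /api/v1/flow/{cluster}/{user}/{appId}?limit=10&include=flowName&includeJobField=jobId` should return the same trimmed payload that the client overloads produce, assuming the parameter values are URL-encoded as `StringUtil.buildParam` does.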