Skip to content

Commit

Permalink
Thread local metrics overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 1, 2024
1 parent 1282899 commit f8cc4af
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 172 deletions.
1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ list( APPEND gribjump_srcs
ExtractionData.cc
ExtractionData.h
Metrics.h
Metrics.cc
Types.h
)

Expand Down
13 changes: 8 additions & 5 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& ke
}

const metkit::mars::MarsRequest req = unionRequest(marsrequests);
MetricsManager::instance().set("union_request", req.asString());
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
Expand Down Expand Up @@ -248,14 +249,19 @@ void Engine::scheduleTasks(filemap_t& filemap){

ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {

eckit::Timer timer("Engine::extract");
ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
eckit::Timer timer("Engine::extract");
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

scheduleTasks(filemap);
MetricsManager::instance().set("elapsed_tasks", timer.elapsed());
timer.reset("Gribjump Engine: All tasks finished");

ResultsMap results = collectResults(keyToExtractionItem);
MetricsManager::instance().set("elapsed_collect_results", timer.elapsed());

timer.reset("Gribjump Engine: Repackaged results");

return results;
Expand Down Expand Up @@ -310,6 +316,7 @@ size_t Engine::scan(std::vector<eckit::PathName> files) {
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request) {
MetricsManager::instance().set("request", request);
return FDBLister::instance().axes(request);
}

Expand All @@ -320,10 +327,6 @@ void Engine::reportErrors(eckit::Stream& client) {
/// Delegate to the task group's raiseErrors().
void Engine::raiseErrors() {
    auto& tasks = taskGroup_;
    tasks.raiseErrors();
}
/// Copy the task group's task count and error count into the given metrics object.
/// @param metrics  object whose nTasks / nFailedTasks fields are overwritten
void Engine::updateMetrics(Metrics& metrics) {
    const auto scheduled = taskGroup_.nTasks();
    metrics.nTasks = scheduled;

    const auto failed = taskGroup_.nErrors();
    metrics.nFailedTasks = failed;
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
1 change: 0 additions & 1 deletion src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Engine {
std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request);

void scheduleTasks(filemap_t& filemap);
void updateMetrics(Metrics& metrics);

void reportErrors(eckit::Stream& client_);
void raiseErrors();
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(co
throw eckit::UserError("Requests must not be empty", Here());
}

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests, ctx); // ... why is this still using raw pointers? // why are we not using extraction items?
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests, ctx); // why are we not using extraction items?
return out;
}

Expand Down
1 change: 0 additions & 1 deletion src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class GribJumpBase : public eckit::NonCopyable {
protected: // members

Stats stats_;
LogContext ctx_;
};

} // namespace gribjump
85 changes: 85 additions & 0 deletions src/gribjump/Metrics.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* (C) Copyright 2024- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/// @author Christopher Bradley

#include <ctime>
#include <sstream>
#include <string>

#include "eckit/runtime/Main.h"
#include "gribjump/Metrics.h"

namespace {
/// Format a UNIX timestamp as an ISO-8601 UTC string, e.g. "1970-01-01T00:00:00Z".
/// @param t  seconds since the epoch
/// @return   the formatted timestamp, or an empty string if the time could not be
///           converted or formatted
std::string iso(time_t t) {
    // gmtime() returns a pointer to shared static storage, which is not safe when several
    // threads report metrics; gmtime_r() writes into a caller-owned struct instead.
    std::tm utc{};
    if (::gmtime_r(&t, &utc) == nullptr) {
        return std::string();
    }

    char buf[80];
    // strftime() returns 0 on failure and then leaves buf unspecified, so build the
    // result from the reported length rather than trusting a terminator.
    const size_t written = ::strftime(buf, sizeof(buf), "%FT%TZ", &utc);
    return std::string(buf, written);
}
} // namespace

namespace gribjump {

/// Record the construction time. prevTime_ starts out equal to created_ so that it never
/// holds an indeterminate value (it was previously left uninitialised).
Metrics::Metrics() : created_(std::time(nullptr)), prevTime_(created_) {}

/// Nothing to release explicitly; members clean up after themselves.
Metrics::~Metrics() = default;

/// Store a value under the given name, overwriting any value already stored for that name.
/// @param name   key under which the value is kept in values_
/// @param value  value to store
void Metrics::add(const std::string& name, const eckit::Value& value) {
    eckit::Value& slot = values_[name];
    slot = value;
}

void Metrics::report() {

time_t now = std::time(nullptr);

std::ostringstream oss;
eckit::JSON j(oss, false);

j.startObject();
j << "process" << eckit::Main::instance().name();
j << "start_time" << iso(created_);
j << "end_time" << iso(now);
j << "run_time" << timer_.elapsed();

for (const auto& [name, value] : values_) {
j << name << value;
}
j << "context" << context_;
j.endObject();

eckit::Log::metrics() << oss.str() << std::endl;
}

// --------------------------------------------------------------------------------------------------------------------------------

/// The manager holds no state of its own, so there is nothing to set up.
MetricsManager::MetricsManager() = default;

/// Access the single MetricsManager, a function-local static built on first use.
/// @return reference to the one manager object
MetricsManager& MetricsManager::instance() {
    static MetricsManager theManager;
    return theManager;
}

/// Nothing to tear down.
MetricsManager::~MetricsManager() = default;


/// Store a named value in the calling thread's Metrics object.
/// @param name   key for the value
/// @param value  value to record
void MetricsManager::set(const std::string& name, const eckit::Value& value) {
    Metrics& current = metrics();
    current.add(name, value);
}

void MetricsManager::setContext(const LogContext& context) {
metrics().addContext(context);
}

/// Return the Metrics object belonging to the calling thread. The object is a
/// thread_local function-local static, so each thread gets its own instance.
Metrics& MetricsManager::metrics() {
    thread_local static Metrics perThread;
    return perThread;
}

void MetricsManager::report() {
metrics().report();
}

} // namespace gribjump
67 changes: 36 additions & 31 deletions src/gribjump/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "eckit/log/Timer.h"
#include "eckit/log/JSON.h"
#include "eckit/log/TimeStamp.h"
#include "eckit/runtime/Metrics.h"

namespace gribjump {

Expand Down Expand Up @@ -53,51 +54,55 @@ class LogContext {
std::string context_;
};


// --------------------------------------------------------------------------------------------------------------------------------

/// @todo Investigate using eckit::metrics and MetricsCollector.
/// Note: it does not look usable in a multi-threaded context.
/// Accumulates metric values together with a LogContext and timing information, and
/// writes them out as a single JSON object via report().
class Metrics {

public: // methods

/// Store the given logging context and capture the current time, formatted as a
/// "%Y-%m-%dT%H:%M:%SZ" string, in start_.
Metrics(LogContext ctx) : context_(ctx) {
start_ = std::string(eckit::TimeStamp("%Y-%m-%dT%H:%M:%SZ"));
}
/// Default constructor, defined in Metrics.cc, where it records the creation time in created_.
Metrics();
/// Destructor, defined in Metrics.cc.
~Metrics();

// NOTE(review): ~Metrics() is declared twice in this listing (just above and here), and
// report() likewise appears twice below; only one of each can remain in a compilable header.
~Metrics() {}

/// Write the fixed set of fields below (action, timestamps, counters, elapsed times and
/// the context) as one JSON object directly to eckit::Log::metrics().
void report() {
eckit::JSON j(eckit::Log::metrics(), false);
j.startObject();
j << "action" << action;
j << "start_time" << start_;
j << "end_time" << eckit::TimeStamp("%Y-%m-%dT%H:%M:%SZ");
j << "count_requests" << nRequests;
j << "count_tasks" << nTasks;
j << "count_failed_tasks" << nFailedTasks;
j << "elapsed_receive" << timeReceive;
j << "elapsed_execute" << timeExecute;
j << "elapsed_reply" << timeReply;
j << "elapsed_total" << timer_.elapsed();
j << "context" << context_;
j.endObject();
}
/// Store `value` under `name` in values_, replacing any earlier entry with that name.
void add(const std::string& name, const eckit::Value& value);

// NOTE(review): no definition of addTime() is visible in the Metrics.cc shown alongside
// this header — confirm that one exists, or drop the declaration.
void addTime(const std::string& name);

/// Replace the stored logging context with `context`.
void addContext(const LogContext& context) { context_ = context; }

/// Emit the collected metrics; defined in Metrics.cc.
void report();

public: // members

// Fixed-field metrics written by the inline report() above.
std::string action;
// Counters default to -1 (presumably meaning "never set" — confirm with callers).
int nRequests = -1;
int nTasks = -1;
int nFailedTasks = -1;
// Elapsed times reported as elapsed_receive / elapsed_execute / elapsed_reply.
double timeReceive = 0;
double timeExecute = 0;
double timeReply = 0;
// Named values stored by add().
eckit::ValueMap values_;

private: // members

LogContext context_;
// Creation time, set by the default constructor in Metrics.cc.
time_t created_;
// NOTE(review): not initialised by the Metrics() constructor shown in Metrics.cc.
time_t prevTime_;

// Read through elapsed() when a report is produced.
eckit::Timer timer_;
// Start timestamp string set by the Metrics(LogContext) constructor.
std::string start_;
};

// --------------------------------------------------------------------------------------------------------------------------------
// Wrapper around Metrics to treat them as thread-local
/// Single access point for recording metrics. Every call is forwarded to a Metrics
/// object owned by the calling thread (see metrics()), so values set on one thread
/// are reported only by that thread.
class MetricsManager {

public: // methods
    /// Access the one MetricsManager object.
    static MetricsManager& instance();

    // There is exactly one manager: copying it is never meaningful, so forbid it outright.
    MetricsManager(const MetricsManager&) = delete;
    MetricsManager& operator=(const MetricsManager&) = delete;

    /// Store `value` under `name` in the calling thread's metrics.
    void set(const std::string& name, const eckit::Value& value);

    /// Pass `context` on to the calling thread's metrics.
    void setContext(const LogContext& context);

    /// Write out the calling thread's metrics.
    void report();

private:
    // Construction and destruction are reserved for instance().
    MetricsManager();
    ~MetricsManager();

    /// The Metrics object belonging to the calling thread.
    Metrics& metrics();

};

} // namespace gribjump
7 changes: 7 additions & 0 deletions src/gribjump/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ void TaskGroup::waitForTasks() {
cv_.wait(lock, [&]{return counter_ == taskStatus_.size();});
waiting_ = false;
LOG_DEBUG_LIB(LibGribJump) << "All tasks complete" << std::endl;

MetricsManager::instance().set("count_tasks", taskStatus_.size());
MetricsManager::instance().set("count_failed_tasks", errors_.size());

if (errors_.size() > 0) {
MetricsManager::instance().set("first_error", errors_[0]);
}
}

void TaskGroup::reportErrors(eckit::Stream& client) {
Expand Down
Loading

0 comments on commit f8cc4af

Please sign in to comment.