diff --git a/pygribjump/src/pygribjump/gribjump_c.h b/pygribjump/src/pygribjump/gribjump_c.h index 4d6a0ed..2d10c04 100644 --- a/pygribjump/src/pygribjump/gribjump_c.h +++ b/pygribjump/src/pygribjump/gribjump_c.h @@ -26,7 +26,7 @@ int gribjump_result_values(gribjump_extraction_result_t* result, double*** value int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues); int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks); int gribjump_delete_result(gribjump_extraction_result_t* result); -int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj); +int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int* level, const char* ctx, gribjump_handle_t* gj); int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size); int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size); int gribjump_delete_axes(gj_axes_t* axes); diff --git a/pygribjump/src/pygribjump/pygribjump.py b/pygribjump/src/pygribjump/pygribjump.py index 892bc35..e52e7a0 100644 --- a/pygribjump/src/pygribjump/pygribjump.py +++ b/pygribjump/src/pygribjump/pygribjump.py @@ -126,10 +126,7 @@ def extract(self, polyrequest, ctx=None, dump=True): nfields = ffi.new('unsigned long**') nrequests = len(requests) c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests]) - if (ctx): - logctx=str(ctx) - else: - logctx="" + logctx=str(ctx) if ctx else "pygribjump_extract" logctx_c = ffi.new('const char[]', logctx.encode('ascii')) lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c) @@ -219,12 +216,17 @@ def extract_single(self, request): ] return res - def axes(self, req): + + def axes(self, req, level=3, ctx=None): # note old axes used a dict in. This is now a string. + logctx=str(ctx) if ctx else "pygribjump_axes" + ctx_c = ffi.new('const char[]', logctx.encode('ascii')) requeststr = dic_to_request(req) newaxes = ffi.new('gj_axes_t**') reqstr = ffi.new('const char[]', requeststr.encode('ascii')) - lib.gribjump_new_axes(newaxes, reqstr, self.__gribjump) + level_c = ffi.new('int*', level) + lib.gribjump_new_axes(newaxes, reqstr, level_c, ctx_c, self.__gribjump) + # TODO want to return a dict like: # {key: [value1, value2, ...], ...} # each key and value is a string diff --git a/pygribjump/tests/test_pygribjump.py b/pygribjump/tests/test_pygribjump.py index f4d4816..c451fd0 100644 --- a/pygribjump/tests/test_pygribjump.py +++ b/pygribjump/tests/test_pygribjump.py @@ -83,3 +83,16 @@ def test_extract_simple_sunshine_case(read_only_fdb_setup) -> None: actual = grib_jump.extract(polyrequest) assert numpy.array_equal(expected, actual[0][0][0][0], equal_nan=True) + +def test_axes(read_only_fdb_setup) -> None: + gribjump = pygj.GribJump() + req = { + "date": "20230508", + } + ax1 = gribjump.axes(req, level=1) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'stream': ['oper'], 'time': ['1200']} + ax2 = gribjump.axes(req, level=2) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'levtype': ['sfc'], 'stream': ['oper'], 'time': ['1200'], 'type': ['fc']} + ax3 = gribjump.axes(req, level=3) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'levelist': [''], 'levtype': ['sfc'], 'param': ['151130'], 'step': ['1'], 'stream': ['oper'], 'time': ['1200'], 'type': ['fc']} + + assert len(ax1.keys()) == 6 + assert len(ax2.keys()) == 8 + assert len(ax3.keys()) == 11 diff --git a/src/gribjump/CMakeLists.txt b/src/gribjump/CMakeLists.txt index 6dad4bc..dce4cc5 100644 --- a/src/gribjump/CMakeLists.txt +++ b/src/gribjump/CMakeLists.txt @@ -47,6 +47,7 @@ list( APPEND gribjump_srcs ExtractionData.cc ExtractionData.h Metrics.h + Metrics.cc Types.h ) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 3024387..c33d82d 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -159,6 +159,7 @@ filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& ke } const metkit::mars::MarsRequest req = unionRequest(marsrequests); + MetricsManager::instance().set("union_request", req.asString()); timer.reset("Gribjump Engine: Flattened requests and constructed union request"); filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem); @@ -248,14 +249,19 @@ void Engine::scheduleTasks(filemap_t& filemap){ ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) { + eckit::Timer timer("Engine::extract"); ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems filemap_t filemap = buildFileMap(requests, keyToExtractionItem); - eckit::Timer timer("Engine::extract"); + MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed()); + timer.reset("Gribjump Engine: Built file map"); scheduleTasks(filemap); + MetricsManager::instance().set("elapsed_tasks", timer.elapsed()); timer.reset("Gribjump Engine: All tasks finished"); ResultsMap results = collectResults(keyToExtractionItem); + MetricsManager::instance().set("elapsed_collect_results", timer.elapsed()); + timer.reset("Gribjump Engine: Repackaged results"); return results; @@ -309,8 +315,9 @@ size_t Engine::scan(std::vector files) { return files.size(); } -std::map > Engine::axes(const std::string& request) { - return FDBLister::instance().axes(request); +std::map > Engine::axes(const std::string& request, int level) { + MetricsManager::instance().set("request", request); + return FDBLister::instance().axes(request, level); } void Engine::reportErrors(eckit::Stream& client) { @@ -320,10 +327,6 @@ void Engine::reportErrors(eckit::Stream& client) { void Engine::raiseErrors() { taskGroup_.raiseErrors(); } -void Engine::updateMetrics(Metrics& metrics) { - metrics.nTasks = taskGroup_.nTasks(); - metrics.nFailedTasks = taskGroup_.nErrors(); -} //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 18f9047..57f5760 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -34,10 +34,9 @@ class Engine { size_t scan(const MarsRequests& requests, bool byfiles = false); size_t scan(std::vector files); - std::map > axes(const std::string& request); + std::map > axes(const std::string& request, int level=3); void scheduleTasks(filemap_t& filemap); - void updateMetrics(Metrics& metrics); void reportErrors(eckit::Stream& client_); void raiseErrors(); diff --git a/src/gribjump/GribJump.cc b/src/gribjump/GribJump.cc index d3f010d..d40c8c2 100644 --- a/src/gribjump/GribJump.cc +++ b/src/gribjump/GribJump.cc @@ -27,7 +27,8 @@ GribJump::GribJump() { GribJump::~GribJump() { } -size_t GribJump::scan(const std::vector& paths) { +size_t GribJump::scan(const std::vector& paths, const LogContext& ctx) { + ContextManager::instance().set(ctx); if (paths.empty()) { throw eckit::UserError("Paths must not be empty", Here()); @@ -37,7 +38,8 @@ size_t GribJump::scan(const std::vector& paths) { return ret; } -size_t GribJump::scan(const std::vector requests, bool byfiles) { +size_t GribJump::scan(const std::vector requests, bool byfiles, const LogContext& ctx) { + ContextManager::instance().set(ctx); if (requests.empty()) { throw eckit::UserError("Requests must not be empty", Here()); @@ -48,17 +50,19 @@ size_t GribJump::scan(const std::vector requests, boo } -std::vector>> GribJump::extract(const std::vector& requests, LogContext ctx) { +std::vector>> GribJump::extract(const std::vector& requests, const LogContext& ctx) { + ContextManager::instance().set(ctx); if (requests.empty()) { throw eckit::UserError("Requests must not be empty", Here()); } - std::vector>> out = impl_->extract(requests, ctx); // ... why is this still using raw pointers? // why are we not using extraction items? + std::vector>> out = impl_->extract(requests); // why are we not using extraction items? return out; } -std::vector> GribJump::extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) { +std::vector> GribJump::extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges, const LogContext& ctx) { + ContextManager::instance().set(ctx); if (path.asString().empty()) { throw eckit::UserError("Path must not be empty", Here()); @@ -74,13 +78,14 @@ std::vector> GribJump::extract(const eckit::Path return out; } -std::map> GribJump::axes(const std::string& request) { +std::map> GribJump::axes(const std::string& request, int level, const LogContext& ctx) { + ContextManager::instance().set(ctx); if (request.empty()) { throw eckit::UserError("Request string must not be empty", Here()); } - auto out = impl_->axes(request); + auto out = impl_->axes(request, level); return out; } diff --git a/src/gribjump/GribJump.h b/src/gribjump/GribJump.h index 947d09e..ae29e7e 100644 --- a/src/gribjump/GribJump.h +++ b/src/gribjump/GribJump.h @@ -43,13 +43,13 @@ class GribJump { ~GribJump(); - size_t scan(const std::vector& paths); - size_t scan(std::vector requests, bool byfiles = false); + size_t scan(const std::vector& paths, const LogContext& ctx=LogContext("none")); + size_t scan(std::vector requests, bool byfiles = false, const LogContext& ctx=LogContext("none")); - std::vector>> extract(const std::vector& requests, LogContext ctx=LogContext("none")); - std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges); + std::vector>> extract(const std::vector& requests, const LogContext& ctx=LogContext("none")); + std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges, const LogContext& ctx=LogContext("none")); - std::map> axes(const std::string& request); + std::map> axes(const std::string& request, int level=3, const LogContext& ctx=LogContext("none")); void stats(); diff --git a/src/gribjump/GribJumpBase.h b/src/gribjump/GribJumpBase.h index 9baf74f..190355f 100644 --- a/src/gribjump/GribJumpBase.h +++ b/src/gribjump/GribJumpBase.h @@ -47,17 +47,16 @@ class GribJumpBase : public eckit::NonCopyable { virtual size_t scan(const std::vector requests, bool byfiles) = 0; - virtual std::vector>> extract(std::vector, LogContext ctx=LogContext("none")) = 0; + virtual std::vector>> extract(std::vector) = 0; virtual std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) = 0; - virtual std::map> axes(const std::string& request) = 0; + virtual std::map> axes(const std::string& request, int level) = 0; virtual void stats(); protected: // members Stats stats_; - LogContext ctx_; }; } // namespace gribjump diff --git a/src/gribjump/Lister.cc b/src/gribjump/Lister.cc index c800a34..0a5853b 100644 --- a/src/gribjump/Lister.cc +++ b/src/gribjump/Lister.cc @@ -172,20 +172,20 @@ std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(std::vect return files; } -std::map > FDBLister::axes(const std::string& request) { +std::map > FDBLister::axes(const std::string& request, int level) { std::vector requests = fdb5::FDBToolRequest::requestsFromString(request, std::vector(), true); ASSERT(requests.size() == 1); // i.e. assume string is a single request. - return axes(requests.front()); + return axes(requests.front(), level); } -std::map > FDBLister::axes(const fdb5::FDBToolRequest& request) { +std::map > FDBLister::axes(const fdb5::FDBToolRequest& request, int level) { eckit::AutoLock lock(this); std::map> values; LOG_DEBUG_LIB(LibGribJump) << "Using FDB's (new) axes impl" << std::endl; - fdb5::IndexAxis ax = fdb_.axes(request); + fdb5::IndexAxis ax = fdb_.axes(request, level); ax.sort(); std::map> fdbValues = ax.map(); diff --git a/src/gribjump/Lister.h b/src/gribjump/Lister.h index ec3e264..e181569 100644 --- a/src/gribjump/Lister.h +++ b/src/gribjump/Lister.h @@ -32,7 +32,7 @@ class Lister { public: virtual std::vector list(const std::vector requests) = 0; // <-- May not want to use mars request? - virtual std::map > axes(const std::string& request) = 0 ; + virtual std::map > axes(const std::string& request, int level) = 0 ; void lock() { mutex_.lock(); locked_ = true; } void unlock() { mutex_.unlock(); locked_ = false; } @@ -55,8 +55,8 @@ class FDBLister : public Lister { static FDBLister& instance(); virtual std::vector list(const std::vector requests) override; - virtual std::map > axes(const std::string& request) override; - virtual std::map > axes(const fdb5::FDBToolRequest& request); + virtual std::map > axes(const std::string& request, int level) override; + virtual std::map > axes(const fdb5::FDBToolRequest& request, int level); filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 542c808..1386ce9 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -84,7 +84,7 @@ std::vector> LocalGribJump::extract(const eckit: } /// @todo, change API, remove extraction request -std::vector>> LocalGribJump::extract(ExtractionRequests requests, LogContext ctx) { +std::vector>> LocalGribJump::extract(ExtractionRequests requests) { bool flatten = true; Engine engine; @@ -120,13 +120,13 @@ ResultsMap LocalGribJump::extract(const std::vector& requests, cons return results; } -std::map> LocalGribJump::axes(const std::string& request) { +std::map> LocalGribJump::axes(const std::string& request, int level) { // Note: This is likely to be removed from GribJump, and moved to FDB. // Here for now to support polytope. Engine engine; - return engine.axes(request); + return engine.axes(request, level); } static GribJumpBuilder builder("local"); diff --git a/src/gribjump/LocalGribJump.h b/src/gribjump/LocalGribJump.h index 399a094..dbaa238 100644 --- a/src/gribjump/LocalGribJump.h +++ b/src/gribjump/LocalGribJump.h @@ -39,9 +39,9 @@ class LocalGribJump : public GribJumpBase { // old API std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; - std::vector>> extract(std::vector, LogContext ctx=LogContext("none")) override; + std::vector>> extract(std::vector) override; - std::map> axes(const std::string& request) override; + std::map> axes(const std::string& request, int level) override; private: }; diff --git a/src/gribjump/Metrics.cc b/src/gribjump/Metrics.cc new file mode 100644 index 0000000..7fb69dc --- /dev/null +++ b/src/gribjump/Metrics.cc @@ -0,0 +1,106 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley + +#include "eckit/runtime/Main.h" +#include "gribjump/Metrics.h" + +namespace { +std::string iso(time_t t) { + char buf[80]; + ::strftime(buf, sizeof(buf), "%FT%TZ", gmtime(&t)); + return std::string(buf); +} +} // namespace + +namespace gribjump { + +// -------------------------------------------------------------------------------------------------------------------------------- +thread_local LogContext ContextManager::context_; + +// -------------------------------------------------------------------------------------------------------------------------------- + +Metrics::Metrics() : created_(std::time(nullptr)) {} + +Metrics::~Metrics() {} + +void Metrics::add(const std::string& name, const eckit::Value& value) { + values_[name] = value; +} + +void Metrics::report() { + + time_t now = std::time(nullptr); + + std::ostringstream oss; + eckit::JSON j(oss, false); + + j.startObject(); + j << "process" << eckit::Main::instance().name(); + j << "start_time" << iso(created_); + j << "end_time" << iso(now); + j << "run_time" << timer_.elapsed(); + + for (const auto& [name, value] : values_) { + j << name << value; + } + j << "context" << ContextManager::instance().context(); + j.endObject(); + + eckit::Log::metrics() << oss.str() << std::endl; +} + +// -------------------------------------------------------------------------------------------------------------------------------- + +MetricsManager::MetricsManager() {} + +MetricsManager& MetricsManager::instance() { + static MetricsManager instance; + return instance; +} + +MetricsManager::~MetricsManager() {} + + +void MetricsManager::set(const std::string& name, const eckit::Value& value) { + metrics().add(name, value); +} + +Metrics& MetricsManager::metrics() { + static thread_local Metrics metrics; + return metrics; +} + +void MetricsManager::report() { + metrics().report(); +} + + +// -------------------------------------------------------------------------------------------------------------------------------- +ContextManager::ContextManager() { +} + +ContextManager& ContextManager::instance() { + static ContextManager instance; + return instance; +} + +ContextManager::~ContextManager() {} + +void ContextManager::set(const LogContext& context) { + context_ = context; +} + +LogContext& ContextManager::context() { + return context_; +} + +} // namespace gribjump diff --git a/src/gribjump/Metrics.h b/src/gribjump/Metrics.h index 22399eb..24bd44d 100644 --- a/src/gribjump/Metrics.h +++ b/src/gribjump/Metrics.h @@ -16,6 +16,7 @@ #include "eckit/log/Timer.h" #include "eckit/log/JSON.h" #include "eckit/log/TimeStamp.h" +#include "eckit/runtime/Metrics.h" namespace gribjump { @@ -53,51 +54,72 @@ class LogContext { std::string context_; }; - // -------------------------------------------------------------------------------------------------------------------------------- - +///@todo: Investigate using eckit::metrics and MetricsCollector +// it does not look usable in a multi-threaded context. class Metrics { public: // methods - Metrics(LogContext ctx) : context_(ctx) { - start_ = std::string(eckit::TimeStamp("%Y-%m-%dT%H:%M:%SZ")); - } + Metrics(); + ~Metrics(); - ~Metrics() {} - - void report() { - eckit::JSON j(eckit::Log::metrics(), false); - j.startObject(); - j << "action" << action; - j << "start_time" << start_; - j << "end_time" << eckit::TimeStamp("%Y-%m-%dT%H:%M:%SZ"); - j << "count_requests" << nRequests; - j << "count_tasks" << nTasks; - j << "count_failed_tasks" << nFailedTasks; - j << "elapsed_receive" << timeReceive; - j << "elapsed_execute" << timeExecute; - j << "elapsed_reply" << timeReply; - j << "elapsed_total" << timer_.elapsed(); - j << "context" << context_; - j.endObject(); - } + void add(const std::string& name, const eckit::Value& value); + + void addContext(const LogContext& context) { context_ = context; } + + void report(); public: // members - std::string action; - int nRequests = -1; - int nTasks = -1; - int nFailedTasks = -1; - double timeReceive = 0; - double timeExecute = 0; - double timeReply = 0; + eckit::ValueMap values_; private: // members LogContext context_; + time_t created_; + time_t prevTime_; + eckit::Timer timer_; - std::string start_; }; +// -------------------------------------------------------------------------------------------------------------------------------- +// Wrapper around Metrics to treat them as thread-local +class MetricsManager { + +public: // methods + static MetricsManager& instance(); + + void set(const std::string& name, const eckit::Value& value); + // void setContext(const LogContext& context); + void report(); + +private: + MetricsManager(); + ~MetricsManager(); + + Metrics& metrics(); + +}; + +// -------------------------------------------------------------------------------------------------------------------------------- + +// Context also needs to be thread-local +class ContextManager { +public: + static ContextManager& instance(); + + LogContext& context(); + + void set(const LogContext& context); + +private: + + ContextManager(); + ~ContextManager(); + +private: + static thread_local LogContext context_; + +}; } // namespace gribjump diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 63e0544..cb22c48 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -109,6 +109,13 @@ void TaskGroup::waitForTasks() { cv_.wait(lock, [&]{return counter_ == taskStatus_.size();}); waiting_ = false; LOG_DEBUG_LIB(LibGribJump) << "All tasks complete" << std::endl; + + MetricsManager::instance().set("count_tasks", taskStatus_.size()); + MetricsManager::instance().set("count_failed_tasks", errors_.size()); + + if (errors_.size() > 0) { + MetricsManager::instance().set("first_error", errors_[0]); + } } void TaskGroup::reportErrors(eckit::Stream& client) { diff --git a/src/gribjump/gribjump_c.cc b/src/gribjump/gribjump_c.cc index 4271662..0b323a5 100644 --- a/src/gribjump/gribjump_c.cc +++ b/src/gribjump/gribjump_c.cc @@ -251,7 +251,7 @@ int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, if (ctx) { logctx = LogContext(ctx); } - + std::vector>> results; results = handle->extract(reqs, logctx); @@ -269,12 +269,16 @@ int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, } -int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj) { +int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int* level, const char* ctx, gribjump_handle_t* gj) { return wrapApiFunction([=] { ASSERT(gj); + LogContext logctx; + if (ctx) { + logctx = LogContext(ctx); + } std::string reqstr_str(reqstr); std::map> values; - values = gj->axes(reqstr_str); + values = gj->axes(reqstr_str, *level, logctx); *axes = new gj_axes_t(values); }); } diff --git a/src/gribjump/gribjump_c.h b/src/gribjump/gribjump_c.h index ec25370..3a2a5c1 100644 --- a/src/gribjump/gribjump_c.h +++ b/src/gribjump/gribjump_c.h @@ -43,7 +43,7 @@ int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double** int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks); int gribjump_delete_result(gribjump_extraction_result_t* result); -int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj); +int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int* level, const char* ctx, gribjump_handle_t* gj); int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size); int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size); int gribjump_delete_axes(gj_axes_t* axes); diff --git a/src/gribjump/remote/GribJumpUser.cc b/src/gribjump/remote/GribJumpUser.cc index 998bc61..0d3ba02 100644 --- a/src/gribjump/remote/GribJumpUser.cc +++ b/src/gribjump/remote/GribJumpUser.cc @@ -40,6 +40,7 @@ void GribJumpUser::serve(eckit::Stream& s, std::istream& in, std::ostream& out) catch (std::exception& e) { eckit::Log::error() << "** " << e.what() << " Caught in " << Here() << std::endl; eckit::Log::error() << "** Exception is handled" << std::endl; + MetricsManager::instance().set("error", e.what()); try { s << e; } @@ -51,10 +52,10 @@ void GribJumpUser::serve(eckit::Stream& s, std::istream& in, std::ostream& out) LOG_DEBUG_LIB(LibGribJump) << eckit::system::ResourceUsage() << std::endl; + MetricsManager::instance().report(); } void GribJumpUser::handle_client(eckit::Stream& s, eckit::Timer& timer) { - uint16_t version; uint16_t i_requestType; @@ -64,87 +65,45 @@ void GribJumpUser::handle_client(eckit::Stream& s, eckit::Timer& timer) { } LogContext ctx(s); - ctx_ = ctx; + ContextManager::instance().set(ctx); s >> i_requestType; RequestType requestType = static_cast(i_requestType); + switch (requestType) { case RequestType::EXTRACT: - extract(s, timer); + processRequest(s); break; case RequestType::AXES: - axes(s, timer); + processRequest(s); break; case RequestType::SCAN: - scan(s, timer); + processRequest(s); break; case RequestType::FORWARD_EXTRACT: - forwardedExtract(s, timer); + processRequest(s); break; default: throw eckit::SeriousBug("Unknown request type: " + std::to_string(i_requestType)); } } -void GribJumpUser::forwardedExtract(eckit::Stream& s, eckit::Timer& timer) { - - timer.reset(); - - ForwardedExtractRequest request(s, ctx_); - timer.reset("ForwardedExtract requests received"); - - request.execute(); - timer.reset("ForwardedExtract tasks completed"); - - request.replyToClient(); - timer.reset("ForwardedExtract results sent"); - -} - -void GribJumpUser::scan(eckit::Stream& s, eckit::Timer& timer) { - - timer.reset(); - - ScanRequest request(s, ctx_); - timer.reset("SCAN requests received"); - - request.execute(); - timer.reset("SCAN tasks completed"); - - request.replyToClient(); - timer.reset("SCAN results sent"); -} - -void GribJumpUser::axes(eckit::Stream& s, eckit::Timer& timer) { - - timer.reset(); - - AxesRequest request(s, ctx_); - timer.reset("Axes request received"); - - request.execute(); - timer.reset("Axes tasks completed"); - - request.replyToClient(); - timer.reset("Axes results sent"); -} -void GribJumpUser::extract(eckit::Stream& s, eckit::Timer& timer) { +template +void GribJumpUser::processRequest(eckit::Stream& s) { + eckit::Timer timer; - timer.reset(); - - ExtractRequest request(s, ctx_); - - timer.reset("EXTRACT requests received"); + RequestType request(s); + MetricsManager::instance().set("elapsed_receive", timer.elapsed()); + timer.reset("Request received"); request.execute(); - timer.reset("EXTRACT tasks completed"); + MetricsManager::instance().set("elapsed_execute", timer.elapsed()); + timer.reset("Request executed"); + request.reportErrors(); request.replyToClient(); - - request.reportMetrics(); - - timer.reset("EXTRACT results sent"); + MetricsManager::instance().set("elapsed_reply", timer.elapsed()); + timer.reset("Request replied"); } - } // namespace gribjump diff --git a/src/gribjump/remote/GribJumpUser.h b/src/gribjump/remote/GribJumpUser.h index 8aeb9ab..f28c3b1 100644 --- a/src/gribjump/remote/GribJumpUser.h +++ b/src/gribjump/remote/GribJumpUser.h @@ -34,15 +34,12 @@ class GribJumpUser : public eckit::net::NetUser { void handle_client(eckit::Stream& s, eckit::Timer& timer); - void extract(eckit::Stream& s, eckit::Timer& timer); - void axes(eckit::Stream& s, eckit::Timer& timer); - void scan(eckit::Stream& s, eckit::Timer& timer); - void forwardedExtract(eckit::Stream& s, eckit::Timer& timer); + template + void processRequest(eckit::Stream& s); private: // members constexpr static uint16_t protocolVersion_ = 0; - LogContext ctx_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/RemoteGribJump.cc b/src/gribjump/remote/RemoteGribJump.cc index 0bcb7e0..1872e68 100644 --- a/src/gribjump/remote/RemoteGribJump.cc +++ b/src/gribjump/remote/RemoteGribJump.cc @@ -37,7 +37,7 @@ RemoteGribJump::~RemoteGribJump() {} void RemoteGribJump::sendHeader(eckit::net::InstantTCPStream& stream, RequestType type) { stream << remoteProtocolVersion; - stream << ctx_; + stream << ContextManager::instance().context(); stream << static_cast(type); } @@ -78,7 +78,7 @@ size_t RemoteGribJump::scan(const std::vector request return count; } -std::vector>> RemoteGribJump::extract(std::vector requests, LogContext ctx) { +std::vector>> RemoteGribJump::extract(std::vector requests) { eckit::Timer timer("RemoteGribJump::extract()"); std::vector>> result; @@ -174,7 +174,7 @@ void RemoteGribJump::extract(filemap_t& filemap){ return; } -std::map> RemoteGribJump::axes(const std::string& request) { +std::map> RemoteGribJump::axes(const std::string& request, int level) { eckit::Timer timer("RemoteGribJump::axes()"); std::map> result; @@ -184,7 +184,8 @@ std::map> RemoteGribJump::axes(cons timer.report("Connection established"); sendHeader(stream, RequestType::AXES); - stream << request; + stream << request; + stream << level; timer.report("Request sent"); // receive response diff --git a/src/gribjump/remote/RemoteGribJump.h b/src/gribjump/remote/RemoteGribJump.h index 4665a84..e32d066 100644 --- a/src/gribjump/remote/RemoteGribJump.h +++ b/src/gribjump/remote/RemoteGribJump.h @@ -38,11 +38,11 @@ class RemoteGribJump : public GribJumpBase { size_t scan(const std::vector requests, bool byfiles) override; - std::vector>> extract(std::vector polyRequest, LogContext ctx) override; + std::vector>> extract(std::vector polyRequest) override; std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; void extract(filemap_t& filemap); - std::map> axes(const std::string& request) override; + std::map> axes(const std::string& request, int level) override; private: // methods diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index e942e64..57d3cab 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -14,12 +14,21 @@ #include "gribjump/remote/Request.h" #include "gribjump/Engine.h" +namespace { + static std::atomic requestid_{0}; + static uint64_t requestid() { + return requestid_++; + } +} // namespace + namespace gribjump { //---------------------------------------------------------------------------------------------------------------------- // @todo: Lots of common behaviour between these classes, consider refactoring. Especially the interaction with metrics. -Request::Request(eckit::Stream& stream, LogContext ctx) : client_(stream), metrics_(ctx) { +Request::Request(eckit::Stream& stream) : client_(stream) { + id_ = requestid(); + MetricsManager::instance().set("request_id", id_); } Request::~Request() {} @@ -30,10 +39,8 @@ void Request::reportErrors() { //---------------------------------------------------------------------------------------------------------------------- -ScanRequest::ScanRequest(eckit::Stream& stream, LogContext ctx) : Request(stream, ctx) { - metrics_.action = "scan"; - - eckit::Timer timer; +ScanRequest::ScanRequest(eckit::Stream& stream) : Request(stream) { + MetricsManager::instance().set("action", "scan"); client_ >> byfiles_; @@ -48,38 +55,30 @@ ScanRequest::ScanRequest(eckit::Stream& stream, LogContext ctx) : Request(stream requests_.emplace_back(metkit::mars::MarsRequest(client_)); } - metrics_.nRequests = numRequests; - metrics_.timeReceive = timer.elapsed(); + MetricsManager::instance().set("count_requests", numRequests); } ScanRequest::~ScanRequest() { } void ScanRequest::execute() { - eckit::Timer timer; nfiles_ = engine_.scan(requests_, byfiles_); - engine_.updateMetrics(metrics_); - metrics_.timeExecute = timer.elapsed(); } void ScanRequest::replyToClient() { - eckit::Timer timer; - reportErrors(); client_ << nfiles_; - metrics_.timeReply = timer.elapsed(); } //---------------------------------------------------------------------------------------------------------------------- -ExtractRequest::ExtractRequest(eckit::Stream& stream, LogContext ctx) : Request(stream, ctx) { - metrics_.action = "extract"; +ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { + MetricsManager::instance().set("action", "extract"); // Receive the requests // Temp, repackage the requests from old format into format the engine expects - eckit::Timer timer; size_t nRequests; client_ >> nRequests; @@ -91,18 +90,15 @@ ExtractRequest::ExtractRequest(eckit::Stream& stream, LogContext ctx) : Request( flatten_ = false; // xxx hard coded for now - metrics_.nRequests = nRequests; - metrics_.timeReceive = timer.elapsed(); + MetricsManager::instance().set("count_requests", nRequests); } ExtractRequest::~ExtractRequest() { } void ExtractRequest::execute() { - eckit::Timer timer; results_ = engine_.extract(requests_, flatten_); - engine_.updateMetrics(metrics_); if (LibGribJump::instance().debug()) { for (auto& pair : results_) { @@ -112,13 +108,9 @@ void ExtractRequest::execute() { } } } - metrics_.timeExecute = timer.elapsed(); } void ExtractRequest::replyToClient() { - eckit::Timer timer; - - reportErrors(); // Send the results, again repackage. @@ -141,16 +133,12 @@ void ExtractRequest::replyToClient() { } LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl; - - metrics_.timeReply = timer.elapsed(); } //---------------------------------------------------------------------------------------------------------------------- -ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream, LogContext ctx) : Request(stream, ctx) { - metrics_.action = "forwarded-extract"; - - eckit::Timer timer; +ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Request(stream) { + MetricsManager::instance().set("action", "forwarded-extract"); size_t nFiles; client_ >> nFiles; @@ -176,8 +164,7 @@ ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream, LogConte } count += nItems; } - metrics_.nRequests = count; - metrics_.timeReceive = timer.elapsed(); + MetricsManager::instance().set("count_requests", count); ASSERT(count > 0); // We should not be talking to this server if we have no requests. } @@ -186,15 +173,10 @@ ForwardedExtractRequest::~ForwardedExtractRequest() { } void ForwardedExtractRequest::execute() { - eckit::Timer timer; engine_.scheduleTasks(filemap_); - engine_.updateMetrics(metrics_); - metrics_.timeExecute = timer.elapsed(); } void ForwardedExtractRequest::replyToClient() { - eckit::Timer timer; - reportErrors(); for (auto& [fname, extractionItems] : filemap_) { client_ << fname; // sanity check @@ -205,15 +187,14 @@ void ForwardedExtractRequest::replyToClient() { client_ << res; } } - - metrics_.timeReply = timer.elapsed(); } //---------------------------------------------------------------------------------------------------------------------- -AxesRequest::AxesRequest(eckit::Stream& stream, LogContext ctx) : Request(stream, ctx) { - metrics_.action = "axes"; +AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) { + MetricsManager::instance().set("action", "axes"); client_ >> request_; + client_ >> level_; ASSERT(request_.size() > 0); } @@ -221,19 +202,11 @@ AxesRequest::~AxesRequest() { } void AxesRequest::execute() { - // @todo, use the engine. - // or, polytope should use pyfdb not gribjump for this. - eckit::Timer timer; - GribJump gj; - axes_ = gj.axes(request_); - metrics_.timeExecute = timer.elapsed(); + axes_ = engine_.axes(request_, level_); } void AxesRequest::replyToClient() { - eckit::Timer timer; - reportErrors(); - // print the axes we are sending for (auto& pair : axes_) { eckit::Log::info() << pair.first << ": "; @@ -253,8 +226,6 @@ void AxesRequest::replyToClient() { client_ << val; } } - - metrics_.timeReply = timer.elapsed(); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 9ecb5df..7fe014a 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -30,7 +30,7 @@ namespace gribjump { class Request { public: // methods - Request(eckit::Stream& stream, LogContext ctx = LogContext("none")); + Request(eckit::Stream& stream); virtual ~Request(); @@ -40,10 +40,6 @@ class Request { /// Reply to the client with the results of the request virtual void replyToClient() = 0; - void reportMetrics() {metrics_.report();} - -protected: // methods - void reportErrors(); protected: // members @@ -51,7 +47,7 @@ class Request { eckit::Stream& client_; Engine engine_; //< Engine and schedule tasks based on request - Metrics metrics_; + uint64_t id_; }; //---------------------------------------------------------------------------------------------------------------------- @@ -59,7 +55,7 @@ class Request { class ScanRequest : public Request { public: - ScanRequest(eckit::Stream& stream, LogContext ctx); + ScanRequest(eckit::Stream& stream); ~ScanRequest(); @@ -81,7 +77,7 @@ class ScanRequest : public Request { class ExtractRequest : public Request { public: - ExtractRequest(eckit::Stream& stream, LogContext ctx); + ExtractRequest(eckit::Stream& stream); ~ExtractRequest(); @@ -102,7 +98,7 @@ class ExtractRequest : public Request { class ForwardedExtractRequest : public Request { public: - ForwardedExtractRequest(eckit::Stream& stream, LogContext ctx); + ForwardedExtractRequest(eckit::Stream& stream); ~ForwardedExtractRequest(); @@ -123,7 +119,7 @@ class ForwardedExtractRequest : public Request { class AxesRequest : public Request { public: - AxesRequest(eckit::Stream& stream, LogContext ctx); + AxesRequest(eckit::Stream& stream); ~AxesRequest(); @@ -134,6 +130,7 @@ class AxesRequest : public Request { private: std::string request_; /// @todo why is this a string? + int level_; std::map> axes_; }; diff --git a/tests/remote/test_remote.cc b/tests/remote/test_remote.cc index ca92621..de868b8 100644 --- a/tests/remote/test_remote.cc +++ b/tests/remote/test_remote.cc @@ -10,12 +10,16 @@ #include #include +#include +#include #include "eckit/testing/Test.h" #include "eckit/filesystem/LocalPathName.h" #include "eckit/filesystem/TmpDir.h" +#include "eckit/parser/JSONParser.h" #include "gribjump/GribJump.h" +#include "gribjump/gribjump_config.h" #include "gribjump/FDBPlugin.h" #include "gribjump/info/InfoCache.h" #include "gribjump/info/JumpInfo.h" @@ -26,6 +30,7 @@ #include "metkit/mars/MarsParser.h" #include "metkit/mars/MarsExpension.h" + using namespace eckit::testing; namespace gribjump { @@ -33,6 +38,7 @@ namespace test { //----------------------------------------------------------------------------- static std::string gridHash = "33c7d6025995e1b4913811e77d38ec50"; // base file: extract_ranges.grib +static eckit::PathName metricsFile = "test_metrics"; //----------------------------------------------------------------------------- // Note: the environment for this test is configured by an external script. See tests/remote/test_server.sh.in @@ -58,7 +64,8 @@ CASE( "Remote protocol: extract" ) { } GribJump gribjump; - std::vector>> output = gribjump.extract(exRequests); + LogContext ctx("test_extract"); + std::vector>> output = gribjump.extract(exRequests, ctx); EXPECT_EQUAL(output.size(), 2); for (size_t i = 0; i < output.size(); i++) { @@ -69,22 +76,68 @@ CASE( "Remote protocol: extract" ) { } } } + CASE( "Remote protocol: axes" ) { GribJump gribjump; - std::map> axes = gribjump.axes("class=rd,expver=xxxx"); + LogContext ctx("test_axes"); + std::map> axes = gribjump.axes("class=rd,expver=xxxx", 3, ctx); EXPECT(axes.find("step") != axes.end()); EXPECT_EQUAL(axes["step"].size(), 3); } +#ifdef GRIBJUMP_HAVE_DHSKIT // metrics target is set by dhskit + +CASE( "Parse the metrics file" ) { + // The metrics file is created by the server, and contains JSON objects with metrics data. + // Make sure this is parseable. + + // To make sure server has had time to finish writing the file + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT(metricsFile.exists()); + + std::vector commonKeys = { + "process", + "start_time", + "end_time", + "run_time", + "context" + }; + + // Expect 2 JSON objects in the file + std::vector values; + std::ifstream file(metricsFile.asString().c_str()); + std::string line; + while (std::getline(file, line)) { + eckit::Value v = eckit::JSONParser::decodeString(line); + eckit::Log::info() << v << std::endl; + values.push_back(v); + + // Check common keys + for (const auto& key : commonKeys) { + EXPECT(v.contains(key)); + } + } + EXPECT_EQUAL(values.size(), 2); + // Check extract + eckit::Value v = values[0]; + EXPECT_EQUAL(v["action"], "extract"); + EXPECT_EQUAL(v["context"], "test_extract"); + + // Check axes + v = values[1]; + EXPECT_EQUAL(v["action"], "axes"); + EXPECT_EQUAL(v["context"], "test_axes"); + +} +#endif } // namespace test } // namespace gribjump //----------------------------------------------------------------------------- -int main(int argc, char **argv) -{ +int main(int argc, char **argv) { return run_tests ( argc, argv ); } diff --git a/tests/remote/test_server.sh.in b/tests/remote/test_server.sh.in index f57d296..bb36280 100755 --- a/tests/remote/test_server.sh.in +++ b/tests/remote/test_server.sh.in @@ -17,7 +17,7 @@ fdbconfig=$bindir/fdb_config.yaml fdbrootdir=$bindir/fdb_root fdbdata=$bindir/../extract_ranges.grib pidfile=$bindir/server.pid - +metricsfile=$bindir/test_metrics # --- Cleanup and setup cleanup() { if [[ -f $pidfile ]]; then @@ -29,6 +29,7 @@ cleanup() { fi rm -rf $fdbrootdir + rm -f $metricsfile } cleanup # start fresh @@ -39,7 +40,7 @@ env FDB5_CONFIG_FILE=$fdbconfig $fdbwrite $fdbdata # --- Start the server in the background. trap cleanup EXIT # Ensure cleanup is called when the script exits -env FDB5_CONFIG_FILE=$fdbconfig GRIBJUMP_CONFIG_FILE=$serverconfig GRIBJUMP_SERVER_PORT=57777 $gjserver & +env FDB5_CONFIG_FILE=$fdbconfig GRIBJUMP_CONFIG_FILE=$serverconfig GRIBJUMP_SERVER_PORT=57777 DHS_METRICS_FILE=$metricsfile $gjserver & echo $! > $pidfile sleep 1