Skip to content

Commit

Permalink
Merge pull request #26 from ecmwf/feature/metrics-overhaul
Browse files Browse the repository at this point in the history
Thread local metrics overhaul
Thread local context
Add "level" to axes calls
Add "context" to all C++ API calls
  • Loading branch information
ChrisspyB authored Nov 4, 2024
2 parents 65d9010 + b00df78 commit b49eaca
Show file tree
Hide file tree
Showing 26 changed files with 357 additions and 217 deletions.
2 changes: 1 addition & 1 deletion pygribjump/src/pygribjump/gribjump_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int gribjump_result_values(gribjump_extraction_result_t* result, double*** value
int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues);
int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks);
int gribjump_delete_result(gribjump_extraction_result_t* result);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int* level, const char* ctx, gribjump_handle_t* gj);
int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size);
int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size);
int gribjump_delete_axes(gj_axes_t* axes);
Expand Down
14 changes: 8 additions & 6 deletions pygribjump/src/pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ def extract(self, polyrequest, ctx=None, dump=True):
nfields = ffi.new('unsigned long**')
nrequests = len(requests)
c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests])
if (ctx):
logctx=str(ctx)
else:
logctx=""
logctx=str(ctx) if ctx else "pygribjump_extract"

logctx_c = ffi.new('const char[]', logctx.encode('ascii'))
lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c)
Expand Down Expand Up @@ -219,12 +216,17 @@ def extract_single(self, request):
]
return res

def axes(self, req):

def axes(self, req, level=3, ctx=None):
# note old axes used a dict in. This is now a string.
logctx=str(ctx) if ctx else "pygribjump_axes"
ctx_c = ffi.new('const char[]', logctx.encode('ascii'))
requeststr = dic_to_request(req)
newaxes = ffi.new('gj_axes_t**')
reqstr = ffi.new('const char[]', requeststr.encode('ascii'))
lib.gribjump_new_axes(newaxes, reqstr, self.__gribjump)
level_c = ffi.new('int*', level)
lib.gribjump_new_axes(newaxes, reqstr, level_c, ctx_c, self.__gribjump)

# TODO want to return a dict like:
# {key: [value1, value2, ...], ...}
# each key and value is a string
Expand Down
13 changes: 13 additions & 0 deletions pygribjump/tests/test_pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,16 @@ def test_extract_simple_sunshine_case(read_only_fdb_setup) -> None:

actual = grib_jump.extract(polyrequest)
assert numpy.array_equal(expected, actual[0][0][0][0], equal_nan=True)

def test_axes(read_only_fdb_setup) -> None:
gribjump = pygj.GribJump()
req = {
"date": "20230508",
}
ax1 = gribjump.axes(req, level=1) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'stream': ['oper'], 'time': ['1200']}
ax2 = gribjump.axes(req, level=2) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'levtype': ['sfc'], 'stream': ['oper'], 'time': ['1200'], 'type': ['fc']}
ax3 = gribjump.axes(req, level=3) # {'class': ['od'], 'date': ['20230508'], 'domain': ['g'], 'expver': ['0001'], 'levelist': [''], 'levtype': ['sfc'], 'param': ['151130'], 'step': ['1'], 'stream': ['oper'], 'time': ['1200'], 'type': ['fc']}

assert len(ax1.keys()) == 6
assert len(ax2.keys()) == 8
assert len(ax3.keys()) == 11
1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ list( APPEND gribjump_srcs
ExtractionData.cc
ExtractionData.h
Metrics.h
Metrics.cc
Types.h
)

Expand Down
17 changes: 10 additions & 7 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& ke
}

const metkit::mars::MarsRequest req = unionRequest(marsrequests);
MetricsManager::instance().set("union_request", req.asString());
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
Expand Down Expand Up @@ -248,14 +249,19 @@ void Engine::scheduleTasks(filemap_t& filemap){

ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {

eckit::Timer timer("Engine::extract");
ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
eckit::Timer timer("Engine::extract");
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

scheduleTasks(filemap);
MetricsManager::instance().set("elapsed_tasks", timer.elapsed());
timer.reset("Gribjump Engine: All tasks finished");

ResultsMap results = collectResults(keyToExtractionItem);
MetricsManager::instance().set("elapsed_collect_results", timer.elapsed());

timer.reset("Gribjump Engine: Repackaged results");

return results;
Expand Down Expand Up @@ -309,8 +315,9 @@ size_t Engine::scan(std::vector<eckit::PathName> files) {
return files.size();
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request) {
return FDBLister::instance().axes(request);
std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request, int level) {
MetricsManager::instance().set("request", request);
return FDBLister::instance().axes(request, level);
}

void Engine::reportErrors(eckit::Stream& client) {
Expand All @@ -320,10 +327,6 @@ void Engine::reportErrors(eckit::Stream& client) {
void Engine::raiseErrors() {
taskGroup_.raiseErrors();
}
void Engine::updateMetrics(Metrics& metrics) {
metrics.nTasks = taskGroup_.nTasks();
metrics.nFailedTasks = taskGroup_.nErrors();
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
3 changes: 1 addition & 2 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ class Engine {
size_t scan(const MarsRequests& requests, bool byfiles = false);
size_t scan(std::vector<eckit::PathName> files);

std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level=3);

void scheduleTasks(filemap_t& filemap);
void updateMetrics(Metrics& metrics);

void reportErrors(eckit::Stream& client_);
void raiseErrors();
Expand Down
19 changes: 12 additions & 7 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ GribJump::GribJump() {
GribJump::~GribJump() {
}

size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
size_t GribJump::scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (paths.empty()) {
throw eckit::UserError("Paths must not be empty", Here());
Expand All @@ -37,7 +38,8 @@ size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
return ret;
}

size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) {
size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
Expand All @@ -48,17 +50,19 @@ size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, boo
}


std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
}

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests, ctx); // ... why is this still using raw pointers? // why are we not using extraction items?
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests); // why are we not using extraction items?
return out;
}

std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) {
std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (path.asString().empty()) {
throw eckit::UserError("Path must not be empty", Here());
Expand All @@ -74,13 +78,14 @@ std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::Path
return out;
}

std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request, int level, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (request.empty()) {
throw eckit::UserError("Request string must not be empty", Here());
}

auto out = impl_->axes(request);
auto out = impl_->axes(request, level);
return out;
}

Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/GribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class GribJump {

~GribJump();

size_t scan(const std::vector<eckit::PathName>& paths);
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false);
size_t scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx=LogContext("none"));
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false, const LogContext& ctx=LogContext("none"));

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, LogContext ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges);
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx=LogContext("none"));

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level=3, const LogContext& ctx=LogContext("none"));

void stats();

Expand Down
5 changes: 2 additions & 3 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@ class GribJumpBase : public eckit::NonCopyable {

virtual size_t scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) = 0;

virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) = 0;
virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) = 0;
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;
virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) = 0;

virtual void stats();

protected: // members

Stats stats_;
LogContext ctx_;
};

} // namespace gribjump
8 changes: 4 additions & 4 deletions src/gribjump/Lister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,20 @@ std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(std::vect
return files;
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request, int level) {
std::vector<fdb5::FDBToolRequest> requests = fdb5::FDBToolRequest::requestsFromString(request, std::vector<std::string>(), true);
ASSERT(requests.size() == 1); // i.e. assume string is a single request.

return axes(requests.front());
return axes(requests.front(), level);
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request, int level) {
eckit::AutoLock<FDBLister> lock(this);
std::map<std::string, std::unordered_set<std::string>> values;

LOG_DEBUG_LIB(LibGribJump) << "Using FDB's (new) axes impl" << std::endl;

fdb5::IndexAxis ax = fdb_.axes(request);
fdb5::IndexAxis ax = fdb_.axes(request, level);
ax.sort();
std::map<std::string, eckit::DenseSet<std::string>> fdbValues = ax.map();

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Lister.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Lister {
public:

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) = 0; // <-- May not want to use mars request?
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) = 0 ;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) = 0 ;

void lock() { mutex_.lock(); locked_ = true; }
void unlock() { mutex_.unlock(); locked_ = false; }
Expand All @@ -55,8 +55,8 @@ class FDBLister : public Lister {
static FDBLister& instance();

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request);
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request, int level);

filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/LocalGribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ std::vector<std::unique_ptr<ExtractionItem>> LocalGribJump::extract(const eckit:
}

/// @todo, change API, remove extraction request
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests) {

bool flatten = true;
Engine engine;
Expand Down Expand Up @@ -120,13 +120,13 @@ ResultsMap LocalGribJump::extract(const std::vector<MarsRequest>& requests, cons
return results;
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request, int level) {

// Note: This is likely to be removed from GribJump, and moved to FDB.
// Here for now to support polytope.

Engine engine;
return engine.axes(request);
return engine.axes(request, level);
}

static GribJumpBuilder<LocalGribJump> builder("local");
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/LocalGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class LocalGribJump : public GribJumpBase {

// old API
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) override;

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) override;
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) override;

private:
};
Expand Down
Loading

0 comments on commit b49eaca

Please sign in to comment.