Skip to content

Commit

Permalink
Merge pull request #427 from mmd-osm/feature/stream_to
Browse files Browse the repository at this point in the history
Enable libpqxx stream_to for selected tables
  • Loading branch information
mmd-osm authored Jul 26, 2024
2 parents fee59ae + 0a871a7 commit 1e82c83
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 2 deletions.
57 changes: 57 additions & 0 deletions include/cgimap/backend/apidb/transaction_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,57 @@

#include <iostream>

#if PQXX_VERSION_MAJOR >= 7

class Stream_Wrapper
{
public:
Stream_Wrapper(pqxx::transaction_base &txn, std::string_view table,
std::string_view columns) :

m_stream(pqxx::stream_to::raw_table(txn, table, columns)), m_table(table), m_start(
std::chrono::steady_clock::now())
{
}

~Stream_Wrapper()
{
if (m_stream)
m_stream.complete();
}

template< typename ... Ts > void write_values(Ts const &...fields)
{
m_stream.write_values(std::forward<Ts const& >(fields)...);
row_count++;
}

void complete()
{
m_stream.complete();
log_stats();
}

int row_count = 0;

private:

void log_stats()
{
const auto end = std::chrono::steady_clock::now();
const auto elapsed = std::chrono::duration_cast < std::chrono::milliseconds > (end - m_start);

logger::message(fmt::format(
"Executed COPY statement for table {} in {:d} ms, inserted {:d} rows",
m_table, elapsed.count(), row_count));
}

pqxx::stream_to m_stream;
const std::string_view m_table;
const std::chrono::steady_clock::time_point m_start;

};
#endif

class Transaction_Owner_Base
{
Expand Down Expand Up @@ -102,6 +153,12 @@ class Transaction_Manager {
return res;
}

#if PQXX_VERSION_MAJOR >= 7
Stream_Wrapper to_stream(std::string_view table, std::string_view columns) {
return Stream_Wrapper(m_txn, table, columns);
}
#endif

private:
pqxx::transaction_base & m_txn;
std::set<std::string>& m_prep_stmt;
Expand Down
19 changes: 19 additions & 0 deletions src/backend/apidb/changeset_upload/node_updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ std::vector<osm_nwr_id_t> ApiDB_Node_Updater::insert_new_current_node_tags(
if (nodes.empty())
return {};

#if PQXX_VERSION_MAJOR < 7

m.prepare("insert_new_current_node_tags",

R"(
Expand Down Expand Up @@ -712,6 +714,23 @@ std::vector<osm_nwr_id_t> ApiDB_Node_Updater::insert_new_current_node_tags(
if (r.affected_rows() != total_tags)
throw http::server_error("Could not create new current node tags");

#else

std::vector<osm_nwr_id_t> ids;

auto stream = m.to_stream("current_node_tags", "node_id, k, v");

for (const auto &node : nodes) {
for (const auto &tag : node.tags) {
stream.write_values(node.id, tag.first, tag.second);
ids.emplace_back(node.id);
}
}

stream.complete();

#endif

// prepare list of node ids with tags
std::sort(ids.begin(), ids.end());
ids.erase(std::unique(ids.begin(), ids.end()), ids.end());
Expand Down
36 changes: 36 additions & 0 deletions src/backend/apidb/changeset_upload/relation_updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,8 @@ std::vector<osm_nwr_id_t> ApiDB_Relation_Updater::insert_new_current_relation_t
if (relations.empty())
return {};

#if PQXX_VERSION_MAJOR < 7

m.prepare("insert_new_current_relation_tags",

R"(
Expand Down Expand Up @@ -1333,10 +1335,29 @@ std::vector<osm_nwr_id_t> ApiDB_Relation_Updater::insert_new_current_relation_t
if (r.affected_rows() != total_tags)
throw http::server_error("Could not create new current relation tags");


#else

std::vector<osm_nwr_id_t> ids;

auto stream = m.to_stream("current_relation_tags", "relation_id, k, v");

for (const auto &relation : relations) {
for (const auto &tag : relation.tags) {
stream.write_values(relation.id, tag.first, tag.second);
ids.emplace_back(relation.id);
}
}

stream.complete();

#endif

// prepare list of relation ids with tags
std::sort(ids.begin(), ids.end());
ids.erase(std::unique(ids.begin(), ids.end()), ids.end());
return ids;

}

void ApiDB_Relation_Updater::insert_new_current_relation_members(
Expand All @@ -1345,6 +1366,8 @@ void ApiDB_Relation_Updater::insert_new_current_relation_members(
if (relations.empty())
return;

#if PQXX_VERSION_MAJOR < 7

m.prepare("insert_new_current_relation_members",

R"(
Expand Down Expand Up @@ -1378,6 +1401,19 @@ void ApiDB_Relation_Updater::insert_new_current_relation_members(

pqxx::result r = m.exec_prepared("insert_new_current_relation_members",
ids, membertypes, memberids, memberroles, sequenceids);
#else

auto stream = m.to_stream("current_relation_members", "relation_id, member_type, member_id, member_role, sequence_id");

for (const auto &relation : relations) {
for (const auto &member : relation.members) {
stream.write_values(relation.id, member.member_type, member.member_id, member.member_role, member.sequence_id);
}
}

stream.complete();

#endif
}

void ApiDB_Relation_Updater::save_current_relations_to_history(
Expand Down
38 changes: 36 additions & 2 deletions src/backend/apidb/changeset_upload/way_updater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,10 @@ void ApiDB_Way_Updater::update_current_ways(const std::vector<way_t> &ways,
std::vector<osm_nwr_id_t> ApiDB_Way_Updater::insert_new_current_way_tags(
const std::vector<way_t> &ways) {

if (ways.empty())
return {};
if (ways.empty())
return {};

#if PQXX_VERSION_MAJOR < 7

m.prepare("insert_new_current_way_tags",

Expand Down Expand Up @@ -733,6 +735,23 @@ std::vector<osm_nwr_id_t> ApiDB_Way_Updater::insert_new_current_way_tags(
if (r.affected_rows() != total_tags)
throw http::server_error("Could not create new current way tags");

#else

std::vector<osm_nwr_id_t> ids;

auto stream = m.to_stream("current_way_tags", "way_id, k, v");

for (const auto &way : ways) {
for (const auto &tag : way.tags) {
stream.write_values(way.id, tag.first, tag.second);
ids.emplace_back(way.id);
}
}

stream.complete();

#endif

// prepare list of way ids with tags
std::sort(ids.begin(), ids.end());
ids.erase(std::unique(ids.begin(), ids.end()), ids.end());
Expand All @@ -745,6 +764,8 @@ void ApiDB_Way_Updater::insert_new_current_way_nodes(
if (ways.empty())
return;

#if PQXX_VERSION_MAJOR < 7

m.prepare("insert_new_current_way_nodes",

R"(
Expand Down Expand Up @@ -772,6 +793,19 @@ void ApiDB_Way_Updater::insert_new_current_way_nodes(

pqxx::result r =
m.exec_prepared("insert_new_current_way_nodes", ids, nodeids, sequenceids);
#else

auto stream = m.to_stream("current_way_nodes", "way_id, node_id, sequence_id");

for (const auto &way : ways) {
for (const auto &wn : way.way_nodes) {
stream.write_values(way.id, wn.node_id, wn.sequence_id);
}
}

stream.complete();

#endif
}

void ApiDB_Way_Updater::save_current_ways_to_history(
Expand Down

0 comments on commit 1e82c83

Please sign in to comment.