Skip to content

Commit

Permalink
New p4orch development changes (#3066)
Browse files Browse the repository at this point in the history
* New p4orch development for performance.

This is the first PR for upstreaming the recent P4Orch changes (has been a while since the last time).
The main changes include:

Add new status code SWSS_RC_NOT_EXECUTED
Enable redis pipeline in response publisher for P4Orch (performance improvement)
Enable background thread of write to db in response publisher for P4Orch (performance improvement)
P4Orch writes to APPL DB instead of APPL STATE DB for responses (prepare for the zmq change, APPL DB will be used as APPL STATE DB in P4Orch for performance improvement)
  • Loading branch information
mint570 authored May 15, 2024
1 parent 26a5a1c commit c36333c
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 898 deletions.
2 changes: 1 addition & 1 deletion cfgmgr/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CFLAGS_SAI = -I /usr/include/sai
LIBNL_CFLAGS = -I/usr/include/libnl3
LIBNL_LIBS = -lnl-genl-3 -lnl-route-3 -lnl-3
SAIMETA_LIBS = -lsaimeta -lsaimetadata -lzmq
COMMON_LIBS = -lswsscommon
COMMON_LIBS = -lswsscommon -lpthread

bin_PROGRAMS = vlanmgrd teammgrd portmgrd intfmgrd buffermgrd vrfmgrd nbrmgrd vxlanmgrd sflowmgrd natmgrd coppmgrd tunnelmgrd macsecmgrd fabricmgrd

Expand Down
2 changes: 1 addition & 1 deletion orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class Orch
void addExecutor(Executor* executor);
Executor *getExecutor(std::string executorName);

ResponsePublisher m_publisher;
ResponsePublisher m_publisher{"APPL_STATE_DB"};
private:
void addConsumer(swss::DBConnector *db, std::string tableName, int pri = default_orch_pri);
};
Expand Down
2 changes: 2 additions & 0 deletions orchagent/p4orch/p4orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ void P4Orch::doTask(Consumer &consumer)
{
manager->drain();
}

m_publisher.flush();
}

void P4Orch::doTask(swss::SelectableTimer &timer)
Expand Down
3 changes: 3 additions & 0 deletions orchagent/p4orch/p4orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@ class P4Orch : public Orch
// Notification consumer for port state change
swss::NotificationConsumer *m_portStatusNotificationConsumer;

// Sepcial publisher that writes to APPL DB instead of APPL STATE DB.
ResponsePublisher m_publisher{"APPL_DB", /*bool buffered=*/true, /*db_write_thread=*/true};

friend class p4orch::test::WcmpManagerTest;
};
1 change: 1 addition & 0 deletions orchagent/p4orch/tests/return_code_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ TEST(ReturnCodeTest, SaiCodeToReturnCodeMapping)
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
{SAI_STATUS_NOT_EXECUTED, StatusCode::SWSS_RC_NOT_EXECUTED},
{SAI_STATUS_FAILURE, StatusCode::SWSS_RC_UNKNOWN},
{SAI_STATUS_INVALID_ATTRIBUTE_0, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_INVALID_ATTRIBUTE_10, StatusCode::SWSS_RC_INVALID_PARAM},
Expand Down
120 changes: 103 additions & 17 deletions orchagent/response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,45 @@ void RecordResponse(const std::string &response_channel, const std::string &key,

} // namespace

ResponsePublisher::ResponsePublisher(bool buffered)
: m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)),
m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())), m_buffered(buffered)
ResponsePublisher::ResponsePublisher(const std::string &dbName, bool buffered, bool db_write_thread)
: m_db(std::make_unique<swss::DBConnector>(dbName, 0)), m_buffered(buffered)
{
if (m_buffered)
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
}
else
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
}
if (db_write_thread)
{
m_update_thread = std::unique_ptr<std::thread>(new std::thread(&ResponsePublisher::dbUpdateThread, this));
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
ResponsePublisher::~ResponsePublisher()
{
// Write to the DB only if:
// 1) A write operation is being performed and state attributes are specified.
// 2) A successful delete operation.
if ((intent_attrs.size() && state_attrs.size()) || (status.ok() && !intent_attrs.size()))
if (m_update_thread != nullptr)
{
writeToDB(table, key, state_attrs, intent_attrs.size() ? SET_COMMAND : DEL_COMMAND, replace);
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/false, /*shutdown=*/true);
}
m_signal.notify_one();
m_update_thread->join();
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
{
std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL";
swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered};
swss::NotificationProducer notificationProducer{m_ntf_pipe.get(), response_channel, m_buffered};

auto intent_attrs_copy = intent_attrs;
// Add error message as the first field-value-pair.
Expand All @@ -92,6 +111,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
// Sends the response to the notification channel.
notificationProducer.send(status.codeStr(), key, intent_attrs_copy);
RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr());

// Write to the DB only if:
// 1) A write operation is being performed and state attributes are specified.
// 2) A successful delete operation.
if ((intent_attrs.size() && state_attrs.size()) || (status.ok() && !intent_attrs.size()))
{
writeToDB(table, key, state_attrs, intent_attrs.size() ? SET_COMMAND : DEL_COMMAND, replace);
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
Expand All @@ -113,7 +140,26 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
void ResponsePublisher::writeToDB(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
{
swss::Table applStateTable{m_pipe.get(), table, m_buffered};
if (m_update_thread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(table, key, values, op, replace, /*flush=*/false, /*shutdown=*/false);
}
m_signal.notify_one();
}
else
{
writeToDBInternal(table, key, values, op, replace);
}
RecordDBWrite(table, key, values, op);
}

void ResponsePublisher::writeToDBInternal(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op,
bool replace)
{
swss::Table applStateTable{m_db_pipe.get(), table, m_buffered};

auto attrs = values;
if (op == SET_COMMAND)
Expand All @@ -133,7 +179,6 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
if (!applStateTable.get(key, fv))
{
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
return;
}
for (auto it = attrs.cbegin(); it != attrs.cend();)
Expand All @@ -150,22 +195,63 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
if (attrs.size())
{
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
}
}
else if (op == DEL_COMMAND)
{
applStateTable.del(key);
RecordDBWrite(table, key, {}, op);
}
}

void ResponsePublisher::flush()
{
m_pipe->flush();
m_ntf_pipe->flush();
if (m_update_thread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/true, /*shutdown=*/false);
}
m_signal.notify_one();
}
else
{
m_db_pipe->flush();
}
}

void ResponsePublisher::setBuffered(bool buffered)
{
m_buffered = buffered;
}

void ResponsePublisher::dbUpdateThread()
{
while (true)
{
entry e;
{
std::unique_lock<std::mutex> lock(m_lock);
while (m_queue.empty())
{
m_signal.wait(lock);
}

e = m_queue.front();
m_queue.pop();
}
if (e.shutdown)
{
break;
}
if (e.flush)
{
m_db_pipe->flush();
}
else
{
writeToDBInternal(e.table, e.key, e.values, e.op, e.replace);
}
}
}
41 changes: 38 additions & 3 deletions orchagent/response_publisher.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

Expand All @@ -17,9 +21,9 @@
class ResponsePublisher : public ResponsePublisherInterface
{
public:
explicit ResponsePublisher(bool buffered = false);
explicit ResponsePublisher(const std::string &dbName, bool buffered = false, bool db_write_thread = false);

virtual ~ResponsePublisher() = default;
virtual ~ResponsePublisher();

// Intent attributes are the attributes sent in the notification into the
// redis channel.
Expand Down Expand Up @@ -57,8 +61,39 @@ class ResponsePublisher : public ResponsePublisherInterface
void setBuffered(bool buffered);

private:
struct entry
{
std::string table;
std::string key;
std::vector<swss::FieldValueTuple> values;
std::string op;
bool replace;
bool flush;
bool shutdown;

entry()
{
}

entry(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
const std::string &op, bool replace, bool flush, bool shutdown)
: table(table), key(key), values(values), op(op), replace(replace), flush(flush), shutdown(shutdown)
{
}
};

void dbUpdateThread();
void writeToDBInternal(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace);

std::unique_ptr<swss::DBConnector> m_db;
std::unique_ptr<swss::RedisPipeline> m_pipe;
std::unique_ptr<swss::RedisPipeline> m_ntf_pipe;
std::unique_ptr<swss::RedisPipeline> m_db_pipe;

bool m_buffered{false};
// Thread to write to DB.
std::unique_ptr<std::thread> m_update_thread;
std::queue<entry> m_queue;
mutable std::mutex m_lock;
std::condition_variable m_signal;
};
36 changes: 19 additions & 17 deletions orchagent/return_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,24 @@ class ReturnCode
ReturnCode(const sai_status_t &status, const std::string &message = "")
: stream_(std::ios_base::out | std::ios_base::ate), is_sai_(true)
{
if (m_saiStatusCodeLookup.find(status) == m_saiStatusCodeLookup.end())
// Non-ranged SAI codes that are not included in this lookup map will map to
// SWSS_RC_UNKNOWN. This includes the general SAI failure:
// SAI_STATUS_FAILURE.
static const auto *const saiStatusCodeLookup = new std::unordered_map<sai_status_t, StatusCode>({
{SAI_STATUS_SUCCESS, StatusCode::SWSS_RC_SUCCESS},
{SAI_STATUS_NOT_SUPPORTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_NO_MEMORY, StatusCode::SWSS_RC_NO_MEMORY},
{SAI_STATUS_INSUFFICIENT_RESOURCES, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_INVALID_PARAMETER, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_ITEM_ALREADY_EXISTS, StatusCode::SWSS_RC_EXISTS},
{SAI_STATUS_ITEM_NOT_FOUND, StatusCode::SWSS_RC_NOT_FOUND},
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
{SAI_STATUS_NOT_EXECUTED, StatusCode::SWSS_RC_NOT_EXECUTED},
});

if (saiStatusCodeLookup->find(status) == saiStatusCodeLookup->end())
{
// Check for ranged SAI codes.
if (SAI_RANGED_STATUS_IS_INVALID_ATTRIBUTE(status))
Expand Down Expand Up @@ -207,7 +224,7 @@ class ReturnCode
}
else
{
status_ = m_saiStatusCodeLookup[status];
status_ = saiStatusCodeLookup->at(status);
}
stream_ << message;
}
Expand Down Expand Up @@ -298,21 +315,6 @@ class ReturnCode
}

private:
// Non-ranged SAI codes that are not included in this lookup map will map to
// SWSS_RC_UNKNOWN. This includes the general SAI failure: SAI_STATUS_FAILURE.
std::unordered_map<sai_status_t, StatusCode> m_saiStatusCodeLookup = {
{SAI_STATUS_SUCCESS, StatusCode::SWSS_RC_SUCCESS},
{SAI_STATUS_NOT_SUPPORTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_NO_MEMORY, StatusCode::SWSS_RC_NO_MEMORY},
{SAI_STATUS_INSUFFICIENT_RESOURCES, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_INVALID_PARAMETER, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_ITEM_ALREADY_EXISTS, StatusCode::SWSS_RC_EXISTS},
{SAI_STATUS_ITEM_NOT_FOUND, StatusCode::SWSS_RC_NOT_FOUND},
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
};

StatusCode status_;
std::stringstream stream_;
// Whether the ReturnCode is generated from a SAI status code or not.
Expand Down
5 changes: 4 additions & 1 deletion tests/mock_tests/fake_response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
* when needed to test code that uses response publisher. */
std::unique_ptr<MockResponsePublisher> gMockResponsePublisher;

ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)), m_buffered(buffered) {}
ResponsePublisher::ResponsePublisher(const std::string& dbName, bool buffered, bool db_write_thread) :
m_db(std::make_unique<swss::DBConnector>(dbName, 0)), m_buffered(buffered) {}

ResponsePublisher::~ResponsePublisher() {}

void ResponsePublisher::publish(
const std::string& table, const std::string& key,
Expand Down
4 changes: 2 additions & 2 deletions tests/mock_tests/response_publisher/response_publisher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TEST(ResponsePublisher, TestPublish)
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};
ResponsePublisher publisher{"APPL_STATE_DB"};

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
Expand All @@ -21,7 +21,7 @@ TEST(ResponsePublisher, TestPublishBuffered)
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};
ResponsePublisher publisher{"APPL_STATE_DB"};

publisher.setBuffered(true);

Expand Down
Loading

0 comments on commit c36333c

Please sign in to comment.