Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCPITransport: Added dynamic rate limiting #910

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions scopehal/SCPITransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ SCPITransport* SCPITransport::CreateTransport(const string& transport, const str
@brief Pushes a command into the transmit FIFO then returns immediately.

This command will actually be sent the next time FlushCommandQueue() is called.

@param cmd Command to be sent
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
*/
void SCPITransport::SendCommandQueued(const string& cmd)
void SCPITransport::SendCommandQueued(const string& cmd, std::chrono::milliseconds settle_time)
{
lock_guard<mutex> lock(m_queueMutex);

Expand Down Expand Up @@ -116,7 +119,8 @@ void SCPITransport::SendCommandQueued(const string& cmd)
auto it = m_txQueue.begin();
while(it != m_txQueue.end())
{
tmp = *it;
auto pair = *it;
tmp = pair.first;

//Split off subject, if we have one
//(ignore leading colon)
Expand All @@ -142,7 +146,7 @@ void SCPITransport::SendCommandQueued(const string& cmd)
{
LogTrace("Deduplicating redundant %s command %s and pushing new command %s\n",
ncmd.c_str(),
(*it).c_str(),
pair.first.c_str(),
cmd.c_str());

auto oldit = it;
Expand All @@ -159,18 +163,35 @@ void SCPITransport::SendCommandQueued(const string& cmd)

}

m_txQueue.push_back(cmd);
// Create a pair with cmd and settle_time
std::pair<std::string, std::chrono::milliseconds> pair;
pair = make_pair(cmd, settle_time);

// Push to queue
m_txQueue.push_back(pair);

LogTrace("%zu commands now queued\n", m_txQueue.size());
}

/**
@brief Block until it's time to send the next command when rate limiting.

@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
*/
void SCPITransport::RateLimitingWait()
void SCPITransport::RateLimitingWait(std::chrono::milliseconds settle_time)
{
this_thread::sleep_until(m_nextCommandReady);
m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval;

if(settle_time == std::chrono::milliseconds(0))
{
// Use the configured rate limit
m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval;
}
else
{
// Use the specified settle_time
m_nextCommandReady = chrono::system_clock::now() + settle_time;
}
}

/**
Expand All @@ -179,7 +200,7 @@ void SCPITransport::RateLimitingWait()
bool SCPITransport::FlushCommandQueue()
{
//Grab the queue, then immediately release the mutex so we can do more queued sends
list<string> tmp;
std::list<std::pair<std::string, std::chrono::milliseconds>> tmp;
{
lock_guard<mutex> lock(m_queueMutex);
tmp = std::move(m_txQueue);
Expand All @@ -190,11 +211,11 @@ bool SCPITransport::FlushCommandQueue()
LogTrace("%zu commands being flushed\n", tmp.size());

lock_guard<recursive_mutex> lock(m_netMutex);
for(auto str : tmp)
for(auto pair : tmp)
{
if(m_rateLimitingEnabled)
RateLimitingWait();
SendCommand(str);
RateLimitingWait(pair.second);
SendCommand(pair.first);
}
return true;
}
Expand All @@ -203,24 +224,34 @@ bool SCPITransport::FlushCommandQueue()
@brief Sends a command (flushing any pending/queued commands first), then returns the response.

This is an atomic operation requiring no mutexing at the caller side.

@param cmd Command to be sent
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´

@return A string with the reply
*/
string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon)
string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time)
{
FlushCommandQueue();
return SendCommandImmediateWithReply(cmd, endOnSemicolon);
return SendCommandImmediateWithReply(cmd, endOnSemicolon, settle_time);
}

/**
@brief Sends a command (jumping ahead of the queue), then returns the response.

This is an atomic operation requiring no mutexing at the caller side.

@param cmd Command to be sent
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´

@return A string with the reply
*/
string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon)
string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time)
{
lock_guard<recursive_mutex> lock(m_netMutex);

if(m_rateLimitingEnabled)
RateLimitingWait();
RateLimitingWait(settle_time);

SendCommand(cmd);

Expand All @@ -229,26 +260,35 @@ string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemico

/**
@brief Sends a command (jumping ahead of the queue) which does not require a response.

@param cmd Command to be sent
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
*/
void SCPITransport::SendCommandImmediate(string cmd)
void SCPITransport::SendCommandImmediate(string cmd, std::chrono::milliseconds settle_time)
{
lock_guard<recursive_mutex> lock(m_netMutex);

if(m_rateLimitingEnabled)
RateLimitingWait();
RateLimitingWait(settle_time);

SendCommand(cmd);
}

/**
@brief Sends a command (jumping ahead of the queue) which reads a binary block response

@param cmd Command to be sent
@param len A reference to a size_t that will get the number of bytes received written to it.
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´

@return A pointer to the reply buffer. This will need to be deleted manually.
*/
void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len)
void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len, std::chrono::milliseconds settle_time)
{
lock_guard<recursive_mutex> lock(m_netMutex);

if(m_rateLimitingEnabled)
RateLimitingWait();
RateLimitingWait(settle_time);
SendCommand(cmd);

//Read the length
Expand Down
27 changes: 20 additions & 7 deletions scopehal/SCPITransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ class SCPITransport
TODO: look into a background thread or something that's automatically launched by the transport to do this
after some kind of fixed timeout?
*/
void SendCommandQueued(const std::string& cmd);
std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true);
void SendCommandImmediate(std::string cmd);
std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true);
void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len);
void SendCommandQueued(const std::string& cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
void SendCommandImmediate(std::string cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
bool FlushCommandQueue();

//Manual mutex locking for ReadRawData() etc
Expand All @@ -89,6 +89,19 @@ class SCPITransport
should be used if at all possible.

Once rate limiting is enabled on a transport, it cannot be disabled.

Invidual commands can be rate limited with the parameter `settle_time` in each Send*() call. If `settle_time`
is set to 0 (default value) it will default to the time specified in the rate limiting (if enabled). If
`settle_time` is set to anything else than 0, then this time will be used to block all subsequent message for
the specified amount of time.

Note that `settle_time` will always override the rate limit, even when a lower value is used.

When using `settle_time` on a write only call, it will block for the specified amount of time after the command
is sent.

When using `settle_time` on a request, the message will be sent, a reply will be read back immidiately, and
then the blocking will take place as the last step.
*/
void EnableRateLimiting(std::chrono::milliseconds interval)
{
Expand Down Expand Up @@ -126,7 +139,7 @@ class SCPITransport
static SCPITransport* CreateTransport(const std::string& transport, const std::string& args);

protected:
void RateLimitingWait();
void RateLimitingWait(std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));

//Class enumeration
typedef std::map< std::string, CreateProcType > CreateMapType;
Expand All @@ -135,7 +148,7 @@ class SCPITransport
//Queued commands waiting to be sent
std::mutex m_queueMutex;
std::recursive_mutex m_netMutex;
std::list<std::string> m_txQueue;
std::list<std::pair<std::string, std::chrono::milliseconds>> m_txQueue;

//Set of commands that are OK to deduplicate
std::set<std::string> m_dedupCommands;
Expand Down