Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/socketio #14

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions ringbuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ namespace jnk0le
}

/*!
* \brief For debug purposes. Outputs data_buff array in full
*/
void print()
Copy link

@drewr95 drewr95 Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This will produce a compiler error for files that include this header that do not include <iostream> before including this header
  2. Since this library is OS independent, embedded applications could use this, and <string> is notoriously large and not typically used, which is included by <iostream>

{
std::cout << "{ " << data_buff[0];
for (unsigned i = 1; i < buffer_size; i++)
std::cout << ", " << data_buff[i];
std::cout << " };" << std::endl;
}

/*!
* \brief Check if buffer is empty
* \return True if buffer is empty
*/
Expand All @@ -86,6 +97,24 @@ namespace jnk0le
return head.load(index_acquire_barrier) - tail.load(std::memory_order_relaxed);
}

/*!
* \brief Calculates the number of elements that can be read
* from a continous memory segment starting at peek()
* \return Number of elements that can be read as a continous memory segment
*/
index_t readAvailableContinous(void) const {
index_t tmp_head = head.load(index_acquire_barrier);
index_t tmp_tail = tail.load(std::memory_order_relaxed);
index_t head_trunc = tmp_head & buffer_mask;
index_t tail_trunc = tmp_tail & buffer_mask;

if (head_trunc > tail_trunc)
return head_trunc - tail_trunc;
if (head_trunc < tail_trunc || isFull())
return buffer_size - tail_trunc;
return 0;
}

/*!
* \brief Check how many elements can be written into the buffer
* \return Number of free slots that can be be written
Expand All @@ -94,6 +123,24 @@ namespace jnk0le
return buffer_size - (head.load(std::memory_order_relaxed) - tail.load(index_acquire_barrier));
}

/*!
* \brief Calculates then number of elements that can be written
* into a continous memory segment starting at space()
* \return Number of elements that can be written as a continous memory segment
*/
index_t writeAvailableContinous(void) const {
index_t tmp_head = head.load(index_acquire_barrier);
index_t tmp_tail = tail.load(std::memory_order_relaxed);
index_t head_trunc = tmp_head & buffer_mask;
index_t tail_trunc = tmp_tail & buffer_mask;

if (head_trunc < tail_trunc)
return tail_trunc - head_trunc;
if (head_trunc > tail_trunc || isEmpty())
return buffer_size - head_trunc;
return 0;
}

/*!
* \brief Inserts data into internal buffer, without blocking
* \param data element to be inserted into internal buffer
Expand Down Expand Up @@ -190,6 +237,25 @@ namespace jnk0le
return cnt;
}

/*!
* \brief Bulk insert accounts for inserts performed via
* external function writeBuff
* \param cnt Number of elements inserted
* \return Number of actually inserted elements as per
* buffer capacity
*/
index_t bulkInsert(size_t cnt) {
index_t tmp_head = head.load(std::memory_order_relaxed);
index_t tmp_tail = tail.load(std::memory_order_relaxed);

// It is an error to insert more elements than
// available. If so truncate insert.
size_t inserted = cnt < writeAvailable() ? cnt : writeAvailable();
tmp_head += inserted;
head.store(tmp_head, index_release_barrier);
return inserted;
}

/*!
* \brief Reads one element from internal buffer without blocking
* \param[out] data Reference to memory location where removed element will be stored
Expand Down Expand Up @@ -235,6 +301,18 @@ namespace jnk0le
}

/*!
* \brief Gets the first empty position in the buffer on consumed side
*
* It is safe to use and modify item contents only on consumer side
*
* \return Pointer to first empty position, nullptr if buffer is full
*/
T* space() {
index_t tmp_head = head.load(std::memory_order_relaxed);
return isFull() ? nullptr : &data_buff[tmp_head & buffer_mask];
}

/*!
* \brief Gets the n'th element on consumed side
*
* It is safe to use and modify item contents only on consumer side
Expand Down Expand Up @@ -466,6 +544,45 @@ namespace jnk0le
return read;
}

using PosixRead = int (*)(int, void *, size_t );
using PosixWrite = int (*)(int, void const *, size_t );

template<typename T, size_t buffer_size, bool fake_tso, size_t cacheline_size, typename index_t>
int writeBuff(Ringbuffer<T, buffer_size, fake_tso, cacheline_size, index_t> & buff,
PosixRead fread, int file_descriptor)
{
unsigned total = 0;
while (!buff.isFull())
{
int res = fread(file_descriptor, buff.space(), buff.writeAvailableContinous());
if (res == 0)
break;
if (res < 0)
return res;
total += static_cast<unsigned>(res);
buff.bulkInsert(res);
}
return static_cast<int>(total);
}

template<typename T, size_t buffer_size, bool fake_tso, size_t cacheline_size, typename index_t>
int readBuff(Ringbuffer<T, buffer_size, fake_tso, cacheline_size, index_t> & buff,
PosixWrite fwrite, int file_descriptor)
{
unsigned total = 0;
while (!buff.isEmpty())
{
int res = fwrite(file_descriptor, buff.peek(), buff.readAvailableContinous());
if (res == 0)
break;
if (res < 0)
return res;
total += static_cast<unsigned>(res);
buff.remove(res);
}
return static_cast<int>(total);
}

} // namespace

#endif //RINGBUFFER_HPP
151 changes: 151 additions & 0 deletions test_posix_api.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#include <algorithm>
#include <iostream>
#include <cassert>

#include <sys/types.h>

#include "ringbuffer.hpp"

using namespace jnk0le;

/* Variable that defines the simulated bufsize for sockets */
static unsigned bufsize = 0;

/* Emulates a socket read function */
int test_read( int fd, void * data, size_t length )
{
size_t ret = std::min( bufsize, unsigned(length));
uint8_t * iter = static_cast<uint8_t*>(data);
std::cout << "Writing " << ret << " bytes into ring buffer: ";
std::cout << "{ ";
if ( ret > 0 )
std::cout << char(fd);
for ( uint8_t i = 0; i < ret; ++i )
{
* iter = static_cast<uint8_t>(fd);
std::cout << ", " << char(fd);
iter++;
}
std::cout << " }\n";
bufsize -= ret;
return ret;
}

/* Emulates a socket write function */
int test_write( int fd, void const * data, size_t length )
{
size_t ret = std::min( bufsize, unsigned(length));
std::cout << "Reading " << ret << " bytes from ring buffer: ";
std::cout << "{ ";
uint8_t * beg = (uint8_t*)data;
uint8_t * end = beg + ret;
if ( beg < end )
std::cout << *beg++;
for ( uint8_t * i = beg; i < end; ++i )
{
std::cout << ", " << *i;
}
std::cout << " }\n";
bufsize -= ret;
return ret;
}

/* Print ringbuffer content for debug purposes */
template<typename RB>
void print_ringbuf(RB & rb )
{
std::cout << "{ ";
if (!rb.isEmpty())
{
std::cout << *rb.at(0);
for (unsigned i = 1; i < rb.readAvailable();++i)
{
std::cout << ", " << *rb.at(i);
}
}
std::cout << " };" << std::endl;
}

int main()
{
Ringbuffer<uint8_t, 16> buf;

std::cout << "Write 22 elements to buffer (fills up buffer compeltely)\n";
bufsize = 22;
ssize_t nwrite = writeBuff( buf, test_read, 'a' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nwrite == 16);
assert( buf.writeAvailableContinous() == 0);
assert( buf.readAvailableContinous() == 16);
assert( buf.readAvailable() == 16);

std::cout << "\nRead 16 elements from buffer. Empties buffer completely\n";
bufsize = 16;
ssize_t nread = readBuff( buf, test_write, 'b' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nread == 16);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;
assert( buf.readAvailable() == 0);
assert( buf.writeAvailable() == 16);
assert( buf.writeAvailableContinous() == 16);
assert( buf.readAvailableContinous() == 0);

std::cout << "\nWrite 8 bytes to fill the buffer halfway\n";
bufsize = 8;
nwrite = writeBuff( buf, test_read, 'c' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nwrite == 8);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;

std::cout << "\nRead 6 bytes to make space at the beginning of the data_buff array";
std::cout << "Read from buffer once\n";
bufsize = 6;
nread = readBuff( buf, test_write, 'd' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nread == 6);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;

std::cout << "\nWriting 10 e's into buffer. This requires two accesses as the sequence not is continous\n";
bufsize = 10;
nwrite = writeBuff( buf, test_read, 'e' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nwrite == 10);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;

std::cout << "\nReading 11 elements from buffer. This requires two accesses as the sequence not is continous\n";
bufsize = 11;
nread = readBuff( buf, test_write, 'f' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nread == 11);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;

std::cout << "\nReading 10 elements from buffer. Should only give us one element\n";
bufsize = 10;
nread = readBuff( buf, test_write, 'g' );
std::cout << "data_buff: ";
buf.print();
std::cout << "Ring_Buff: ";
print_ringbuf(buf);
assert( nread == 1);
std::cout << "buf.readAvailable() = " << buf.readAvailable() << std::endl;
}