Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extension for IPv4Reassembler to prevent a possible memory leak #301

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build/**
include/tins/config.h
.vscode/**
85 changes: 85 additions & 0 deletions include/tins/ip_reassembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,19 @@
#ifndef TINS_IP_REASSEMBLER_H
#define TINS_IP_REASSEMBLER_H

#if TINS_IS_CXX11
#include <chrono>
#include <functional>
#elif defined(_WIN32)
#include <winsock2.h>
#include <windows.h>
#else
#include <sys/time.h>
#endif

#include <vector>
#include <map>
#include <list>
#include <tins/pdu.h>
#include <tins/macros.h>
#include <tins/ip_address.h>
Expand Down Expand Up @@ -70,11 +81,20 @@ class IPv4Fragment {
class TINS_API IPv4Stream {
public:
IPv4Stream();

#if TINS_IS_CXX11
typedef std::chrono::system_clock::time_point time_point;
#else
typedef uint64_t time_point;
static uint64_t current_time();
#endif

void add_fragment(IP* ip);
bool is_complete() const;
PDU* allocate_pdu() const;
const IP& first_fragment() const;
size_t number_fragments() const;
time_point start_time_point() const;
private:
typedef std::vector<IPv4Fragment> fragments_type;

Expand All @@ -86,6 +106,7 @@ class TINS_API IPv4Stream {
size_t total_size_;
IP first_fragment_;
bool received_end_;
time_point start_time_point_;
};
} // namespace Internals

Expand Down Expand Up @@ -130,6 +151,12 @@ class TINS_API IPv4Reassembler {

TINS_DEPRECATED(typedef PacketStatus packet_status);

#if TINS_IS_CXX11
typedef std::function<void(PDU& pdu)> StreamCallback;
#else
typedef void (*StreamCallback)(PDU& pdu);
#endif

/**
* The type used to represent the overlapped segment reassembly
* technique to be used.
Expand Down Expand Up @@ -183,16 +210,74 @@ class TINS_API IPv4Reassembler {
* \sa IP::id
*/
void remove_stream(uint16_t id, IPv4Address addr1, IPv4Address addr2);

/**
* \brief A limit is set for each streams.
* If max_number == 0, then there are no restrictions.
*
* \param max_number Maximum number of packets per stream
* \param callback If set, it is called for each overflow stream
*/
void set_max_number_packets_to_stream(uint64_t max_number, StreamCallback callback = 0);

/**
* \brief Set the lifetime for each streams.
* The list of existing streams is checked with a specified time step.
* Attention, the check does not occur in a separate thread,
* but on each incoming package.
*
* \param stream_timeout_ms The lifetime of a single stream (milliseconds)
* If stream_timeout_ms == 0, then there will be no verification
* \param time_to_check_ms Time step for verification (milliseconds)
* If time_to_check_ms == 0 and stream_timeout_ms != 0, then the check will be with each new package
* \param callback If set, it is called for each expired valid stream
*/
void set_timeout_to_stream(uint64_t stream_timeout_ms, uint64_t time_to_check_ms = 1000, StreamCallback callback = 0);

/**
* \brief Return the total number of complete packets
*/
size_t total_number_complete_packages() const;

/**
* \brief Return the total number of damaged packages
*/
size_t total_number_damaged_packages() const;

/**
* \brief Return the current number of incomplete packets
*/
size_t current_number_incomplete_packages() const;

/**
* \brief Returns the current size of the partial-packet buffer
*/
size_t current_buffer_size_incomplete_packages() const;
private:
typedef std::pair<IPv4Address, IPv4Address> address_pair;
typedef std::pair<uint16_t, address_pair> key_type;
typedef std::map<key_type, Internals::IPv4Stream> streams_type;
typedef std::list< std::pair<key_type, Internals::IPv4Stream::time_point> > streams_history;

key_type make_key(const IP* ip) const;
address_pair make_address_pair(IPv4Address addr1, IPv4Address addr2) const;
void removal_expired_streams();

streams_type streams_;
OverlappingTechnique technique_;
uint64_t max_number_packets_to_stream_;
uint64_t stream_timeout_ms_;
uint64_t time_to_check_ms_;
streams_history streams_history_;

StreamCallback stream_overflow_callback_;
StreamCallback stream_timeout_callback_;

Internals::IPv4Stream::time_point origin_cycle_time_;

// Statistics
size_t total_number_complete_packages_;
size_t total_number_damaged_packages_;
};

/**
Expand Down
178 changes: 171 additions & 7 deletions src/ip_reassembler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,30 @@ namespace Internals {

IPv4Stream::IPv4Stream()
: received_size_(), total_size_(), received_end_(false) {
#if TINS_IS_CXX11
start_time_point_ = std::chrono::system_clock::now();
#else
start_time_point_ = current_time();
#endif
}

#if TINS_IS_CXX11 == 0
uint64_t IPv4Stream::current_time() {
#ifdef _WIN32
FILETIME file_time;
GetSystemTimeAsFileTime(&file_time);
ULARGE_INTEGER ul;
ul.LowPart = file_time.dwLowDateTime;
ul.HighPart = file_time.dwHighDateTime;
uint64_t file_time_64 = ul.QuadPart;
return file_time_64;
#else
timespec ts = { 0 };
clock_gettime(CLOCK_MONOTONIC, &ts);
return ((uint64_t)ts.tv_sec) * 1000 + ((uint64_t)ts.tv_nsec) / 1000000;
#endif
}
#endif

void IPv4Stream::add_fragment(IP* ip) {
const uint16_t offset = extract_offset(ip);
Expand Down Expand Up @@ -100,50 +122,109 @@ const IP& IPv4Stream::first_fragment() const {
return first_fragment_;
}

size_t IPv4Stream::number_fragments() const {
return fragments_.size();
}

IPv4Stream::time_point IPv4Stream::start_time_point() const {
return start_time_point_;
}

uint16_t IPv4Stream::extract_offset(const IP* ip) {
return ip->fragment_offset() * 8;
}

} // Internals

IPv4Reassembler::IPv4Reassembler()
: technique_(NONE) {

: technique_(NONE), max_number_packets_to_stream_(0),
stream_timeout_ms_(0), stream_overflow_callback_(0), stream_timeout_callback_(0),
total_number_complete_packages_(0), total_number_damaged_packages_(0) {
#if TINS_IS_CXX11
origin_cycle_time_ = std::chrono::system_clock::now();
#else
origin_cycle_time_ = Internals::IPv4Stream::current_time();
#endif
}

IPv4Reassembler::IPv4Reassembler(OverlappingTechnique technique)
: technique_(technique) {

: technique_(technique), max_number_packets_to_stream_(0),
stream_timeout_ms_(0), stream_overflow_callback_(0), stream_timeout_callback_(0),
total_number_complete_packages_(0), total_number_damaged_packages_(0) {
#if TINS_IS_CXX11
origin_cycle_time_ = std::chrono::system_clock::now();
#else
origin_cycle_time_ = Internals::IPv4Stream::current_time();
#endif
}

IPv4Reassembler::PacketStatus IPv4Reassembler::process(PDU& pdu) {
// Removal of expired streams
removal_expired_streams();

IP* ip = pdu.find_pdu<IP>();
if (ip && ip->inner_pdu()) {
// There's fragmentation
if (ip->is_fragmented()) {
key_type key = make_key(ip);
// Create it or look it up, it's the same
Internals::IPv4Stream& stream = streams_[key];
streams_type::iterator stream_it = streams_.find(key);
Internals::IPv4Stream& stream = (stream_it != streams_.end() ? stream_it->second : streams_[key]);

stream.add_fragment(ip);
if (stream.is_complete()) {
++total_number_complete_packages_;

PDU* pdu = stream.allocate_pdu();
// Use all field values from the first fragment
*ip = stream.first_fragment();

// Erase this stream, since it's already assembled
streams_.erase(key);

// The packet is corrupt
if (!pdu) {
++total_number_damaged_packages_;
return FRAGMENTED;
}
ip->inner_pdu(pdu);
ip->fragment_offset(0);
ip->flags(static_cast<IP::Flags>(0));

return REASSEMBLED;
}
else {
return FRAGMENTED;

// Only non-complete packages fall into the list
if (stream_timeout_ms_ && stream_it == streams_.end()) {
#if TINS_IS_CXX11
streams_history_.emplace_back(key, stream.start_time_point());
#else
streams_history_.push_back(std::make_pair(key, stream.start_time_point()));
#endif
}

// Tracking overflow stream
if (max_number_packets_to_stream_
&& stream.number_fragments() >= max_number_packets_to_stream_)
{
if (stream_overflow_callback_)
{
PDU* pdu = stream.allocate_pdu();

// The packet is not corrupt
if (pdu) {
stream_overflow_callback_(*pdu);
delete pdu;
} else {
++total_number_damaged_packages_;
}
}

// Erase this stream
streams_.erase(key);
}

return FRAGMENTED;
}
}
return NOT_FRAGMENTED;
Expand All @@ -165,6 +246,62 @@ IPv4Reassembler::address_pair IPv4Reassembler::make_address_pair(IPv4Address add
}
}

void IPv4Reassembler::removal_expired_streams()
{
if (!stream_timeout_ms_ || streams_history_.empty()) return;

#if TINS_IS_CXX11
Internals::IPv4Stream::time_point now = std::chrono::system_clock::now();
auto step = std::chrono::duration_cast<std::chrono::milliseconds>(now - origin_cycle_time_);
if (step < std::chrono::milliseconds(time_to_check_ms_)) {
return;
}
#else
uint64_t now = Internals::IPv4Stream::current_time();
uint64_t step = now - origin_cycle_time_;
if (step < time_to_check_ms_) {
return;
}
#endif
origin_cycle_time_ = now;

while (!streams_history_.empty()) {
streams_history::value_type & history_front = streams_history_.front();
streams_type::iterator stream_it = streams_.find(history_front.first);
if (stream_it == streams_.end()) {
streams_history_.pop_front();
continue;
}
Internals::IPv4Stream& stream_tmp = stream_it->second;
if (stream_tmp.start_time_point() != history_front.second) {
streams_history_.pop_front();
continue;
}
#if TINS_IS_CXX11
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - history_front.second);
if ((uint64_t)diff.count() > stream_timeout_ms_) {
#else
uint64_t diff = now - history_front.second;
if (diff > stream_timeout_ms_) {
#endif
if (stream_timeout_callback_) {
PDU* pdu = stream_tmp.allocate_pdu();

// The packet is not corrupt
if (pdu) {
stream_timeout_callback_(*pdu);
delete pdu;
} else {
++total_number_damaged_packages_;
}
}
// Erase this stream
streams_.erase(history_front.first);
streams_history_.pop_front();
} else break;
}
}

void IPv4Reassembler::clear_streams() {
streams_.clear();
}
Expand All @@ -178,4 +315,31 @@ void IPv4Reassembler::remove_stream(uint16_t id, IPv4Address addr1, IPv4Address
);
}

void IPv4Reassembler::set_max_number_packets_to_stream(uint64_t max_number, StreamCallback callback) {
max_number_packets_to_stream_ = max_number;
stream_overflow_callback_ = callback;
}

void IPv4Reassembler::set_timeout_to_stream(uint64_t stream_timeout_ms, uint64_t time_to_check_ms, StreamCallback callback) {
stream_timeout_ms_ = stream_timeout_ms;
time_to_check_ms_ = time_to_check_ms;
stream_timeout_callback_ = callback;
}

size_t IPv4Reassembler::total_number_complete_packages() const {
return total_number_complete_packages_;
}

size_t IPv4Reassembler::total_number_damaged_packages() const {
return total_number_damaged_packages_;
}

size_t IPv4Reassembler::current_number_incomplete_packages() const {
return streams_.size();
}

size_t IPv4Reassembler::current_buffer_size_incomplete_packages() const {
return streams_history_.size();
}

} // Tins
Loading