Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow timeout timing/v16.2 stream fixes/v6.1 v2 #12370

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 67 additions & 37 deletions src/flow-hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,9 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket *
#ifdef UNITTESTS
}
#endif
/* time out immediately */
old_f->timeout_at = 0;
/* get some settings that we move over to the new flow */
FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] };
old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE;

/* flow is unlocked by caller */

Expand All @@ -790,7 +789,7 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket *
f->thread_id[0] = thread_id[0];
f->thread_id[1] = thread_id[1];

STREAM_PKT_FLAG_SET(p, STREAM_PKT_FLAG_TCP_PORT_REUSE);
STREAM_PKT_FLAG_SET(p, STREAM_PKT_FLAG_TCP_SESSION_REUSE);
return f;
}

Expand Down Expand Up @@ -832,19 +831,45 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
}
}

static inline bool FlowIsTimedOut(const Flow *f, const uint32_t sec, const bool emerg)
static inline bool FlowIsTimedOut(
const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg)
{
if (unlikely(f->timeout_at < sec)) {
return true;
} else if (unlikely(emerg)) {
extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
SCTime_t timesout_at;
if (emerg) {
extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
timesout_at = SCTIME_ADD_SECS(f->lastts,
FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
/* if time is live, we just use the pktts */
if (TimeModeIsLive() || ftid == f->thread_id[0] || f->thread_id[0] == 0) {
if (SCTIME_CMP_LT(pktts, timesout_at)) {
return false;
}
} else {
SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
/* do the timeout check */
if (SCTIME_CMP_LT(checkts, timesout_at)) {
return false;
}
}
return true;
}

int64_t timeout_at = f->timeout_at -
FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
if ((int64_t)sec >= timeout_at)
return true;
static inline uint16_t GetTvId(const ThreadVars *tv)
{
uint16_t tv_id;
#ifdef UNITTESTS
if (RunmodeIsUnittests()) {
tv_id = 0;
} else {
tv_id = (uint16_t)tv->id;
}
return false;
#else
tv_id = (uint16_t)tv->id;
#endif
return tv_id;
}

/** \brief Get Flow for packet
Expand Down Expand Up @@ -898,40 +923,45 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow
return f;
}

const uint16_t tv_id = GetTvId(tv);
const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts));
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
Flow *prev_f = NULL; /* previous flow */
f = fb->head;
do {
Flow *next_f = NULL;
const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) &&
FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg));
if (timedout) {
const bool our_flow = FlowCompare(f, p) != 0;
if (our_flow || timeout_check) {
FLOWLOCK_WRLOCK(f);
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
FLOWLOCK_UNLOCK(f);
goto flow_removed;
} else if (FlowCompare(f, p) != 0) {
FLOWLOCK_WRLOCK(f);
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */

if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg));
if (timedout) {
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
FLOWLOCK_UNLOCK(f);
goto flow_removed;
} else if (our_flow) {
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */

if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
f = new_f;
}
f = new_f;
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f; /* return w/o releasing flow lock */
} else {
FLOWLOCK_UNLOCK(f);
}
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f; /* return w/o releasing flow lock */
}
/* unless we removed 'f', prev_f needs to point to
* current 'f' when adding a new flow below. */
Expand Down
55 changes: 39 additions & 16 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,46 @@ void FlowDisableFlowManagerThread(void)
/** \internal
* \brief check if a flow is timed out
*
* Takes lastts, adds the timeout policy to it, compared to current time `ts`.
* In case of emergency mode, timeout_policy is ignored and the emerg table
* is used.
*
* \param f flow
* \param ts timestamp
* \param ts timestamp - realtime or a minimum of active threads in offline mode
* \param next_ts tracking the next timeout ts, so FM can skip the row until that time
* \param emerg bool to indicate if emergency timeout settings should be used
*
* \retval false not timed out
* \retval true timed out
*/
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
uint32_t flow_times_out_at = f->timeout_at;
SCTime_t timesout_at;

if (emerg) {
extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
timesout_at = SCTIME_ADD_SECS(f->lastts,
FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
if (*next_ts == 0 || flow_times_out_at < *next_ts)
*next_ts = flow_times_out_at;

/* do the timeout check */
if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
return false;
/* update next_ts if needed */
if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts)
*next_ts = (uint32_t)SCTIME_SECS(timesout_at);

/* if time is live, we just use the `ts` */
if (TimeModeIsLive() || f->thread_id[0] == 0) {
/* do the timeout check */
if (SCTIME_CMP_LT(ts, timesout_at)) {
return false;
}
} else {
/* offline: take last ts from "owning" thread */
SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
/* do the timeout check */
if (SCTIME_CMP_LT(checkts, timesout_at)) {
return false;
}
}

return true;
Expand Down Expand Up @@ -327,22 +348,23 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
do {
checked++;

FLOWLOCK_WRLOCK(f);

/* check flow timeout based on lastts and state. Both can be
* accessed w/o Flow lock as we do have the hash row lock (so flow
* can't disappear) and flow_state is atomic. lastts can only
* be modified when we have both the flow and hash row lock */

/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) {
FLOWLOCK_UNLOCK(f);
counters->flows_notimeout++;

prev_f = f;
f = f->next;
continue;
}

FLOWLOCK_WRLOCK(f);

Flow *next_flow = f->next;

#ifdef CAPTURE_OFFLOAD
Expand Down Expand Up @@ -495,7 +517,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
* \brief handle timeout for a slice of hash rows
* If we wrap around we call FlowTimeoutHash twice
* \param td FM timeout thread
* \param ts timeout in seconds
* \param ts timeout timestamp
* \param hash_min lower bound of the row slice
* \param hash_max upper bound of the row slice
* \param counters Flow timeout counters to be passed
Expand Down Expand Up @@ -805,7 +827,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
uint64_t sleep_per_wu = 0;
bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
SCTime_t ts;

/* don't start our activities until time is setup */
while (!TimeModeIsReady()) {
Expand All @@ -828,8 +849,10 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
while (run) {
bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

/* Get the time */
ts = TimeGet();
/* Get the time: real time in live mode, or a min() of the
* "active" threads in offline mode. See TmThreadsGetMinimalTimestamp */
SCTime_t ts = TimeGet();

SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
uint64_t ts_ms = SCTIME_MSECS(ts);
const bool emerge_p = (emerg && !prev_emerg);
Expand Down
2 changes: 0 additions & 2 deletions src/flow-util.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ void FlowInit(ThreadVars *tv, Flow *f, const Packet *p)

f->protomap = FlowGetProtoMapping(f->proto);
f->timeout_policy = FlowGetTimeoutPolicy(f);
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->startts) + f->timeout_policy;
f->timeout_at = timeout_at;

if (MacSetFlowStorageEnabled()) {
DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, MacSetGetFlowStorageID()) != NULL);
Expand Down
2 changes: 0 additions & 2 deletions src/flow-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
(f)->dp = 0; \
(f)->proto = 0; \
(f)->livedev = NULL; \
(f)->timeout_at = 0; \
(f)->timeout_policy = 0; \
(f)->vlan_idx = 0; \
(f)->next = NULL; \
Expand Down Expand Up @@ -88,7 +87,6 @@
(f)->vlan_idx = 0; \
(f)->ffr = 0; \
(f)->next = NULL; \
(f)->timeout_at = 0; \
(f)->timeout_policy = 0; \
(f)->flow_state = 0; \
(f)->tenant_id = 0; \
Expand Down
14 changes: 9 additions & 5 deletions src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)

SCLogDebug("packet %"PRIu64, p->pcap_cnt);

/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}

/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
Expand All @@ -567,6 +562,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
if (likely(p->flow != NULL)) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}
goto housekeeping;
}
}
Expand All @@ -581,6 +580,11 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
}

/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}

SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");

/* handle TCP and app layer */
Expand Down
7 changes: 0 additions & 7 deletions src/flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,6 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
/* update the last seen timestamp of this flow */
if (SCTIME_CMP_GT(p->ts, f->lastts)) {
f->lastts = p->ts;
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy;
if (timeout_at != f->timeout_at) {
f->timeout_at = timeout_at;
}
}
#ifdef CAPTURE_OFFLOAD
} else {
Expand Down Expand Up @@ -1169,9 +1165,6 @@ void FlowUpdateState(Flow *f, const enum FlowState s)
const uint32_t timeout_policy = FlowGetTimeoutPolicy(f);
if (timeout_policy != f->timeout_policy) {
f->timeout_policy = timeout_policy;
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + timeout_policy;
if (timeout_at != f->timeout_at)
f->timeout_at = timeout_at;
}
}
#ifdef UNITTESTS
Expand Down
10 changes: 3 additions & 7 deletions src/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ typedef struct AppLayerParserState_ AppLayerParserState;
#define FLOW_END_FLAG_TIMEOUT 0x02
#define FLOW_END_FLAG_FORCED 0x04
#define FLOW_END_FLAG_SHUTDOWN 0x08
#define FLOW_END_FLAG_TCPREUSE 0x10

/** Mutex or RWLocks for the flow. */
//#define FLOWLOCK_RWLOCK
Expand Down Expand Up @@ -390,11 +391,6 @@ typedef struct Flow_
uint8_t ffr;
};

/** timestamp in seconds of the moment this flow will timeout
* according to the timeout policy. Does *not* take emergency
* mode into account. */
uint32_t timeout_at;

/** Thread ID for the stream/detect portion of this flow */
FlowThreadId thread_id[2];

Expand All @@ -405,8 +401,8 @@ typedef struct Flow_
/** flow hash - the flow hash before hash table size mod. */
uint32_t flow_hash;

/** timeout policy value in seconds to add to the lastts.tv_sec
* when a packet has been received. */
/** timeout in seconds by policy, add to Flow::lastts to get actual time this times out.
* Ignored in emergency mode. */
uint32_t timeout_policy;

/* time stamp of last update (last packet). Set/updated under the
Expand Down
7 changes: 4 additions & 3 deletions src/output-eve-stream.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2023 Open Information Security Foundation
/* Copyright (C) 2023-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -157,6 +157,7 @@ static OutputInitResult EveStreamLogInitCtxSub(ConfNode *conf, OutputCtx *parent
ctx->trigger_flags |= SetFlag(conf, "state-update", STREAM_PKT_FLAG_STATE_UPDATE);
ctx->trigger_flags |=
SetFlag(conf, "spurious-retransmission", STREAM_PKT_FLAG_SPURIOUS_RETRANSMISSION);
ctx->trigger_flags |= SetFlag(conf, "tcp-session-reuse", STREAM_PKT_FLAG_TCP_SESSION_REUSE);

ctx->trigger_flags |= SetFlag(conf, "all", 0xFFFF);
SCLogDebug("trigger_flags %04x", ctx->trigger_flags);
Expand Down Expand Up @@ -368,8 +369,8 @@ static int EveStreamLogger(ThreadVars *tv, void *thread_data, const Packet *p)
jb_append_string(js, "dsack");
if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_ACK_UNSEEN_DATA)
jb_append_string(js, "ack_unseen_data");
if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_PORT_REUSE)
jb_append_string(js, "tcp_port_reuse");
if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_SESSION_REUSE)
jb_append_string(js, "tcp_session_reuse");
if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE)
jb_append_string(js, "zero_window_probe");
if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE_ACK)
Expand Down
Loading
Loading