diff --git a/src/flow-hash.c b/src/flow-hash.c index fcd957c72e27..c08b6e12c7ee 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -764,10 +764,9 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket * #ifdef UNITTESTS } #endif - /* time out immediately */ - old_f->timeout_at = 0; /* get some settings that we move over to the new flow */ FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] }; + old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE; /* flow is unlocked by caller */ @@ -790,7 +789,7 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket * f->thread_id[0] = thread_id[0]; f->thread_id[1] = thread_id[1]; - STREAM_PKT_FLAG_SET(p, STREAM_PKT_FLAG_TCP_PORT_REUSE); + STREAM_PKT_FLAG_SET(p, STREAM_PKT_FLAG_TCP_SESSION_REUSE); return f; } @@ -832,19 +831,45 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls, } } -static inline bool FlowIsTimedOut(const Flow *f, const uint32_t sec, const bool emerg) +static inline bool FlowIsTimedOut( + const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg) { - if (unlikely(f->timeout_at < sec)) { - return true; - } else if (unlikely(emerg)) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; + SCTime_t timesout_at; + if (emerg) { + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); + } + /* if time is live, we just use the pktts */ + if (TimeModeIsLive() || ftid == f->thread_id[0] || f->thread_id[0] == 0) { + if (SCTIME_CMP_LT(pktts, timesout_at)) { + return false; + } + } else { + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } + } + return true; +} - int64_t timeout_at = f->timeout_at - - 
FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); - if ((int64_t)sec >= timeout_at) - return true; +static inline uint16_t GetTvId(const ThreadVars *tv) +{ + uint16_t tv_id; +#ifdef UNITTESTS + if (RunmodeIsUnittests()) { + tv_id = 0; + } else { + tv_id = (uint16_t)tv->id; } - return false; +#else + tv_id = (uint16_t)tv->id; +#endif + return tv_id; } /** \brief Get Flow for packet @@ -898,40 +923,45 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow return f; } + const uint16_t tv_id = GetTvId(tv); const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0; const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0; + const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts)); /* ok, we have a flow in the bucket. Let's find out if it is our flow */ Flow *prev_f = NULL; /* previous flow */ f = fb->head; do { Flow *next_f = NULL; - const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && - FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg)); - if (timedout) { + const bool our_flow = FlowCompare(f, p) != 0; + if (our_flow || timeout_check) { FLOWLOCK_WRLOCK(f); - next_f = f->next; - MoveToWorkQueue(tv, fls, fb, f, prev_f); - FLOWLOCK_UNLOCK(f); - goto flow_removed; - } else if (FlowCompare(f, p) != 0) { - FLOWLOCK_WRLOCK(f); - /* found a matching flow that is not timed out */ - if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { - Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); - if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ - prev_f = new_f; - MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ - FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ - - if (new_f == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; + const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg)); + if (timedout) { + next_f = f->next; + MoveToWorkQueue(tv, fls, fb, f, prev_f); + FLOWLOCK_UNLOCK(f); + goto flow_removed; + 
} else if (our_flow) { + /* found a matching flow that is not timed out */ + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { + Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); + if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ + prev_f = new_f; + MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ + FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ + + if (new_f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + f = new_f; } - f = new_f; + FlowReference(dest, f); + FBLOCK_UNLOCK(fb); + return f; /* return w/o releasing flow lock */ + } else { + FLOWLOCK_UNLOCK(f); } - FlowReference(dest, f); - FBLOCK_UNLOCK(fb); - return f; /* return w/o releasing flow lock */ } /* unless we removed 'f', prev_f needs to point to * current 'f' when adding a new flow below. */ diff --git a/src/flow-manager.c b/src/flow-manager.c index 4c6f165888d7..4a3567b75b7e 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -175,25 +175,46 @@ void FlowDisableFlowManagerThread(void) /** \internal * \brief check if a flow is timed out * + * Takes lastts, adds the timeout policy to it, and compares the result to the current time `ts`. + * In case of emergency mode, timeout_policy is ignored and the emerg table + * is used. 
+ * * \param f flow - * \param ts timestamp + * \param ts timestamp - realtime or a minimum of active threads in offline mode + * \param next_ts tracking the next timeout ts, so FM can skip the row until that time + * \param emerg bool to indicate if emergency timeout settings should be used * * \retval false not timed out * \retval true timed out */ static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg) { - uint32_t flow_times_out_at = f->timeout_at; + SCTime_t timesout_at; + if (emerg) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; - flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); } - if (*next_ts == 0 || flow_times_out_at < *next_ts) - *next_ts = flow_times_out_at; - - /* do the timeout check */ - if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) { - return false; + /* update next_ts if needed */ + if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts) + *next_ts = (uint32_t)SCTIME_SECS(timesout_at); + + /* if time is live, we just use the `ts` */ + if (TimeModeIsLive() || f->thread_id[0] == 0) { + /* do the timeout check */ + if (SCTIME_CMP_LT(ts, timesout_at)) { + return false; + } + } else { + /* offline: take last ts from "owning" thread */ + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } } return true; @@ -327,6 +348,8 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT do { checked++; + FLOWLOCK_WRLOCK(f); + /* check flow timeout based on lastts and state. 
Both can be * accessed w/o Flow lock as we do have the hash row lock (so flow * can't disappear) and flow_state is atomic. lastts can only @@ -334,6 +357,7 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT /* timeout logic goes here */ if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) { + FLOWLOCK_UNLOCK(f); counters->flows_notimeout++; prev_f = f; @@ -341,8 +365,6 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT continue; } - FLOWLOCK_WRLOCK(f); - Flow *next_flow = f->next; #ifdef CAPTURE_OFFLOAD @@ -495,7 +517,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const * \brief handle timeout for a slice of hash rows * If we wrap around we call FlowTimeoutHash twice * \param td FM timeout thread - * \param ts timeout in seconds + * \param ts timeout timestamp * \param hash_min lower bound of the row slice * \param hash_max upper bound of the row slice * \param counters Flow timeout counters to be passed @@ -805,7 +827,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) uint64_t sleep_per_wu = 0; bool prev_emerg = false; uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */ - SCTime_t ts; /* don't start our activities until time is setup */ while (!TimeModeIsReady()) { @@ -828,8 +849,10 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) while (run) { bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0); - /* Get the time */ - ts = TimeGet(); + /* Get the time: real time in live mode, or a min() of the + * "active" threads in offline mode. 
See TmThreadsGetMinimalTimestamp */ + SCTime_t ts = TimeGet(); + SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts)); uint64_t ts_ms = SCTIME_MSECS(ts); const bool emerge_p = (emerg && !prev_emerg); diff --git a/src/flow-util.c b/src/flow-util.c index 31e22b9341ac..9e90ae5be9bc 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -195,8 +195,6 @@ void FlowInit(ThreadVars *tv, Flow *f, const Packet *p) f->protomap = FlowGetProtoMapping(f->proto); f->timeout_policy = FlowGetTimeoutPolicy(f); - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->startts) + f->timeout_policy; - f->timeout_at = timeout_at; if (MacSetFlowStorageEnabled()) { DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, MacSetGetFlowStorageID()) != NULL); diff --git a/src/flow-util.h b/src/flow-util.h index 368c955d876a..4af2e4eafe0f 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -41,7 +41,6 @@ (f)->dp = 0; \ (f)->proto = 0; \ (f)->livedev = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->vlan_idx = 0; \ (f)->next = NULL; \ @@ -88,7 +87,6 @@ (f)->vlan_idx = 0; \ (f)->ffr = 0; \ (f)->next = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->flow_state = 0; \ (f)->tenant_id = 0; \ diff --git a/src/flow-worker.c b/src/flow-worker.c index 63de42a26650..3c95fe8789ad 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -554,11 +554,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) SCLogDebug("packet %"PRIu64, p->pcap_cnt); - /* update time */ - if (!(PKT_IS_PSEUDOPKT(p))) { - TimeSetByThread(tv->id, p->ts); - } - /* handle Flow */ if (p->flags & PKT_WANTS_FLOW) { FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW); @@ -567,6 +562,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) if (likely(p->flow != NULL)) { DEBUG_ASSERT_FLOW_LOCKED(p->flow); if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) { + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } goto housekeeping; } } @@ -581,6 +580,11 @@ 
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR); } + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } + SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no"); /* handle TCP and app layer */ diff --git a/src/flow.c b/src/flow.c index aea79d23bf08..5508ea7b923e 100644 --- a/src/flow.c +++ b/src/flow.c @@ -397,10 +397,6 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars /* update the last seen timestamp of this flow */ if (SCTIME_CMP_GT(p->ts, f->lastts)) { f->lastts = p->ts; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy; - if (timeout_at != f->timeout_at) { - f->timeout_at = timeout_at; - } } #ifdef CAPTURE_OFFLOAD } else { @@ -1169,9 +1165,6 @@ void FlowUpdateState(Flow *f, const enum FlowState s) const uint32_t timeout_policy = FlowGetTimeoutPolicy(f); if (timeout_policy != f->timeout_policy) { f->timeout_policy = timeout_policy; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + timeout_policy; - if (timeout_at != f->timeout_at) - f->timeout_at = timeout_at; } } #ifdef UNITTESTS diff --git a/src/flow.h b/src/flow.h index 554f9fca4a32..5afbc57012ad 100644 --- a/src/flow.h +++ b/src/flow.h @@ -244,6 +244,7 @@ typedef struct AppLayerParserState_ AppLayerParserState; #define FLOW_END_FLAG_TIMEOUT 0x02 #define FLOW_END_FLAG_FORCED 0x04 #define FLOW_END_FLAG_SHUTDOWN 0x08 +#define FLOW_END_FLAG_TCPREUSE 0x10 /** Mutex or RWLocks for the flow. */ //#define FLOWLOCK_RWLOCK @@ -390,11 +391,6 @@ typedef struct Flow_ uint8_t ffr; }; - /** timestamp in seconds of the moment this flow will timeout - * according to the timeout policy. Does *not* take emergency - * mode into account. 
*/ - uint32_t timeout_at; - /** Thread ID for the stream/detect portion of this flow */ FlowThreadId thread_id[2]; @@ -405,8 +401,8 @@ typedef struct Flow_ /** flow hash - the flow hash before hash table size mod. */ uint32_t flow_hash; - /** timeout policy value in seconds to add to the lastts.tv_sec - * when a packet has been received. */ + /** timeout in seconds by policy, add to Flow::lastts to get actual time this times out. + * Ignored in emergency mode. */ uint32_t timeout_policy; /* time stamp of last update (last packet). Set/updated under the diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index 4b44d86835e7..3059f2518108 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2023 Open Information Security Foundation +/* Copyright (C) 2023-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -157,6 +157,7 @@ static OutputInitResult EveStreamLogInitCtxSub(ConfNode *conf, OutputCtx *parent ctx->trigger_flags |= SetFlag(conf, "state-update", STREAM_PKT_FLAG_STATE_UPDATE); ctx->trigger_flags |= SetFlag(conf, "spurious-retransmission", STREAM_PKT_FLAG_SPURIOUS_RETRANSMISSION); + ctx->trigger_flags |= SetFlag(conf, "tcp-session-reuse", STREAM_PKT_FLAG_TCP_SESSION_REUSE); ctx->trigger_flags |= SetFlag(conf, "all", 0xFFFF); SCLogDebug("trigger_flags %04x", ctx->trigger_flags); @@ -368,8 +369,8 @@ static int EveStreamLogger(ThreadVars *tv, void *thread_data, const Packet *p) jb_append_string(js, "dsack"); if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_ACK_UNSEEN_DATA) jb_append_string(js, "ack_unseen_data"); - if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_PORT_REUSE) - jb_append_string(js, "tcp_port_reuse"); + if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_SESSION_REUSE) + jb_append_string(js, "tcp_session_reuse"); if 
(p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE) jb_append_string(js, "zero_window_probe"); if (p->l4.vars.tcp.stream_pkt_flags & STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE_ACK) diff --git a/src/output-json-flow.c b/src/output-json-flow.c index 015c72f8c7bc..d30866636f17 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -258,7 +258,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, JsonBuilder *jb, Flow *f) } const char *reason = NULL; - if (f->flow_end_flags & FLOW_END_FLAG_FORCED) + if (f->flow_end_flags & FLOW_END_FLAG_TCPREUSE) + reason = "tcp_reuse"; + else if (f->flow_end_flags & FLOW_END_FLAG_FORCED) reason = "forced"; else if (f->flow_end_flags & FLOW_END_FLAG_SHUTDOWN) reason = "shutdown"; diff --git a/src/runmodes.c b/src/runmodes.c index 006199bb94c6..cd5eed32b518 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -436,6 +436,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p BypassedFlowManagerThreadSpawn(); } StatsSpawnThreads(); + TmThreadsSealThreads(); } } diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index e1862211377c..e87508695b04 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -319,7 +319,7 @@ typedef struct TcpSession_ { #define STREAM_PKT_FLAG_DUP_ACK BIT_U16(7) #define STREAM_PKT_FLAG_DSACK BIT_U16(8) #define STREAM_PKT_FLAG_ACK_UNSEEN_DATA BIT_U16(9) -#define STREAM_PKT_FLAG_TCP_PORT_REUSE BIT_U16(10) +#define STREAM_PKT_FLAG_TCP_SESSION_REUSE BIT_U16(10) #define STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE BIT_U16(11) #define STREAM_PKT_FLAG_TCP_ZERO_WIN_PROBE_ACK BIT_U16(12) diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 8aa0edb0c6c3..7352f2bdc74f 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -3062,32 +3062,6 @@ static int HandleEstablishedPacketToClient( return 0; } -/** - * \internal - * - * \brief Find the highest sequence number needed to consider all segments as ACK'd - * - * Used to treat all segments as ACK'd 
upon receiving a valid RST. - * - * \param stream stream to inspect the segments from - * \param seq sequence number to check against - * - * \retval ack highest ack we need to set - */ -static inline uint32_t StreamTcpResetGetMaxAck(TcpStream *stream, uint32_t seq) -{ - uint32_t ack = seq; - - if (STREAM_HAS_SEEN_DATA(stream)) { - const uint32_t tail_seq = STREAM_SEQ_RIGHT_EDGE(stream); - if (SEQ_GT(tail_seq, ack)) { - ack = tail_seq; - } - } - - SCReturnUInt(ack); -} - static bool StreamTcpPacketIsZeroWindowProbeAck(const TcpSession *ssn, const Packet *p) { const TCPHdr *tcph = PacketGetTCP(p); @@ -3309,10 +3283,9 @@ static int StreamTcpPacketStateEstablished( ssn->client.window = window << ssn->client.wscale; if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, window)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -3337,10 +3310,9 @@ static int StreamTcpPacketStateEstablished( ssn->server.window = window << ssn->server.wscale; if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -3641,10 +3613,9 @@ static int StreamTcpPacketStateFinWait1( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + 
StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -3653,10 +3624,9 @@ static int StreamTcpPacketStateFinWait1( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, &ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4085,10 +4055,9 @@ static int StreamTcpPacketStateFinWait2( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4097,10 +4066,9 @@ static int StreamTcpPacketStateFinWait2( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, &ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4385,10 +4353,9 @@ 
static int StreamTcpPacketStateClosing( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4397,10 +4364,9 @@ static int StreamTcpPacketStateClosing( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, &ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4560,10 +4526,9 @@ static int StreamTcpPacketStateCloseWait( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4572,10 +4537,9 @@ static int StreamTcpPacketStateCloseWait( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, &ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); 
- StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4845,10 +4809,9 @@ static int StreamTcpPacketStateLastAck( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4857,10 +4820,9 @@ static int StreamTcpPacketStateLastAck( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, &ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4968,10 +4930,9 @@ static int StreamTcpPacketStateTimeWait( if (PKT_IS_TOSERVER(p)) { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->server, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->server, ack); - StreamTcpUpdateLastAck(ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->client, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); @@ -4980,10 +4941,9 @@ static int StreamTcpPacketStateTimeWait( StreamTcpReassembleHandleSegment(tv, stt->ra_ctx, ssn, 
&ssn->client, p); } else { if ((tcph->th_flags & TH_ACK) && StreamTcpValidateAck(ssn, &ssn->client, p) == 0) - StreamTcpUpdateLastAck( - ssn, &ssn->client, StreamTcpResetGetMaxAck(&ssn->client, ack)); + StreamTcpUpdateLastAck(ssn, &ssn->client, ack); - StreamTcpUpdateLastAck(ssn, &ssn->server, StreamTcpResetGetMaxAck(&ssn->server, seq)); + StreamTcpUpdateLastAck(ssn, &ssn->server, seq); if (ssn->flags & STREAMTCP_FLAG_TIMESTAMP) { StreamTcpHandleTimestamp(ssn, p); diff --git a/src/suricata.c b/src/suricata.c index 6a01b55dd3ca..7c238c48cc5e 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2210,7 +2210,6 @@ static int InitSignalHandler(SCInstance *suri) * Will be run once per pcap in unix-socket mode */ void PreRunInit(const int runmode) { - HttpRangeContainersInit(); if (runmode == RUNMODE_UNIX_SOCKET) return; @@ -2233,37 +2232,34 @@ void PreRunInit(const int runmode) AppLayerParserPostStreamSetup(); AppLayerRegisterGlobalCounters(); OutputFilestoreRegisterGlobalCounters(); + HttpRangeContainersInit(); } /* tasks we need to run before packets start flowing, * but after we dropped privs */ void PreRunPostPrivsDropInit(const int runmode) { - StatsSetupPostConfigPreOutput(); - RunModeInitializeOutputs(); - DatasetsInit(); - if (runmode == RUNMODE_UNIX_SOCKET) { - /* As the above did some necessary startup initialization, it - * also setup some outputs where only one is allowed, so - * deinitialize to the state that unix-mode does after every - * pcap. */ - PostRunDeinit(RUNMODE_PCAP_FILE, NULL); return; } + StatsSetupPostConfigPreOutput(); + RunModeInitializeOutputs(); + DatasetsInit(); StatsSetupPostConfigPostOutput(); } -/* clean up / shutdown code for both the main modes and for - * unix socket mode. +/** \brief clean up / shutdown code for packet modes * - * Will be run once per pcap in unix-socket mode */ + * Shuts down packet modes, so regular packet runmodes and the + * per pcap mode in the unix socket. 
*/ void PostRunDeinit(const int runmode, struct timeval *start_time) { if (runmode == RUNMODE_UNIX_SOCKET) return; + TmThreadsUnsealThreads(); + /* needed by FlowWorkToDoCleanup */ PacketPoolInit(); @@ -2968,12 +2964,11 @@ void SuricataInit(void) prerun_snap = SystemHugepageSnapshotCreate(); SCSetStartTime(&suricata); - RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, - suricata.capture_plugin_name, suricata.capture_plugin_args); if (suricata.run_mode != RUNMODE_UNIX_SOCKET) { UnixManagerThreadSpawnNonRunmode(suricata.unix_socket_enabled); } - + RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, suricata.capture_plugin_name, + suricata.capture_plugin_args); return; out: diff --git a/src/tm-threads.c b/src/tm-threads.c index 07f9a9390df0..5da183c311be 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2022 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -1683,7 +1683,7 @@ TmEcode TmThreadSpawn(ThreadVars *tv) int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); if (rc) { - FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc, + FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc, strerror(errno)); } @@ -2064,17 +2064,19 @@ static void TmThreadDumpThreads(void) } #endif +/* Aligned to CLS to avoid false sharing between atomic ops. 
*/ typedef struct Thread_ { ThreadVars *tv; /**< threadvars structure */ const char *name; int type; int in_use; /**< bool to indicate this is in use */ - SCTime_t pktts; /**< current packet time of this thread - * (offline mode) */ + SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread + * (offline mode) */ SCTime_t sys_sec_stamp; /**< timestamp in real system * time when the pktts was last updated. */ -} Thread; + SCSpinlock spin; +} __attribute__((aligned(CLS))) Thread; typedef struct Threads_ { Thread *threads; @@ -2082,9 +2084,26 @@ typedef struct Threads_ { int threads_cnt; } Threads; +static bool thread_store_sealed = false; static Threads thread_store = { NULL, 0, 0 }; static SCMutex thread_store_lock = SCMUTEX_INITIALIZER; +void TmThreadsSealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); + thread_store_sealed = true; + SCMutexUnlock(&thread_store_lock); +} + +void TmThreadsUnsealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(!thread_store_sealed); + thread_store_sealed = false; + SCMutexUnlock(&thread_store_lock); +} + void TmThreadsListThreads(void) { SCMutexLock(&thread_store_lock); @@ -2112,6 +2131,7 @@ void TmThreadsListThreads(void) int TmThreadsRegisterThread(ThreadVars *tv, const int type) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (thread_store.threads == NULL) { thread_store.threads = SCCalloc(STEP, sizeof(Thread)); BUG_ON(thread_store.threads == NULL); @@ -2122,10 +2142,13 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) for (s = 0; s < thread_store.threads_size; s++) { if (thread_store.threads[s].in_use == 0) { Thread *t = &thread_store.threads[s]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); SCMutexUnlock(&thread_store_lock); return (int)(s+1); @@ -2139,10 +2162,13 @@ int 
TmThreadsRegisterThread(ThreadVars *tv, const int type) memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread)); Thread *t = &thread_store.threads[thread_store.threads_size]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); s = thread_store.threads_size; thread_store.threads_size += STEP; @@ -2155,6 +2181,7 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) void TmThreadsUnregisterThread(const int id) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (id <= 0 || id > (int)thread_store.threads_size) { SCMutexUnlock(&thread_store_lock); return; @@ -2187,16 +2214,11 @@ void TmThreadsUnregisterThread(const int id) void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) { - SCMutexLock(&thread_store_lock); - if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) { - SCMutexUnlock(&thread_store_lock); - return; - } - + SCTime_t now = SCTimeGetTime(); int idx = id - 1; Thread *t = &thread_store.threads[idx]; - t->pktts = ts; - SCTime_t now = SCTimeGetTime(); + SCSpinLock(&t->spin); + SC_ATOMIC_SET(t->pktts, ts); #ifdef DEBUG if (t->sys_sec_stamp.secs != 0) { @@ -2208,43 +2230,58 @@ void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) #endif t->sys_sec_stamp = now; - SCMutexUnlock(&thread_store_lock); + SCSpinUnlock(&t->spin); } bool TmThreadsTimeSubsysIsReady(void) { static SCTime_t nullts = SCTIME_INITIALIZER; bool ready = true; - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; + } if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) { ready = false; + SCSpinUnlock(&t->spin); break; } + SCSpinUnlock(&t->spin); } - 
SCMutexUnlock(&thread_store_lock); return ready; } void TmThreadsInitThreadsTimestamp(const SCTime_t ts) { SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - t->pktts = ts; + } + SC_ATOMIC_SET(t->pktts, ts); t->sys_sec_stamp = now; + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); +} + +SCTime_t TmThreadsGetThreadTime(const int idx) +{ + BUG_ON(idx == 0); + const int i = idx - 1; + Thread *t = &thread_store.threads[i]; + return SC_ATOMIC_GET(t->pktts); } void TmThreadsGetMinimalTimestamp(struct timeval *ts) @@ -2252,34 +2289,38 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts) struct timeval local = { 0 }; static SCTime_t nullts = SCTIME_INITIALIZER; bool set = false; - size_t s; SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); - for (s = 0; s < thread_store.threads_size; s++) { + for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (t->in_use == 0) + if (t->in_use == 0) { break; + } + SCSpinLock(&t->spin); /* only packet threads set timestamps based on packets */ - if (t->type != TVT_PPT) + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - if (SCTIME_CMP_NEQ(t->pktts, nullts)) { - SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 1); + } + SCTime_t pktts = SC_ATOMIC_GET(t->pktts); + if (SCTIME_CMP_NEQ(pktts, nullts)) { + SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5); /* ignore sleeping threads */ - if (SCTIME_CMP_LT(sys_sec_stamp, now)) + if (SCTIME_CMP_LT(sys_sec_stamp, now)) { + SCSpinUnlock(&t->spin); continue; - + } if (!set) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + SCTIME_TO_TIMEVAL(&local, pktts); set = true; } else { - if (SCTIME_CMP_LT(t->pktts, 
SCTIME_FROM_TIMEVAL(&local))) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) { + SCTIME_TO_TIMEVAL(&local, pktts); } } } + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); *ts = local; SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec); } diff --git a/src/tm-threads.h b/src/tm-threads.h index 63fbef85b0a3..dde0f2029ee9 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2011 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -276,6 +276,8 @@ static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv) } } +void TmThreadsSealThreads(void); +void TmThreadsUnsealThreads(void); void TmThreadsListThreads(void); int TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id); @@ -284,6 +286,7 @@ void TmThreadsInjectFlowById(Flow *f, const int id); void TmThreadsInitThreadsTimestamp(const SCTime_t ts); void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts); void TmThreadsGetMinimalTimestamp(struct timeval *ts); +SCTime_t TmThreadsGetThreadTime(const int idx); uint16_t TmThreadsGetWorkerThreadMax(void); bool TmThreadsTimeSubsysIsReady(void);