Skip to content

Commit

Permalink
Cleanup collector loop
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 29, 2024
1 parent 2d806f6 commit 5cffb3a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/collector/collector_inline.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009-2020, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* Copyright (c) 2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
* All rights reserved.
*
Expand Down Expand Up @@ -82,7 +82,7 @@ static inline FlowSource_t *GetFlowSource(struct sockaddr_storage *ss) {
port = 0;
ptr = NULL;

LogError("Unknown sa fanily: %d in '%s', line '%d'", ss->ss_family, __FILE__, __LINE__);
LogError("Unknown sa family: %d in '%s', line '%d'", ss->ss_family, __FILE__, __LINE__);
return NULL;
}

Expand Down
8 changes: 3 additions & 5 deletions src/netflow/nfd_raw.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ void Process_nfd(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) {

if ((sizeof(nfd_header_t) + sizeof(recordHeaderV3_t)) > size_left) {
LogError("Process_nfd: Not enough data.");
dbg_printf("Process_nfd: Not enough data.");
return;
}

Expand All @@ -195,14 +194,13 @@ void Process_nfd(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) {
dbg_printf("Next record - type: %u, size: %u\n", recordHeaderV3->type, recordHeaderV3->size);
// verify received record.
if (VerifyV3Record(recordHeaderV3) == 0) {
LogError("Process_nfd(): Malformed nfd record received");
LogError("Process_nfd(): expected %u records, processd: %u", count, numRecords);
LogError("Process_nfd: Corrupt nfd record: expected %u records, processd: %u", count, numRecords);
return;
}

if (recordHeaderV3->size > size_left) {
LogError("Process_nfd: record size error. Size v3header: %u > size left: %u", recordHeaderV3->size, size_left);
LogError("Process_nfd(): expected %u records, processd: %u", count, numRecords);
LogError("Process_nfd: record size error. Size v3header: %u > size left: %d", recordHeaderV3->size, size_left);
LogError("Process_nfd: expected %u records, processd: %u", count, numRecords);
return;
}

Expand Down
36 changes: 11 additions & 25 deletions src/nfcapd/nfcapd.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ static void signalPrivsepChild(pid_t child_pid, int pfd) {

static void IntHandler(int signal) {
switch (signal) {
periodic_trigger = 1;
case SIGALRM:
periodic_trigger = 1;
break;
case SIGHUP:
case SIGINT:
Expand Down Expand Up @@ -230,7 +230,6 @@ static void ChildDied(void) {
}
gotSIGCHLD--;
}

} // End of ChildDied

#include "collector_inline.c"
Expand Down Expand Up @@ -313,7 +312,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
alarm(t_start + twin + 1 - time(NULL));
/*
* Main processing loop:
* this loop, continues until done = 1, set by the signal handler
* this loop, continues until = 1, set by the signal handler
* The while loop will be breaked by the periodic file renaming code
* for proper cleanup
*/
Expand All @@ -333,7 +332,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
#endif

if (cnt == -1 && errno != EINTR) {
LogError("ERROR: recvfrom: %s", strerror(errno));
LogError("recvfrom() error in '%s', line '%d', cnt: %d:, %s", __FILE__, __LINE__, cnt, strerror(errno));
continue;
}
}
Expand All @@ -356,6 +355,7 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,

LogInfo("Total ignored packets: %u", ignored_packets);
ignored_packets = 0;
periodic_trigger = 0;

if (done) break;

Expand All @@ -370,27 +370,13 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
alarm(t_start + twin + 1 - t_now);
}

/* check for error condition or done. errno may only be EINTR */
if (periodic_trigger) {
if (cnt < 0) {
periodic_trigger = 0;
// alarm triggered, no new flow data
continue;
}
if (done) {
// signaled to terminate - exit from loop
break;
} else {
// A child could have died
ChildDied();
LogError("recvfrom() error in '%s', line '%d', cnt: %d:, %s", __FILE__, __LINE__, cnt, strerror(errno));
continue;
}
/* check for EINTR and continue */
if (cnt < 0) {
// Check if a child could have died
ChildDied();
continue;
}

/* enough data? */
if (cnt == 0) continue;

// repeat this packet
if (rfd) {
if (SendRepeaterMessage(rfd, in_buff, cnt, &nf_sender, nf_sender_size) != 0) {
Expand Down Expand Up @@ -424,8 +410,8 @@ static void run(packet_function_t receive_packet, int socket, int pfd, int rfd,
}

/* check for too little data - cnt must be > 0 at this point */
if (cnt < sizeof(common_flow_header_t)) {
LogError("Ident: %s, Data length error: too little data for common netflow header. cnt: %i", fs->Ident, (int)cnt);
if (cnt < (ssize_t)sizeof(common_flow_header_t)) {
LogError("Ident: %s, Data size error: not enough data for netflow header - cnt: %i", fs->Ident, (int)cnt);
fs->bad_packets++;
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/nfpcapd/flowsend.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static int ProcessFlow(flowParam_t *flowParam, struct FlowNode *Node) {

return 1;

} /* End of StorePcapFlow */
} /* End of ProcessFlow */

static inline int CloseSender(flowParam_t *flowParam, time_t timestamp) {
repeater_t *sendHost = flowParam->sendHost;
Expand Down

0 comments on commit 5cffb3a

Please sign in to comment.