Skip to content

Commit

Permalink
Add packet de-duplication for nfpcapd
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Jun 29, 2024
1 parent 3f8fcdd commit 65ad2f9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 15 deletions.
8 changes: 8 additions & 0 deletions man/nfpcapd.1
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ Specify name of pidfile. Default is no pidfile.
Daemon mode: fork to background and detach from terminal.
Nfpcapd terminates on signal TERM, INT and HUP.
.TP 3
.B -d
De-duplicate packets with a window size of 4. This option is useful, if
packets in a pcap file are duplicated for some unknown reason. If packets
on multiple span ports are sent to the collector they got de-duplicated within
the window size. Note: The de-duplication works on the IP layer to the end
of the packet, which means layer 1 and VLAN/MPLS layers are stripped.
If used together with -p, the resulting pcaps are de-duplicated as well.
.TP 3
.B -E
Verbose flow printing. Print flows on stdout, when flushed to disk.
Use verbose printing only for debugging purpose in order to see if your
Expand Down
2 changes: 2 additions & 0 deletions src/nfpcapd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ if HAVEPCAPAPPEND
AM_CPPFLAGS += -DHAVEPCAPAPPEND
endif

EXTRA_DIST = murmurhash.c

CLEANFILES = *.gch
14 changes: 10 additions & 4 deletions src/nfpcapd/nfpcapd.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ static void usage(char *name) {
"-r pcapfile\tread packets from file\n"
"-b num\tset socket buffer size in MB. (default 20MB)\n"
"-B num\tset the node cache size. (default 524288)\n"
"-d\t\tDe-duplicate packets with window size 8.\n"
"-s snaplen\tset the snapshot length - default 1522\n"
"-e active,inactive\tset the active,inactive flow expire time (s) - default 300,60\n"
"-o options \tAdd flow options, separated with ','. Available: 'fat', 'payload'\n"
Expand Down Expand Up @@ -280,7 +281,7 @@ static int scanOptions(flowParam_t *flowParam, char *options) {
int main(int argc, char *argv[]) {
sigset_t signal_set;
struct sigaction sa;
int c, snaplen, bufflen, err, do_daemonize;
int c, snaplen, bufflen, err, do_daemonize, doDedup;
int subdir_index, compress, expire, cache_size, buff_size;
int activeTimeout, inactiveTimeout, metricInterval, workers;
dirstat_t *dirstat;
Expand All @@ -293,6 +294,7 @@ int main(int argc, char *argv[]) {
snaplen = 1522;
bufflen = 0;
do_daemonize = 0;
doDedup = 0;
launcher_pid = 0;
device = NULL;
pcapfile = NULL;
Expand All @@ -319,7 +321,7 @@ int main(int argc, char *argv[]) {
inactiveTimeout = 0;
workers = 0;

while ((c = getopt(argc, argv, "b:B:C:De:g:hH:I:i:j:l:m:o:p:P:r:s:S:T:t:u:vVw:yz::")) != EOF) {
while ((c = getopt(argc, argv, "b:B:C:dDe:g:hH:I:i:j:l:m:o:p:P:r:s:S:T:t:u:vVw:yz::")) != EOF) {
switch (c) {
struct stat fstat;
case 'h':
Expand All @@ -341,6 +343,9 @@ int main(int argc, char *argv[]) {
configFile = optarg;
}
break;
case 'd':
doDedup = 1;
break;
case 'D':
do_daemonize = 1;
break;
Expand Down Expand Up @@ -570,6 +575,7 @@ int main(int argc, char *argv[]) {
flowParam_t flowParam = {0};
flushParam.extensionFormat = time_extension;
flowParam.extensionFormat = time_extension;
packetParam.doDedup = doDedup;

if (options && scanOptions(&flowParam, options) < 0) {
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -778,8 +784,8 @@ int main(int argc, char *argv[]) {

CloseMetric();

LogInfo("Total: Processed: %u, skipped: %u, short caplen: %u, unknown: %u\n", packetParam.proc_stat.packets, packetParam.proc_stat.skipped,
packetParam.proc_stat.short_snap, packetParam.proc_stat.unknown);
LogInfo("Total: Processed: %u, skipped: %u, short caplen: %u, unknown: %u, duplicates: %llu\n", packetParam.proc_stat.packets,
packetParam.proc_stat.skipped, packetParam.proc_stat.short_snap, packetParam.proc_stat.unknown, packetParam.proc_stat.duplicates);

if (pidfile) remove_pid(pidfile);

Expand Down
2 changes: 2 additions & 0 deletions src/nfpcapd/packet_pcap.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef struct proc_stat_s {
uint32_t skipped;
uint32_t unknown;
uint32_t short_snap;
uint64_t duplicates;
} proc_stat_t;

#ifdef USE_TPACKETV3
Expand Down Expand Up @@ -96,6 +97,7 @@ typedef struct packetParam_s {
pcap_t *pcap_dev;
int t_win;
int *done;
int doDedup;

uint32_t snaplen;
uint32_t linktype;
Expand Down
60 changes: 50 additions & 10 deletions src/nfpcapd/pcaproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ static inline void ProcessICMPFlow(packetParam_t *packetParam, struct FlowNode *

static inline void ProcessOtherFlow(packetParam_t *packetParam, struct FlowNode *NewNode, void *payload, size_t payloadSize);

#include "murmurhash.c"

pcapfile_t *OpenNewPcapFile(pcap_t *p, char *filename, pcapfile_t *pcapfile) {
if (!pcapfile) {
// Create struct
Expand Down Expand Up @@ -499,7 +501,7 @@ static inline void ProcessOtherFlow(packetParam_t *packetParam, struct FlowNode

} // End of ProcessOtherFlow

void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data) {
int ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data) {
struct FlowNode *Node = NULL;
uint16_t version, IPproto;
char s1[64];
Expand All @@ -526,6 +528,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
// link layer processing
uint16_t protocol = 0;
uint32_t linktype = packetParam->linktype;
int redoLink = 0;
REDO_LINK:
switch (linktype) {
case DLT_EN10MB:
Expand All @@ -535,7 +538,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
int IEEE802 = protocol <= 1500;
if (IEEE802) {
packetParam->proc_stat.skipped++;
return;
return 1;
}
// unwrap link layer
dataptr += 14;
Expand Down Expand Up @@ -582,7 +585,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
break;
default:
LogInfo("Packet: %u: unsupported DLT_NULL protocol: 0x%x, packet: %u", pkg_cnt, header);
return;
return 1;
}
} break;
case DLT_LINUX_SLL:
Expand All @@ -601,12 +604,12 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
nflog_hdr_t *nflog_hdr = (nflog_hdr_t *)dataptr;
if (hdr->caplen < sizeof(nflog_hdr_t)) {
LogInfo("Packet: %u: NFLOG: not enough data", pkg_cnt);
return;
return 1;
}

if (nflog_hdr->nflog_version != 0) {
LogInfo("Packet: %u: unsupported NFLOG version: %d", pkg_cnt, nflog_hdr->nflog_version);
return;
return 1;
}
dbg_printf("Linktype: DLT_NFLOG\n");
dbg_printf("NFLOG: %s, rid: %u\n", nflog_hdr->nflog_family == 2 ? "IPv4" : "IPv6", ntohs(nflog_hdr->nflog_rid));
Expand All @@ -620,7 +623,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
if (size % 4 != 0) size += 4 - size % 4;
if (size < sizeof(nflog_tlv_t)) {
LogInfo("Packet: %u: NFLOG: tlv size error: %u", pkg_cnt, size);
return;
return 1;
}

if (tlv->tlv_type == NFULA_PAYLOAD) {
Expand All @@ -636,7 +639,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
pflog_hdr_t *pfloghdr = (pflog_hdr_t *)dataptr;
if (hdr->caplen < PFLOG_HDRLEN) {
LogInfo("Packet: %u: PFLOG: not enough data", pkg_cnt);
return;
return 1;
}
pflog = malloc(sizeof(pflog_hdr_t));
memcpy(pflog, pfloghdr, sizeof(pflog_hdr_t));
Expand All @@ -647,20 +650,20 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
} break;
default:
LogInfo("Packet: %u: unsupported link type: 0x%x, packet: %u", pkg_cnt, linktype);
return;
return 1;
}

REDO_LINK_PROTO:
if (dataptr >= eodata) {
packetParam->proc_stat.short_snap++;
dbg_printf("Short packet: %u, Check line: %u", hdr->caplen, __LINE__);
return;
return 1;
}
dbg_printf("Next protocol: 0x%x\n", protocol);
int IEEE802 = protocol <= 1500;
if (IEEE802) {
packetParam->proc_stat.skipped++;
return;
return 1;
}
switch (protocol) {
case ETHERTYPE_IP: // IPv4
Expand Down Expand Up @@ -790,6 +793,23 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
goto END_FUNC;
}

// IPv6 duplicate check
// dublicate check starts from the IP header over the rest of the packet
// vlan, mpls and layer 1 headers are ignored
if (unlikely(packetParam->doDedup && redoLink == 0)) {
// check for de-dup
uint32_t hopLimit = ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim;
ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim = 0;
uint16_t len = ntohs(ip6->ip6_ctlun.ip6_un1.ip6_un1_plen);
if (is_duplicate((const uint8_t *)ip, len + 40)) {
packetParam->proc_stat.duplicates++;
return 0;
}
ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim = hopLimit;
// prevent recursive dedub checks with IP in IP packets
redoLink++;
}

// ipv6 Extension headers not processed
IPproto = ip6->ip6_ctlun.ip6_un1.ip6_un1_nxt;
dbg_printf("Packet IPv6, SRC %s, DST %s\n", inet_ntop(AF_INET6, &ip6->ip6_src, s1, sizeof(s1)),
Expand Down Expand Up @@ -826,6 +846,25 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
goto END_FUNC;
}

// IPv4 duplicate check
// dublicate check starts from the IP header over the rest of the packet
// vlan, mpls and layer 1 headers are ignored
if (unlikely(packetParam->doDedup && redoLink == 0)) {
// check for de-dup
uint32_t ttl = ip->ip_ttl;
uint32_t sum = ip->ip_sum;
ip->ip_ttl = 0;
ip->ip_sum = 0;
if (is_duplicate((const uint8_t *)ip, ntohs(ip->ip_len))) {
packetParam->proc_stat.duplicates++;
return 0;
}
ip->ip_ttl = ttl;
ip->ip_sum = sum;
// prevent recursive dedub checks with IP in IP packets
redoLink++;
}

IPproto = ip->ip_p;
dbg_printf("Packet IPv4 SRC %s, DST %s\n", inet_ntop(AF_INET, &ip->ip_src, s1, sizeof(s1)), inet_ntop(AF_INET, &ip->ip_dst, s2, sizeof(s2)));

Expand Down Expand Up @@ -1137,4 +1176,5 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co
lastRun = hdr->ts.tv_sec;
}

return 1;
} // End of ProcessPacket
2 changes: 1 addition & 1 deletion src/nfpcapd/pcaproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ void RotateFile(pcapfile_t *pcapfile, time_t t_CloseRename, int live);

void ProcessFlowNode(FlowSource_t *fs, struct FlowNode *node);

void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data);
int ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data);

#endif // _PCAPROC_H

0 comments on commit 65ad2f9

Please sign in to comment.