forked from cloudflare/bpftools
-
Notifications
You must be signed in to change notification settings - Fork 1
/
filter
executable file
·120 lines (92 loc) · 2.98 KB
/
filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
import getopt
import itertools
import os
import pcappy
import pcappy.types
import sys
import bpftools.utils
def usage():
    """Print the command-line help text to stdout and exit with status 2.

    This function never returns normally: it always ends by calling
    ``sys.exit(2)``, which raises ``SystemExit``.
    """
    # print(...) with a single argument behaves exactly like the print
    # statement on Python 2, so this form works on either interpreter.
    # The leading newline of the literal is removed by lstrip().
    print(r"""
filter [ OPTIONS ] [ pcap file... ]
Read pcap data from stdin or given files, run it through a BPF filter
and write matching packets to stdout as pcap.
Options are:
-h, --help print this message
-b, --bytecode filter with given BPF bytecode
-e, --expr filter with given BPF expression
-c, --compile FILE compile given bpf file and use as filter
For example to select only IP packets you can use:
filter -e "ip"
filter -b "4,40 0 0 12,21 0 1 2048,6 0 0 65535,6 0 0 0,"
Where the bytecode might have been generated by tcpdump:
sudo tcpdump -p -n -ddd -i eth0 "ip" |tr "\n" ","
""".lstrip())
    sys.exit(2)
def bpf_from_bytecode(bytecode):
    """Build a filter object from comma-separated BPF bytecode text.

    Leading and trailing commas are stripped, the text is split on commas
    and the first field is discarded (in the usage example that field is
    the instruction count).  Every remaining field is one instruction
    written as whitespace-separated integers "code jt jf k"; up to three
    missing trailing values are filled in with 0.

    Returns a plain holder object whose ``_bpf`` attribute is the
    populated ``pcappy.types.bpf_program`` structure.
    """
    fields = bytecode.strip(",").split(',')
    instructions = fields[1:]
    count = len(instructions)

    # Bare attribute container; only its ``_bpf`` attribute is set here.
    class X: pass

    bpf = X()
    program = pcappy.types.bpf_program()
    bpf._bpf = program
    program.bf_len = count
    # The instruction array is allocated 10 entries larger than the
    # number of parsed instructions (the extra slots are left untouched).
    program.bf_insns = (pcappy.types.bpf_insn * (count + 10))()

    for index, text in enumerate(instructions):
        insn = program.bf_insns[index]
        # Convert every token, padding with zeros, then keep the first four.
        values = [int(token)
                  for token in itertools.chain(text.split(), (0, 0, 0))]
        insn.code, insn.jt, insn.jf, insn.k = values[:4]
    return bpf
def bpf_from_expr(expr):
    """Turn a BPF filter expression string into a pcappy BPF program.

    The expression is handed to ``pcappy.PcapPyBpfProgram`` together with
    the fixed positional arguments ``0`` and ``'0.0.0.0'`` (presumably the
    optimize flag and netmask -- confirm against pcappy), an Ethernet
    link type and a snap length of 65536.
    """
    compile_options = {
        'linktype': pcappy.LINKTYPE_ETHERNET,
        'snaplen': 65536,
    }
    return pcappy.PcapPyBpfProgram(expr, 0, '0.0.0.0', **compile_options)
def main():
    """Filter pcap input through an optional BPF program and write pcap.

    Command-line options select the filter (an expression, raw bytecode,
    or a file to compile); when several are given the last one wins, and
    when none is given every packet is copied.  Remaining arguments are
    pcap files to read; with no file arguments stdin is read instead.
    Matching packets are written to stdout as pcap.

    Does not return: the process is terminated with ``os._exit(0)`` once
    all inputs have been processed, or by ``usage()`` on ``--help`` or a
    malformed command line.
    """
    bpf = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], "he:b:c:",
                                   ["help", "expr=", "bytecode=", "compile="])
    except getopt.GetoptError as err:
        # stdout carries the pcap output, so the diagnostic goes to stderr.
        sys.stderr.write(str(err) + "\n")
        usage()  # exits with status 2, never returns

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()  # exits with status 2, never returns
        elif o in ("-e", "--expr"):
            bpf = bpf_from_expr(a)
        elif o in ("-b", "--bytecode"):
            bpf = bpf_from_bytecode(a)
        elif o in ("-c", "--compile"):
            # Read the source up front and close the file deterministically
            # instead of leaving the handle open for the rest of the run.
            with open(a, 'rb') as source_file:
                source = source_file.read()
            bpf = bpf_from_bytecode(bpftools.utils.bpf_compile(source))
        else:
            # getopt only yields the options declared above.
            assert False, "unhandled option"

    # All inputs are opened before any packet is processed, so an
    # unreadable file fails the run before output is produced.
    if not args:
        readfds = [sys.stdin]
    else:
        readfds = [open(fname, 'rb') for fname in args]

    dump = pcappy.PcapPyDead(snaplen=65536).dump_open(sys.stdout)

    for fd in readfds:
        p = pcappy.open_offline(fd)
        if bpf:
            p.filter = bpf
        while True:
            try:
                r = p.next_ex()
            except pcappy.PcapPyException:
                # A read error ends this input; move on to the next one.
                break
            if r is None:
                break
            hdr, data = r
            dump.write(hdr, data)
    sys.stdout.flush()
    dump.flush()

    # normal exit crashes due to a double free error in pcappy
    os._exit(0)
if __name__ == "__main__":
    # Imported here because it is only needed by this entry-point guard.
    import errno

    try:
        main()
    except IOError as e:
        # A broken pipe (the reader of our stdout went away) ends the
        # process quietly with a failure status; any other I/O error is
        # re-raised so it is reported normally.
        if e.errno == errno.EPIPE:
            os._exit(-1)
        raise
    except KeyboardInterrupt:
        # Interrupted by the user: exit immediately, without a traceback
        # and without running interpreter cleanup.
        os._exit(-1)