forked from stevelorenz/comnetsemu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tun_ebpf.py
executable file
·82 lines (64 loc) · 1.95 KB
/
tun_ebpf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
About: Basic test of TUN interfaces and XDP (eBPF)
The XDP program is compiled and attached with IOVisor/BCC. Please install BCC
with ../util/install.sh -b before running this example.
"""
from bcc import BPF # pylint: disable=import-error
from comnetsemu.net import Containernet
from mininet.link import TCLink
from mininet.log import info, setLogLevel
from mininet.node import Controller
# C source of the XDP program used by testTopo(): its only function, drop_all,
# returns XDP_DROP unconditionally.
_DROP_ALL_XDP_SRC = """
#include <uapi/linux/bpf.h>
int drop_all() {
return XDP_DROP;
}
"""

# Module-level BCC handle; testTopo() loads and attaches "drop_all" through it.
b = BPF(text=_DROP_ALL_XDP_SRC)
def testTopo():
    """Demonstrate a TUN interface and an XDP drop filter in Containernet.

    Steps:

    1. Build a network with controller ``c0``, switch ``s1`` and two Docker
       hosts ``h1`` (10.0.0.1) and ``h2`` (10.0.0.2), each linked to ``s1``
       through a TCLink with ``bw=10`` and ``delay="10ms"``.
    2. Create a TUN device ``tun-test`` inside ``h1``, bring it up and print
       the output of ``ip link`` run in ``h1``.
    3. Attach the module-level ``drop_all`` XDP program to the switch port
       ``s1-h2`` and ping between the hosts.
    4. Detach the program and ping again.

    The XDP program is detached and the network is stopped even if one of the
    steps raises, so a failed run does not leave the hook or the emulated
    network behind.
    """
    # Switch-side interface of the s1 <-> h2 link, as named by addLinkNamedIfce.
    xdp_ifce = "s1-h2"
    xdp_attached = False

    net = Containernet(controller=Controller, link=TCLink)
    try:
        info("*** Adding controller\n")
        net.addController("c0")

        info("*** Adding hosts\n")
        h1 = net.addDockerHost(
            "h1",
            dimage="dev_test",
            ip="10.0.0.1",
            docker_args={"cpuset_cpus": "0", "nano_cpus": int(1e8)},
        )
        h2 = net.addDockerHost(
            "h2",
            dimage="dev_test",
            ip="10.0.0.2",
            docker_args={"cpuset_cpus": "0", "nano_cpus": int(1e8)},
        )

        info("*** Adding switch\n")
        s1 = net.addSwitch("s1")

        info("*** Creating links\n")
        net.addLinkNamedIfce(s1, h1, bw=10, delay="10ms")
        net.addLinkNamedIfce(s1, h2, bw=10, delay="10ms")

        info("*** Starting network\n")
        net.start()

        info("*** Create TUN interfaces in h1\n")
        h1.cmd("ip tuntap add mode tun tun-test")
        h1.cmd("ip link set tun-test up")
        print("* Interfaces in the main namespace of h1:")
        ret = h1.cmd("ip link")
        print(ret)

        info("*** Load a XDP(eBPF) program to drop all frames sent to and from h2\n")
        # NOTE(review): XDP is presumably evaluated only on frames received by
        # s1-h2 (i.e. sent by h2); that is still enough to make the ping
        # fail -- confirm against the XDP documentation.
        fn = b.load_func("drop_all", BPF.XDP)
        b.attach_xdp(xdp_ifce, fn, 0)
        xdp_attached = True
        net.ping([h1, h2])

        info("*** Remove the XDP(eBPF) program\n")
        b.remove_xdp(xdp_ifce, 0)
        xdp_attached = False
        net.ping([h1, h2])
    finally:
        try:
            # Only reached with the flag set when a step failed between
            # attaching and detaching the program.
            if xdp_attached:
                b.remove_xdp(xdp_ifce, 0)
        finally:
            # Trailing newline added so the next log message starts on its
            # own line, like every other info() call in this function.
            info("*** Stopping network\n")
            net.stop()
if __name__ == "__main__":
    # Script entry point: select the "info" log level first, then run the demo.
    setLogLevel("info")
    testTopo()