-
Notifications
You must be signed in to change notification settings - Fork 0
/
example-agent.py
121 lines (86 loc) · 3.77 KB
/
example-agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Rayed Alrashed 2015-06-14
AgentX subagent that implements some parts of NET-SNMP-EXAMPLES-MIB:
<http://www.net-snmp.org/docs/mibs/NET-SNMP-EXAMPLES-MIB.txt>
snmpwalk -v 2c -c public localhost NET-SNMP-EXAMPLES-MIB::netSnmpExampleScalars
snmptable -v 2c -c public -Ci localhost NET-SNMP-EXAMPLES-MIB::netSnmpIETFWGTable
Try snmpset:
snmpset -v 2c -c public localhost NET-SNMP-EXAMPLES-MIB::netSnmpExampleInteger.0 i 10
snmpset -v 2c -c public localhost NET-SNMP-EXAMPLES-MIB::netSnmpExampleInteger.0 i 200
snmpset -v 2c -c public localhost NET-SNMP-EXAMPLES-MIB::netSnmpExampleString.0 s "Test"
'''
import random
import pyagentx2
def str_to_oid(data):
    """Encode a string as a length-prefixed, dotted OID fragment.

    The result is the number of characters in ``data`` followed by the
    code point (``ord``) of every character, joined with dots, e.g.
    ``'ab'`` -> ``'2.97.98'``.

    Args:
        data: The text to encode.

    Returns:
        The dotted-decimal string. An empty ``data`` yields ``'0'``.
    """
    # Join the length and the code points as ONE sequence: concatenating
    # "<length>." with a separately joined tail left a dangling dot
    # ('0.') when ``data`` was empty.
    parts = [str(len(data))]
    parts.extend(str(ord(char)) for char in data)
    return '.'.join(parts)
class NetSnmpTestMibScalar(pyagentx2.Updater):
    """Updater that writes fixed sample scalars under 1.3.6.1.4.1.8072.2.1."""

    def update(self, mib):
        """Write one constant value per scalar OID via ``mib``'s typed setters."""
        # The INTEGER (.1.0) and OCTETSTRING (.3.0) scalars are left disabled.
        # NOTE(review): presumably because MyAgent.setup registers SET handlers
        # for those two OIDs -- confirm before re-enabling.
        # mib.set_INTEGER('1.3.6.1.4.1.8072.2.1.1.0', 1000)
        # mib.set_OCTETSTRING('1.3.6.1.4.1.8072.2.1.3.0', 'String for NET-SNMP-EXAMPLES-MIB')
        scalars = (
            (mib.set_OBJECTIDENTIFIER, '1.3.6.1.4.1.8072.2.1.4.0', '1.2'),
            (mib.set_IPADDRESS, '1.3.6.1.4.1.8072.2.1.5.0', '127.0.0.1'),
            (mib.set_COUNTER32, '1.3.6.1.4.1.8072.2.1.6.0', 2000),
            (mib.set_GAUGE32, '1.3.6.1.4.1.8072.2.1.7.0', 2000),
            (mib.set_TIMETICKS, '1.3.6.1.4.1.8072.2.1.8.0', 1000000),
            (mib.set_OPAQUE, '1.3.6.1.4.1.8072.2.1.9.0', 'Test'),
            (mib.set_COUNTER64, '1.3.6.1.4.1.8072.2.1.10.0', 2000),
        )
        # Apply the setters in the order listed above.
        for setter, oid, value in scalars:
            setter(oid, value)
class NetSnmpTestMibTable(pyagentx2.Updater):
    """Updater implementing netSnmpIETFWGTable from NET-SNMP-EXAMPLES-MIB.txt."""

    def update(self, mib):
        """Write between three and five rows, each with two OCTET STRING columns."""
        # The number of rows is random on purpose: it shows that the MIB is
        # reset on every update.
        row_count = random.randint(3, 5)
        for number in range(1, row_count + 1):
            # Rows are indexed by the encoded names 'group1', 'group2', ...
            row_index = str_to_oid('group%s' % number)
            for column_oid, text in (
                ('1.3.6.1.4.1.8072.2.2.1.1.2.', 'member 1'),
                ('1.3.6.1.4.1.8072.2.2.1.1.3.', 'member 2'),
            ):
                mib.set_OCTETSTRING(column_oid + row_index, text)
class NetSnmpIntegerSet(pyagentx2.SetHandler):
    """SET handler that rejects values above 100 and stores accepted ones."""

    def test(self, oid, type, value, mib):
        """Raise ``pyagentx2.WrongValueException`` if ``int(value)`` exceeds 100."""
        if int(value) <= 100:
            return
        raise pyagentx2.WrongValueException()

    def commit(self, oid, type, value, mib):
        """Print the committed OID/value pair, then store it through ``mib.set``."""
        message = "COMMIT CALLED: %s = %s" % (oid, value)
        print(message)
        mib.set(oid, type, value)
class NetSnmpIpSet(pyagentx2.SetHandler):
    """SET handler that accepts any value and stores it unchanged."""

    def test(self, oid, type, value, mib):
        """Accept every value; no validation is performed in this handler."""

    def commit(self, oid, type, value, mib):
        """Print the committed OID/value pair, then store it through ``mib.set``."""
        message = "COMMIT CALLED: %s = %s" % (oid, value)
        print(message)
        mib.set(oid, type, value)
class NetSnmpOctectStringSet(pyagentx2.SetHandler):
    """SET handler that accepts any value and stores it unchanged."""

    def test(self, oid, type, value, mib):
        """Accept every value; no validation is performed in this handler."""

    def commit(self, oid, type, value, mib):
        """Print the committed OID/value pair, then store it through ``mib.set``."""
        message = "COMMIT CALLED: %s = %s" % (oid, value)
        print(message)
        mib.set(oid, type, value)
class MyAgent(pyagentx2.Agent):
    """Agent that registers the example subtrees, SET handlers and updaters."""

    def setup(self):
        """Register the OID subtrees, then the SET handlers, then the updaters."""
        scalars_oid = '1.3.6.1.4.1.8072.2.1'
        table_oid = '1.3.6.1.4.1.8072.2.2'

        # Register both subtrees with the master.
        for subtree in (scalars_oid, table_oid):
            self.register(subtree)

        # Set handlers take care of SET operations on individual scalars.
        # NOTE(review): NetSnmpTestMibScalar publishes its IpAddress value at
        # ...2.1.5.0, not ...2.1.2.0 -- confirm .2.0 is the intended OID for
        # NetSnmpIpSet.
        set_handlers = (
            ('1.3.6.1.4.1.8072.2.1.1.0', NetSnmpIntegerSet),
            ('1.3.6.1.4.1.8072.2.1.2.0', NetSnmpIpSet),
            ('1.3.6.1.4.1.8072.2.1.3.0', NetSnmpOctectStringSet),
        )
        for oid, handler in set_handlers:
            self.register_set(oid, handler)

        # One updater per registered subtree.
        updaters = (
            (scalars_oid, NetSnmpTestMibScalar),
            (table_oid, NetSnmpTestMibTable),
        )
        for subtree, updater in updaters:
            self.register_updater(subtree, updater)
def main():
    """Configure logging, start a ``MyAgent`` and stop it on interruption.

    The agent is stopped on ``KeyboardInterrupt`` and on any other
    exception raised by ``start()``; the latter is printed first.
    """
    pyagentx2.setup_logging(debug=False)
    agent = MyAgent()
    try:
        agent.start()
    except KeyboardInterrupt:
        # Interrupted from the keyboard: stop without printing anything.
        agent.stop()
    except Exception as err:
        # Top-level boundary: report the failure, then stop the agent.
        print("Unhandled exception:", err)
        agent.stop()
# Run the agent only when this file is executed as a script.
if __name__ == "__main__":
    main()