-
Notifications
You must be signed in to change notification settings - Fork 1
/
mysnmp.py
119 lines (101 loc) · 4.51 KB
/
mysnmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
from pysnmp.hlapi import *
import datetime
class mqttSNMP():
    """Poll one SNMP agent with single GET / GET-NEXT requests.

    Each call to :meth:`get` or :meth:`getNext` sends one request to
    ``(host, port)`` and returns a flat dict describing the outcome (see
    :meth:`_constructResp` for the keys). Failures are reported inside that
    dict rather than raised to the caller.

    Attributes:
        host: Agent address passed to ``UdpTransportTarget``.
        port: Agent UDP port passed to ``UdpTransportTarget``.
        errorIndic: Error indication of the last poll, or ``None``.
        status: Error status text of the last poll, or ``None``.
        name: OID of the last poll (the returned OID on success).
        value: Value of the last poll, or the error shown in its place.
        pollError: ``True`` when the last poll raised an exception.
        data: The response dict built for the last poll.
    """

    # Text stored in both 'Error Status' and 'Value' when a request fails.
    # NOTE: the wording says "Get-Next" even for plain GET requests; it is
    # kept byte-for-byte because it is part of the payload returned to
    # callers, who may match on it.
    _REQUEST_FAILED = "Could Not Issue Get-Next Request"

    def __init__(self, host, port):
        """Remember the agent address and start with an empty result state."""
        self.host = host
        self.port = port
        self.errorIndic = None
        self.status = None
        self.name = ""
        self.value = ""
        self.pollError = False
        self.data = {}

    def get(self, oid, community):
        """Issue an SNMP GET for ``oid`` and return the response dict.

        Args:
            oid: Object identifier handed to ``ObjectIdentity``.
            community: Community string handed to ``CommunityData``.

        Returns:
            The dict produced by :meth:`_constructResp`.
        """
        return self._poll(oid, community, use_next=False)

    def getNext(self, oid, community):
        """Issue an SNMP GET-NEXT for ``oid`` and return the response dict.

        Args:
            oid: Object identifier handed to ``ObjectIdentity``.
            community: Community string handed to ``CommunityData``.

        Returns:
            The dict produced by :meth:`_constructResp`.
        """
        return self._poll(oid, community, use_next=True)

    def _poll(self, oid, community, use_next):
        """Send one request (GET or GET-NEXT) and record its outcome.

        Shared implementation of :meth:`get` and :meth:`getNext`; the two
        differ only in which pysnmp command generator is invoked.
        """
        self._clearFields()
        self.name = oid
        try:
            command = nextCmd if use_next else getCmd
            iterator = command(
                SnmpEngine(),
                CommunityData(community),
                # timeout=1 and retries=0: give up after a single attempt.
                UdpTransportTarget((self.host, self.port),
                                   timeout=1, retries=0),
                ContextData(),
                ObjectType(ObjectIdentity(oid)),
            )
            # Only the first response of the generator is consumed.
            errorIndication, errorStatus, _errorIndex, varBinds = next(iterator)
            if errorIndication:
                # The indication is reported in place of a value as well.
                self.errorIndic = self.value = errorIndication
            elif errorStatus:
                self.status = self.value = self._REQUEST_FAILED
            else:
                self.name, self.value = varBinds[0]
        except Exception:
            # Any failure while building or sending the request is reported
            # in the response instead of propagating. ``Exception`` (not a
            # bare ``except``) lets KeyboardInterrupt/SystemExit through.
            self.pollError = True
            self.errorIndic = "Error"
            self.status = self.value = self._REQUEST_FAILED
        return self._constructResp()

    def _constructResp(self):
        """Build, store in ``self.data`` and return the response dict.

        Keys: 'Error Indication', 'Error Status', 'OID', 'Value' and 'Time'
        (all rendered as strings; 'Time' is ``datetime.datetime.now()``),
        plus 'Poll Error', 'Host' and 'Port' copied unchanged.
        """
        # str() rather than "%s" % x: the latter raises TypeError when x is
        # a tuple (e.g. an OID given in tuple form).
        self.data['Error Indication'] = str(self.errorIndic)
        self.data['Error Status'] = str(self.status)
        self.data['OID'] = str(self.name)
        self.data['Value'] = str(self.value)
        self.data['Time'] = str(datetime.datetime.now())
        self.data['Poll Error'] = self.pollError
        self.data['Host'] = self.host
        self.data['Port'] = self.port
        return self.data

    def _clearFields(self):
        """Reset the per-poll result state before a new request."""
        self.errorIndic = None
        self.status = None
        self.value = ""
        # Must be cleared too, otherwise one failed poll would mark every
        # later (successful) response as a poll error.
        self.pollError = False
        self.data = {}
## def getBulk(self, oid):
## iterator = bulkCmd(SnmpEngine(), CommunityData('public'), UdpTransportTarget((host, 161)), ContextData(), 0, 25, ObjectType(ObjectIdentity(oid)))
## errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
## if errorIndication:
## print(errorIndication)
## else:
## if errorStatus: # SNMP agent errors
## print('%s at %s' % (errorStatus.prettyPrint(), varBinds[int(errorIndex)-1] if errorIndex else '?'))
## else:
## for varBind in varBinds: # SNMP response contents
## print(varBind )
## #print(' = '.join([x.prettyPrint() for x in varBind]))
##
## def walk(self, oid):
## for (errorIndication,errorStatus,errorIndex,varBinds) in nextCmd(SnmpEngine(),
## CommunityData('public'), UdpTransportTarget((host, 161)), ContextData(),
## ObjectType(ObjectIdentity(oid)), lexicographicMode=False):
## if errorIndication:
## print(errorIndication, file=sys.stderr)
## break
## elif errorStatus:
## print('%s at %s' % (errorStatus.prettyPrint(),
## errorIndex and varBinds[int(errorIndex) - 1][0] or '?'),
## file=sys.stderr)
## break
## else:
## for varBind in varBinds:
## print(varBind)