SNMP Checks¶
Version added: Tactical RMM v0.19.0 / Agent v2.8.0
Video Walkthru¶
SNMP monitoring can now be done using the portable python distribution on windows agents.
Here is a sample script (written by Claude) that can be used to query and monitor a printer. It takes the printer's IP address as the first argument.
#!/usr/bin/python3
import socket
import random
import sys
SNMP_VERSION = 0 # 0 = SNMPv1 (mpModel=0), 1 = SNMPv2c
COMMUNITY = "public"
PORT = 161
TIMEOUT = 2.0 # seconds per attempt
RETRIES = 1
def _enc_len(n):
if n < 0x80:
return bytes([n])
out = bytearray()
while n:
out.insert(0, n & 0xFF)
n >>= 8
return bytes([0x80 | len(out)]) + bytes(out)
def _tlv(tag, value):
return bytes([tag]) + _enc_len(len(value)) + value
def _enc_uint(value): # non-negative INTEGER
if value == 0:
return _tlv(0x02, b"\x00")
out = bytearray()
v = value
while v:
out.insert(0, v & 0xFF)
v >>= 8
if out[0] & 0x80:
out.insert(0, 0x00)
return _tlv(0x02, bytes(out))
def _enc_b128(n):
if n == 0:
return b"\x00"
out = bytearray()
while n:
out.insert(0, n & 0x7F)
n >>= 7
for i in range(len(out) - 1):
out[i] |= 0x80
return bytes(out)
def _enc_oid(oid):
p = [int(x) for x in oid.strip().strip(".").split(".")]
body = _enc_b128(40 * p[0] + p[1])
for x in p[2:]:
body += _enc_b128(x)
return _tlv(0x06, body)
def _enc_octstr(s):
if isinstance(s, str):
s = s.encode("latin-1")
return _tlv(0x04, s)
_NULL = _tlv(0x05, b"")
def _build_get(community, oid, request_id, version):
varbind = _tlv(0x30, _enc_oid(oid) + _NULL)
varbind_list = _tlv(0x30, varbind)
pdu = _tlv(0xA0, _enc_uint(request_id) + _enc_uint(0) + _enc_uint(0) + varbind_list)
return _tlv(0x30, _enc_uint(version) + _enc_octstr(community) + pdu)
def _read_len(d, i):
b = d[i]
i += 1
if b < 0x80:
return b, i
n = 0
for _ in range(b & 0x7F):
n = (n << 8) | d[i]
i += 1
return n, i
def _tlv_read(d, i):
tag = d[i]
i += 1
length, i = _read_len(d, i)
return tag, d[i:i + length], i + length
def _dec_oid(b):
if not b:
return ""
first = b[0]
a = min(first // 40, 2)
vals = [a, first - 40 * a]
n = 0
for byte in b[1:]:
n = (n << 7) | (byte & 0x7F)
if not (byte & 0x80):
vals.append(n)
n = 0
return ".".join(map(str, vals))
def _dec_value(tag, b):
if tag == 0x02: # INTEGER
return int.from_bytes(b, "big", signed=True) if b else 0
if tag in (0x41, 0x42, 0x43, 0x46): # Counter32/Gauge32/TimeTicks/Counter64
return int.from_bytes(b, "big") if b else 0
if tag == 0x40: # IpAddress
return ".".join(map(str, b))
if tag == 0x06: # OID
return _dec_oid(b)
if tag == 0x05: # NULL
return None
if tag == 0x80:
return "noSuchObject"
if tag == 0x81:
return "noSuchInstance"
if tag == 0x82:
return "endOfMibView"
if tag == 0x04: # OCTET STRING
if any(c < 9 or 13 < c < 32 for c in b): # binary -> show hex
return "0x" + b.hex()
try:
return b.decode("utf-8").rstrip("\x00")
except UnicodeDecodeError:
return b.decode("latin-1").rstrip("\x00")
return "0x" + b.hex()
_ERRORS = {0: "noError", 1: "tooBig", 2: "noSuchName",
3: "badValue", 4: "readOnly", 5: "genErr"}
def _parse_response(data):
_, msg, _ = _tlv_read(data, 0)
i = 0
_, _, i = _tlv_read(msg, i) # version
_, _, i = _tlv_read(msg, i) # community
_, pdu, _ = _tlv_read(msg, i) # PDU
j = 0
_, rid_b, j = _tlv_read(pdu, j)
_, est_b, j = _tlv_read(pdu, j)
_, eix_b, j = _tlv_read(pdu, j)
_, vbl, j = _tlv_read(pdu, j)
request_id = int.from_bytes(rid_b, "big")
error_status = int.from_bytes(est_b, "big", signed=True)
error_index = int.from_bytes(eix_b, "big", signed=True)
binds = []
k = 0
while k < len(vbl):
_, vb, k = _tlv_read(vbl, k)
m = 0
_, oid_b, m = _tlv_read(vb, m)
vtag, val_b, m = _tlv_read(vb, m)
binds.append((_dec_oid(oid_b), _dec_value(vtag, val_b)))
return request_id, error_status, error_index, binds
def snmp_get(ip, oid, community=COMMUNITY, version=SNMP_VERSION,
port=PORT, timeout=TIMEOUT, retries=RETRIES):
"""Return (error_status, error_index, [(oid, value), ...]) for one GET."""
request_id = random.randint(1, 0x7FFFFFFF)
packet = _build_get(community, oid, request_id, version)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:
for attempt in range(retries + 1):
sock.sendto(packet, (ip, port))
try:
while True:
data, _ = sock.recvfrom(65535)
rid, est, eix, binds = _parse_response(data)
if rid == request_id: # ignore stray/late packets
return est, eix, binds
except socket.timeout:
if attempt == retries:
raise
finally:
sock.close()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Missing required argument: snmp device IP address")
sys.exit(1)
printer_ip = sys.argv[1]
oids = {
"Printer Model": "1.3.6.1.2.1.1.1.0",
"Total Page Count": "1.3.6.1.2.1.43.10.2.1.4.1.1",
"Toner Level Black": "1.3.6.1.2.1.43.11.1.1.9.1.1",
"Toner Level Cyan": "1.3.6.1.2.1.43.11.1.1.9.1.2",
"Toner Level Magenta": "1.3.6.1.2.1.43.11.1.1.9.1.3",
"Toner Level Yellow": "1.3.6.1.2.1.43.11.1.1.9.1.4",
"Device Status": "1.3.6.1.2.1.25.3.2.1.5.1",
"Serial Number": "1.3.6.1.2.1.43.5.1.1.17.1",
}
for name, oid in oids.items():
try:
error_status, error_index, binds = snmp_get(printer_ip, oid)
except socket.timeout:
print(f"Error: no response from {printer_ip} (timeout)")
continue
except OSError as exc:
print(f"Error: {exc}")
continue
if error_status:
print(f"Error: {_ERRORS.get(error_status, error_status)} at index {error_index}")
else:
for _, value in binds:
print(f"{name}: {value}")