пример расширенного ARP-сервера на Python
Добавлено: 06 дек 2025, 21:54
Установка библиотек: scapy, pysnmp
Файл конфигурации (config.json)
Основной скрипт (arp_server.py)
Запуск
Что делает этот скрипт:
Автоматическое обновление таблицы:
Периодически сканирует сеть (nmap) и узнает активные устройства.
Обновляет IP-MAC таблицу.
Работает в отдельном потоке, не мешая основной работе.
Защита от фальшивых ARP:
Проверяет MAC-адрес по IP при получении ARP-запросов.
В случае несоответствия — логирует и отправляет SNMP-trap.
Можно расширить, добавив блокировки или уведомления.
Интеграция с оборудованием:
Отправка SNMP-трейпов при обнаружении подозрительных ARP.
Можно расширить для управления оборудованием через SNMP, REST API.
Итог
Этот скрипт — база для надежного ARP-сервера с автоматическим управлением и защитой. Его можно расширять под конкретные задачи, добавляя, например, управление списками доверенных устройств, интеграцию с SIEM, автоматическую блокировку и т.п.
Код: Выделить всё
pip install scapy pysnmpКод: Выделить всё
{
"interfaces": ["eth0"],
"arp_table": {
"192.168.1.100": "00:11:22:33:44:55",
"192.168.1.101": "00:11:22:33:44:66"
},
"network_scan": {
"enabled": true,
"subnet": "192.168.1.0/24"
},
"snmp": {
"enabled": true,
"host": "192.168.1.1",
"community": "public"
}
}Код: Выделить всё
import json
import logging
import threading
import time
from scapy.all import *
from pysnmp.hlapi import *
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Загрузка конфигурации
with open('config.json') as f:
config = json.load(f)
interfaces = config.get('interfaces', ['eth0'])
arp_table = dict(config.get('arp_table', {}))
network_scan_config = config.get('network_scan', {})
snmp_config = config.get('snmp', {})
# Обратные словари для проверки MAC по IP
ip_to_mac = dict(arp_table)
# Функция для сканирования сети и обновления таблицы
def scan_network(subnet):
print(f"Scanning network {subnet} for active hosts...")
hosts = []
# Используем nmap для поиска активных хостов
result = subprocess.run(["nmap", "-sn", subnet], capture_output=True, text=True)
for line in result.stdout.splitlines():
if "Nmap scan report for" in line:
ip = line.split()[-1]
hosts.append(ip)
return hosts
# Функция для автоматического обновления таблицы устройств
def auto_update_table():
if network_scan_config.get('enabled', False):
subnet = network_scan_config.get('subnet', '')
while True:
hosts = scan_network(subnet)
for ip in hosts:
# Получаем MAC-адрес через ARP
ans, unans = sr(ARP(pdst=ip), timeout=2, verbose=False)
for sent, received in ans:
ip_addr = received.psrc
mac_addr = received.hwsrc
# Обновляем таблицу
arp_table[ip_addr] = mac_addr
ip_to_mac[ip_addr] = mac_addr
logging.info(f"Обновлена запись: {ip_addr} - {mac_addr}")
time.sleep(300) # обновлять каждые 5 минут
# Функция для отправки SNMP Trap (пример)
def send_snmp_trap(message):
if not snmp_config.get('enabled', False):
return
target = snmp_config.get('host')
community = snmp_config.get('community')
errorIndication, errorStatus, errorIndex, varBinds = next(
sendNotification(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((target, 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('1.3.6.1.6.3.1.1.5.1') # пример OID
)
)
)
if errorIndication:
logging.error(f"SNMP Error: {errorIndication}")
# Проверка фальшивых ARP-ответов
def validate_arp(pkt):
if ARP in pkt:
src_ip = pkt[ARP].psrc
src_mac = pkt[Ether].src
# Проверяем, есть ли запись в таблице
expected_mac = ip_to_mac.get(src_ip)
if expected_mac:
if src_mac != expected_mac:
# Подозрительный ARP
logging.warning(f"Подозрительный ARP: IP {src_ip} ожидает MAC {expected_mac}, получен {src_mac}")
# Можно предпринять меры: блокировать, логировать, оповещать
send_snmp_trap(f"Подозрительный ARP: IP {src_ip} MAC {src_mac}")
return False
else:
# Новое устройство, добавить в таблицу
arp_table[src_ip] = src_mac
ip_to_mac[src_ip] = src_mac
logging.info(f"Добавлено новое устройство: {src_ip} - {src_mac}")
return True
# Обработчик ARP-пакетов
def handle_arp(pkt):
if ARP in pkt and pkt[ARP].op == 1: # ARP-запрос
src_ip = pkt[ARP].psrc
dst_ip = pkt[ARP].pdst
src_mac = pkt[Ether].src
logging.info(f"ARP запрос: {src_ip} запрашивает {dst_ip} ({src_mac})")
# Проверка валидности ARP
if not validate_arp(pkt):
logging.info("Фальшивый ARP-ответ или подозрительный запрос, игнорируем")
return
# Проверка, есть ли ответ в таблице
if dst_ip in ip_to_mac:
target_mac = ip_to_mac[dst_ip]
ether = Ether(dst=pkt[Ether].src, src=target_mac)
arp_reply = ARP(op=2, hwsrc=target_mac, psrc=dst_ip,
hwdst=pkt[Ether].src, pdst=src_ip)
reply_pkt = ether / arp_reply
sendp(reply_pkt, iface=pkt.sniffed_on, verbose=False)
logging.info(f"Отправлен ARP-ответ: {dst_ip} -> {src_ip}")
else:
logging.warning(f"Нет записи для IP {dst_ip} в таблице")
# Основная функция
def main():
# Запуск автоматического обновления таблицы в отдельном потоке
updater_thread = threading.Thread(target=auto_update_table, daemon=True)
updater_thread.start()
print("Расширенный ARP-сервер запущен")
for iface in interfaces:
print(f"Слушание интерфейса: {iface}")
sniff(filter="arp", prn=handle_arp, iface=iface)
if __name__ == "__main__":
main()Код: Выделить всё
sudo python3 extended_arp_server.pyЧто делает этот скрипт:
Автоматическое обновление таблицы:
Периодически сканирует сеть (nmap) и узнает активные устройства.
Обновляет IP-MAC таблицу.
Работает в отдельном потоке, не мешая основной работе.
Защита от фальшивых ARP:
Проверяет MAC-адрес по IP при получении ARP-запросов.
В случае несоответствия — логирует и отправляет SNMP-trap.
Можно расширить, добавив блокировки или уведомления.
Интеграция с оборудованием:
Отправка SNMP-трейпов при обнаружении подозрительных ARP.
Можно расширить для управления оборудованием через SNMP, REST API.
Итог
Этот скрипт — база для надежного ARP-сервера с автоматическим управлением и защитой. Его можно расширять под конкретные задачи, добавляя, например, управление списками доверенных устройств, интеграцию с SIEM, автоматическую блокировку и т.п.