Files
dns-inspector/parser/parse_dns_logs.py

60 lines
2.2 KiB
Python

#!/usr/bin/env python3
import re
import sqlite3
import os
from datetime import datetime
LOG_DIR = "../logs"
DB_PATH = "../db/dns.sqlite"
# Функция для преобразования DNS-имени
def decode_dns_name(raw):
parts = re.findall(r'\((\d+)\)([a-zA-Z0-9\-]+)', raw)
return '.'.join([label for _, label in parts])
# Регулярка для строки запроса
query_pattern = re.compile(r'^(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2}).*UDP Rcv\s+(\d+\.\d+\.\d+\.\d+).*Q\s+\[[^\]]*\]\s+(\w+)\s+((?:\(\d+\)[a-zA-Z0-9\-]+)+\(0\))')
# Подключение к БД
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
src_ip TEXT,
qtype TEXT,
domain TEXT,
UNIQUE(timestamp, src_ip, qtype, domain)
)
""")
# Обработка логов
for filename in os.listdir(LOG_DIR):
if filename.startswith("dns") and filename.endswith(".log"):
filepath = os.path.join(LOG_DIR, filename)
print(f"🔍 Обрабатываю: {filepath}")
with open(filepath, encoding="utf-8", errors="ignore") as f:
for line in f:
match = query_pattern.search(line)
if match:
try:
ts = datetime.strptime(match.group(1), "%d.%m.%Y %H:%M:%S").isoformat()
src_ip = match.group(2)
qtype = match.group(3)
raw_domain = match.group(4)
domain = decode_dns_name(raw_domain)
cursor.execute("""
INSERT OR IGNORE INTO logs (timestamp, src_ip, qtype, domain)
VALUES (?, ?, ?, ?)
""", (ts, src_ip, qtype, domain))
except Exception as e:
print(f"⚠️ Ошибка: {e}\n→ Строка: {line.strip()}")
else:
# Логируем пропущенные строки
with open("parser_errors.log", "a") as log:
log.write(line)
conn.commit()
conn.close()
print("✅ Парсинг завершён.")