import time, os, subprocess
# Indicator feed polled by the main loop: one "TYPE:value" entry per line.
# The loop empties this file after each read.
INTEL_LOG = "/tmp/zeek_block_intel.log"
# Rule file that generated drop rules are appended to.
RULES_FILE = "/usr/local/var/lib/suricata/rules/dynamic_block.rules"
# Persistent list of indicators that already have a rule (one per line);
# loaded at startup so restarts do not emit duplicate rules.
SEEN_FILE = "/usr/local/var/lib/suricata/rules/dynamic_block_seen.txt"
# First signature id used for generated rules; the next sid is this base
# plus the number of indicators already recorded in SEEN_FILE.
SID_START = 2000000
def make_rule(itype, value, sid):
# HTTP Host
if itype == "HTTP_HOST":
return (
f'drop http any any -> any any ('
f'msg:"ZEEK BLOCK HTTP HOST {value}"; '
f'http.host; content:"{value}"; nocase; '
f'sid:{sid}; rev:1;)\n'
)
# User-Agent
elif itype == "HTTP_USERAGENT":
return (
f'drop http any any -> any any ('
f'msg:"ZEEK BLOCK UA {value}"; '
f'http.user_agent; content:"{value}"; nocase; '
f'sid:{sid}; rev:1;)\n'
)
# FTP port 21
elif itype == "FTP_PORT":
return (
f'drop tcp any any -> any 21 ('
f'msg:"ZEEK BLOCK FTP port 21"; '
f'flow:to_server; '
f'sid:{sid}; rev:1;)\n'
)
return None
def _drain_intel_log():
    """Return the pending lines of INTEL_LOG and empty the file.

    Reading and truncating go through a single file handle to keep the
    window in which a concurrently written line could be lost as small as
    possible (it cannot be closed entirely without cooperation from the
    writer). A missing log file simply yields no lines.
    """
    try:
        with open(INTEL_LOG, "r+", encoding="utf-8", errors="replace") as log:
            pending = log.readlines()
            if pending:
                # Clear the log after reading so entries are handled once.
                log.truncate(0)
    except FileNotFoundError:
        return []
    return pending


def _reload_suricata():
    """Ask the running Suricata to live-reload its ruleset.

    Suricata reloads rules on SIGUSR2; SIGHUP only reopens its log files.
    Returns True when the signal was delivered, False otherwise.
    """
    try:
        result = subprocess.run(["killall", "-USR2", "suricata"], check=False)
    except OSError as exc:
        # e.g. `killall` is not installed.
        print(f"[!] Suricata reload failed: {exc}")
        return False
    if result.returncode != 0:
        print(f"[!] Suricata reload failed: killall exited with {result.returncode}")
        return False
    print("[+] Suricata reload: USR2 sent")
    return True


# Indicators that already have a rule, restored from the previous run.
seen = set()
try:
    with open(SEEN_FILE, encoding="utf-8") as f:
        seen = {line.strip() for line in f if line.strip()}
except FileNotFoundError:
    pass

# Continue numbering after the rules written by earlier runs.
sid = SID_START + len(seen)
print("[*] Zeek → Suricata bridge started")

while True:
    rules_added = 0
    for line in _drain_intel_log():
        indicator = line.strip()
        if not indicator or indicator in seen:
            continue
        # Expected form is "TYPE:value"; anything without a colon is ignored.
        itype, separator, value = indicator.partition(":")
        if not separator:
            continue
        rule = make_rule(itype, value, sid)
        if not rule:
            print(f"[!] Unknown type: {itype}")
            continue
        # Write the rule.
        with open(RULES_FILE, "a", encoding="utf-8") as rf:
            rf.write(rule)
        # Persist the indicator so it is not turned into a rule again.
        with open(SEEN_FILE, "a", encoding="utf-8") as sf:
            sf.write(indicator + "\n")
        seen.add(indicator)
        sid += 1
        rules_added += 1
        print(f"[+] Rule added: {itype}:{value}")
    # One reload per batch is enough; a reload per rule is wasted work.
    if rules_added:
        _reload_suricata()
    time.sleep(2)