import json import os import sqlite3 import time from datetime import datetime from ipaddress import ip_address import geoip2.database DB_PATH = "/host_logs/synolog/.SYNOCONNDB" GEOIP_DB = "/config/GeoLite2-Country.mmdb" STATE_FILE = "/data/state.json" OUTPUT_LOG = "/logs/watcher.log" POLL_SECONDS = 60 KNOWN_USERS = { "Michel", "Samsung_A13", "Aurélie", "Gilberte", "Chloé", } WHITELIST_IPS = { "192.168.1.254", "192.168.1.90", "192.168.1.80", } WATCH_CHAT_ONLY = True def load_state(): if os.path.exists(STATE_FILE): try: with open(STATE_FILE, "r", encoding="utf-8") as f: return json.load(f) except Exception: return {} return {} def save_state(state): with open(STATE_FILE, "w", encoding="utf-8") as f: json.dump(state, f, indent=2) def log_event(message): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {message}\n" with open(OUTPUT_LOG, "a", encoding="utf-8") as f: f.write(line) print(line, end="") def is_private_ip(value: str) -> bool: try: ip = ip_address(value) return ip.is_private or ip.is_loopback except Exception: return False def get_country_code(reader, ip: str): if not ip or is_private_ip(ip): return "PRIVATE" try: response = reader.country(ip) return response.country.iso_code or "UNKNOWN" except Exception: return "UNKNOWN" def fetch_new_rows(last_id: int): if not os.path.exists(DB_PATH): log_event(f"ERREUR base introuvable: {DB_PATH}") return [] conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row query = """ SELECT id, time, level, username, msg, user, uid, ip, protocol, token, useragent FROM logs WHERE id > ? ORDER BY id ASC """ rows = conn.execute(query, (last_id,)).fetchall() conn.close() return rows def classify_event(row, reader): username = (row["username"] or "").strip() ip = (row["ip"] or "").strip() protocol = (row["protocol"] or "").strip() msg = (row["msg"] or "").strip() useragent = (row["useragent"] or "").strip() if WATCH_CHAT_ONLY and "Synology_Chat" not in useragent: return None reasons = [] level = "INFO" country = get_country_code(reader, ip) if username not in KNOWN_USERS: level = "ALERTE" reasons.append("user_inconnu") if ip in WHITELIST_IPS: reasons.append("ip_whitelist") country = "PRIVATE" if is_private_ip(ip): reasons.append("ip_privee") else: reasons.append("ip_publique") if country == "FR": reasons.append("pays_fr") if level == "INFO": level = "INFO" elif country == "UNKNOWN": reasons.append("pays_inconnu") if level != "ALERTE": level = "SURVEILLANCE" else: reasons.append(f"pays_{country}") if level != "ALERTE": level = "SUSPECT" if "failed" in msg.lower(): reasons.append("echec_connexion") level = "ALERTE" return { "id": row["id"], "dt": datetime.fromtimestamp(row["time"]).strftime("%Y-%m-%d %H:%M:%S"), "username": username, "ip": ip, "protocol": protocol, "country": country, "level": level, "reasons": ",".join(reasons), "msg": msg, } def main(): if not os.path.exists(GEOIP_DB): raise FileNotFoundError(f"Base GeoIP introuvable: {GEOIP_DB}") log_event("Démarrage ipwatcher sqlite + GeoIP") state = load_state() last_id = state.get("last_id", 0) with geoip2.database.Reader(GEOIP_DB) as reader: while True: try: rows = fetch_new_rows(last_id) for row in rows: last_id = row["id"] event = classify_event(row, reader) if event is None: continue log_event( f"{event['level']} " f"id={event['id']} " f"dt={event['dt']} " f"user={event['username']} " f"ip={event['ip']} " f"country={event['country']} " f"protocol={event['protocol']} " f"reasons={event['reasons']} " f"msg={event['msg']}" ) state["last_id"] = last_id save_state(state) except Exception as e: log_event(f"ERREUR traitement sqlite: {e}") time.sleep(POLL_SECONDS) if __name__ == "__main__": main()