Files
Fichiers_perso/telegram_watch.py
2026-05-04 11:16:20 +02:00

222 lines
6.4 KiB
Python

#!/usr/bin/env python3
import argparse
import ipaddress
import shutil
import subprocess
import sys
from collections import defaultdict
import geoip2.database
from rich.console import Console
from rich.table import Table
from rich import box
console = Console()
def is_public_ip(ip: str) -> bool:
try:
addr = ipaddress.ip_address(ip)
return not (
addr.is_private
or addr.is_loopback
or addr.is_multicast
or addr.is_link_local
or addr.is_reserved
)
except ValueError:
return False
def geo_lookup(reader, ip: str):
try:
r = reader.country(ip)
iso = r.country.iso_code or "??"
name = r.country.name or "Inconnu"
return iso, name
except Exception:
return "??", "Inconnu"
def run_tshark_capture(interface: str, duration: int, output_pcap: str):
if shutil.which("tshark") is None:
console.print("[red]tshark n'est pas installé ou pas dans le PATH.[/red]")
sys.exit(1)
cmd = [
"tshark",
"-i", interface,
"-a", f"duration:{duration}",
"-w", output_pcap,
]
console.print(
f"[cyan]Capture en cours sur {interface} pendant {duration} s...[/cyan]\n"
"[yellow]Envoie maintenant tes messages Telegram.[/yellow]"
)
result = subprocess.run(cmd)
if result.returncode != 0:
console.print("[red]La capture TShark a échoué.[/red]")
sys.exit(result.returncode)
def extract_remote_ips(pcap_file: str, local_ip: str = None):
fields = ["-e", "ip.src", "-e", "ip.dst", "-e", "tcp.dstport", "-e", "udp.dstport"]
cmd = [
"tshark",
"-r", pcap_file,
"-T", "fields",
"-E", "separator=|",
"-E", "occurrence=f",
*fields,
]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
console.print("[red]Impossible de relire le fichier pcapng.[/red]")
console.print(proc.stderr)
sys.exit(proc.returncode)
stats = defaultdict(lambda: {"count": 0, "ports": set(), "direction": set()})
for line in proc.stdout.splitlines():
parts = line.split("|")
if len(parts) < 4:
continue
src = parts[0].strip()
dst = parts[1].strip()
tcp_port = parts[2].strip()
udp_port = parts[3].strip()
if not src or not dst:
continue
if local_ip:
if src == local_ip and is_public_ip(dst):
remote = dst
direction = "sortant"
elif dst == local_ip and is_public_ip(src):
remote = src
direction = "entrant"
else:
continue
else:
candidates = []
if is_public_ip(src):
candidates.append((src, "entrant"))
if is_public_ip(dst):
candidates.append((dst, "sortant"))
for remote, direction in candidates:
stats[remote]["count"] += 1
stats[remote]["direction"].add(direction)
if tcp_port:
stats[remote]["ports"].add(f"tcp/{tcp_port}")
if udp_port:
stats[remote]["ports"].add(f"udp/{udp_port}")
continue
stats[remote]["count"] += 1
stats[remote]["direction"].add(direction)
if tcp_port:
stats[remote]["ports"].add(f"tcp/{tcp_port}")
if udp_port:
stats[remote]["ports"].add(f"udp/{udp_port}")
return stats
def render_table(stats, mmdb_path: str):
reader = geoip2.database.Reader(mmdb_path)
table = Table(
title="IP observées pendant l'activité Telegram",
box=box.SIMPLE_HEAVY,
show_lines=False
)
table.add_column("IP", style="bold")
table.add_column("Pays")
table.add_column("Code")
table.add_column("Direction")
table.add_column("Ports")
table.add_column("Paquets", justify="right")
table.add_column("Alerte")
rows = []
for ip, data in stats.items():
iso, country = geo_lookup(reader, ip)
non_fr = iso != "FR"
rows.append((
non_fr,
ip,
country,
iso,
",".join(sorted(data["direction"])),
", ".join(sorted(data["ports"]))[:80],
str(data["count"]),
"NON FR" if non_fr else ""
))
rows.sort(key=lambda r: (not r[0], r[1]))
non_fr_count = 0
for non_fr, ip, country, iso, direction, ports, count, alert in rows:
if non_fr:
non_fr_count += 1
table.add_row(
f"[red]{ip}[/red]",
f"[red]{country}[/red]",
f"[red]{iso}[/red]",
f"[red]{direction}[/red]",
f"[red]{ports}[/red]",
f"[red]{count}[/red]",
f"[bold red]{alert}[/bold red]",
)
else:
table.add_row(
ip, country, iso, direction, ports, count, ""
)
console.print(table)
console.print(
f"\n[bold]Total IP publiques observées :[/bold] {len(rows)}\n"
f"[bold red]IP non françaises :[/bold red] {non_fr_count}"
)
def main():
parser = argparse.ArgumentParser(
description="Surveille les IP vues pendant une activité Telegram et met en évidence les IP non françaises."
)
parser.add_argument("-i", "--interface", default="ens3", help="Interface réseau, ex: ens3")
parser.add_argument("-d", "--duration", type=int, default=20, help="Durée de capture en secondes")
parser.add_argument("-o", "--output", default="telegram-test.pcapng", help="Fichier de capture")
parser.add_argument("--local-ip", help="IP locale à filtrer, ex: 192.168.1.112")
parser.add_argument(
"--mmdb",
default="/usr/share/GeoIP/GeoLite2-Country.mmdb",
help="Chemin vers GeoLite2-Country.mmdb"
)
parser.add_argument(
"--read-only",
action="store_true",
help="N'effectue pas de capture, relit seulement le fichier -o"
)
args = parser.parse_args()
if not args.read_only:
run_tshark_capture(args.interface, args.duration, args.output)
stats = extract_remote_ips(args.output, args.local_ip)
if not stats:
console.print("[yellow]Aucune IP publique trouvée dans cette capture.[/yellow]")
sys.exit(0)
render_table(stats, args.mmdb)
if __name__ == "__main__":
main()