#!/usr/bin/env python3 import argparse import ipaddress import shutil import subprocess import sys from collections import defaultdict import geoip2.database from rich.console import Console from rich.table import Table from rich import box console = Console() def is_public_ip(ip: str) -> bool: try: addr = ipaddress.ip_address(ip) return not ( addr.is_private or addr.is_loopback or addr.is_multicast or addr.is_link_local or addr.is_reserved ) except ValueError: return False def geo_lookup(reader, ip: str): try: r = reader.country(ip) iso = r.country.iso_code or "??" name = r.country.name or "Inconnu" return iso, name except Exception: return "??", "Inconnu" def run_tshark_capture(interface: str, duration: int, output_pcap: str): if shutil.which("tshark") is None: console.print("[red]tshark n'est pas installé ou pas dans le PATH.[/red]") sys.exit(1) cmd = [ "tshark", "-i", interface, "-a", f"duration:{duration}", "-w", output_pcap, ] console.print( f"[cyan]Capture en cours sur {interface} pendant {duration} s...[/cyan]\n" "[yellow]Envoie maintenant tes messages Telegram.[/yellow]" ) result = subprocess.run(cmd) if result.returncode != 0: console.print("[red]La capture TShark a échoué.[/red]") sys.exit(result.returncode) def extract_remote_ips(pcap_file: str, local_ip: str = None): fields = ["-e", "ip.src", "-e", "ip.dst", "-e", "tcp.dstport", "-e", "udp.dstport"] cmd = [ "tshark", "-r", pcap_file, "-T", "fields", "-E", "separator=|", "-E", "occurrence=f", *fields, ] proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: console.print("[red]Impossible de relire le fichier pcapng.[/red]") console.print(proc.stderr) sys.exit(proc.returncode) stats = defaultdict(lambda: {"count": 0, "ports": set(), "direction": set()}) for line in proc.stdout.splitlines(): parts = line.split("|") if len(parts) < 4: continue src = parts[0].strip() dst = parts[1].strip() tcp_port = parts[2].strip() udp_port = parts[3].strip() if not src or not dst: continue if local_ip: if src == local_ip and is_public_ip(dst): remote = dst direction = "sortant" elif dst == local_ip and is_public_ip(src): remote = src direction = "entrant" else: continue else: candidates = [] if is_public_ip(src): candidates.append((src, "entrant")) if is_public_ip(dst): candidates.append((dst, "sortant")) for remote, direction in candidates: stats[remote]["count"] += 1 stats[remote]["direction"].add(direction) if tcp_port: stats[remote]["ports"].add(f"tcp/{tcp_port}") if udp_port: stats[remote]["ports"].add(f"udp/{udp_port}") continue stats[remote]["count"] += 1 stats[remote]["direction"].add(direction) if tcp_port: stats[remote]["ports"].add(f"tcp/{tcp_port}") if udp_port: stats[remote]["ports"].add(f"udp/{udp_port}") return stats def render_table(stats, mmdb_path: str): reader = geoip2.database.Reader(mmdb_path) table = Table( title="IP observées pendant l'activité Telegram", box=box.SIMPLE_HEAVY, show_lines=False ) table.add_column("IP", style="bold") table.add_column("Pays") table.add_column("Code") table.add_column("Direction") table.add_column("Ports") table.add_column("Paquets", justify="right") table.add_column("Alerte") rows = [] for ip, data in stats.items(): iso, country = geo_lookup(reader, ip) non_fr = iso != "FR" rows.append(( non_fr, ip, country, iso, ",".join(sorted(data["direction"])), ", ".join(sorted(data["ports"]))[:80], str(data["count"]), "NON FR" if non_fr else "" )) rows.sort(key=lambda r: (not r[0], r[1])) non_fr_count = 0 for non_fr, ip, country, iso, direction, ports, count, alert in rows: if non_fr: non_fr_count += 1 table.add_row( f"[red]{ip}[/red]", f"[red]{country}[/red]", f"[red]{iso}[/red]", f"[red]{direction}[/red]", f"[red]{ports}[/red]", f"[red]{count}[/red]", f"[bold red]{alert}[/bold red]", ) else: table.add_row( ip, country, iso, direction, ports, count, "" ) console.print(table) console.print( f"\n[bold]Total IP publiques observées :[/bold] {len(rows)}\n" f"[bold red]IP non françaises :[/bold red] {non_fr_count}" ) def main(): parser = argparse.ArgumentParser( description="Surveille les IP vues pendant une activité Telegram et met en évidence les IP non françaises." ) parser.add_argument("-i", "--interface", default="ens3", help="Interface réseau, ex: ens3") parser.add_argument("-d", "--duration", type=int, default=20, help="Durée de capture en secondes") parser.add_argument("-o", "--output", default="telegram-test.pcapng", help="Fichier de capture") parser.add_argument("--local-ip", help="IP locale à filtrer, ex: 192.168.1.112") parser.add_argument( "--mmdb", default="/usr/share/GeoIP/GeoLite2-Country.mmdb", help="Chemin vers GeoLite2-Country.mmdb" ) parser.add_argument( "--read-only", action="store_true", help="N'effectue pas de capture, relit seulement le fichier -o" ) args = parser.parse_args() if not args.read_only: run_tshark_capture(args.interface, args.duration, args.output) stats = extract_remote_ips(args.output, args.local_ip) if not stats: console.print("[yellow]Aucune IP publique trouvée dans cette capture.[/yellow]") sys.exit(0) render_table(stats, args.mmdb) if __name__ == "__main__": main()