222 lines
6.4 KiB
Python
222 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import ipaddress
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from collections import defaultdict
|
|
|
|
import geoip2.database
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich import box
|
|
|
|
# Shared rich console; the functions below print their terminal output through it.
console = Console()
|
|
|
|
|
|
def is_public_ip(ip: str) -> bool:
    """Return True when ``ip`` parses as an address outside the special ranges.

    An address is rejected when ``ipaddress`` flags it as private, loopback,
    multicast, link-local or reserved. Text that is not a valid IPv4/IPv6
    literal also yields False.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        # Not an IP literal at all.
        return False

    special_flags = (
        parsed.is_private,
        parsed.is_loopback,
        parsed.is_multicast,
        parsed.is_link_local,
        parsed.is_reserved,
    )
    return not any(special_flags)
|
|
|
|
|
|
def geo_lookup(reader, ip: str):
    """Resolve ``ip`` to an ``(iso_code, country_name)`` pair.

    ``reader`` must expose a ``country(ip)`` method whose result has a
    ``country`` attribute carrying ``iso_code`` and ``name``. Missing values,
    and any exception raised during the lookup, fall back to the placeholders
    ``"??"`` and ``"Inconnu"`` so a failed lookup never interrupts the caller.
    """
    unknown_iso, unknown_name = "??", "Inconnu"
    try:
        country = reader.country(ip).country
        return (country.iso_code or unknown_iso, country.name or unknown_name)
    except Exception:
        # Best effort: any lookup failure is reported as an unknown country.
        return (unknown_iso, unknown_name)
|
|
|
|
|
|
def run_tshark_capture(interface: str, duration: int, output_pcap: str):
    """Capture traffic with tshark for a fixed duration and write it to a file.

    Runs ``tshark -i <interface> -a duration:<duration> -w <output_pcap>`` and
    waits for it to finish.

    Exits the process with status 1 when tshark is not found on PATH, or with
    tshark's own return code when the capture fails.
    """
    tshark_path = shutil.which("tshark")
    if tshark_path is None:
        console.print("[red]tshark n'est pas installé ou pas dans le PATH.[/red]")
        sys.exit(1)

    capture_cmd = ["tshark", "-i", interface, "-a", f"duration:{duration}", "-w", output_pcap]

    banner = (
        f"[cyan]Capture en cours sur {interface} pendant {duration} s...[/cyan]\n"
        "[yellow]Envoie maintenant tes messages Telegram.[/yellow]"
    )
    console.print(banner)

    # tshark's output is not captured; only its exit status is inspected.
    exit_code = subprocess.run(capture_cmd).returncode
    if exit_code:
        console.print("[red]La capture TShark a échoué.[/red]")
        sys.exit(exit_code)
|
|
|
|
|
|
def _record_packet(stats, remote, direction, tcp_port, udp_port):
    """Fold one packet sighting of ``remote`` into the ``stats`` mapping."""
    entry = stats[remote]
    entry["count"] += 1
    entry["direction"].add(direction)
    if tcp_port:
        entry["ports"].add(f"tcp/{tcp_port}")
    if udp_port:
        entry["ports"].add(f"udp/{udp_port}")


def extract_remote_ips(pcap_file: str, local_ip: str = None):
    """Summarise the public IPs seen in a capture file.

    Reads ``pcap_file`` with ``tshark -T fields`` (fields ``ip.src``,
    ``ip.dst``, ``tcp.dstport``, ``udp.dstport``) and aggregates one entry per
    public remote address.

    Args:
        pcap_file: Path of the capture file to read.
        local_ip: When given, only packets between this exact address and a
            public peer are counted; the peer is the remote. When omitted,
            every public source ("entrant") and every public destination
            ("sortant") is counted, so one packet may count for two IPs.

    Returns:
        A ``defaultdict`` mapping remote IP -> ``{"count": int, "ports": set,
        "direction": set}``. ``ports`` holds the packets' *destination* ports
        as ``"tcp/N"`` / ``"udp/N"``; ``direction`` holds ``"entrant"`` and/or
        ``"sortant"``.

    Exits the process with status 1 when tshark is not on PATH, or with
    tshark's return code when it cannot read the file.
    """
    # This function can be reached without a prior capture step, so check for
    # tshark here too; otherwise subprocess.run raises a raw FileNotFoundError.
    if shutil.which("tshark") is None:
        console.print("[red]tshark n'est pas installé ou pas dans le PATH.[/red]")
        sys.exit(1)

    fields = ["-e", "ip.src", "-e", "ip.dst", "-e", "tcp.dstport", "-e", "udp.dstport"]
    cmd = [
        "tshark",
        "-r", pcap_file,
        "-T", "fields",
        "-E", "separator=|",
        "-E", "occurrence=f",
        *fields,
    ]

    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        console.print("[red]Impossible de relire le fichier pcapng.[/red]")
        # stderr is arbitrary external text: print it verbatim instead of
        # letting rich interpret square brackets as markup tags.
        console.print(proc.stderr, markup=False)
        sys.exit(proc.returncode)

    stats = defaultdict(lambda: {"count": 0, "ports": set(), "direction": set()})

    for line in proc.stdout.splitlines():
        parts = line.split("|")
        if len(parts) < 4:
            continue

        src, dst, tcp_port, udp_port = (part.strip() for part in parts[:4])

        # Packets for which tshark printed no ip.src / ip.dst are ignored.
        if not src or not dst:
            continue

        if local_ip:
            if src == local_ip and is_public_ip(dst):
                sightings = [(dst, "sortant")]
            elif dst == local_ip and is_public_ip(src):
                sightings = [(src, "entrant")]
            else:
                sightings = []
        else:
            # No local address known: treat each public endpoint as a remote.
            sightings = []
            if is_public_ip(src):
                sightings.append((src, "entrant"))
            if is_public_ip(dst):
                sightings.append((dst, "sortant"))

        for remote, direction in sightings:
            _record_packet(stats, remote, direction, tcp_port, udp_port)

    return stats
|
|
|
|
|
|
def render_table(stats, mmdb_path: str):
    """Print a table of the observed IPs with their country, then a summary.

    Args:
        stats: Mapping of IP -> ``{"count", "ports", "direction"}`` where
            ``ports`` and ``direction`` are iterables of strings.
        mmdb_path: Path to the country database opened with
            ``geoip2.database.Reader``.

    Rows whose ISO code is not ``"FR"`` (unresolved ``"??"`` included) are
    listed first, rendered in red and tagged ``NON FR``; within each group
    rows are ordered by the IP string. The ports cell is cut at 80 characters.
    """
    rows = []
    # Resolve all countries first so the database handle is closed as soon as
    # the lookups are done, even if one of them raises.
    with geoip2.database.Reader(mmdb_path) as reader:
        for ip, data in stats.items():
            iso, country = geo_lookup(reader, ip)
            non_fr = iso != "FR"
            rows.append((
                non_fr,
                ip,
                country,
                iso,
                ",".join(sorted(data["direction"])),
                ", ".join(sorted(data["ports"]))[:80],
                str(data["count"]),
                "NON FR" if non_fr else "",
            ))

    # ``not r[0]`` is False for non-FR rows, so they sort ahead of FR rows.
    rows.sort(key=lambda r: (not r[0], r[1]))

    table = Table(
        title="IP observées pendant l'activité Telegram",
        box=box.SIMPLE_HEAVY,
        show_lines=False,
    )
    table.add_column("IP", style="bold")
    table.add_column("Pays")
    table.add_column("Code")
    table.add_column("Direction")
    table.add_column("Ports")
    table.add_column("Paquets", justify="right")
    table.add_column("Alerte")

    non_fr_count = 0
    for non_fr, ip, country, iso, direction, ports, count, alert in rows:
        if non_fr:
            non_fr_count += 1
            table.add_row(
                f"[red]{ip}[/red]",
                f"[red]{country}[/red]",
                f"[red]{iso}[/red]",
                f"[red]{direction}[/red]",
                f"[red]{ports}[/red]",
                f"[red]{count}[/red]",
                f"[bold red]{alert}[/bold red]",
            )
        else:
            table.add_row(ip, country, iso, direction, ports, count, "")

    console.print(table)
    console.print(
        f"\n[bold]Total IP publiques observées :[/bold] {len(rows)}\n"
        f"[bold red]IP non françaises :[/bold red] {non_fr_count}"
    )
|
|
|
|
|
|
def _build_arg_parser():
    """Create the command-line parser for the script."""
    parser = argparse.ArgumentParser(
        description="Surveille les IP vues pendant une activité Telegram et met en évidence les IP non françaises."
    )
    parser.add_argument("-i", "--interface", default="ens3", help="Interface réseau, ex: ens3")
    parser.add_argument("-d", "--duration", type=int, default=20, help="Durée de capture en secondes")
    parser.add_argument("-o", "--output", default="telegram-test.pcapng", help="Fichier de capture")
    parser.add_argument("--local-ip", help="IP locale à filtrer, ex: 192.168.1.112")
    parser.add_argument(
        "--mmdb",
        default="/usr/share/GeoIP/GeoLite2-Country.mmdb",
        help="Chemin vers GeoLite2-Country.mmdb",
    )
    parser.add_argument(
        "--read-only",
        action="store_true",
        help="N'effectue pas de capture, relit seulement le fichier -o",
    )
    return parser


def main():
    """Command-line entry point: capture (unless --read-only), analyse, report.

    Parses the arguments, optionally records a capture into the ``-o`` file,
    extracts the public IPs from that file and prints the result table. When
    no public IP is found, a notice is printed and the process exits with
    status 0.
    """
    options = _build_arg_parser().parse_args()

    capture_requested = not options.read_only
    if capture_requested:
        run_tshark_capture(options.interface, options.duration, options.output)

    observed = extract_remote_ips(options.output, options.local_ip)
    if observed:
        render_table(observed, options.mmdb)
        return

    console.print("[yellow]Aucune IP publique trouvée dans cette capture.[/yellow]")
    sys.exit(0)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()