#!/usr/bin/env python3
"""afl-health — health, trend, and action analysis for AFL++/cargo-afl/ziggy campaigns.

(c) 2026 Marc "vanHauser" Heuse

A single-file, dependency-free (stdlib-only) CLI that diagnoses AFL++ / cargo-afl /
ziggy fuzzing campaigns — locally or over SSH — and emits a verdict, a within/cross-run
trend, and ranked machine-actionable action directives. It is read-only: it never
writes to or otherwise touches the campaign it inspects.

Run it directly (no install needed):

    python3 afl-health.py /path/to/out
    ./afl-health.py user@host:/path/to/out --json
    ./afl-health.py --summary host:/runs/output/*/afl   # glob expands remotely

It needs only Python >= 3.11 locally; the remote side of an ssh run needs nothing
beyond POSIX sh + coreutils.

The pipeline is one-directional, mirrored by the sections below:

    collect    Target ──ssh|local──▶ RawCampaign      (raw fuzzer_stats / plot_data text + counts)
    snapshot   RawCampaign ─────────▶ CampaignSnapshot (normalized facts)
    analyze    CampaignSnapshot ─────▶ Report          (+ optional prior) verdict/findings/actions
    report     Report ──────────────▶ text | JSON
    store      Report ──────────────▶ SQLite row       (for next run's cross-run delta)
    cli        orchestrates all of the above

License: AGPL-3.0 (GNU Affero General Public License, version 3).
"""

from __future__ import annotations

import argparse
import contextlib
import glob
import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass, field, replace
from pathlib import Path
from typing import Any

# Tool release version and license identifier.
__version__ = "0.1"
__license__ = "AGPL-3.0"
# Emitted as "schema_version" at the top of every serialized snapshot/report dict.
SCHEMA_VERSION = 1


# ============================================================================
# model — raw collector output, the normalized snapshot, and the analysis report
# ============================================================================


# --- raw collector output (one per campaign) ------------------------------------
@dataclass
class RawInstance:
    """Unparsed artifacts collected for one fuzzer instance.

    Holds the raw text and counts exactly as gathered; build_snapshot() turns
    them into a normalized Instance.
    """

    name: str  # instance name; used as the key when reporting per-instance crashes
    fuzzer_stats: str  # raw fuzzer_stats text (fed to parse_fuzzer_stats)
    plot_data: str | None  # raw plot_data text, or None when unavailable
    newest_cov_mtime: float | None  # timestamp of the newest +cov entry; compared against RawCampaign.now
    pid_alive: bool  # collector's verdict on whether the fuzzer process is alive
    crash_count: int  # crash files counted by the collector
    hang_count: int  # hang files counted by the collector
    core_bytes: int  # bytes of core dumps found for this instance (summed per campaign)


@dataclass
class RawCampaign:
    """Raw collector output for one campaign: identity, collection metadata, and its instances."""

    campaign_id: str
    host: str
    output_dir: str
    now: int  # reference time for every age computation; becomes the snapshot's generated_at
    layout: str  # "flat" | "ziggy-afl-subdir"
    honggfuzz_present: bool  # when True, analysis only adds a note (honggfuzz is not interpreted)
    collection_error: str | None  # set when collection failed; analysis short-circuits on it
    instances: list[RawInstance]


# --- normalized snapshot --------------------------------------------------------
@dataclass
class Instance:
    """Normalized facts about one fuzzer instance.

    Numeric statistics come from fuzzer_stats; build_snapshot() stores 0 for any
    key that is missing or non-numeric, so a 0 here may mean "unknown" (see
    ``absent_fields`` for the optional keys that are tracked explicitly).
    """

    name: str
    role: str  # main | secondary | solo
    variant: str  # generic | cmplog | asan | compcov | cfisan
    toolchain: str  # ziggy | cargo-afl | aflpp
    alive: bool  # pid_alive, or last_update younger than STALE_SECONDS
    pid_alive: bool  # the collector's raw process-liveness flag
    target_mode: str
    execs_per_sec: float
    execs_ps_last_min: float
    stability: float  # percent (trailing % stripped)
    bitmap_cvg: float  # percent (trailing % stripped)
    edges_found: int
    total_edges: int
    pending_favs: int
    pending_total: int
    cycles_done: int
    cycles_wo_finds: int
    time_wo_finds: int
    corpus_count: int
    corpus_variable: int
    saved_crashes: int  # as reported in fuzzer_stats
    saved_hangs: int  # as reported in fuzzer_stats
    crash_files: int  # as counted by the collector
    hang_files: int  # as counted by the collector
    slowest_exec_ms: int
    peak_rss_mb: int
    exec_timeout: int
    run_time_s: int
    last_update_age_s: int  # now - last_update, clamped at 0
    last_find_age_s: int | None  # now - last_find, clamped at 0; None when last_find is not > 0
    absent_fields: list[str] = field(default_factory=list)  # watched optional keys missing from fuzzer_stats
    status: str = ""  # filled by analyze
    issues: list[str] = field(default_factory=list)  # filled by analyze


@dataclass
class Trend:
    """Campaign-level trend derived from one instance's plot_data plus freshness ages."""

    source: str  # plot_data | none
    window: str  # human-readable label of the slope window
    edges_found_now: int  # last plot_data row
    edges_found_peak: int  # maximum over all plot_data rows
    edges_slope_recent_per_min: float  # edges per minute over the recent window
    execs_per_sec_now: float  # last plot_data row
    execs_per_sec_peak: float  # maximum over all plot_data rows
    last_cov_entry_age_s: int | None  # age of the newest +cov entry; None when no mtime is known
    last_find_age_s: int | None  # age of the most recent last_find; None when none is > 0
    expected_eps: float | None = None  # copied from analyze()'s expected_eps argument
    prior: dict[str, Any] | None = None  # previous stored snapshot (store)
    cross_run: dict[str, Any] | None = None  # deltas vs prior (analyze)


@dataclass
class Crashes:
    """Campaign-wide crash, hang, and core-dump totals."""

    total: int  # sum of per-instance crash file counts
    hangs_total: int  # sum of per-instance hang file counts
    core_bytes: int  # sum of per-instance core dump bytes
    by_instance: dict[str, int]  # instance name -> crash count, only for instances with crashes


@dataclass
class CampaignSnapshot:
    """Normalized view of a whole campaign at one point in time."""

    campaign_id: str
    host: str
    output_dir: str
    generated_at: int
    toolchain: str
    layout: str
    honggfuzz_present: bool
    run_time_s: int
    instance_count: int
    alive_count: int
    roles: dict[str, int]
    variants: dict[str, int]
    trend: Trend
    crashes: Crashes
    instances: list[Instance]
    collection_error: str | None = None

    def edges_found_max(self) -> int:
        """Return the largest ``edges_found`` of any instance, or 0 when there are none."""
        per_instance = [inst.edges_found for inst in self.instances]
        return max(per_instance) if per_instance else 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, with ``schema_version`` as the leading key."""
        payload: dict[str, Any] = {"schema_version": SCHEMA_VERSION}
        payload.update(asdict(self))
        return payload


# --- analysis -------------------------------------------------------------------
@dataclass
class Finding:
    """One diagnosed observation about a campaign or a single instance."""

    severity: str  # high | medium | low
    scope: str  # campaign | instance
    instance: str | None  # instance name for instance-scoped findings, else None
    signal: str  # what was observed
    likely_cause: str  # the most probable explanation for the signal


@dataclass
class ActionDirective:
    """A suggested follow-up action; directives are emitted, never executed."""

    action: str  # action identifier, e.g. "triage_crashes" or "relaunch_instance"
    severity: str  # high | medium | low
    scope: str  # campaign | instance
    target: dict[str, Any]  # host / output_dir, plus "instance" for instance-scoped actions
    rationale: str  # why the action is suggested
    evidence: dict[str, Any]  # the measured values backing the rationale
    suggested_command: str | None = None  # optional command or hint for carrying it out


@dataclass
class Report:
    """Analysis result: the snapshot plus verdict, findings, actions, and notes."""

    snapshot: CampaignSnapshot
    overall: str  # healthy | degraded | stalled | misconfigured | dead | collection_error
    verdict_explainer: str
    findings: list[Finding] = field(default_factory=list)
    actions: list[ActionDirective] = field(default_factory=list)
    notes: list[str] = field(default_factory=list)
    headline: dict[str, str] = field(default_factory=dict)  # one-word status per dimension

    def to_dict(self) -> dict[str, Any]:
        """Return the snapshot's dict extended with the analysis fields.

        Containers are copied (and dataclasses flattened) so the result shares
        no mutable state with this report's own lists and dicts.
        """
        merged = self.snapshot.to_dict()
        merged["overall"] = self.overall
        merged["verdict_explainer"] = self.verdict_explainer
        merged["headline"] = dict(self.headline)
        merged["findings"] = [asdict(finding) for finding in self.findings]
        merged["actions"] = [asdict(directive) for directive in self.actions]
        merged["notes"] = list(self.notes)
        return merged


# ============================================================================
# parse — version-robust parsing and classification of AFL++ campaign artifacts.
#
# Pure functions only — no filesystem or network. They turn raw fuzzer_stats /
# plot_data text and command lines into normalized facts. build_snapshot()
# assembles these into a CampaignSnapshot.
# ============================================================================


def parse_fuzzer_stats(text: str) -> dict[str, str]:
    """Turn ``key : value`` lines into a ``{key: value}`` mapping.

    Each line is cut at its *first* colon, so a value may contain further
    colons; lines without any colon are skipped. Keys and values are
    whitespace-trimmed, a trailing ``%`` stays on the value, and a repeated key
    keeps its last value. A key that never appears is simply not in the result
    — treat that as "unknown", never as ``0``.
    """
    split_lines = (line.partition(":") for line in text.splitlines() if ":" in line)
    return {name.strip(): val.strip() for name, _sep, val in split_lines}


def cmplog_enabled(cmdline: str) -> bool:
    """True if CMPLOG is enabled on this afl-fuzz command line.

    ``-c <path>`` or ``-c0`` (the fuzz target doubles as the cmplog binary) enable
    it; ``-c-`` / ``-c -`` explicitly disable it; absent means off. Never inferred
    from an instance's name.

    Scanning stops at the ``--`` separator: everything after it belongs to the
    fuzzed target, so a target's own ``-c <config>`` argument is not mistaken
    for afl-fuzz's CMPLOG switch. The first ``-c`` occurrence decides.
    """
    value_expected = False  # True right after a bare "-c": the next token is its value
    for tok in cmdline.split():
        if value_expected:
            return tok != "-"
        if tok == "--":  # end of afl-fuzz options; the rest is the target's argv
            break
        if tok == "-c":
            value_expected = True
        elif tok.startswith("-c"):  # attached value, e.g. -c0, -c/path or -c-
            return tok[2:] != "-"
    return False


def classify_role(cmdline: str, instance_count: int) -> str:
    """Return the topology role implied by the command line.

    ``main`` when ``-M`` appears as its own token; ``solo`` for the only
    instance of a campaign started with neither ``-M`` nor ``-S``; otherwise
    ``secondary``.
    """
    flags = set(cmdline.split())
    if "-M" in flags:
        return "main"
    is_lone_instance = instance_count == 1
    if is_lone_instance and "-S" not in flags:
        return "solo"
    return "secondary"


def classify_variant(cmdline: str, banner: str) -> str:
    """Return the instance type (independent of its role).

    The banner is matched case-insensitively and takes precedence, checked in
    this order: ``asan``; ``laf`` or ``compcov`` (both reported as ``compcov``);
    ``cfisan``. Without a banner match the result is ``cmplog`` when CMPLOG is
    enabled on the command line, else ``generic``.
    """
    lowered = banner.lower()
    banner_rules = (
        (("asan",), "asan"),
        (("laf", "compcov"), "compcov"),
        (("cfisan",), "cfisan"),
    )
    for needles, label in banner_rules:
        if any(needle in lowered for needle in needles):
            return label
    return "cmplog" if cmplog_enabled(cmdline) else "generic"


def classify_toolchain(cmdline: str, *, is_afl_subdir: bool) -> str:
    """Return ``ziggy``, ``cargo-afl`` or ``aflpp`` from the layout and command line.

    ``ziggy`` wins when the campaign uses the afl-subdir layout or the command
    line mentions ``target/afl/``; ``cargo-afl`` when it mentions a cargo
    ``target/debug/`` / ``target/release/`` path or ``cargo afl``; anything else
    is plain ``aflpp``.
    """
    if is_afl_subdir or "target/afl/" in cmdline:
        return "ziggy"
    cargo_markers = ("target/debug/", "target/release/", "cargo afl")
    looks_like_cargo = any(marker in cmdline for marker in cargo_markers)
    return "cargo-afl" if looks_like_cargo else "aflpp"


def _to_float(value: str) -> float:
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def parse_plot_data(text: str) -> dict[str, float | int]:
    """Header-indexed trend from ``plot_data``.

    Tolerant of column drift: columns are looked up by *name* from the ``#``
    header line — ``edges_found`` (falling back to ``map_size``),
    ``execs_per_sec``, and ``relative_time`` (falling back to the legacy
    ``unix_time``; row order is the time axis when neither exists).

    A trailing ``%`` on a cell is stripped before conversion, because the
    ``map_size`` column is written as a percentage (e.g. ``1.50%``); without
    that the fallback would always read 0. Cells that are missing or
    non-numeric count as 0.0.

    Returns a dict with ``rows``, ``edges_found_now`` / ``edges_found_peak``,
    ``execs_per_sec_now`` / ``execs_per_sec_peak`` and
    ``edges_slope_recent_per_min`` (edges/min over the last 25% of the covered
    time span). ``rows == 0`` means no usable data (trend source ``none``);
    all metrics are also zero when no header line was seen.
    """
    cols: dict[str, int] = {}
    rows: list[list[str]] = []
    for line in text.splitlines():
        s = line.strip()
        if not s:
            continue
        if s.startswith("#"):
            # A later header (e.g. appended after a restart) replaces the earlier one.
            names = [c.strip() for c in s.lstrip("#").split(",")]
            cols = {n: i for i, n in enumerate(names)}
            continue
        parts = [p.strip() for p in s.split(",")]
        if len(parts) >= 2:
            rows.append(parts)

    out: dict[str, float | int] = {
        "rows": len(rows),
        "edges_found_now": 0,
        "edges_found_peak": 0,
        "execs_per_sec_now": 0.0,
        "execs_per_sec_peak": 0.0,
        "edges_slope_recent_per_min": 0.0,
    }
    if not rows or not cols:
        return out

    ef_idx = cols.get("edges_found", cols.get("map_size"))
    eps_idx = cols.get("execs_per_sec")
    rt_idx = cols.get("relative_time", cols.get("unix_time"))

    def col(row: list[str], idx: int | None) -> float:
        """Numeric value of one cell; 0.0 when the cell is missing or not a number."""
        if idx is None or idx >= len(row):
            return 0.0
        try:
            return float(row[idx].rstrip("%"))
        except ValueError:
            return 0.0

    efs = [col(r, ef_idx) for r in rows]
    epss = [col(r, eps_idx) for r in rows]
    ts = [col(r, rt_idx) for r in rows] if rt_idx is not None else [float(i) for i in range(len(rows))]

    out["edges_found_now"] = int(efs[-1])
    out["edges_found_peak"] = int(max(efs))
    out["execs_per_sec_now"] = epss[-1]
    out["execs_per_sec_peak"] = max(epss)

    # Slope window: from the earliest row at/after the 75% mark of the time span
    # up to the last row. With no such row the window collapses and the slope is 0.
    tspan = ts[-1] - ts[0]
    wstart = ts[-1] - 0.25 * tspan
    wi = next((i for i, stamp in enumerate(ts) if stamp >= wstart), len(ts) - 1)
    dt = ts[-1] - ts[wi]
    de = efs[-1] - efs[wi]
    out["edges_slope_recent_per_min"] = (de / (dt / 60.0)) if dt > 0 else 0.0
    return out


# ============================================================================
# snapshot — assemble a normalized CampaignSnapshot from raw collector artifacts.
# ============================================================================

# Seconds since last_update at/after which an instance without a live pid is
# presumed not running.
STALE_SECONDS = 180

# fuzzer_stats keys that some AFL++ builds omit. A missing one is still stored as 0
# on the Instance, but is listed in its absent_fields so the 0 can be reported as
# "unknown" instead of being trusted as a measurement.
_WATCHED_OPTIONAL = ("slowest_exec_ms", "peak_rss_mb")


def _fnum(kv: dict[str, str], key: str) -> tuple[float, bool]:
    """(value, absent). Strips a trailing ``%``; non-numeric/missing -> (0.0, True)."""
    raw = kv.get(key)
    if raw is None:
        return 0.0, True
    try:
        return float(raw.rstrip("%")), False
    except ValueError:
        return 0.0, True


def _inum(kv: dict[str, str], key: str) -> int:
    """Integer form of ``_fnum``: truncates toward zero; missing/non-numeric gives 0."""
    number, _absent = _fnum(kv, key)
    return int(number)


def build_snapshot(raw: RawCampaign) -> CampaignSnapshot:
    """Assemble a normalized CampaignSnapshot from one campaign's raw artifacts.

    Every instance's fuzzer_stats is parsed and classified (role, variant,
    toolchain), its ages are computed relative to ``raw.now``, and crash / hang /
    core totals are accumulated across instances.

    Campaign-level trend inputs prefer the ``-M`` main instance and fall back to
    the other instances when the main cannot supply them:

    * plot_data: the main's, if it has any; otherwise that of the non-main
      instance with the most ``edges_found`` among those that have plot_data.
    * newest +cov mtime: the main's, if known; otherwise the newest across all
      instances.

    No filesystem or network access happens here.
    """
    n = len(raw.instances)
    is_afl_subdir = raw.layout == "ziggy-afl-subdir"

    instances: list[Instance] = []
    roles: dict[str, int] = {}
    variants: dict[str, int] = {}
    by_instance: dict[str, int] = {}
    crash_total = hang_total = core_bytes = 0
    run_time_max = last_find_max = alive_count = 0
    tc_ziggy = tc_cargo = False

    main_mtime: float | None = None
    main_plot: str | None = None
    best_plot: str | None = None  # fallback trend source when the main has no plot_data
    best_edges = -1

    for ri in raw.instances:
        kv = parse_fuzzer_stats(ri.fuzzer_stats)
        cl = kv.get("command_line", "")
        banner = kv.get("afl_banner", "")
        role = classify_role(cl, n)
        variant = classify_variant(cl, banner)
        tc = classify_toolchain(cl, is_afl_subdir=is_afl_subdir)
        tc_ziggy = tc_ziggy or tc == "ziggy"
        tc_cargo = tc_cargo or tc == "cargo-afl"

        run_time = _inum(kv, "run_time")
        run_time_max = max(run_time_max, run_time)
        last_update = _inum(kv, "last_update")
        lu_age = max(0, raw.now - last_update)
        last_find = _inum(kv, "last_find")
        last_find_max = max(last_find_max, last_find)
        # last_find of 0 (or missing) means "never found anything": age is unknown.
        lf_age = max(0, raw.now - last_find) if last_find > 0 else None
        # A live pid is authoritative; otherwise trust a recent last_update.
        alive = ri.pid_alive or lu_age < STALE_SECONDS
        if alive:
            alive_count += 1

        edges_found = _inum(kv, "edges_found")
        if role == "main":
            main_mtime = ri.newest_cov_mtime
            main_plot = ri.plot_data
        elif ri.plot_data and edges_found > best_edges:
            # Strict ">" keeps the first instance on ties.
            best_plot, best_edges = ri.plot_data, edges_found

        crash_total += ri.crash_count
        hang_total += ri.hang_count
        core_bytes += ri.core_bytes
        if ri.crash_count:
            by_instance[ri.name] = ri.crash_count

        roles[role] = roles.get(role, 0) + 1
        variants[variant] = variants.get(variant, 0) + 1
        absent = [f for f in _WATCHED_OPTIONAL if f not in kv]

        instances.append(
            Instance(
                name=ri.name,
                role=role,
                variant=variant,
                toolchain=tc,
                alive=alive,
                pid_alive=ri.pid_alive,
                target_mode=kv.get("target_mode", ""),
                execs_per_sec=_fnum(kv, "execs_per_sec")[0],
                execs_ps_last_min=_fnum(kv, "execs_ps_last_min")[0],
                stability=_fnum(kv, "stability")[0],
                bitmap_cvg=_fnum(kv, "bitmap_cvg")[0],
                edges_found=edges_found,
                total_edges=_inum(kv, "total_edges"),
                pending_favs=_inum(kv, "pending_favs"),
                pending_total=_inum(kv, "pending_total"),
                cycles_done=_inum(kv, "cycles_done"),
                cycles_wo_finds=_inum(kv, "cycles_wo_finds"),
                time_wo_finds=_inum(kv, "time_wo_finds"),
                corpus_count=_inum(kv, "corpus_count"),
                corpus_variable=_inum(kv, "corpus_variable"),
                saved_crashes=_inum(kv, "saved_crashes"),
                saved_hangs=_inum(kv, "saved_hangs"),
                crash_files=ri.crash_count,
                hang_files=ri.hang_count,
                slowest_exec_ms=_inum(kv, "slowest_exec_ms"),
                peak_rss_mb=_inum(kv, "peak_rss_mb"),
                exec_timeout=_inum(kv, "exec_timeout"),
                run_time_s=run_time,
                last_update_age_s=lu_age,
                last_find_age_s=lf_age,
                absent_fields=absent,
            ),
        )

    # Campaign toolchain: the most specific one seen on any instance.
    toolchain = "ziggy" if tc_ziggy else "cargo-afl" if tc_cargo else "aflpp"

    # +cov age: prefer the main queue's newest +cov mtime, else the max across all
    # (also used when a main exists but has no +cov mtime of its own).
    cov_mtime = main_mtime
    if cov_mtime is None:
        cov_mtime = max(
            (ri.newest_cov_mtime for ri in raw.instances if ri.newest_cov_mtime is not None),
            default=None,
        )
    last_cov_age = int(raw.now - cov_mtime) if cov_mtime is not None else None
    if last_cov_age is not None and last_cov_age < 0:
        last_cov_age = 0  # mtime ahead of "now" (clock skew): clamp
    last_find_age = max(0, raw.now - last_find_max) if last_find_max > 0 else None

    primary_plot = main_plot or best_plot
    pt = parse_plot_data(primary_plot or "")
    trend = Trend(
        source="plot_data" if pt["rows"] else "none",
        window="last 25% of run",
        edges_found_now=int(pt["edges_found_now"]),
        edges_found_peak=int(pt["edges_found_peak"]),
        edges_slope_recent_per_min=float(pt["edges_slope_recent_per_min"]),
        execs_per_sec_now=float(pt["execs_per_sec_now"]),
        execs_per_sec_peak=float(pt["execs_per_sec_peak"]),
        last_cov_entry_age_s=last_cov_age,
        last_find_age_s=last_find_age,
    )

    return CampaignSnapshot(
        campaign_id=raw.campaign_id,
        host=raw.host,
        output_dir=raw.output_dir,
        generated_at=raw.now,
        toolchain=toolchain,
        layout=raw.layout,
        honggfuzz_present=raw.honggfuzz_present,
        run_time_s=run_time_max,
        instance_count=n,
        alive_count=alive_count,
        roles=roles,
        variants=variants,
        trend=trend,
        crashes=Crashes(crash_total, hang_total, core_bytes, by_instance),
        instances=instances,
        collection_error=raw.collection_error,
    )


# ============================================================================
# analyze — snapshot (+ optional history) -> verdict, findings, directives.
#
# All rate/size judgments are self-relative (vs the campaign's own plot_data peak,
# prior snapshots, or an optional expected eps). The only absolute judgments are
# objective/degenerate (execs==0, edges==0, dead) and AFL-normalized (stability
# bands, bitmap saturation). Read-only: directives are emitted, never executed.
# ============================================================================

STABILITY_BAD = 85.0  # stability % below which a running instance is marked "unstable" (high severity)
STABILITY_WARN = 90.0  # stability % below which a low-severity "mild nondeterminism" finding is raised
FLAT_SLOPE = 0.5  # edges/min at/under which coverage is "flat"
SATURATION_CVG = 99.5  # bitmap_cvg % at/above which the coverage map counts as saturated
CORE_BYTES_WARN = 2 * 1024**3  # 2 GB of .core dumps
THROUGHPUT_RUNTIME = 6 * 3600  # only call a campaign "throughput-bound" after this long
EPS_DROP_FRACTION = 0.5  # execs/sec below this fraction of its own peak = slowdown


def _target(snap: CampaignSnapshot, instance: str | None = None) -> dict[str, Any]:
    t = {"host": snap.host, "output_dir": snap.output_dir}
    if instance:
        t["instance"] = instance
    return t


def analyze(
    snap: CampaignSnapshot,
    *,
    expected_eps: float | None = None,
    prior: dict[str, Any] | None = None,
) -> Report:
    """Turn a snapshot (plus an optional prior stored snapshot) into a Report.

    Args:
        snap: the normalized campaign snapshot. It is annotated in place:
            ``trend.expected_eps`` / ``trend.prior`` / ``trend.cross_run`` are
            set, and every instance receives its ``status`` and ``issues``.
        expected_eps: optional expected execs/sec; recorded on the trend.
        prior: optional previously stored snapshot dict; its ``edges_found`` and
            ``generated_at`` feed the cross-run delta.

    Returns:
        A Report whose ``overall`` verdict is chosen with this precedence:
        ``collection_error`` (returned immediately, nothing else evaluated),
        ``misconfigured``, ``dead``, ``degraded``, ``stalled``, ``healthy``.

    Directives are only emitted, never executed. Paths interpolated into a
    suggested shell command are shell-quoted so a directory name containing
    spaces or metacharacters cannot alter the command.
    """
    if snap.collection_error:
        return Report(
            snap, "collection_error", snap.collection_error, notes=[f"collection failed: {snap.collection_error}"]
        )

    findings: list[Finding] = []
    actions: list[ActionDirective] = []
    notes: list[str] = []
    snap.trend.expected_eps = expected_eps
    snap.trend.prior = prior

    n = snap.instance_count
    alive = [i for i in snap.instances if i.alive]
    dead = [i for i in snap.instances if not i.alive]
    t = snap.trend
    pending_favs_total = sum(i.pending_favs for i in snap.instances)
    cycles_max = max((i.cycles_done for i in snap.instances), default=0)

    # --- cross-run delta (from a prior stored snapshot) -------------------------
    if prior:
        prev_edges = prior.get("edges_found")
        t.cross_run = {
            "since_ts": prior.get("generated_at"),
            "edges_delta": (snap.edges_found_max() - prev_edges) if isinstance(prev_edges, (int, float)) else None,
        }

    # --- crashes ----------------------------------------------------------------
    if snap.crashes.total > 0:
        findings.append(
            Finding(
                "high",
                "campaign",
                None,
                f"{snap.crashes.total} crash testcases saved",
                "the target crashed on saved inputs — real findings to triage",
            )
        )
        # Quote both paths: the command is meant to be machine-actionable.
        triage_in = shlex.quote(snap.output_dir)
        triage_out = shlex.quote(f"{snap.output_dir}-triaged")
        actions.append(
            ActionDirective(
                "triage_crashes",
                "high",
                "campaign",
                _target(snap),
                f"{snap.crashes.total} crashes ({snap.crashes.hangs_total} hangs) await triage",
                {"crashes": snap.crashes.total, "by_instance": snap.crashes.by_instance},
                suggested_command=f"casr-afl -i {triage_in} -o {triage_out}",
            )
        )

    if snap.crashes.core_bytes > CORE_BYTES_WARN:
        gb = snap.crashes.core_bytes / 1024**3
        findings.append(
            Finding(
                "low",
                "campaign",
                None,
                f"{gb:.0f} GB of .core dumps next to crashes",
                "the crash handler is writing core dumps; disk will fill",
            )
        )
        actions.append(
            ActionDirective(
                "prune_cores",
                "medium",
                "campaign",
                _target(snap),
                f"{gb:.0f} GB of .core files and growing",
                {"core_bytes": snap.crashes.core_bytes},
                suggested_command="disable core dumps (ulimit -c 0) or prune */crashes/*.core",
            )
        )

    # --- objective / AFL-normalized: misconfigured ------------------------------
    all_zero_edges = n > 0 and snap.run_time_s > 0 and all(i.edges_found == 0 for i in snap.instances)
    all_zero_eps = n > 0 and snap.run_time_s > 0 and all(i.execs_per_sec == 0 for i in snap.instances)
    saturated = [i for i in snap.instances if i.bitmap_cvg >= SATURATION_CVG]
    if all_zero_edges:
        findings.append(
            Finding(
                "high",
                "campaign",
                None,
                f"edges_found == 0 after run_time = {snap.run_time_s}s",
                "instrumentation didn't take, or afl-fuzz is on the wrong binary",
            )
        )
    if all_zero_eps:
        findings.append(
            Finding("high", "campaign", None, "execs_per_sec == 0 everywhere", "campaign not actually executing")
        )
    if saturated:
        findings.append(
            Finding(
                "high",
                "campaign",
                None,
                f"bitmap_cvg ~100% on {len(saturated)} instance(s)",
                "map saturation: hash collisions degrade the feedback signal",
            )
        )
    misconfigured = bool(all_zero_edges or all_zero_eps or saturated)

    # --- structural: no main on a multi-instance run ----------------------------
    if n > 1 and snap.roles.get("main", 0) == 0:
        findings.append(
            Finding(
                "medium",
                "campaign",
                None,
                "no -M main instance",
                "no sync hub / final-sync; cross-pollination is weaker",
            )
        )

    # --- per-instance: liveness, then stability (a dead instance is not also
    # --- judged on stability) ---------------------------------------------------
    low_stability = False
    eps_drop = False
    for i in snap.instances:
        issues: list[str] = []
        status = "healthy"
        if not i.alive:
            status = "dead"
            issues.append("not running (pid dead / last_update stale)")
            findings.append(
                Finding(
                    # A dead instance is worse when nothing else is running either.
                    "medium" if alive else "high",
                    "instance",
                    i.name,
                    "instance not running",
                    "crashed or OOM-killed, or the campaign was stopped",
                )
            )
            actions.append(
                ActionDirective(
                    "relaunch_instance",
                    "medium",
                    "instance",
                    _target(snap, i.name),
                    "instance is not running while others are" if alive else "instance not running",
                    {"last_update_age_s": i.last_update_age_s},
                    suggested_command="re-run its launch line with AFL_AUTORESUME=1",
                )
            )
        elif i.stability and i.stability < STABILITY_BAD:  # stability 0 = unknown, not judged
            low_stability = True
            status = "unstable"
            issues.append(f"stability {i.stability:.0f}% (<{STABILITY_BAD:.0f}%)")
            findings.append(
                Finding(
                    "high",
                    "instance",
                    i.name,
                    f"stability {i.stability:.0f}% (<{STABILITY_BAD:.0f}%)",
                    "nondeterministic edges: uninit memory / hashmap order / "
                    "threads / persistent-loop state (Rust: HashMap RandomState)",
                )
            )
            actions.append(
                ActionDirective(
                    "investigate_instability",
                    "high",
                    "instance",
                    _target(snap, i.name),
                    f"stability {i.stability:.0f}% is below the {STABILITY_BAD:.0f}% floor",
                    {"stability": i.stability, "variant": i.variant},
                    suggested_command="diagnose unstable edges; consider moving off ASAN",
                )
            )
        elif i.stability and i.stability < STABILITY_WARN:
            issues.append(f"stability {i.stability:.0f}% (<{STABILITY_WARN:.0f}%)")
            findings.append(
                Finding(
                    "low",
                    "instance",
                    i.name,
                    f"stability {i.stability:.0f}% (<{STABILITY_WARN:.0f}%)",
                    "mild nondeterminism",
                )
            )
        i.status = status
        i.issues = issues

    # --- campaign-level exec-rate drop (relative to the trend's own peak) -------
    if t.execs_per_sec_peak > 0 and 0 < t.execs_per_sec_now < EPS_DROP_FRACTION * t.execs_per_sec_peak:
        eps_drop = True
        findings.append(
            Finding(
                "medium",
                "campaign",
                None,
                f"execs/sec {t.execs_per_sec_now:.0f} is far below its own peak {t.execs_per_sec_peak:.0f}",
                "slow inputs accumulating, sync overhead, or a slow path",
            )
        )
        actions.append(
            ActionDirective(
                "tune",
                "medium",
                "campaign",
                _target(snap),
                "recent exec rate dropped well below the campaign's own peak",
                {"eps_now": t.execs_per_sec_now, "eps_peak": t.execs_per_sec_peak},
                suggested_command="verify persistent mode; afl-persistent-config + afl-system-config; raise AFL_TESTCACHE_SIZE",
            )
        )

    # --- throughput-bound vs plateau (need a trend, not a snapshot) -------------
    # "old" +cov: at least 30 minutes and at least a quarter of the run time.
    cov_old = t.last_cov_entry_age_s is not None and t.last_cov_entry_age_s >= max(1800, snap.run_time_s // 4)
    flat = t.source == "plot_data" and t.edges_slope_recent_per_min <= FLAT_SLOPE
    drained = pending_favs_total <= n  # favored queue essentially exhausted
    throughput_bound = cycles_max == 0 and snap.run_time_s >= THROUGHPUT_RUNTIME and pending_favs_total > 0
    plateau = flat and cov_old and drained and not throughput_bound

    if throughput_bound:
        notes.append(
            "throughput-bound, not stalled: 0 completed cycles after a long run with a "
            "large pending-favorites backlog — coverage is gated by exec speed, not corpus."
        )
        findings.append(
            Finding(
                "medium",
                "campaign",
                None,
                f"cycles_done = 0 after {snap.run_time_s // 3600}h; {pending_favs_total} favorites pending",
                "heavy per-exec target caps coverage growth (throughput-bound)",
            )
        )
        actions.append(
            ActionDirective(
                "tune",
                "medium",
                "campaign",
                _target(snap),
                "growth is limited by exec speed, not by corpus",
                {"cycles_done": cycles_max, "pending_favs": pending_favs_total, "eps_now": t.execs_per_sec_now},
                suggested_command="profile the harness/target per-exec cost; faster execs > more seeds",
            )
        )

    if plateau:
        findings.append(
            Finding(
                "high",
                "campaign",
                None,
                f"edge slope ~{t.edges_slope_recent_per_min:.1f}/min, last +cov "
                f"{t.last_cov_entry_age_s}s ago, favorites drained",
                "genuine coverage plateau (bitmap not saturated → input/harness limited)",
            )
        )
        actions.append(
            ActionDirective(
                "inject_seeds",
                "high",
                "campaign",
                _target(snap),
                "coverage plateaued with the favored queue exhausted",
                {"slope_per_min": t.edges_slope_recent_per_min, "last_cov_age_s": t.last_cov_entry_age_s},
                suggested_command="build/inject a richer seed corpus + dictionary (seed-corpus-and-dictionary)",
            )
        )
        actions.append(
            ActionDirective(
                "run_coverage_analysis",
                "medium",
                "campaign",
                _target(snap),
                "find which reachable code is still uncovered",
                {},
                suggested_command="coverage-analysis skill (llvm-cov)",
            )
        )

    if (plateau or throughput_bound) and snap.variants.get("cmplog", 0) == 0:
        actions.append(
            ActionDirective(
                "add_cmplog",
                "medium",
                "campaign",
                _target(snap),
                "no cmplog instance present; comparison-gated edges may be blocking progress",
                {"variants": snap.variants},
                suggested_command="add a -c <cmplog_bin> or -c0 instance (AFL_CMPLOG_ONLY_NEW=1 on restart)",
            )
        )

    if snap.honggfuzz_present:
        notes.append(
            "ziggy honggfuzz instance present but not interpreted (out of scope); "
            "run casr-crash-triage on the shared crashes/ dir for honggfuzz findings."
        )
    absent = sorted({f for i in snap.instances for f in i.absent_fields})
    if absent:
        notes.append(
            f"AFL build omits {', '.join(absent)} from fuzzer_stats — reported as 0 but "
            f"unknown; do not interpret as real zeros."
        )

    # --- verdict ----------------------------------------------------------------
    if misconfigured:
        overall = "misconfigured"
    elif n > 0 and snap.alive_count == 0:
        overall = "dead"
    elif dead or low_stability or (eps_drop and not throughput_bound):
        overall = "degraded"
    elif plateau:
        overall = "stalled"
    else:
        overall = "healthy"

    headline = _headline(
        snap,
        low_stability=low_stability,
        eps_drop=eps_drop,
        throughput_bound=throughput_bound,
        plateau=plateau,
        all_zero_edges=all_zero_edges,
        all_zero_eps=all_zero_eps,
        saturated=bool(saturated),
    )
    explainer = _explain(overall, snap, throughput_bound=throughput_bound)
    return Report(snap, overall, explainer, findings=findings, actions=actions, notes=notes, headline=headline)


def _headline(
    snap: CampaignSnapshot,
    *,
    low_stability: bool,
    eps_drop: bool,
    throughput_bound: bool,
    plateau: bool,
    all_zero_edges: bool,
    all_zero_eps: bool,
    saturated: bool,
) -> dict[str, str]:
    """Condense the analysis flags into one status word per header dimension.

    Returns a dict with the keys ``stability``, ``speed`` and ``coverage`` (in that
    order). Within each dimension the first matching condition wins.
    """
    # Instances reporting a stability of 0 (or less) are left out of the minimum.
    floor = min((inst.stability for inst in snap.instances if inst.stability > 0), default=None)

    def stability_word() -> str:
        if low_stability:
            return "low"
        if floor is None:
            return "OK"
        if floor < STABILITY_BAD:
            return "low"
        if floor < STABILITY_WARN:
            return "marginal"
        return "OK"

    def speed_word() -> str:
        ranked = ((all_zero_eps, "none"), (throughput_bound, "slow"), (eps_drop, "dropping"))
        return next((word for flag, word in ranked if flag), "OK")

    def coverage_word() -> str:
        if all_zero_edges:
            return "none"
        if saturated:
            return "saturated"
        if plateau:
            return "plateaued"
        # The slope is only consulted when none of the flags above decided the word.
        if throughput_bound or snap.trend.edges_slope_recent_per_min > FLAT_SLOPE:
            return "gaining coverage"
        return "flat"

    return {"stability": stability_word(), "speed": speed_word(), "coverage": coverage_word()}


def _explain(overall: str, snap: CampaignSnapshot, *, throughput_bound: bool) -> str:
    """Return the one-sentence, human-readable explanation for the verdict ``overall``.

    Any verdict other than misconfigured / dead / degraded / stalled gets the
    "healthy" sentence, optionally extended with the throughput-bound remark.
    """
    trend = snap.trend
    alive = f"{snap.alive_count}/{snap.instance_count} instances alive"

    fixed_sentences = {
        "misconfigured": f"Setup failure: {alive}, but coverage/feedback is not working (see findings).",
        "dead": f"No instance is running ({alive}) — campaign stopped (possibly intended).",
        "degraded": f"{alive}, but a subset is unwell (dead instance, low stability, or dropped exec rate).",
    }
    if overall in fixed_sentences:
        return fixed_sentences[overall]

    if overall == "stalled":
        slope = f"~{trend.edges_slope_recent_per_min:.1f}/min"
        return (
            f"{alive} and running, but coverage plateaued: edge slope {slope}, "
            f"last +cov {trend.last_cov_entry_age_s}s ago, favored queue drained."
        )

    sentence = f"{alive}; coverage progressing and stability OK."
    if throughput_bound:
        sentence += " Throughput-bound: still gaining coverage, just slowly (exec-speed limited)."
    return sentence


# ============================================================================
# report — machine JSON (single or fleet envelope) and a human text report.
# ============================================================================

# Process exit codes for cron/CI, keyed by verdict. The number doubles as a severity
# rank: exit_code() and fleet_verdict() pick the highest value across the reports, and
# a verdict missing from this table is treated as 0. The keys are also the accepted
# values of --fail-on.
EXIT = {
    "healthy": 0,
    "degraded": 1,
    "stalled": 1,
    "misconfigured": 2,
    "dead": 2,
    "collection_error": 3,
}


def exit_code(reports: list[Report]) -> int:
    """Return the highest exit code among ``reports`` (0 when the list is empty)."""
    codes = [EXIT.get(report.overall, 0) for report in reports]
    return max(codes) if codes else 0


def fleet_verdict(reports: list[Report]) -> str:
    """Return the worst verdict across the fleet, ranked by exit code.

    An empty fleet is "healthy"; among equally ranked verdicts the first one in
    ``reports`` is returned.
    """
    worst = None
    for report in reports:
        # Strict ">" keeps the earliest report when ranks tie.
        if worst is None or EXIT.get(report.overall, 0) > EXIT.get(worst.overall, 0):
            worst = report
    return "healthy" if worst is None else worst.overall


def render_json(reports: list[Report], *, now: int, version: str) -> str:
    """Serialize reports as indented JSON.

    Exactly one report is emitted bare (just its ``to_dict()``); any other count is
    wrapped in a fleet envelope carrying tool/version/timestamp, the per-campaign
    dicts, and a roll-up with the worst verdict and per-verdict counts.
    """
    if len(reports) == 1:
        (only,) = reports
        return json.dumps(only.to_dict(), indent=2)

    campaigns = [report.to_dict() for report in reports]
    roll_up = {
        "verdict": fleet_verdict(reports),
        "counts": dict(Counter(report.overall for report in reports)),
    }
    return json.dumps(
        {
            "tool": "afl-health",
            "version": version,
            "generated_at": now,
            "campaigns": campaigns,
            "fleet": roll_up,
        },
        indent=2,
    )


def _hm(seconds: int) -> str:
    """Format a duration in seconds as ``<hours>h<MM>m``; negative input counts as 0."""
    whole_minutes = max(0, seconds) // 60
    hours = whole_minutes // 60
    minutes = whole_minutes % 60
    return f"{hours}h{minutes:02d}m"


def _toolchain_label(toolchain: str) -> str:
    """Return the display name of a toolchain id: "aflpp" becomes "AFL++", the rest pass through."""
    if toolchain == "aflpp":
        return "AFL++"
    return toolchain


def render_text(report: Report, summary: bool = False) -> str:
    """Render one campaign report as newline-terminated, human-readable text.

    A ``collection_error`` report collapses to a two-line block (id + error text).
    Otherwise the output starts with the ``=== campaign_id ===`` banner followed by
    either a single condensed status line (``summary=True``) or the full view:
    header fields, the per-instance table, the trend line and the crash line.
    Actions (most severe first) and notes are appended in both modes.
    """
    s = report.snapshot
    if report.overall == "collection_error":
        return f"=== {s.campaign_id} ===\nCOLLECTION ERROR: {report.verdict_explainer}\n"

    hl = report.headline
    tc = _toolchain_label(s.toolchain)
    lines: list[str] = [f"=== {s.campaign_id} ==="]

    if summary:
        # Condensed mode: everything from the header block on one line.
        lines.append(
            f"Verdict: {report.overall.upper()}  Type: {tc}  "
            f"Running: {s.alive_count}/{s.instance_count}  Runtime: {_hm(s.run_time_s)}  "
            f"Stability: {hl.get('stability', '?')}  Speed: {hl.get('speed', '?')}  "
            f"Coverage: {hl.get('coverage', '?')}"
        )
    else:
        # Header block: labels are left-aligned in a 10-character column.
        lines += [
            f"{'Type:':<10} {tc}",
            f"{'Running:':<10} {s.alive_count}/{s.instance_count}",
            f"{'Runtime:':<10} {_hm(s.run_time_s)}",
            f"{'Stability:':<10} {hl.get('stability', '?')}",
            f"{'Speed:':<10} {hl.get('speed', '?')}",
            f"{'Coverage:':<10} {hl.get('coverage', '?')}",
            "",
            f"{'Verdict:':<10} {report.overall.upper()}",
            "",
        ]

        # Per-instance table; the column widths below must stay in sync with the rows.
        header = (
            f"{'instance name':<18}{'role/variant':<18}{'alive':<6}{'exec/s':>8}{'stab':>7}"
            f"{'cvg':>9}{'favs':>7}{'crash':>6}  status"
        )
        lines.append(header)
        lines.append("-" * len(header))
        # Instances with the most crash files come first.
        for i in sorted(s.instances, key=lambda x: -x.crash_files):
            lines.append(
                f"{i.name:<18}{i.role + '/' + i.variant:<18}{('yes' if i.alive else 'NO'):<6}"
                f"{i.execs_per_sec:>8.0f}{i.stability:>6.1f}%{i.bitmap_cvg:>8.2f}%{i.pending_favs:>7}"
                f"{i.crash_files:>6}  {i.status}",
            )

        t = s.trend
        lines.append("")
        # The cross-run delta is shown only when a prior run supplied an edges_delta.
        delta = ""
        if t.cross_run and t.cross_run.get("edges_delta") is not None:
            delta = f", Δ{t.cross_run['edges_delta']:+d} edges since last run"
        cov = "n/a" if t.last_cov_entry_age_s is None else _hm(t.last_cov_entry_age_s)
        # The edge denominator is the largest total_edges reported by any instance.
        total_edges = max((i.total_edges for i in s.instances), default=0)
        lines.append(
            f"Trend [source: {t.source}]: edges {t.edges_found_now}/{total_edges} (peak {t.edges_found_peak}, "
            f"{t.edges_slope_recent_per_min:+.1f}/min), +cov age {cov}, "
            f"exec/s {t.execs_per_sec_now:.1f}/{t.execs_per_sec_peak:.1f}{delta}",
        )
        # Core-dump volume is appended only when it is non-zero (rounded to whole GiB).
        lines.append(
            f"Crashes: {s.crashes.total} ({s.crashes.hangs_total} hangs)"
            + (f", cores {s.crashes.core_bytes / 1024**3:.0f}GB" if s.crashes.core_bytes else ""),
        )

    if report.actions:
        lines.append("")
        lines.append("Action required:")
        # Order: high, medium, low, then any other severity; ties keep their original order.
        for a in sorted(report.actions, key=lambda x: {"high": 0, "medium": 1, "low": 2}.get(x.severity, 3)):
            where = a.target.get("instance", "campaign")
            lines.append(f"  [{a.severity}] {a.action} ({where}) — {a.rationale}")
            if a.suggested_command:
                lines.append(f"          $ {a.suggested_command}")
    for note in report.notes:
        lines.append(f"note: {note}")
    return "\n".join(lines) + "\n"


# ============================================================================
# store — persisted snapshot history (SQLite) for cross-run trend.
#
# One row per run, keyed by campaign_id (host:abspath). prior() returns the most
# recent stored row's key metrics so the analyzer can compute cross-run deltas
# (call prior() before save() so it reflects the *previous* run). sqlite3 is
# stdlib, so this works everywhere with no extra dependency.
# ============================================================================

# DDL executed every time a TrendStore is opened; the IF NOT EXISTS clauses make it
# safe to re-run against an existing database.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS snapshots (
    campaign_id    TEXT NOT NULL,
    generated_at   INTEGER NOT NULL,
    run_time_s     INTEGER,
    edges_found    INTEGER,
    eps_now        REAL,
    eps_peak       REAL,
    corpus_count   INTEGER,
    pending_favs   INTEGER,
    crashes        INTEGER,
    hangs          INTEGER,
    last_cov_age_s INTEGER,
    verdict        TEXT,
    raw_json       TEXT
);
CREATE INDEX IF NOT EXISTS idx_snapshots_cid_ts ON snapshots(campaign_id, generated_at);
"""

# Columns TrendStore.prior() reads back. The names are interpolated straight into the
# SELECT statement, so each must be a column of `snapshots` and this tuple must stay a
# trusted, hard-coded constant.
_METRIC_COLS = (
    "generated_at",
    "run_time_s",
    "edges_found",
    "eps_now",
    "eps_peak",
    "corpus_count",
    "pending_favs",
    "crashes",
    "hangs",
    "last_cov_age_s",
    "verdict",
)


class TrendStore:
    """SQLite-backed history of per-run campaign metrics (one row per saved report).

    The store owns a single connection for its lifetime. Release it with
    :meth:`close`, or use the store as a context manager::

        with TrendStore(path) as store:
            previous = store.prior(campaign_id)
            store.save(report)
    """

    def __init__(self, path: str | Path) -> None:
        """Open the database at ``path`` (creating parent dirs and schema as needed).

        Raises ``sqlite3.Error`` if the file cannot be used as a database; the
        connection is closed before the error propagates.
        """
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self.path))
        try:
            self._conn.row_factory = sqlite3.Row
            self._conn.executescript(_SCHEMA)
            self._conn.commit()
        except sqlite3.Error:
            # Don't leak the handle when schema creation fails (e.g. corrupt file).
            self._conn.close()
            raise

    def __enter__(self) -> TrendStore:
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def save(self, report: Report) -> None:
        """Append one row with the key metrics of ``report`` and commit immediately.

        corpus_count and pending_favs are summed over all instances; the complete
        report is kept as JSON in ``raw_json``.
        """
        s = report.snapshot
        t = s.trend
        self._conn.execute(
            "INSERT INTO snapshots (campaign_id, generated_at, run_time_s, edges_found, "
            "eps_now, eps_peak, corpus_count, pending_favs, crashes, hangs, last_cov_age_s, "
            "verdict, raw_json) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (
                s.campaign_id,
                s.generated_at,
                s.run_time_s,
                s.edges_found_max(),
                t.execs_per_sec_now,
                t.execs_per_sec_peak,
                sum(i.corpus_count for i in s.instances),
                sum(i.pending_favs for i in s.instances),
                s.crashes.total,
                s.crashes.hangs_total,
                t.last_cov_entry_age_s,
                report.overall,
                json.dumps(report.to_dict()),
            ),
        )
        self._conn.commit()

    def prior(self, campaign_id: str) -> dict[str, Any] | None:
        """Most recent stored metrics for this campaign, or None if never seen.

        "Most recent" is the highest generated_at, with insertion order (rowid) as
        the tie-breaker. Call this before save() to get the previous run.
        """
        row = self._conn.execute(
            f"SELECT {', '.join(_METRIC_COLS)} FROM snapshots WHERE campaign_id = ? "  # noqa: S608
            "ORDER BY generated_at DESC, rowid DESC LIMIT 1",
            (campaign_id,),
        ).fetchone()
        return dict(row) if row is not None else None

    def close(self) -> None:
        """Close the underlying connection."""
        self._conn.close()


# ============================================================================
# collect — gather raw campaign artifacts, locally or over SSH, into a RawCampaign.
#
# Both paths run the *same* POSIX-sh collector so behavior is identical. The remote
# needs nothing beyond sh and the standard base utilities the script calls (find,
# awk, grep, sort, wc, …); we never transfer corpora or core dumps — only
# fuzzer_stats, a plot_data tail, the newest +cov mtime, and crash/hang/core
# counts. The collector emits a length-prefixed, binary-safe dump that parse_dump
# turns into a RawCampaign.
# ============================================================================

# POSIX-sh collector. $1 = campaign output dir. Honours $AFL_HEALTH_NOW for tests.
#
# Output, in order (this is what parse_dump consumes):
#   CAMPAIGN now=<epoch> honggfuzz=<0|1> layout=<flat|ziggy-afl-subdir>
#   DIR <absolute campaign dir>
#   then, for every fuzzer_stats file found within -maxdepth 3 (sorted by path):
#     INSTANCE  <name of the directory holding fuzzer_stats>
#     PIDALIVE  <1 if `kill -0 <fuzzer_pid>` succeeds, else 0>
#     COVMTIME  <newest mtime among queue/*+cov; empty when there is none>
#     CRASHES   <number of crashes/id:* files, excluding *.core, *.txt, *.metadata>
#     HANGS     <same count for hangs/>
#     COREBYTES <summed size of crashes/*.core>
#     BLOB stats <n>, followed by the n bytes of fuzzer_stats
#     BLOB plot <n>, followed by n bytes holding the first line plus the last 200
#                    lines of plot_data (n is 0 when the file is missing)
#     ENDINSTANCE
#   END
# If the directory cannot be entered it prints `ERROR no such dir: <dir>` and exits 9.
#
# NOTE(review): `find -printf` (used for COVMTIME and COREBYTES) is not specified by
# POSIX; where find lacks it the error is discarded and those fields come out empty / 0
# — confirm which remote platforms are meant to be supported.
# NOTE(review): the fuzzer_stats size is taken with `wc -c` before `cat` re-reads the
# file, so a rewrite in between would make the length prefix disagree with the payload.
COLLECTOR_SH = r"""
DIR=$1
abs=$(cd "$DIR" 2>/dev/null && pwd) || { printf 'ERROR no such dir: %s\n' "$DIR"; exit 9; }
now=${AFL_HEALTH_NOW:-$(date +%s)}
hf=0
if [ -n "$(find "$abs" -type d -name honggfuzz -print 2>/dev/null | head -n1)" ]; then hf=1; fi
layout=flat
if find "$abs" -maxdepth 3 -type f -name fuzzer_stats 2>/dev/null | grep -q '/afl/[^/]*/fuzzer_stats'; then
  layout=ziggy-afl-subdir
fi
printf 'CAMPAIGN now=%s honggfuzz=%s layout=%s\n' "$now" "$hf" "$layout"
printf 'DIR %s\n' "$abs"
find "$abs" -maxdepth 3 -type f -name fuzzer_stats 2>/dev/null | sort | while IFS= read -r f; do
  d=$(dirname "$f"); name=$(basename "$d")
  pid=$(awk -F: '/^fuzzer_pid/{gsub(/[ \t]/,"",$2); print $2; exit}' "$f")
  pa=0; if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then pa=1; fi
  cov=$(find "$d/queue" -maxdepth 1 -type f -name '*+cov' -printf '%T@\n' 2>/dev/null | sort -n | tail -n1)
  cr=$(find "$d/crashes" -maxdepth 1 -type f -name 'id:*' ! -name '*.core' ! -name '*.txt' ! -name '*.metadata' ! -name 'README.txt' 2>/dev/null | wc -l | tr -d ' ')
  hg=$(find "$d/hangs" -maxdepth 1 -type f -name 'id:*' ! -name '*.core' ! -name '*.txt' ! -name '*.metadata' ! -name 'README.txt' 2>/dev/null | wc -l | tr -d ' ')
  cb=$(find "$d/crashes" -maxdepth 1 -type f -name '*.core' -printf '%s\n' 2>/dev/null | awk '{s+=$1} END{printf "%d", s+0}')
  printf 'INSTANCE %s\n' "$name"
  printf 'PIDALIVE %s\n' "$pa"
  printf 'COVMTIME %s\n' "$cov"
  printf 'CRASHES %s\n' "$cr"
  printf 'HANGS %s\n' "$hg"
  printf 'COREBYTES %s\n' "$cb"
  sn=$(wc -c < "$f" | tr -d ' ')
  printf 'BLOB stats %s\n' "$sn"
  cat "$f"
  if [ -f "$d/plot_data" ]; then
    pt=$( { head -n1 "$d/plot_data"; tail -n 200 "$d/plot_data"; } )
    pn=$(printf '%s' "$pt" | wc -c | tr -d ' ')
    printf 'BLOB plot %s\n' "$pn"
    printf '%s' "$pt"
  else
    printf 'BLOB plot 0\n'
  fi
  printf 'ENDINSTANCE\n'
done
printf 'END\n'
"""


@dataclass
class Target:
    """One campaign to inspect: an output directory on this machine or on an ssh host."""

    # Campaign output directory (may still contain glob metacharacters before expand_globs).
    output_dir: str
    host: str = "local"  # "local" or an ssh destination ([user@]host)
    # ssh port, passed as `-p <port>` when set.
    port: int | None = None
    # Extra arguments appended verbatim to the ssh command line, before the host.
    ssh_opts: list[str] = field(default_factory=list)
    # Executable used as the ssh client.
    ssh_exe: str = "ssh"
    # Seconds for ssh's ConnectTimeout; also the base of the subprocess timeouts.
    connect_timeout: int = 12

    @property
    def is_local(self) -> bool:
        """True when the campaign is collected on this machine rather than over ssh."""
        return self.host == "local"


def _err(target: Target, msg: str, now: int | None) -> RawCampaign:
    """Build the placeholder RawCampaign for a target whose collection failed.

    The campaign keeps the target's own ``host:output_dir`` identity, carries
    ``msg`` as its collection error and has no instances. ``now`` falls back to
    the current wall-clock time when it is None.
    """
    stamp = int(time.time()) if now is None else now
    identity = f"{target.host}:{target.output_dir}"
    return RawCampaign(
        campaign_id=identity,
        host=target.host,
        output_dir=target.output_dir,
        now=stamp,
        layout="flat",
        honggfuzz_present=False,
        collection_error=msg,
        instances=[],
    )


def _ssh_base_opts(target: Target) -> list[str]:
    """Return the ssh arguments shared by collection and glob expansion.

    That is batch mode (never prompt), the connect timeout, ``-p <port>`` when a
    port is set, and finally the target's own extra ssh options.
    """
    fixed = ["-o", "BatchMode=yes", "-o", f"ConnectTimeout={target.connect_timeout}"]
    port_args = ["-p", str(target.port)] if target.port else []
    return fixed + port_args + target.ssh_opts


def _run(target: Target) -> subprocess.CompletedProcess[bytes]:
    """Run the collector script for ``target`` and return the finished process.

    The script is piped to ``sh -s`` on stdin, either directly or through ssh
    (where the directory argument is shell-quoted for the remote side). Output
    is captured as bytes; a non-zero exit status does not raise. The call is
    abandoned after ``connect_timeout + 60`` seconds.
    """
    if target.is_local:
        command = ["sh", "-s", target.output_dir]
    else:
        remote_cmd = f"sh -s {shlex.quote(target.output_dir)}"
        command = [target.ssh_exe, *_ssh_base_opts(target), target.host, remote_cmd]
    deadline = target.connect_timeout + 60
    return subprocess.run(
        command,
        input=COLLECTOR_SH.encode(),
        capture_output=True,
        timeout=deadline,
        check=False,
    )


def collect(target: Target, *, now_override: int | None = None) -> RawCampaign:
    """Collect one campaign and return it as a RawCampaign; never raises for a bad target.

    Every failure mode is reported as a RawCampaign whose ``collection_error`` is
    set and whose identity is the target's own ``host:output_dir``:

    * the collector could not be started or timed out,
    * the collector reported ``ERROR …`` (e.g. missing directory),
    * the output did not look like a collector dump (stderr is used as message),
    * the dump was truncated or malformed and could not be parsed.

    ``now_override`` replaces the collector's clock when given.
    """
    try:
        proc = _run(target)
    except (subprocess.TimeoutExpired, OSError) as e:
        return _err(target, f"{type(e).__name__}: {e}", now_override)
    out = proc.stdout
    if out.startswith(b"ERROR"):
        # Keep the target's identity: parse_dump only knows the host and would label
        # every failed target on it "host:?", merging their stored histories.
        first_line = out.split(b"\n", 1)[0].decode("utf-8", "replace")
        return _err(target, first_line, now_override)
    if not out.startswith(b"CAMPAIGN"):
        stderr = proc.stderr.decode("utf-8", "replace").strip()
        return _err(target, stderr or f"collector exited {proc.returncode}", now_override)
    try:
        raw = parse_dump(out, host=target.host)
    except (ValueError, IndexError) as e:
        # A dump cut short (dropped connection) or with a bad length prefix must not
        # abort the whole fleet run; surface it as this target's collection error.
        return _err(target, f"malformed collector output: {type(e).__name__}: {e}", now_override)
    if now_override is not None:
        raw.now = now_override
    return raw


def parse_dump(data: bytes, *, host: str) -> RawCampaign:
    """Parse the collector's dump into a RawCampaign.

    Expected layout: a ``CAMPAIGN key=value …`` line, a ``DIR <abs path>`` line,
    then per instance ``INSTANCE <name>``, five single-value lines (pid-alive,
    +cov mtime, crashes, hangs, core bytes), ``BLOB stats <n>`` + n bytes,
    ``BLOB plot <n>`` + n bytes, ``ENDINSTANCE``; the dump ends with ``END``.

    A dump starting with ``ERROR`` yields a RawCampaign with that line as its
    collection error, id ``<host>:?`` and ``now`` 0.

    Malformed numeric fields or BLOB headers raise ``ValueError`` / ``IndexError``;
    they are not handled here.
    """
    # Byte offset of the next unread position in `data`, shared by the readers below.
    pos = 0

    def read_line() -> str:
        # Return the text up to (not including) the next newline and step past it;
        # without a newline, return whatever is left.
        nonlocal pos
        nl = data.find(b"\n", pos)
        if nl < 0:
            s = data[pos:].decode("utf-8", "replace")
            pos = len(data)
            return s
        s = data[pos:nl].decode("utf-8", "replace")
        pos = nl + 1
        return s

    def read_bytes(n: int) -> bytes:
        # Consume exactly n bytes (fewer if the data ends first) — the BLOB payloads.
        nonlocal pos
        b = data[pos : pos + n]
        pos += n
        return b

    def field_after_space() -> str:
        # Value of a "LABEL value" line. The label itself is not checked; the fields
        # are identified purely by their position in the record.
        parts = read_line().split(" ", 1)
        return parts[1] if len(parts) > 1 else ""

    first = read_line()
    if first.startswith("ERROR"):
        return RawCampaign(
            f"{host}:?", host, "", 0, "flat", honggfuzz_present=False, collection_error=first, instances=[]
        )

    # "CAMPAIGN now=… honggfuzz=… layout=…": drop the tag, keep the key=value tokens.
    meta = dict(tok.split("=", 1) for tok in first.split()[1:] if "=" in tok)
    now = int(meta.get("now") or 0)
    honggfuzz = meta.get("honggfuzz") == "1"
    layout = meta.get("layout", "flat")
    dirline = read_line()
    absdir = dirline[4:] if dirline.startswith("DIR ") else ""

    instances: list[RawInstance] = []
    while True:
        line = read_line()
        # "END" is the regular terminator; an empty line also stops the loop, which is
        # what a dump that ends early produces.
        if line in ("END", ""):
            break
        if not line.startswith("INSTANCE "):
            continue
        name = line[len("INSTANCE ") :]
        # The reads below must stay in exactly the order the collector prints the lines.
        pid_alive = field_after_space() == "1"
        cov = field_after_space().strip()
        cov_mtime = float(cov) if cov else None
        crash_count = int(field_after_space() or 0)
        hang_count = int(field_after_space() or 0)
        core_bytes = int(field_after_space() or 0)
        b = read_line().split()  # BLOB stats N
        stats = read_bytes(int(b[2])).decode("utf-8", "replace")
        b = read_line().split()  # BLOB plot N
        nplot = int(b[2])
        # A zero-length plot blob is recorded as None rather than as an empty string.
        plot = read_bytes(nplot).decode("utf-8", "replace") if nplot > 0 else None
        read_line()  # ENDINSTANCE
        instances.append(RawInstance(name, stats, plot, cov_mtime, pid_alive, crash_count, hang_count, core_bytes))

    return RawCampaign(
        f"{host}:{absdir}",
        host,
        absdir,
        now,
        layout,
        honggfuzz_present=honggfuzz,
        collection_error=None,
        instances=instances,
    )


# ============================================================================
# config — target resolution: CLI specs + optional JSON config into Target objects.
# ============================================================================


def parse_target(spec: str, *, ssh_opts: list[str], connect_timeout: int) -> Target:
    """Turn one command-line target spec into a Target.

    ``[user@]host:/path`` is an ssh target; anything else is a local path. A spec
    counts as ssh when the text before its first ``:`` is non-empty, contains no
    ``/`` and is not ``.``.
    """
    host_part, colon, remote_path = spec.partition(":")
    looks_remote = bool(colon) and bool(host_part) and "/" not in host_part and host_part != "."
    if looks_remote:
        return Target(
            output_dir=remote_path,
            host=host_part,
            ssh_opts=list(ssh_opts),
            connect_timeout=connect_timeout,
        )
    # Local path: the whole spec, colon included, is the directory.
    return Target(output_dir=spec, ssh_opts=list(ssh_opts), connect_timeout=connect_timeout)


# Matches any character that opens a glob construct: ``*``, ``?`` or ``[``.
_GLOB_META = re.compile(r"[*?\[]")


def _has_glob(path: str) -> bool:
    """Tell whether ``path`` contains a glob metacharacter (``*``, ``?`` or ``[``)."""
    return _GLOB_META.search(path) is not None


def _shell_glob_quote(path: str) -> str:
    """Shell-quote ``path`` for the remote side but keep ``*``, ``?``, ``[``, ``]`` bare.

    Plain ``shlex.quote`` would wrap the whole pattern in single quotes and so
    disable globbing. Here the path is cut at every glob character and only the
    literal stretches in between are quoted, which lets the *remote* shell expand
    the pattern while spaces and shell metacharacters stay harmless.
    """
    quoted: list[str] = []
    for piece in re.split(r"([*?\[\]])", path):
        if piece == "":
            continue  # split() yields empty strings around adjacent glob characters
        quoted.append(piece if piece in "*?[]" else shlex.quote(piece))
    return "".join(quoted)


def _expand_remote_glob(target: Target) -> list[str]:
    """Let the remote shell expand the glob in ``output_dir``; return the matching directories.

    Returns an empty list when ssh cannot be started or times out. The exit status
    of ssh is not inspected — only the non-blank lines it printed are used.
    """
    quoted_pattern = _shell_glob_quote(target.output_dir)
    # Remote loop: print each expansion that is a directory, one per line.
    remote_cmd = f'for d in {quoted_pattern}; do [ -d "$d" ] && printf "%s\\n" "$d"; done'
    ssh_argv = [target.ssh_exe, *_ssh_base_opts(target), target.host, remote_cmd]
    try:
        finished = subprocess.run(
            ssh_argv,
            capture_output=True,
            timeout=target.connect_timeout + 30,
            check=False,
        )
    except (subprocess.TimeoutExpired, OSError):
        return []
    listing = finished.stdout.decode("utf-8", "replace")
    return [entry for entry in listing.splitlines() if entry.strip()]


def expand_globs(targets: list[Target]) -> list[Target]:
    """Replace every target whose ``output_dir`` is a glob by one Target per matching directory.

    Local patterns (left unexpanded by the invoking shell, e.g. because they were
    quoted) go through the ``glob`` module and are returned sorted; remote patterns
    are expanded over ssh. Targets without glob characters pass through untouched,
    and a pattern with no match is kept as it is so that collection reports a clean
    "no such dir" error for it.
    """
    expanded: list[Target] = []
    for target in targets:
        pattern = target.output_dir
        if _has_glob(pattern):
            if target.is_local:
                hits = sorted(candidate for candidate in glob.glob(pattern) if Path(candidate).is_dir())
            else:
                hits = _expand_remote_glob(target)
            if hits:
                expanded += [replace(target, output_dir=hit) for hit in hits]
                continue
        # No glob, or nothing matched: keep the target verbatim.
        expanded.append(target)
    return expanded


def load_config(path: str) -> dict[str, Any]:
    """Read the UTF-8 JSON file at ``path`` and return the decoded document."""
    with Path(path).open(encoding="utf-8") as handle:
        return json.load(handle)


def resolve_targets(
    specs: list[str],
    *,
    config: dict[str, Any] | None,
    ssh_opts: list[str],
    connect_timeout: int,
) -> list[Target]:
    """Build the target list: entries of ``config["targets"]`` first, then the CLI specs.

    A config entry is either a spec string (parsed like a CLI spec) or a dict
    with a mandatory ``path`` and optional ``host``, ``port``, ``ssh_opts`` and
    ``connect_timeout``; entries of any other type are ignored.
    """
    configured = config.get("targets", []) if config else []
    resolved: list[Target] = []
    for entry in configured:
        if isinstance(entry, str):
            resolved.append(parse_target(entry, ssh_opts=ssh_opts, connect_timeout=connect_timeout))
        elif isinstance(entry, dict):
            # NOTE(review): a per-entry "ssh_opts" is used as given, so presumably it
            # must already hold complete ssh arguments ("-o", "Key=Val") — confirm.
            resolved.append(
                Target(
                    output_dir=entry["path"],
                    host=entry.get("host", "local"),
                    port=entry.get("port"),
                    ssh_opts=entry.get("ssh_opts", list(ssh_opts)),
                    connect_timeout=entry.get("connect_timeout", connect_timeout),
                )
            )
    resolved.extend(parse_target(spec, ssh_opts=ssh_opts, connect_timeout=connect_timeout) for spec in specs)
    return resolved


# ============================================================================
# cli — entry point: one-shot / --watch, fleet, exit codes, on-change hook.
# ============================================================================


def detect_change(report: Report, prior: dict[str, Any] | None) -> dict[str, Any] | None:
    """Describe what changed since the previous run, if it is worth an alert.

    Two things count: the verdict differs from the stored one, or the crash total
    grew. Returns ``None`` when there is no baseline (first run) or neither applies;
    otherwise an event dict with the old and new verdict / crash count and the
    human-readable change list.
    """
    if prior is None:
        return None

    snap = report.snapshot
    old_verdict = prior.get("verdict")
    old_crashes = int(prior.get("crashes") or 0)
    new_crashes = snap.crashes.total

    differences: list[str] = []
    if report.overall != old_verdict:
        differences.append(f"verdict {old_verdict} -> {report.overall}")
    if new_crashes > old_crashes:
        differences.append(f"new crashes {old_crashes} -> {new_crashes}")
    if not differences:
        return None

    return {
        "campaign_id": snap.campaign_id,
        "change": differences,
        "overall": report.overall,
        "prev_overall": old_verdict,
        "crashes": new_crashes,
        "prev_crashes": old_crashes,
        "generated_at": snap.generated_at,
    }


def run_hook(cmd: str, event: dict[str, Any]) -> None:
    """Run the user's --on-change command through the shell with ``event`` as JSON on stdin.

    Best effort: the hook's output and exit status are discarded, it is given 30
    seconds, and a timeout or a failure to start it is silently ignored.
    """
    payload = json.dumps(event)
    try:
        subprocess.run(
            cmd,
            shell=True,
            input=payload,
            text=True,
            capture_output=True,
            timeout=30,
            check=False,
        )
    except (subprocess.TimeoutExpired, OSError):
        pass


def run_once(
    targets: list[Target],
    *,
    store: TrendStore | None = None,
    expected_eps: float | None = None,
    now: int | None = None,
    on_change: str | None = None,
) -> list[Report]:
    """Collect, analyze and (optionally) persist every target once; return the reports in target order.

    With a store, each campaign's previous row is read before the new one is saved,
    so the analysis and the change detection both compare against the prior run.
    The ``on_change`` hook runs for every campaign whose state changed.
    """
    # Only collection is parallel: it is the slow, I/O-bound (ssh) part. Analysis and
    # the single-threaded store stay on the calling thread.
    pool_size = min(8, max(1, len(targets)))

    def collect_one(target: Target) -> RawCampaign:
        return collect(target, now_override=now)

    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        raw_campaigns = list(executor.map(collect_one, targets))

    results: list[Report] = []
    for raw in raw_campaigns:
        snapshot = build_snapshot(raw)
        previous = store.prior(snapshot.campaign_id) if store else None
        report = analyze(snapshot, expected_eps=expected_eps, prior=previous)
        if store:
            store.save(report)
        change = detect_change(report, previous)
        if change and on_change:
            run_hook(on_change, change)
        results.append(report)
    return results


def _ssh_opts(raw_opts: list[str]) -> list[str]:
    """Expand each ``KEY=VAL`` given via --ssh-opt into the ssh argument pair ``-o KEY=VAL``."""
    return [arg for opt in raw_opts for arg in ("-o", opt)]


def _default_state_dir() -> Path:
    """Return ``$XDG_STATE_HOME/afl-health``, using ``~/.local/state`` when the variable is unset or empty."""
    xdg_state = os.environ.get("XDG_STATE_HOME")
    root = Path(xdg_state) if xdg_state else Path.home() / ".local" / "state"
    return root / "afl-health"


class _HelpFormatter(argparse.HelpFormatter):
    """Help formatter that opens its output with a tagline and a hand-written usage line."""

    def __init__(self, prog, *args, **kwargs):
        super().__init__(prog, *args, **kwargs)
        # Queued at construction time, so these paragraphs precede whatever argparse
        # adds to this formatter afterwards.
        preamble = (
            "health, trend, and action analysis tool for AFL++ campaigns",
            "",
            f"{prog} [ options ] [TARGET ...]",
        )
        for paragraph in preamble:
            self.add_text(paragraph)


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for afl-health.

    Targets are optional positionals; every flag defaults to "off"/None except
    ``--connect-timeout`` (12 seconds) and ``--ssh-opt`` (empty list).
    """
    p = argparse.ArgumentParser(
        prog="afl-health",
        formatter_class=_HelpFormatter,
        description="Health, trend, and action analysis for AFL++/cargo-afl/ziggy campaigns (local or over ssh).",
    )
    # Zero targets is accepted here so that main() can decide what to do without any.
    p.add_argument(
        "targets",
        nargs="*",
        metavar="TARGET",
        help="local /path/to/out, or [user@]host:/path/to/out for ssh; "
        "glob patterns (e.g. host:/runs/*/afl) expand to one campaign per match (remotely for ssh)",
    )
    p.add_argument("--config", help="JSON config defining targets / ssh opts / state dir")
    p.add_argument("--json", action="store_true", help="machine report (incl. action directives)")
    p.add_argument("--summary", action="store_true", help="condensed one-line per campaign (fleet roll-up for many)")
    p.add_argument("--watch", type=float, metavar="SECONDS", help="live refresh every N seconds")
    p.add_argument(
        "--on-change", metavar="CMD", help="run CMD when a target changes state; event JSON is piped on stdin"
    )
    p.add_argument("--expect-eps", type=float, help="expected execs/sec (sharpens speed judgment)")
    p.add_argument("--state-dir", help="trend history location (default: XDG state dir)")
    p.add_argument("--no-store", action="store_true", help="do not persist/read trend history")
    # Repeatable; the values are collected as given and turned into "-o" pairs later.
    p.add_argument("--ssh-opt", action="append", default=[], metavar="KEY=VAL", help="extra ssh -o option (repeatable)")
    # Accepted values are exactly the verdict names of the EXIT table.
    p.add_argument("--fail-on", choices=sorted(EXIT), help="exit nonzero only at/above this verdict")
    p.add_argument("--connect-timeout", type=int, default=12, help="ssh connect timeout (seconds)")
    p.add_argument("--quiet", action="store_true", help="suppress human output (for scripting)")
    p.add_argument("--now", type=int, help=argparse.SUPPRESS)  # deterministic clock for tests
    p.add_argument(
        "--version", action="version", version=f"afl-health {__version__} (license: {__license__})"
    )
    return p


def _emit(reports: list[Report], args: argparse.Namespace, now: int) -> None:
    """Print the reports in the format selected on the command line.

    ``--json`` wins over everything else; ``--quiet`` suppresses the text output;
    ``--summary`` with more than one report prints the fleet roll-up; otherwise
    each report is rendered as text (condensed when ``--summary`` is set).
    """
    if args.json:
        print(render_json(reports, now=now, version=__version__))
        return
    if args.quiet:
        return

    fleet_roll_up = args.summary and len(reports) > 1
    if fleet_roll_up:
        print(_fleet_summary(reports))
    else:
        # Each rendered report already ends with a newline.
        rendered = [render_text(report, summary=args.summary) for report in reports]
        print("".join(rendered), end="")


def _fleet_summary(reports: list[Report]) -> str:
    """Render the fleet roll-up: the worst verdict, then one line per campaign.

    The result has no trailing newline.
    """
    banner = f"FLEET verdict: {fleet_verdict(reports).upper()}  ({len(reports)} campaigns)"
    rows = [
        f"  {report.overall.upper():<14} {report.snapshot.campaign_id}  "
        f"({report.snapshot.alive_count}/{report.snapshot.instance_count} alive, "
        f"{report.snapshot.crashes.total} crashes)"
        for report in reports
    ]
    return "\n".join([banner, "", *rows])


def _final_rc(reports: list[Report], args: argparse.Namespace) -> int:
    """Return the process exit code, honouring ``--fail-on``.

    Without ``--fail-on`` this is the worst exit code of the reports. With it,
    codes below the chosen verdict's code are reported as 0.
    """
    code = exit_code(reports)
    if not args.fail_on:
        return code
    threshold = EXIT[args.fail_on]
    return 0 if code < threshold else code


def _watch(targets: list[Target], args: argparse.Namespace, store: TrendStore | None, now: int | None) -> int:
    """Re-run the analysis every ``args.watch`` seconds until interrupted.

    On Ctrl-C, returns the exit code of the last completed cycle (0 if none
    completed).

    The terminal is cleared before each refresh only for the human text output:
    with ``--json`` or ``--quiet`` no escape sequences are written, so the machine
    output stays a clean sequence of JSON documents.
    """
    reports: list[Report] = []
    clear_screen = not (args.json or args.quiet)
    try:
        while True:
            reports = run_once(targets, store=store, expected_eps=args.expect_eps, now=now, on_change=args.on_change)
            if clear_screen:
                sys.stdout.write("\x1b[2J\x1b[H")  # clear screen, cursor to top-left
            # "is not None" rather than "or": a pinned clock of 0 is still a pinned clock.
            _emit(reports, args, now if now is not None else int(time.time()))
            sys.stdout.flush()
            time.sleep(args.watch)
    except KeyboardInterrupt:
        return _final_rc(reports, args) if reports else 0


def main(argv: list[str] | None = None) -> int:
    """CLI entry point; returns the process exit code.

    Parses ``argv`` (``sys.argv`` when None), resolves and glob-expands the
    targets, then either enters watch mode or runs a single cycle and prints it.
    Without any target the help text is printed and 0 is returned. The clock can
    be pinned with ``--now`` or, failing that, the ``AFL_HEALTH_NOW`` environment
    variable.

    The trend store, when used, is closed on every way out, including Ctrl-C in
    watch mode and errors during a run.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)
    now = args.now
    if now is None and os.environ.get("AFL_HEALTH_NOW"):
        now = int(os.environ["AFL_HEALTH_NOW"])

    config = load_config(args.config) if args.config else None
    ssh_opts = _ssh_opts(args.ssh_opt)
    targets = resolve_targets(args.targets, config=config, ssh_opts=ssh_opts, connect_timeout=args.connect_timeout)
    if not targets:
        parser.print_help()  # no target given → behave like -h
        return 0
    targets = expand_globs(targets)

    store = (
        None
        if args.no_store
        else TrendStore((Path(args.state_dir) if args.state_dir else _default_state_dir()) / "history.sqlite")
    )
    try:
        if args.watch:
            return _watch(targets, args, store, now)

        reports = run_once(targets, store=store, expected_eps=args.expect_eps, now=now, on_change=args.on_change)
        # "is not None" rather than "or": a pinned clock of 0 is still a pinned clock.
        _emit(reports, args, now if now is not None else int(time.time()))
        return _final_rc(reports, args)
    finally:
        # Release the SQLite connection instead of leaving it to interpreter shutdown.
        if store is not None:
            store.close()


# Script entry point: the value returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
