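Operations Log · Codex

Codex Sessions Recovery: A Field Record

Recorded: 2026-04-21 | Updated: 2026-04-22

1. Background and Goals

After switching API keys, providers, or workspaces, the Codex client may show only a handful of recent sessions while the older history appears to be gone. This record gives a reusable, reversible, low-risk sessions recovery method. It prioritizes restoring visibility and index consistency, and it does not change rollout message content.

2. Method Sources and Tools

Core scripts used in this record: session_audit.py (read-only audit), session_repair.py (path repair / unarchive), provider_migrate.py (visibility migration).

3. Execution Principles

4. Phase A-E Execution Summary (Example)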

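| Phase | Action | Result |
| --- | --- | --- |
| A. Read-only check | Count rollouts, threads, and consistency metrics | Found archived sessions and a few orphan rollouts; established the problem boundary |
| B. Dry-run | Estimate the impact of path repair + unarchive | Produced an actionable list; writes wait for confirmation |
| C. Safe repair | Path repair (where a unique match exists) + unarchive | Historical session index restored; archived threads returned to the visible set |
| D. Orphan session verification | Check whether each orphan is an active session | Active sessions are marked ACTIVE_ORPHAN and must not be imported |
| E. Visibility alignment | Provider diagnosis + provider migration | Visible sessions under the current provider increased markedly; then verified by cwd |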
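
Phase D depends on telling an active orphan from a stale one. The record does not say how that check was made, so the sketch below assumes a simple recency heuristic: a rollout file modified within the last few minutes is treated as still being written. The function name `classify_orphan`, the `STALE_ORPHAN` label, and the 10-minute window are illustrative assumptions, not part of the skill package.

```python
from __future__ import annotations

import time
from pathlib import Path

# Assumed threshold: a file touched in the last 10 minutes may still be in use.
ACTIVE_WINDOW_SECONDS = 10 * 60


def classify_orphan(rollout_file: Path, now: float | None = None) -> str:
    """Label an orphan rollout (on disk, no row in threads) by file recency."""
    current = time.time() if now is None else now
    age = current - rollout_file.stat().st_mtime
    # Recently modified files are assumed to belong to a live session.
    return "ACTIVE_ORPHAN" if age < ACTIVE_WINDOW_SECONDS else "STALE_ORPHAN"
```

Modification time is only a proxy. Stopping all Codex processes before the check removes most of the ambiguity.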

5. Key Judgments and Reasons

6. Actual Writes (Minimal Plan)

Only the DB-level provider migration is executed; rollout/cwd/source/archived are not modified. The recommended SQL template:

UPDATE threads
SET model_provider='<current_provider>'
WHERE archived=0 AND COALESCE(model_provider,'')<>'<current_provider>';
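
One caveat when adapting the template: in SQLite, `NULL <> 'x'` evaluates to NULL rather than true, so a plain `<>` comparison skips rows whose `model_provider` is NULL. The provider_migrate.py script in the appendix wraps the column in `COALESCE` for that reason. The sketch below shows the same statement with bound parameters against a throwaway in-memory table. The helper name `migrate_provider` and the sample rows are illustrative assumptions.

```python
import sqlite3


def migrate_provider(conn: sqlite3.Connection, target: str) -> int:
    """Point active threads at `target`, including rows whose provider is NULL."""
    with conn:  # commits on success, rolls back if the UPDATE raises
        cur = conn.execute(
            "UPDATE threads SET model_provider = ? "
            "WHERE archived = 0 AND COALESCE(model_provider, '') <> ?",
            (target, target),
        )
    return cur.rowcount


# Throwaway in-memory table with only the columns the template touches.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE threads (id TEXT PRIMARY KEY, archived INTEGER, model_provider TEXT)")
conn.executemany(
    "INSERT INTO threads VALUES (?, ?, ?)",
    [("a", 0, "old"), ("b", 0, None), ("c", 1, "old"), ("d", 0, "new")],
)
conn.commit()

changed = migrate_provider(conn, "new")
print(changed)  # prints 2: "a" and "b" change; "c" is archived, "d" already matches
```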

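7. Rollback Assets and Rollback Method

Rollback principle: use the CSV to restore each id's provider to its old_provider, and run the whole restore in a single transaction so that it either applies completely or leaves the database unchanged.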
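
A minimal sketch of that rollback, assuming the manifest layout written by provider_migrate.py in the appendix (columns `id`, `old_provider`, `new_provider`, `ts`). The function name `rollback_from_manifest` is hypothetical; the skill package does not ship a rollback script.

```python
import csv
import sqlite3
from pathlib import Path


def rollback_from_manifest(conn: sqlite3.Connection, manifest: Path) -> int:
    """Restore model_provider for every id listed in the manifest, atomically."""
    with manifest.open(encoding="utf-8", newline="") as f:
        pairs = [(row["old_provider"], row["id"]) for row in csv.DictReader(f)]

    restored = 0
    with conn:  # one transaction: either every row is restored or none is
        for old_provider, thread_id in pairs:
            cur = conn.execute(
                "UPDATE threads SET model_provider = ? WHERE id = ?",
                (old_provider, thread_id),
            )
            restored += cur.rowcount
    return restored
```

Because the migration script reads providers through `COALESCE(model_provider, '')`, a row that was originally NULL comes back as an empty string with this approach. Restoring the SQLite backup is the way to get the exact original values.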

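8. Advice for Other People and Agents

9. Appendix: Migration Skill Package (Text Attachments)

The full text of every file in the migration skill package follows, expanded inline so it can be read directly on the static site without any attachment upload/download feature.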

codex-session-migration/SKILL.md

---
name: codex-session-migration
description: Safely audit, repair, and migrate Codex session visibility when sessions appear missing after switching API keys/providers, moving machines, or changing workspaces. Use for `~/.codex` troubleshooting involving `state_5.sqlite`, `sessions/`, `archived_sessions/`, provider/cwd filtering, path mismatches, and reversible migration with dry-run plus backup.
---

# Codex Session Migration

## Overview

Run a repeatable, low-risk session migration workflow for Codex data directories. Prioritize read-only diagnostics, explicit dry-run summaries, and reversible write operations.

## Workflow

1. Stop active Codex processes before any write operation.
2. Run read-only audit first and report counts.
3. Repair path mismatches and unarchive sessions when needed.
4. Migrate `model_provider` only when visibility still fails.
5. Re-run integrity checks and report before/after deltas.

## Preconditions

1. Use this skill against a Codex home directory, typically `~/.codex` on Linux/macOS or `%USERPROFILE%\.codex` on Windows.
2. Keep at least one backup copy of the entire `.codex` folder before migration.
3. Treat `state_5.sqlite` as authoritative thread index; treat `sessions/` and `archived_sessions/` as rollout storage.
4. Do not edit rollout message content.
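
The full-folder backup in precondition 2 can be scripted. The sketch below is one way to do it with the standard library; the function name `backup_codex_dir` and the destination naming scheme are assumptions, not part of the skill package.

```python
import shutil
from datetime import datetime
from pathlib import Path


def backup_codex_dir(codex_dir: Path, dest_root: Path) -> Path:
    """Copy the whole Codex home to dest_root/<name>_backup_<timestamp>."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dest = dest_root / f"{codex_dir.name}_backup_{stamp}"
    # copytree refuses to overwrite an existing destination, which is the safe default here.
    shutil.copytree(codex_dir, dest)
    return dest
```

Keep `dest_root` outside the Codex home so the copy does not recurse into itself, and stop Codex first so `state_5.sqlite` is not copied mid-write.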

## Phase 1: Audit (Read-Only)

Run:

```bash
python scripts/session_audit.py --codex-dir ~/.codex
```

Confirm:

1. `threads_total` and rollout file totals are sensible.
2. `db_not_on_disk_count == 0`.
3. `missing_rollout_path_count == 0` or only known mismatches.
4. `model_provider`/`cwd` distribution explains visibility behavior.

Interpretation:

1. `disk_not_in_db_count > 0` often means active/incomplete session files.
2. `model_provider` skew (most threads recorded under a provider other than the current one) usually causes "only newest session visible".
3. `cwd` skew causes partial visibility by workspace.
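
To see provider and workspace skew together, the active threads can be grouped by both columns. This is a minimal sketch that assumes only the `threads` columns the audit script already reads (`archived`, `model_provider`, `cwd`); the function name `active_distribution` is hypothetical.

```python
import sqlite3
from typing import Dict, Tuple


def active_distribution(conn: sqlite3.Connection) -> Dict[Tuple[str, str], int]:
    """Count non-archived threads per (model_provider, cwd) pair."""
    rows = conn.execute(
        "SELECT COALESCE(model_provider, '') AS provider, "
        "       COALESCE(cwd, '') AS workdir, "
        "       COUNT(*) AS n "
        "FROM threads WHERE archived = 0 "
        "GROUP BY provider, workdir ORDER BY n DESC"
    ).fetchall()
    return {(provider, workdir): n for provider, workdir, n in rows}
```

For a diagnostic like this, opening the database read-only, for example `sqlite3.connect("file:/path/to/state_5.sqlite?mode=ro", uri=True)`, keeps the audit phase free of accidental writes.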

## Phase 2: Repair Paths and Unarchive

Dry-run first:

```bash
python scripts/session_repair.py --codex-dir ~/.codex
```

Apply only after review:

```bash
python scripts/session_repair.py --codex-dir ~/.codex --apply
```

Behavior:

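1. Move uniquely matched rollout files to expected `threads.rollout_path` targets (skip with `--no-fix-paths`).
2. Set `archived=0, archived_at=NULL` for every thread with `archived=1` (skip with `--no-unarchive`).
3. Create a SQLite backup in `.tmp/` under the Codex home before applying changes.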

## Phase 3: Provider Visibility Migration (Optional)

Run only if sessions remain hidden under the new API/provider.

Dry-run:

```bash
python scripts/provider_migrate.py --codex-dir ~/.codex --target-provider new_provider
```

Apply:

```bash
python scripts/provider_migrate.py --codex-dir ~/.codex --target-provider new_provider --apply
```

Behavior:

1. Update only `threads.model_provider` for `archived=0` rows.
2. Keep rollout files untouched.
3. Emit rollback CSV + SQLite backup in `.tmp/`.
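
If a full revert is needed, the SQLite backup in `.tmp/` can be copied back with the same backup API the scripts use to create it. The sketch below assumes the backup naming used by `make_db_backup` (`state_5.sqlite.<label>_<timestamp>.bak`); the helper names `latest_backup` and `restore_db_backup` are hypothetical and not part of the skill package.

```python
import sqlite3
from pathlib import Path


def latest_backup(codex_dir: Path) -> Path:
    """Pick the most recently modified state_5.sqlite backup under .tmp/."""
    backups = sorted(
        (codex_dir / ".tmp").glob("state_5.sqlite.*.bak"),
        key=lambda p: p.stat().st_mtime,
    )
    if not backups:
        raise FileNotFoundError("no state_5.sqlite backups found under .tmp/")
    return backups[-1]


def restore_db_backup(backup_path: Path, db_path: Path) -> None:
    """Overwrite db_path with the contents of a backup file via SQLite's backup API."""
    src = sqlite3.connect(backup_path)
    dst = sqlite3.connect(db_path)
    try:
        src.backup(dst)  # copies every page of the backup into the live database file
    finally:
        dst.close()
        src.close()
```

As with any write, stop active Codex processes first. A restore discards every change made after the backup was taken, not only the provider migration.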

## Phase 4: Final Verification

Run:

```bash
python scripts/session_audit.py --codex-dir ~/.codex
```

Check:

1. `db_not_on_disk_count == 0`
2. `missing_rollout_path_count == 0`
3. Active thread count matches expectation
4. Provider counts align with migration objective

## Reporting Format

Always report:

1. Before/after totals: `threads`, `archived`, `provider` distribution.
2. Integrity deltas: `disk_not_in_db_count`, `db_not_on_disk_count`, `missing_rollout_path_count`.
3. Concrete file/database artifacts created for rollback.
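
The before/after deltas can be computed from two runs of `session_audit.py --json`, one saved before the writes and one after. This is a minimal sketch that assumes the `counts` and `integrity` sections emitted by the audit script; the function name `report_deltas` is hypothetical.

```python
from typing import Dict


def report_deltas(before: Dict, after: Dict) -> Dict[str, Dict[str, int]]:
    """Return after-minus-before for every integer field in `counts` and `integrity`."""
    deltas: Dict[str, Dict[str, int]] = {}
    for section in ("counts", "integrity"):
        deltas[section] = {
            key: after[section][key] - value
            for key, value in before[section].items()
            # Sample lists and path maps are skipped; only numeric fields are compared.
            if isinstance(value, int) and isinstance(after[section].get(key), int)
        }
    return deltas
```

Each saved report can be loaded with `json.load` and passed in as a plain dict. Provider distribution changes are easier to read side by side than as deltas, so they are left out here.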

## References

1. Prompt templates for delegated migration runs: `references/prompts.md`
2. Audit script: `scripts/session_audit.py`
3. Repair script: `scripts/session_repair.py`
4. Provider migration script: `scripts/provider_migrate.py`

                
codex-session-migration/agents/openai.yaml

interface:
  display_name: "Codex Session Migration"
  short_description: "Safely migrate Codex sessions across API changes"
  default_prompt: "Use this skill to audit, repair, and migrate Codex sessions with dry-run first and rollback safety."

                
codex-session-migration/references/prompts.md

# Session Migration Prompt Templates

Use these templates when delegating migration work to another Codex agent.

## Template 1: Full Safe Recovery

```text
Use $codex-session-migration at <SKILL_PATH>.

Goal:
Recover and make historical sessions visible after API/provider switch.

Constraints:
1. Run read-only audit first.
2. Show dry-run before any write.
3. Create rollback artifacts before writes.
4. Avoid rollout content edits.

Execution:
1. Run session_audit.py and report counts/distributions/integrity mismatches.
2. Run session_repair.py dry-run; wait for my confirmation.
3. If confirmed, run session_repair.py --apply and report before/after.
4. If visibility is still blocked by provider filtering, run provider_migrate.py dry-run.
5. If confirmed, run provider_migrate.py --apply with target provider from config.toml.
6. Run final audit and produce acceptance checklist.
```

## Template 2: Provider-Only Visibility Migration

```text
Use $codex-session-migration at <SKILL_PATH>.

I only want provider visibility migration.
Do not modify rollout files, cwd, source, or archived state.

1. Run session_audit.py and show active provider distribution.
2. Run provider_migrate.py dry-run to target provider <TARGET_PROVIDER>.
3. Wait for confirmation.
4. Apply migration, then run final session_audit.py.
5. Report rollback paths (db backup + manifest csv).
```

## Template 3: Integrity Check Only

```text
Use $codex-session-migration at <SKILL_PATH>.

Read-only mode only.
Run session_audit.py and explain:
1) Whether sessions are missing or only filtered,
2) Whether DB/file index is consistent,
3) What minimal next step is recommended.
```

                
codex-session-migration/scripts/provider_migrate.py

#!/usr/bin/env python3
"""
Migrate threads.model_provider for visibility alignment after API/provider switches.
"""

from __future__ import annotations

import argparse
import csv
import sqlite3
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple


def make_db_backup(db_path: Path, codex_dir: Path, label: str) -> Path:
    tmp_dir = codex_dir / ".tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = tmp_dir / f"state_5.sqlite.{label}_{ts}.bak"

    conn = sqlite3.connect(db_path)
    backup_conn = sqlite3.connect(backup_path)
    try:
        conn.backup(backup_conn)
    finally:
        backup_conn.close()
        conn.close()
    return backup_path


def load_rows(db_path: Path) -> List[Tuple[str, int, str]]:
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT id, archived, COALESCE(model_provider, '') FROM threads"
        ).fetchall()
    finally:
        conn.close()
    return [(str(r[0]), int(r[1]), str(r[2])) for r in rows]


def provider_distribution(rows: List[Tuple[str, int, str]], active_only: bool = True) -> Dict[str, int]:
    counter: Counter = Counter()
    for _id, archived, provider in rows:
        if active_only and archived != 0:
            continue
        counter[provider] += 1
    return dict(counter.most_common())


def build_candidates(
    rows: List[Tuple[str, int, str]], target_provider: str
) -> List[Tuple[str, str, str]]:
    candidates: List[Tuple[str, str, str]] = []
    ts = datetime.now().isoformat(timespec="seconds")
    for thread_id, archived, old_provider in rows:
        if archived != 0:
            continue
        if old_provider == target_provider:
            continue
        candidates.append((thread_id, old_provider, ts))
    return candidates


def write_manifest(codex_dir: Path, target_provider: str, candidates: List[Tuple[str, str, str]]) -> Path:
    tmp_dir = codex_dir / ".tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    manifest = tmp_dir / f"provider_migration_{ts}.csv"
    with manifest.open("w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "old_provider", "new_provider", "ts"])
        for thread_id, old_provider, row_ts in candidates:
            writer.writerow([thread_id, old_provider, target_provider, row_ts])
    return manifest


def apply_migration(db_path: Path, target_provider: str) -> int:
    conn = sqlite3.connect(db_path)
    try:
        conn.isolation_level = None
        conn.execute("BEGIN IMMEDIATE")
        cur = conn.execute(
            "UPDATE threads SET model_provider=? WHERE archived=0 AND COALESCE(model_provider,'')<>?",
            (target_provider, target_provider),
        )
        changed = int(cur.rowcount if cur.rowcount is not None else 0)
        conn.execute("COMMIT")
        return changed
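    except Exception:
        # Roll back only if BEGIN succeeded; otherwise ROLLBACK would mask the original error.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise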
    finally:
        conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Migrate active threads.model_provider to a target provider."
    )
    parser.add_argument(
        "--codex-dir",
        default=str(Path.home() / ".codex"),
        help="Path to Codex home directory (default: ~/.codex).",
    )
    parser.add_argument(
        "--target-provider",
        required=True,
        help="Provider value to set for active threads.",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Apply changes. Default behavior is dry-run.",
    )
    parser.add_argument(
        "--sample",
        type=int,
        default=10,
        help="Show first N candidate rows.",
    )
    args = parser.parse_args()

    codex_dir = Path(args.codex_dir).expanduser().resolve()
    db_path = codex_dir / "state_5.sqlite"
    if not db_path.exists():
        raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")

    rows_before = load_rows(db_path)
    dist_before = provider_distribution(rows_before, active_only=True)
    candidates = build_candidates(rows_before, args.target_provider)

    print("=== Provider Migration Plan ===")
    print(f"codex_dir: {codex_dir}")
    print(f"target_provider: {args.target_provider}")
    print(f"dry_run: {not args.apply}")
    print(f"candidates: {len(candidates)}")
    print(f"provider_distribution_before: {dist_before}")
    for thread_id, old_provider, _ts in candidates[: max(1, args.sample)]:
        print(f"  {thread_id}: {old_provider} -> {args.target_provider}")

    if not args.apply:
        print("No changes applied (dry-run).")
        return

    backup_path = make_db_backup(db_path, codex_dir, "pre_provider_migration")
    manifest_path = write_manifest(codex_dir, args.target_provider, candidates)
    changed = apply_migration(db_path, args.target_provider)

    rows_after = load_rows(db_path)
    dist_after = provider_distribution(rows_after, active_only=True)

    print("")
    print("=== Provider Migration Result ===")
    print(f"db_backup: {backup_path}")
    print(f"manifest_csv: {manifest_path}")
    print(f"updated_rows: {changed}")
    print(f"provider_distribution_after: {dist_after}")


if __name__ == "__main__":
    main()

                
codex-session-migration/scripts/session_audit.py

#!/usr/bin/env python3
"""
Read-only integrity audit for Codex session storage.
"""

from __future__ import annotations

import argparse
import json
import re
import sqlite3
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

# Matches names like rollout-<timestamp>-<36-char thread id>.jsonl and captures the id.
ROLLOUT_RE = re.compile(r"rollout-[0-9T-]+-([0-9a-f-]{36})\.jsonl$", re.IGNORECASE)


@dataclass
class ThreadRow:
    thread_id: str
    archived: int
    model_provider: str
    cwd: str
    rollout_path: str


def parse_rollout_id(path: Path) -> str | None:
    match = ROLLOUT_RE.search(path.name)
    if not match:
        return None
    return match.group(1)


def collect_rollouts(codex_dir: Path) -> Tuple[List[Path], List[Path], Dict[str, List[Path]]]:
    sessions_dir = codex_dir / "sessions"
    archived_dir = codex_dir / "archived_sessions"

    session_files = sorted(sessions_dir.rglob("rollout-*.jsonl")) if sessions_dir.exists() else []
    archived_files = sorted(archived_dir.rglob("rollout-*.jsonl")) if archived_dir.exists() else []

    id_to_paths: Dict[str, List[Path]] = {}
    for file_path in session_files + archived_files:
        rollout_id = parse_rollout_id(file_path)
        if rollout_id:
            id_to_paths.setdefault(rollout_id, []).append(file_path)
    return session_files, archived_files, id_to_paths


def load_threads(db_path: Path) -> List[ThreadRow]:
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT id, archived, model_provider, cwd, rollout_path FROM threads"
        ).fetchall()
    finally:
        conn.close()

    return [
        ThreadRow(
            thread_id=str(row[0]),
            archived=int(row[1]),
            model_provider=str(row[2] or ""),
            cwd=str(row[3] or ""),
            rollout_path=str(row[4] or ""),
        )
        for row in rows
    ]


def summarize(rows: List[ThreadRow]) -> Dict[str, Counter]:
    provider_counter: Counter = Counter()
    cwd_counter: Counter = Counter()
    for row in rows:
        provider_counter[row.model_provider] += 1
        cwd_counter[row.cwd] += 1
    return {"provider": provider_counter, "cwd": cwd_counter}


def sample(items: List[str], max_items: int) -> List[str]:
    # Slicing already returns the whole list when it is shorter than max_items.
    return items[:max_items]


def build_report(codex_dir: Path, max_samples: int) -> Dict[str, object]:
    db_path = codex_dir / "state_5.sqlite"
    if not db_path.exists():
        raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")

    session_files, archived_files, id_to_paths = collect_rollouts(codex_dir)
    thread_rows = load_threads(db_path)

    disk_ids = set(id_to_paths.keys())
    db_ids = {row.thread_id for row in thread_rows}

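    # Rollout ids found on disk that have no row in threads (often live or incomplete sessions).
    disk_not_in_db = sorted(disk_ids - db_ids)
    # Thread ids that match no rollout file under sessions/ or archived_sessions/.
    db_not_on_disk = sorted(db_ids - disk_ids)

    # Thread rows whose recorded rollout_path does not exist, even if the file lives elsewhere.
    missing_rollout = sorted(
        [row.thread_id for row in thread_rows if row.rollout_path and not Path(row.rollout_path).exists()]
    )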

    archived_count = sum(1 for row in thread_rows if row.archived == 1)
    active_count = sum(1 for row in thread_rows if row.archived == 0)

    summary = summarize(thread_rows)

    duplicated_rollouts = sorted([rid for rid, paths in id_to_paths.items() if len(paths) > 1])
    duplicated_rollout_paths = {
        rid: [str(p) for p in paths] for rid, paths in id_to_paths.items() if len(paths) > 1
    }

    report = {
        "codex_dir": str(codex_dir),
        "counts": {
            "sessions_rollouts": len(session_files),
            "archived_rollouts": len(archived_files),
            "rollouts_total": len(session_files) + len(archived_files),
            "threads_total": len(thread_rows),
            "threads_active": active_count,
            "threads_archived": archived_count,
        },
        "distribution": {
            "model_provider": dict(summary["provider"].most_common()),
            "cwd": dict(summary["cwd"].most_common()),
        },
        "integrity": {
            "disk_not_in_db_count": len(disk_not_in_db),
            "db_not_on_disk_count": len(db_not_on_disk),
            "missing_rollout_path_count": len(missing_rollout),
            "duplicate_rollout_id_count": len(duplicated_rollouts),
            "disk_not_in_db_sample": sample(disk_not_in_db, max_samples),
            "db_not_on_disk_sample": sample(db_not_on_disk, max_samples),
            "missing_rollout_path_sample": sample(missing_rollout, max_samples),
            "duplicate_rollout_id_sample": sample(duplicated_rollouts, max_samples),
            "duplicate_rollout_paths": {
                key: duplicated_rollout_paths[key] for key in sample(duplicated_rollouts, max_samples)
            },
        },
    }
    return report


def print_text_report(report: Dict[str, object]) -> None:
    counts = report["counts"]
    integrity = report["integrity"]
    distribution = report["distribution"]

    print("=== Codex Session Audit ===")
    print(f"codex_dir: {report['codex_dir']}")
    print("")
    print("counts:")
    print(f"  sessions_rollouts: {counts['sessions_rollouts']}")
    print(f"  archived_rollouts: {counts['archived_rollouts']}")
    print(f"  rollouts_total: {counts['rollouts_total']}")
    print(f"  threads_total: {counts['threads_total']}")
    print(f"  threads_active: {counts['threads_active']}")
    print(f"  threads_archived: {counts['threads_archived']}")
    print("")
    print("model_provider distribution:")
    for provider, value in distribution["model_provider"].items():
        print(f"  {provider or '<empty>'}: {value}")
    print("")
    print("cwd distribution (top shown):")
    shown = 0
    for cwd, value in distribution["cwd"].items():
        print(f"  {cwd or '<empty>'}: {value}")
        shown += 1
        if shown >= 12:
            break
    print("")
    print("integrity:")
    print(f"  disk_not_in_db_count: {integrity['disk_not_in_db_count']}")
    print(f"  db_not_on_disk_count: {integrity['db_not_on_disk_count']}")
    print(f"  missing_rollout_path_count: {integrity['missing_rollout_path_count']}")
    print(f"  duplicate_rollout_id_count: {integrity['duplicate_rollout_id_count']}")
    print(f"  disk_not_in_db_sample: {integrity['disk_not_in_db_sample']}")
    print(f"  db_not_on_disk_sample: {integrity['db_not_on_disk_sample']}")
    print(f"  missing_rollout_path_sample: {integrity['missing_rollout_path_sample']}")
    print(f"  duplicate_rollout_id_sample: {integrity['duplicate_rollout_id_sample']}")


def main() -> None:
    parser = argparse.ArgumentParser(description="Read-only audit for Codex session integrity.")
    parser.add_argument(
        "--codex-dir",
        default=str(Path.home() / ".codex"),
        help="Path to Codex home directory (default: ~/.codex).",
    )
    parser.add_argument(
        "--max-samples",
        type=int,
        default=10,
        help="Maximum sample IDs per mismatch category.",
    )
    parser.add_argument("--json", action="store_true", help="Print JSON output.")
    args = parser.parse_args()

    codex_dir = Path(args.codex_dir).expanduser().resolve()
    report = build_report(codex_dir, max_samples=max(1, args.max_samples))
    if args.json:
        print(json.dumps(report, ensure_ascii=False, indent=2))
    else:
        print_text_report(report)


if __name__ == "__main__":
    main()

                
codex-session-migration/scripts/session_repair.py

#!/usr/bin/env python3
"""
Repair rollout-path mismatches and unarchive threads with explicit dry-run control.
"""

from __future__ import annotations

import argparse
import shutil
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List


@dataclass
class ThreadRow:
    thread_id: str
    archived: int
    rollout_path: str


@dataclass
class MovePlan:
    thread_id: str
    source: Path
    target: Path


def load_threads(db_path: Path) -> List[ThreadRow]:
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute("SELECT id, archived, rollout_path FROM threads").fetchall()
    finally:
        conn.close()
    return [ThreadRow(thread_id=str(r[0]), archived=int(r[1]), rollout_path=str(r[2] or "")) for r in rows]


def discover_move_plans(codex_dir: Path, rows: List[ThreadRow]) -> List[MovePlan]:
    plans: List[MovePlan] = []
    candidate_bases = [codex_dir / "sessions", codex_dir / "archived_sessions"]

    for row in rows:
        if not row.rollout_path:
            continue
        target = Path(row.rollout_path)
        if target.exists():
            continue

        filename = target.name
        candidates: List[Path] = []
        for base in candidate_bases:
            if base.exists():
                candidates.extend(base.rglob(filename))

        # Plan a move only when exactly one file with this name exists; ambiguous matches are skipped.
        unique_candidates = sorted(set(candidates))
        if len(unique_candidates) == 1 and unique_candidates[0] != target:
            plans.append(MovePlan(thread_id=row.thread_id, source=unique_candidates[0], target=target))
    return plans


def count_archived(rows: List[ThreadRow]) -> int:
    return sum(1 for row in rows if row.archived == 1)


def make_db_backup(db_path: Path, codex_dir: Path, label: str) -> Path:
    tmp_dir = codex_dir / ".tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = tmp_dir / f"state_5.sqlite.{label}_{ts}.bak"

    conn = sqlite3.connect(db_path)
    backup_conn = sqlite3.connect(backup_path)
    try:
        conn.backup(backup_conn)
    finally:
        backup_conn.close()
        conn.close()
    return backup_path


def apply_moves(plans: List[MovePlan]) -> int:
    moved = 0
    for plan in plans:
        plan.target.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(plan.source), str(plan.target))
        moved += 1
    return moved


def apply_unarchive(db_path: Path) -> int:
    conn = sqlite3.connect(db_path)
    try:
        conn.isolation_level = None
        conn.execute("BEGIN IMMEDIATE")
        cur = conn.execute(
            "UPDATE threads SET archived=0, archived_at=NULL WHERE archived=1"
        )
        updated = int(cur.rowcount if cur.rowcount is not None else 0)
        conn.execute("COMMIT")
        return updated
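    except Exception:
        # Roll back only if BEGIN succeeded; otherwise ROLLBACK would mask the original error.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise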
    finally:
        conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Repair Codex session index/path mismatches and unarchive threads."
    )
    parser.add_argument(
        "--codex-dir",
        default=str(Path.home() / ".codex"),
        help="Path to Codex home directory (default: ~/.codex).",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Apply changes. Default behavior is dry-run.",
    )
    parser.add_argument(
        "--no-fix-paths",
        action="store_true",
        help="Skip rollout-path file move repairs.",
    )
    parser.add_argument(
        "--no-unarchive",
        action="store_true",
        help="Skip archived->active update.",
    )
    parser.add_argument(
        "--sample",
        type=int,
        default=10,
        help="Number of planned move entries to show.",
    )
    args = parser.parse_args()

    codex_dir = Path(args.codex_dir).expanduser().resolve()
    db_path = codex_dir / "state_5.sqlite"
    if not db_path.exists():
        raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")

    rows_before = load_threads(db_path)
    archived_before = count_archived(rows_before)
    plans = [] if args.no_fix_paths else discover_move_plans(codex_dir, rows_before)

    print("=== Session Repair Plan ===")
    print(f"codex_dir: {codex_dir}")
    print(f"dry_run: {not args.apply}")
    print(f"path_fix_enabled: {not args.no_fix_paths}")
    print(f"unarchive_enabled: {not args.no_unarchive}")
    print(f"planned_path_moves: {len(plans)}")
    print(f"archived_before: {archived_before}")

    for plan in plans[: max(1, args.sample)]:
        print(f"  move {plan.thread_id}: {plan.source} -> {plan.target}")

    if not args.apply:
        print("No changes applied (dry-run).")
        return

    backup_path = make_db_backup(db_path, codex_dir, "pre_repair")
    moved_count = 0
    unarchived_count = 0

    if not args.no_fix_paths:
        moved_count = apply_moves(plans)

    if not args.no_unarchive:
        unarchived_count = apply_unarchive(db_path)

    rows_after = load_threads(db_path)
    archived_after = count_archived(rows_after)

    print("")
    print("=== Session Repair Result ===")
    print(f"db_backup: {backup_path}")
    print(f"path_moves_applied: {moved_count}")
    print(f"unarchived_rows: {unarchived_count}")
    print(f"archived_after: {archived_after}")


if __name__ == "__main__":
    main()