记录日期:2026-04-21 | 更新日期:2026-04-22
在切换 API key、provider 或工作区后,Codex 历史会话在客户端中可能出现“只显示少量新会话”的现象。本文目标是给出一套可复用、可回滚、低风险的 sessions recovery 方法,优先恢复可见性与索引一致性,不改 rollout 正文内容。
.codexstate_5.sqlite(threads 表)sessions/ 与 archived_sessions/本文采用的核心脚本能力:session_audit.py(只读审计)、session_repair.py(路径修复/反归档)、provider_migrate.py(可见性迁移)。
| 阶段 | 动作 | 结果 |
|---|---|---|
| A 只读检查 | 统计 rollout、threads 与一致性指标 | 发现归档会话与少量孤立 rollout,明确问题边界 |
| B dry-run | 预估路径修复 + 反归档影响 | 输出可执行清单,等待确认后写入 |
| C 安全修复 | 路径修复(如可唯一定位)+ 反归档 | 历史会话索引恢复,归档状态回到可见集 |
| D 孤立会话核验 | 检查 orphan 是否为活动会话 | 活动会话标记为 ACTIVE_ORPHAN,禁止导入 |
| E 可见性对齐 | provider 诊断 + provider 迁移 | 当前 provider 下可见会话显著提升;再按 cwd 验证 |
仅执行 DB 层 provider 迁移,不修改 rollout/cwd/source/archived。推荐 SQL 模板如下:
UPDATE threads
SET model_provider='<current_provider>'
WHERE archived=0 AND model_provider<>'<current_provider>';
state_5.sqlite.pre_provider_migration_<timestamp>.bakprovider_migration_<timestamp>.csv(字段:id,old_provider,new_provider,ts)回滚原则:按 CSV 将每个 id 的 provider 恢复为 old_provider;全程单事务,确保可恢复。
以下为迁移技能包内文件全文(文本展开版),用于静态网站直接查阅,无需开发附件上传/下载功能。
---
name: codex-session-migration
description: Safely audit, repair, and migrate Codex session visibility when sessions appear missing after switching API keys/providers, moving machines, or changing workspaces. Use for `~/.codex` troubleshooting involving `state_5.sqlite`, `sessions/`, `archived_sessions/`, provider/cwd filtering, path mismatches, and reversible migration with dry-run plus backup.
---
# Codex Session Migration
## Overview
Run a repeatable, low-risk session migration workflow for Codex data directories. Prioritize read-only diagnostics, explicit dry-run summaries, and reversible write operations.
## Workflow
1. Stop active Codex processes before any write operation.
2. Run read-only audit first and report counts.
3. Repair path mismatches and unarchive sessions when needed.
4. Migrate `model_provider` only when visibility still fails.
5. Re-run integrity checks and report before/after deltas.
## Preconditions
1. Use this skill against a Codex home directory, typically `~/.codex` on Linux or `%USERPROFILE%\\.codex` on Windows.
2. Keep at least one backup copy of the entire `.codex` folder before migration.
3. Treat `state_5.sqlite` as authoritative thread index; treat `sessions/` and `archived_sessions/` as rollout storage.
4. Do not edit rollout message content.
## Phase 1: Audit (Read-Only)
Run:
```bash
python scripts/session_audit.py --codex-dir ~/.codex
```
Confirm:
1. `threads_total` and rollout file totals are sensible.
2. `db_not_on_disk == 0`.
3. `missing_rollout_path == 0` or only known mismatches.
4. `model_provider`/`cwd` distribution explains visibility behavior.
Interpretation:
1. `disk_not_in_db > 0` often means active/incomplete session files.
2. `provider` skew usually causes "only newest session visible".
3. `cwd` skew causes partial visibility by workspace.
## Phase 2: Repair Paths and Unarchive
Dry-run first:
```bash
python scripts/session_repair.py --codex-dir ~/.codex
```
Apply only after review:
```bash
python scripts/session_repair.py --codex-dir ~/.codex --apply
```
Behavior:
1. Move uniquely matched rollout files to expected `threads.rollout_path` targets.
2. Set `archived=0, archived_at=NULL` for archived threads.
3. Create SQLite backup in `.tmp/`.
## Phase 3: Provider Visibility Migration (Optional)
Run only if sessions remain hidden under the new API/provider.
Dry-run:
```bash
python scripts/provider_migrate.py --codex-dir ~/.codex --target-provider new_provider
```
Apply:
```bash
python scripts/provider_migrate.py --codex-dir ~/.codex --target-provider new_provider --apply
```
Behavior:
1. Update only `threads.model_provider` for `archived=0` rows.
2. Keep rollout files untouched.
3. Emit rollback CSV + SQLite backup in `.tmp/`.
## Phase 4: Final Verification
Run:
```bash
python scripts/session_audit.py --codex-dir ~/.codex
```
Check:
1. `db_not_on_disk == 0`
2. `missing_rollout_path == 0`
3. Active thread count matches expectation
4. Provider counts align with migration objective
## Reporting Format
Always report:
1. Before/after totals: `threads`, `archived`, `provider` distribution.
2. Integrity deltas: `disk_not_in_db`, `db_not_on_disk`, `missing_rollout_path`.
3. Concrete file/database artifacts created for rollback.
## References
1. Prompt templates for delegated migration runs: `references/prompts.md`
2. Audit script: `scripts/session_audit.py`
3. Repair script: `scripts/session_repair.py`
4. Provider migration script: `scripts/provider_migrate.py`
interface:
display_name: "Codex Session Migration"
short_description: "Safely migrate Codex sessions across API changes"
default_prompt: "Use this skill to audit, repair, and migrate Codex sessions with dry-run first and rollback safety."
# Session Migration Prompt Templates
Use these templates when delegating migration work to another Codex agent.
## Template 1: Full Safe Recovery
```text
Use $codex-session-migration at <SKILL_PATH>.
Goal:
Recover and make historical sessions visible after API/provider switch.
Constraints:
1. Run read-only audit first.
2. Show dry-run before any write.
3. Create rollback artifacts before writes.
4. Avoid rollout content edits.
Execution:
1. Run session_audit.py and report counts/distributions/integrity mismatches.
2. Run session_repair.py dry-run; wait for my confirmation.
3. If confirmed, run session_repair.py --apply and report before/after.
4. If visibility is still blocked by provider filtering, run provider_migrate.py dry-run.
5. If confirmed, run provider_migrate.py --apply with target provider from config.toml.
6. Run final audit and produce acceptance checklist.
```
## Template 2: Provider-Only Visibility Migration
```text
Use $codex-session-migration at <SKILL_PATH>.
I only want provider visibility migration.
Do not modify rollout files, cwd, source, or archived state.
1. Run session_audit.py and show active provider distribution.
2. Run provider_migrate.py dry-run to target provider <TARGET_PROVIDER>.
3. Wait for confirmation.
4. Apply migration, then run final session_audit.py.
5. Report rollback paths (db backup + manifest csv).
```
## Template 3: Integrity Check Only
```text
Use $codex-session-migration at <SKILL_PATH>.
Read-only mode only.
Run session_audit.py and explain:
1) Whether sessions are missing or only filtered,
2) Whether DB/file index is consistent,
3) What minimal next step is recommended.
```
#!/usr/bin/env python3
"""
Migrate threads.model_provider for visibility alignment after API/provider switches.
"""
from __future__ import annotations
import argparse
import csv
import sqlite3
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple
def make_db_backup(db_path: Path, codex_dir: Path, label: str) -> Path:
tmp_dir = codex_dir / ".tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = tmp_dir / f"state_5.sqlite.{label}_{ts}.bak"
conn = sqlite3.connect(db_path)
backup_conn = sqlite3.connect(backup_path)
try:
conn.backup(backup_conn)
finally:
backup_conn.close()
conn.close()
return backup_path
def load_rows(db_path: Path) -> List[Tuple[str, int, str]]:
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(
"SELECT id, archived, COALESCE(model_provider, '') FROM threads"
).fetchall()
finally:
conn.close()
return [(str(r[0]), int(r[1]), str(r[2])) for r in rows]
def provider_distribution(rows: List[Tuple[str, int, str]], active_only: bool = True) -> Dict[str, int]:
counter: Counter = Counter()
for _id, archived, provider in rows:
if active_only and archived != 0:
continue
counter[provider] += 1
return dict(counter.most_common())
def build_candidates(
rows: List[Tuple[str, int, str]], target_provider: str
) -> List[Tuple[str, str, str]]:
candidates: List[Tuple[str, str, str]] = []
ts = datetime.now().isoformat(timespec="seconds")
for thread_id, archived, old_provider in rows:
if archived != 0:
continue
if old_provider == target_provider:
continue
candidates.append((thread_id, old_provider, ts))
return candidates
def write_manifest(codex_dir: Path, target_provider: str, candidates: List[Tuple[str, str, str]]) -> Path:
tmp_dir = codex_dir / ".tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
manifest = tmp_dir / f"provider_migration_{ts}.csv"
with manifest.open("w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(["id", "old_provider", "new_provider", "ts"])
for thread_id, old_provider, row_ts in candidates:
writer.writerow([thread_id, old_provider, target_provider, row_ts])
return manifest
def apply_migration(db_path: Path, target_provider: str) -> int:
conn = sqlite3.connect(db_path)
try:
conn.isolation_level = None
conn.execute("BEGIN IMMEDIATE")
cur = conn.execute(
"UPDATE threads SET model_provider=? WHERE archived=0 AND COALESCE(model_provider,'')<>?",
(target_provider, target_provider),
)
changed = int(cur.rowcount if cur.rowcount is not None else 0)
conn.execute("COMMIT")
return changed
except Exception:
conn.execute("ROLLBACK")
raise
finally:
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(
description="Migrate active threads.model_provider to a target provider."
)
parser.add_argument(
"--codex-dir",
default=str(Path.home() / ".codex"),
help="Path to Codex home directory (default: ~/.codex).",
)
parser.add_argument(
"--target-provider",
required=True,
help="Provider value to set for active threads.",
)
parser.add_argument(
"--apply",
action="store_true",
help="Apply changes. Default behavior is dry-run.",
)
parser.add_argument(
"--sample",
type=int,
default=10,
help="Show first N candidate rows.",
)
args = parser.parse_args()
codex_dir = Path(args.codex_dir).expanduser().resolve()
db_path = codex_dir / "state_5.sqlite"
if not db_path.exists():
raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")
rows_before = load_rows(db_path)
dist_before = provider_distribution(rows_before, active_only=True)
candidates = build_candidates(rows_before, args.target_provider)
print("=== Provider Migration Plan ===")
print(f"codex_dir: {codex_dir}")
print(f"target_provider: {args.target_provider}")
print(f"dry_run: {not args.apply}")
print(f"candidates: {len(candidates)}")
print(f"provider_distribution_before: {dist_before}")
for thread_id, old_provider, _ts in candidates[: max(1, args.sample)]:
print(f" {thread_id}: {old_provider} -> {args.target_provider}")
if not args.apply:
print("No changes applied (dry-run).")
return
backup_path = make_db_backup(db_path, codex_dir, "pre_provider_migration")
manifest_path = write_manifest(codex_dir, args.target_provider, candidates)
changed = apply_migration(db_path, args.target_provider)
rows_after = load_rows(db_path)
dist_after = provider_distribution(rows_after, active_only=True)
print("")
print("=== Provider Migration Result ===")
print(f"db_backup: {backup_path}")
print(f"manifest_csv: {manifest_path}")
print(f"updated_rows: {changed}")
print(f"provider_distribution_after: {dist_after}")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
Read-only integrity audit for Codex session storage.
"""
from __future__ import annotations
import argparse
import json
import re
import sqlite3
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
ROLLOUT_RE = re.compile(r"rollout-[0-9T-]+-([0-9a-f-]{36})\.jsonl$", re.IGNORECASE)
@dataclass
class ThreadRow:
thread_id: str
archived: int
model_provider: str
cwd: str
rollout_path: str
def parse_rollout_id(path: Path) -> str | None:
match = ROLLOUT_RE.search(path.name)
if not match:
return None
return match.group(1)
def collect_rollouts(codex_dir: Path) -> Tuple[List[Path], List[Path], Dict[str, List[Path]]]:
sessions_dir = codex_dir / "sessions"
archived_dir = codex_dir / "archived_sessions"
session_files = sorted(sessions_dir.rglob("rollout-*.jsonl")) if sessions_dir.exists() else []
archived_files = sorted(archived_dir.rglob("rollout-*.jsonl")) if archived_dir.exists() else []
id_to_paths: Dict[str, List[Path]] = {}
for file_path in session_files + archived_files:
rollout_id = parse_rollout_id(file_path)
if rollout_id:
id_to_paths.setdefault(rollout_id, []).append(file_path)
return session_files, archived_files, id_to_paths
def load_threads(db_path: Path) -> List[ThreadRow]:
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(
"SELECT id, archived, model_provider, cwd, rollout_path FROM threads"
).fetchall()
finally:
conn.close()
return [
ThreadRow(
thread_id=str(row[0]),
archived=int(row[1]),
model_provider=str(row[2] or ""),
cwd=str(row[3] or ""),
rollout_path=str(row[4] or ""),
)
for row in rows
]
def summarize(rows: List[ThreadRow]) -> Dict[str, Counter]:
provider_counter: Counter = Counter()
cwd_counter: Counter = Counter()
for row in rows:
provider_counter[row.model_provider] += 1
cwd_counter[row.cwd] += 1
return {"provider": provider_counter, "cwd": cwd_counter}
def sample(items: List[str], max_items: int) -> List[str]:
if len(items) <= max_items:
return items
return items[:max_items]
def build_report(codex_dir: Path, max_samples: int) -> Dict[str, object]:
db_path = codex_dir / "state_5.sqlite"
if not db_path.exists():
raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")
session_files, archived_files, id_to_paths = collect_rollouts(codex_dir)
thread_rows = load_threads(db_path)
disk_ids = set(id_to_paths.keys())
db_ids = {row.thread_id for row in thread_rows}
disk_not_in_db = sorted(disk_ids - db_ids)
db_not_on_disk = sorted(db_ids - disk_ids)
missing_rollout = sorted(
[row.thread_id for row in thread_rows if row.rollout_path and not Path(row.rollout_path).exists()]
)
archived_count = sum(1 for row in thread_rows if row.archived == 1)
active_count = sum(1 for row in thread_rows if row.archived == 0)
summary = summarize(thread_rows)
duplicated_rollouts = sorted([rid for rid, paths in id_to_paths.items() if len(paths) > 1])
duplicated_rollout_paths = {
rid: [str(p) for p in paths] for rid, paths in id_to_paths.items() if len(paths) > 1
}
report = {
"codex_dir": str(codex_dir),
"counts": {
"sessions_rollouts": len(session_files),
"archived_rollouts": len(archived_files),
"rollouts_total": len(session_files) + len(archived_files),
"threads_total": len(thread_rows),
"threads_active": active_count,
"threads_archived": archived_count,
},
"distribution": {
"model_provider": dict(summary["provider"].most_common()),
"cwd": dict(summary["cwd"].most_common()),
},
"integrity": {
"disk_not_in_db_count": len(disk_not_in_db),
"db_not_on_disk_count": len(db_not_on_disk),
"missing_rollout_path_count": len(missing_rollout),
"duplicate_rollout_id_count": len(duplicated_rollouts),
"disk_not_in_db_sample": sample(disk_not_in_db, max_samples),
"db_not_on_disk_sample": sample(db_not_on_disk, max_samples),
"missing_rollout_path_sample": sample(missing_rollout, max_samples),
"duplicate_rollout_id_sample": sample(duplicated_rollouts, max_samples),
"duplicate_rollout_paths": {
key: duplicated_rollout_paths[key] for key in sample(duplicated_rollouts, max_samples)
},
},
}
return report
def print_text_report(report: Dict[str, object]) -> None:
counts = report["counts"]
integrity = report["integrity"]
distribution = report["distribution"]
print("=== Codex Session Audit ===")
print(f"codex_dir: {report['codex_dir']}")
print("")
print("counts:")
print(f" sessions_rollouts: {counts['sessions_rollouts']}")
print(f" archived_rollouts: {counts['archived_rollouts']}")
print(f" rollouts_total: {counts['rollouts_total']}")
print(f" threads_total: {counts['threads_total']}")
print(f" threads_active: {counts['threads_active']}")
print(f" threads_archived: {counts['threads_archived']}")
print("")
print("model_provider distribution:")
for provider, value in distribution["model_provider"].items():
print(f" {provider or '<empty>'}: {value}")
print("")
print("cwd distribution (top shown):")
shown = 0
for cwd, value in distribution["cwd"].items():
print(f" {cwd or '<empty>'}: {value}")
shown += 1
if shown >= 12:
break
print("")
print("integrity:")
print(f" disk_not_in_db_count: {integrity['disk_not_in_db_count']}")
print(f" db_not_on_disk_count: {integrity['db_not_on_disk_count']}")
print(f" missing_rollout_path_count: {integrity['missing_rollout_path_count']}")
print(f" duplicate_rollout_id_count: {integrity['duplicate_rollout_id_count']}")
print(f" disk_not_in_db_sample: {integrity['disk_not_in_db_sample']}")
print(f" db_not_on_disk_sample: {integrity['db_not_on_disk_sample']}")
print(f" missing_rollout_path_sample: {integrity['missing_rollout_path_sample']}")
print(f" duplicate_rollout_id_sample: {integrity['duplicate_rollout_id_sample']}")
def main() -> None:
parser = argparse.ArgumentParser(description="Read-only audit for Codex session integrity.")
parser.add_argument(
"--codex-dir",
default=str(Path.home() / ".codex"),
help="Path to Codex home directory (default: ~/.codex).",
)
parser.add_argument(
"--max-samples",
type=int,
default=10,
help="Maximum sample IDs per mismatch category.",
)
parser.add_argument("--json", action="store_true", help="Print JSON output.")
args = parser.parse_args()
codex_dir = Path(args.codex_dir).expanduser().resolve()
report = build_report(codex_dir, max_samples=max(1, args.max_samples))
if args.json:
print(json.dumps(report, ensure_ascii=False, indent=2))
else:
print_text_report(report)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
Repair rollout-path mismatches and unarchive threads with explicit dry-run control.
"""
from __future__ import annotations
import argparse
import shutil
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List
@dataclass
class ThreadRow:
thread_id: str
archived: int
rollout_path: str
@dataclass
class MovePlan:
thread_id: str
source: Path
target: Path
def load_threads(db_path: Path) -> List[ThreadRow]:
conn = sqlite3.connect(db_path)
try:
rows = conn.execute("SELECT id, archived, rollout_path FROM threads").fetchall()
finally:
conn.close()
return [ThreadRow(thread_id=str(r[0]), archived=int(r[1]), rollout_path=str(r[2] or "")) for r in rows]
def discover_move_plans(codex_dir: Path, rows: List[ThreadRow]) -> List[MovePlan]:
plans: List[MovePlan] = []
candidate_bases = [codex_dir / "sessions", codex_dir / "archived_sessions"]
for row in rows:
if not row.rollout_path:
continue
target = Path(row.rollout_path)
if target.exists():
continue
filename = target.name
candidates: List[Path] = []
for base in candidate_bases:
if base.exists():
candidates.extend(base.rglob(filename))
unique_candidates = sorted(set(candidates))
if len(unique_candidates) == 1 and unique_candidates[0] != target:
plans.append(MovePlan(thread_id=row.thread_id, source=unique_candidates[0], target=target))
return plans
def count_archived(rows: List[ThreadRow]) -> int:
return sum(1 for row in rows if row.archived == 1)
def make_db_backup(db_path: Path, codex_dir: Path, label: str) -> Path:
tmp_dir = codex_dir / ".tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = tmp_dir / f"state_5.sqlite.{label}_{ts}.bak"
conn = sqlite3.connect(db_path)
backup_conn = sqlite3.connect(backup_path)
try:
conn.backup(backup_conn)
finally:
backup_conn.close()
conn.close()
return backup_path
def apply_moves(plans: List[MovePlan]) -> int:
moved = 0
for plan in plans:
plan.target.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(plan.source), str(plan.target))
moved += 1
return moved
def apply_unarchive(db_path: Path) -> int:
conn = sqlite3.connect(db_path)
try:
conn.isolation_level = None
conn.execute("BEGIN IMMEDIATE")
cur = conn.execute(
"UPDATE threads SET archived=0, archived_at=NULL WHERE archived=1"
)
updated = int(cur.rowcount if cur.rowcount is not None else 0)
conn.execute("COMMIT")
return updated
except Exception:
conn.execute("ROLLBACK")
raise
finally:
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(
description="Repair Codex session index/path mismatches and unarchive threads."
)
parser.add_argument(
"--codex-dir",
default=str(Path.home() / ".codex"),
help="Path to Codex home directory (default: ~/.codex).",
)
parser.add_argument(
"--apply",
action="store_true",
help="Apply changes. Default behavior is dry-run.",
)
parser.add_argument(
"--no-fix-paths",
action="store_true",
help="Skip rollout-path file move repairs.",
)
parser.add_argument(
"--no-unarchive",
action="store_true",
help="Skip archived->active update.",
)
parser.add_argument(
"--sample",
type=int,
default=10,
help="Number of planned move entries to show.",
)
args = parser.parse_args()
codex_dir = Path(args.codex_dir).expanduser().resolve()
db_path = codex_dir / "state_5.sqlite"
if not db_path.exists():
raise FileNotFoundError(f"state_5.sqlite not found: {db_path}")
rows_before = load_threads(db_path)
archived_before = count_archived(rows_before)
plans = [] if args.no_fix_paths else discover_move_plans(codex_dir, rows_before)
print("=== Session Repair Plan ===")
print(f"codex_dir: {codex_dir}")
print(f"dry_run: {not args.apply}")
print(f"path_fix_enabled: {not args.no_fix_paths}")
print(f"unarchive_enabled: {not args.no_unarchive}")
print(f"planned_path_moves: {len(plans)}")
print(f"archived_before: {archived_before}")
for plan in plans[: max(1, args.sample)]:
print(f" move {plan.thread_id}: {plan.source} -> {plan.target}")
if not args.apply:
print("No changes applied (dry-run).")
return
backup_path = make_db_backup(db_path, codex_dir, "pre_repair")
moved_count = 0
unarchived_count = 0
if not args.no_fix_paths:
moved_count = apply_moves(plans)
if not args.no_unarchive:
unarchived_count = apply_unarchive(db_path)
rows_after = load_threads(db_path)
archived_after = count_archived(rows_after)
print("")
print("=== Session Repair Result ===")
print(f"db_backup: {backup_path}")
print(f"path_moves_applied: {moved_count}")
print(f"unarchived_rows: {unarchived_count}")
print(f"archived_after: {archived_after}")
if __name__ == "__main__":
main()