Zion Boggan
repos/Oversight/oversight_core/safe_io.py
zionboggan.com ↗
127 lines · python
History for this file →
1
"""Filesystem safety helpers for key and sealed-file writes."""
2
 
3
from __future__ import annotations
4
 
5
import json
6
import os
7
from pathlib import Path
8
import subprocess
9
import tempfile
10
from typing import Iterable
11
 
12
 
13
# Digits that may follow COM/LPT in a device name: 1-9 plus the superscript
# digits one, two and three, which Windows also treats as port numbers.
_PORT_DIGITS = "123456789\u00b9\u00b2\u00b3"

# Upper-cased names that Windows maps to devices instead of regular files,
# regardless of the directory they appear in.
WINDOWS_RESERVED_NAMES = {
    "CON", "PRN", "AUX", "NUL",
    "CONIN$", "CONOUT$",
    *(f"COM{digit}" for digit in _PORT_DIGITS),
    *(f"LPT{digit}" for digit in _PORT_DIGITS),
}


def is_windows_reserved_path(path: Path) -> bool:
    """Return True if the final path component targets a Windows device name.

    The check is purely lexical and runs on every platform. Only the last
    component of *path* is inspected. A name counts as reserved when the
    part before its first ``.`` (and before any ``:`` stream suffix), with
    trailing spaces removed and compared case-insensitively, is one of
    ``WINDOWS_RESERVED_NAMES``. So ``nul``, ``NUL.txt``, ``nul .txt`` and
    ``CON:stream`` are all reported as reserved.

    Args:
        path: Candidate path; it does not need to exist.

    Returns:
        True when the final component is a reserved device name, False
        otherwise (including when the path has no final component).
    """
    # Windows ignores trailing dots and spaces, so "CON. " is still CON.
    name = path.name.rstrip(" .")
    if not name:
        return False
    # Drop the extension, then any alternate-data-stream suffix, then the
    # spaces Windows tolerates between the device name and the extension.
    stem = name.partition(".")[0].partition(":")[0].rstrip(" ")
    return stem.upper() in WINDOWS_RESERVED_NAMES
26
 
27
 
28
def is_private_key_file(path: Path) -> bool:
    """Report whether *path* looks like an Oversight private identity JSON file.

    Detection is best-effort. A path that is missing or is not a regular
    file yields False, and so does a file that cannot be read, is not valid
    UTF-8, or does not parse as JSON. Otherwise the decoded document is
    passed to ``is_private_key_dict``.
    """
    if not (path.exists() and path.is_file()):
        return False
    try:
        raw_text = path.read_text(encoding="utf-8")
        document = json.loads(raw_text)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # Unreadable or malformed content cannot be a key file we recognise.
        return False
    return is_private_key_dict(document)
37
 
38
 
39
def is_private_key_dict(data: object) -> bool:
    """Return True if *data* is a dict with string private-key fields.

    Both ``"x25519_priv"`` and ``"ed25519_priv"`` must be present and hold
    ``str`` values. The contents of those strings are not inspected.
    """
    if not isinstance(data, dict):
        return False
    required_fields = ("x25519_priv", "ed25519_priv")
    return all(isinstance(data.get(field), str) for field in required_fields)
45
 
46
 
47
def same_path(a: Path, b: Path) -> bool:
    """Return True if *a* and *b* refer to the same filesystem location.

    Both paths are resolved without requiring them to exist and then
    compared. If resolution fails, the comparison falls back to the
    case-normalised absolute paths, so the function still returns an
    answer instead of raising.

    Args:
        a: First path.
        b: Second path.

    Returns:
        True when both paths resolve (or, in the fallback, normalise) to
        the same location.
    """
    try:
        return a.resolve(strict=False) == b.resolve(strict=False)
    except (OSError, RuntimeError):
        # Before Python 3.13, Path.resolve() reports a symlink loop with
        # RuntimeError rather than OSError; treat both as "cannot resolve".
        # normcase keeps the string comparison case-insensitive on Windows,
        # matching how Path equality behaves there (it is a no-op on POSIX).
        left = os.path.normcase(os.path.abspath(a))
        right = os.path.normcase(os.path.abspath(b))
        return left == right
52
 
53
 
54
def validate_output_path(
    path: Path,
    *,
    input_paths: Iterable[Path] = (),
    allow_existing: bool = False,
    block_private_keys: bool = True,
) -> None:
    """Reject destructive or confusing output paths before writing.

    The checks run in this order and the first failure raises:

    1. the path must have a final component;
    2. that component must not be a Windows reserved device name;
    3. the path must not be the same location as any entry of *input_paths*
       (falsy entries are skipped);
    4. unless *block_private_keys* is false, the path must not already hold
       an Oversight private key file;
    5. unless *allow_existing* is true, the path must not already exist.

    Args:
        path: Where the caller intends to write.
        input_paths: Paths the output must not collide with.
        allow_existing: Permit writing over an existing path.
        block_private_keys: Refuse to target an existing private key file.

    Raises:
        ValueError: For checks 1 to 4.
        FileExistsError: For check 5.
    """
    if not (str(path) and path.name):
        raise ValueError("Please choose an output path.")

    if is_windows_reserved_path(path):
        raise ValueError(f"Refusing to write to Windows reserved device name: {path.name}")

    # any() stops at the first collision, just like an early raise in a loop.
    collides_with_input = any(
        candidate and same_path(path, candidate) for candidate in input_paths
    )
    if collides_with_input:
        raise ValueError("Output path must be different from every input path.")

    if block_private_keys and is_private_key_file(path):
        raise ValueError("Refusing to overwrite an Oversight private key file.")

    already_present = path.exists()
    if already_present and not allow_existing:
        raise FileExistsError(f"Refusing to overwrite existing file: {path}")
73
 
74
 
75
def atomic_write_bytes(path: Path, data: bytes, *, mode: int | None = None) -> None:
    """Write bytes via temp file + fsync + atomic replace in the same directory.

    The parent directory is created if needed. The data is written to a
    temporary file next to *path*, flushed and fsynced, and then moved over
    *path* with ``os.replace``. If any step fails, the temporary file is
    removed (best-effort) and the error is re-raised.

    Args:
        path: Final destination of the data.
        data: Bytes to write.
        mode: Permission bits applied to the file on POSIX before any data
            is written. Ignored on other platforms. When omitted, the file
            keeps the permissions ``tempfile.mkstemp`` gave the temporary
            file.

    Raises:
        OSError: If creating, writing, syncing or replacing the file fails.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", suffix=".tmp", dir=path.parent)
    tmp_path = Path(tmp_name)
    try:
        # Hand the descriptor to a file object first so that it is closed
        # on every exit path, including a failure while setting the mode.
        with os.fdopen(fd, "wb") as f:
            if mode is not None and os.name == "posix":
                os.fchmod(f.fileno(), mode)
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # BaseException so that an interrupt (Ctrl-C, SystemExit) does not
        # leave a stray temporary file beside the destination.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
94
 
95
 
96
def atomic_write_text(path: Path, text: str, *, mode: int | None = None) -> None:
    """Encode *text* as UTF-8 and write it through ``atomic_write_bytes``.

    *mode* is forwarded to ``atomic_write_bytes`` unchanged.
    """
    atomic_write_bytes(path, data=text.encode("utf-8"), mode=mode)
98
 
99
 
100
def atomic_write_private_json(path: Path, data: dict) -> None:
    """Serialise *data* as indented JSON and write it as a private file.

    The document is serialised before anything is written, so data that is
    not JSON-serialisable raises without touching *path*. The text is then
    written with ``atomic_write_text`` using mode ``0o600``. On Windows
    (``os.name == "nt"``) ``harden_windows_private_file_acl`` is called on
    the written file afterwards.
    """
    serialized = json.dumps(data, indent=2)
    atomic_write_text(path, serialized, mode=0o600)
    if os.name != "nt":
        return
    harden_windows_private_file_acl(path)
105
 
106
 
107
def harden_windows_private_file_acl(path: Path) -> None:
    """Best-effort Windows ACL narrowing for private key files.

    Runs ``icacls`` to remove inherited permissions from *path* and to grant
    read/write to the current user and full control to ``SYSTEM`` and
    ``Administrators``. The user is taken from the ``USERNAME`` environment
    variable, qualified with ``USERDOMAIN`` when that is set.

    Nothing is reported to the caller: the function returns silently when
    ``USERNAME`` is unset, when ``icacls`` cannot be started, or when
    ``icacls`` exits with an error.

    Args:
        path: File whose ACL should be narrowed.
    """
    user = os.environ.get("USERNAME")
    if not user:
        return
    domain = os.environ.get("USERDOMAIN")
    principal = f"{domain}\\{user}" if domain else user

    # Prefer the copy in the Windows system directory. A bare "icacls" is
    # looked up through the process search order, which on Windows can pick
    # up an executable from the current directory.
    icacls = "icacls"
    system_root = os.environ.get("SystemRoot") or os.environ.get("WINDIR")
    if system_root:
        system_copy = os.path.join(system_root, "System32", "icacls.exe")
        if os.path.isfile(system_copy):
            icacls = system_copy

    try:
        subprocess.run(
            [
                icacls,
                str(path),
                "/inheritance:r",
                "/grant:r",
                f"{principal}:(R,W)",
                "SYSTEM:(F)",
                "Administrators:(F)",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # Best-effort: failing to launch icacls must not turn an already
        # completed key write into an error.
        return