| | @@ -18,6 +18,31 @@ pub struct MigrationReport { |
| | pub corpus: i64, |
| | } |
| | |
| + | /// Row totals for the five registry tables, as gathered by `registry_counts`. |
| + | /// |
| + | /// Each field holds the result of `SELECT COUNT(*)` against the table of the |
| + | /// same name. |
| + | #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] |
| + | pub struct RegistryCounts { |
| + | /// Number of rows in the `manifests` table. |
| + | pub manifests: i64, |
| + | /// Number of rows in the `beacons` table. |
| + | pub beacons: i64, |
| + | /// Number of rows in the `watermarks` table. |
| + | pub watermarks: i64, |
| + | /// Number of rows in the `events` table. |
| + | pub events: i64, |
| + | /// Number of rows in the `corpus` table. |
| + | pub corpus: i64, |
| + | } |
| + | |
| + | /// Outcome of `validate_registry_integrity`. |
| + | /// |
| + | /// Every field other than `ok` and `counts` is a problem counter; zero means |
| + | /// the corresponding check found nothing wrong. |
| + | #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] |
| + | pub struct RegistryIntegrityReport { |
| + | /// True only when every problem counter below is zero. |
| + | pub ok: bool, |
| + | /// Row totals per registry table at the time of the audit. |
| + | pub counts: RegistryCounts, |
| + | /// Beacon rows whose `file_id` has no manifest row. |
| + | pub orphan_beacons: i64, |
| + | /// Watermark rows whose `file_id` has no manifest row. |
| + | pub orphan_watermarks: i64, |
| + | /// Event rows with a non-NULL `file_id` that has no manifest row. |
| + | pub orphan_events: i64, |
| + | /// Corpus rows whose `file_id` has no manifest row. |
| + | pub orphan_corpus: i64, |
| + | /// Beacon rows whose recipient or issuer id differs from their manifest row. |
| + | pub beacon_identity_mismatches: i64, |
| + | /// Watermark rows whose recipient or issuer id differs from their manifest row. |
| + | pub watermark_identity_mismatches: i64, |
| + | /// Event rows whose non-NULL recipient or issuer id differs from their manifest row. |
| + | pub event_identity_mismatches: i64, |
| + | /// Manifest rows whose `manifest_json` could not be parsed. |
| + | pub malformed_manifest_json: i64, |
| + | /// Parsed manifests for which `verify()` did not report success. |
| + | pub invalid_manifest_signatures: i64, |
| + | /// Parsed manifests whose embedded `file_id` differs from the row's `file_id` column. |
| + | pub mismatched_manifest_file_ids: i64, |
| + | } |
| + | |
| | pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> { |
| | if let Some(parent) = db_path.parent() { |
| | std::fs::create_dir_all(parent) |
| | @@ -198,6 +223,110 @@ pub async fn migrate_from_sqlite( |
| | result |
| | } |
| | |
| + | /// Audits the registry tables and summarizes every inconsistency found. |
| + | /// |
| + | /// The audit has three parts: |
| + | /// - row totals for each registry table, |
| + | /// - SQL cross-checks counting child rows (beacons, watermarks, events, corpus) |
| + | /// whose `file_id` has no manifest row, and child rows whose recipient or |
| + | /// issuer id disagrees with the manifest row they join to, |
| + | /// - a pass over every stored `manifest_json`, counting rows that fail to |
| + | /// parse, rows whose embedded `file_id` differs from the `file_id` column, |
| + | /// and rows for which `verify()` does not report success. |
| + | /// |
| + | /// `ok` in the returned report is true only when all ten problem counters are zero. |
| + | /// |
| + | /// # Errors |
| + | /// |
| + | /// Returns an error when any query fails. Problems found in the data are not |
| + | /// errors; they are reported through the counters. |
| + | pub async fn validate_registry_integrity(pool: &SqlitePool) -> Result<RegistryIntegrityReport> { |
| + | // Child rows that reference a file_id with no manifest row. Events may carry |
| + | // a NULL file_id, and those rows are deliberately left out of the orphan count. |
| + | const ORPHAN_BEACONS_SQL: &str = "SELECT COUNT(*) FROM beacons b LEFT JOIN manifests m ON b.file_id = m.file_id WHERE m.file_id IS NULL"; |
| + | const ORPHAN_WATERMARKS_SQL: &str = "SELECT COUNT(*) FROM watermarks w LEFT JOIN manifests m ON w.file_id = m.file_id WHERE m.file_id IS NULL"; |
| + | const ORPHAN_EVENTS_SQL: &str = "SELECT COUNT(*) FROM events e LEFT JOIN manifests m ON e.file_id = m.file_id WHERE e.file_id IS NOT NULL AND m.file_id IS NULL"; |
| + | const ORPHAN_CORPUS_SQL: &str = "SELECT COUNT(*) FROM corpus c LEFT JOIN manifests m ON c.file_id = m.file_id WHERE m.file_id IS NULL"; |
| + | // Child rows that do join to a manifest but disagree with it on identity. |
| + | // For events, a NULL recipient or issuer id is skipped rather than flagged. |
| + | const BEACON_IDENTITY_SQL: &str = "SELECT COUNT(*) FROM beacons b JOIN manifests m ON b.file_id = m.file_id WHERE b.recipient_id != m.recipient_id OR b.issuer_id != m.issuer_id"; |
| + | const WATERMARK_IDENTITY_SQL: &str = "SELECT COUNT(*) FROM watermarks w JOIN manifests m ON w.file_id = m.file_id WHERE w.recipient_id != m.recipient_id OR w.issuer_id != m.issuer_id"; |
| + | const EVENT_IDENTITY_SQL: &str = "SELECT COUNT(*) FROM events e JOIN manifests m ON e.file_id = m.file_id WHERE (e.recipient_id IS NOT NULL AND e.recipient_id != m.recipient_id) OR (e.issuer_id IS NOT NULL AND e.issuer_id != m.issuer_id)"; |
| + | |
| + | // NOTE(review): every query below goes to the pool on its own, so the counters |
| + | // may not describe a single snapshot if writers are active -- confirm whether |
| + | // callers need the audit wrapped in one transaction. |
| + | let counts = registry_counts(pool).await?; |
| + | let orphan_beacons = count_query(pool, ORPHAN_BEACONS_SQL).await?; |
| + | let orphan_watermarks = count_query(pool, ORPHAN_WATERMARKS_SQL).await?; |
| + | let orphan_events = count_query(pool, ORPHAN_EVENTS_SQL).await?; |
| + | let orphan_corpus = count_query(pool, ORPHAN_CORPUS_SQL).await?; |
| + | let beacon_identity_mismatches = count_query(pool, BEACON_IDENTITY_SQL).await?; |
| + | let watermark_identity_mismatches = count_query(pool, WATERMARK_IDENTITY_SQL).await?; |
| + | let event_identity_mismatches = count_query(pool, EVENT_IDENTITY_SQL).await?; |
| + | |
| + | let stored_manifests: Vec<(String, String)> = |
| + | sqlx::query_as("SELECT file_id, manifest_json FROM manifests") |
| + | .fetch_all(pool) |
| + | .await?; |
| + | |
| + | let mut malformed_manifest_json = 0_i64; |
| + | let mut invalid_manifest_signatures = 0_i64; |
| + | let mut mismatched_manifest_file_ids = 0_i64; |
| + | for (row_file_id, raw_json) in stored_manifests { |
| + | // A row that does not parse is counted once as malformed; the remaining |
| + | // checks need a parsed manifest, so move on to the next row. |
| + | let parsed = match oversight_manifest::Manifest::from_json(raw_json.as_bytes()) { |
| + | Ok(parsed) => parsed, |
| + | Err(_) => { |
| + | malformed_manifest_json += 1; |
| + | continue; |
| + | } |
| + | }; |
| + | if parsed.file_id != row_file_id { |
| + | mismatched_manifest_file_ids += 1; |
| + | } |
| + | // A verification error is treated the same as a failed verification. |
| + | let signature_valid = parsed.verify().unwrap_or(false); |
| + | if !signature_valid { |
| + | invalid_manifest_signatures += 1; |
| + | } |
| + | } |
| + | |
| + | // The registry is healthy only when every problem counter is exactly zero. |
| + | let problem_counters = [ |
| + | orphan_beacons, |
| + | orphan_watermarks, |
| + | orphan_events, |
| + | orphan_corpus, |
| + | beacon_identity_mismatches, |
| + | watermark_identity_mismatches, |
| + | event_identity_mismatches, |
| + | malformed_manifest_json, |
| + | invalid_manifest_signatures, |
| + | mismatched_manifest_file_ids, |
| + | ]; |
| + | let ok = problem_counters == [0_i64; 10]; |
| + | |
| + | Ok(RegistryIntegrityReport { |
| + | ok, |
| + | counts, |
| + | orphan_beacons, |
| + | orphan_watermarks, |
| + | orphan_events, |
| + | orphan_corpus, |
| + | beacon_identity_mismatches, |
| + | watermark_identity_mismatches, |
| + | event_identity_mismatches, |
| + | malformed_manifest_json, |
| + | invalid_manifest_signatures, |
| + | mismatched_manifest_file_ids, |
| + | }) |
| + | } |
| + | |
| + | /// Counts the rows in each of the five registry tables. |
| + | /// |
| + | /// The tables are queried one after another in the order manifests, beacons, |
| + | /// watermarks, events, corpus; the first failing query aborts with its error. |
| + | async fn registry_counts(pool: &SqlitePool) -> Result<RegistryCounts> { |
| + | let manifests = count_query(pool, "SELECT COUNT(*) FROM manifests").await?; |
| + | let beacons = count_query(pool, "SELECT COUNT(*) FROM beacons").await?; |
| + | let watermarks = count_query(pool, "SELECT COUNT(*) FROM watermarks").await?; |
| + | let events = count_query(pool, "SELECT COUNT(*) FROM events").await?; |
| + | let corpus = count_query(pool, "SELECT COUNT(*) FROM corpus").await?; |
| + | |
| + | Ok(RegistryCounts { |
| + | manifests, |
| + | beacons, |
| + | watermarks, |
| + | events, |
| + | corpus, |
| + | }) |
| + | } |
| + | |
| + | /// Runs `sql` against `pool` and returns the single `i64` column of its one row. |
| + | /// |
| + | /// Intended for `SELECT COUNT(*)` statements. Any failure reported by |
| + | /// `fetch_one` is propagated to the caller. |
| + | async fn count_query(pool: &SqlitePool, sql: &str) -> Result<i64> { |
| + | let row: (i64,) = sqlx::query_as(sql).fetch_one(pool).await?; |
| + | Ok(row.0) |
| + | } |
| + | |
| | async fn validate_source_schema(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) -> Result<()> { |
| | for table in MIGRATED_TABLES { |
| | let exists: Option<(String,)> = sqlx::query_as( |
| | @@ -561,6 +690,8 @@ pub async fn get_semantic_candidates( |
| | #[cfg(test)] |
| | mod tests { |
| | use super::*; |
| + | use oversight_crypto::ClassicIdentity; |
| + | use oversight_manifest::{Manifest, Recipient}; |
| | use std::path::PathBuf; |
| | |
| | fn temp_dir(label: &str) -> PathBuf { |
| | @@ -575,13 +706,14 @@ mod tests { |
| | } |
| | |
| | async fn seed_source(pool: &SqlitePool) { |
| + | let (issuer_pub, manifest_json) = signed_manifest_json("file-1"); |
| | upsert_manifest( |
| | pool, |
| | "file-1", |
| | "recipient-1", |
| | "issuer-1", |
| - | &"ab".repeat(32), |
| - | r#"{"file_id":"file-1"}"#, |
| + | &issuer_pub, |
| + | &manifest_json, |
| | 10, |
| | ) |
| | .await |
| | @@ -637,6 +769,35 @@ mod tests { |
| | .unwrap(); |
| | } |
| | |
| + | /// Builds a signed manifest fixture carrying the given `file_id`. |
| + | /// |
| + | /// Fresh issuer and recipient identities are generated on every call. Returns |
| + | /// the issuer's Ed25519 public key as hex together with the manifest JSON text. |
| + | fn signed_manifest_json(file_id: &str) -> (String, String) { |
| + | let issuer = ClassicIdentity::generate(); |
| + | let recipient = ClassicIdentity::generate(); |
| + | // Encoded once: the same hex string goes into the manifest and is returned. |
| + | let issuer_pub_hex = hex::encode(issuer.ed25519_pub); |
| + | let recipient_entry = Recipient { |
| + | recipient_id: "recipient-1".into(), |
| + | x25519_pub: hex::encode(recipient.x25519_pub), |
| + | ed25519_pub: None, |
| + | p256_pub: None, |
| + | }; |
| + | |
| + | let mut manifest = Manifest::new( |
| + | "fixture.txt", |
| + | "ab".repeat(32), |
| + | 4096, |
| + | "issuer-1", |
| + | issuer_pub_hex.clone(), |
| + | recipient_entry, |
| + | "https://registry.test", |
| + | "text/plain", |
| + | None, |
| + | None, |
| + | "GLOBAL", |
| + | ); |
| + | // The caller's id is written into the manifest before it is signed. |
| + | manifest.file_id = file_id.into(); |
| + | manifest.sign(issuer.ed25519_priv.as_ref()).unwrap(); |
| + | |
| + | let json_bytes = manifest.to_json().unwrap(); |
| + | (issuer_pub_hex, String::from_utf8(json_bytes).unwrap()) |
| + | } |
| + | |
| | #[tokio::test] |
| | async fn migrate_from_sqlite_copies_python_registry_tables() { |
| | let source_dir = temp_dir("source"); |
| | @@ -728,4 +889,91 @@ mod tests { |
| | let _ = std::fs::remove_dir_all(source_dir); |
| | let _ = std::fs::remove_dir_all(dest_dir); |
| | } |
| + | |
| + | /// A freshly migrated and seeded registry must pass the audit with no findings. |
| + | #[tokio::test] |
| + | async fn validate_registry_integrity_accepts_clean_rows() { |
| + | let workdir = temp_dir("validate-clean"); |
| + | std::fs::create_dir_all(&workdir).unwrap(); |
| + | let pool = create_pool(&workdir.join("registry.sqlite")).await.unwrap(); |
| + | run_migrations(&pool).await.unwrap(); |
| + | seed_source(&pool).await; |
| + | |
| + | let audit = validate_registry_integrity(&pool).await.unwrap(); |
| + | |
| + | assert!(audit.ok); |
| + | assert_eq!(audit.counts.manifests, 1); |
| + | assert_eq!(audit.counts.beacons, 1); |
| + | assert_eq!(audit.malformed_manifest_json, 0); |
| + | assert_eq!(audit.invalid_manifest_signatures, 0); |
| + | |
| + | // Close the pool before the directory is removed; cleanup is best-effort. |
| + | pool.close().await; |
| + | let _ = std::fs::remove_dir_all(workdir); |
| + | } |
| + | |
| + | /// Injects one malformed manifest and one orphan row per child table, then |
| + | /// checks that each problem lands in its own counter and nowhere else. |
| + | #[tokio::test] |
| + | async fn validate_registry_integrity_reports_bad_rows() { |
| + | let dir = temp_dir("validate-bad"); |
| + | std::fs::create_dir_all(&dir).unwrap(); |
| + | let db_path = dir.join("registry.sqlite"); |
| + | let pool = create_pool(&db_path).await.unwrap(); |
| + | run_migrations(&pool).await.unwrap(); |
| + | seed_source(&pool).await; |
| + | |
| + | // A manifest row whose JSON cannot be parsed. It is inserted with raw SQL |
| + | // so the deliberately broken payload is stored exactly as written. |
| + | sqlx::query( |
| + | "INSERT INTO manifests (file_id, recipient_id, issuer_id, issuer_ed25519_pub, manifest_json, registered_at) VALUES (?, ?, ?, ?, ?, ?)", |
| + | ) |
| + | .bind("bad-file") |
| + | .bind("recipient-1") |
| + | .bind("issuer-1") |
| + | .bind("00") |
| + | .bind("{") |
| + | .bind(20_i64) |
| + | .execute(&pool) |
| + | .await |
| + | .unwrap(); |
| + | |
| + | // One child row per table pointing at a file_id that has no manifest row. |
| + | upsert_beacon(&pool, "orphan-token", "missing-file", "r", "i", "dns", 21) |
| + | .await |
| + | .unwrap(); |
| + | upsert_watermark(&pool, "orphan-mark", "L1", "missing-file", "r", "i", 21) |
| + | .await |
| + | .unwrap(); |
| + | insert_event( |
| + | &pool, |
| + | "orphan-token", |
| + | Some("missing-file"), |
| + | Some("r"), |
| + | Some("i"), |
| + | "dns", |
| + | None, |
| + | None, |
| + | None, |
| + | 21, |
| + | None, |
| + | None, |
| + | ) |
| + | .await |
| + | .unwrap(); |
| + | sqlx::query( |
| + | "INSERT INTO corpus (file_id, hash_kind, hash_value, metadata, registered_at) VALUES (?, ?, ?, ?, ?)", |
| + | ) |
| + | .bind("missing-file") |
| + | .bind("perceptual") |
| + | .bind("phash-missing") |
| + | .bind(None::<String>) |
| + | .bind(21_i64) |
| + | .execute(&pool) |
| + | .await |
| + | .unwrap(); |
| + | |
| + | let report = validate_registry_integrity(&pool).await.unwrap(); |
| + | assert!(!report.ok); |
| + | // The seeded manifest plus the malformed one. |
| + | assert_eq!(report.counts.manifests, 2); |
| + | assert_eq!(report.orphan_beacons, 1); |
| + | assert_eq!(report.orphan_watermarks, 1); |
| + | assert_eq!(report.orphan_events, 1); |
| + | assert_eq!(report.orphan_corpus, 1); |
| + | assert_eq!(report.malformed_manifest_json, 1); |
| + | // Orphan rows never join to a manifest, so they must not be reported as |
| + | // identity mismatches; an unparseable manifest must not also be reported |
| + | // as a bad signature or a file_id mismatch. |
| + | assert_eq!(report.beacon_identity_mismatches, 0); |
| + | assert_eq!(report.watermark_identity_mismatches, 0); |
| + | assert_eq!(report.event_identity_mismatches, 0); |
| + | assert_eq!(report.invalid_manifest_signatures, 0); |
| + | assert_eq!(report.mismatched_manifest_file_ids, 0); |
| + | |
| + | pool.close().await; |
| + | let _ = std::fs::remove_dir_all(dir); |
| + | } |
| | } |