NemoReport AI v2: Data pipeline (Phase B + C)
Status as of 2026-04-30: the pipeline is production-ready from upload through `/retrieve`. Phase D (chat with RAG injection) is not finished yet: `/chat` still uses the full report text, not chunks.
This document describes exactly what happens to the data from the moment the user clicks "Nahrát report" ("Upload report") until they can ask queries and get the top-K chunks back. Step by step, as a waterfall.
Overview (in one sentence)
The user uploads a PDF/MHTML/DOCX → the backend stores it in Supabase Storage (Supabase-managed infra) plus a DB row with the relative `storage_path` → the worker (Docker container) processes it asynchronously (5 stages) → the DB holds sections, figures, and chunks with vector embeddings → the user calls `POST /retrieve` with a query → it returns the top-K relevant chunks. Chat does not use the chunks yet (Phase D).
Components (who talks to whom)
┌─────────────────────┐  HTTP + JWT   ┌──────────────────────┐
│ Frontend            │──────────────►│ Backend              │
│ CF Workers          │               │ FastAPI              │
│ Next.js 16          │◄──────────────│ Docker container     │
│ workers.dev         │               │ uvicorn :8000        │
└──────────┬──────────┘               └──────────┬───────────┘
           │                                     │
           │ Realtime WebSocket                  │ taskiq enqueue
           ▼                                     ▼
┌──────────────────────────┐   ┌──────────────────────────┐
│ Supabase                 │   │ Redis (Docker container) │
│ Postgres + Auth          │   │ queue + rate limit       │
│ Storage + Realtime       │   │ AOF persistent           │
└──────────┬───────────────┘   └─────────────────┬────────┘
           │ DB read/write                       │ Stream consume
           │                                     ▼
           │                          ┌──────────────────────┐
           │                          │ Worker               │
           │                          │ taskiq               │
           │                          │ Docker container     │
           │                          │ app.worker_entry     │
           └─────────────────────────►│                      │
              DB write (service_role) └──────────┬───────────┘
                                                 │ External APIs
                                                 ▼
                                ┌─────────────────────────────────┐
                                │ Mistral OCR (PDF/Image)         │
                                │ Gemini-3-Flash (figures)        │
                                │ Gemini-Embedding-2 (chunks)     │
                                │ Cohere Rerank 4.0 (retrieval)   │
                                └─────────────────────────────────┘
Step 0: Auth (once per session)
- The user opens https://nemoreport-ai-frontend-v2.algaweb.workers.dev/login
- Enters an email → the frontend calls `supabase.auth.signInWithOtp({email})` (anon key)
- Supabase sends a magic link via Resend SMTP (sandbox = only jiri@slimarik.cz)
- The email contains the URL https://nemoreport-ai-frontend-v2.algaweb.workers.dev/auth/confirm?token_hash=XYZ&type=magiclink
- The user clicks → the frontend route `/auth/confirm` calls `supabase.auth.verifyOtp({token_hash, type})` server-side
- Supabase verifies the token and returns `access_token` + `refresh_token`
- A JWT hook (migration 0007) injects custom claims into the JWT payload:
  - `personal_tenant_id`: UUID of the user's personal tenant
  - `active_tenant_id`: the tenant the user is currently working in (default = personal)
- Cookies are set, redirect to `/`
Output: the user has a JWT cookie. The backend verifies it via the JWKS endpoint (`{supabase_url}/auth/v1/.well-known/jwks.json`).
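Once the token is verified, the custom claims are all the backend needs to build the request's user. A minimal sketch, assuming the signature and expiry were already checked against the JWKS keys and that the user id sits in the standard `sub` claim; `auth_user_from_claims` and the `personal_tenant_id` field on `AuthUser` are hypothetical, not the project's `CurrentUser` dependency.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class AuthUser:
    id: str
    tenant_id: str           # tenant the request acts in (active_tenant_id)
    personal_tenant_id: str  # hypothetical extra field for this sketch


def auth_user_from_claims(claims: Mapping[str, Any]) -> AuthUser:
    """Build an AuthUser from JWT claims that were ALREADY verified (signature, exp)."""
    user_id = claims.get("sub")
    personal = claims.get("personal_tenant_id")
    if not user_id or not personal:
        # This sketch requires the claims injected by the JWT hook
        raise PermissionError("missing sub or personal_tenant_id claim")
    # active_tenant_id falls back to the personal tenant when absent
    active = claims.get("active_tenant_id") or personal
    return AuthUser(id=user_id, tenant_id=active, personal_tenant_id=personal)
```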
Step 1: Upload PDF/MHTML/DOCX
Frontend (`src/components/UploadZone.tsx`):
- The user drags and drops a file or clicks "Nahrát" ("Upload")
- Client-side validation:
  - Size ≤ 50 MB
  - MIME type ∈ {PDF, PNG, JPEG, WEBP, MHTML, HTML, DOCX}
- Build a `FormData` with a `file` field and an optional `title`
- `fetch('https://nemoreport-ai-backend-v2.sliplane.app/ingest', { method: 'POST', body: formData, headers: { Authorization: 'Bearer <jwt>' } })`
Backend (`app/routers/ingestion.py:upload_for_ingest`):
- The auth dependency `CurrentUser` validates the JWT (pyjwt + JWKS cache with a 1 h TTL) → `AuthUser(id, tenant_id, ...)`
- Rate limit `@limiter.limit("20/hour")` per IP (slowapi + Redis)
- `request.form()` → `UploadFile` (Starlette)
- `bytes = await file.read()` (50 MB max, enforced by `storage.validate_size`)
- MIME detection via `python-magic` (libmagic1 wrapper, reads the file header):
  - Verify against `ALLOWED_CONTENT_TYPES`
  - If it does not match → HTTP 415 Unsupported Media Type
- Generate `report_id = uuid4()`, build `storage_path = "{tenant_id}/reports/{report_id}/original.{ext}"`
- Storage upload (service_role) to the `nemoreport-uploads` bucket: `supabase.storage.from_("nemoreport-uploads").upload(path, bytes)`
- DB INSERT into `nemoreport.reports` (service_role)
- Enqueue the worker job via the taskiq broker (Redis Stream):
  - `await broker.startup()` (lazy init)
  - `await run_ingestion.kiq("report", report_id)` → publish to the `nemoreport_ingestion` consumer group
- Return `202 Accepted {id, status: 'uploaded', filename, size_bytes}`
Result: the file is in Supabase Storage, the DB row exists, and the worker has received the task.
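The server-side validation can be approximated with the standard library. This is a simplified sketch, not the backend's code: `sniff_content_type` is a stand-in for python-magic that knows only a few signatures (it treats any ZIP as DOCX), and the extension map, the exception class, and the `validate_upload` name are assumptions.

```python
from __future__ import annotations

import uuid

MAX_BYTES = 50 * 1024 * 1024  # 50 MB, same limit as the client-side check

# Hypothetical allowlist: content type -> extension used in storage_path
ALLOWED_CONTENT_TYPES = {
    "application/pdf": "pdf",
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/webp": "webp",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "multipart/related": "mhtml",
    "text/html": "html",
}


class UnsupportedMediaType(Exception):
    """Would be mapped to HTTP 415 by the router."""


def sniff_content_type(data: bytes) -> str | None:
    """Tiny magic-byte check; libmagic recognizes far more formats and edge cases."""
    if data.startswith(b"%PDF-"):
        return "application/pdf"
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    if data.startswith(b"PK\x03\x04"):
        # Any ZIP container; a real check would look for word/document.xml inside
        return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    head = data[:4096].lower()
    if b"multipart/related" in head:  # MHTML header, checked before plain HTML
        return "multipart/related"
    if b"<html" in head or b"<!doctype html" in head:
        return "text/html"
    return None


def validate_upload(tenant_id: str, data: bytes) -> tuple[str, str, str]:
    """Return (report_id, content_type, storage_path) or raise."""
    if len(data) > MAX_BYTES:
        raise ValueError("file exceeds 50 MB")
    content_type = sniff_content_type(data)
    if content_type not in ALLOWED_CONTENT_TYPES:
        raise UnsupportedMediaType(content_type or "unknown")
    report_id = str(uuid.uuid4())
    ext = ALLOWED_CONTENT_TYPES[content_type]
    return report_id, content_type, f"{tenant_id}/reports/{report_id}/original.{ext}"
```

Sniffing the header instead of trusting the client's `Content-Type` is what makes the 415 check meaningful: a renamed file is still rejected.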
Step 2: The frontend tracks progress (Realtime)
Frontend (`src/components/IngestionProgress.tsx`):
- After the 202 response it opens a Supabase Realtime channel.
- 8 s polling fallback via `GET /ingest/{id}/status` (in case Realtime drops)
- `statusLabel(status)` mapping:
  - `uploaded` → "Nahráno, příprava ke zpracování…" (Uploaded, preparing to process…), 5%
  - `parsing` → "Prozkoumáváme dokument…" (Examining the document…), 25%
  - `parsed` → "Sekce extrahované, příprava anotací…" (Sections extracted, preparing annotations…), 55%
  - `annotating` → "Popisujeme obrázky a mapy…" (Describing images and maps…), 70%
  - `annotated` → "Anotace dokončené, finalizace…" (Annotations done, finalizing…), 90%
  - `embedding` → "Připravujeme vyhledávání…" (Preparing search…), 95%
  - `ready` → "Hotovo" (Done), 100%
  - `failed` → "Zpracování selhalo" (Processing failed), 0%
Result: the user sees a progress bar in the UI and every status change immediately.
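The same mapping and the status FSM (`uploaded → … → ready`, with `failed` reachable whenever a stage fails) can be expressed as plain data. A sketch in Python for consistency with the other examples, although `statusLabel` itself lives in the TypeScript frontend; `can_transition` is hypothetical, and treating `ready` and `failed` as terminal is an assumption of this sketch.

```python
# Labels copied from statusLabel(); the percentage drives the progress bar.
STATUS_PROGRESS: dict[str, tuple[str, int]] = {
    "uploaded": ("Nahráno, příprava ke zpracování…", 5),
    "parsing": ("Prozkoumáváme dokument…", 25),
    "parsed": ("Sekce extrahované, příprava anotací…", 55),
    "annotating": ("Popisujeme obrázky a mapy…", 70),
    "annotated": ("Anotace dokončené, finalizace…", 90),
    "embedding": ("Připravujeme vyhledávání…", 95),
    "ready": ("Hotovo", 100),
    "failed": ("Zpracování selhalo", 0),
}

# Happy path of the status FSM; 'failed' can be entered from any non-terminal state.
HAPPY_PATH = ["uploaded", "parsing", "parsed", "annotating", "annotated", "embedding", "ready"]
TERMINAL = {"ready", "failed"}


def can_transition(current: str, new: str) -> bool:
    if current in TERMINAL:
        return False  # treated as terminal in this sketch
    if new == "failed":
        return True
    if current not in HAPPY_PATH or new not in HAPPY_PATH:
        return False
    # Only the next step on the happy path is allowed
    return HAPPY_PATH.index(new) == HAPPY_PATH.index(current) + 1
```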
Step 3: The worker processes the upload (5-stage pipeline)
The worker (`app/worker.py:run_ingestion`) is the orchestrator: it calls the 5 stages sequentially via taskiq, and each stage is a separate retryable task. Each stage receives a `_Target` dataclass (which unifies the "report" and "attachment" paths).
Stage 1: scan_target (1-2 seconds)
- PATCH `reports.status='parsing'` + `ingestion_started_at=now()`
- Re-fetch the full row from the DB
- Validate that `storage_path` exists (download HEAD)
- Return `{stage:'scan', ok:true}`
Stage 2: parse_target (3 s to 5 min, depending on size)
Routing per `content_type`:
| Content type | Path | What it does |
|---|---|---|
| `application/pdf`, `image/*` | `_parse_via_mistral` | Mistral OCR |
| `application/vnd.openxmlformats-officedocument.wordprocessingml.document` | `_parse_via_docx` | python-docx + Gemini per image |
| `text/html`, `message/rfc822`, `multipart/related` | `_parse_via_bs4_mhtml` | BS4 + trafilatura (ported from v1) |
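The routing table maps naturally onto a dispatch list with glob patterns, so `image/*` covers PNG, JPEG, and WEBP. A sketch assuming handlers are looked up by name; `route_parser` is hypothetical and returns the handler name instead of calling it, so it runs without the parser modules.

```python
from fnmatch import fnmatch

# (content-type patterns, handler name) in the order of the table above
ROUTES: list[tuple[tuple[str, ...], str]] = [
    (("application/pdf", "image/*"), "_parse_via_mistral"),
    (("application/vnd.openxmlformats-officedocument.wordprocessingml.document",), "_parse_via_docx"),
    (("text/html", "message/rfc822", "multipart/related"), "_parse_via_bs4_mhtml"),
]


def route_parser(content_type: str) -> str:
    # Drop parameters such as "; charset=utf-8" and normalize case before matching
    base = content_type.split(";", 1)[0].strip().lower()
    for patterns, handler in ROUTES:
        if any(fnmatch(base, p) for p in patterns):
            return handler
    raise ValueError(f"no parser for {content_type!r}")
```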
PDF/image path (Mistral OCR):
- Idempotence: DELETE existing `parsed_sections`, `parsed_tables`, `figures` for this target (re-run safety)
- Storage → signed URL (1 h TTL)
- Mistral API call:
  - For large PDFs (> 500 KB), skip bbox + document annotations (to avoid timeouts; see the B.7 hotfix)
- Per page:
  - Assemble `parsed_markdown` (text + image placeholders)
  - INSERT a `parsed_sections` row (tenant_id, report_id, section_name, slug, markdown, tokens, order_in_doc)
- Per figure (per page):
  - Decode base64 → bytes
  - Upload to the `nemoreport-figures` bucket, path `{tenant_id}/reports/{report_id}/figures/{figure_id}.{ext}`
  - INSERT a `figures` row with:
    - `annotation_json` (`FigureAnnotation`: `image_type`, `summary`, `entities[]`, `key_observations[]`)
    - `annotation_source = 'mistral'` if `not annotation_is_thin(ann)` (summary ≥ 40 chars + some entities)
    - `annotation_source = 'pending'` otherwise (sent to the Gemini fallback in stage 3)
- PATCH `reports.parsed_markdown` (full doc) + `parsed_metadata` (`addresses[]`, `parcel_numbers[]`, `municipalities[]`) + `status='parsed'`
- Cost tracking: +CZK 0.03 per page (Mistral OCR pricing)
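The choice between `'mistral'` and `'pending'` is a small predicate. A sketch assuming "some entities" means at least one and that the 40-character threshold applies to the stripped summary; the real `annotation_is_thin` may differ in detail, and `initial_annotation_source` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FigureAnnotation:
    image_type: str
    summary: str
    entities: list[str] = field(default_factory=list)
    key_observations: list[str] = field(default_factory=list)


def annotation_is_thin(ann: FigureAnnotation | None) -> bool:
    """Thin = missing, summary under 40 characters, or no entities at all."""
    if ann is None:
        return True
    return len(ann.summary.strip()) < 40 or not ann.entities


def initial_annotation_source(ann: FigureAnnotation | None) -> str:
    # 'pending' rows are picked up by the Gemini fallback in stage 3
    return "pending" if annotation_is_thin(ann) else "mistral"
```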
MHTML path (NemoReport reports from Nette):
- Storage → bytes
- BS4 parse → take the trafilatura-extracted clean text
- Heading-aware section split (by Czech headings such as "Souhrnný přehled" (Summary overview), "Riziko povodní" (Flood risk), etc.)
- Per section: INSERT a `parsed_sections` row
- Embedded images (from MHTML attachments, e.g. Intermap maps):
  - libmagic re-detect (MHTML usually drops JS-rendered canvas content, so these images are often broken/black)
  - Upload to the `nemoreport-figures` bucket
  - INSERT a `figures` row with `annotation_source='pending'` (always → Gemini fallback)
DOCX path: python-docx extracts paragraphs (heading-aware section split on `Heading*` styles) plus embedded images from `doc.part.rels` → INSERT `figures` with `annotation_source='pending'`.
Stage 3: annotate_target (Gemini fallback, 5-30 s)
- PATCH `reports.status='annotating'`
- SELECT `figures WHERE annotation_source='pending'` (for this target)
- Per pending figure:
  - Download the bytes from the `nemoreport-figures` bucket
  - Pydantic AI Agent with `output_type=FigureAnnotation`:
    - The prompt contains:
      - "Toto je obrázek (mapa, výkres, foto, scan, schéma) ze sekce '{section_name}' (strana {page})" ("This is an image (map, drawing, photo, scan, diagram) from section '{section_name}' (page {page})")
      - Address hint from `parsed_metadata.addresses[0]`
      - Excerpt `parsed_markdown[:2000]`
      - `BinaryContent(data=img_bytes, media_type=mime)`
      - Czech instructions asking for a RAG-friendly description
  - Output: `FigureAnnotation` Pydantic structure → JSON → UPDATE `figures.annotation_json`, `annotation_source='gemini'`, `annotation_quality_score` (heuristic)
  - Cost: +CZK 0.01 per call
- Resilient: a per-figure exception is logged and skipped; it does not abort the batch
- PATCH `reports.status='annotated'`
Result: every figure has an AI annotation. Most figures (~85 % of the dataset) have `source='gemini'` because:
- Large PDFs (> 500 KB) skip the Mistral bbox annotations → all 'pending' → Gemini
- Mistral does not run on MHTML reports → all 'pending' → Gemini
- Mistral does not run on DOCX images → all 'pending' → Gemini
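The "log and skip" behavior of stage 3 comes down to catching exceptions per item rather than around the whole loop. A minimal sketch; `annotate_one` is a hypothetical callback standing in for download + Gemini call + UPDATE, and a figure whose annotation fails simply stays `'pending'`.

```python
import asyncio  # used to drive the coroutine, e.g. asyncio.run(annotate_pending(...))
import logging
from typing import Awaitable, Callable

log = logging.getLogger("annotate")


async def annotate_pending(
    figure_ids: list[str],
    annotate_one: Callable[[str], Awaitable[dict]],
) -> dict[str, dict]:
    """Annotate pending figures one at a time; one failure never aborts the batch."""
    results: dict[str, dict] = {}
    for fig_id in figure_ids:
        try:
            results[fig_id] = await annotate_one(fig_id)
        except Exception:
            # Logged with traceback and skipped; the figure keeps annotation_source='pending'
            log.exception("annotation failed for figure %s", fig_id)
    return results
```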
Stage 4: embed_target (Phase C, 10-60 s depending on the number of chunks)
- PATCH `reports.status='embedding'`
- Soft-fail check (D7): if `GEMINI_API_KEY` is missing → `parsed_metadata.embedding_status='skipped'`, return ok (the pipeline continues to finalize)
- Idempotence: DELETE existing `chunks` for this target
- Load:
  - `list_chunkable_sections(report_id, attachment_id)`: sections of the given target
  - `list_chunkable_tables(report_id, attachment_id)`: tables
  - `list_chunkable_figures(report_id, attachment_id)`: figures
- `chunk_target()` (`app/ingestion/chunking.py`) generates a list of `ChunkSpec`:
  - Sections (text):
    - Short (< 1200 tokens) → 1 chunk
    - Long → naive paragraph split into ~1000-token pieces (on `\n\n` boundaries)
  - Tables: 1 chunk per table (never split, so the header stays with its rows)
  - Figures: filtered through `should_embed_figure()`:
    - Annotation summary ≥ 80 chars
    - At least 1 entity
    - `image_type ∉ {decorative, logo, footer}`
  - 1 chunk per qualifying figure (text = `build_figure_text(fig)` = caption + summary + entities + observations)
- Per-chunk embedding (asyncio with `Semaphore(4)`):
  - Text chunk → `provider.embed_text(content, task_type='RETRIEVAL_DOCUMENT')`
  - Figure chunk → download bytes from `nemoreport-figures` → `provider.embed_multimodal(text, image_bytes, mime, RETRIEVAL_DOCUMENT)`
  - Provider = `GeminiEmbedding2Provider` (`gemini-embedding-2` GA, native multimodal)
  - Output: 3072-dim vector → Matryoshka truncation `[:1536]` + L2 normalization → halfvec literal `[v1,v2,...]`
  - Per-chunk failure → INSERT with `embedding=NULL` (soft-fail; the BM25 leg still works)
- Bulk INSERT of chunks (100 per batch) into `nemoreport.chunks`:
  - `tenant_id, report_id, attachment_id, section_id, figure_id, table_id`
  - `section_name, section_slug, attachment_filename, source_label` (denormalized for citations)
  - `content_type ∈ {text, table, figure}`, `source_type ∈ {main, attachment, figure}`
  - `content, content_tokens, order_in_doc, order_in_section`
  - `embedding halfvec(1536)`, `embedding_type ∈ {text, multimodal}`, `embedding_model='gemini-embedding-2'`, `embedding_version='ga-2026-04'`
  - `tsv`: GENERATED column from `to_tsvector('nemoreport.czech_unaccent', content)` (BM25 leg)
- Cost tracking: + sum(cost_cents) → `reports.ingestion_cost_cents`
- `mark_embedding_status`: `'ok'` | `'partial'` (≥ 1 failure) | `'failed'` (all failed) | `'skipped'`
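The section splitting and the figure filter can be sketched as follows. Assumptions: tokens are approximated by a whitespace word count (the real tokenizer is not specified here), and `split_section` and the `Figure` dataclass are illustrative names; `should_embed_figure` follows the three criteria listed above.

```python
from dataclasses import dataclass

SHORT_LIMIT = 1200  # sections under this many tokens stay a single chunk
TARGET = 1000       # approximate piece size for long sections
SKIP_TYPES = {"decorative", "logo", "footer"}


def count_tokens(text: str) -> int:
    # Assumption: whitespace word count as a rough proxy for the real tokenizer
    return len(text.split())


def split_section(markdown: str) -> list[str]:
    """Keep short sections whole; pack blank-line-separated paragraphs into ~TARGET-token pieces."""
    if count_tokens(markdown) < SHORT_LIMIT:
        return [markdown]
    pieces: list[str] = []
    current: list[str] = []
    size = 0
    for para in markdown.split("\n\n"):
        n = count_tokens(para)
        # Close the current piece if adding this paragraph would exceed the target
        if current and size + n > TARGET:
            pieces.append("\n\n".join(current))
            current, size = [], 0
        current.append(para)
        size += n
    if current:
        pieces.append("\n\n".join(current))
    return pieces


@dataclass
class Figure:
    image_type: str
    summary: str
    entities: list[str]


def should_embed_figure(fig: Figure) -> bool:
    return len(fig.summary) >= 80 and len(fig.entities) >= 1 and fig.image_type not in SKIP_TYPES
```

Because the split is naive, a single paragraph longer than ~1000 tokens still ends up as one oversized chunk.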
Result: the `chunks` table is populated. A partial HNSW index `WHERE embedding IS NOT NULL` keeps vector search fast.
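The vector post-processing (Matryoshka truncation to 1536 dims, then L2 re-normalization), the `Semaphore(4)` concurrency cap, and the soft-fail rule fit in one sketch. `embed` is a hypothetical async callable returning the raw 3072-dim vector; the function names are illustrative, not the project's API.

```python
import asyncio
import math
from typing import Awaitable, Callable, Optional, Sequence

DIM = 1536  # Matryoshka target dimension, stored as halfvec(1536)


def to_halfvec_literal(vec: Sequence[float], dim: int = DIM) -> str:
    """Truncate to the first `dim` values, L2-normalize, and format as a pgvector literal."""
    head = [float(x) for x in vec[:dim]]
    norm = math.sqrt(sum(x * x for x in head))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return "[" + ",".join(repr(x / norm) for x in head) + "]"


async def embed_all(
    texts: list[str],
    embed: Callable[[str], Awaitable[Sequence[float]]],
    concurrency: int = 4,
) -> list[Optional[str]]:
    """Embed with at most `concurrency` calls in flight; a failed chunk yields None (embedding=NULL)."""
    sem = asyncio.Semaphore(concurrency)

    async def one(text: str) -> Optional[str]:
        async with sem:
            try:
                return to_halfvec_literal(await embed(text))
            except Exception:
                return None  # soft-fail: the row is still inserted and stays findable via BM25

    return list(await asyncio.gather(*(one(t) for t in texts)))


def embedding_status(results: list[Optional[str]]) -> str:
    """'ok' if nothing failed, 'failed' if everything failed, else 'partial'."""
    failed = sum(r is None for r in results)
    if failed == 0:
        return "ok"
    return "failed" if failed == len(results) else "partial"
```

A prefix of a unit vector is generally shorter than 1, which is why the norm is recomputed after slicing.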
Stage 5: finalize_target (1-2 s)
- PATCH `reports.status='ready'` + `ingestion_finished_at=now()`
- The Realtime publication automatically emits an UPDATE event → the frontend re-renders
Failure handling: if any stage returns `ok=False`, the pipeline stops with `status='failed'` and `ingestion_error='...'` in the DB.
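The orchestration and failure rule reduce to a loop that stops at the first `ok=False`. A sketch under simplifying assumptions: the stages run in-process here, whereas in the real worker each one is a separate retryable taskiq task, and `Target`, `run_pipeline`, and `mark_failed` are hypothetical names.

```python
import asyncio  # drive it with asyncio.run(run_pipeline(...)) from a plain script
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class Target:
    """Hypothetical stand-in for _Target: one shape for reports and attachments."""
    kind: str  # "report" | "attachment"
    id: str


StageFn = Callable[[Target], Awaitable[dict]]
MarkFailed = Callable[[Target, str], Awaitable[None]]


async def run_pipeline(target: Target, stages: list[tuple[str, StageFn]], mark_failed: MarkFailed) -> bool:
    """Run the stages in order; the first result with ok=False stops the pipeline."""
    for name, stage in stages:
        result = await stage(target)
        if not result.get("ok", False):
            # Corresponds to status='failed' + ingestion_error='...'
            await mark_failed(target, f"{name}: {result.get('error', 'unknown error')}")
            return False
    return True
```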
Step 4: The user sees the ready report
Frontend `/reports/[id]` (server component with an auth gate):
- Header: title, "Hotovo" (Done) status badge, delete button (× with confirmation), "Otevřít chat" (Open chat) link
- Main file panel: filename, status, `parsed_sections` preview, ingestion cost
- Attachments section: list (initially empty for a single file), source badges (Nette / user_upload), delete × only for user_upload, "+ Přidat soubor" (Add file) upload
- Figures grid: thumbnails + AI annotation badge ("Gemini AI", renamed from "Gemini fallback" in Phase C)
  - Clicking a thumbnail → modal with the full-size image and the full `FigureAnnotation` (image_type, summary, entities, key_observations, quality_score)
- Realtime subscription on `reports` + `attachments` (filtered by `report_id`) → live status when the user adds an attachment
Step 5: The user adds an attachment (optional, folder model)
Folder model: a report is a container for the main file, N attachments, and the figures across all of them.
Frontend: drag a file onto "+ Přidat soubor" → `POST /ingest/{report_id}/uploads`
Backend (`app/routers/ingestion.py:add_user_upload`):
1. Validate JWT, allowlist, size
2. INSERT a `nemoreport.attachments` row (`source='user_upload'`, `report_id=parent`)
3. Storage upload to the `nemoreport-attachments` bucket
4. Enqueue `run_ingestion("attachment", attachment_id)`
The worker goes through the same 5 stages, but with `_Target.kind='attachment'`. `figures.report_id` is ALWAYS the parent report, so multi-source folder retrieval with `WHERE chunks.report_id = X` picks up everything.
Nette HMAC path (`app/routers/nette.py:POST /reports/{id}/attachments/system`): the same flow, but authenticated with an HMAC signature instead of a JWT, and with `source='nette'`.
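HMAC authentication usually means recomputing the signature over the raw body and comparing in constant time. The document does not specify Nette's signing scheme, so everything here is an assumption: the `"<timestamp>.<body>"` canonical string, SHA-256, hex encoding, and the 5-minute replay window are illustrative choices, and `sign`/`verify_signature` are hypothetical names.

```python
from __future__ import annotations

import hashlib
import hmac
import time

MAX_SKEW_S = 300  # assumed replay window: 5 minutes


def sign(secret: bytes, timestamp: str, body: bytes) -> str:
    # Assumed canonical string: b"<timestamp>.<raw body>"
    return hmac.new(secret, timestamp.encode() + b"." + body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, timestamp: str, body: bytes, signature: str, now: float | None = None) -> bool:
    now = time.time() if now is None else now
    try:
        if abs(now - int(timestamp)) > MAX_SKEW_S:
            return False  # stale or future-dated request
    except ValueError:
        return False  # timestamp is not an integer
    expected = sign(secret, timestamp, body)
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode(), signature.encode())
```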
Step 6: POST /retrieve (end of Phase C)
This is where the pipeline currently ends (Phase C). Phase D will connect the chunks to chat.
Endpoint `POST /retrieve` (JWT-authenticated) or `POST /admin/retrieve/{report_id}` (admin diagnostic, bypasses JWT auth):
POST /retrieve
{
"query": "občanská vybavenost obchody dostupnost",
"scope": { "type": "folder", "report_id": "uuid" },
"top_k": 5
}
Backend (app/retrieval/service.py:retrieve):
1. Tenant scope verification (pre-flight)
- Fetch the report row via `db.get_report(report_id)`
- If `report.tenant_id != user.tenant_id` → 404 "report not in tenant scope"
2a. Standalone rewrite (no-op in the Phase C MVP)
`rewrite_standalone(query, history=None)` → returns the original query (Phase D will do a full LLM rewrite)
2b. Conditional HyDE (`should_use_hyde`)
- Activates when: the query has < 4 words OR `scope.type=='multi_report'`
- If active: `generate_hyde(query)` → Gemini-3-flash-preview generates 2-4 sentences of a hypothetical answer → `embed_input = hyde_doc`
- Otherwise: `embed_input = query`
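The activation rule is simple enough to state as code. A sketch assuming words are counted by whitespace splitting; `pick_embed_input` is hypothetical and takes the HyDE generator as a plain (synchronous) parameter so the LLM call can be stubbed.

```python
from typing import Callable


def should_use_hyde(query: str, scope_type: str) -> bool:
    # Rule from the text: fewer than 4 words, or a multi-report scope
    return len(query.split()) < 4 or scope_type == "multi_report"


def pick_embed_input(query: str, scope_type: str, generate_hyde: Callable[[str], str]) -> tuple[str, bool]:
    """Return (text to embed, used_hyde); generate_hyde stands in for the LLM call."""
    if should_use_hyde(query, scope_type):
        return generate_hyde(query), True
    return query, False
```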
2c. Embed the query
- `provider.embed_text(embed_input, task_type='RETRIEVAL_QUERY')`
- Gemini-2 has asymmetric embeddings: `RETRIEVAL_QUERY` produces a different vector than `RETRIEVAL_DOCUMENT` (this matters; embedding the query as a document would lower recall)
- Output: 1536-dim halfvec
3. Hybrid retrieval (Postgres RPC)
- If rerank is enabled: `fetch_top_k = top_k * 4` (candidates for Cohere)
- Call `nemoreport.hybrid_search_chunks_by_folder(p_query_text, p_query_vec, p_report_id, p_top_k=fetch_top_k, ...)`:
  - Vector leg: `ORDER BY embedding <=> p_query_vec LIMIT p_pre_fusion_n` via the partial HNSW index
  - BM25 leg: `tsv @@ websearch_to_tsquery` + a prefix wildcard fix (`_build_prefix_tsquery`, which compensates for `czech_unaccent` having no stemmer; see §C9)
  - Fusion: `FULL OUTER JOIN ON id`, RRF score `1/(60+v_rank) + 1/(60+b_rank)`, `ORDER BY rrf_score DESC LIMIT p_top_k`
  - Returns rows with `vector_rank, vector_dist, bm25_rank, bm25_score, rrf_score`
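The fusion step of the SQL function can be reproduced in Python to reason about scores. This sketch assumes a chunk missing from one leg contributes nothing for that leg (the SQL presumably coalesces the missing term away); with rank 1 in both legs the score is 2/61 ≈ 0.0328, in line with the `rrf_score` of 0.0327 in the sample response. `rrf_fuse` is a hypothetical name.

```python
K = 60  # RRF constant used by the SQL function


def rrf_fuse(vector_ids: list[str], bm25_ids: list[str], top_k: int) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion over two best-first id lists (ranks are 1-based)."""
    v_rank = {cid: i for i, cid in enumerate(vector_ids, start=1)}
    b_rank = {cid: i for i, cid in enumerate(bm25_ids, start=1)}
    scores: dict[str, float] = {}
    # Union of ids, like FULL OUTER JOIN ON id
    for cid in v_rank.keys() | b_rank.keys():
        score = 0.0
        if cid in v_rank:
            score += 1.0 / (K + v_rank[cid])
        if cid in b_rank:
            score += 1.0 / (K + b_rank[cid])
        scores[cid] = score
    # ORDER BY rrf_score DESC LIMIT top_k (id as a deterministic tie-break)
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))[:top_k]
```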
4. Cohere Rerank 4.0 (optional, ENV-flagged)
- If `COHERE_RERANK_ENABLED=true` AND `len(chunks) > 1`:
  - `await rerank_documents(query, [c.content for c in chunks], top_n=top_k)`
  - Cohere `AsyncClientV2` → `rerank-v4.0-pro` cross-encoder (32K context, multilingual including Czech)
  - Re-order the chunks by the Cohere ranking + populate `rerank_score`
  - 8 s timeout, graceful fallback to the hybrid order if the API is down or rate-limited
- Otherwise: trim to `top_k` from the hybrid order
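The timeout-plus-fallback behavior can be sketched with `asyncio.wait_for`. The `rerank` callable is a hypothetical adapter around the Cohere client that returns `(index, score)` pairs best-first; its shape is an assumption, not the Cohere SDK signature.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# Hypothetical adapter: (query, documents, top_n) -> [(index, score), ...] best first
RerankFn = Callable[[str, list[str], int], Awaitable[list[tuple[int, float]]]]


async def rerank_or_fallback(
    query: str,
    chunks: Sequence[dict],
    top_k: int,
    rerank: RerankFn,
    timeout_s: float = 8.0,
) -> tuple[list[dict], bool]:
    """Return (chunks, reranked); on timeout or API error keep the hybrid order."""
    if len(chunks) <= 1:
        return list(chunks[:top_k]), False  # nothing to reorder
    try:
        ranked = await asyncio.wait_for(
            rerank(query, [c["content"] for c in chunks], top_k), timeout=timeout_s
        )
    except Exception:
        # Timeout, outage, or rate limit: graceful fallback to the RRF order
        return list(chunks[:top_k]), False
    return [{**chunks[i], "rerank_score": score} for i, score in ranked[:top_k]], True
```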
4b. Per-source diversity (C.10)
- If `scope.type='folder'` AND `top_k >= 4` AND the candidate set contains more than 1 `source_type`:
  - Detect the dominant and the missing types
  - Swap the lowest-scored chunk of the dominant type for the highest-scored chunk of a missing type
  - Cap at 2 swaps → keeps a balanced ratio without injecting irrelevant chunks
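One possible reading of the swap rule, as a sketch: `selected` is the current top-K, `pool` holds the remaining candidates, and `score` is whatever drives the final order (the rerank score when reranking ran, otherwise RRF; an assumption). The caller is assumed to check the preconditions (folder scope, `top_k >= 4`), and `diversify` is a hypothetical name.

```python
from collections import Counter


def diversify(selected: list[dict], pool: list[dict], max_swaps: int = 2) -> list[dict]:
    """Swap in the best chunks of source types missing from the top-K (at most max_swaps)."""
    if not selected:
        return []
    result = list(selected)
    present = {c["source_type"] for c in result}
    # Best remaining candidate for each source type absent from the top-K
    best_missing: dict[str, dict] = {}
    for c in pool:
        t = c["source_type"]
        if t not in present and (t not in best_missing or c["score"] > best_missing[t]["score"]):
            best_missing[t] = c
    swaps = 0
    for cand in sorted(best_missing.values(), key=lambda c: -c["score"]):
        if swaps >= max_swaps:
            break
        dominant, count = Counter(c["source_type"] for c in result).most_common(1)[0]
        if count < 2:
            break  # no over-represented type left to give up a slot
        victim = min((c for c in result if c["source_type"] == dominant), key=lambda c: c["score"])
        result[result.index(victim)] = cand
        swaps += 1
    return sorted(result, key=lambda c: -c["score"])
```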
5. Insert retrieval_log (C.11 observability)
- Best-effort INSERT into `nemoreport.retrieval_log`: `query_text, rewritten_query, used_hyde, fusion, reranked, rerank_model, top_k, result_count, result_chunk_ids[], embed_ms, retrieval_ms, rerank_ms`
- A failure is only logged; it does not break the response
6. Response
{
"chunks": [
{
"id": "uuid", "content": "...", "content_type": "text|table|figure",
"section_name": "...", "source_label": "...",
"rrf_score": 0.0327, "vector_rank": 1, "bm25_rank": 1, "vector_dist": 0.34,
"bm25_score": 0.20, "rerank_score": 0.7151
},
...
],
"embed_ms": 360, "retrieval_ms": 58, "rerank_ms": 667,
"scope_type": "folder", "top_k": 5,
"fusion": "hybrid_rrf", "reranked": true, "used_hyde": false
}
End-to-end latency (real measurements):
- Long query (5 words, no HyDE): ~1085 ms (embed 360 + retrieve 58 + rerank 667)
- Short query (1 word, HyDE active): ~700 ms (embed 287 + rerank 350 + LLM HyDE call)
Step 7: Chat (Phase D, NOT DONE YET)
Current state of `POST /chat` (`app/routers/chat.py`):
- Loads the report row → `parsed_markdown` (v2) or `clean_text` (legacy v1 import)
- Calls a Pydantic AI Agent with the full report text in the system prompt
- Streams the response over SSE
What Phase D will change:
1. `/chat` will internally call `/retrieve` on every turn
2. Inject the top-K chunks (5-8) into the system prompt instead of the full text
3. The AI cites per chunk ("Z přílohy 'Vyjádření ČEZ' vyplývá..." = "The attachment 'Vyjádření ČEZ' shows...")
4. The token budget drops dramatically: 8 chunks × 1,000 tokens = 8K vs. 40K for the full text
5. Multi-report scope support: the user can chat across reports (currently 1:1)
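For Phase D, the injected context could be assembled with numbered sources so the model can cite them. This is a hypothetical helper, not existing code; it assumes the chunk rows carry `source_label`, `section_name`, `content`, and `content_tokens` as stored in `nemoreport.chunks`, arrive best-first, and uses the 8K budget from point 4.

```python
def build_context(chunks: list[dict], budget_tokens: int = 8000) -> str:
    """Pack retrieved chunks into a numbered, citable context block within a token budget."""
    parts: list[str] = []
    used = 0
    for n, c in enumerate(chunks, start=1):
        if used + c["content_tokens"] > budget_tokens:
            break  # chunks arrive best-first, so stop at the first one that no longer fits
        used += c["content_tokens"]
        # "[n] source / section" header lets the model cite "[n]" in its answer
        parts.append(f"[{n}] {c['source_label']} / {c['section_name']}\n{c['content']}")
    return "\n\n".join(parts)
```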
What lives where in the DB (simplified ER)
auth.users ──┐
             ├──► nemoreport.user_profiles ──► personal_tenant
             │
             └──► nemoreport.tenant_members ──► nemoreport.tenants
                                                        │
                                                        ▼
                                        nemoreport.reports (parent / folder)
                                            │ status FSM: uploaded → parsing → parsed
                                            │             → annotating → annotated
                                            │             → embedding → ready / failed
                                            │
                      ┌─────────────────────┼─────────────────────┐
                      ▼                     ▼                     ▼
                 attachments         parsed_sections           figures
               (FK report_id)        (FK report_id +       (FK report_id +
                                      attachment_id)        attachment_id +
                                                            section_id)
                      │                     │                     │
                      └─────────────────────┼─────────────────────┘
                                            ▼
                                     chunks (Phase C)
                                     FK report_id (always the parent)
                                     FK attachment_id (NULL = main)
                                     FK section_id / table_id / figure_id
                                     halfvec(1536) embedding + tsvector
                                     partial HNSW index WHERE embedding IS NOT NULL
                                            │
                                            ▼
                                     retrieval_log (per call)
Storage layout (Supabase Storage, managed backend)
nemoreport-uploads/          # main uploaded files
└── {tenant_id}/
    └── reports/
        └── {report_id}/
            └── original.{pdf|docx|mhtml|...}
nemoreport-attachments/      # attachments (Nette + user_upload)
└── {tenant_id}/
    └── reports/
        └── {report_id}/
            └── attachments/
                └── {attachment_id}.{ext}
nemoreport-figures/          # extracted images / maps
└── {tenant_id}/
    └── reports/
        └── {report_id}/
            └── figures/
                └── {figure_id}.{png|jpg|...}
RLS policies: the `nemoreport-uploads` bucket uses path-based scoping on `{tenant_id}/...`, so a user sees only their own paths. The service role (used by the worker) bypasses RLS.
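The three path layouts and the tenant-prefix rule behind the RLS policy can be captured in two helpers. Both are hypothetical illustrations: the actual policy is SQL inside Supabase, and `path_visible_to` only mirrors its first-segment check.

```python
BUCKETS = {
    "upload": "nemoreport-uploads",
    "attachment": "nemoreport-attachments",
    "figure": "nemoreport-figures",
}


def object_path(kind: str, tenant_id: str, report_id: str, ext: str, object_id: str = "") -> tuple[str, str]:
    """Return (bucket, path) for the layout above; object_id is the attachment or figure id."""
    base = f"{tenant_id}/reports/{report_id}"
    if kind == "upload":
        return BUCKETS[kind], f"{base}/original.{ext}"
    if kind == "attachment":
        return BUCKETS[kind], f"{base}/attachments/{object_id}.{ext}"
    if kind == "figure":
        return BUCKETS[kind], f"{base}/figures/{object_id}.{ext}"
    raise ValueError(f"unknown object kind: {kind}")


def path_visible_to(tenant_id: str, path: str) -> bool:
    # Path-based scoping: the first path segment must equal the caller's tenant id
    return path.split("/", 1)[0] == tenant_id
```

Comparing the whole first segment (rather than a string prefix) keeps tenant `t1` from matching paths of tenant `t10`.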
Costs (per typical folder, ~250-300 chunks)
| Stage | Unit price | Cost (CZK) |
|---|---|---|
| Mistral OCR (PDF) | $0.001-0.030 / page | ~0.03-0.30 per report |
| Gemini fallback per figure | ~$0.0004 / image | ~0.01 per figure |
| Gemini-2 embed (text + multimodal) | ~$0.0001 / 1K tokens | ~0.05-0.15 per folder |
| Total ingestion | | ~0.10-0.50 per folder |
| Per-query embed | ~$0.0001 | ~0.002 per query |
| Cohere Rerank 4.0 | $0.0025 / search | ~0.058 per query |
| Per-query LLM answer (Phase D) | ~$0.017 | ~0.40 per query |
Real figures from the B.14 + C.13 backfill: CZK 2.81 for 21 reports (290 chunks, 51 multimodal), i.e. an average of CZK 0.13 per report.
What is still missing (for production)
| Item | Phase | Impact |
|---|---|---|
| Phase D: chat with RAG injection (chunks instead of full text) | D | Main goal; the current chat is "dumb" |
| Image resize to a 1568 px long edge before multimodal embedding | C.9 | Optimization; the current raw bytes work |
| Golden set v1 + eval harness (recall@10 ≥ 0.85) | C.12 | Quality gate, A/B testing |
| Resend production domain (outside the sandbox) | infra | Real testers; currently only the owner's email works |
| Cohere production key (outside the trial) | infra | Trial limit ~1000 calls/month |
| `/admin/retrieval/*` endpoints | admin | Per-tenant analytics, query logs (C.11 data ready) |
| `/admin/cost/*` UI in the admin frontend | admin | Cost dashboard (B.13 data ready) |
Available metrics
Per report (nemoreport.reports):
- `ingestion_cost_cents`: sum over all stages
- `ingestion_started_at` / `ingestion_finished_at`: total latency
- `parsed_metadata.embedding_status`: ok / partial / failed / skipped
Per ingestion stage (nemoreport.ingestion_jobs):
- Per stage status, attempt count, started/finished, error_text, metrics jsonb
Per retrieve call (nemoreport.retrieval_log):
- query_text, rewritten_query, used_hyde, fusion, reranked, rerank_model
- top_k, result_count, result_chunk_ids[]
- embed_ms / retrieval_ms / rerank_ms
- user_feedback (Phase D thumbs)
Admin endpoints (Phase B, B.13):
- GET /admin/cost/global?days=N — top tenants by cost
- GET /admin/cost/tenant/{id}?days=N — per-tenant breakdown