Agentic AI Deep Research Platform for Internal Documents

Overview
I built this to turn scattered docs and systems into clear answers you can act on. It coordinates a few focused agents, looks where your information actually lives, and assembles a sourced, executive‑ready brief—while you watch it work in real time.
Full source code: github.com/ibiggy9/agentic-knowledge-base
Why I built this:
I started to notice that the way cloud-first companies build their data footprints doesn't serve all employees equally. Getting the data you need requires SQL skills and knowledge of your particular cloud's products; beyond that, you have to know where the data is stored and have teams keeping it organized. All of this is fine for software engineers, but business users aren't expected to have these skills, and they chronically suffered much slower speed-to-insight. They'd have to either hire a technical person or put together a large formal request for data, which came with slow turnarounds.
Beyond that, even if you get the information you need, you likely still need support summarizing it, visualizing it, and otherwise drawing insights out of it. This is the role of junior or even mid-level employees; in fact, there are swaths of roles entirely devoted to writing up documents on large amounts of source data to save executives time.
In the age of LLMs, and especially agentic AI, I decided to see what the state of the tech could do to help with all of the above. MCP was all the rage, so I built a custom MCP client and a series of servers that can access huge amounts of information in a few minutes and produce a high-quality, consulting-style report.
The demo video only shows it accessing one server, which reads a Google Drive folder containing a ton of open-source insights about a company. I was also able to build servers that provide access to a SQL database and an API.
Problem & Goal
Most teams already know the questions they need to answer. The slowdown happens because the facts are spread across wikis, databases, shared drives, PDFs, spreadsheets, and third‑party tools. That leads to manual hunting, partial answers, and lots of follow‑ups.
The goal here is simple: compress days of research into minutes by orchestrating deep research across many sources and returning a brief that leaders can trust—complete with citations and an appendix you can audit.
What it does well
- Coordinates multiple agents so the right tool looks in the right place at the right time, instead of one model trying to do everything.
- Streams progress and output so you can see what it’s doing, where it’s looking, and how the conclusions are forming.
- Pulls from the places you actually work (SQLite, Google Drive, external APIs, raw docs) and stitches the results together.
- Handles the rough edges of production with sane session management and error handling so it behaves predictably.
Deep research output
When you ask a question, the system fans out across your sources and returns a structured deliverable:
- Executive Summary: the direct answer and confidence, written for decision‑makers.
- Key Findings: the strongest insights with links back to the evidence.
- Supporting Evidence: quotes, data snippets, tables, and references.
- Gaps & Next Steps: what’s missing and what to do next.
- Appendix: raw extracts for traceability and review.
You see results arrive in real time, including progress updates, so there’s no “black box” step.
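As a rough sketch, the deliverable maps onto a structure like the following; the field names here are my shorthand, not the repo's actual types:

```python
from dataclasses import dataclass, field

@dataclass
class ResearchBrief:
    """Illustrative shape of the deliverable; field names are assumptions."""
    executive_summary: str                                         # direct answer + confidence
    key_findings: list[str] = field(default_factory=list)          # insights linked to evidence
    supporting_evidence: list[dict] = field(default_factory=list)  # quotes, snippets, tables, refs
    gaps_and_next_steps: list[str] = field(default_factory=list)   # what's missing, what to do
    appendix: list[str] = field(default_factory=list)              # raw extracts for auditability
```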
How it works
- Interface: a Next.js app with a chat‑like UI that shows streaming output and progress.
- Gateway: a FastAPI service that manages sessions and streams results back.
- Orchestrator: an MCP client that decides which agents/tools to use and merges their work.
- Agents: focused workers for SQLite analysis, Google Drive documents, external data, and raw file processing.
Simple flow: you ask → the gateway routes → the orchestrator assigns agents → they work in parallel → results are merged and streamed back.
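A minimal sketch of that fan-out-and-merge step, assuming each agent exposes an async `run(query)` and a `name` (both illustrative, not the repo's exact API):

```python
import asyncio

async def run_agents(agents, query: str) -> dict:
    # Fan out: every agent searches its own source concurrently.
    results = await asyncio.gather(
        *(agent.run(query) for agent in agents),
        return_exceptions=True,  # one failing agent shouldn't sink the whole brief
    )
    # Merge: collect findings and keep failures visible for the final report.
    merged = {"findings": [], "errors": []}
    for agent, result in zip(agents, results):
        if isinstance(result, Exception):
            merged["errors"].append(f"{agent.name}: {result}")
        else:
            merged["findings"].extend(result)
    return merged
```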
Choices I made
- Orchestrator‑first so intent is evaluated up front and the right agents are called. Results are merged with a consistent structure.
- Streaming by default so you can monitor progress and stay engaged while it works.
- Structured outputs (exec summary, findings, evidence, appendix) with citations to build trust.
- Operational resilience with retries, timeouts, and idempotent task keys to reduce flakiness (sketched after this list).
- Observability so long‑running tasks can be debugged and tuned.
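The resilience choices above boil down to a pattern like this sketch: bounded retries with exponential backoff, per-attempt timeouts, and a deterministic task key so a retried task can be deduplicated. All names are illustrative:

```python
import asyncio
import hashlib

def task_key(session_id: str, query: str) -> str:
    # Same session + same query -> same key, so a retried task can be deduplicated.
    return hashlib.sha256(f"{session_id}:{query}".encode()).hexdigest()

async def with_retries(make_coro, attempts: int = 3, timeout_s: float = 30.0):
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(make_coro(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise                          # out of attempts: surface the error
            await asyncio.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s, ...
```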
About MCP + Gemini
When I started this, Gemini 1.5 Flash didn’t support MCP publicly. I bridged the gap with a small compatibility layer that:
- Translated function‑calling into MCP‑shaped requests/responses.
- Normalized partial streaming chunks into coherent SSE updates.
- Centralized error/retry policies with backoff and basic circuit‑breaking.
- Preserved tool/agent context so downstream MCP servers could interoperate.
That kept Gemini’s large‑context benefits while letting MCP handle standardized orchestration. Adding Claude later was a configuration change, not a rewrite.
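In spirit, the bridge's core move looks like this sketch: take the function calls Gemini emits and replay them as MCP `call_tool` requests. The helper name and result shape here are assumptions; `ClientSession.call_tool` is the standard MCP client entry point:

```python
async def bridge_function_calls(session, function_calls):
    """Replay Gemini function calls as MCP tool calls and collect the results."""
    tool_results = []
    for call in function_calls:
        # Translate one Gemini function call into an MCP tool invocation.
        result = await session.call_tool(call.name, arguments=dict(call.args))
        tool_results.append({"name": call.name, "content": result.content})
    return tool_results
```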
Tech stack
- Frontend: Next.js 15, React 19, Tailwind 4, NextUI, Framer Motion (SSE for streaming)
- Backend: FastAPI (async), session management, structured logging
- Orchestration: MCP protocol with Gemini/Claude integrations
- Agents & Data: SQLite (aiosqlite), Google Drive API, external APIs; PDF/DOCX/XLSX/PPTX processing
Architecture overview
- Backend: FastAPI gateway (`backend/api/gateway/app.py`)
  - CORS enabled for all origins.
  - Session lifecycle: `POST /api/init` creates or replaces a session, instantiates the Gemini MCP client, connects to the selected MCP server via stdio, caches `session_id` → client in `active_sessions`, and returns available tool names. `DELETE /api/session/{session_id}` cleans up and removes a session. `GET /api/health` returns health and the `active_sessions` count.
  - Query processing: `POST /api/query` runs a single-turn request via the MCP client and returns the final response JSON. `GET /api/query-stream?session_id=…&query=…` opens an SSE stream: it sets a progress callback into the MCP client, spawns the analysis task, streams progress events and a final event, emits keepalive messages on idle timeouts, and sends a final error message if timeouts accumulate post-completion (condensed sketch below).
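A condensed sketch of that streaming pattern; the real `app.py` adds session validation, keepalives, and richer error handling, and the `active_sessions` registry plus client methods mirror the write-up but their details are assumed:

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
active_sessions: dict = {}  # session_id -> MCP client (hypothetical registry)

@app.get("/api/query-stream")
async def query_stream(session_id: str, query: str):
    client = active_sessions[session_id]
    queue: asyncio.Queue = asyncio.Queue()

    # The MCP client pushes progress messages into the queue as it works.
    client.set_progress_callback(
        lambda message: queue.put_nowait({"type": "progress", "message": message})
    )

    async def run_analysis():
        try:
            result = await client.process_query(query)
            queue.put_nowait({"type": "final", "response": result})
        except Exception as exc:
            queue.put_nowait({"type": "error", "message": str(exc)})

    task = asyncio.create_task(run_analysis())  # analysis runs in the background

    async def event_source():
        # Keepalive/timeout handling from the real gateway is omitted here.
        while True:
            event = await queue.get()
            yield f"data: {json.dumps(event)}\n\n"
            if event["type"] in ("final", "error"):
                break

    return StreamingResponse(event_source(), media_type="text/event-stream")
```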
- Orchestrator: Gemini MCP Client (`backend/api/gateway/client_gemini.py`)
  - LLM: Google GenAI `genai.Client()`; models: `"gemini-2.5-flash"` primary and final, with `"gemini-1.5-flash"` fallback.
  - MCP connection: `mcp.client.stdio` to spawn and connect to an MCP server script path; `ClientSession` handles `initialize`, `list_tools`, and `call_tool`.
  - Flow per query:
    - Conversation history management and context compaction.
    - Classification: LLM returns `CASUAL_CONVERSATION | HELP_REQUEST | ANALYSIS_NEEDED`.
    - If casual/help → respond directly; else run strategic analysis:
      - Phases 1–2: analyze the user query and plan steps using available MCP tools (from `list_tools`).
      - Phase 3: execute steps (see the queue-worker sketch below):
        - Prompt the LLM with tools enabled to produce function calls.
        - Parse `function_calls`; enqueue each into `request_queue`; the queue worker calls `session.call_tool`.
        - Collect results; update metrics; stream progress via `_report_progress`.
      - Phase 4: critical evaluation with the LLM over accumulated context.
      - Phase 5: synthesis with the LLM for the final answer; return text.
  - Tool calling: cleans schemas for Gemini; extracts function calls; manages the async queue; aggregates results; formats results for follow-up prompts.
  - Progress streaming: `set_progress_callback` is injected by the gateway; the client calls it with messages, step markers, and metrics.
  - Token tracking: rough usage/cost estimates recorded for each call.
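The Phase 3 queue worker reduces to something like this sketch, using the `request_queue` and `session.call_tool` names from the flow above; everything else is assumed:

```python
import asyncio

async def tool_worker(session, request_queue: asyncio.Queue, results: list) -> None:
    while True:
        call = await request_queue.get()
        if call is None:                  # sentinel: the plan has no more tool calls
            request_queue.task_done()
            break
        try:
            # Execute the parsed function call against the connected MCP server.
            result = await session.call_tool(call["name"], arguments=call["args"])
            results.append((call["name"], result.content))
        finally:
            request_queue.task_done()     # keep queue.join() accurate even on errors
```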
- Alternative orchestrator (deprecated): Claude MCP Client (`backend/api/gateway/client_claude.py`)
  - Similar orchestration structure using the Anthropic Messages API.
  - Maintained, but the gateway defaults to Gemini.
- Agents (MCP servers)
  - Google Drive document processing server (`backend/agents/document_processor/generic_google_drive_mcp_server/server.py`):
    - Exposes MCP tools:
      - `list_drive_folders(parent_folder_id?)`
      - `list_drive_files(folder_id?, file_types="all")`
      - `get_file_content(file_id, max_chars=100000)` with content extraction for PDF, Excel, Word, PowerPoint, and TXT.
      - `get_file_metadata(file_id)`
      - `search_drive_files(query, folder_id?)`
    - Uses Service Account credentials for the Drive API; caches file contents; robust per-type extraction and chunking helpers.
  - Other MCP servers referenced via `SERVER_PATHS` in the gateway:
    - `rfx`: `./backend/agents/rfx_analyzer/mcp-server_rfx/server_test.py`
    - `samsara`: `./backend/agents/samsara_integration/mcp_server_samsara/server.py`
    - `raw_rfx`: `./backend/agents/raw_data_processor/mcp_server_rfx_raw_data/server.py`
  - These scripts are intended to be spawned via stdio and expose tools discoverable by `list_tools`; a minimal server sketch follows this list.
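For flavor, a stdio MCP server of this shape can be sketched with the Python SDK's `FastMCP` helper. The stubbed tool below is purely illustrative; the real Drive server wires in the Drive API and extraction logic:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("gdrive-documents")

@mcp.tool()
def search_drive_files(query: str, folder_id: str | None = None) -> str:
    """Search files by name/content (stubbed here for illustration)."""
    return f"results for {query!r} in folder {folder_id or 'root'}"

if __name__ == "__main__":
    # The gateway spawns servers like this over stdio and discovers tools via list_tools.
    mcp.run(transport="stdio")
```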
- Frontend: Next.js chat UI (`frontend/src/pages/page.js`)
  - On mount or server type change: `GET /api/server-types` to populate selection; `POST /api/init` with `{ server_type }` stores `session_id`, shows a welcome message, and enables suggestions.
  - Sending messages:
    - Preferred: opens SSE `GET /api/query-stream?session_id&query=…`; receives progress events (used by `ProgressTracker`) and a final event which appends the assistant message.
    - Fallback: `POST /api/query` with `{ session_id, query }` on SSE error.
  - Cleanup: closes SSE and `DELETE /api/session/{session_id}` on reset or unmount.
  - Rendering: streams progress, renders assistant text with markdown and table extraction (`ChatUtils.js`).
Request flow
- Session: `POST /api/init`
  - Validates server type against `SERVER_PATHS`.
  - Creates the Gemini MCP client → `connect_to_server(server_path)` → `initialize` → `list_tools`.
  - Stores `active_sessions[session_id] = client`, returns `{status: "connected", response: {"session_id", "server_type", "available_tools": [...]}}`.
- Query (SSE): `GET /api/query-stream?session_id&query`
  - Validates the session; yields an initial progress event.
  - Injects a `progress_callback` into the MCP client to enqueue progress messages.
  - Runs `client.process_query(query)` in a background task.
  - For each queued item: writes an SSE event.
  - On completion: emits a final event with `response` or `error`.
  - Keepalive every 20s if idle, with cut-off after 3 consecutive timeouts.
- Query (non-streaming fallback): `POST /api/query`
  - Calls `client.process_query(query)` and returns `{status: "success", response}` or `{status: "error", message}`.
- Frontend consumption
  - Establishes SSE, updates `progressDetails` and `progressIndicator` with each event, and appends the final assistant message when `type === 'final'` (an end-to-end Python sketch follows this list).
Notable implementation details
- The gateway currently initializes the Gemini-based MCP client by default (`load_mcp_client` uses `GeminiClient`).
- SSE formatting includes enriched step-completion detection by parsing messages for "Successfully … step X".
- The MCP client tracks operation metrics (queries executed, documents processed, folders scanned) and includes these in progress details.
- Tool schemas returned by MCP are normalized for Gemini compatibility (`_clean_schema_for_gemini`, sketched below).
- The Google Drive MCP server robustly handles different Google MIME types by exporting to Office formats and then parsing text.
- Cleanup is implemented on FastAPI lifespan shutdown and via explicit `DELETE /api/session/{id}`.
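The schema normalization is the kind of recursive strip shown below; the exact key list `_clean_schema_for_gemini` removes is an assumption on my part:

```python
# Assumed set of JSON Schema keys that Gemini's function calling rejects.
UNSUPPORTED_KEYS = {"$schema", "additionalProperties", "default", "examples"}

def clean_schema_for_gemini(schema: dict) -> dict:
    """Recursively drop schema keys Gemini's function calling can't accept."""
    cleaned = {}
    for key, value in schema.items():
        if key in UNSUPPORTED_KEYS:
            continue
        if isinstance(value, dict):
            cleaned[key] = clean_schema_for_gemini(value)
        elif isinstance(value, list):
            cleaned[key] = [
                clean_schema_for_gemini(item) if isinstance(item, dict) else item
                for item in value
            ]
        else:
            cleaned[key] = value
    return cleaned
```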
Diagrams
Component/flow overview
```mermaid
flowchart LR
    UI["Next.js Chat UI (frontend/src/pages/page.js)"]
    API["FastAPI Gateway (backend/api/gateway/app.py)"]
    ORCH["Orchestrator: Gemini MCP Client (backend/api/gateway/client_gemini.py)"]
    QUEUE(("Async request_queue worker"))
    GENAI["Google GenAI models.generate_content"]
    MCP1[["MCP Server: gdrive-competitor-analysis (backend/agents/.../generic_google_drive_mcp_server/server.py)"]]
    MCPx[["Other MCP servers (paths from SERVER_PATHS)"]]
    SSE(("SSE stream to browser"))
    UI -->|"POST /api/init {server_type}"| API
    API -->|"spawn + connect_to_server(server_path)"| ORCH
    ORCH -->|"stdio_client"| MCPx
    ORCH -->|"list_tools"| MCPx
    API -->|"{status: connected, session_id}"| UI
    UI -->|"GET /api/query-stream?session_id&query"| API
    API -->|"set_progress_callback"| ORCH
    ORCH -->|"classify / plan / execute"| GENAI
    GENAI --> ORCH
    ORCH -->|"tool function_calls"| QUEUE
    QUEUE -->|"call_tool"| MCP1
    MCP1 -->|"tool_result"| QUEUE
    QUEUE --> ORCH
    ORCH -->|"_report_progress(...)"| API
    API -->|"SSE {type: progress, details, metrics}"| SSE
    SSE --> UI
    ORCH -->|"final synthesis text"| API
    API -->|"SSE {type: final, response}"| UI
    API -->|"keepalive on timeout"| UI
```
Sequence
```mermaid
sequenceDiagram
    autonumber
    participant FE as "Next.js UI"
    participant GW as "FastAPI Gateway"
    participant MCP as "Gemini MCP Client"
    participant SRV as "MCP Server (Google Drive)"
    participant LLM as "Google GenAI"
    FE->>GW: "POST /api/init {server_type}"
    GW->>MCP: "connect_to_server(server_path)"
    MCP->>SRV: "stdio initialize"
    MCP->>SRV: "list_tools()"
    SRV-->>MCP: "tools[]"
    MCP-->>GW: "ready"
    GW-->>FE: "{status: connected, session_id}"
    FE->>GW: "GET /api/query-stream?session_id&query"
    GW->>MCP: "set_progress_callback(cb)"
    par "classify and plan"
        MCP->>LLM: "classify (CASUAL|HELP|ANALYSIS)"
        LLM-->>MCP: "label"
        MCP->>LLM: "planning prompt with tool list"
        LLM-->>MCP: "step-by-step plan"
    end
    loop "for each planned step"
        MCP->>LLM: "execution prompt (tools enabled)"
        LLM-->>MCP: "function_calls"
        MCP->>SRV: "call_tool(name,args) via queue"
        SRV-->>MCP: "tool_result"
        MCP-->>GW: "_report_progress('Successfully completed step i', details)"
        GW-->>FE: "SSE progress event"
    end
    MCP->>LLM: "synthesis prompt (full context)"
    LLM-->>MCP: "final analysis text"
    MCP-->>GW: "final response"
    GW-->>FE: "SSE final event"
```
Key data contracts
- Init session
  - Request: `POST /api/init` with JSON `{"server_type": "rfx"|"samsara"|"raw_rfx"}`
  - Response (success): `{"status":"connected","response":"{\"session_id\":\"…\",\"server_type\":\"…\",\"available_tools\":[…]}"}`
- Streaming query
  - Request: `GET /api/query-stream?session_id=...&query=...` (SSE)
  - Events (mirrored in the typed sketch after this list):
    - Progress: `{"type":"progress","message":"…","details":{...},"metrics":{...},"timestamp":...}`
    - Keepalive: `{"type":"keepalive","consecutive_timeouts":n,"processing_complete":bool,"queue_empty":bool,"final_sent":bool,"timestamp":...}`
    - Final: `{"type":"final","response":"<final_text>","timestamp":...}`
    - Error: `{"type":"error","message":"…","timestamp":...}`
- Non-streaming query
  - Request: `POST /api/query` with `{"session_id":"…","query":"…"}`
  - Response: `{"status":"success","response":"<final_text>"}` or `{"status":"error","message":"…"}`
- Frontend message model
  - `{ role: 'assistant'|'user', content: string, hasCompleted?: boolean }`
  - SSE consumes progress events to populate `progressDetails` and `progressIndicator`.
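For a typed non-browser consumer, the event contracts above translate to something like these `TypedDict`s; this is a simplified sketch, with optional fields flattened:

```python
from typing import Literal, TypedDict

class ProgressEvent(TypedDict):
    type: Literal["progress"]
    message: str
    details: dict
    metrics: dict
    timestamp: float

class FinalEvent(TypedDict):
    type: Literal["final"]
    response: str
    timestamp: float

class ErrorEvent(TypedDict):
    type: Literal["error"]
    message: str
    timestamp: float
```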
What you’ll find in the repo
- Complete frontend (chat UI, streaming, mobile‑ready)
- FastAPI gateway with sessions and SSE
- MCP client and multiple specialized MCP servers
- Error handling, monitoring, and progress tracking
What’s not included
- Proprietary data, credentials, keys, and domain‑specific logic (kept out intentionally)