Ian Bigford

Agentic AI Deep Research Platform for Interal Documents

2/23/2025

Overview

I built this to turn scattered docs and systems into clear answers you can act on. It coordinates a few focused agents, looks where your information actually lives, and assembles a sourced, executive‑ready brief—while you watch it work in real time.

Full source code: github.com/ibiggy9/agentic-knowledge-base

Why I built this:

I started to notice that the way that cloud first companies were building their data footprints was not compatible with all employees equally. To get the data you need, it requires SQL skills, knowledge of your particular cloud's products. Beyond that you have to know where the data is stored and have teams keeping it organized. All of this is fine for software engineers but for business users that aren't expected to have these skills, they chronically had much slower speed-to-insight. They'd have to either hire a technical person, or put together a large formal request for data which came with slow turnarounds.

Beyond that, even if you get the information you need, you likely still need support summarizing it, visualizing and otherwise getting insights out of it. This is the role of junior or even mid level employees. In fact, there are swaths of roles that are entirely about writing up documents on large amounts of source data for executives to save them time.

In the age of LLMs and especially agentic AI, I decided to see what the state of the tech could do to help us with all the above. MCP was all the rage and I decided to build a custom MCP client and a series of servers that could help access huge amounts of information in a few minutes and produce a high quality consulting-style report.

The demo video only shows it accessing one server which accesses a Google Drive folder that has a ton of open source insights about a company. I was also able to build one that provides access to a SQL database and an API.

Problem & Goal

Most teams already know the questions they need to answer. The slowdown happens because the facts are spread across wikis, databases, shared drives, PDFs, spreadsheets, and third‑party tools. That leads to manual hunting, partial answers, and lots of follow‑ups.

The goal here is simple: compress days of research into minutes by orchestrating deep research across many sources and returning a brief that leaders can trust—complete with citations and an appendix you can audit.

What it does well

  • Coordinates multiple agents so the right tool looks in the right place at the right time, instead of one model trying to do everything.
  • Streams progress and output so you can see what it’s doing, where it’s looking, and how the conclusions are forming.
  • Pulls from the places you actually work (SQLite, Google Drive, external APIs, raw docs) and stitches the results together.
  • Handles the rough edges of production with sane session management and error handling so it behaves predictably.

Deep research output

When you ask a question, the system fan‑outs across your sources and returns a structured deliverable:

  • Executive Summary: the direct answer and confidence, written for decision‑makers.
  • Key Findings: the strongest insights with links back to the evidence.
  • Supporting Evidence: quotes, data snippets, tables, and references.
  • Gaps & Next Steps: what’s missing and what to do next.
  • Appendix: raw extracts for traceability and review.

You see results arrive in real time, including progress updates, so there’s no “black box” step.

How it works

  • Interface: a Next.js app with a chat‑like UI that shows streaming output and progress.
  • Gateway: a FastAPI service that manages sessions and streams results back.
  • Orchestrator: an MCP client that decides which agents/tools to use and merges their work.
  • Agents: focused workers for SQLite analysis, Google Drive documents, external data, and raw file processing.

Simple flow: you ask → the gateway routes → the orchestrator assigns agents → they work in parallel → results are merged and streamed back.

Choices I made

  • Orchestrator‑first so intent is evaluated up front and the right agents are called. Results are merged with a consistent structure.
  • Streaming by default so you can monitor progress and stay engaged while it works.
  • Structured outputs (exec summary, findings, evidence, appendix) with citations to build trust.
  • Operational resilience with retries, timeouts, and idempotent task keys to reduce flakiness.
  • Observability so long‑running tasks can be debugged and tuned.

About MCP + Gemini

When I started this, Gemini 1.5 Flash didn’t support MCP publicly. I bridged the gap with a small compatibility layer that:

  • Translated function‑calling into MCP‑shaped requests/responses.
  • Normalized partial streaming chunks into coherent SSE updates.
  • Centralized error/retry policies with backoff and basic circuit‑breaking.
  • Preserved tool/agent context so downstream MCP servers could interoperate.

That kept Gemini’s large‑context benefits while letting MCP handle standardized orchestration. Adding Claude later was a configuration change, not a rewrite.

Tech stack

  • Frontend: Next.js 15, React 19, Tailwind 4, NextUI, Framer Motion (SSE for streaming)
  • Backend: FastAPI (async), session management, structured logging
  • Orchestration: MCP protocol with Gemini/Claude integrations
  • Agents & Data: SQLite (aiosqlite), Google Drive API, external APIs; PDF/DOCX/XLSX/PPTX processing

Architecture overview

  • Backend: FastAPI gateway (backend/api/gateway/app.py)

    • CORS enabled for all origins.
    • Session lifecycle:
      • POST /api/init creates or replaces a session, instantiates Gemini MCP client, connects to a selected MCP server via stdio, caches session_id → client in active_sessions, and returns available tool names.
      • DELETE /api/session/{session_id} cleans up and removes a session.
      • GET /api/health returns health and active_sessions count.
    • Query processing:
      • POST /api/query runs a single-turn request via the MCP client and returns the final response JSON.
      • GET /api/query-stream?session_id=…&query=… opens an SSE stream:
        • Sets a progress callback into the MCP client.
        • Spawns the analysis task; streams progress events and a final event.
        • Emits keepalive messages on idle timeouts, and a final error message if timeouts accumulate post-completion.
  • Orchestrator: Gemini MCP Client (backend/api/gateway/client_gemini.py)

    • LLM: Google GenAI genai.Client(); models: "gemini-2.5-flash" primary and final, with "gemini-1.5-flash" fallback.
    • MCP connection: mcp.client.stdio to spawn and connect to an MCP server script path; ClientSession handles initialize, list_tools, and call_tool.
    • Flow per query:
      • Conversation history management and context compaction.
      • Classification: LLM returns CASUAL_CONVERSATION | HELP_REQUEST | ANALYSIS_NEEDED.
      • If casual/help → respond directly; else run strategic analysis:
        • Phase 1–2: Analyze the user query and plan steps using available MCP tools (from list_tools).
        • Phase 3: Execute steps:
          • Prompt LLM with tools enabled to produce function calls.
          • Parse function_calls; enqueue each into request_queue; queue worker calls session.call_tool.
          • Collect results; update metrics; stream progress via _report_progress.
        • Phase 4: Critical evaluation with LLM over accumulated context.
        • Phase 5: Synthesis with LLM for final answer; return text.
    • Tool calling: Cleans schemas for Gemini; extracts function calls; manages async queue; aggregates results; formats results for follow-up prompts.
    • Progress streaming: set_progress_callback is injected by gateway; client calls it with messages, step markers, and metrics.
    • Token tracking: rough usage/cost estimates recorded for each call.
  • Alternative orchestrator (deprecated): Claude MCP Client (backend/api/gateway/client_claude.py)

    • Similar orchestration structure using Anthropic Messages API.
    • Maintained but the gateway defaults to Gemini.
  • Agents (MCP servers)

    • Google Drive document processing server (backend/agents/document_processor/generic_google_drive_mcp_server/server.py):
      • Exposes MCP tools:
        • list_drive_folders(parent_folder_id?)
        • list_drive_files(folder_id?, file_types="all")
        • get_file_content(file_id, max_chars=100000) with content extraction for PDF, Excel, Word, PowerPoint, TXT.
        • get_file_metadata(file_id)
        • search_drive_files(query, folder_id?)
      • Uses Service Account credentials for Drive API; caches file contents; robust per-type extraction and chunking helpers.
    • Other MCP servers referenced via SERVER_PATHS in the gateway:
      • rfx: ./backend/agents/rfx_analyzer/mcp-server_rfx/server_test.py
      • samsara: ./backend/agents/samsara_integration/mcp_server_samsara/server.py
      • raw_rfx: ./backend/agents/raw_data_processor/mcp_server_rfx_raw_data/server.py
      • These scripts are intended to be spawned via stdio and expose tools discoverable by list_tools.
  • Frontend: Next.js chat UI (frontend/src/pages/page.js)

    • On mount or server type change:
      • GET /api/server-types to populate selection.
      • POST /api/init with { server_type }, stores session_id, shows welcome message, and enables suggestions.
    • Sending messages:
      • Preferred: opens SSE GET /api/query-stream?session_id&query=….
        • Receives progress events (used by ProgressTracker) and a final event which appends the assistant message.
      • Fallback: POST /api/query with { session_id, query } on SSE error.
    • Cleanup: closes SSE and DELETE /api/session/{session_id} on reset or unmount.
    • Rendering: streams progress, renders assistant text with markdown and table extraction (ChatUtils.js).

Request flow

  • Session

    • POST /api/init:
      • Validates server type against SERVER_PATHS.
      • Creates Gemini MCP client → connect_to_server(server_path)initializelist_tools.
      • Stores active_sessions[session_id] = client, returns {status: "connected", response: {"session_id", "server_type", "available_tools": [...]}}.
  • Query (SSE)

    • GET /api/query-stream?session_id&query:
      • Validates session; yields an initial progress event.
      • Injects a progress_callback into the MCP client to enqueue progress messages.
      • Runs client.process_query(query) in background task.
      • For each queued item: writes an SSE event.
      • On completion: emits a final event with response or error.
      • Keepalive every 20s if idle, with cut-off after 3 consecutive timeouts.
  • Query (non-streaming fallback)

    • POST /api/query:
      • Calls client.process_query(query) and returns {status: "success", response} or {status: "error", message}.
  • Frontend consumption

    • Establishes SSE, updates progressDetails and progressIndicator with each event, appends final assistant message when type === 'final'.

Notable implementation details

  • The gateway currently initializes Gemini-based MCP client by default (load_mcp_client uses GeminiClient).
  • SSE formatting includes enriched step completion detection by parsing messages for “Successfully … step X”.
  • The MCP client tracks operation metrics (queries executed, documents processed, folders scanned) and includes these in progress details.
  • Tool schemas returned by MCP are normalized for Gemini compatibility (_clean_schema_for_gemini).
  • The Google Drive MCP server robustly handles different Google MIME types by exporting to Office formats then parsing text.
  • Cleanup is implemented on FastAPI lifespan shutdown and explicit DELETE /api/session/{id}.

Diagrams

Component/flow overview

flowchart LR
  UI["Next.js Chat UI (frontend/src/pages/page.js)"]
  API["FastAPI Gateway (backend/api/gateway/app.py)"]
  ORCH["Orchestrator: Gemini MCP Client (backend/api/gateway/client_gemini.py)"]
  QUEUE(("Async request_queue worker"))
  GENAI["Google GenAI models.generate_content"]
  MCP1[["MCP Server: gdrive-competitor-analysis (backend/agents/.../generic_google_drive_mcp_server/server.py)"]]
  MCPx[["Other MCP servers (paths from SERVER_PATHS)"]]
  SSE(("SSE stream to browser"))

  UI -->|"POST /api/init {server_type}"| API
  API -->|"spawn + connect_to_server(server_path)"| ORCH
  ORCH -->|"stdio_client"| MCPx
  ORCH -->|"list_tools"| MCPx
  API -->|"{status: connected, session_id}"| UI

  UI -->|"GET /api/query-stream?session_id&query"| API
  API -->|"set_progress_callback"| ORCH
  ORCH -->|"classify / plan / execute"| GENAI
  GENAI --> ORCH

  ORCH -->|"tool function_calls"| QUEUE
  QUEUE -->|"call_tool"| MCP1
  MCP1 -->|"tool_result"| QUEUE
  QUEUE --> ORCH
  ORCH -->|"_report_progress(...)"| API
  API -->|"SSE {type: progress, details, metrics}"| SSE
  SSE --> UI

  ORCH -->|"final synthesis text"| API
  API -->|"SSE {type: final, response}"| UI

  API -->|"keepalive on timeout"| UI

Sequence

sequenceDiagram
  autonumber
  participant FE as "Next.js UI"
  participant GW as "FastAPI Gateway"
  participant MCP as "Gemini MCP Client"
  participant SRV as "MCP Server (Google Drive)"
  participant LLM as "Google GenAI"

  FE->>GW: "POST /api/init {server_type}"
  GW->>MCP: "connect_to_server(server_path)"
  MCP->>SRV: "stdio initialize"
  MCP->>SRV: "list_tools()"
  SRV-->>MCP: "tools[]"
  MCP-->>GW: "ready"
  GW-->>FE: "{status: connected, session_id}"

  FE->>GW: "GET /api/query-stream?session_id&query"
  GW->>MCP: "set_progress_callback(cb)"
  par "classify and plan"
    MCP->>LLM: "classify (CASUAL|HELP|ANALYSIS)"
    LLM-->>MCP: "label"
    MCP->>LLM: "planning prompt with tool list"
    LLM-->>MCP: "step-by-step plan"
  end
  loop "for each planned step"
    MCP->>LLM: "execution prompt (tools enabled)"
    LLM-->>MCP: "function_calls"
    MCP->>SRV: "call_tool(name,args) via queue"
    SRV-->>MCP: "tool_result"
    MCP-->>GW: "_report_progress('Successfully completed step i', details)"
    GW-->>FE: "SSE progress event"
  end
  MCP->>LLM: "synthesis prompt (full context)"
  LLM-->>MCP: "final analysis text"
  MCP-->>GW: "final response"
  GW-->>FE: "SSE final event"

Key data contracts

  • Init session

    • Request: POST /api/init with JSON {"server_type": "rfx"|"samsara"|"raw_rfx"}
    • Response (success):
      • {"status":"connected","response":"{\"session_id\":\"…\",\"server_type\":\"…\",\"available_tools\":[…]}"}
  • Streaming query

    • Request: GET /api/query-stream?session_id=...&query=... (SSE)
    • Events:
      • Progress: {"type":"progress","message":"…","details":{...},"metrics":{...},"timestamp":...}
      • Keepalive: {"type":"keepalive","consecutive_timeouts":n,"processing_complete":bool,"queue_empty":bool,"final_sent":bool,"timestamp":...}
      • Final: {"type":"final","response":"<final_text>","timestamp":...}
      • Error: {"type":"error","message":"…","timestamp":...}
  • Non-streaming query

    • Request: POST /api/query with {"session_id":"…","query":"…"}.
    • Response: {"status":"success","response":"<final_text>"} or {"status":"error","message":"…"}.
  • Frontend message model

    • { role: 'assistant'|'user', content: string, hasCompleted?: boolean }
    • SSE consumes progress events to populate progressDetails and progressIndicator.

What you’ll find in the repo

  • Complete frontend (chat UI, streaming, mobile‑ready)
  • FastAPI gateway with sessions and SSE
  • MCP client and multiple specialized MCP servers
  • Error handling, monitoring, and progress tracking

What’s not included

  • Proprietary data, credentials, keys, and domain‑specific logic (kept out intentionally)

Reference