mcp-market-data-server

rtavarezz/mcp-market-data-server


MCP Server + Stock Price Ingestion Pipeline is a system designed to fetch, store, and expose stock prices using Airflow, Postgres, and FastAPI with MCP tools and REST APIs.


MCP Server + Stock Price Ingestion Pipeline

What it does: Fetches prices for 5 stock tickers every 5 minutes via Airflow, stores them in Postgres, and exposes real-time MCP tools plus historical REST APIs.

My learning process: MCP was new to me, so I started by building a simpler version following this tutorial and reading the official MCP docs to understand tool registration, validation, and error handling. Once I understood the pattern, I applied it here with production considerations.

How I built it:

  • Started with Postgres (not a specialized time-series DB) because it's simpler to operate at this scale
  • Built observability into every layer: structured logs, metrics counters, and clear error messages
  • Made Airflow retries safe with ON CONFLICT DO NOTHING so duplicate runs don't break anything
  • Tested everything with mocks so tests run fast without hitting external APIs

What makes this production-ready: Every design choice has a "why" documented below, the code is typed and tested, and if something breaks at 3am, the logs tell you exactly what failed and why.

┌────────────┐  yfinance / CoinGecko
│ MCP tools  │──────────────┐
└────────────┘              │
           ┌────────────────┴───────┐
           │  FastAPI (/mcp,/prices)│
           └──────┬─────────────────┘
                  │
                  ▼
      ┌──────────────────────┐
      │ Postgres / Timescale │  ◄── Airflow DAG (every 5 min)
      └──────────────────────┘

Architecture Overview

Data flow:

  1. Airflow scheduler triggers every 5 minutes
  2. Ingestion CLI fetches prices for 5 tickers (AAPL, MSFT, GOOGL, AMZN, TSLA) via yfinance
  3. Postgres stores prices with idempotent writes (ON CONFLICT DO NOTHING)
  4. FastAPI exposes two surfaces:
    • MCP tools (/mcp/tools/*) - real-time stock/crypto prices, bypassing the DB
    • REST API (/prices/*) - historical queries against the DB (latest, range, 24h return)

Key components:

  • app/services/market_data.py - Fetches from yfinance/CoinGecko with 3-tier fallback, 5s timeout, 60s cache
  • app/services/ingestion.py - Per-ticker retry with exponential backoff (0s, 1s, 2s)
  • dags/stock_ingestion_dag.py - Airflow DAG running docker exec price-api-1 python3 -m app.cli.ingest_once

Design Justification

Storage: Postgres with Timescale option

Chose Postgres over specialized time-series databases (InfluxDB, QuestDB) because:

  • Simpler operations: everyone knows Postgres, standard tooling works
  • Good enough performance: 5 tickers at 5-minute intervals doesn't need specialized DB
  • Easy upgrade path: CREATE EXTENSION timescaledb if we outgrow vanilla Postgres
  • ACID guarantees: important for financial data integrity

Schema design:

CREATE TABLE prices (
    ticker TEXT NOT NULL,
    price NUMERIC(18, 8) NOT NULL,
    bucket_start TIMESTAMPTZ NOT NULL,  -- 5-minute aligned UTC timestamp
    source_ts TIMESTAMPTZ NULL,         -- when upstream reported the price
    PRIMARY KEY (ticker, bucket_start)
);
CREATE INDEX idx_prices_source_ts ON prices (source_ts);

  • Composite PK (ticker, bucket_start) enforces uniqueness and enables idempotent writes
  • Index on source_ts allows tracing when Yahoo Finance actually sent data
  • NUMERIC(18,8) for prices avoids floating-point errors
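
For illustration, a bucket_start can be derived by flooring the fetch time to its 5-minute boundary in UTC (a hypothetical helper; the repo's ingestion code may compute it differently):

```python
from datetime import datetime, timezone

def to_bucket_start(ts: datetime, minutes: int = 5) -> datetime:
    """Floor a timestamp to the start of its N-minute bucket, in UTC."""
    ts = ts.astimezone(timezone.utc)
    floored_minute = ts.minute - (ts.minute % minutes)
    return ts.replace(minute=floored_minute, second=0, microsecond=0)

# Example: 02:17:43 UTC falls in the 02:15 bucket.
ts = datetime(2024, 1, 2, 2, 17, 43, tzinfo=timezone.utc)
print(to_bucket_start(ts))  # 2024-01-02 02:15:00+00:00
```

Aligning every write to the bucket boundary is what lets the composite PK deduplicate retries: two fetches inside the same 5-minute window map to the same (ticker, bucket_start) key.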

Idempotent writes: Using ON CONFLICT DO NOTHING makes Airflow retries safe. If a run executes twice (network blip), the second attempt silently skips existing rows. No constraint violations, no duplicate data.
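
The idempotency guarantee is easy to demonstrate; this sketch uses SQLite (which supports the same ON CONFLICT DO NOTHING syntax) instead of Postgres so it runs standalone:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE prices (
        ticker TEXT NOT NULL,
        price NUMERIC NOT NULL,
        bucket_start TEXT NOT NULL,
        PRIMARY KEY (ticker, bucket_start)
    )
""")

row = ("AAPL", 189.95, "2024-01-02T02:15:00Z")
insert = "INSERT INTO prices VALUES (?, ?, ?) ON CONFLICT (ticker, bucket_start) DO NOTHING"

# First write inserts the row; the retried write is silently skipped.
conn.execute(insert, row)
conn.execute(insert, row)  # simulated Airflow retry: no constraint violation

count = conn.execute("SELECT COUNT(*) FROM prices").fetchone()[0]
print(count)  # 1
```

The price 189.95 and timestamp here are made up for the demo; the point is that re-running the same insert leaves exactly one row.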

Error handling:

  • Per-ticker try/except: if TSLA times out, the other 4 still get written
  • Exponential backoff: 0s, 1s, 2s between retries
  • 5-second timeout on all yfinance calls (API can hang 20+ seconds during market open)
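
The per-ticker retry with 0s/1s/2s backoff can be sketched as follows (fetch is a hypothetical callable; the real service wraps yfinance):

```python
import time

def fetch_with_retry(ticker, fetch, delays=(0, 1, 2)):
    """Try up to len(delays) times, sleeping 0s, 1s, 2s before successive attempts."""
    last_exc = None
    for delay in delays:
        time.sleep(delay)  # 0s before the first try, then 1s, 2s
        try:
            return fetch(ticker)
        except Exception as exc:
            last_exc = exc  # remember the failure, try again
    raise last_exc  # out of retries: let the caller record the failure

# Demo: a fetcher that fails twice before succeeding (delays shortened for speed).
calls = {"n": 0}
def flaky_fetch(ticker):
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("upstream hung")
    return 189.95

print(fetch_with_retry("AAPL", flaky_fetch, delays=(0, 0, 0)))  # 189.95
```

Because the retry wraps a single ticker, a persistent TSLA timeout exhausts its own retries without touching the other four tickers.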

Observability:

  • Structured logs: every run logs inserted/skipped/failed counts
  • Metrics registry: tracks cache hits, ingestion runs for future Prometheus integration
  • Clear error messages: 404 for unknown symbols, 429 for rate limits, 503 for upstream failures
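
A minimal counter registry plus structured log line in the spirit described (a sketch; the project's app/core module may be organized differently):

```python
import json
import logging
from collections import Counter

metrics = Counter()  # process-wide counters, exportable to Prometheus later

def record_ingestion(inserted: int, skipped: int, failed: int) -> None:
    """Bump the run counters and emit one structured log line per run."""
    metrics["ingest_runs"] += 1
    metrics["rows_inserted"] += inserted
    metrics["rows_skipped"] += skipped
    metrics["rows_failed"] += failed
    logging.getLogger("ingestion").info(
        json.dumps({"event": "ingest_run", "inserted": inserted,
                    "skipped": skipped, "failed": failed})
    )

record_ingestion(inserted=4, skipped=1, failed=0)
print(metrics["ingest_runs"], metrics["rows_inserted"])  # 1 4
```

Logging JSON rather than free text keeps the 3 a.m. debugging story honest: the counts are machine-parseable and greppable per run.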

Evaluation

What I measured:

Generated ~100k rows (1 year of 5-minute data for 5 tickers) and tested all endpoints:

Endpoint                 Payload    Avg Latency  Notes
/prices/latest           1 row      3.2 ms       Single PK lookup
/prices/range (24h)      288 rows   6.8 ms       Query + JSON serialization
/prices/range (1 year)   105k rows  41.5 ms      Full scan, still acceptable
/prices/24h-return       2 rows     4.1 ms       Derived metric calculation

Measured with FastAPI's TestClient against SQLite; Postgres should match or beat these numbers.

Query optimizations:

  • /latest resolves to a single index lookup on the composite PK
  • /range scans the (ticker, bucket_start) PK index in order, which suits time-series range queries
  • The source_ts index supports tracing queries without a full table scan

Scaling notes:

  • Current bottleneck: yfinance API calls (sequential, ~500ms each)
  • At 100 tickers: switch to async batch fetching or parallel task sharding
  • At 1-minute intervals: consider streaming feeds (websocket/Kafka) instead of polling
  • Read-heavy: add Postgres read replicas or Redis cache for /latest
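
At higher ticker counts, the sequential-fetch bottleneck could be addressed by fanning the blocking calls out to threads. A sketch with a stubbed fetcher (real code would wrap yfinance, which is blocking):

```python
import asyncio
import time

def fetch_price_blocking(ticker: str) -> float:
    # Stand-in for a blocking yfinance call (~0.1s each here).
    time.sleep(0.1)
    return 100.0

async def fetch_batch(tickers):
    # Run the blocking fetches concurrently on the default thread pool.
    prices = await asyncio.gather(
        *(asyncio.to_thread(fetch_price_blocking, t) for t in tickers)
    )
    return dict(zip(tickers, prices))

tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
start = time.perf_counter()
prices = asyncio.run(fetch_batch(tickers))
# Five 0.1s fetches complete in roughly 0.1s wall-clock instead of 0.5s.
print(len(prices), time.perf_counter() - start < 0.5)
```

asyncio.to_thread (Python 3.9+) keeps the fetcher unchanged while restoring concurrency; a websocket/Kafka feed remains the better answer once polling itself is the problem.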

Next Steps (with more time)

Performance:

  • Enable Timescale hypertables with compression for older data (chunk_time_interval => '1 day')
  • Partition by month once ingesting millions of rows per day
  • Archive old data to Parquet in S3/GCS after 90 days

Reliability:

  • Add SLA monitoring: alert if ingest_last_success_epoch doesn't advance for >10 minutes
  • Implement backfill logic for missed intervals during outages
  • Add circuit breaker for yfinance (auto-disable ticker after N consecutive failures)
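
The circuit-breaker idea can be sketched as a small per-ticker failure counter (hypothetical; the threshold and any timed reset policy would need tuning):

```python
class TickerCircuitBreaker:
    """Disable a ticker after N consecutive failures; re-enable on success."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures: dict[str, int] = {}

    def allow(self, ticker: str) -> bool:
        # Skip the fetch once the failure streak reaches the threshold.
        return self.failures.get(ticker, 0) < self.threshold

    def record_success(self, ticker: str) -> None:
        self.failures[ticker] = 0  # a good fetch resets the streak

    def record_failure(self, ticker: str) -> None:
        self.failures[ticker] = self.failures.get(ticker, 0) + 1

breaker = TickerCircuitBreaker(threshold=3)
for _ in range(3):
    breaker.record_failure("TSLA")
print(breaker.allow("TSLA"), breaker.allow("AAPL"))  # False True
```

Tracking failures per ticker means one consistently broken symbol stops wasting retry budget without affecting the healthy ones.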

Operations:

  • Switch to LocalExecutor + Postgres metadata for Airflow (production-ready)
  • Add Prometheus metrics export for dashboards
  • Set up automated schema migration testing

Features:

  • Support custom ticker lists via API (POST /config/tickers)
  • Add more MCP tools: historical stats, volatility calculations, earnings calendar
  • WebSocket endpoint for real-time price streaming to frontends

Setup Instructions

Prerequisites:

  • Docker & Docker Compose
  • Python 3.11+ (for local testing)

Quick start:

# 1. Start all services
docker compose up -d

# 2. Wait ~60s for Airflow to initialize, then check http://localhost:8080
#    (username: admin, password: auto-generated in logs)

# 3. Verify ingestion is running
docker exec price-airflow-1 airflow dags list

# 4. Test MCP tools
curl http://localhost:8000/mcp/tools | jq
curl -X POST http://localhost:8000/mcp/tools/get_stock_price \
  -H "Content-Type: application/json" \
  -d '{"ticker":"AAPL"}' | jq

# 5. Test REST API (after first ingestion completes)
curl "http://localhost:8000/prices/latest?ticker=AAPL" | jq

Running tests:

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements/base.txt -r requirements/dev.txt
pytest -v

Project structure:

app/
 ├── api/         # FastAPI routers for MCP + /prices
 ├── cli/         # CLI script for ingestion (called by Airflow)
 ├── core/        # Settings, logging, metrics
 ├── db/          # SQLAlchemy models + queries
 ├── services/    # Market data client + ingestion logic
 └── main.py      # FastAPI app factory
dags/             # Airflow DAG (5-minute schedule)
migrations/       # SQL DDL (idempotent)
tests/            # Pytest suite (mocked externals)
docs/evidence/    # Proof of working system (logs, API responses)

Configuration: All settings via environment variables (see .env.example):

  • DB_URL: Postgres connection string
  • TICKERS: Comma-separated ticker list (default: AAPL,MSFT,GOOGL,AMZN,TSLA)
  • CACHE_TTL_SECONDS: MCP tool cache duration (default: 60)
  • YFINANCE_TIMEOUT_SECONDS: API call timeout (default: 5)
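
Settings of this shape can be loaded with a small stdlib helper (a sketch; the project may well use pydantic-settings instead, and the DB_URL fallback below is a made-up placeholder):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    db_url: str
    tickers: tuple[str, ...]
    cache_ttl_seconds: int
    yfinance_timeout_seconds: int

def load_settings(env=os.environ) -> Settings:
    """Read configuration from environment variables, with the documented defaults."""
    return Settings(
        db_url=env.get("DB_URL", "postgresql://localhost/prices"),  # placeholder default
        tickers=tuple(env.get("TICKERS", "AAPL,MSFT,GOOGL,AMZN,TSLA").split(",")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "60")),
        yfinance_timeout_seconds=int(env.get("YFINANCE_TIMEOUT_SECONDS", "5")),
    )

s = load_settings(env={})
print(s.tickers)  # ('AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA')
```

Passing env explicitly keeps the loader trivially testable: feed it a dict instead of monkeypatching os.environ.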

Evidence

Airflow 5-minute schedule proof: docs/evidence/airflow-5min-schedule-proof.txt shows 6 consecutive successful runs at 02:15, 02:20, 02:25, 02:30, 02:35, 02:40 UTC, all with state=success.

API responses:

  • docs/evidence/mcp-tools.json - List of 3 MCP tools
  • docs/evidence/mcp-stock-price.json - Stock price via MCP
  • docs/evidence/mcp-crypto-price.json - Crypto price via MCP
  • docs/evidence/prices-latest.json - Latest price from DB
  • docs/evidence/prices-range.json - Historical price range
  • docs/evidence/pytest-output.txt - All 10 tests passing