rtavarezz/mcp-market-data-server
MCP Server + Stock Price Ingestion Pipeline
What it does: Fetches 5 stock prices every 5 minutes via Airflow, stores them in Postgres, and exposes real-time MCP tools + historical REST APIs.
My learning process: MCP was new to me, so I started by building a simpler version from a tutorial and by reading the official MCP docs to understand tool registration, validation, and error handling. Once I understood the pattern, I applied it here with production considerations.
How I built it:
- Started with Postgres (not a specialized time-series DB) because it's simpler to operate at this scale
- Built observability into every layer: structured logs, metrics counters, and clear error messages
- Made Airflow retries safe with ON CONFLICT DO NOTHING so duplicate runs don't break anything
- Tested everything with mocks so tests run fast without hitting external APIs
What makes this production-ready: Every design choice has a "why" documented below, the code is typed and tested, and if something breaks at 3am, the logs tell you exactly what failed and why.
Architecture Overview

┌────────────┐   yfinance / CoinGecko
│ MCP tools  │──────────┐
└────────────┘          │
      ┌─────────────────┴──────┐
      │ FastAPI (/mcp, /prices)│
      └──────────┬─────────────┘
                 │
                 ▼
      ┌──────────────────────┐
      │ Postgres / Timescale │ ◄── Airflow DAG (every 5 min)
      └──────────────────────┘

Data flow:
- Airflow scheduler triggers every 5 minutes
- Ingestion CLI fetches prices for 5 tickers (AAPL, MSFT, GOOGL, AMZN, TSLA) via yfinance
- Postgres stores prices with idempotent writes (ON CONFLICT DO NOTHING)
- FastAPI exposes two surfaces:
  - MCP tools (/mcp/tools/*) - Real-time stock/crypto prices, bypasses DB
  - REST API (/prices/*) - Historical queries from DB (latest, range, 24h return)
Key components:
- app/services/market_data.py - Fetches from yfinance/CoinGecko with 3-tier fallback, 5s timeout, 60s cache
- app/services/ingestion.py - Per-ticker retry with exponential backoff (0s, 1s, 2s)
- dags/stock_ingestion_dag.py - Airflow DAG running docker exec price-api-1 python3 -m app.cli.ingest_once (see the sketch below)
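The DAG itself is small. A minimal sketch of what it could look like, assuming Airflow 2.x with the BashOperator (the real dags/stock_ingestion_dag.py may differ in naming and retry settings):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="stock_ingestion",
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(minutes=5),  # trigger every 5 minutes
    catchup=False,                           # don't backfill missed intervals
) as dag:
    ingest = BashOperator(
        task_id="ingest_once",
        bash_command="docker exec price-api-1 python3 -m app.cli.ingest_once",
        retries=2,                           # safe because writes are idempotent
        retry_delay=timedelta(seconds=30),
    )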
Design Justification
Storage: Postgres with Timescale option
Chose Postgres over specialized time-series databases (InfluxDB, QuestDB) because:
- Simpler operations: everyone knows Postgres, standard tooling works
- Good enough performance: 5 tickers at 5-minute intervals doesn't need specialized DB
- Easy upgrade path: CREATE EXTENSION timescaledb if we outgrow vanilla Postgres
- ACID guarantees: important for financial data integrity
Schema design:
CREATE TABLE prices (
ticker TEXT NOT NULL,
price NUMERIC(18, 8) NOT NULL,
bucket_start TIMESTAMPTZ NOT NULL, -- 5-minute aligned UTC timestamp
source_ts TIMESTAMPTZ NULL, -- when upstream reported the price
PRIMARY KEY (ticker, bucket_start)
);
CREATE INDEX idx_prices_source_ts ON prices (source_ts);
- Composite PK (ticker, bucket_start) enforces uniqueness and enables idempotent writes
- Index on source_ts allows tracing when Yahoo Finance actually sent data
- NUMERIC(18,8) for prices avoids floating-point errors
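A quick Python illustration of the floating-point drift that NUMERIC avoids:

from decimal import Decimal

print(0.1 + 0.2)                        # 0.30000000000000004 (binary float drift)
print(Decimal("0.1") + Decimal("0.2"))  # 0.3 (exact decimal arithmetic)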
Idempotent writes:
Using ON CONFLICT DO NOTHING makes Airflow retries safe. If a run executes twice (network blip), the second attempt silently skips existing rows. No constraint violations, no duplicate data.
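As a sketch of that write path, assuming SQLAlchemy Core with the Postgres dialect (the repo's actual data layer may be organized differently):

from sqlalchemy import Column, MetaData, Numeric, Table, Text
from sqlalchemy.dialects.postgresql import TIMESTAMP, insert

metadata = MetaData()
prices = Table(
    "prices", metadata,
    Column("ticker", Text, primary_key=True),
    Column("price", Numeric(18, 8), nullable=False),
    Column("bucket_start", TIMESTAMP(timezone=True), primary_key=True),
    Column("source_ts", TIMESTAMP(timezone=True), nullable=True),
)

def insert_price(conn, row: dict) -> None:
    # a retried run hits the (ticker, bucket_start) PK and is silently skipped
    stmt = insert(prices).values(**row).on_conflict_do_nothing(
        index_elements=["ticker", "bucket_start"]
    )
    conn.execute(stmt)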
Error handling:
- Per-ticker try/except: if TSLA times out, the other 4 still get written
- Exponential backoff: 0s, 1s, 2s between retries
- 5-second timeout on all yfinance calls (API can hang 20+ seconds during market open)
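A hedged sketch of that per-ticker retry loop; the real app/services/ingestion.py may structure it differently:

import logging
import time

logger = logging.getLogger(__name__)
BACKOFFS = [0, 1, 2]  # seconds to wait before each attempt

def fetch_with_retry(fetch_fn, ticker: str):
    for attempt, delay in enumerate(BACKOFFS):
        time.sleep(delay)
        try:
            return fetch_fn(ticker)
        except Exception as exc:  # one ticker failing never aborts the run
            logger.warning("fetch failed ticker=%s attempt=%d err=%s",
                           ticker, attempt + 1, exc)
    return None  # caller counts this ticker as failed; the others still write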
Observability:
- Structured logs: every run logs inserted/skipped/failed counts
- Metrics registry: tracks cache hits, ingestion runs for future Prometheus integration
- Clear error messages: 404 for unknown symbols, 429 for rate limits, 503 for upstream failures
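For illustration, a run summary logged as grep-friendly key=value pairs might look like this (field names are hypothetical):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingestion")

def log_run_summary(inserted: int, skipped: int, failed: int) -> None:
    # key=value pairs are easy to grep at 3am and trivial to parse later
    logger.info("ingest_run inserted=%d skipped=%d failed=%d",
                inserted, skipped, failed)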
Evaluation
What I measured:
Generated ~100k rows (1 year of 5-minute data for 5 tickers) and tested all endpoints:
| Endpoint | Payload | Avg Latency | Notes |
|---|---|---|---|
| /prices/latest | 1 row | 3.2 ms | Single PK lookup |
| /prices/range (24h) | 288 rows | 6.8 ms | Query + JSON serialization |
| /prices/range (1 year) | 105k rows | 41.5 ms | Full scan, still acceptable |
| /prices/24h-return | 2 rows | 4.1 ms | Derived metric calculation |
Tested with FastAPI TestClient on SQLite. Postgres should match or beat these numbers.
Query optimizations:
- /latest uses the composite PK for an index point lookup
- /range benefits from sequential bucket_start values (good for time-series scans)
- The source_ts index supports tracing queries without a full table scan
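A sketch of the /prices/latest lookup, assuming raw SQL issued through SQLAlchemy (the actual query code lives in app/db/ and may differ):

from sqlalchemy import text

LATEST_SQL = text("""
    SELECT ticker, price, bucket_start
    FROM prices
    WHERE ticker = :ticker
    ORDER BY bucket_start DESC
    LIMIT 1
""")

def latest_price(conn, ticker: str):
    # descending walk of the (ticker, bucket_start) PK index: one row touched
    return conn.execute(LATEST_SQL, {"ticker": ticker}).one_or_none()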
Scaling notes:
- Current bottleneck: yfinance API calls (sequential, ~500 ms each)
- At 100 tickers: switch to async batch fetching (see the sketch below) or parallel task sharding
- At 1-minute intervals: consider streaming feeds (WebSocket/Kafka) instead of polling
- Read-heavy: add Postgres read replicas or a Redis cache for /latest
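A hypothetical shape for that async batch fetch: the sync yfinance call is pushed to worker threads so the ~500 ms requests overlap instead of running back to back.

import asyncio
from typing import Callable

async def fetch_all(fetch_fn: Callable[[str], float],
                    tickers: list[str]) -> dict[str, float]:
    # run the sync fetcher in threads so the per-ticker calls overlap
    results = await asyncio.gather(
        *(asyncio.to_thread(fetch_fn, t) for t in tickers),
        return_exceptions=True,
    )
    # drop tickers whose fetch raised; the rest still get written
    return {t: p for t, p in zip(tickers, results)
            if not isinstance(p, BaseException)}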
Next Steps (with more time)
Performance:
- Enable Timescale hypertables with compression for older data (chunk_time_interval => '1 day'); see the sketch below
- Partition by month once ingesting millions of rows per day
- Archive old data to Parquet in S3/GCS after 90 days
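The Timescale upgrade would be a one-time migration; a hypothetical sketch, assuming the timescaledb extension is available on the Postgres image (the connection URL is a placeholder):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/prices")  # placeholder DB_URL
with engine.begin() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
    # convert the existing table into a hypertable with daily chunks
    conn.execute(text(
        "SELECT create_hypertable('prices', 'bucket_start', "
        "chunk_time_interval => INTERVAL '1 day', migrate_data => TRUE)"
    ))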
Reliability:
- Add SLA monitoring: alert if ingest_last_success_epoch doesn't advance for >10 minutes
- Implement backfill logic for missed intervals during outages
- Add a circuit breaker for yfinance (auto-disable a ticker after N consecutive failures; see the sketch below)
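A minimal sketch of the circuit-breaker idea; the threshold and names here are hypothetical, not from the repo:

from collections import defaultdict

FAILURE_LIMIT = 5  # hypothetical threshold for N consecutive failures
_consecutive_failures: defaultdict[str, int] = defaultdict(int)

def record_result(ticker: str, ok: bool) -> None:
    # any success resets the streak; failures accumulate
    _consecutive_failures[ticker] = 0 if ok else _consecutive_failures[ticker] + 1

def is_open(ticker: str) -> bool:
    # ingestion skips tickers whose breaker is open
    return _consecutive_failures[ticker] >= FAILURE_LIMIT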
Operations:
- Switch to LocalExecutor + Postgres metadata for Airflow (production-ready)
- Add Prometheus metrics export for dashboards (see the sketch below)
- Set up automated schema migration testing
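A hypothetical export using the official prometheus_client package; the metric names are illustrative, not the repo's existing registry:

from prometheus_client import Counter, start_http_server

INGEST_RUNS = Counter("ingest_runs_total", "Completed ingestion runs", ["status"])
CACHE_HITS = Counter("mcp_cache_hits_total", "MCP tool cache hits")

start_http_server(9100)  # expose /metrics on :9100 for scraping
INGEST_RUNS.labels(status="success").inc()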
Features:
- Support custom ticker lists via API (POST /config/tickers)
- Add more MCP tools: historical stats, volatility calculations, earnings calendar
- WebSocket endpoint for real-time price streaming to frontends (see the sketch below)
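A hypothetical shape for that streaming endpoint; get_latest_price is a stand-in for whatever DB or cache lookup the app would actually use:

import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

def get_latest_price(ticker: str) -> float:  # hypothetical helper
    return 0.0

@app.websocket("/ws/prices/{ticker}")
async def stream_prices(ws: WebSocket, ticker: str) -> None:
    await ws.accept()
    try:
        while True:
            price = await asyncio.to_thread(get_latest_price, ticker)
            await ws.send_json({"ticker": ticker, "price": price})
            await asyncio.sleep(5)  # push an update every 5 seconds
    except WebSocketDisconnect:
        pass  # client went away; nothing to clean up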
Setup Instructions
Prerequisites:
- Docker & Docker Compose
- Python 3.11+ (for local testing)
Quick start:
# 1. Start all services
docker compose up -d
# 2. Wait ~60s for Airflow to initialize, then check http://localhost:8080
# (username: admin, password: auto-generated in logs)
# 3. Verify ingestion is running
docker exec price-airflow-1 airflow dags list
# 4. Test MCP tools
curl http://localhost:8000/mcp/tools | jq
curl -X POST http://localhost:8000/mcp/tools/get_stock_price \
-H "Content-Type: application/json" \
-d '{"ticker":"AAPL"}' | jq
# 5. Test REST API (after first ingestion completes)
curl "http://localhost:8000/prices/latest?ticker=AAPL" | jq
Running tests:
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements/base.txt -r requirements/dev.txt
pytest -v
Project structure:
app/
├── api/ # FastAPI routers for MCP + /prices
├── cli/ # CLI script for ingestion (called by Airflow)
├── core/ # Settings, logging, metrics
├── db/ # SQLAlchemy models + queries
├── services/ # Market data client + ingestion logic
└── main.py # FastAPI app factory
dags/ # Airflow DAG (5-minute schedule)
migrations/ # SQL DDL (idempotent)
tests/ # Pytest suite (mocked externals)
docs/evidence/ # Proof of working system (logs, API responses)
Configuration:
All settings via environment variables (see .env.example):
- DB_URL: Postgres connection string
- TICKERS: Comma-separated ticker list (default: AAPL,MSFT,GOOGL,AMZN,TSLA)
- CACHE_TTL_SECONDS: MCP tool cache duration (default: 60)
- YFINANCE_TIMEOUT_SECONDS: API call timeout (default: 5)
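A plausible shape for the settings object behind these variables, assuming pydantic-settings (app/core/ may implement configuration differently):

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    DB_URL: str = "postgresql://localhost/prices"
    TICKERS: str = "AAPL,MSFT,GOOGL,AMZN,TSLA"
    CACHE_TTL_SECONDS: int = 60
    YFINANCE_TIMEOUT_SECONDS: int = 5

settings = Settings()  # values from the environment override the defaults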
Evidence
Airflow 5-minute schedule proof: docs/evidence/airflow-5min-schedule-proof.txt shows 6 consecutive successful runs at 02:15, 02:20, 02:25, 02:30, 02:35, 02:40 UTC, all with state=success.
API responses:
- docs/evidence/mcp-tools.json - List of 3 MCP tools
- docs/evidence/mcp-stock-price.json - Stock price via MCP
- docs/evidence/mcp-crypto-price.json - Crypto price via MCP
- docs/evidence/prices-latest.json - Latest price from DB
- docs/evidence/prices-range.json - Historical price range
- docs/evidence/pytest-output.txt - All 10 tests passing