local_mcp_server

suqingdong/local_mcp_server

local_mcp_server is a tool for serving local tools and functions over the Model Context Protocol (MCP).

Start an MCP server with local tools/functions.

Installation

pip install local-mcp-server

Start Server

local_mcp_server -h

local_mcp_server -H 0.0.0.0 -P 8000 -T streamable-http
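Once the server is started, it can be useful to confirm that something is listening before pointing a client at it. The following is a minimal sketch using only the standard library. It assumes that -H and -P set the bind host and port (as the values 0.0.0.0 and 8000 suggest); the helper name is_port_open is hypothetical and not part of local_mcp_server.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Port 8000 is the -P value from the command above (assumed to be the listen port).
    print("server reachable:", is_port_open("127.0.0.1", 8000))
```

This only checks that the TCP port accepts connections; it does not verify that the process behind it speaks MCP.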

Add Custom Tools

Add a custom script to the tools directory, for example: tools/test_func.py
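The contents of such a script are not shown here. As a rough sketch only, assuming the server picks up plain, type-annotated Python functions with docstrings from the tools directory (an assumption; check the project repository for the actual registration convention), a tool file might look like this. The function name add is hypothetical.

```python
# tools/test_func.py
# Hypothetical contents: how local_mcp_server discovers and registers
# functions in this file is an assumption, not confirmed by the README.


def add(a: int, b: int) -> int:
    """Add two integers and return the sum."""
    return a + b
```

Type hints and a docstring are worth including either way, since MCP tool descriptions and input schemas are commonly derived from them.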

Client Usage Example

Only asynchronous usage is supported.

1. streamable_http transport
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import AzureChatOpenAI
import dotenv

dotenv.load_dotenv()


client = MultiServerMCPClient({
    "math": {
        "url": "http://localhost:8000/mcp",
        "transport": "streamable_http",
    },
})

llm = AzureChatOpenAI(model="gpt-4o")

async def main():
    tools = await client.get_tools()
    print('>>> found tools:', tools)

    agent_executor = create_react_agent(llm, tools)

    query = "What will the weather be like in Wuqing tomorrow?"
    response = await agent_executor.ainvoke({"messages": [{"role": "user", "content": query}]})
    
    print(response["messages"][-1].content)


if __name__ == "__main__":
    asyncio.run(main())

2. stdio transport
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import AzureChatOpenAI
import dotenv

dotenv.load_dotenv()


client = MultiServerMCPClient({
    "math": {
        "command": "python",
        "args": ["-m", "local_mcp_server.mcp"],
        "transport": "stdio",
    },
})

llm = AzureChatOpenAI(model="gpt-4o")

async def main():
    tools = await client.get_tools()

    print('>>> found tools:', tools)

    agent_executor = create_react_agent(llm, tools)

    query = "What will the weather be like in Wuqing tomorrow?"
    response = await agent_executor.ainvoke({"messages": [{"role": "user", "content": query}]})
    
    print(response["messages"][-1].content)

if __name__ == "__main__":
    asyncio.run(main())
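The two examples above differ only in the connection entry passed to MultiServerMCPClient. A small helper can make that switch explicit. This is a sketch, not part of local_mcp_server: the name build_connection is hypothetical, and the default URL and module path are simply the values used in the examples above.

```python
from typing import Any, Dict


def build_connection(
    transport: str, url: str = "http://localhost:8000/mcp"
) -> Dict[str, Any]:
    """Return a connection entry for the given transport.

    The values mirror the two client examples above; adjust them to your setup.
    """
    if transport == "streamable_http":
        # Connect to an already running server over HTTP.
        return {"url": url, "transport": "streamable_http"}
    if transport == "stdio":
        # Let the client spawn the server as a subprocess and talk over stdio.
        return {
            "command": "python",
            "args": ["-m", "local_mcp_server.mcp"],
            "transport": "stdio",
        }
    raise ValueError(f"unsupported transport: {transport!r}")
```

With this helper, the client would be created as MultiServerMCPClient({"math": build_connection("stdio")}), and the rest of main() stays the same for either transport.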