MCP Client Tutorial: Connect to Any MCP Server in 5 Minutes (mcp_client Part2)

The Problem Nobody Talks About
You’ve read about MCP. You understand it’s “Function Calling + Standardization.” You’ve seen dozens of MCP servers on GitHub: Gmail, Slack, databases, weather APIs. Claude Desktop and Cursor have MCP clients built in.
But here’s what nobody tells you: if you want to use MCP in YOUR product, you can’t just use Claude Desktop.
You need your own MCP client.
The MCP ecosystem has focused entirely on servers. Everyone builds servers, shares servers, talks about servers. But clients? That’s the missing piece.
This article shows you how to build a production-ready MCP client that actually works.
Understanding the Foundation
Before we dive into the code, you need to understand how MCP actually works. I wrote a detailed article explaining the MCP protocol from the ground up — no magic, just raw JSON messages and real server interactions.
Read this first: MCP = Function Calling + Standardization (Just for Tools)
That article shows you:
How clients and servers communicate (the actual JSON messages)
The three transport types: stdio, SSE, and Streamable HTTP
Real examples with curl commands showing the raw protocol
Why MCP is both simple and powerful
Once you understand the protocol, building a client becomes straightforward. Let’s build one.
MCP Python SDK: The Building Blocks
Before we build our client, let’s understand the key functions from the MCP Python SDK that we’ll use. Remember those curl commands from the previous article? The SDK wraps all that complexity into simple Python functions.
Transport Clients: Establishing Connections
stdio_client(server_params) - Connects to local servers via standard input/output
This is what happens when you run a local Python or Node.js script. The SDK launches the process and creates a communication channel.
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client

# stdio_client is an async context manager: entering it launches the
# subprocess and yields its (read, write) streams. Run this inside an
# async function.
server_params = StdioServerParameters(
    command="python",
    args=["weather.py"]
)
async with stdio_client(server_params) as (read, write):
    ...  # use the streams while the subprocess is alive

Remember from the curl article: with stdio, you type JSON directly into the terminal. The SDK does this programmatically: it launches the subprocess and connects to its stdin/stdout.
sse_client(url, headers, timeout, sse_read_timeout) - Connects to remote servers via Server-Sent Events
This opens a persistent HTTP connection for receiving events from the server.
from mcp.client.sse import sse_client

# Opens an SSE connection to a remote server (async context manager)
async with sse_client(
    "https://mcp.deepwiki.com/sse",
    headers=None,
    timeout=5.0,            # Connection timeout, in seconds
    sse_read_timeout=300.0  # How long to wait for events, in seconds
) as (read, write):
    ...  # use the streams while the connection is open

In the curl article, we had two terminals: one listening with curl GET (receiving events), another sending with curl POST (sending commands). The SDK manages both channels for you in one function call.
streamablehttp_client(url, headers, timeout, sse_read_timeout, terminate_on_close) - Connects via modern Streamable HTTP
The newest transport that combines request-response and streaming in one clean protocol.
from datetime import timedelta
from mcp.client.streamable_http import streamablehttp_client

# Opens a Streamable HTTP connection (async context manager)
async with streamablehttp_client(
    "http://localhost:8080/mcp",
    headers={"Authorization": "Bearer token"},
    timeout=timedelta(seconds=30),
    sse_read_timeout=timedelta(seconds=300),
    terminate_on_close=True  # Clean shutdown
) as (read, write, get_session_id):
    ...  # get_session_id() returns the ID once the server has assigned one

Notice the third value. In current SDK versions it is a callback rather than the ID itself: calling it returns the session ID, the same value as the Mcp-Session-Id header we used in the curl commands. The ID only becomes available after the handshake, because the server assigns it in its response to initialize.

NB: The Streamable HTTP transport needs a session ID because HTTP is stateless: each POST request is independent. The server generates this ID to identify your session across requests (like the Mcp-Session-Id header in our curl examples). The SDK automatically includes it in every request, so you never have to send it manually. stdio and SSE don't need this because they maintain persistent connections.
ClientSession: The MCP Protocol Layer
ClientSession(read, write) - Creates an MCP session from a transport
This handles the MCP protocol itself — the JSON-RPC messages we saw in the curl examples.
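from mcp import ClientSession

# Wraps the transport with MCP protocol handling. Entering the
# context starts the session's background message loop.
async with ClientSession(read, write) as session:
    ...  # initialize, list tools, call tools

The session takes care of formatting JSON-RPC messages, managing request IDs, and parsing responses.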
session.initialize() - Performs the complete MCP handshake
Remember the two-step handshake from the curl article? First we sent initialize, then notifications/initialized. This method does both automatically!
# This does TWO things:
# 1. Sends: {"jsonrpc":"2.0","id":1,"method":"initialize",...}
# Receives: {"jsonrpc":"2.0","id":1,"result":{...}}
#
# 2. Sends: {"jsonrpc":"2.0","method":"notifications/initialized","params":{}}
# (No response expected - it's a notification)
await session.initialize()

Why two steps? The first exchange negotiates protocol version and capabilities. The second confirms “I got it, I’m ready to work.” In our curl examples, we sent both messages manually. The SDK combines them into one convenient call.
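To make the two-step handshake concrete, here is a minimal standard-library sketch that builds the two messages by hand, without the SDK. The helper names, the protocol version string, and the client name are assumptions for illustration only; the SDK fills in the real values for you.

```python
import json
from itertools import count
from typing import Any, Dict, Optional

_ids = count(1)  # JSON-RPC request IDs; notifications never consume one


def make_request(method: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC request: it carries an id, so a response is expected."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params}


def make_notification(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC notification: no id, so no response is expected."""
    return {"jsonrpc": "2.0", "method": method, "params": params or {}}


# Step 1: negotiate protocol version and capabilities.
initialize = make_request("initialize", {
    "protocolVersion": "2025-03-26",  # assumed version string, for illustration
    "capabilities": {},
    "clientInfo": {"name": "demo-client", "version": "0.1.0"},  # hypothetical client
})

# Step 2: confirm readiness. Sent only after the initialize response arrives.
initialized = make_notification("notifications/initialized")

print(json.dumps(initialize))
print(json.dumps(initialized))
```

The only structural difference between the two messages is the id field, which is exactly what tells the server whether to reply.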
session.list_tools() - Discovers available tools
This sends the tools/list request we used in curl.
# Sends: {"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}
# Receives: {"jsonrpc":"2.0","id":2,"result":{"tools":[...]}}
tools_response = await session.list_tools()
tools = tools_response.tools  # List of Tool objects

Each tool has a name, a description, and an inputSchema (just like in the JSON responses we saw). The SDK parses the JSON and gives you nice Python objects.
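Because inputSchema is ordinary JSON Schema, a client can do a cheap sanity check before sending a call. The sketch below only looks at the schema's required list and works on a plain dict; the helper name and the sample schema are hypothetical, and a full validator would check types as well.

```python
from typing import Any, Dict, List


def missing_required(input_schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return the required property names that are absent from arguments."""
    required = input_schema.get("required", [])
    return [name for name in required if name not in arguments]


# Hypothetical schema, shaped like the inputSchema of a tools/list entry
alerts_schema = {
    "type": "object",
    "properties": {"state": {"type": "string"}},
    "required": ["state"],
}

print(missing_required(alerts_schema, {}))               # ['state']
print(missing_required(alerts_schema, {"state": "CA"}))  # []
```

With the SDK's Tool objects, the same dict should be reachable as tool.inputSchema.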
session.call_tool(tool_name, arguments) - Executes a tool
This sends the tools/call request with your arguments.
# Sends: {"jsonrpc":"2.0","id":3,"method":"tools/call",
#         "params":{"name":"get_alerts","arguments":{"state":"CA"}}}
# Receives: {"jsonrpc":"2.0","id":3,"result":{"content":[...]}}
result = await session.call_tool("get_alerts", {"state": "CA"})

The result contains the tool’s output, just like in our curl examples. The SDK handles the JSON-RPC wrapping and unwrapping.
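To show what "unwrapping" means, here is a small sketch that pulls the text out of a raw tools/call response. It operates on a plain dict shaped like the JSON above; the helper name and the sample alert text are made up for illustration.

```python
from typing import Any, Dict


def text_from_result(response: Dict[str, Any]) -> str:
    """Join the text items of a tools/call response; raise if the tool reported an error."""
    result = response["result"]
    text = "\n".join(
        item["text"]
        for item in result.get("content", [])
        if item.get("type") == "text"
    )
    if result.get("isError"):
        raise RuntimeError(f"Tool reported an error: {text}")
    return text


# Made-up response in the shape of a tools/call result
sample = {
    "jsonrpc": "2.0",
    "id": 3,
    "result": {
        "content": [{"type": "text", "text": "No active alerts for CA."}],
        "isError": False,
    },
}

print(text_from_result(sample))  # No active alerts for CA.
```

Checking isError matters because a tool can fail while the JSON-RPC call itself succeeds.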
AsyncExitStack: Resource Management
AsyncExitStack() - Manages cleanup of async resources
This ensures all connections close properly in the correct order.
from contextlib import AsyncExitStack

exit_stack = AsyncExitStack()

# Add resources to the stack
transport = await exit_stack.enter_async_context(stdio_client(...))
session = await exit_stack.enter_async_context(ClientSession(...))

# Later: close everything in reverse order
await exit_stack.aclose()

Why reverse order? The session must close before the transport (so it can finish any in-flight messages while the transport is still open), and the transport before the subprocess (so the process can exit gracefully). The exit stack handles this automatically.
Think of it like a stack of plates — you take them off in reverse order of how you put them on.
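The stack-of-plates behavior is easy to verify with the standard library alone. This sketch uses two dummy resources named after the transport and the session (no real connections are made) and records the order in which they open and close.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records open/close order


@asynccontextmanager
async def resource(name: str):
    """Dummy async resource that logs when it is opened and closed."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


async def demo() -> None:
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource("transport"))
        await stack.enter_async_context(resource("session"))
    # Leaving the block closes in reverse: session first, then transport


asyncio.run(demo())
print(events)
# ['open transport', 'open session', 'close session', 'close transport']
```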
Now that we understand these building blocks, let’s build the client!
The Complete MCP Client: Code Walkthrough
Part 1: Configuration Types
First, we need clean configuration. No messy dictionaries — proper types that your IDE understands:
from dataclasses import dataclass
from typing import Literal, Optional, List, Dict, Union
from pathlib import Path

TransportType = Literal["stdio", "sse", "streamable_http"]

@dataclass
class ServerConfig:
    """MCP server configuration

    For stdio (local servers):
        ServerConfig(
            name="weather",
            transport="stdio",
            command="python",
            args=["weather.py"]
        )

    For HTTP (remote servers):
        ServerConfig(
            name="github",
            transport="streamable_http",
            url="http://localhost:8080/mcp"
        )
    """
    name: str
    transport: TransportType

    # stdio parameters
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    cwd: Optional[Union[str, Path]] = None

    # HTTP parameters (SSE & Streamable HTTP)
    url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    timeout: Optional[float] = None
    sse_read_timeout: Optional[float] = None

Why this matters: Type safety catches errors before runtime. Your IDE autocompletes. Python’s type checker validates. The Literal type ensures you can only use valid transports: try to pass "http" and your type checker will complain immediately.
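One caveat: Literal is only enforced by static type checkers, not at runtime, which is why add_server validates the transport again. A minimal sketch of deriving that runtime check from the Literal itself, so the list of valid values lives in one place (the helper name check_transport is hypothetical):

```python
from typing import Literal, get_args

TransportType = Literal["stdio", "sse", "streamable_http"]


def check_transport(value: str) -> str:
    """Raise ValueError unless value is one of the Literal's allowed strings."""
    valid = get_args(TransportType)  # ('stdio', 'sse', 'streamable_http')
    if value not in valid:
        raise ValueError(f"Invalid transport {value!r}. Must be one of: {list(valid)}")
    return value


print(check_transport("sse"))  # sse

try:
    check_transport("http")
except ValueError as exc:
    print(exc)
```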
We also track connected servers:
from datetime import datetime
from mcp.types import Tool
from mcp import ClientSession

@dataclass
class ServerInfo:
    """Information about a connected server"""
    name: str
    config: ServerConfig
    session: ClientSession
    tools: List[Tool]
    connected_at: datetime

This stores everything you need to know about a connected server: its configuration, active session, available tools, and when it connected.
Part 2: The Universal MCP Client
Now the main client class. This handles all three transports and manages persistent connections:
from contextlib import AsyncExitStack

class UniversalMCPClient:
    """Universal MCP client with persistent sessions

    Basic usage:
        async with UniversalMCPClient() as client:
            await client.add_server(ServerConfig(...))
            result = await client.call_tool("server", "tool", {"arg": "value"})
    """

    def __init__(self):
        self._exit_stack = AsyncExitStack()
        self._servers: Dict[str, ServerInfo] = {}

The AsyncExitStack is crucial. It automatically manages cleanup of all connections in the correct order. When the client closes, everything gets cleaned up properly: no leaked connections, no zombie processes, no hanging HTTP sessions.
Part 3: Adding Servers
The add_server method connects to an MCP server and stores the session:
import logging

logger = logging.getLogger(__name__)

    async def add_server(self, config: ServerConfig) -> ServerInfo:
        """Add and connect to an MCP server"""
        # Reject duplicate names
        if config.name in self._servers:
            raise ValueError(f"Server '{config.name}' already exists")

        # Validate transport (Literal is not enforced at runtime)
        valid_transports = ["stdio", "sse", "streamable_http"]
        if config.transport not in valid_transports:
            raise ValueError(
                f"Invalid transport '{config.transport}'. "
                f"Must be one of: {valid_transports}"
            )

        logger.info(f"🔌 Connecting to {config.name} ({config.transport})...")
        try:
            # Create session based on transport
            if config.transport == "stdio":
                session = await self._connect_stdio(config)
            elif config.transport == "sse":
                session = await self._connect_sse(config)
            elif config.transport == "streamable_http":
                session = await self._connect_streamable_http(config)
            else:
                raise ValueError(f"Unknown transport: {config.transport}")

            # Get available tools (calls session.list_tools())
            tools_response = await session.list_tools()
            tools = tools_response.tools

            # Store server info
            server_info = ServerInfo(
                name=config.name,
                config=config,
                session=session,
                tools=tools,
                connected_at=datetime.utcnow()
            )
            self._servers[config.name] = server_info

            logger.info(f"✅ Connected to {config.name}")
            logger.info(f"   Tools: {[t.name for t in tools]}")
            return server_info
        except Exception as e:
            logger.error(f"❌ Failed to connect to {config.name}: {e}")
            raise ConnectionError(f"Failed to connect to {config.name}") from e

Why this design? The method handles all transports uniformly. Once connected, it immediately discovers available tools using session.list_tools(). If anything fails, you get a clear error message with context.
Part 4: Transport Implementations
Each transport has its own connection method. Here’s stdio (local servers):
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client

    async def _connect_stdio(self, config: ServerConfig) -> ClientSession:
        """Connect via stdio transport"""
        if not config.command or not config.args:
            raise ValueError("command and args required for stdio")

        server_params = StdioServerParameters(
            command=config.command,
            args=config.args,
            env=config.env,
            cwd=config.cwd
        )

        # Enter context with exit_stack
        # This calls stdio_client() which launches the subprocess
        transport = await self._exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        read, write = transport

        # Create MCP session from transport
        session = await self._exit_stack.enter_async_context(
            ClientSession(read, write)
        )

        # Initialize session (MCP handshake: initialize + notifications/initialized)
        await session.initialize()
        return session

The key detail: We enter both the transport and session into the exit stack. When cleanup happens, the session closes first (sending any final messages), then the transport closes (closing the pipes), then the subprocess terminates. This prevents the common bug where the process dies before the session can shut down cleanly.
Here’s SSE (remote servers with Server-Sent Events):
from mcp.client.sse import sse_client

    async def _connect_sse(self, config: ServerConfig) -> ClientSession:
        """Connect via SSE transport"""
        if not config.url:
            raise ValueError("url required for SSE")

        timeout = config.timeout or 5.0
        sse_read_timeout = config.sse_read_timeout or 300.0

        # Opens SSE connection to remote server
        # This is like our curl GET command that listened for events
        transport = await self._exit_stack.enter_async_context(
            sse_client(
                config.url,
                config.headers,
                timeout,
                sse_read_timeout
            )
        )
        read, write = transport

        # Create MCP session
        session = await self._exit_stack.enter_async_context(
            ClientSession(read, write)
        )

        # Perform handshake (initialize + notifications/initialized)
        await session.initialize()
        return session

Two timeouts explained:
timeout (5s default): How long to wait for the initial connection to establish
sse_read_timeout (5 minutes default): How long to keep the event stream alive waiting for messages
These are different concerns — connection should be fast, but a streaming response might legitimately take minutes if the server is doing heavy computation.
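One subtlety in the `config.timeout or 5.0` pattern: `or` treats an explicit 0 the same as None, so a caller can never set a zero timeout. If that distinction matters to you, a small helper with an explicit None check avoids it. This is a sketch; the name resolve_timeout is hypothetical and not part of the SDK.

```python
from datetime import timedelta
from typing import Optional


def resolve_timeout(value: Optional[float], default: float) -> timedelta:
    """Fall back to default only when value is None, so an explicit 0 survives."""
    seconds = default if value is None else value
    return timedelta(seconds=seconds)


print(resolve_timeout(None, 5.0))   # 0:00:05
print(resolve_timeout(0.5, 5.0))    # 0:00:00.500000
print(resolve_timeout(0, 300.0))    # 0:00:00
```

For a transport that expects plain seconds, call .total_seconds() on the result.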
And Streamable HTTP (the modern approach):
from datetime import timedelta
from mcp.client.streamable_http import streamablehttp_client

    async def _connect_streamable_http(self, config: ServerConfig) -> ClientSession:
        """Connect via Streamable HTTP transport"""
        if not config.url:
            raise ValueError("url required for Streamable HTTP")

        timeout = timedelta(seconds=config.timeout or 30)
        sse_read_timeout = timedelta(seconds=config.sse_read_timeout or 300)

        # Opens streamable HTTP connection
        transport = await self._exit_stack.enter_async_context(
            streamablehttp_client(
                config.url,
                config.headers,
                timeout,
                sse_read_timeout,
                terminate_on_close=True
            )
        )
        # The third value is a callback that returns the session ID
        read, write, get_session_id = transport

        # Create MCP session (the transport attaches the session ID to requests)
        session = await self._exit_stack.enter_async_context(
            ClientSession(read, write)
        )

        # Perform handshake (initialize + notifications/initialized)
        await session.initialize()

        # The server assigns the ID during the handshake, so read it afterwards
        logger.info(f"Session ID: {get_session_id()}")
        return session

The terminate_on_close=True flag actively terminates the HTTP session on disconnect, preventing resource leaks on the server side. Notice the third value from the transport: in current SDK versions it is a callback that returns the session ID, which is exactly the Mcp-Session-Id header from our curl examples. Every subsequent request uses this ID to identify our session.
Part 5: Using Tools
Once connected, calling tools is simple:
from typing import Any

    async def call_tool(
        self,
        server_name: str,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Call a tool on a server"""
        if server_name not in self._servers:
            available = list(self._servers.keys())
            raise ValueError(
                f"Server '{server_name}' not found. "
                f"Available: {available}"
            )

        server = self._servers[server_name]
        arguments = arguments or {}
        logger.info(f"🔧 {server_name}.{tool_name}({arguments})")

        # This calls session.call_tool() which sends the "tools/call" JSON-RPC message
        result = await server.session.call_tool(tool_name, arguments)
        logger.info("✅ Success")
        return result

Error handling: If you try to use a non-existent server, you get a clear error with a list of available servers. The session.call_tool() method handles all the JSON-RPC formatting: it wraps your arguments, sends the request, waits for the response, and unwraps the result.
Part 6: Discovery and Management
The client provides methods to explore what’s connected:
    def list_servers(self) -> List[str]:
        """List all connected server names"""
        return list(self._servers.keys())

    def list_tools(
        self,
        server_name: Optional[str] = None
    ) -> Dict[str, List[Tool]]:
        """List available tools

        Args:
            server_name: If specified, list only this server's tools

        Returns:
            Dict {server_name: [tools]}
        """
        if server_name:
            if server_name not in self._servers:
                return {}
            return {server_name: self._servers[server_name].tools}
        return {
            name: server.tools
            for name, server in self._servers.items()
        }

    def get_server_info(self, server_name: str) -> Optional[ServerInfo]:
        """Get detailed information about a server"""
        return self._servers.get(server_name)

Why these methods? They mirror how you think about the problem:
“What servers do I have?” → client.list_servers()
“What can they do?” → client.list_tools()
“Tell me more about this server” → client.get_server_info()
Simple, intuitive, and powerful.
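Since list_tools() returns a dict keyed by server name, two servers can expose tools with the same name without clashing. If you ever need a single flat lookup, one possible approach is to qualify each tool as "server.tool". The sketch below uses a stand-in DemoTool class instead of the SDK's Tool type, and the "search" tools are invented for the example.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class DemoTool:
    """Stand-in for mcp.types.Tool; only the name is needed here."""
    name: str
    description: str = ""


def build_tool_index(
    tools_by_server: Dict[str, List[DemoTool]]
) -> Dict[str, Tuple[str, str]]:
    """Map 'server.tool' to (server_name, tool_name) so same-named tools stay distinct."""
    return {
        f"{server}.{tool.name}": (server, tool.name)
        for server, tools in tools_by_server.items()
        for tool in tools
    }


index = build_tool_index({
    "weather": [DemoTool("get_alerts"), DemoTool("search")],   # "search" is hypothetical
    "deepwiki": [DemoTool("search")],
})

print(sorted(index))
# ['deepwiki.search', 'weather.get_alerts', 'weather.search']
```

The (server_name, tool_name) pair is exactly what call_tool needs as its first two arguments.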
Part 7: Cleanup and Context Manager
Proper cleanup is critical:
    async def disconnect_all(self):
        """Disconnect all servers"""
        print("🔌 Disconnecting all servers...")
        # This closes all sessions and transports in reverse order:
        # sessions close first (can send final messages),
        # then transports close (pipes/HTTP connections),
        # then subprocesses terminate
        await self._exit_stack.aclose()
        self._servers.clear()
        print("✅ All servers disconnected")

    # Context manager support
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect_all()

The context manager pattern means you can use async with UniversalMCPClient() as client: and cleanup happens automatically when you exit the block. The exit_stack.aclose() call ensures everything closes in the right order, with no manual tracking needed. The async with form is convenient for tests and scripts. In a long-running product you can also manage the lifecycle manually: create the client object once and call disconnect_all() yourself on shutdown.
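The two lifecycle styles can be compared side by side. This sketch uses a stand-in DemoClient with the same lifecycle shape as UniversalMCPClient but no real connections, so it runs with the standard library alone; the function names are hypothetical.

```python
import asyncio
from contextlib import AsyncExitStack


class DemoClient:
    """Stand-in with the same lifecycle methods as UniversalMCPClient."""

    def __init__(self) -> None:
        self._exit_stack = AsyncExitStack()
        self.closed = False

    async def disconnect_all(self) -> None:
        await self._exit_stack.aclose()
        self.closed = True

    async def __aenter__(self) -> "DemoClient":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.disconnect_all()


async def scripted() -> DemoClient:
    """Test/script style: cleanup runs when the block exits."""
    async with DemoClient() as client:
        pass  # add servers and call tools here
    return client


async def long_lived() -> DemoClient:
    """Service style: create once, disconnect explicitly on shutdown."""
    client = DemoClient()
    try:
        pass  # serve requests here
    finally:
        await client.disconnect_all()
    return client


print(asyncio.run(scripted()).closed, asyncio.run(long_lived()).closed)  # True True
```

In the manual style, the try/finally is what guarantees cleanup if something raises midway. It is also worth checking, for your SDK version, whether connections must be closed from the same task that opened them.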
Complete Working Example
Here’s a full example showing everything in action:
import asyncio

async def main():
    """Complete example of using the MCP client"""
    print("=" * 60)
    print("Universal MCP Client - Example")
    print("=" * 60)
    print()

    async with UniversalMCPClient() as client:
        # Connect to a local stdio server
        await client.add_server(ServerConfig(
            name="weather",
            transport="stdio",
            command="python",
            args=["/home/said/Bureau/MCP/weather/weather.py"]
        ))

        # Connect to a remote SSE server
        await client.add_server(ServerConfig(
            name="deepwiki",
            transport="sse",
            url="https://mcp.deepwiki.com/sse"
        ))

        # You can also connect to Streamable HTTP servers
        # await client.add_server(ServerConfig(
        #     name="github",
        #     transport="streamable_http",
        #     url="http://localhost:8080/mcp",
        #     headers={"Authorization": "Bearer your-token"}
        # ))

        print()
        print("=" * 60)
        print("CONNECTED SERVERS & THEIR TOOLS")
        print("=" * 60)

        # List all connected servers
        print(f"\n📋 Connected servers: {client.list_servers()}")

        # List all available tools
        all_tools = client.list_tools()
        for server_name, tools in all_tools.items():
            print(f"\n🔧 {server_name}:")
            for tool in tools:
                print(f"   - {tool.name}: {tool.description}")

        print()
        print("=" * 60)
        print("CALLING TOOLS")
        print("=" * 60)
        print()

        # Call a tool on the weather server
        try:
            result = await client.call_tool(
                "weather",
                "get_alerts",
                {"state": "CA"}
            )
            print("\n📊 Weather Result:")
            print(result)
        except Exception as e:
            print(f"\n❌ Error: {e}")

        # Call a tool on the deepwiki server
        try:
            result = await client.call_tool(
                "deepwiki",
                "read_wiki_structure",
                {"repoName": "facebook/react"}
            )
            print("\n📊 DeepWiki Result:")
            print(result)
        except Exception as e:
            print(f"\n❌ Error: {e}")

        print()
        print("=" * 60)
        print("SERVER DETAILS")
        print("=" * 60)

        # Get detailed info about a server
        weather_info = client.get_server_info("weather")
        if weather_info:
            print(f"\n📡 {weather_info.name}:")
            print(f"   Transport: {weather_info.config.transport}")
            print(f"   Connected at: {weather_info.connected_at}")
            print(f"   Available tools: {len(weather_info.tools)}")

    # Cleanup happens automatically here
    print()
    print("=" * 60)
    print("✅ All servers disconnected automatically")
    print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

To run this test:
python test_client.py
You’ll see:
Connections being established (using stdio_client, sse_client, or streamablehttp_client)
The MCP handshake happening (initialize + notifications/initialized)
Available tools being discovered (via session.list_tools())
Tools being called (via session.call_tool())
Results being returned
Automatic cleanup on exit (via exit_stack.aclose())
What’s Next: The Roadmap
This is Version 1 of the Universal MCP Client. It handles the core functionality: connecting to servers via all three transports, performing the MCP handshake, discovering tools, and calling them with proper resource management.
Version 2 will add:
Direct LLM integration (OpenAI, Anthropic, etc.)
Automatic tool selection based on user queries
Conversation management with context
Streaming responses
Retry logic and error recovery
Version 3 will include:
A beautiful UI like Claude Desktop
Visual server management dashboard
Interactive tool testing interface
Conversation history and persistence
Multi-server orchestration and routing
Full code available: contact me. If you have anything to add, I’m happy to discuss it with you.
Related reading: MCP = Function Calling + Standardization
Coming soon: Version 2 with LLM integration, Version 3 with UI
The future of AI integration is here, and now you have the client to make it work.