MCP
Module haystack_integrations.tools.mcp.mcp_tool
AsyncExecutor
Thread-safe event loop executor for running async code from sync contexts.
AsyncExecutor.get_instance
Get or create the global singleton executor instance.
AsyncExecutor.__init__
Initialize a dedicated event loop
AsyncExecutor.run
Run a coroutine in the event loop.
Arguments:
coro: Coroutine to executetimeout: Optional timeout in seconds
Raises:
TimeoutError: If execution exceeds timeout
Returns:
Result of the coroutine
AsyncExecutor.get_loop
Get the event loop.
Returns:
The event loop
AsyncExecutor.run_background
def run_background(
coro_factory: Callable[[asyncio.Event], Coroutine[Any, Any, Any]],
timeout: float | None = None
) -> tuple[concurrent.futures.Future[Any], asyncio.Event]
Schedule coro_factory to run in the executor's event loop without blocking the
caller thread.
The factory receives an :class:asyncio.Event that can be used to cooperatively shut
the coroutine down. The method returns both the concurrent future (to observe
completion or failure) and the created stop_event so that callers can signal termination.
Arguments:
coro_factory: A callable receiving the stop_event and returning the coroutine to execute.timeout: Optional timeout while waiting for the stop_event to be created.
Returns:
Tuple (future, stop_event).
AsyncExecutor.shutdown
Shut down the background event loop and thread.
Arguments:
timeout: Timeout in seconds for shutting down the event loop
MCPError
Base class for MCP-related errors.
MCPError.__init__
Initialize the MCPError.
Arguments:
message: Descriptive error message
MCPConnectionError
Error connecting to MCP server.
MCPConnectionError.__init__
def __init__(message: str,
server_info: "MCPServerInfo | None" = None,
operation: str | None = None) -> None
Initialize the MCPConnectionError.
Arguments:
message: Descriptive error messageserver_info: Server connection information that was usedoperation: Name of the operation that was being attempted
MCPToolNotFoundError
Error when a tool is not found on the server.
MCPToolNotFoundError.__init__
def __init__(message: str,
tool_name: str,
available_tools: list[str] | None = None) -> None
Initialize the MCPToolNotFoundError.
Arguments:
message: Descriptive error messagetool_name: Name of the tool that was requested but not foundavailable_tools: List of available tool names, if known
MCPInvocationError
Error during tool invocation.
MCPInvocationError.__init__
def __init__(message: str,
tool_name: str,
tool_args: dict[str, Any] | None = None) -> None
Initialize the MCPInvocationError.
Arguments:
message: Descriptive error messagetool_name: Name of the tool that was being invokedtool_args: Arguments that were passed to the tool
MCPClient
Abstract base class for MCP clients.
This class defines the common interface and shared functionality for all MCP clients, regardless of the transport mechanism used.
MCPClient.connect
Connect to an MCP server.
Raises:
MCPConnectionError: If connection to the server fails
Returns:
List of available tools on the server
MCPClient.call_tool
Call a tool on the connected MCP server.
Arguments:
tool_name: Name of the tool to calltool_args: Arguments to pass to the tool
Raises:
MCPConnectionError: If not connected to an MCP serverMCPInvocationError: If the tool invocation fails
Returns:
JSON string representation of the tool invocation result
MCPClient.aclose
Close the connection and clean up resources.
This method ensures all resources are properly released, even if errors occur.
StdioClient
MCP client that connects to servers using stdio transport.
StdioClient.__init__
def __init__(command: str,
args: list[str] | None = None,
env: dict[str, str | Secret] | None = None,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
Initialize a stdio MCP client.
Arguments:
command: Command to run (e.g., "python", "node")args: Arguments to pass to the commandenv: Environment variables for the commandmax_retries: Maximum number of reconnection attemptsbase_delay: Base delay for exponential backoff in seconds
StdioClient.connect
Connect to an MCP server using stdio transport.
Raises:
MCPConnectionError: If connection to the server fails
Returns:
List of available tools on the server
SSEClient
MCP client that connects to servers using SSE transport.
SSEClient.__init__
def __init__(server_info: "SSEServerInfo",
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
Initialize an SSE MCP client using server configuration.
Arguments:
server_info: Configuration object containing URL, token, timeout, etc.max_retries: Maximum number of reconnection attemptsbase_delay: Base delay for exponential backoff in seconds
SSEClient.connect
Connect to an MCP server using SSE transport.
Raises:
MCPConnectionError: If connection to the server fails
Returns:
List of available tools on the server
StreamableHttpClient
MCP client that connects to servers using streamable HTTP transport.
StreamableHttpClient.__init__
def __init__(server_info: "StreamableHttpServerInfo",
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0) -> None
Initialize a streamable HTTP MCP client using server configuration.
Arguments:
server_info: Configuration object containing URL, token, timeout, etc.max_retries: Maximum number of reconnection attemptsbase_delay: Base delay for exponential backoff in seconds
StreamableHttpClient.connect
Connect to an MCP server using streamable HTTP transport.
Raises:
MCPConnectionError: If connection to the server fails
Returns:
List of available tools on the server
MCPServerInfo
Abstract base class for MCP server connection parameters.
This class defines the common interface for all MCP server connection types.
MCPServerInfo.create_client
Create an appropriate MCP client for this server info.
Returns:
An instance of MCPClient configured with this server info
MCPServerInfo.to_dict
Serialize this server info to a dictionary.
Returns:
Dictionary representation of this server info
MCPServerInfo.from_dict
Deserialize server info from a dictionary.
Arguments:
data: Dictionary containing serialized server info
Returns:
Instance of the appropriate server info class
SSEServerInfo
Data class that encapsulates SSE MCP server connection parameters.
For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = SSEServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)
Arguments:
url: Full URL of the MCP server (including /sse endpoint)base_url: Base URL of the MCP server (deprecated, use url instead)token: Authentication token for the server (optional)timeout: Connection timeout in seconds
base_url
deprecated
SSEServerInfo.__post_init__
Validate that either url or base_url is provided.
SSEServerInfo.create_client
Create an SSE MCP client.
Returns:
Configured MCPClient instance
StreamableHttpServerInfo
Data class that encapsulates streamable HTTP MCP server connection parameters.
For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = StreamableHttpServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)
Arguments:
url: Full URL of the MCP server (streamable HTTP endpoint)token: Authentication token for the server (optional)timeout: Connection timeout in seconds
StreamableHttpServerInfo.__post_init__
Validate the URL.
StreamableHttpServerInfo.create_client
Create a streamable HTTP MCP client.
Returns:
Configured StreamableHttpClient instance
StdioServerInfo
Data class that encapsulates stdio MCP server connection parameters.
Arguments:
command: Command to run (e.g., "python", "node")args: Arguments to pass to the commandenv: Environment variables for the command For environment variables containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = StdioServerInfo(
command="uv",
args=["run", "my-mcp-server"],
env={
"WORKSPACE_PATH": "/path/to/workspace", # Plain string
"API_KEY": Secret.from_env_var("API_KEY"), # Secret object
}
)
Secret objects will be properly serialized and deserialized without exposing the secret value, while plain strings will be preserved as-is. Use Secret objects for sensitive data that needs to be handled securely.
StdioServerInfo.create_client
Create a stdio MCP client.
Returns:
Configured StdioMCPClient instance
MCPTool
A Tool that represents a single tool from an MCP server.
This implementation uses the official MCP SDK for protocol handling while maintaining compatibility with the Haystack tool ecosystem.
Response handling:
- Text and image content are supported and returned as JSON strings
- The JSON contains the structured response from the MCP server
- Use json.loads() to parse the response into a dictionary
Example using Streamable HTTP:
import json
from haystack_integrations.tools.mcp import MCPTool, StreamableHttpServerInfo
# Create tool instance
tool = MCPTool(
name="multiply",
server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)
Example using SSE (deprecated):
import json
from haystack.tools import MCPTool, SSEServerInfo
# Create tool instance
tool = MCPTool(
name="add",
server_info=SSEServerInfo(url="http://localhost:8000/sse")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)
Example using stdio:
import json
from haystack.tools import MCPTool, StdioServerInfo
# Create tool instance
tool = MCPTool(
name="get_current_time",
server_info=StdioServerInfo(command="python", args=["path/to/server.py"])
)
# Use the tool and parse result
result_json = tool.invoke(timezone="America/New_York")
result = json.loads(result_json)
MCPTool.__init__
def __init__(name: str,
server_info: MCPServerInfo,
description: str | None = None,
connection_timeout: int = 30,
invocation_timeout: int = 30,
eager_connect: bool = False)
Initialize the MCP tool.
Arguments:
name: Name of the tool to useserver_info: Server connection informationdescription: Custom description (if None, server description will be used)connection_timeout: Timeout in seconds for server connectioninvocation_timeout: Default timeout in seconds for tool invocationseager_connect: If True, connect to server during initialization. If False (default), defer connection until warm_up or first tool use, whichever comes first.
Raises:
MCPConnectionError: If connection to the server failsMCPToolNotFoundError: If no tools are available or the requested tool is not foundTimeoutError: If connection times out
MCPTool.ainvoke
Asynchronous tool invocation.
Arguments:
kwargs: Arguments to pass to the tool
Raises:
MCPInvocationError: If the tool invocation failsTimeoutError: If the operation times out
Returns:
JSON string representation of the tool invocation result
MCPTool.warm_up
Connect and fetch the tool schema if eager_connect is turned off.
MCPTool.to_dict
Serializes the MCPTool to a dictionary.
The serialization preserves all information needed to recreate the tool, including server connection parameters and timeout settings. Note that the active connection is not maintained.
Returns:
Dictionary with serialized data in the format:
{"type": fully_qualified_class_name, "data": {parameters}}
MCPTool.from_dict
Deserializes the MCPTool from a dictionary.
This method reconstructs an MCPTool instance from a serialized dictionary, including recreating the server_info object. A new connection will be established to the MCP server during initialization.
Arguments:
data: Dictionary containing serialized tool data
Raises:
None: Various exceptions if connection fails
Returns:
A fully initialized MCPTool instance
MCPTool.close
Close the tool synchronously.
MCPTool.__del__
Cleanup resources when the tool is garbage collected.
MCPTool.tool_spec
Return the Tool specification to be used by the Language Model.
MCPTool.invoke
Invoke the Tool with the provided keyword arguments.
_MCPClientSessionManager
Runs an MCPClient connect/close inside the AsyncExecutor's event loop.
Life-cycle:
- Create the worker to schedule a long-running coroutine in the dedicated background loop.
- The coroutine calls connect on mcp client; when it has the tool list it fulfils a concurrent future so the synchronous thread can continue.
- It then waits on an
asyncio.Event. stop()sets the event from any thread. The same coroutine then calls close() on mcp client and finishes without the dreadedAttempted to exit cancel scope in a different task than it was entered inerror thus properly closing the client.
_MCPClientSessionManager.tools
Return the tool list already collected during startup.
_MCPClientSessionManager.stop
Request the worker to shut down and block until done.
Module haystack_integrations.tools.mcp.mcp_toolset
MCPToolset
A Toolset that connects to an MCP (Model Context Protocol) server and provides access to its tools.
MCPToolset dynamically discovers and loads all tools from any MCP-compliant server, supporting both network-based streaming connections (Streamable HTTP, SSE) and local process-based stdio connections. This dual connectivity allows for integrating with both remote and local MCP servers.
Example using MCPToolset in a Haystack Pipeline:
# Prerequisites:
# 1. pip install uvx mcp-server-time # Install required MCP server and tools
# 2. export OPENAI_API_KEY="your-api-key" # Set up your OpenAI API key
import os
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.tools import ToolInvoker
from haystack.dataclasses import ChatMessage
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
# Create server info for the time service (can also use SSEServerInfo for remote servers)
server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"])
# Create the toolset - this will automatically discover all available tools
# You can optionally specify which tools to include
mcp_toolset = MCPToolset(
server_info=server_info,
tool_names=["get_current_time"] # Only include the get_current_time tool
)
# Create a pipeline with the toolset
pipeline = Pipeline()
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset))
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
pipeline.add_component(
"adapter",
OutputAdapter(
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
output_type=list[ChatMessage],
unsafe=True,
),
)
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipeline.connect("llm.replies", "tool_invoker.messages")
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
pipeline.connect("adapter.output", "response_llm.messages")
# Run the pipeline with a user question
user_input = "What is the time in New York? Be brief."
user_input_msg = ChatMessage.from_user(text=user_input)
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
print(result["response_llm"]["replies"][0].text)
You can also use the toolset via Streamable HTTP to talk to remote servers:
from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo
# Create the toolset with streamable HTTP connection
toolset = MCPToolset(
server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp"),
tool_names=["multiply"] # Optional: only include specific tools
)
# Use the toolset as shown in the pipeline example above
Example using SSE (deprecated):
from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo
from haystack.components.tools import ToolInvoker
# Create the toolset with an SSE connection
sse_toolset = MCPToolset(
server_info=SSEServerInfo(url="http://some-remote-server.com:8000/sse"),
tool_names=["add", "subtract"] # Only include specific tools
)
# Use the toolset as shown in the pipeline example above
MCPToolset.__init__
def __init__(server_info: MCPServerInfo,
tool_names: list[str] | None = None,
connection_timeout: float = 30.0,
invocation_timeout: float = 30.0,
eager_connect: bool = False)
Initialize the MCP toolset.
Arguments:
server_info: Connection information for the MCP servertool_names: Optional list of tool names to include. If provided, only tools with matching names will be added to the toolset.connection_timeout: Timeout in seconds for server connectioninvocation_timeout: Default timeout in seconds for tool invocationseager_connect: If True, connect to server and load tools during initialization. If False (default), defer connection to warm_up.
Raises:
MCPToolNotFoundError: If any of the specified tool names are not found on the server
MCPToolset.warm_up
Connect and load tools when eager_connect is turned off.
This method is automatically called by ToolInvoker.warm_up() and Pipeline.warm_up().
You can also call it directly before using the toolset to ensure all tool schemas
are available without performing a real invocation.
MCPToolset.to_dict
Serialize the MCPToolset to a dictionary.
Returns:
A dictionary representation of the MCPToolset
MCPToolset.from_dict
Deserialize an MCPToolset from a dictionary.
Arguments:
data: Dictionary representation of the MCPToolset
Returns:
A new MCPToolset instance
MCPToolset.close
Close the underlying MCP client safely.
MCPToolset.__post_init__
Validate and set up the toolset after initialization.
This handles the case when tools are provided during initialization.
MCPToolset.__iter__
Return an iterator over the Tools in this Toolset.
This allows the Toolset to be used wherever a list of Tools is expected.
Returns:
An iterator yielding Tool instances
MCPToolset.__contains__
Check if a tool is in this Toolset.
Supports checking by:
- Tool instance: tool in toolset
- Tool name: "tool_name" in toolset
Arguments:
item: Tool instance or tool name string
Returns:
True if contained, False otherwise
MCPToolset.add
Add a new Tool or merge another Toolset.
Arguments:
tool: A Tool instance or another Toolset to add
Raises:
ValueError: If adding the tool would result in duplicate tool namesTypeError: If the provided object is not a Tool or Toolset
MCPToolset.__add__
Concatenate this Toolset with another Tool, Toolset, or list of Tools.
Arguments:
other: Another Tool, Toolset, or list of Tools to concatenate
Raises:
TypeError: If the other parameter is not a Tool, Toolset, or list of ToolsValueError: If the combination would result in duplicate tool names
Returns:
A new Toolset containing all tools
MCPToolset.__len__
Return the number of Tools in this Toolset.
Returns:
Number of Tools
MCPToolset.__getitem__
Get a Tool by index.
Arguments:
index: Index of the Tool to get
Returns:
The Tool at the specified index