Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 187 additions & 21 deletions fastapi_mcp/openapi/convert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple

import mcp.types as types

Expand All @@ -9,6 +9,9 @@
generate_example_from_schema,
resolve_schema_references,
get_single_param_type_from_schema,
detect_form_encoded_content_type,
detect_multipart_content_type,
extract_form_field_names,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,12 +55,40 @@ def convert_openapi_to_mcp_tools(
logger.warning(f"Skipping operation with no operationId: {operation}")
continue

# Detect content type and form fields from request body
request_body = operation.get("requestBody", {})
try:
content_type, form_fields = _detect_content_type_and_form_fields(
request_body
)
if content_type:
logger.info(
"Content type detection successful for operation %s: %s with %d form fields",
operation_id,
content_type,
len(form_fields),
)
else:
logger.debug(
"No specific content type detected for operation %s, will use default JSON behavior",
operation_id,
)
except Exception as e:
logger.error(
"Content type detection failed for operation %s: %s. Using default JSON behavior",
operation_id,
str(e),
)
content_type, form_fields = None, []

# Save operation details for later HTTP calls
operation_map[operation_id] = {
"path": path,
"method": method,
"parameters": operation.get("parameters", []),
"request_body": operation.get("requestBody", {}),
"request_body": request_body,
"content_type": content_type,
"form_fields": form_fields,
}

summary = operation.get("summary", "")
Expand Down Expand Up @@ -85,7 +116,9 @@ def convert_openapi_to_mcp_tools(
responses_to_include = responses
if not describe_all_responses and success_response:
# If we're not describing all responses, only include the success response
success_code = next((code for code in success_codes if str(code) in responses), None)
success_code = next(
(code for code in success_codes if str(code) in responses), None
)
if success_code:
responses_to_include = {str(success_code): success_response}

Expand All @@ -100,7 +133,9 @@ def convert_openapi_to_mcp_tools(

# Add schema information if available
if "content" in response_data:
for content_type, content_data in response_data["content"].items():
for content_type, content_data in response_data[
"content"
].items():
if "schema" in content_data:
schema = content_data["schema"]
response_info += f"\nContent-Type: {content_type}"
Expand All @@ -113,7 +148,9 @@ def convert_openapi_to_mcp_tools(

# Check if content has examples
if "examples" in content_data:
for example_key, example_data in content_data["examples"].items():
for example_data in content_data[
"examples"
].values():
if "value" in example_data:
example_response = example_data["value"]
break
Expand All @@ -123,33 +160,56 @@ def convert_openapi_to_mcp_tools(

# If we have an example response, add it to the docs
if example_response:
response_info += "\n\n**Example Response:**\n```json\n"
response_info += json.dumps(example_response, indent=2)
response_info += (
"\n\n**Example Response:**\n```json\n"
)
response_info += json.dumps(
example_response, indent=2
)
response_info += "\n```"
# Otherwise generate an example from the schema
else:
generated_example = generate_example_from_schema(display_schema)
generated_example = generate_example_from_schema(
display_schema
)
if generated_example:
response_info += "\n\n**Example Response:**\n```json\n"
response_info += json.dumps(generated_example, indent=2)
response_info += (
"\n\n**Example Response:**\n```json\n"
)
response_info += json.dumps(
generated_example, indent=2
)
response_info += "\n```"

# Only include full schema information if requested
if describe_full_response_schema:
# Format schema information based on its type
if display_schema.get("type") == "array" and "items" in display_schema:
if (
display_schema.get("type") == "array"
and "items" in display_schema
):
items_schema = display_schema["items"]

response_info += "\n\n**Output Schema:** Array of items with the following structure:\n```json\n"
response_info += json.dumps(items_schema, indent=2)
response_info += json.dumps(
items_schema, indent=2
)
response_info += "\n```"
elif "properties" in display_schema:
response_info += "\n\n**Output Schema:**\n```json\n"
response_info += json.dumps(display_schema, indent=2)
response_info += (
"\n\n**Output Schema:**\n```json\n"
)
response_info += json.dumps(
display_schema, indent=2
)
response_info += "\n```"
else:
response_info += "\n\n**Output Schema:**\n```json\n"
response_info += json.dumps(display_schema, indent=2)
response_info += (
"\n\n**Output Schema:**\n```json\n"
)
response_info += json.dumps(
display_schema, indent=2
)
response_info += "\n```"

tool_description += response_info
Expand Down Expand Up @@ -200,7 +260,9 @@ def convert_openapi_to_mcp_tools(
for param_name, param in path_params:
param_schema = param.get("schema", {})
param_desc = param.get("description", "")
param_required = param.get("required", True) # Path params are usually required
param_required = param.get(
"required", True
) # Path params are usually required

properties[param_name] = param_schema.copy()
properties[param_name]["title"] = param_name
Expand All @@ -225,7 +287,9 @@ def convert_openapi_to_mcp_tools(
properties[param_name]["description"] = param_desc

if "type" not in properties[param_name]:
properties[param_name]["type"] = get_single_param_type_from_schema(param_schema)
properties[param_name]["type"] = get_single_param_type_from_schema(
param_schema
)

if "default" in param_schema:
properties[param_name]["default"] = param_schema["default"]
Expand All @@ -245,7 +309,9 @@ def convert_openapi_to_mcp_tools(
properties[param_name]["description"] = param_desc

if "type" not in properties[param_name]:
properties[param_name]["type"] = get_single_param_type_from_schema(param_schema)
properties[param_name]["type"] = get_single_param_type_from_schema(
param_schema
)

if "default" in param_schema:
properties[param_name]["default"] = param_schema["default"]
Expand All @@ -254,14 +320,114 @@ def convert_openapi_to_mcp_tools(
required_props.append(param_name)

# Create a proper input schema for the tool
input_schema = {"type": "object", "properties": properties, "title": f"{operation_id}Arguments"}
input_schema = {
"type": "object",
"properties": properties,
"title": f"{operation_id}Arguments",
}

if required_props:
input_schema["required"] = required_props

# Create the MCP tool definition
tool = types.Tool(name=operation_id, description=tool_description, inputSchema=input_schema)
tool = types.Tool(
name=operation_id,
description=tool_description,
inputSchema=input_schema,
)

tools.append(tool)

return tools, operation_map


def _detect_content_type_and_form_fields(
request_body: Optional[Dict[str, Any]]
) -> Tuple[Optional[str], List[str]]:
"""
Detect the content type and form fields from a request body schema.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
A tuple of (content_type, form_fields) where:
- content_type is the detected content type or None
- form_fields is a list of form field names or empty list
"""
if not request_body or "content" not in request_body:
logger.debug("No request body or content found, using default JSON behavior")
return None, []

content = request_body["content"]
available_content_types = list(content.keys())
logger.debug("Available content types for analysis: %s", available_content_types)

# Priority order: form-encoded > multipart > JSON
detected_content_type = None

# Check for form-encoded first (highest priority)
if detect_form_encoded_content_type(request_body):
detected_content_type = "application/x-www-form-urlencoded"
logger.debug("Detected form-encoded content type (priority 1)")
# Check for multipart second
elif detect_multipart_content_type(request_body):
detected_content_type = "multipart/form-data"
logger.debug("Detected multipart content type (priority 2)")
# Check for JSON as fallback
elif "application/json" in content:
detected_content_type = "application/json"
logger.debug("Detected JSON content type (fallback)")

# If no supported content type found, log and return None
if not detected_content_type:
logger.warning(
"No supported content type found in %s, falling back to default JSON behavior",
available_content_types,
)
return None, []

# Extract form fields for form-based content types
form_fields = []
if detected_content_type in [
"application/x-www-form-urlencoded",
"multipart/form-data",
]:
try:
content_schema = content[detected_content_type].get("schema", {})
if not content_schema:
logger.warning(
"No schema found for %s content type, cannot extract form fields",
detected_content_type,
)
return None, []

form_fields = extract_form_field_names(content_schema)
logger.debug(
"Successfully extracted %d form fields for %s: %s",
len(form_fields),
detected_content_type,
form_fields,
)

if not form_fields:
logger.warning(
"No form fields found in schema for %s, falling back to JSON behavior",
detected_content_type,
)
return None, []

except Exception as e:
logger.error(
"Failed to extract form fields from schema for %s: %s. Falling back to JSON behavior",
detected_content_type,
str(e),
)
return None, []

logger.info(
"Content type detection complete: %s with %d form fields",
detected_content_type,
len(form_fields),
)
return detected_content_type, form_fields
55 changes: 54 additions & 1 deletion fastapi_mcp/openapi/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, List


def get_single_param_type_from_schema(param_schema: Dict[str, Any]) -> str:
Expand Down Expand Up @@ -162,3 +162,56 @@ def generate_example_from_schema(schema: Dict[str, Any]) -> Any:

# Default case
return None


def detect_form_encoded_content_type(request_body: Dict[str, Any]) -> bool:
"""
Detect if a request body uses application/x-www-form-urlencoded content type.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
True if form-encoded content type is detected, False otherwise
"""
if not request_body or "content" not in request_body:
return False

content = request_body["content"]
return "application/x-www-form-urlencoded" in content


def detect_multipart_content_type(request_body: Dict[str, Any]) -> bool:
"""
Detect if a request body uses multipart/form-data content type.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
True if multipart content type is detected, False otherwise
"""
if not request_body or "content" not in request_body:
return False

content = request_body["content"]
return "multipart/form-data" in content


def extract_form_field_names(schema: Dict[str, Any]) -> List[str]:
"""
Extract form field names from schema properties.

Args:
schema: The schema object containing properties

Returns:
List of form field names, or empty list if no properties found
"""
if not schema or not isinstance(schema, dict):
return []

if "properties" not in schema:
return []

return list(schema["properties"].keys())
Loading