Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions fastapi_mcp/openapi/convert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple

import mcp.types as types

Expand All @@ -9,6 +9,9 @@
generate_example_from_schema,
resolve_schema_references,
get_single_param_type_from_schema,
detect_form_encoded_content_type,
detect_multipart_content_type,
extract_form_field_names,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,12 +55,38 @@ def convert_openapi_to_mcp_tools(
logger.warning(f"Skipping operation with no operationId: {operation}")
continue

# Detect content type and form fields from request body
request_body = operation.get("requestBody", {})
try:
content_type, form_fields = _detect_content_type_and_form_fields(request_body)
if content_type:
logger.info(
"Content type detection successful for operation %s: %s with %d form fields",
operation_id,
content_type,
len(form_fields),
)
else:
logger.debug(
"No specific content type detected for operation %s, will use default JSON behavior",
operation_id,
)
except Exception as e:
logger.error(
"Content type detection failed for operation %s: %s. Using default JSON behavior",
operation_id,
str(e),
)
content_type, form_fields = None, []

# Save operation details for later HTTP calls
operation_map[operation_id] = {
"path": path,
"method": method,
"parameters": operation.get("parameters", []),
"request_body": operation.get("requestBody", {}),
"request_body": request_body,
"content_type": content_type,
"form_fields": form_fields,
}

summary = operation.get("summary", "")
Expand Down Expand Up @@ -113,7 +142,7 @@ def convert_openapi_to_mcp_tools(

# Check if content has examples
if "examples" in content_data:
for example_key, example_data in content_data["examples"].items():
for example_data in content_data["examples"].values():
if "value" in example_data:
example_response = example_data["value"]
break
Expand Down Expand Up @@ -265,3 +294,81 @@ def convert_openapi_to_mcp_tools(
tools.append(tool)

return tools, operation_map


def _detect_content_type_and_form_fields(request_body: Dict[str, Any]) -> Tuple[Optional[str], List[str]]:
"""
Detect the content type and form fields from a request body schema.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
A tuple of (content_type, form_fields) where:
- content_type is the detected content type or None
- form_fields is a list of form field names or empty list
"""
if not request_body or "content" not in request_body:
logger.debug("No request body or content found, using default JSON behavior")
return None, []

content = request_body["content"]
available_content_types = list(content.keys())
logger.debug("Available content types for analysis: %s", available_content_types)

# Priority order: form-encoded > multipart > JSON
detected_content_type = None

# Check for form-encoded first (highest priority)
if detect_form_encoded_content_type(request_body):
detected_content_type = "application/x-www-form-urlencoded"
logger.debug("Detected form-encoded content type (priority 1)")
# Check for multipart second
elif detect_multipart_content_type(request_body):
detected_content_type = "multipart/form-data"
logger.debug("Detected multipart content type (priority 2)")
# Check for JSON as fallback
elif "application/json" in content:
detected_content_type = "application/json"
logger.debug("Detected JSON content type (fallback)")

# If no supported content type found, log and return None
if not detected_content_type:
logger.warning(
"No supported content type found in %s, falling back to default JSON behavior", available_content_types
)
return None, []

# Extract form fields for form-based content types
form_fields = []
if detected_content_type in [
"application/x-www-form-urlencoded",
"multipart/form-data",
]:
try:
content_schema = content[detected_content_type].get("schema", {})
if not content_schema:
logger.warning("No schema found for %s content type, cannot extract form fields", detected_content_type)
return None, []

form_fields = extract_form_field_names(content_schema)
logger.debug(
"Successfully extracted %d form fields for %s: %s", len(form_fields), detected_content_type, form_fields
)

if not form_fields:
logger.warning(
"No form fields found in schema for %s, falling back to JSON behavior", detected_content_type
)
return None, []

except Exception as e:
logger.error(
"Failed to extract form fields from schema for %s: %s. Falling back to JSON behavior",
detected_content_type,
str(e),
)
return None, []

logger.info("Content type detection complete: %s with %d form fields", detected_content_type, len(form_fields))
return detected_content_type, form_fields
55 changes: 54 additions & 1 deletion fastapi_mcp/openapi/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, List


def get_single_param_type_from_schema(param_schema: Dict[str, Any]) -> str:
Expand Down Expand Up @@ -162,3 +162,56 @@ def generate_example_from_schema(schema: Dict[str, Any]) -> Any:

# Default case
return None


def detect_form_encoded_content_type(request_body: Dict[str, Any]) -> bool:
"""
Detect if a request body uses application/x-www-form-urlencoded content type.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
True if form-encoded content type is detected, False otherwise
"""
if not request_body or "content" not in request_body:
return False

content = request_body["content"]
return "application/x-www-form-urlencoded" in content


def detect_multipart_content_type(request_body: Dict[str, Any]) -> bool:
"""
Detect if a request body uses multipart/form-data content type.

Args:
request_body: The requestBody section from OpenAPI operation

Returns:
True if multipart content type is detected, False otherwise
"""
if not request_body or "content" not in request_body:
return False

content = request_body["content"]
return "multipart/form-data" in content


def extract_form_field_names(schema: Dict[str, Any]) -> List[str]:
"""
Extract form field names from schema properties.

Args:
schema: The schema object containing properties

Returns:
List of form field names, or empty list if no properties found
"""
if not schema or not isinstance(schema, dict):
return []

if "properties" not in schema:
return []

return list(schema["properties"].keys())
40 changes: 35 additions & 5 deletions fastapi_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,21 @@ async def _execute_api_tool(
if name.lower() in self._forward_headers:
headers[name] = value

body = arguments if arguments else None
# Separate form fields from other parameters based on operation metadata
content_type = operation.get("content_type")
form_fields = operation.get("form_fields", [])

body: Optional[Any]
if content_type in ["application/x-www-form-urlencoded", "multipart/form-data"] and form_fields:
# Only include form fields in the body for form-encoded requests
body = {k: v for k, v in arguments.items() if k in form_fields}
else:
# For JSON or other content types, include all remaining arguments
body = arguments or None

try:
logger.debug(f"Making {method.upper()} request to {path}")
response = await self._request(client, method, path, query, headers, body)
response = await self._request(client, method, path, query, headers, body, content_type)

# TODO: Better typing for the AsyncClientProtocol. It should return a ResponseProtocol that has a json() method that returns a dict/list/etc.
try:
Expand Down Expand Up @@ -577,20 +587,40 @@ async def _request(
query: Dict[str, Any],
headers: Dict[str, str],
body: Optional[Any],
content_type: Optional[str] = None,
) -> Any:
if method.lower() == "get":
return await client.get(path, params=query, headers=headers)
elif method.lower() == "post":
return await client.post(path, params=query, headers=headers, json=body)
return await self._request_with_body(client, "post", path, query, headers, body, content_type)
elif method.lower() == "put":
return await client.put(path, params=query, headers=headers, json=body)
return await self._request_with_body(client, "put", path, query, headers, body, content_type)
elif method.lower() == "delete":
return await client.delete(path, params=query, headers=headers)
elif method.lower() == "patch":
return await client.patch(path, params=query, headers=headers, json=body)
return await self._request_with_body(client, "patch", path, query, headers, body, content_type)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

async def _request_with_body(
self,
client: httpx.AsyncClient,
method: str,
path: str,
query: Dict[str, Any],
headers: Dict[str, str],
body: Optional[Any],
content_type: Optional[str] = None,
) -> Any:
"""Handle requests with body content, using appropriate encoding based on content type."""
if content_type == "application/x-www-form-urlencoded":
return await client.request(method, path, params=query, headers=headers, data=body)
elif content_type == "multipart/form-data":
return await client.request(method, path, params=query, headers=headers, files=body)
else:
# Default to JSON for backward compatibility
return await client.request(method, path, params=query, headers=headers, json=body)

def _filter_tools(self, tools: List[types.Tool], openapi_schema: Dict[str, Any]) -> List[types.Tool]:
"""
Filter tools based on operation IDs and tags.
Expand Down
Loading