http block support multipart (#4259)
This commit is contained in:
@@ -228,7 +228,7 @@ def get_path_for_workflow_download_directory(run_id: str | None) -> Path:
|
||||
|
||||
|
||||
def get_download_dir(run_id: str | None) -> str:
|
||||
download_dir = f"{REPO_ROOT_DIR}/downloads/{run_id}"
|
||||
download_dir = os.path.join(settings.DOWNLOAD_PATH, str(run_id))
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
return download_dir
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
import structlog
|
||||
|
||||
@@ -15,9 +17,9 @@ async def aiohttp_request(
|
||||
url: str,
|
||||
headers: dict[str, str] | None = None,
|
||||
*,
|
||||
body: dict[str, Any] | str | None = None,
|
||||
data: dict[str, Any] | None = None,
|
||||
data: Any | None = None,
|
||||
json_data: dict[str, Any] | None = None,
|
||||
files: dict[str, str] | None = None,
|
||||
cookies: dict[str, str] | None = None,
|
||||
timeout: int = DEFAULT_REQUEST_TIMEOUT,
|
||||
follow_redirects: bool = True,
|
||||
@@ -26,14 +28,28 @@ async def aiohttp_request(
|
||||
"""
|
||||
Generic HTTP request function that supports all HTTP methods.
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, etc.)
|
||||
url: Request URL
|
||||
headers: Request headers
|
||||
data: Request body data (dict for form data, or other types)
|
||||
json_data: JSON data to send (takes precedence over data)
|
||||
files: Dictionary mapping field names to file paths for multipart file uploads
|
||||
cookies: Request cookies
|
||||
timeout: Request timeout in seconds
|
||||
follow_redirects: Whether to follow redirects
|
||||
proxy: Proxy URL
|
||||
|
||||
Returns:
|
||||
Tuple of (status_code, response_headers, response_body)
|
||||
where response_body can be dict (for JSON) or str (for text)
|
||||
"""
|
||||
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
|
||||
request_kwargs = {
|
||||
# Ensure headers is always a dict for type safety
|
||||
headers_dict: dict[str, str] = headers or {}
|
||||
request_kwargs: dict[str, Any] = {
|
||||
"url": url,
|
||||
"headers": headers or {},
|
||||
"headers": headers_dict,
|
||||
"cookies": cookies,
|
||||
"proxy": proxy,
|
||||
"allow_redirects": follow_redirects,
|
||||
@@ -41,17 +57,41 @@ async def aiohttp_request(
|
||||
|
||||
# Handle body based on content type and method
|
||||
if method.upper() != "GET":
|
||||
# If files are provided, use multipart/form-data
|
||||
if files is not None:
|
||||
form = aiohttp.FormData()
|
||||
|
||||
# Add files to form
|
||||
for field_name, file_path in files.items():
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f"File not found: {file_path}")
|
||||
|
||||
filename = os.path.basename(file_path)
|
||||
async with aiofiles.open(file_path, "rb") as f:
|
||||
file_content = await f.read()
|
||||
form.add_field(field_name, file_content, filename=filename)
|
||||
|
||||
# Add data fields to form if provided
|
||||
if data is not None and isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
form.add_field(key, str(value))
|
||||
|
||||
request_kwargs["data"] = form
|
||||
# Remove Content-Type header if present, let aiohttp set it for multipart
|
||||
# headers_dict is already typed as dict[str, str] from initialization
|
||||
if "Content-Type" in headers_dict:
|
||||
del headers_dict["Content-Type"]
|
||||
if "content-type" in headers_dict:
|
||||
del headers_dict["content-type"]
|
||||
# Explicit overrides first
|
||||
if json_data is not None:
|
||||
elif json_data is not None:
|
||||
request_kwargs["json"] = json_data
|
||||
elif data is not None:
|
||||
request_kwargs["data"] = data
|
||||
elif body is not None:
|
||||
content_type = (headers or {}).get("Content-Type") or (headers or {}).get("content-type") or ""
|
||||
if "application/x-www-form-urlencoded" in content_type.lower():
|
||||
request_kwargs["data"] = body
|
||||
content_type = headers_dict.get("Content-Type") or headers_dict.get("content-type") or ""
|
||||
if "application/json" in content_type.lower():
|
||||
request_kwargs["json"] = data
|
||||
else:
|
||||
request_kwargs["json"] = body
|
||||
request_kwargs["data"] = data
|
||||
|
||||
async with session.request(method.upper(), **request_kwargs) as response:
|
||||
response_headers = dict(response.headers)
|
||||
|
||||
@@ -15,7 +15,7 @@ from datetime import datetime
|
||||
from email.message import EmailMessage
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from typing import Annotated, Any, Awaitable, Callable, Literal, Union
|
||||
from typing import Annotated, Any, Awaitable, Callable, ClassVar, Literal, Union, cast
|
||||
from urllib.parse import quote, urlparse
|
||||
|
||||
import filetype
|
||||
@@ -55,6 +55,7 @@ from skyvern.forge.sdk.api.files import (
|
||||
download_file,
|
||||
download_from_s3,
|
||||
get_path_for_workflow_download_directory,
|
||||
parse_uri_to_path,
|
||||
)
|
||||
from skyvern.forge.sdk.api.llm.api_handler import LLMAPIHandler
|
||||
from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
|
||||
@@ -3781,12 +3782,36 @@ class HttpRequestBlock(Block):
|
||||
url: str | None = None
|
||||
headers: dict[str, str] | None = None
|
||||
body: dict[str, Any] | None = None # Changed to consistently be dict only
|
||||
files: dict[str, str] | None = None # Dictionary mapping field names to file paths for multipart file uploads
|
||||
timeout: int = 30
|
||||
follow_redirects: bool = True
|
||||
|
||||
# Parameters for templating
|
||||
parameters: list[PARAMETER_TYPE] = []
|
||||
|
||||
# Allowed directories for local file access (class variable, not a Pydantic field)
|
||||
_allowed_dirs: ClassVar[list[str] | None] = None
|
||||
|
||||
@classmethod
|
||||
def get_allowed_dirs(cls) -> list[str]:
|
||||
"""Get the list of allowed directories for local file access.
|
||||
Computed once and cached for performance.
|
||||
"""
|
||||
if cls._allowed_dirs is None:
|
||||
allowed_dirs: list[str] = []
|
||||
if settings.ARTIFACT_STORAGE_PATH:
|
||||
allowed_dirs.append(os.path.abspath(settings.ARTIFACT_STORAGE_PATH))
|
||||
if settings.VIDEO_PATH:
|
||||
allowed_dirs.append(os.path.abspath(settings.VIDEO_PATH))
|
||||
if settings.HAR_PATH:
|
||||
allowed_dirs.append(os.path.abspath(settings.HAR_PATH))
|
||||
if settings.LOG_PATH:
|
||||
allowed_dirs.append(os.path.abspath(settings.LOG_PATH))
|
||||
if settings.DOWNLOAD_PATH:
|
||||
allowed_dirs.append(os.path.abspath(settings.DOWNLOAD_PATH))
|
||||
cls._allowed_dirs = allowed_dirs
|
||||
return cls._allowed_dirs or []
|
||||
|
||||
def get_all_parameters(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
@@ -3818,6 +3843,14 @@ class HttpRequestBlock(Block):
|
||||
value, workflow_run_context, **template_kwargs
|
||||
)
|
||||
|
||||
if self.files:
|
||||
# Format file paths in files dictionary
|
||||
for field_name, file_path in self.files.items():
|
||||
if isinstance(file_path, str):
|
||||
self.files[field_name] = self.format_block_parameter_template_from_workflow_run_context(
|
||||
file_path, workflow_run_context, **template_kwargs
|
||||
)
|
||||
|
||||
if self.headers:
|
||||
for key, value in self.headers.items():
|
||||
self.headers[key] = self.format_block_parameter_template_from_workflow_run_context(
|
||||
@@ -3877,6 +3910,132 @@ class HttpRequestBlock(Block):
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# Add default content-type as application/json if not provided (unless files are being uploaded)
|
||||
if not self.headers:
|
||||
self.headers = {}
|
||||
|
||||
# If files are provided, don't set default Content-Type (aiohttp will set multipart/form-data)
|
||||
if not self.files:
|
||||
if not self.headers.get("Content-Type") or not self.headers.get("content-type"):
|
||||
LOG.info("Adding default content-type as application/json", headers=self.headers)
|
||||
self.headers["Content-Type"] = "application/json"
|
||||
|
||||
# Download files from HTTP URLs or S3 URIs if needed
|
||||
# Also allow local files from allowed directories (ARTIFACT_STORAGE_PATH, VIDEO_PATH, HAR_PATH, LOG_PATH)
|
||||
if self.files:
|
||||
downloaded_files: dict[str, str] = {}
|
||||
for field_name, file_path in self.files.items():
|
||||
# Parse file path (handle file:// URI format)
|
||||
actual_file_path: str | None = None
|
||||
is_file_uri = file_path.startswith("file://")
|
||||
|
||||
if is_file_uri:
|
||||
try:
|
||||
actual_file_path = parse_uri_to_path(file_path)
|
||||
except ValueError as e:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Invalid file URI format: {file_path}. Error: {str(e)}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
else:
|
||||
actual_file_path = file_path
|
||||
|
||||
# Check if file_path is a URL or S3 URI
|
||||
is_url = (
|
||||
file_path.startswith("http://") or file_path.startswith("https://") or file_path.startswith("www.")
|
||||
)
|
||||
is_s3_uri = file_path.startswith("s3://")
|
||||
|
||||
# Check if file is in allowed directories
|
||||
is_allowed_local_file = False
|
||||
if actual_file_path:
|
||||
# Convert to absolute path for comparison (handles both absolute and relative paths)
|
||||
abs_file_path = os.path.abspath(actual_file_path)
|
||||
|
||||
# Get allowed directory paths (using class method for cached result)
|
||||
allowed_dirs = self.get_allowed_dirs()
|
||||
LOG.debug("HttpRequestBlock: Allowed directories", allowed_dirs=allowed_dirs)
|
||||
|
||||
# Check if file is within any allowed directory
|
||||
for allowed_dir in allowed_dirs:
|
||||
# Use os.path.commonpath to check if file is within allowed directory
|
||||
try:
|
||||
common_path = os.path.commonpath([abs_file_path, allowed_dir])
|
||||
if common_path == allowed_dir:
|
||||
is_allowed_local_file = True
|
||||
break
|
||||
except ValueError:
|
||||
# Paths are on different drives (Windows) or incompatible
|
||||
continue
|
||||
|
||||
# If not URL, S3 URI, or allowed local file, reject
|
||||
if not (is_url or is_s3_uri or is_allowed_local_file):
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"No permission to access local file: {file_path}. Only HTTP/HTTPS URLs, S3 URIs, or files in allowed directories are allowed.",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# Handle different file sources
|
||||
if is_allowed_local_file:
|
||||
# Use local file directly
|
||||
local_file_path_str: str = cast(str, actual_file_path)
|
||||
if not os.path.exists(local_file_path_str):
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"File not found: {local_file_path_str}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
downloaded_files[field_name] = local_file_path_str
|
||||
LOG.info(
|
||||
"HttpRequestBlock: Using allowed local file",
|
||||
field_name=field_name,
|
||||
file_path=local_file_path_str,
|
||||
)
|
||||
else:
|
||||
# Download from remote source
|
||||
try:
|
||||
LOG.info(
|
||||
"HttpRequestBlock: Downloading file from remote source",
|
||||
field_name=field_name,
|
||||
file_path=file_path,
|
||||
is_url=is_url,
|
||||
is_s3_uri=is_s3_uri,
|
||||
)
|
||||
if is_s3_uri:
|
||||
local_file_path = await download_from_s3(self.get_async_aws_client(), file_path)
|
||||
else:
|
||||
local_file_path = await download_file(file_path)
|
||||
downloaded_files[field_name] = local_file_path
|
||||
LOG.info(
|
||||
"HttpRequestBlock: File downloaded successfully",
|
||||
field_name=field_name,
|
||||
original_path=file_path,
|
||||
local_path=local_file_path,
|
||||
)
|
||||
except Exception as e:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Failed to download file {file_path}: {str(e)}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# Update self.files with local file paths
|
||||
self.files = downloaded_files
|
||||
|
||||
# Execute HTTP request using the generic aiohttp_request function
|
||||
try:
|
||||
LOG.info(
|
||||
@@ -3886,6 +4045,7 @@ class HttpRequestBlock(Block):
|
||||
headers=self.headers,
|
||||
workflow_run_id=workflow_run_id,
|
||||
body=self.body,
|
||||
files=self.files,
|
||||
)
|
||||
|
||||
# Use the generic aiohttp_request function
|
||||
@@ -3893,7 +4053,8 @@ class HttpRequestBlock(Block):
|
||||
method=self.method,
|
||||
url=self.url,
|
||||
headers=self.headers,
|
||||
body=self.body,
|
||||
data=self.body,
|
||||
files=self.files,
|
||||
timeout=self.timeout,
|
||||
follow_redirects=self.follow_redirects,
|
||||
)
|
||||
|
||||
@@ -3646,6 +3646,7 @@ class WorkflowService:
|
||||
url=block_yaml.url,
|
||||
headers=block_yaml.headers,
|
||||
body=block_yaml.body,
|
||||
files=block_yaml.files,
|
||||
timeout=block_yaml.timeout,
|
||||
follow_redirects=block_yaml.follow_redirects,
|
||||
parameters=http_request_block_parameters,
|
||||
|
||||
Reference in New Issue
Block a user