[Feature] Adding Azure Blob Storage support to File Upload workflow block (#3130)

This commit is contained in:
Trevor Sullivan
2025-08-07 22:59:37 -06:00
committed by GitHub
parent 71f71b8e77
commit b3e17c12b3
17 changed files with 667 additions and 169 deletions

View File

@@ -14,6 +14,7 @@ from skyvern.exceptions import (
)
from skyvern.forge import app
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.api.azure import AsyncAzureClient
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.schemas.credentials import PasswordCredential
from skyvern.forge.sdk.schemas.organizations import Organization
@@ -24,6 +25,7 @@ from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionErr
from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE,
AWSSecretParameter,
AzureSecretParameter,
BitwardenCreditCardDataParameter,
BitwardenLoginCredentialParameter,
BitwardenSensitiveInformationParameter,
@@ -50,6 +52,7 @@ class WorkflowRunContext:
async def init(
cls,
aws_client: AsyncAWSClient,
azure_client: AsyncAzureClient | None,
organization: Organization,
workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]],
workflow_output_parameters: list[OutputParameter],
@@ -63,7 +66,7 @@ class WorkflowRunContext:
],
) -> Self:
# key is label name
workflow_run_context = cls(aws_client=aws_client)
workflow_run_context = cls(aws_client=aws_client, azure_client=azure_client)
for parameter, run_parameter in workflow_parameter_tuples:
if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID:
await workflow_run_context.register_secret_workflow_parameter_value(
@@ -88,6 +91,8 @@ class WorkflowRunContext:
for secrete_parameter in secret_parameters:
if isinstance(secrete_parameter, AWSSecretParameter):
await workflow_run_context.register_aws_secret_parameter_value(secrete_parameter)
elif isinstance(secrete_parameter, AzureSecretParameter):
await workflow_run_context.register_azure_secret_parameter_value(secrete_parameter)
elif isinstance(secrete_parameter, CredentialParameter):
await workflow_run_context.register_credential_parameter_value(secrete_parameter, organization)
elif isinstance(secrete_parameter, OnePasswordCredentialParameter):
@@ -115,12 +120,13 @@ class WorkflowRunContext:
return workflow_run_context
def __init__(self, aws_client: AsyncAWSClient) -> None:
def __init__(self, aws_client: AsyncAWSClient, azure_client: AsyncAzureClient | None) -> None:
self.blocks_metadata: dict[str, BlockMetadata] = {}
self.parameters: dict[str, PARAMETER_TYPE] = {}
self.values: dict[str, Any] = {}
self.secrets: dict[str, Any] = {}
self._aws_client = aws_client
self._azure_client = azure_client
def get_parameter(self, key: str) -> Parameter:
    """Return the Parameter registered under ``key``; raises KeyError if no such parameter exists."""
    return self.parameters[key]
@@ -316,6 +322,23 @@ class WorkflowRunContext:
self.values[parameter.key] = random_secret_id
self.parameters[parameter.key] = parameter
async def register_azure_secret_parameter_value(
    self,
    parameter: AzureSecretParameter,
) -> None:
    """Fetch the Azure secret named by ``parameter.azure_key`` and register it.

    The secret's real value is stored in ``self.secrets`` under a random id of the
    form ``secret_<uuid>``; ``self.values[parameter.key]`` holds only that random id,
    so the actual secret never appears in parameter values. The id is swapped back
    for the real value at the point of use.

    Raises:
        ValueError: if no Azure client was configured for this context.
    """
    # If the parameter is an Azure secret, fetch the secret value and store it in the secrets dict
    # The value of the parameter will be the random secret id with format `secret_<uuid>`.
    # We'll replace the random secret id with the actual secret value when we need to use it.
    if self._azure_client is None:
        LOG.error("Azure client not initialized, cannot register Azure secret parameter value")
        raise ValueError("Azure client not initialized")
    secret_value = await self._azure_client.get_secret(parameter.azure_key)
    if secret_value is not None:
        random_secret_id = self.generate_random_secret_id()
        self.secrets[random_secret_id] = secret_value
        self.values[parameter.key] = random_secret_id
        # NOTE(review): when the secret lookup returns None the parameter is not
        # registered at all (neither values nor parameters is updated) — confirm
        # this silent skip is intended and matches the AWS secret path.
        self.parameters[parameter.key] = parameter
async def register_onepassword_credential_parameter_value(
self, parameter: OnePasswordCredentialParameter, organization: Organization
) -> None:
@@ -801,6 +824,7 @@ class WorkflowRunContext:
parameter,
(
AWSSecretParameter,
AzureSecretParameter,
BitwardenLoginCredentialParameter,
BitwardenCreditCardDataParameter,
BitwardenSensitiveInformationParameter,
@@ -823,6 +847,7 @@ class WorkflowRunContext:
class WorkflowContextManager:
aws_client: AsyncAWSClient
azure_client: AsyncAzureClient | None
workflow_run_contexts: dict[str, WorkflowRunContext]
parameters: dict[str, PARAMETER_TYPE]
@@ -831,6 +856,12 @@ class WorkflowContextManager:
def __init__(self) -> None:
    """Create the shared AWS client, an optional Azure client, and an empty run-context registry."""
    self.aws_client = AsyncAWSClient()
    # The Azure client is only built when both storage credentials are present
    # in settings; otherwise Azure-backed features stay disabled (client is None).
    account_name = settings.AZURE_STORAGE_ACCOUNT_NAME
    account_key = settings.AZURE_STORAGE_ACCOUNT_KEY
    if account_name and account_key:
        self.azure_client = AsyncAzureClient(account_name=account_name, account_key=account_key)
    else:
        self.azure_client = None
    self.workflow_run_contexts = {}
def _validate_workflow_run_context(self, workflow_run_id: str) -> None:
@@ -854,6 +885,7 @@ class WorkflowContextManager:
) -> WorkflowRunContext:
workflow_run_context = await WorkflowRunContext.init(
self.aws_client,
self.azure_client,
organization,
workflow_parameter_tuples,
workflow_output_parameters,

View File

@@ -31,7 +31,11 @@ from pypdf import PdfReader
from pypdf.errors import PdfReadError
from skyvern.config import settings
from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, MAX_UPLOAD_FILE_COUNT
from skyvern.constants import (
AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT,
GET_DOWNLOADED_FILES_TIMEOUT,
MAX_UPLOAD_FILE_COUNT,
)
from skyvern.exceptions import (
ContextParameterValueNotFound,
MissingBrowserState,
@@ -43,6 +47,7 @@ from skyvern.exceptions import (
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from skyvern.forge.sdk.api.azure import AsyncAzureClient
from skyvern.forge.sdk.api.files import (
calculate_sha256_for_file,
create_named_temporary_file,
@@ -1872,6 +1877,9 @@ class FileUploadBlock(Block):
aws_access_key_id: str | None = None
aws_secret_access_key: str | None = None
region_name: str | None = None
azure_storage_account_name: str | None = None
azure_storage_account_key: str | None = None
azure_blob_container_name: str | None = None
path: str | None = None
def get_all_parameters(
@@ -1893,6 +1901,15 @@ class FileUploadBlock(Block):
if self.aws_secret_access_key and workflow_run_context.has_parameter(self.aws_secret_access_key):
parameters.append(workflow_run_context.get_parameter(self.aws_secret_access_key))
if self.azure_storage_account_name and workflow_run_context.has_parameter(self.azure_storage_account_name):
parameters.append(workflow_run_context.get_parameter(self.azure_storage_account_name))
if self.azure_storage_account_key and workflow_run_context.has_parameter(self.azure_storage_account_key):
parameters.append(workflow_run_context.get_parameter(self.azure_storage_account_key))
if self.azure_blob_container_name and workflow_run_context.has_parameter(self.azure_blob_container_name):
parameters.append(workflow_run_context.get_parameter(self.azure_blob_container_name))
return parameters
def format_potential_template_parameters(self, workflow_run_context: WorkflowRunContext) -> None:
@@ -1910,6 +1927,18 @@ class FileUploadBlock(Block):
self.aws_secret_access_key = self.format_block_parameter_template_from_workflow_run_context(
self.aws_secret_access_key, workflow_run_context
)
if self.azure_storage_account_name:
self.azure_storage_account_name = self.format_block_parameter_template_from_workflow_run_context(
self.azure_storage_account_name, workflow_run_context
)
if self.azure_storage_account_key:
self.azure_storage_account_key = self.format_block_parameter_template_from_workflow_run_context(
self.azure_storage_account_key, workflow_run_context
)
if self.azure_blob_container_name:
self.azure_blob_container_name = self.format_block_parameter_template_from_workflow_run_context(
self.azure_blob_container_name, workflow_run_context
)
def _get_s3_uri(self, workflow_run_id: str, path: str) -> str:
s3_suffix = f"{workflow_run_id}/{uuid.uuid4()}_{Path(path).name}"
@@ -1917,6 +1946,10 @@ class FileUploadBlock(Block):
return f"s3://{self.s3_bucket}/{s3_suffix}"
return f"s3://{self.s3_bucket}/{self.path}/{s3_suffix}"
def _get_azure_blob_uri(self, workflow_run_id: str, file_path: str) -> str:
    """Build the destination Azure blob URL for ``file_path``.

    Mirrors ``_get_s3_uri``: the blob name is ``<workflow_run_id>/<uuid>_<basename>``
    (the uuid prevents collisions between files with the same name), and when the
    block's ``path`` option is set it is inserted as a folder prefix inside the
    container — previously ``path`` was silently ignored for Azure uploads, unlike S3.
    """
    blob_suffix = f"{workflow_run_id}/{uuid.uuid4()}_{Path(file_path).name}"
    base_url = f"https://{self.azure_storage_account_name}.blob.core.windows.net/{self.azure_blob_container_name}"
    if not self.path:
        return f"{base_url}/{blob_suffix}"
    return f"{base_url}/{self.path}/{blob_suffix}"
async def execute(
self,
workflow_run_id: str,
@@ -1930,12 +1963,29 @@ class FileUploadBlock(Block):
# get all parameters into a dictionary
# data validate before uploading
missing_parameters = []
if not self.s3_bucket:
missing_parameters.append("s3_bucket")
if not self.aws_access_key_id:
missing_parameters.append("aws_access_key_id")
if not self.aws_secret_access_key:
missing_parameters.append("aws_secret_access_key")
if self.storage_type == FileStorageType.S3:
if not self.s3_bucket:
missing_parameters.append("s3_bucket")
if not self.aws_access_key_id:
missing_parameters.append("aws_access_key_id")
if not self.aws_secret_access_key:
missing_parameters.append("aws_secret_access_key")
elif self.storage_type == FileStorageType.AZURE:
if not self.azure_storage_account_name or self.azure_storage_account_name == "":
missing_parameters.append("azure_storage_account_name")
if not self.azure_storage_account_key or self.azure_storage_account_key == "":
missing_parameters.append("azure_storage_account_key")
if not self.azure_blob_container_name or self.azure_blob_container_name == "":
missing_parameters.append("azure_blob_container_name")
else:
return await self.build_block_result(
success=False,
failure_reason=f"Unsupported storage type: {self.storage_type}",
output_parameter_value=None,
status=BlockStatus.failed,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
if missing_parameters:
return await self.build_block_result(
@@ -1961,57 +2011,87 @@ class FileUploadBlock(Block):
download_files_path = str(get_path_for_workflow_download_directory(workflow_run_id).absolute())
s3_uris = []
uploaded_uris = []
try:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
actual_aws_access_key_id = (
workflow_run_context.get_original_secret_value_or_none(self.aws_access_key_id) or self.aws_access_key_id
)
actual_aws_secret_access_key = (
workflow_run_context.get_original_secret_value_or_none(self.aws_secret_access_key)
or self.aws_secret_access_key
)
client = AsyncAWSClient(
aws_access_key_id=actual_aws_access_key_id,
aws_secret_access_key=actual_aws_secret_access_key,
region_name=self.region_name,
)
# is the file path a file or a directory?
files_to_upload = []
if os.path.isdir(download_files_path):
# get all files in the directory, if there are more than 25 files, we will not upload them
files = os.listdir(download_files_path)
if len(files) > MAX_UPLOAD_FILE_COUNT:
raise ValueError("Too many files in the directory, not uploading")
max_file_count = (
MAX_UPLOAD_FILE_COUNT
if self.storage_type == FileStorageType.S3
else AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT
)
if len(files) > max_file_count:
raise ValueError(f"Too many files in the directory, not uploading. Max: {max_file_count}")
for file in files:
# if the file is a directory, we will not upload it
if os.path.isdir(os.path.join(download_files_path, file)):
LOG.warning("FileUploadBlock: Skipping directory", file=file)
continue
file_path = os.path.join(download_files_path, file)
s3_uri = self._get_s3_uri(workflow_run_id, file_path)
s3_uris.append(s3_uri)
await client.upload_file_from_path(uri=s3_uri, file_path=file_path, raise_exception=True)
files_to_upload.append(os.path.join(download_files_path, file))
else:
s3_uri = self._get_s3_uri(workflow_run_id, download_files_path)
s3_uris.append(s3_uri)
await client.upload_file_from_path(uri=s3_uri, file_path=download_files_path, raise_exception=True)
files_to_upload.append(download_files_path)
if self.storage_type == FileStorageType.S3:
actual_aws_access_key_id = (
workflow_run_context.get_original_secret_value_or_none(self.aws_access_key_id)
or self.aws_access_key_id
)
actual_aws_secret_access_key = (
workflow_run_context.get_original_secret_value_or_none(self.aws_secret_access_key)
or self.aws_secret_access_key
)
aws_client = AsyncAWSClient(
aws_access_key_id=actual_aws_access_key_id,
aws_secret_access_key=actual_aws_secret_access_key,
region_name=self.region_name,
)
for file_path in files_to_upload:
s3_uri = self._get_s3_uri(workflow_run_id, file_path)
uploaded_uris.append(s3_uri)
await aws_client.upload_file_from_path(uri=s3_uri, file_path=file_path, raise_exception=True)
LOG.info("FileUploadBlock: File(s) uploaded to S3", file_path=self.path)
elif self.storage_type == FileStorageType.AZURE:
actual_azure_storage_account_name = (
workflow_run_context.get_original_secret_value_or_none(self.azure_storage_account_name)
or self.azure_storage_account_name
)
actual_azure_storage_account_key = (
workflow_run_context.get_original_secret_value_or_none(self.azure_storage_account_key)
or self.azure_storage_account_key
)
azure_client = AsyncAzureClient(
account_name=actual_azure_storage_account_name or "",
account_key=actual_azure_storage_account_key or "",
)
for file_path in files_to_upload:
blob_name = Path(file_path).name
azure_uri = self._get_azure_blob_uri(workflow_run_id, file_path)
uploaded_uris.append(azure_uri)
await azure_client.upload_file_from_path(
container_name=self.azure_blob_container_name or "", blob_name=blob_name, file_path=file_path
)
LOG.info("FileUploadBlock: File(s) uploaded to Azure Blob Storage", file_path=self.path)
else:
# This case should ideally be caught by the initial validation
raise ValueError(f"Unsupported storage type: {self.storage_type}")
except Exception as e:
LOG.exception("FileUploadBlock: Failed to upload file to S3", file_path=self.path)
LOG.exception("FileUploadBlock: Failed to upload file", file_path=self.path, storage_type=self.storage_type)
return await self.build_block_result(
success=False,
failure_reason=f"Failed to upload file to S3: {str(e)}",
failure_reason=f"Failed to upload file to {self.storage_type}: {str(e)}",
output_parameter_value=None,
status=BlockStatus.failed,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
LOG.info("FileUploadBlock: File(s) uploaded to S3", file_path=self.path)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, s3_uris)
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, uploaded_uris)
return await self.build_block_result(
success=True,
failure_reason=None,
output_parameter_value=s3_uris,
output_parameter_value=uploaded_uris,
status=BlockStatus.completed,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,

View File

@@ -3,3 +3,4 @@ from enum import StrEnum
class FileStorageType(StrEnum):
    """Storage backends supported by the FileUpload workflow block."""

    # Amazon S3 bucket upload (AWS credentials + s3_bucket required)
    S3 = "s3"
    # Azure Blob Storage upload (storage account name/key + container required)
    AZURE = "azure"

View File

@@ -21,6 +21,7 @@ class ParameterType(StrEnum):
ONEPASSWORD = "onepassword"
OUTPUT = "output"
CREDENTIAL = "credential"
AZURE_SECRET = "azure_secret"
class Parameter(BaseModel, abc.ABC):
@@ -49,6 +50,18 @@ class AWSSecretParameter(Parameter):
deleted_at: datetime | None = None
class AzureSecretParameter(Parameter):
    """Workflow parameter whose value is a secret fetched via the Azure client at run time.

    The workflow run context resolves ``azure_key`` through ``AsyncAzureClient.get_secret``
    and exposes only a random ``secret_<uuid>`` placeholder to the workflow.
    """

    parameter_type: Literal[ParameterType.AZURE_SECRET] = ParameterType.AZURE_SECRET
    # Primary key of this parameter record.
    azure_secret_parameter_id: str
    # Workflow this parameter belongs to.
    workflow_id: str
    # Name of the secret to look up in Azure.
    azure_key: str
    created_at: datetime
    modified_at: datetime
    # Soft-delete marker; None while the parameter is active.
    deleted_at: datetime | None = None
class BitwardenLoginCredentialParameter(Parameter):
parameter_type: Literal[ParameterType.BITWARDEN_LOGIN_CREDENTIAL] = ParameterType.BITWARDEN_LOGIN_CREDENTIAL
# parameter fields
@@ -214,6 +227,7 @@ ParameterSubclasses = Union[
WorkflowParameter,
ContextParameter,
AWSSecretParameter,
AzureSecretParameter,
BitwardenLoginCredentialParameter,
BitwardenSensitiveInformationParameter,
BitwardenCreditCardDataParameter,

View File

@@ -218,6 +218,9 @@ class FileUploadBlockYAML(BlockYAML):
aws_access_key_id: str | None = None
aws_secret_access_key: str | None = None
region_name: str | None = None
azure_storage_account_name: str | None = None
azure_storage_account_key: str | None = None
azure_blob_container_name: str | None = None
path: str | None = None

View File

@@ -1902,6 +1902,9 @@ class WorkflowService:
aws_access_key_id=block_yaml.aws_access_key_id,
aws_secret_access_key=block_yaml.aws_secret_access_key,
region_name=block_yaml.region_name,
azure_storage_account_name=block_yaml.azure_storage_account_name,
azure_storage_account_key=block_yaml.azure_storage_account_key,
azure_blob_container_name=block_yaml.azure_blob_container_name,
path=block_yaml.path,
continue_on_failure=block_yaml.continue_on_failure,
)