Replace file_suffix with file_name (but keep vairable name for backwards compatibility), Add path to Azure Blob, remove defaulted {workflow_run_id} (#3557)
This commit is contained in:
@@ -58,6 +58,10 @@ class WorkflowRunContext:
|
||||
cls,
|
||||
aws_client: AsyncAWSClient,
|
||||
organization: Organization,
|
||||
workflow_run_id: str,
|
||||
workflow_title: str,
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]],
|
||||
workflow_output_parameters: list[OutputParameter],
|
||||
context_parameters: list[ContextParameter],
|
||||
@@ -71,7 +75,14 @@ class WorkflowRunContext:
|
||||
block_outputs: dict[str, Any] | None = None,
|
||||
) -> Self:
|
||||
# key is label name
|
||||
workflow_run_context = cls(aws_client=aws_client)
|
||||
workflow_run_context = cls(
|
||||
workflow_title=workflow_title,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
aws_client=aws_client,
|
||||
)
|
||||
|
||||
for parameter, run_parameter in workflow_parameter_tuples:
|
||||
if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID:
|
||||
await workflow_run_context.register_secret_workflow_parameter_value(
|
||||
@@ -133,7 +144,18 @@ class WorkflowRunContext:
|
||||
|
||||
return workflow_run_context
|
||||
|
||||
def __init__(self, aws_client: AsyncAWSClient) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
workflow_title: str,
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_run_id: str,
|
||||
aws_client: AsyncAWSClient,
|
||||
) -> None:
|
||||
self.workflow_title = workflow_title
|
||||
self.workflow_id = workflow_id
|
||||
self.workflow_permanent_id = workflow_permanent_id
|
||||
self.workflow_run_id = workflow_run_id
|
||||
self.blocks_metadata: dict[str, BlockMetadata] = {}
|
||||
self.parameters: dict[str, PARAMETER_TYPE] = {}
|
||||
self.values: dict[str, Any] = {}
|
||||
@@ -953,6 +975,9 @@ class WorkflowContextManager:
|
||||
self,
|
||||
organization: Organization,
|
||||
workflow_run_id: str,
|
||||
workflow_title: str,
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]],
|
||||
workflow_output_parameters: list[OutputParameter],
|
||||
context_parameters: list[ContextParameter],
|
||||
@@ -967,6 +992,10 @@ class WorkflowContextManager:
|
||||
workflow_run_context = await WorkflowRunContext.init(
|
||||
self.aws_client,
|
||||
organization,
|
||||
workflow_run_id,
|
||||
workflow_title,
|
||||
workflow_id,
|
||||
workflow_permanent_id,
|
||||
workflow_parameter_tuples,
|
||||
workflow_output_parameters,
|
||||
context_parameters,
|
||||
|
||||
@@ -196,6 +196,7 @@ class Block(BaseModel, abc.ABC):
|
||||
|
||||
template_data[self.label] = block_reference_data
|
||||
|
||||
# TODO (suchintan): This is pretty hacky - we should have a standard way to initialize the workflow run context
|
||||
# inject the forloop metadata as global variables
|
||||
if "current_index" in block_reference_data:
|
||||
template_data["current_index"] = block_reference_data["current_index"]
|
||||
@@ -204,6 +205,16 @@ class Block(BaseModel, abc.ABC):
|
||||
if "current_value" in block_reference_data:
|
||||
template_data["current_value"] = block_reference_data["current_value"]
|
||||
|
||||
# Initialize workflow-level parameters
|
||||
if "workflow_title" not in template_data:
|
||||
template_data["workflow_title"] = workflow_run_context.workflow_title
|
||||
if "workflow_id" not in template_data:
|
||||
template_data["workflow_id"] = workflow_run_context.workflow_id
|
||||
if "workflow_permanent_id" not in template_data:
|
||||
template_data["workflow_permanent_id"] = workflow_run_context.workflow_permanent_id
|
||||
if "workflow_run_id" not in template_data:
|
||||
template_data["workflow_run_id"] = workflow_run_context.workflow_run_id
|
||||
|
||||
return template.render(template_data)
|
||||
|
||||
@classmethod
|
||||
@@ -1933,6 +1944,7 @@ class FileUploadBlock(Block):
|
||||
def format_potential_template_parameters(self, workflow_run_context: WorkflowRunContext) -> None:
|
||||
if self.path:
|
||||
self.path = self.format_block_parameter_template_from_workflow_run_context(self.path, workflow_run_context)
|
||||
|
||||
if self.s3_bucket:
|
||||
self.s3_bucket = self.format_block_parameter_template_from_workflow_run_context(
|
||||
self.s3_bucket, workflow_run_context
|
||||
@@ -1959,14 +1971,14 @@ class FileUploadBlock(Block):
|
||||
)
|
||||
|
||||
def _get_s3_uri(self, workflow_run_id: str, path: str) -> str:
|
||||
s3_suffix = f"{workflow_run_id}/{uuid.uuid4()}_{Path(path).name}"
|
||||
if not self.path:
|
||||
return f"s3://{self.s3_bucket}/{s3_suffix}"
|
||||
return f"s3://{self.s3_bucket}/{self.path}/{s3_suffix}"
|
||||
folder_path = self.path or f"{workflow_run_id}"
|
||||
s3_suffix = f"{uuid.uuid4()}_{Path(path).name}"
|
||||
return f"s3://{self.s3_bucket}/{folder_path}/{s3_suffix}"
|
||||
|
||||
def _get_azure_blob_uri(self, workflow_run_id: str, file_path: str) -> str:
|
||||
blob_name = Path(file_path).name
|
||||
return f"https://{self.azure_storage_account_name}.blob.core.windows.net/{self.azure_blob_container_name}/{workflow_run_id}/{uuid.uuid4()}_{blob_name}"
|
||||
folder_path = self.path or workflow_run_id
|
||||
return f"https://{self.azure_storage_account_name}.blob.core.windows.net/{self.azure_blob_container_name}/{folder_path}/{uuid.uuid4()}_{blob_name}"
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
|
||||
@@ -8,7 +8,15 @@ from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from skyvern.exceptions import InvalidWorkflowParameter
|
||||
|
||||
RESERVED_PARAMETER_KEYS = ["current_item", "current_value", "current_index"]
|
||||
RESERVED_PARAMETER_KEYS = [
|
||||
"current_item",
|
||||
"current_value",
|
||||
"current_index",
|
||||
"workflow_title",
|
||||
"workflow_id",
|
||||
"workflow_permanent_id",
|
||||
"workflow_run_id",
|
||||
]
|
||||
|
||||
|
||||
class ParameterType(StrEnum):
|
||||
|
||||
@@ -320,6 +320,9 @@ class WorkflowService:
|
||||
await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context(
|
||||
organization,
|
||||
workflow_run_id,
|
||||
workflow.title,
|
||||
workflow.workflow_id,
|
||||
workflow.workflow_permanent_id,
|
||||
wp_wps_tuples,
|
||||
workflow_output_parameters,
|
||||
context_parameters,
|
||||
|
||||
Reference in New Issue
Block a user