diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 3489573e..6aba935e 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -283,7 +283,7 @@ class WorkflowService: skyvern_context.current() # Set workflow run status to running, create workflow run parameters - workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) + workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id) # Get all context parameters from the workflow definition context_parameters = [ @@ -309,7 +309,7 @@ class WorkflowService: ] # Get all tuples - wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run.workflow_run_id) + wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run_id) workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id) try: await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( @@ -323,8 +323,8 @@ class WorkflowService: ) except Exception as e: LOG.exception( - f"Error while initializing workflow run context for workflow run {workflow_run.workflow_run_id}", - workflow_run_id=workflow_run.workflow_run_id, + f"Error while initializing workflow run context for workflow run {workflow_run_id}", + workflow_run_id=workflow_run_id, ) exception_message = f"Unexpected error: {str(e)}" @@ -333,7 +333,7 @@ class WorkflowService: failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}" workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + workflow_run_id=workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( workflow=workflow, @@ -354,264 +354,27 @@ class WorkflowService: organization_id=organization_id, workflow_script_id=workflow_script.script_id, ) - return await self._execute_workflow_script( + workflow_run = await self._execute_workflow_script( script_id=workflow_script.script_id, workflow=workflow, workflow_run=workflow_run, api_key=api_key, organization=organization, ) - - top_level_blocks = workflow.workflow_definition.blocks - all_blocks = get_all_blocks(top_level_blocks) - - if block_labels and len(block_labels): - blocks: list[BlockTypeVar] = [] - all_labels = {block.label: block for block in all_blocks} - for label in block_labels: - if label not in all_labels: - raise BlockNotFound(block_label=label) - - blocks.append(all_labels[label]) - - LOG.info( - "Executing workflow blocks via whitelist", - workflow_run_id=workflow_run.workflow_run_id, - block_cnt=len(blocks), + else: + workflow_run = await self._execute_workflow_blocks( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + organization=organization, + close_browser_on_completion=close_browser_on_completion, + browser_session_id=browser_session_id, block_labels=block_labels, block_outputs=block_outputs, ) - else: - blocks = top_level_blocks - - if not blocks: - raise SkyvernException(f"No blocks found for the given block labels: {block_labels}") - - # Execute workflow blocks - blocks_cnt = len(blocks) - block_result = None - for block_idx, block in enumerate(blocks): - try: - if refreshed_workflow_run := await app.DATABASE.get_workflow_run( - workflow_run_id=workflow_run.workflow_run_id, - organization_id=organization_id, - ): - workflow_run = refreshed_workflow_run - if workflow_run.status == WorkflowRunStatus.canceled: - LOG.info( - "Workflow run is canceled, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - need_call_webhook=True, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - - if workflow_run.status == WorkflowRunStatus.timed_out: - LOG.info( - "Workflow run is timed out, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - need_call_webhook=True, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - - parameters = block.get_all_parameters(workflow_run_id) - await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( - workflow_run_id, parameters, organization - ) - LOG.info( - f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type_var=block.block_type, - block_label=block.label, - model=block.model, - ) - block_result = await block.execute_safe( - workflow_run_id=workflow_run_id, - organization_id=organization_id, - browser_session_id=browser_session_id, - ) - if block_result.status == BlockStatus.canceled: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - workflow_run = await self.mark_workflow_run_as_canceled( - workflow_run_id=workflow_run.workflow_run_id - ) - # We're not sending a webhook here because the workflow run is manually marked as canceled. - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - need_call_webhook=False, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - elif block_result.status == BlockStatus.failed: - LOG.error( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - if not block.continue_on_failure: - failure_reason = ( - f"{block.block_type} block failed. failure reason: {block_result.failure_reason}" - ) - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - - LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, - block_label=block.label, - ) - - elif block_result.status == BlockStatus.terminated: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - - if not block.continue_on_failure: - failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}" - workflow_run = await self.mark_workflow_run_as_terminated( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - - LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, - block_label=block.label, - ) - - elif block_result.status == BlockStatus.timed_out: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - - if not block.continue_on_failure: - failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}" - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - close_browser_on_completion=close_browser_on_completion, - browser_session_id=browser_session_id, - ) - return workflow_run - - LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, - block_label=block.label, - ) - - except Exception as e: - LOG.exception( - f"Error while executing workflow run {workflow_run.workflow_run_id}", - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - - exception_message = f"Unexpected error: {str(e)}" - if isinstance(e, SkyvernException): - exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" - - failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}" - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - browser_session_id=browser_session_id, - close_browser_on_completion=close_browser_on_completion, - ) - return workflow_run - if refreshed_workflow_run := await app.DATABASE.get_workflow_run( - workflow_run_id=workflow_run.workflow_run_id, + workflow_run_id=workflow_run_id, organization_id=organization_id, ): workflow_run = refreshed_workflow_run @@ -621,7 +384,7 @@ class WorkflowService: WorkflowRunStatus.terminated, WorkflowRunStatus.timed_out, ): - workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) + workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) await self.generate_script_if_needed( workflow=workflow, workflow_run=workflow_run, @@ -630,7 +393,7 @@ class WorkflowService: else: LOG.info( "Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed", - workflow_run_id=workflow_run.workflow_run_id, + workflow_run_id=workflow_run_id, workflow_run_status=workflow_run.status if workflow_run else None, ) await self.clean_up_workflow( @@ -648,12 +411,220 @@ class WorkflowService: workflow_run_id=workflow_run_id, workflow_id=workflow_run.workflow_id, duration_seconds=duration_seconds, - workflow_run_status=WorkflowRunStatus.completed, + workflow_run_status=workflow_run.status, organization_id=organization_id, + is_script_run=workflow_script is not None, ) return workflow_run + async def _execute_workflow_blocks( + self, + workflow: Workflow, + workflow_run: WorkflowRun, + api_key: str, + organization: Organization, + close_browser_on_completion: bool, + browser_session_id: str | None = None, + block_labels: list[str] | None = None, + block_outputs: dict[str, Any] | None = None, + ) -> WorkflowRun: + organization_id = organization.organization_id + workflow_run_id = workflow_run.workflow_run_id + top_level_blocks = workflow.workflow_definition.blocks + all_blocks = get_all_blocks(top_level_blocks) + + if block_labels and len(block_labels): + blocks: list[BlockTypeVar] = [] + all_labels = {block.label: block for block in all_blocks} + for label in block_labels: + if label not in all_labels: + raise BlockNotFound(block_label=label) + + blocks.append(all_labels[label]) + + LOG.info( + "Executing workflow blocks via whitelist", + workflow_run_id=workflow_run_id, + block_cnt=len(blocks), + block_labels=block_labels, + block_outputs=block_outputs, + ) + + else: + blocks = top_level_blocks + + if not blocks: + raise SkyvernException(f"No blocks found for the given block labels: {block_labels}") + + # Execute workflow blocks + blocks_cnt = len(blocks) + block_result = None + for block_idx, block in enumerate(blocks): + try: + if refreshed_workflow_run := await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ): + workflow_run = refreshed_workflow_run + if workflow_run.status == WorkflowRunStatus.canceled: + LOG.info( + "Workflow run is canceled, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + break + + if workflow_run.status == WorkflowRunStatus.timed_out: + LOG.info( + "Workflow run is timed out, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + break + + parameters = block.get_all_parameters(workflow_run_id) + await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( + workflow_run_id, parameters, organization + ) + LOG.info( + f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type_var=block.block_type, + block_label=block.label, + model=block.model, + ) + block_result = await block.execute_safe( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + browser_session_id=browser_session_id, + ) + if block_result.status == BlockStatus.canceled: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id) + break + elif block_result.status == BlockStatus.failed: + LOG.error( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + if not block.continue_on_failure: + failure_reason = ( + f"{block.block_type} block failed. failure reason: {block_result.failure_reason}" + ) + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + break + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + + elif block_result.status == BlockStatus.terminated: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + + if not block.continue_on_failure: + failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}" + workflow_run = await self.mark_workflow_run_as_terminated( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + break + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + + elif block_result.status == BlockStatus.timed_out: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + + if not block.continue_on_failure: + failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + break + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + + except Exception as e: + LOG.exception( + f"Error while executing workflow run {workflow_run_id}", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + + exception_message = f"Unexpected error: {str(e)}" + if isinstance(e, SkyvernException): + exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" + + failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + break + return workflow_run + async def create_workflow( self, organization_id: str,