diff --git a/alembic/versions/2024_07_09_1823-bea545cb21b4_add_workflow_permanent_id_and_.py b/alembic/versions/2024_07_09_1823-bea545cb21b4_add_workflow_permanent_id_and_.py new file mode 100644 index 00000000..1c132313 --- /dev/null +++ b/alembic/versions/2024_07_09_1823-bea545cb21b4_add_workflow_permanent_id_and_.py @@ -0,0 +1,61 @@ +"""Add workflow_permanent_id and organization_id to workflow_runs table + +Revision ID: bea545cb21b4 +Revises: 485667adef01 +Create Date: 2024-07-09 18:23:03.641136+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "bea545cb21b4" +down_revision: Union[str, None] = "485667adef01" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("workflow_runs", sa.Column("workflow_permanent_id", sa.String(), nullable=True)) + op.add_column("workflow_runs", sa.Column("organization_id", sa.String(), nullable=True)) + + # Backfill the new columns with data from the workflows table + connection = op.get_bind() + connection.execute( + sa.text(""" + UPDATE workflow_runs wr + SET workflow_permanent_id = ( + SELECT workflow_permanent_id + FROM workflows w + WHERE w.workflow_id = wr.workflow_id + ), + organization_id = ( + SELECT organization_id + FROM workflows w + WHERE w.workflow_id = wr.workflow_id + ) + """) + ) + + # Now set the columns to be non-nullable + op.alter_column("workflow_runs", "workflow_permanent_id", nullable=False) + op.alter_column("workflow_runs", "organization_id", nullable=False) + + # Create foreign keys and indices after backfilling + op.create_foreign_key( + "fk_workflow_runs_organization_id", "workflow_runs", "organizations", ["organization_id"], ["organization_id"] + ) + op.create_index("ix_workflow_runs_organization_id", "workflow_runs", ["organization_id"], unique=False) + op.create_index("ix_workflow_runs_workflow_permanent_id", "workflow_runs", ["workflow_permanent_id"], unique=False) + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("fk_workflow_runs_organization_id", "workflow_runs", type_="foreignkey") + op.drop_column("workflow_runs", "organization_id") + op.drop_column("workflow_runs", "workflow_permanent_id") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 6f9d2a64..4f82a060 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -971,14 +971,18 @@ class AgentDB: async def create_workflow_run( self, + workflow_permanent_id: str, workflow_id: str, + organization_id: str, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None, ) -> WorkflowRun: try: async with self.Session() as session: workflow_run = WorkflowRunModel( + workflow_permanent_id=workflow_permanent_id, workflow_id=workflow_id, + organization_id=organization_id, proxy_location=proxy_location, status="created", webhook_callback_url=webhook_callback_url, @@ -1026,8 +1030,7 @@ class AgentDB: workflow_runs = ( await session.scalars( select(WorkflowRunModel) - .join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id) - .filter(WorkflowModel.organization_id == organization_id) + .filter(WorkflowRunModel.organization_id == organization_id) .order_by(WorkflowRunModel.created_at.desc()) .limit(page_size) .offset(db_page * page_size) @@ -1047,9 +1050,8 @@ class AgentDB: workflow_runs = ( await session.scalars( select(WorkflowRunModel) - .join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id) - .filter(WorkflowModel.workflow_permanent_id == workflow_permanent_id) - .filter(WorkflowModel.organization_id == organization_id) + .filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id) + .filter(WorkflowRunModel.organization_id == organization_id) .order_by(WorkflowRunModel.created_at.desc()) .limit(page_size) .offset(db_page * page_size) diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 218af1a3..da593c04 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -197,6 +197,8 @@ class WorkflowRunModel(Base): workflow_run_id = Column(String, primary_key=True, index=True, default=generate_workflow_run_id) workflow_id = Column(String, ForeignKey("workflows.workflow_id"), nullable=False) + workflow_permanent_id = Column(String, nullable=False, index=True) + organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True) status = Column(String, nullable=False) proxy_location = Column(Enum(ProxyLocation)) webhook_callback_url = Column(String) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 6bd00b74..1f2167f8 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -178,7 +178,9 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled: return WorkflowRun( workflow_run_id=workflow_run_model.workflow_run_id, + workflow_permanent_id=workflow_run_model.workflow_permanent_id, workflow_id=workflow_run_model.workflow_id, + organization_id=workflow_run_model.organization_id, status=WorkflowRunStatus[workflow_run_model.status], proxy_location=( ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None diff --git a/skyvern/forge/sdk/experimentation/providers.py b/skyvern/forge/sdk/experimentation/providers.py index bc2118f2..0e666fe3 100644 --- a/skyvern/forge/sdk/experimentation/providers.py +++ b/skyvern/forge/sdk/experimentation/providers.py @@ -37,7 +37,7 @@ class BaseExperimentationProvider(ABC): variant = self.get_value(feature_name, distinct_id, properties) self.variant_map[feature_name][distinct_id] = variant if variant: - LOG.info("Feature is found", flag=feature_name, distinct_id=distinct_id) + LOG.info("Feature is found", flag=feature_name, distinct_id=distinct_id, variant=variant) return self.variant_map[feature_name][distinct_id] diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index c0371383..cf12cad9 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -66,6 +66,8 @@ class WorkflowRunStatus(StrEnum): class WorkflowRun(BaseModel): workflow_run_id: str workflow_id: str + workflow_permanent_id: str + organization_id: str status: WorkflowRunStatus proxy_location: ProxyLocation | None = None webhook_callback_url: str | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 1f267858..90a91590 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -93,7 +93,12 @@ class WorkflowService: if workflow_request.webhook_callback_url is None and workflow.webhook_callback_url is not None: workflow_request.webhook_callback_url = workflow.webhook_callback_url # Create the workflow run and set skyvern context - workflow_run = await self.create_workflow_run(workflow_request=workflow_request, workflow_id=workflow_id) + workflow_run = await self.create_workflow_run( + workflow_request=workflow_request, + workflow_permanent_id=workflow_permanent_id, + workflow_id=workflow_id, + organization_id=workflow.organization_id, + ) LOG.info( f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}", request_id=request_id, @@ -378,9 +383,13 @@ class WorkflowService: page_size=page_size, ) - async def create_workflow_run(self, workflow_request: WorkflowRequestBody, workflow_id: str) -> WorkflowRun: + async def create_workflow_run( + self, workflow_request: WorkflowRequestBody, workflow_permanent_id: str, workflow_id: str, organization_id: str + ) -> WorkflowRun: return await app.DATABASE.create_workflow_run( + workflow_permanent_id=workflow_permanent_id, workflow_id=workflow_id, + organization_id=organization_id, proxy_location=workflow_request.proxy_location, webhook_callback_url=workflow_request.webhook_callback_url, )