Add workflow_permanent_id and organization_id to WorkflowRun (#570)
This commit is contained in:
@@ -0,0 +1,61 @@
|
|||||||
|
"""Add workflow_permanent_id and organization_id to workflow_runs table
|
||||||
|
|
||||||
|
Revision ID: bea545cb21b4
|
||||||
|
Revises: 485667adef01
|
||||||
|
Create Date: 2024-07-09 18:23:03.641136+00:00
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "bea545cb21b4"
|
||||||
|
down_revision: Union[str, None] = "485667adef01"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
op.add_column("workflow_runs", sa.Column("workflow_permanent_id", sa.String(), nullable=True))
|
||||||
|
op.add_column("workflow_runs", sa.Column("organization_id", sa.String(), nullable=True))
|
||||||
|
|
||||||
|
# Backfill the new columns with data from the workflows table
|
||||||
|
connection = op.get_bind()
|
||||||
|
connection.execute(
|
||||||
|
sa.text("""
|
||||||
|
UPDATE workflow_runs wr
|
||||||
|
SET workflow_permanent_id = (
|
||||||
|
SELECT workflow_permanent_id
|
||||||
|
FROM workflows w
|
||||||
|
WHERE w.workflow_id = wr.workflow_id
|
||||||
|
),
|
||||||
|
organization_id = (
|
||||||
|
SELECT organization_id
|
||||||
|
FROM workflows w
|
||||||
|
WHERE w.workflow_id = wr.workflow_id
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now set the columns to be non-nullable
|
||||||
|
op.alter_column("workflow_runs", "workflow_permanent_id", nullable=False)
|
||||||
|
op.alter_column("workflow_runs", "organization_id", nullable=False)
|
||||||
|
|
||||||
|
# Create foreign keys and indices after backfilling
|
||||||
|
op.create_foreign_key(
|
||||||
|
"fk_workflow_runs_organization_id", "workflow_runs", "organizations", ["organization_id"], ["organization_id"]
|
||||||
|
)
|
||||||
|
op.create_index("ix_workflow_runs_organization_id", "workflow_runs", ["organization_id"], unique=False)
|
||||||
|
op.create_index("ix_workflow_runs_workflow_permanent_id", "workflow_runs", ["workflow_permanent_id"], unique=False)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_constraint("fk_workflow_runs_organization_id", "workflow_runs", type_="foreignkey")
|
||||||
|
op.drop_column("workflow_runs", "organization_id")
|
||||||
|
op.drop_column("workflow_runs", "workflow_permanent_id")
|
||||||
|
# ### end Alembic commands ###
|
||||||
@@ -971,14 +971,18 @@ class AgentDB:
|
|||||||
|
|
||||||
async def create_workflow_run(
|
async def create_workflow_run(
|
||||||
self,
|
self,
|
||||||
|
workflow_permanent_id: str,
|
||||||
workflow_id: str,
|
workflow_id: str,
|
||||||
|
organization_id: str,
|
||||||
proxy_location: ProxyLocation | None = None,
|
proxy_location: ProxyLocation | None = None,
|
||||||
webhook_callback_url: str | None = None,
|
webhook_callback_url: str | None = None,
|
||||||
) -> WorkflowRun:
|
) -> WorkflowRun:
|
||||||
try:
|
try:
|
||||||
async with self.Session() as session:
|
async with self.Session() as session:
|
||||||
workflow_run = WorkflowRunModel(
|
workflow_run = WorkflowRunModel(
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
workflow_id=workflow_id,
|
workflow_id=workflow_id,
|
||||||
|
organization_id=organization_id,
|
||||||
proxy_location=proxy_location,
|
proxy_location=proxy_location,
|
||||||
status="created",
|
status="created",
|
||||||
webhook_callback_url=webhook_callback_url,
|
webhook_callback_url=webhook_callback_url,
|
||||||
@@ -1026,8 +1030,7 @@ class AgentDB:
|
|||||||
workflow_runs = (
|
workflow_runs = (
|
||||||
await session.scalars(
|
await session.scalars(
|
||||||
select(WorkflowRunModel)
|
select(WorkflowRunModel)
|
||||||
.join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id)
|
.filter(WorkflowRunModel.organization_id == organization_id)
|
||||||
.filter(WorkflowModel.organization_id == organization_id)
|
|
||||||
.order_by(WorkflowRunModel.created_at.desc())
|
.order_by(WorkflowRunModel.created_at.desc())
|
||||||
.limit(page_size)
|
.limit(page_size)
|
||||||
.offset(db_page * page_size)
|
.offset(db_page * page_size)
|
||||||
@@ -1047,9 +1050,8 @@ class AgentDB:
|
|||||||
workflow_runs = (
|
workflow_runs = (
|
||||||
await session.scalars(
|
await session.scalars(
|
||||||
select(WorkflowRunModel)
|
select(WorkflowRunModel)
|
||||||
.join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id)
|
.filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id)
|
||||||
.filter(WorkflowModel.workflow_permanent_id == workflow_permanent_id)
|
.filter(WorkflowRunModel.organization_id == organization_id)
|
||||||
.filter(WorkflowModel.organization_id == organization_id)
|
|
||||||
.order_by(WorkflowRunModel.created_at.desc())
|
.order_by(WorkflowRunModel.created_at.desc())
|
||||||
.limit(page_size)
|
.limit(page_size)
|
||||||
.offset(db_page * page_size)
|
.offset(db_page * page_size)
|
||||||
|
|||||||
@@ -197,6 +197,8 @@ class WorkflowRunModel(Base):
|
|||||||
|
|
||||||
workflow_run_id = Column(String, primary_key=True, index=True, default=generate_workflow_run_id)
|
workflow_run_id = Column(String, primary_key=True, index=True, default=generate_workflow_run_id)
|
||||||
workflow_id = Column(String, ForeignKey("workflows.workflow_id"), nullable=False)
|
workflow_id = Column(String, ForeignKey("workflows.workflow_id"), nullable=False)
|
||||||
|
workflow_permanent_id = Column(String, nullable=False, index=True)
|
||||||
|
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True)
|
||||||
status = Column(String, nullable=False)
|
status = Column(String, nullable=False)
|
||||||
proxy_location = Column(Enum(ProxyLocation))
|
proxy_location = Column(Enum(ProxyLocation))
|
||||||
webhook_callback_url = Column(String)
|
webhook_callback_url = Column(String)
|
||||||
|
|||||||
@@ -178,7 +178,9 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled:
|
|||||||
|
|
||||||
return WorkflowRun(
|
return WorkflowRun(
|
||||||
workflow_run_id=workflow_run_model.workflow_run_id,
|
workflow_run_id=workflow_run_model.workflow_run_id,
|
||||||
|
workflow_permanent_id=workflow_run_model.workflow_permanent_id,
|
||||||
workflow_id=workflow_run_model.workflow_id,
|
workflow_id=workflow_run_model.workflow_id,
|
||||||
|
organization_id=workflow_run_model.organization_id,
|
||||||
status=WorkflowRunStatus[workflow_run_model.status],
|
status=WorkflowRunStatus[workflow_run_model.status],
|
||||||
proxy_location=(
|
proxy_location=(
|
||||||
ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None
|
ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ class BaseExperimentationProvider(ABC):
|
|||||||
variant = self.get_value(feature_name, distinct_id, properties)
|
variant = self.get_value(feature_name, distinct_id, properties)
|
||||||
self.variant_map[feature_name][distinct_id] = variant
|
self.variant_map[feature_name][distinct_id] = variant
|
||||||
if variant:
|
if variant:
|
||||||
LOG.info("Feature is found", flag=feature_name, distinct_id=distinct_id)
|
LOG.info("Feature is found", flag=feature_name, distinct_id=distinct_id, variant=variant)
|
||||||
return self.variant_map[feature_name][distinct_id]
|
return self.variant_map[feature_name][distinct_id]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -66,6 +66,8 @@ class WorkflowRunStatus(StrEnum):
|
|||||||
class WorkflowRun(BaseModel):
|
class WorkflowRun(BaseModel):
|
||||||
workflow_run_id: str
|
workflow_run_id: str
|
||||||
workflow_id: str
|
workflow_id: str
|
||||||
|
workflow_permanent_id: str
|
||||||
|
organization_id: str
|
||||||
status: WorkflowRunStatus
|
status: WorkflowRunStatus
|
||||||
proxy_location: ProxyLocation | None = None
|
proxy_location: ProxyLocation | None = None
|
||||||
webhook_callback_url: str | None = None
|
webhook_callback_url: str | None = None
|
||||||
|
|||||||
@@ -93,7 +93,12 @@ class WorkflowService:
|
|||||||
if workflow_request.webhook_callback_url is None and workflow.webhook_callback_url is not None:
|
if workflow_request.webhook_callback_url is None and workflow.webhook_callback_url is not None:
|
||||||
workflow_request.webhook_callback_url = workflow.webhook_callback_url
|
workflow_request.webhook_callback_url = workflow.webhook_callback_url
|
||||||
# Create the workflow run and set skyvern context
|
# Create the workflow run and set skyvern context
|
||||||
workflow_run = await self.create_workflow_run(workflow_request=workflow_request, workflow_id=workflow_id)
|
workflow_run = await self.create_workflow_run(
|
||||||
|
workflow_request=workflow_request,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
workflow_id=workflow_id,
|
||||||
|
organization_id=workflow.organization_id,
|
||||||
|
)
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}",
|
f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}",
|
||||||
request_id=request_id,
|
request_id=request_id,
|
||||||
@@ -378,9 +383,13 @@ class WorkflowService:
|
|||||||
page_size=page_size,
|
page_size=page_size,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def create_workflow_run(self, workflow_request: WorkflowRequestBody, workflow_id: str) -> WorkflowRun:
|
async def create_workflow_run(
|
||||||
|
self, workflow_request: WorkflowRequestBody, workflow_permanent_id: str, workflow_id: str, organization_id: str
|
||||||
|
) -> WorkflowRun:
|
||||||
return await app.DATABASE.create_workflow_run(
|
return await app.DATABASE.create_workflow_run(
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
workflow_id=workflow_id,
|
workflow_id=workflow_id,
|
||||||
|
organization_id=organization_id,
|
||||||
proxy_location=workflow_request.proxy_location,
|
proxy_location=workflow_request.proxy_location,
|
||||||
webhook_callback_url=workflow_request.webhook_callback_url,
|
webhook_callback_url=workflow_request.webhook_callback_url,
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user