fix script creation post workflow run (#3160)
This commit is contained in:
@@ -0,0 +1,31 @@
|
|||||||
|
"""script_blocks.script_file_id from required to optional
|
||||||
|
|
||||||
|
Revision ID: 20e960cf015d
|
||||||
|
Revises: 944ef972e5a8
|
||||||
|
Create Date: 2025-08-11 05:53:41.480218+00:00
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "20e960cf015d"
|
||||||
|
down_revision: Union[str, None] = "944ef972e5a8"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.alter_column("script_blocks", "script_file_id", existing_type=sa.VARCHAR(), nullable=True)
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.alter_column("script_blocks", "script_file_id", existing_type=sa.VARCHAR(), nullable=False)
|
||||||
|
# ### end Alembic commands ###
|
||||||
@@ -367,6 +367,8 @@ async def generate_workflow_script(
|
|||||||
actions_by_task: dict[str, list[dict[str, Any]]],
|
actions_by_task: dict[str, list[dict[str, Any]]],
|
||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
run_id: str | None = None,
|
run_id: str | None = None,
|
||||||
|
script_id: str | None = None,
|
||||||
|
script_revision_id: str | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Build a LibCST Module and emit .code (PEP-8-formatted source).
|
Build a LibCST Module and emit .code (PEP-8-formatted source).
|
||||||
@@ -406,20 +408,6 @@ async def generate_workflow_script(
|
|||||||
length_of_tasks = len(tasks)
|
length_of_tasks = len(tasks)
|
||||||
|
|
||||||
# Create script first if organization_id is provided
|
# Create script first if organization_id is provided
|
||||||
script_id = None
|
|
||||||
script_revision_id = None
|
|
||||||
if organization_id:
|
|
||||||
try:
|
|
||||||
script = await app.DATABASE.create_script(
|
|
||||||
organization_id=organization_id,
|
|
||||||
run_id=run_id,
|
|
||||||
)
|
|
||||||
script_id = script.script_id
|
|
||||||
script_revision_id = script.script_revision_id
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error("Failed to create script", error=str(e), exc_info=True)
|
|
||||||
# Continue without script creation if it fails
|
|
||||||
|
|
||||||
for idx, task in enumerate(tasks):
|
for idx, task in enumerate(tasks):
|
||||||
block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), []))
|
block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), []))
|
||||||
|
|
||||||
@@ -473,39 +461,6 @@ async def generate_workflow_script(
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create main script file if we have script context
|
|
||||||
if script_id and script_revision_id and organization_id:
|
|
||||||
try:
|
|
||||||
main_script_code = module.code
|
|
||||||
main_file_name = "main.py"
|
|
||||||
main_file_path = main_file_name
|
|
||||||
|
|
||||||
# Create artifact and upload to S3
|
|
||||||
artifact_id = await app.ARTIFACT_MANAGER.create_script_file_artifact(
|
|
||||||
organization_id=organization_id,
|
|
||||||
script_id=script_id,
|
|
||||||
script_version=1, # Assuming version 1 for now
|
|
||||||
file_path=main_file_path,
|
|
||||||
data=main_script_code.encode("utf-8"),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create script file record for main file
|
|
||||||
await app.DATABASE.create_script_file(
|
|
||||||
script_revision_id=script_revision_id,
|
|
||||||
script_id=script_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
file_path=main_file_path,
|
|
||||||
file_name=main_file_name,
|
|
||||||
file_type="file",
|
|
||||||
content_hash=f"sha256:{hashlib.sha256(main_script_code.encode('utf-8')).hexdigest()}",
|
|
||||||
file_size=len(main_script_code.encode("utf-8")),
|
|
||||||
mime_type="text/x-python",
|
|
||||||
artifact_id=artifact_id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error("Failed to create main script file", error=str(e), exc_info=True)
|
|
||||||
# Continue without main script file creation if it fails
|
|
||||||
|
|
||||||
with open(file_name, "w") as f:
|
with open(file_name, "w") as f:
|
||||||
f.write(module.code)
|
f.write(module.code)
|
||||||
return module.code
|
return module.code
|
||||||
@@ -532,11 +487,9 @@ async def create_script_block(
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Step 1: Transform the block function definition to a string
|
# Step 1: Transform the block function definition to a string
|
||||||
block_code = block_fn_def.code
|
# Create a temporary module to convert FunctionDef to source code
|
||||||
|
temp_module = cst.Module(body=[block_fn_def])
|
||||||
# Step 2: Use the function name as block name if not provided
|
block_code = temp_module.code
|
||||||
if not block_name:
|
|
||||||
block_name = block_fn_def.name.value
|
|
||||||
|
|
||||||
# Step 3: Create script block in database
|
# Step 3: Create script block in database
|
||||||
script_block = await app.DATABASE.create_script_block(
|
script_block = await app.DATABASE.create_script_block(
|
||||||
@@ -578,7 +531,7 @@ async def create_script_block(
|
|||||||
await app.DATABASE.update_script_block(
|
await app.DATABASE.update_script_block(
|
||||||
script_block_id=script_block.script_block_id,
|
script_block_id=script_block.script_block_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
script_file_id=script_file.script_file_id,
|
script_file_id=script_file.file_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -879,7 +879,7 @@ class ScriptBlockModel(Base):
|
|||||||
script_id = Column(String, nullable=False)
|
script_id = Column(String, nullable=False)
|
||||||
script_revision_id = Column(String, nullable=False, index=True)
|
script_revision_id = Column(String, nullable=False, index=True)
|
||||||
script_block_label = Column(String, nullable=False)
|
script_block_label = Column(String, nullable=False)
|
||||||
script_file_id = Column(String, nullable=False)
|
script_file_id = Column(String, nullable=True)
|
||||||
|
|
||||||
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
|
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
|
||||||
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
|
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
|
||||||
|
|||||||
@@ -623,6 +623,12 @@ class WorkflowService:
|
|||||||
|
|
||||||
# generate script for workflow if the workflow.generate_script is True AND there's no script cached for the workflow
|
# generate script for workflow if the workflow.generate_script is True AND there's no script cached for the workflow
|
||||||
if workflow.generate_script:
|
if workflow.generate_script:
|
||||||
|
LOG.info(
|
||||||
|
"Generating script for workflow",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
workflow_id=workflow.workflow_id,
|
||||||
|
workflow_name=workflow.title,
|
||||||
|
)
|
||||||
await self.generate_script_for_workflow(workflow=workflow, workflow_run=workflow_run)
|
await self.generate_script_for_workflow(workflow=workflow, workflow_run=workflow_run)
|
||||||
|
|
||||||
return workflow_run
|
return workflow_run
|
||||||
@@ -2278,11 +2284,24 @@ class WorkflowService:
|
|||||||
"Found cached script for workflow",
|
"Found cached script for workflow",
|
||||||
workflow_id=workflow.workflow_id,
|
workflow_id=workflow.workflow_id,
|
||||||
cache_key_value=rendered_cache_key_value,
|
cache_key_value=rendered_cache_key_value,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
created_script = await app.DATABASE.create_script(
|
||||||
|
organization_id=workflow.organization_id,
|
||||||
|
run_id=workflow_run.workflow_run_id,
|
||||||
|
)
|
||||||
|
|
||||||
# 3) Generate script code from workflow run
|
# 3) Generate script code from workflow run
|
||||||
try:
|
try:
|
||||||
|
LOG.info(
|
||||||
|
"Generating script for workflow",
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
workflow_id=workflow.workflow_id,
|
||||||
|
workflow_name=workflow.title,
|
||||||
|
cache_key_value=rendered_cache_key_value,
|
||||||
|
)
|
||||||
codegen_input = await transform_workflow_run_to_code_gen_input(
|
codegen_input = await transform_workflow_run_to_code_gen_input(
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
organization_id=workflow.organization_id,
|
organization_id=workflow.organization_id,
|
||||||
@@ -2293,6 +2312,9 @@ class WorkflowService:
|
|||||||
workflow=codegen_input.workflow,
|
workflow=codegen_input.workflow,
|
||||||
tasks=codegen_input.workflow_blocks,
|
tasks=codegen_input.workflow_blocks,
|
||||||
actions_by_task=codegen_input.actions_by_task,
|
actions_by_task=codegen_input.actions_by_task,
|
||||||
|
organization_id=workflow.organization_id,
|
||||||
|
script_id=created_script.script_id,
|
||||||
|
script_revision_id=created_script.script_revision_id,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error("Failed to generate workflow script source", exc_info=True)
|
LOG.error("Failed to generate workflow script source", exc_info=True)
|
||||||
@@ -2310,24 +2332,19 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
created = await app.DATABASE.create_script(
|
|
||||||
organization_id=workflow.organization_id,
|
|
||||||
run_id=workflow_run.workflow_run_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Upload script file(s) as artifacts and create rows
|
# Upload script file(s) as artifacts and create rows
|
||||||
await script_service.build_file_tree(
|
await script_service.build_file_tree(
|
||||||
files=files,
|
files=files,
|
||||||
organization_id=workflow.organization_id,
|
organization_id=workflow.organization_id,
|
||||||
script_id=created.script_id,
|
script_id=created_script.script_id,
|
||||||
script_version=created.version,
|
script_version=created_script.version,
|
||||||
script_revision_id=created.script_revision_id,
|
script_revision_id=created_script.script_revision_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Record the workflow->script mapping for cache lookup
|
# Record the workflow->script mapping for cache lookup
|
||||||
await app.DATABASE.create_workflow_script(
|
await app.DATABASE.create_workflow_script(
|
||||||
organization_id=workflow.organization_id,
|
organization_id=workflow.organization_id,
|
||||||
script_id=created.script_id,
|
script_id=created_script.script_id,
|
||||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||||
cache_key=cache_key or "",
|
cache_key=cache_key or "",
|
||||||
cache_key_value=rendered_cache_key_value,
|
cache_key_value=rendered_cache_key_value,
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ class ScriptBlock(BaseModel):
|
|||||||
script_id: str
|
script_id: str
|
||||||
script_revision_id: str
|
script_revision_id: str
|
||||||
script_block_label: str
|
script_block_label: str
|
||||||
script_file_id: str
|
script_file_id: str | None = None
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
modified_at: datetime
|
modified_at: datetime
|
||||||
deleted_at: datetime | None = None
|
deleted_at: datetime | None = None
|
||||||
|
|||||||
Reference in New Issue
Block a user