Upload downloaded files to S3 after every block so they can be used for subsequent blocks (for real) (#1804)
This commit is contained in:
@@ -309,8 +309,6 @@ class ForgeAgent:
|
|||||||
status=StepStatus.canceled,
|
status=StepStatus.canceled,
|
||||||
is_last=True,
|
is_last=True,
|
||||||
)
|
)
|
||||||
# We don't send task response for now as the task is canceled
|
|
||||||
# TODO: shall we send task response here?
|
|
||||||
await self.clean_up_task(
|
await self.clean_up_task(
|
||||||
task=task,
|
task=task,
|
||||||
last_step=step,
|
last_step=step,
|
||||||
@@ -1687,7 +1685,7 @@ class ForgeAgent:
|
|||||||
try:
|
try:
|
||||||
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
||||||
await app.STORAGE.save_downloaded_files(
|
await app.STORAGE.save_downloaded_files(
|
||||||
task.organization_id, task_id=task.task_id, workflow_run_id=None
|
task.organization_id, task_id=task.task_id, workflow_run_id=task.workflow_run_id
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
|
|||||||
@@ -640,7 +640,7 @@ class BaseTaskBlock(Block):
|
|||||||
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
|
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
|
||||||
organization_id=workflow_run.organization_id,
|
organization_id=workflow_run.organization_id,
|
||||||
task_id=updated_task.task_id,
|
task_id=updated_task.task_id,
|
||||||
workflow_run_id=None,
|
workflow_run_id=workflow_run_id,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
||||||
@@ -699,8 +699,9 @@ class BaseTaskBlock(Block):
|
|||||||
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
|
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
|
||||||
organization_id=workflow_run.organization_id,
|
organization_id=workflow_run.organization_id,
|
||||||
task_id=updated_task.task_id,
|
task_id=updated_task.task_id,
|
||||||
workflow_run_id=None,
|
workflow_run_id=workflow_run_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user