migrate to wait_for_upload_aiotasks for aiotasks cleanup (#1353)
This commit is contained in:
@@ -1536,7 +1536,7 @@ class ForgeAgent:
|
|||||||
await self.cleanup_browser_and_create_artifacts(close_browser_on_completion, last_step, task)
|
await self.cleanup_browser_and_create_artifacts(close_browser_on_completion, last_step, task)
|
||||||
|
|
||||||
# Wait for all tasks to complete before generating the links for the artifacts
|
# Wait for all tasks to complete before generating the links for the artifacts
|
||||||
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id)
|
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks([task.task_id])
|
||||||
|
|
||||||
if need_call_webhook:
|
if need_call_webhook:
|
||||||
await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)
|
await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)
|
||||||
|
|||||||
@@ -135,48 +135,28 @@ class ArtifactManager:
|
|||||||
async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
|
async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
|
||||||
return await app.STORAGE.get_share_links(artifacts)
|
return await app.STORAGE.get_share_links(artifacts)
|
||||||
|
|
||||||
async def wait_for_upload_aiotasks_for_task(self, task_id: str) -> None:
|
async def wait_for_upload_aiotasks(self, primary_keys: list[str]) -> None:
|
||||||
try:
|
|
||||||
st = time.time()
|
|
||||||
async with asyncio.timeout(30):
|
|
||||||
await asyncio.gather(
|
|
||||||
*[aio_task for aio_task in self.upload_aiotasks_map[task_id] if not aio_task.done()]
|
|
||||||
)
|
|
||||||
LOG.info(
|
|
||||||
f"S3 upload tasks for task_id={task_id} completed in {time.time() - st:.2f}s",
|
|
||||||
task_id=task_id,
|
|
||||||
duration=time.time() - st,
|
|
||||||
)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
LOG.error(
|
|
||||||
f"Timeout (30s) while waiting for upload tasks for task_id={task_id}",
|
|
||||||
task_id=task_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
del self.upload_aiotasks_map[task_id]
|
|
||||||
|
|
||||||
async def wait_for_upload_aiotasks_for_tasks(self, task_ids: list[str]) -> None:
|
|
||||||
try:
|
try:
|
||||||
st = time.time()
|
st = time.time()
|
||||||
async with asyncio.timeout(30):
|
async with asyncio.timeout(30):
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
*[
|
*[
|
||||||
aio_task
|
aio_task
|
||||||
for task_id in task_ids
|
for primary_key in primary_keys
|
||||||
for aio_task in self.upload_aiotasks_map[task_id]
|
for aio_task in self.upload_aiotasks_map[primary_key]
|
||||||
if not aio_task.done()
|
if not aio_task.done()
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"S3 upload tasks for task_ids={task_ids} completed in {time.time() - st:.2f}s",
|
f"S3 upload aio tasks for primary_keys={primary_keys} completed in {time.time() - st:.2f}s",
|
||||||
task_ids=task_ids,
|
primary_keys=primary_keys,
|
||||||
duration=time.time() - st,
|
duration=time.time() - st,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.error(
|
LOG.error(
|
||||||
f"Timeout (30s) while waiting for upload tasks for task_ids={task_ids}",
|
f"Timeout (30s) while waiting for upload aio tasks for primary_keys={primary_keys}",
|
||||||
task_ids=task_ids,
|
primary_keys=primary_keys,
|
||||||
)
|
)
|
||||||
|
|
||||||
for task_id in task_ids:
|
for primary_key in primary_keys:
|
||||||
del self.upload_aiotasks_map[task_id]
|
del self.upload_aiotasks_map[primary_key]
|
||||||
|
|||||||
@@ -859,7 +859,7 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
LOG.info("Persisted browser session for workflow run", workflow_run_id=workflow_run.workflow_run_id)
|
LOG.info("Persisted browser session for workflow run", workflow_run_id=workflow_run.workflow_run_id)
|
||||||
|
|
||||||
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids)
|
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks(all_workflow_task_ids)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
||||||
|
|||||||
Reference in New Issue
Block a user