add & use _build_base_uri to avoid repetition (#2676)

This commit is contained in:
Asher Foa
2025-06-11 16:56:13 -04:00
committed by GitHub
parent 3100ff9543
commit 529a859d92

View File

@@ -37,7 +37,7 @@ class S3Storage(BaseStorage):
def build_uri(self, *, organization_id: str, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
async def retrieve_global_workflows(self) -> list[str]:
uri = f"s3://{self.bucket}/{settings.ENV}/global_workflows.txt"
@@ -46,23 +46,26 @@ class S3Storage(BaseStorage):
return []
return [line.strip() for line in data.decode("utf-8").split("\n") if line.strip()]
def _build_base_uri(self, organization_id: str) -> str:
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}"
def build_log_uri(
self, *, organization_id: str, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}"
def build_thought_uri(
self, *, organization_id: str, artifact_id: str, thought: Thought, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/observers/{thought.observer_cruise_id}/{thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/observers/{thought.observer_cruise_id}/{thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_task_v2_uri(
self, *, organization_id: str, artifact_id: str, task_v2: TaskV2, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_workflow_run_block_uri(
self,
@@ -73,13 +76,13 @@ class S3Storage(BaseStorage):
artifact_type: ArtifactType,
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/workflow_runs/{workflow_run_block.workflow_run_id}/{workflow_run_block.workflow_run_block_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/workflow_runs/{workflow_run_block.workflow_run_id}/{workflow_run_block.workflow_run_block_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
def build_ai_suggestion_uri(
self, *, organization_id: str, artifact_id: str, ai_suggestion: AISuggestion, artifact_type: ArtifactType
) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"s3://{self.bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/ai_suggestions/{ai_suggestion.ai_suggestion_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
return f"{self._build_base_uri(organization_id)}/ai_suggestions/{ai_suggestion.ai_suggestion_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
sc = await self._get_storage_class_for_org(artifact.organization_id)