Fix HAR file zstd compression in S3 (#4567)
This commit is contained in:
@@ -29,6 +29,9 @@ from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
|
|||||||
LOG = structlog.get_logger()
|
LOG = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
S3_ZSTD_COMPRESSED_SUFFIX = ".zst"
|
||||||
|
|
||||||
|
|
||||||
class S3Storage(BaseStorage):
|
class S3Storage(BaseStorage):
|
||||||
_PATH_VERSION = "v1"
|
_PATH_VERSION = "v1"
|
||||||
|
|
||||||
@@ -38,6 +41,9 @@ class S3Storage(BaseStorage):
|
|||||||
|
|
||||||
def build_uri(self, *, organization_id: str, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str:
|
def build_uri(self, *, organization_id: str, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str:
|
||||||
file_ext = FILE_EXTENTSION_MAP[artifact_type]
|
file_ext = FILE_EXTENTSION_MAP[artifact_type]
|
||||||
|
if artifact_type == ArtifactType.HAR:
|
||||||
|
file_ext = f"{file_ext}{S3_ZSTD_COMPRESSED_SUFFIX}"
|
||||||
|
|
||||||
return f"{self._build_base_uri(organization_id)}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
|
return f"{self._build_base_uri(organization_id)}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
|
||||||
|
|
||||||
async def retrieve_global_workflows(self) -> list[str]:
|
async def retrieve_global_workflows(self) -> list[str]:
|
||||||
@@ -106,12 +112,9 @@ class S3Storage(BaseStorage):
|
|||||||
# HARs are easily compressible because they are mostly JSON.
|
# HARs are easily compressible because they are mostly JSON.
|
||||||
# Other artifacts are not compressed because they are not easily compressible.
|
# Other artifacts are not compressed because they are not easily compressible.
|
||||||
uri = artifact.uri
|
uri = artifact.uri
|
||||||
if artifact.artifact_type == ArtifactType.HAR:
|
if uri.endswith(S3_ZSTD_COMPRESSED_SUFFIX):
|
||||||
cctx = zstd.ZstdCompressor(level=3)
|
cctx = zstd.ZstdCompressor(level=3)
|
||||||
data = cctx.compress(data)
|
data = cctx.compress(data)
|
||||||
file_ext = FILE_EXTENTSION_MAP[artifact.artifact_type]
|
|
||||||
uri = uri.replace(f".{file_ext}", f".{file_ext}.zst")
|
|
||||||
artifact.uri = uri
|
|
||||||
|
|
||||||
sc = await self._get_storage_class_for_org(artifact.organization_id, self.bucket, len(data))
|
sc = await self._get_storage_class_for_org(artifact.organization_id, self.bucket, len(data))
|
||||||
tags = await self._get_tags_for_org(artifact.organization_id)
|
tags = await self._get_tags_for_org(artifact.organization_id)
|
||||||
@@ -139,7 +142,7 @@ class S3Storage(BaseStorage):
|
|||||||
async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
|
async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
|
||||||
data = await self.async_client.download_file(artifact.uri)
|
data = await self.async_client.download_file(artifact.uri)
|
||||||
# Decompress zstd-compressed files
|
# Decompress zstd-compressed files
|
||||||
if data and artifact.uri.endswith(".zst"):
|
if data and artifact.uri.endswith(S3_ZSTD_COMPRESSED_SUFFIX):
|
||||||
dctx = zstd.ZstdDecompressor()
|
dctx = zstd.ZstdDecompressor()
|
||||||
data = dctx.decompress(data)
|
data = dctx.decompress(data)
|
||||||
return data
|
return data
|
||||||
|
|||||||
@@ -488,15 +488,11 @@ class TestS3StorageHARCompression:
|
|||||||
# Create sample HAR JSON data (easily compressible)
|
# Create sample HAR JSON data (easily compressible)
|
||||||
har_data = b'{"log": {"version": "1.2", "entries": [{"request": {}, "response": {}}]}}'
|
har_data = b'{"log": {"version": "1.2", "entries": [{"request": {}, "response": {}}]}}'
|
||||||
artifact = self._create_har_artifact(s3_storage, TEST_STEP_ID)
|
artifact = self._create_har_artifact(s3_storage, TEST_STEP_ID)
|
||||||
original_uri = artifact.uri
|
assert artifact.uri.endswith(".har.zst")
|
||||||
|
|
||||||
# Store the artifact
|
# Store the artifact
|
||||||
await s3_storage.store_artifact(artifact, har_data)
|
await s3_storage.store_artifact(artifact, har_data)
|
||||||
|
|
||||||
# Verify URI was updated to .har.zst
|
|
||||||
assert artifact.uri.endswith(".har.zst")
|
|
||||||
assert artifact.uri == original_uri.replace(".har", ".har.zst")
|
|
||||||
|
|
||||||
# Verify the stored data is compressed
|
# Verify the stored data is compressed
|
||||||
s3uri = S3Uri(artifact.uri)
|
s3uri = S3Uri(artifact.uri)
|
||||||
obj_response = boto3_test_client.get_object(Bucket=TEST_BUCKET, Key=s3uri.key)
|
obj_response = boto3_test_client.get_object(Bucket=TEST_BUCKET, Key=s3uri.key)
|
||||||
|
|||||||
Reference in New Issue
Block a user