Making file parser flexible to deprecate pdf parser (#3073)
Co-authored-by: Suchintan <suchintan@users.noreply.github.com>
This commit is contained in:
@@ -21,6 +21,7 @@ from typing import Annotated, Any, Awaitable, Callable, Literal, Union
|
||||
from urllib.parse import quote, urlparse
|
||||
|
||||
import filetype
|
||||
import pandas as pd
|
||||
import structlog
|
||||
from email_validator import EmailNotValidError, validate_email
|
||||
from jinja2.sandbox import SandboxedEnvironment
|
||||
@@ -2342,6 +2343,8 @@ class SendEmailBlock(Block):
|
||||
|
||||
class FileType(StrEnum):
|
||||
CSV = "csv"
|
||||
EXCEL = "excel"
|
||||
PDF = "pdf"
|
||||
|
||||
|
||||
class FileParserBlock(Block):
|
||||
@@ -2349,6 +2352,7 @@ class FileParserBlock(Block):
|
||||
|
||||
file_url: str
|
||||
file_type: FileType
|
||||
json_schema: dict[str, Any] | None = None
|
||||
|
||||
def get_all_parameters(
|
||||
self,
|
||||
@@ -2364,6 +2368,18 @@ class FileParserBlock(Block):
|
||||
self.file_url, workflow_run_context
|
||||
)
|
||||
|
||||
def _detect_file_type_from_url(self, file_url: str) -> FileType:
    """Detect the file type from the file extension in the URL.

    The extension is looked up on the URL's path component first, so a query
    string or fragment (for example on a presigned download link such as
    ``.../report.pdf?signature=...``) no longer hides it. When the path has no
    recognized extension, the raw URL string is checked as a fallback, which
    keeps links like ``.../download?file=report.pdf`` working.

    Args:
        file_url: URL (or path) of the file to classify.

    Returns:
        FileType.EXCEL for .xlsx/.xls/.xlsm, FileType.PDF for .pdf, and
        FileType.CSV for .csv, .tsv and any unrecognized extension.
    """
    extension_types = (
        ((".xlsx", ".xls", ".xlsm"), FileType.EXCEL),
        ((".pdf",), FileType.PDF),
        # TSV files are handled by the CSV parser.
        ((".csv", ".tsv"), FileType.CSV),
    )

    try:
        url_path = urlparse(file_url).path
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 literal);
        # fall back to matching on the raw string only.
        url_path = file_url

    for candidate in (url_path.lower(), file_url.lower()):
        for extensions, file_type in extension_types:
            if candidate.endswith(extensions):
                return file_type

    # Default to CSV for any other extension.
    return FileType.CSV
|
||||
|
||||
def validate_file_type(self, file_url_used: str, file_path: str) -> None:
|
||||
if self.file_type == FileType.CSV:
|
||||
try:
|
||||
@@ -2371,6 +2387,121 @@ class FileParserBlock(Block):
|
||||
csv.Sniffer().sniff(file.read(1024))
|
||||
except csv.Error as e:
|
||||
raise InvalidFileType(file_url=file_url_used, file_type=self.file_type, error=str(e))
|
||||
elif self.file_type == FileType.EXCEL:
|
||||
try:
|
||||
# Try to read the file with pandas to validate it's a valid Excel file
|
||||
pd.read_excel(file_path, nrows=1, engine="openpyxl")
|
||||
except Exception as e:
|
||||
raise InvalidFileType(
|
||||
file_url=file_url_used, file_type=self.file_type, error=f"Invalid Excel file format: {str(e)}"
|
||||
)
|
||||
elif self.file_type == FileType.PDF:
|
||||
try:
|
||||
# Try to read the file with PyPDF to validate it's a valid PDF file
|
||||
reader = PdfReader(file_path)
|
||||
# Just check if we can access pages, don't read content yet
|
||||
_ = len(reader.pages)
|
||||
except Exception as e:
|
||||
raise InvalidFileType(file_url=file_url_used, file_type=self.file_type, error=str(e))
|
||||
|
||||
async def _parse_csv_file(self, file_path: str) -> list[dict[str, Any]]:
|
||||
"""Parse CSV/TSV file and return list of dictionaries."""
|
||||
parsed_data = []
|
||||
with open(file_path) as file:
|
||||
# Try to detect the delimiter (comma for CSV, tab for TSV)
|
||||
sample = file.read(1024)
|
||||
file.seek(0) # Reset file pointer
|
||||
|
||||
# Use csv.Sniffer to detect the delimiter
|
||||
try:
|
||||
dialect = csv.Sniffer().sniff(sample)
|
||||
delimiter = dialect.delimiter
|
||||
except csv.Error:
|
||||
# Default to comma if detection fails
|
||||
delimiter = ","
|
||||
|
||||
reader = csv.DictReader(file, delimiter=delimiter)
|
||||
for row in reader:
|
||||
parsed_data.append(row)
|
||||
return parsed_data
|
||||
|
||||
def _clean_dataframe_for_json(self, df: pd.DataFrame) -> list[dict[str, Any]]:
|
||||
"""Clean DataFrame to ensure it can be serialized to JSON."""
|
||||
# Replace NaN and NaT values with "nan" string
|
||||
df_cleaned = df.replace({pd.NA: "nan", pd.NaT: "nan"})
|
||||
df_cleaned = df_cleaned.where(pd.notna(df_cleaned), "nan")
|
||||
|
||||
# Convert to list of dictionaries
|
||||
records = df_cleaned.to_dict("records")
|
||||
|
||||
# Additional cleaning for any remaining problematic values
|
||||
for record in records:
|
||||
for key, value in record.items():
|
||||
if pd.isna(value) or value == "NaN" or value == "NaT":
|
||||
record[key] = "nan"
|
||||
elif isinstance(value, (pd.Timestamp, pd.DatetimeTZDtype)):
|
||||
# Convert pandas timestamps to ISO format strings
|
||||
record[key] = value.isoformat() if pd.notna(value) else "nan"
|
||||
|
||||
return records
|
||||
|
||||
async def _parse_excel_file(self, file_path: str) -> list[dict[str, Any]]:
    """Parse an Excel file and return its rows as a list of dictionaries.

    The workbook is read with pandas using the openpyxl engine and then passed
    through ``_clean_dataframe_for_json``.

    Args:
        file_path: Local path of the workbook.

    Returns:
        One JSON-serializable dictionary per row.

    Raises:
        InvalidFileType: If openpyxl is not installed, or if reading/cleaning
            the workbook fails for any other reason. The original exception is
            attached as the cause.
    """
    # NOTE(review): the engine is pinned to openpyxl, while the URL-based
    # detection also routes ".xls" files here — confirm whether legacy .xls
    # workbooks are expected to be readable through this path.
    try:
        # Read Excel file with pandas, specifying engine explicitly
        df = pd.read_excel(file_path, engine="openpyxl")
        # Clean and convert DataFrame to list of dictionaries
        return self._clean_dataframe_for_json(df)
    except ImportError as e:
        raise InvalidFileType(
            file_url=self.file_url,
            file_type=self.file_type,
            error=f"Missing required dependency for Excel parsing: {str(e)}. Please install openpyxl: pip install openpyxl",
        ) from e
    except Exception as e:
        raise InvalidFileType(
            file_url=self.file_url, file_type=self.file_type, error=f"Failed to parse Excel file: {str(e)}"
        ) from e
|
||||
|
||||
async def _parse_pdf_file(self, file_path: str) -> str:
    """Parse a PDF file and return the extracted text.

    Args:
        file_path: Local path of the PDF document.

    Returns:
        The text of every page, in page order, each followed by a newline.

    Raises:
        InvalidFileType: If the reader reports a PdfReadError; the original
            error is attached as the cause.
    """
    try:
        reader = PdfReader(file_path)
        # Join once instead of growing a string page by page.
        return "".join(page.extract_text() + "\n" for page in reader.pages)
    except PdfReadError as e:
        raise InvalidFileType(file_url=self.file_url, file_type=self.file_type, error=str(e)) from e
|
||||
|
||||
async def _extract_with_ai(
    self, content: str | list[dict[str, Any]], workflow_run_context: WorkflowRunContext
) -> dict[str, Any]:
    """Run the parsed file content through the LLM to extract structured data.

    Args:
        content: Either plain text or a list of row dictionaries; row
            dictionaries are serialized to indented JSON before prompting.
        workflow_run_context: Accepted for interface compatibility; not read
            by this method.

    Returns:
        The response returned by the LLM handler.
    """
    prompt_name = "extract-information-from-file-text"

    # Fallback schema used when the block has none; kept in a local so the
    # instance's json_schema is never mutated.
    fallback_schema = {
        "type": "object",
        "properties": {
            "output": {
                "type": "object",
                "description": "Information extracted from the file",
            }
        },
    }
    schema_to_use = self.json_schema or fallback_schema

    # Row data (CSV/Excel) is rendered as readable JSON; text passes through.
    content_str = json.dumps(content, indent=2) if isinstance(content, list) else content

    llm_prompt = prompt_engine.load_prompt(
        prompt_name, extracted_text_content=content_str, json_schema=schema_to_use
    )
    return await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name=prompt_name)
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
@@ -2381,6 +2512,7 @@ class FileParserBlock(Block):
|
||||
**kwargs: dict,
|
||||
) -> BlockResult:
|
||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||
|
||||
if (
|
||||
self.file_url
|
||||
and workflow_run_context.has_parameter(self.file_url)
|
||||
@@ -2412,21 +2544,71 @@ class FileParserBlock(Block):
|
||||
file_path = await download_from_s3(self.get_async_aws_client(), self.file_url)
|
||||
else:
|
||||
file_path = await download_file(self.file_url)
|
||||
|
||||
# Auto-detect file type based on file extension
|
||||
detected_file_type = self._detect_file_type_from_url(self.file_url)
|
||||
self.file_type = detected_file_type
|
||||
|
||||
# Validate the file type
|
||||
self.validate_file_type(self.file_url, file_path)
|
||||
# Parse the file into a list of dictionaries where each dictionary represents a row in the file
|
||||
parsed_data = []
|
||||
with open(file_path) as file:
|
||||
if self.file_type == FileType.CSV:
|
||||
reader = csv.DictReader(file)
|
||||
for row in reader:
|
||||
parsed_data.append(row)
|
||||
|
||||
LOG.debug(
|
||||
"FileParserBlock: After file type validation",
|
||||
file_type=self.file_type,
|
||||
json_schema_present=self.json_schema is not None,
|
||||
json_schema_type=type(self.json_schema),
|
||||
)
|
||||
|
||||
# Parse the file based on type
|
||||
parsed_data: str | list[dict[str, Any]]
|
||||
if self.file_type == FileType.CSV:
|
||||
parsed_data = await self._parse_csv_file(file_path)
|
||||
elif self.file_type == FileType.EXCEL:
|
||||
parsed_data = await self._parse_excel_file(file_path)
|
||||
elif self.file_type == FileType.PDF:
|
||||
parsed_data = await self._parse_pdf_file(file_path)
|
||||
else:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Unsupported file type: {self.file_type}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# If json_schema is provided, use AI to extract structured data
|
||||
final_data: str | list[dict[str, Any]] | dict[str, Any]
|
||||
LOG.debug(
|
||||
"FileParserBlock: JSON schema check",
|
||||
has_json_schema=self.json_schema is not None,
|
||||
json_schema_type=type(self.json_schema),
|
||||
json_schema=self.json_schema,
|
||||
)
|
||||
|
||||
if self.json_schema:
|
||||
try:
|
||||
ai_extracted_data = await self._extract_with_ai(parsed_data, workflow_run_context)
|
||||
final_data = ai_extracted_data
|
||||
except Exception as e:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Failed to extract data with AI: {str(e)}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
else:
|
||||
# Return raw parsed data
|
||||
final_data = parsed_data
|
||||
|
||||
# Record the parsed data
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, final_data)
|
||||
return await self.build_block_result(
|
||||
success=True,
|
||||
failure_reason=None,
|
||||
output_parameter_value=parsed_data,
|
||||
output_parameter_value=final_data,
|
||||
status=BlockStatus.completed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
@@ -2434,6 +2616,11 @@ class FileParserBlock(Block):
|
||||
|
||||
|
||||
class PDFParserBlock(Block):
|
||||
"""
|
||||
DEPRECATED: Use FileParserBlock with file_type=FileType.PDF instead.
|
||||
This block will be removed in a future version.
|
||||
"""
|
||||
|
||||
block_type: Literal[BlockType.PDF_PARSER] = BlockType.PDF_PARSER
|
||||
|
||||
file_url: str
|
||||
|
||||
@@ -244,6 +244,7 @@ class FileParserBlockYAML(BlockYAML):
|
||||
|
||||
file_url: str
|
||||
file_type: FileType
|
||||
json_schema: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class PDFParserBlockYAML(BlockYAML):
|
||||
|
||||
@@ -1926,6 +1926,7 @@ class WorkflowService:
|
||||
output_parameter=output_parameter,
|
||||
file_url=block_yaml.file_url,
|
||||
file_type=block_yaml.file_type,
|
||||
json_schema=block_yaml.json_schema,
|
||||
continue_on_failure=block_yaml.continue_on_failure,
|
||||
)
|
||||
elif block_yaml.block_type == BlockType.PDF_PARSER:
|
||||
|
||||
Reference in New Issue
Block a user