#!/usr/bin/env python3
import os
import json
import gc
import logging
import concurrent.futures
from io import BytesIO

import boto3
import requests
import torch
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.data.data_reader_writer.base import DataWriter
# from inference_svm_model import SVMModel  # relevance classifier, currently disabled below

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    handlers=[
        logging.StreamHandler(),           # output to console
        logging.FileHandler("mineru.log")  # save to a file
    ],
)
logger = logging.getLogger(__name__)


class Processor:
    def __init__(self):
        try:
            self.s3_writer = s3Writer(
                ak=os.getenv("S3_ACCESS_KEY"),
                sk=os.getenv("S3_SECRET_KEY"),
                bucket=os.getenv("S3_BUCKET_NAME"),
                endpoint_url=os.getenv("S3_ENDPOINT"),
            )
            # self.svm_model = SVMModel()
            # logger.info("Classification model initialized successfully")

            with open("/home/user/magic-pdf.json", "r") as f:
                config = json.load(f)

            self.layout_mode = config["layout-config"]["model"]  # e.g. "doclayout_yolo"
            self.formula_enable = config["formula-config"]["enable"]
            self.table_enable = False
            self.language = "en"
            self.prefix = "document-extracts/"
            logger.info("Processor initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize Processor: %s", str(e))
            raise

    def cleanup_gpu(self):
        """
        Release GPU memory: run garbage collection, then clear PyTorch's CUDA
        cache. This helps prevent VRAM accumulating across documents.
        """
        try:
            gc.collect()              # release Python-side references first
            torch.cuda.empty_cache()  # then return cached memory on the GPU
            logger.info("GPU memory cleaned up.")
        except Exception as e:
            logger.error("Error during GPU cleanup: %s", e)

    def process(self, file_url: str, key: str) -> str:
        """
        Process a single PDF and return the final Markdown, with every
        extracted image uploaded to S3 and captioned.
        """
        logger.info("Processing file: %s", file_url)
        try:
            response = requests.get(file_url, timeout=60)
            if response.status_code != 200:
                logger.error("Failed to download PDF from %s. Status code: %d",
                             file_url, response.status_code)
                raise Exception(f"Failed to download PDF: {file_url}")

            pdf_bytes = response.content
            logger.info("Downloaded %d bytes for file_url='%s'", len(pdf_bytes), file_url)

            # Analyze the PDF with OCR
            dataset = PymuDocDataset(pdf_bytes)
            inference = doc_analyze(
                dataset,
                ocr=True,
                lang=self.language,
                layout_model=self.layout_mode,
                formula_enable=self.formula_enable,
                table_enable=self.table_enable,
            )
            logger.info("doc_analyze complete for key='%s'. Started extracting images...", key)

            # Collect extracted images, uploading each under this document's prefix
            image_writer = ImageWriter(self.s3_writer, f"{self.prefix}{key}/")
            pipe_result = inference.pipe_ocr_mode(image_writer, lang=self.language)
            logger.info("OCR pipeline completed for key='%s'.", key)

            md_content = pipe_result.get_markdown(f"{self.prefix}{key}/")
            final_markdown = image_writer.post_process(f"{self.prefix}{key}/", md_content)
            logger.info("Completed PDF process for key='%s'. Final MD length=%d",
                        key, len(final_markdown))
            return final_markdown
        finally:
            # GPU memory is cleaned up after each document, success or failure.
            self.cleanup_gpu()
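
# A minimal sketch (not used above) of the same GPU-cleanup idea expressed as a
# context manager, so any model call can be wrapped in a `with` block. The name
# `gpu_cleanup` is hypothetical; it reuses the `gc` and `torch` imports above.
from contextlib import contextmanager

@contextmanager
def gpu_cleanup():
    try:
        yield
    finally:
        gc.collect()              # drop Python references to finished tensors
        torch.cuda.empty_cache()  # hand cached CUDA blocks back to the driver

# Example:
#     with gpu_cleanup():
#         inference = doc_analyze(dataset, ocr=True)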

class s3Writer:
    def __init__(self, ak: str, sk: str, bucket: str, endpoint_url: str):
        self.bucket = bucket
        self.client = boto3.client(
            "s3",
            aws_access_key_id=ak,
            aws_secret_access_key=sk,
            endpoint_url=endpoint_url,
        )

    def write(self, path: str, data: bytes) -> None:
        """Upload bytes to S3 under the given key."""
        try:
            file_obj = BytesIO(data)  # wrap bytes in a file-like object
            self.client.upload_fileobj(file_obj, self.bucket, path)
        except Exception as e:
            logger.error(f"Failed to upload to S3: {str(e)}")
            raise


class ImageWriter(DataWriter):
    """
    Receives each image extracted by the pipeline, uploads it to S3, and
    records it so post_process() can caption it in the final Markdown.
    (An SVM relevance filter previously ran here and is currently disabled.)
    """
    def __init__(self, s3_writer: s3Writer, base_path: str):
        self.s3_writer = s3_writer
        self.base_path = base_path
        # self.svm_model = svm_model
        self._redundant_images_paths = []
        # Maps each pipeline-relative image path to its bytes, its S3 key, and
        # (after post_process) its generated description:
        # { "<path>": {"data": b"...", "full_path": "<s3 key>", "description": "..."} }
        self.descriptions = {}

    def write(self, path: str, data: bytes) -> None:
        """
        Called for each extracted image: upload it to S3 and remember it for
        captioning.
        """
        full_path = self.base_path + path.split("/")[-1]
        self.s3_writer.write(full_path, data)
        self.descriptions[path] = {
            "data": data,
            "full_path": full_path,
        }

    def post_process(self, key: str, md_content: str) -> str:
        """
        Describe each uploaded image with Gemini, then rewrite the Markdown so
        every image reference carries its description and S3 path.
        """
        if not self.descriptions:
            # ThreadPoolExecutor rejects max_workers=0, so bail out early
            return md_content

        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.descriptions)) as executor:
            future_to_file = {
                executor.submit(
                    call_gemini_for_image_description,
                    self.descriptions[path]["data"],
                ): path
                for path in self.descriptions
            }
            for future in concurrent.futures.as_completed(future_to_file):
                path = future_to_file[future]
                try:
                    description = future.result()
                    if description:
                        self.descriptions[path]["description"] = description
                except Exception as e:
                    logger.error(f"[ERROR] Processing {path}: {str(e)}")

        for path, info in self.descriptions.items():
            # Fall back to an empty caption if description generation failed
            description = info.get("description", "")
            full_path = info["full_path"]
            md_content = md_content.replace(f"![]({key}{path})",
                                            f"![{description}]({full_path})")
        return md_content
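
# Illustration of the rewrite post_process performs (paths are hypothetical):
# an uncaptioned MinerU image reference such as
#     ![](document-extracts/1234/images/abc.jpg)
# becomes a captioned reference pointing at the flattened S3 key from write():
#     ![Q1 Part A Answer](document-extracts/1234/abc.jpg)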

def call_gemini_for_image_description(image_data: bytes) -> str:
    """Send image bytes to Gemini in the format it expects and return a short description."""
    from google import genai
    import base64

    try:
        # Initialize the Gemini client; the key is read from the environment
        # (GEMINI_API_KEY) rather than hardcoded in source.
        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

        # Generate content with the image supplied as base64-encoded inline data
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[
                {
                    "parts": [
                        {"text": """The provided image is part of a question paper or mark scheme.
Extract the information needed to identify the question.
For example, if the image contains text like:
"Q1 Part A Answer: Life on earth was created by a deity..."
you should return "Q1 Part A Answer".
If there is no text in the image, return a description of the image.
20 words max."""},
                        {
                            "inline_data": {
                                "mime_type": "image/jpeg",
                                "data": base64.b64encode(image_data).decode("utf-8"),
                            }
                        },
                    ]
                }
            ],
        )

        # Fall back to a placeholder if the model returned nothing
        description = response.text.strip() if response and response.text else "Image description unavailable"
        return description
    except Exception as e:
        logger.error(f"Error getting image description: {str(e)}")
        # Return an empty string so the caller skips captioning this image
        return ""


if __name__ == "__main__":
    processor = Processor()
    single_url = "https://quextro-resources.s3.eu-west-2.amazonaws.com/1739967958667-643657-mark-scheme-computer-principles.pdf?response-content-disposition=inline&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEJT%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCWV1LXdlc3QtMiJGMEQCIARfSyuot0h2RNrcqVQkc2T%2B1fJZ64NfjmkmAFgCkTG6AiArmbJDAUr7T85HdqAT2RbyLhmiIgpSo3ci4%2FUtSap2wCrUAwi8%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F8BEAAaDDUwOTM5OTYxODAzMCIMkfFm%2FgBrHsH1qh59KqgDjfZd1%2BKGzxkn7JorfQ07dL%2BL5fjCA6kmNAzCnCjDpTLnNjBfB1vnO2ZLvtC8RNvnaewY6tFWUfl39dC62ldnfajHeFmxkZqBcbDf3oOGnuO2PIvBgb5%2BvppVDkYjWz7vv5TzpgC2sVzjA38QMwxAnausYWDgspap7qjlfoLJUiBOq9SIMZyKVsfeAf4OiUl0TDc2nheqvNXOJy9TPh94KWbBT35vP3fU9A7ZdF4sElm4nVZMnOPdbR7%2Ba6F57nPLZvUaLZC5Nb011ef6%2BhAxr9yeONh5MAoTGUH2qzedDmN%2FbKannddBy%2FNIaP%2BhF7lWUkKemQrM5vajwU6k2Q45pLruKWRkjtrWxdmkQE4zb67ETj5eGL%2BlPPj%2BPtQWzF7UaoWPUH4tGBZ%2Bqdu479rU1ZSg%2B15lR%2F8SAgP%2BydATGwyRtXEvMRJZIiUems8i6ehxWC%2FscY2%2FtCk9OREKhLwOEEdJDAR4vqt68lnnvVomHrVjwNQvyP9A4V8Ct%2B0SjxP%2F86kJnX3o%2FVEoFT44JWICuMuf8kwoelUbZGPl6SaftGsRSUvoy7PV5TCN3du9BjrlAjKhLpjsCwgp1rJ8cPBFcUgOmL3iXrtHs3FhDLljxbXRZ%2FadHkxAlzf%2BXym%2BFBnhdCkDfmWcMEH3GAOFfv%2FlE5SsZMO1JoXbzQlO3OX6nrUacj7LF7ZoO8TYMVoTyEZSLEABNOU7KCILaFeDGRDJ8Ia5I3jnXvOVouFn2VnhykCuWPTunjkMEQBiHa3mbZP0mVcSviujHXatN11INiR%2BPwAN5oxKXeT25B%2FCCI3wib5Av2tzp8zuw8joib5PWNXOYfRgMR7R0Sj%2FjW5SxWr%2BTD9TAD3%2Fqj5pj3Oo13dNGdv5RwGqk1iHd8okpkFYlxEmXD2tTanpxX8ON1%2FLHz%2BNEUJDOogx8TLw5I6mkVs3zjoMhhwn2%2BWrlnNa%2F3i9lAGyLY6Ps4U23Hv7b4gpH4%2BeJN72Z95hrNtcumq4uuf0pRoJPQ9pjiZttjeDwNZzb7d3XuiEQeOgK8rpTeEgduxhdJOOLwZGrg%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ASIAXNGUVKHXFLYKHBHD%2F20250220%2Feu-west-2%2Fs3%2Faws4_request&X-Amz-Date=20250220T111935Z&X-Amz-Expires=10800&X-Amz-SignedHeaders=host&X-Amz-Signature=64aa008fdafe72f1a693078156451c0f6f702e89e546954d6b3d61abf9f73ec8"
    markdown_result = processor.process(single_url, key="1234323")
    print("Single file Markdown:\n", markdown_result)
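
    # A hedged batch-usage sketch: the same Processor instance can process
    # several PDFs in sequence (GPU memory is cleaned between documents).
    # The URLs and keys below are hypothetical placeholders.
    # batch = [
    #     ("https://example.com/question-paper.pdf", "question-paper-1"),
    #     ("https://example.com/mark-scheme.pdf", "mark-scheme-1"),
    # ]
    # for url, doc_key in batch:
    #     md = processor.process(url, key=doc_key)
    #     with open(f"{doc_key}.md", "w", encoding="utf-8") as f:
    #         f.write(md)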