import os
import re
import json
import logging
import zipfile
import asyncio
import tempfile
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
import gradio as gr
from enum import Enum
import hashlib
import urllib.parse

# Import smolagents
from smolagents import CodeAgent, ToolCallingAgent, LiteLLMModel
from smolagents.tools import Tool, tool

from pydantic import BaseModel, Field

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bibliography_system.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# ========== DATA MODELS ==========

class ResourceType(str, Enum):
    DOI = "doi"
    ISBN = "isbn"
    ARXIV = "arxiv"
    URL = "url"
    PMID = "pmid"
    BIBTEX = "bibtex"
    CITATION = "citation"
    UNKNOWN = "unknown"


class CitationModel(BaseModel):
    id: str
    raw_text: str
    resource_type: ResourceType
    identifier: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    confidence: float = 0.0
    extracted_from: str
    position: Tuple[int, int] = (0, 0)


class VerificationResult(BaseModel):
    citation: CitationModel
    verified: bool
    verification_source: str
    download_url: Optional[str]
    file_format: Optional[str]
    file_size: Optional[int]
    quality_score: float
    notes: List[str] = Field(default_factory=list)


class ProcessingReport(BaseModel):
    input_file: str
    total_citations: int
    verified_resources: List[VerificationResult]
    downloaded_files: List[str]
    failed_verifications: List[CitationModel]
    processing_time: float
    summary: Dict[str, Any] = Field(default_factory=dict)
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())


# ========== AGENT TOOLS ==========

class BibliographyExtractionTool(Tool):
    name = "extract_bibliography"
    description = """
    Extract bibliographic references from text.
    Identifies DOIs, ISBNs, arXiv IDs, URLs, and other academic identifiers
    from unstructured text.

    Args:
        text (str): The text to analyze
        source_name (str): Name of the source document

    Returns:
        List[CitationModel]: List of extracted citations
    """

    def __init__(self):
        super().__init__()
        # Patterns for the different resource types
        self.patterns = {
            ResourceType.DOI: [
                r'\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b',
                r'doi:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)',
                r'DOI:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)'
            ],
            ResourceType.ISBN: [
                r'ISBN(?:-1[03])?:?\s*(?=[0-9X]{10}|(?=(?:[0-9]+[- ]){3})[- 0-9X]{13}|97[89][0-9]{10}|(?=(?:[0-9]+[- ]){4})[- 0-9]{17})(?:97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]'
            ],
            ResourceType.ARXIV: [
                r'arXiv:\s*(\d{4}\.\d{4,5}(v\d+)?)',
                r'arxiv:\s*([a-z\-]+/\d{7})'
            ],
            ResourceType.PMID: [
                r'PMID:\s*(\d+)',
                r'PubMed ID:\s*(\d+)'
            ]
        }

    def forward(self, text: str, source_name: str = "unknown") -> List[Dict[str, Any]]:
        """Extract citations from text"""
        citations = []
        text_lower = text.lower()

        # Search by resource type
        for resource_type, patterns in self.patterns.items():
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    identifier = match.group(1) if match.groups() else match.group(0)

                    # Clean the identifier
                    identifier = self._clean_identifier(identifier, resource_type)
                    if identifier:
                        # Compute confidence based on the surrounding context
                        confidence = self._calculate_confidence(
                            identifier, resource_type, text_lower, match.start()
                        )

                        citation = CitationModel(
                            id=hashlib.md5(
                                f"{identifier}_{source_name}".encode()
                            ).hexdigest()[:12],
                            raw_text=match.group(0),
                            resource_type=resource_type,
                            identifier=identifier,
                            metadata={
                                "found_at": match.start(),
                                "context": self._get_context(text, match.start(), match.end())
                            },
                            confidence=confidence,
                            extracted_from=source_name,
                            position=(match.start(), match.end())
                        )
                        citations.append(citation.dict())

        # Extract general URLs (only if they look academic)
        url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
        url_matches = re.finditer(url_pattern, text)
        for match in url_matches:
            url = match.group(0)
            if self._is_academic_url(url):
                citation = CitationModel(
                    id=hashlib.md5(f"{url}_{source_name}".encode()).hexdigest()[:12],
                    raw_text=url,
                    resource_type=ResourceType.URL,
                    identifier=url,
                    metadata={
                        "found_at": match.start(),
                        "context": self._get_context(text, match.start(), match.end())
                    },
                    confidence=0.6,
                    extracted_from=source_name,
                    position=(match.start(), match.end())
                )
                citations.append(citation.dict())

        return citations

    def _clean_identifier(self, identifier: str, resource_type: ResourceType) -> str:
        """Clean identifier"""
        identifier = identifier.strip()

        # Remove prefixes
        prefixes = ['doi:', 'DOI:', 'arxiv:', 'arXiv:', 'isbn:', 'ISBN:', 'pmid:', 'PMID:']
        for prefix in prefixes:
            if identifier.startswith(prefix):
                identifier = identifier[len(prefix):].strip()

        # Strip unwanted surrounding characters
        identifier = identifier.strip('"\'<>()[]{}')
        return identifier

    def _calculate_confidence(self, identifier: str, resource_type: ResourceType,
                              text: str, position: int) -> float:
        """Calculate confidence score for extracted citation"""
        confidence = 0.7  # Base confidence

        # Check DOI format
        if resource_type == ResourceType.DOI:
            if re.match(r'^10\.\d{4,9}/.+', identifier):
                confidence += 0.2

        # Check surrounding context for bibliographic vocabulary
        context_words = ['paper', 'article', 'journal', 'conference', 'published',
                         'reference', 'bibliography', 'cite', 'doi', 'url']
        context = text[max(0, position - 100):min(len(text), position + 100)]
        for word in context_words:
            if word in context.lower():
                confidence += 0.05

        return min(confidence, 1.0)

    def _is_academic_url(self, url: str)
-> bool: """Check if URL looks academic""" academic_domains = [ 'arxiv.org', 'doi.org', 'springer.com', 'ieee.org', 'acm.org', 'sciencedirect.com', 'wiley.com', 'tandfonline.com', 'nature.com', 'science.org', 'pnas.org', 'plos.org', 'bmc.com', 'frontiersin.org', 'mdpi.com', 'researchgate.net', 'semanticscholar.org' ] url_lower = url.lower() return any(domain in url_lower for domain in academic_domains) def _get_context(self, text: str, start: int, end: int, window: int = 50) -> str: """Get context around match""" context_start = max(0, start - window) context_end = min(len(text), end + window) return text[context_start:context_end] class ResourceVerificationTool(Tool): name = "verify_resource" description = """ Verify the existence and accessibility of academic resources. Args: citation (Dict[str, Any]): Citation to verify timeout (int): Timeout in seconds Returns: VerificationResult: Verification result with metadata """ def __init__(self): super().__init__() self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } def forward(self, citation: Dict[str, Any], timeout: int = 10) -> Dict[str, Any]: """Verify a citation""" citation_obj = CitationModel(**citation) # Preparar resultado result = { "citation": citation_obj.dict(), "verified": False, "verification_source": "none", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: if citation_obj.resource_type == ResourceType.DOI: return self._verify_doi(citation_obj, timeout) elif citation_obj.resource_type == ResourceType.ARXIV: return self._verify_arxiv(citation_obj, timeout) elif citation_obj.resource_type == ResourceType.URL: return self._verify_url(citation_obj, timeout) elif citation_obj.resource_type == ResourceType.ISBN: return self._verify_isbn(citation_obj, timeout) elif citation_obj.resource_type == ResourceType.PMID: return self._verify_pmid(citation_obj, timeout) else: result["notes"].append(f"Unsupported resource type: {citation_obj.resource_type}") except Exception as e: result["notes"].append(f"Verification error: {str(e)}") return result def _verify_doi(self, citation: CitationModel, timeout: int) -> Dict[str, Any]: """Verify DOI""" import requests result = { "citation": citation.dict(), "verified": False, "verification_source": "crossref", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: # Try Crossref API url = f"https://api.crossref.org/works/{citation.identifier}" response = requests.get(url, headers=self.headers, timeout=timeout) if response.status_code == 200: data = response.json() work = data.get('message', {}) result["verified"] = True result["quality_score"] = 0.9 # Check for open access if work.get('license'): result["notes"].append("Open access available") result["quality_score"] += 0.1 # Try to find PDF URL links = work.get('link', []) for link in links: if link.get('content-type') == 'application/pdf': result["download_url"] = link.get('URL') result["file_format"] = "pdf" break # Try Unpaywall if not result["download_url"]: unpaywall_url = f"https://api.unpaywall.org/v2/{citation.identifier}?email=user@example.com" unpaywall_response = requests.get(unpaywall_url, timeout=timeout) if unpaywall_response.status_code == 200: unpaywall_data = unpaywall_response.json() if unpaywall_data.get('is_oa'): result["download_url"] = unpaywall_data.get('best_oa_location', {}).get('url') result["verification_source"] = "unpaywall" else: result["notes"].append(f"Crossref API returned 
{response.status_code}") except Exception as e: result["notes"].append(f"DOI verification error: {str(e)}") return result def _verify_arxiv(self, citation: CitationModel, timeout: int) -> Dict[str, Any]: """Verify arXiv ID""" import requests result = { "citation": citation.dict(), "verified": False, "verification_source": "arxiv", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: # Clean arXiv ID arxiv_id = citation.identifier if 'arxiv:' in arxiv_id.lower(): arxiv_id = arxiv_id.split(':')[-1].strip() # Check arXiv API api_url = f"http://export.arxiv.org/api/query?id_list={arxiv_id}" response = requests.get(api_url, headers=self.headers, timeout=timeout) if response.status_code == 200: result["verified"] = True result["quality_score"] = 0.95 result["download_url"] = f"https://arxiv.org/pdf/{arxiv_id}.pdf" result["file_format"] = "pdf" result["notes"].append("arXiv paper available") except Exception as e: result["notes"].append(f"arXiv verification error: {str(e)}") return result def _verify_url(self, citation: CitationModel, timeout: int) -> Dict[str, Any]: """Verify URL""" import requests result = { "citation": citation.dict(), "verified": False, "verification_source": "direct", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: response = requests.head( citation.identifier, headers=self.headers, timeout=timeout, allow_redirects=True ) if response.status_code == 200: content_type = response.headers.get('content-type', '') result["verified"] = True result["quality_score"] = 0.7 result["download_url"] = citation.identifier # Check if it's a PDF if 'application/pdf' in content_type: result["file_format"] = "pdf" result["quality_score"] += 0.2 # Try to get file size content_length = response.headers.get('content-length') if content_length: result["file_size"] = int(content_length) result["notes"].append(f"Content-Type: {content_type}") except Exception as e: result["notes"].append(f"URL verification error: {str(e)}") return result def _verify_isbn(self, citation: CitationModel, timeout: int) -> Dict[str, Any]: """Verify ISBN""" import requests result = { "citation": citation.dict(), "verified": False, "verification_source": "openlibrary", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: # Try Open Library API url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{citation.identifier}&format=json" response = requests.get(url, headers=self.headers, timeout=timeout) if response.status_code == 200: data = response.json() if data: result["verified"] = True result["quality_score"] = 0.8 result["notes"].append("ISBN found in Open Library") except Exception as e: result["notes"].append(f"ISBN verification error: {str(e)}") return result def _verify_pmid(self, citation: CitationModel, timeout: int) -> Dict[str, Any]: """Verify PMID""" import requests result = { "citation": citation.dict(), "verified": False, "verification_source": "pubmed", "download_url": None, "file_format": None, "file_size": None, "quality_score": 0.0, "notes": [] } try: # Try PubMed API url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=pubmed&id={citation.identifier}&retmode=json" response = requests.get(url, headers=self.headers, timeout=timeout) if response.status_code == 200: data = response.json() if data.get('result', {}).get(citation.identifier): result["verified"] = True result["quality_score"] = 0.85 result["notes"].append("PMID found in 
PubMed") except Exception as e: result["notes"].append(f"PMID verification error: {str(e)}") return result class PaperDownloadTool(Tool): name = "download_paper" description = """ Download academic paper from verified source. Args: verification_result (Dict[str, Any]): Verified resource to download output_dir (str): Directory to save downloaded file Returns: Dict[str, Any]: Download result with file path and metadata """ def __init__(self): super().__init__() self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } def forward(self, verification_result: Dict[str, Any], output_dir: str = "downloads") -> Dict[str, Any]: """Download paper""" import requests import os result = { "success": False, "file_path": None, "file_size": 0, "download_time": 0, "error": None, "metadata": verification_result } try: # Create output directory os.makedirs(output_dir, exist_ok=True) download_url = verification_result.get("download_url") if not download_url: result["error"] = "No download URL available" return result # Generate filename citation = verification_result.get("citation", {}) identifier = citation.get("identifier", "unknown") file_ext = verification_result.get("file_format", "pdf") # Clean filename filename = re.sub(r'[^\w\-\.]', '_', identifier) if not filename.endswith(f'.{file_ext}'): filename = f"{filename}.{file_ext}" file_path = os.path.join(output_dir, filename) # Download file start_time = datetime.now() response = requests.get( download_url, headers=self.headers, stream=True, timeout=30 ) if response.status_code == 200: with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) download_time = (datetime.now() - start_time).total_seconds() file_size = os.path.getsize(file_path) result["success"] = True result["file_path"] = file_path result["file_size"] = file_size result["download_time"] = download_time logger.info(f"Downloaded {filename} ({file_size} bytes)") else: result["error"] = f"HTTP {response.status_code}" except Exception as e: result["error"] = str(e) logger.error(f"Download error: {e}") return result class FileProcessingTool(Tool): name = "process_file" description = """ Process different file types to extract text for bibliography extraction. 
Args: file_path (str): Path to the file file_type (str): Type of file (auto-detected if None) Returns: Dict[str, Any]: Extracted text and metadata """ def __init__(self): super().__init__() def forward(self, file_path: str, file_type: str = None) -> Dict[str, Any]: """Process file and extract text""" import os result = { "success": False, "text": "", "file_type": file_type, "file_size": 0, "error": None, "metadata": {} } try: if not os.path.exists(file_path): result["error"] = "File not found" return result file_size = os.path.getsize(file_path) result["file_size"] = file_size # Determine file type if not file_type: file_type = self._detect_file_type(file_path) result["file_type"] = file_type # Process based on file type if file_type == "txt": with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: result["text"] = f.read() result["success"] = True elif file_type == "pdf": result["text"] = self._extract_from_pdf(file_path) result["success"] = True elif file_type == "docx": result["text"] = self._extract_from_docx(file_path) result["success"] = True elif file_type == "html": with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: html_content = f.read() result["text"] = self._extract_from_html(html_content) result["success"] = True else: # Try as text file try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: result["text"] = f.read() result["success"] = True except: result["error"] = f"Unsupported file type: {file_type}" except Exception as e: result["error"] = str(e) return result def _detect_file_type(self, file_path: str) -> str: """Detect file type from extension""" ext = os.path.splitext(file_path)[1].lower() type_mapping = { '.txt': 'txt', '.pdf': 'pdf', '.docx': 'docx', '.doc': 'doc', '.html': 'html', '.htm': 'html', '.md': 'markdown', '.rtf': 'rtf' } return type_mapping.get(ext, 'unknown') def _extract_from_pdf(self, file_path: str) -> str: """Extract text from PDF""" try: # Try PyPDF2 import PyPDF2 text = "" with open(file_path, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) for page in pdf_reader.pages: text += page.extract_text() return text except ImportError: logger.warning("PyPDF2 not installed, using fallback") # Fallback: use pdftotext command if available import subprocess try: result = subprocess.run( ['pdftotext', file_path, '-'], capture_output=True, text=True ) if result.returncode == 0: return result.stdout except: pass return "" def _extract_from_docx(self, file_path: str) -> str: """Extract text from DOCX""" try: from docx import Document doc = Document(file_path) return "\n".join([paragraph.text for paragraph in doc.paragraphs]) except ImportError: logger.warning("python-docx not installed") return "" except Exception as e: logger.error(f"Error reading DOCX: {e}") return "" def _extract_from_html(self, html_content: str) -> str: """Extract text from HTML""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html_content, 'html.parser') # Remove script and style elements for script in soup(["script", "style"]): script.decompose() return soup.get_text() except ImportError: # Simple regex-based extraction import re text = re.sub(r'<[^>]+>', ' ', html_content) text = re.sub(r'\s+', ' ', text) return text # ========== AGENTES PRINCIPALES ========== class BibliographyProcessingSystem: """Sistema principal de procesamiento bibliográfico usando smolagents""" def __init__(self, model_config: Dict[str, Any] = None): self.model_config = model_config or { "model_id": "gpt-4", "api_key": os.getenv("OPENAI_API_KEY", ""), 
"provider": "openai" } # Inicializar herramientas self.extraction_tool = BibliographyExtractionTool() self.verification_tool = ResourceVerificationTool() self.download_tool = PaperDownloadTool() self.file_tool = FileProcessingTool() # Crear agentes self.extraction_agent = self._create_extraction_agent() self.verification_agent = self._create_verification_agent() self.download_agent = self._create_download_agent() # Directorios self.output_dir = "bibliography_output" self.download_dir = os.path.join(self.output_dir, "downloads") self.report_dir = os.path.join(self.output_dir, "reports") # Crear directorios os.makedirs(self.output_dir, exist_ok=True) os.makedirs(self.download_dir, exist_ok=True) os.makedirs(self.report_dir, exist_ok=True) # Estado self.current_process_id = None self.processing_results = {} def _create_extraction_agent(self) -> ToolCallingAgent: """Crear agente de extracción""" model = self._create_model() agent = ToolCallingAgent( tools=[self.extraction_tool, self.file_tool], model=model, name="ExtractionAgent", description="Extract bibliographic references from documents", max_steps=10 ) return agent def _create_verification_agent(self) -> ToolCallingAgent: """Crear agente de verificación""" model = self._create_model() agent = ToolCallingAgent( tools=[self.verification_tool], model=model, name="VerificationAgent", description="Verify the existence and accessibility of academic resources", max_steps=15 ) return agent def _create_download_agent(self) -> ToolCallingAgent: """Crear agente de descarga""" model = self._create_model() agent = ToolCallingAgent( tools=[self.download_tool], model=model, name="DownloadAgent", description="Download academic papers from verified sources", max_steps=20 ) return agent def _create_model(self): """Crear modelo según configuración""" provider = self.model_config.get("provider", "openai") if provider == "openai": return LiteLLMModel( model_id=self.model_config.get("model_id", "gpt-4"), api_key=self.model_config.get("api_key") ) elif provider == "anthropic": return LiteLLMModel( model_id="claude-3-opus-20240229", api_key=self.model_config.get("api_key") ) elif provider == "huggingface": from smolagents import InferenceClientModel return InferenceClientModel( model_id=self.model_config.get("model_id", "mistralai/Mixtral-8x7B-Instruct-v0.1") ) else: # Default to OpenAI return LiteLLMModel(model_id="gpt-4") async def process_document(self, file_path: str, process_id: str = None) -> Dict[str, Any]: """Procesar documento completo""" import time start_time = time.time() # Generar ID de proceso self.current_process_id = process_id or hashlib.md5( f"{file_path}_{datetime.now().isoformat()}".encode() ).hexdigest()[:8] logger.info(f"Starting process {self.current_process_id} for {file_path}") # 1. Extraer texto del archivo extraction_prompt = f""" Process the file at {file_path} to extract all text content. Focus on extracting any bibliographic references, citations, or academic resources. Steps: 1. Use process_file tool to extract text 2. Return the extracted text for further analysis """ try: # Ejecutar agente de extracción de archivos file_result = await self.extraction_agent.run_async(extraction_prompt) if not file_result or "text" not in str(file_result): return { "success": False, "error": "Failed to extract text from file", "process_id": self.current_process_id } # 2. 
Extraer referencias bibliográficas text_content = str(file_result) extraction_prompt2 = f""" Analyze the following text and extract all bibliographic references: {text_content[:5000]}... # Limitar tamaño para el prompt Extract: 1. DOIs (Digital Object Identifiers) 2. ISBNs 3. arXiv IDs 4. PubMed IDs (PMID) 5. Academic URLs 6. Any other academic references Return a comprehensive list of all found references. """ extraction_result = await self.extraction_agent.run_async(extraction_prompt2) # Parsear resultado (asumiendo que el agente devuelve texto JSON-like) citations = [] try: # Intentar extraer JSON del resultado import json result_str = str(extraction_result) # Buscar patrón JSON json_match = re.search(r'\{.*\}', result_str, re.DOTALL) if json_match: citations_data = json.loads(json_match.group()) if isinstance(citations_data, list): citations = [CitationModel(**c) for c in citations_data] except: # Fallback: usar la herramienta directamente citations_data = self.extraction_tool.forward(text_content, os.path.basename(file_path)) citations = [CitationModel(**c) for c in citations_data] logger.info(f"Found {len(citations)} citations") # 3. Verificar recursos verified_resources = [] failed_verifications = [] for citation in citations: verification_prompt = f""" Verify the following academic resource: Type: {citation.resource_type} Identifier: {citation.identifier} Source: {citation.extracted_from} Check if this resource exists and is accessible. """ try: verification_result = await self.verification_agent.run_async(verification_prompt) # Parsear resultado if verification_result: verification_dict = self.verification_tool.forward(citation.dict()) verified_resource = VerificationResult(**verification_dict) if verified_resource.verified: verified_resources.append(verified_resource) else: failed_verifications.append(citation) except Exception as e: logger.error(f"Verification error for {citation.identifier}: {e}") failed_verifications.append(citation) # 4. Descargar recursos verificados downloaded_files = [] for verified_resource in verified_resources: if verified_resource.download_url: download_prompt = f""" Download the academic paper from: URL: {verified_resource.download_url} Format: {verified_resource.file_format} Save it to: {self.download_dir} """ try: download_result = await self.download_agent.run_async(download_prompt) if download_result: download_dict = self.download_tool.forward( verified_resource.dict(), self.download_dir ) if download_dict.get("success"): downloaded_files.append(download_dict.get("file_path")) except Exception as e: logger.error(f"Download error: {e}") # 5. Generar reporte processing_time = time.time() - start_time report = ProcessingReport( input_file=file_path, total_citations=len(citations), verified_resources=verified_resources, downloaded_files=downloaded_files, failed_verifications=failed_verifications, processing_time=processing_time, summary={ "success_rate": len(verified_resources) / max(1, len(citations)), "download_rate": len(downloaded_files) / max(1, len(verified_resources)), "file_count": len(downloaded_files) } ) # Guardar reporte report_path = os.path.join( self.report_dir, f"report_{self.current_process_id}.json" ) with open(report_path, 'w', encoding='utf-8') as f: json.dump(report.dict(), f, indent=2, default=str) # 6. 
Crear archivo ZIP con resultados zip_path = self._create_results_zip(report) # Guardar resultados en estado self.processing_results[self.current_process_id] = { "report": report.dict(), "zip_path": zip_path, "timestamp": datetime.now().isoformat() } logger.info(f"Process {self.current_process_id} completed in {processing_time:.2f}s") return { "success": True, "process_id": self.current_process_id, "report": report.dict(), "zip_path": zip_path, "summary": { "citations_found": len(citations), "resources_verified": len(verified_resources), "files_downloaded": len(downloaded_files), "processing_time": processing_time } } except Exception as e: logger.error(f"Processing error: {e}") return { "success": False, "error": str(e), "process_id": self.current_process_id } def _create_results_zip(self, report: ProcessingReport) -> str: """Crear archivo ZIP con resultados""" import zipfile from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_filename = f"bibliography_results_{timestamp}.zip" zip_path = os.path.join(self.output_dir, zip_filename) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # Agregar reporte report_path = os.path.join( self.report_dir, f"report_{self.current_process_id}.json" ) if os.path.exists(report_path): zipf.write(report_path, "report.json") # Agregar archivos descargados for file_path in report.downloaded_files: if os.path.exists(file_path): arcname = os.path.join("downloads", os.path.basename(file_path)) zipf.write(file_path, arcname) # Agregar resumen en texto summary_content = self._generate_summary_text(report) zipf.writestr("summary.txt", summary_content) return zip_path def _generate_summary_text(self, report: ProcessingReport) -> str: """Generar resumen en texto""" summary = f""" BIBLIOGRAPHY PROCESSING REPORT ============================== Process ID: {self.current_process_id} Input File: {report.input_file} Processing Time: {report.processing_time:.2f} seconds Timestamp: {report.timestamp} STATISTICS ---------- Total Citations Found: {report.total_citations} Resources Verified: {len(report.verified_resources)} Files Downloaded: {len(report.downloaded_files)} Failed Verifications: {len(report.failed_verifications)} Success Rate: {(len(report.verified_resources) / max(1, report.total_citations)) * 100:.1f}% Download Rate: {(len(report.downloaded_files) / max(1, len(report.verified_resources))) * 100:.1f}% VERIFIED RESOURCES ------------------ """ for i, resource in enumerate(report.verified_resources, 1): summary += f"\n{i}. 
{resource.citation.identifier}" summary += f"\n Type: {resource.citation.resource_type}" summary += f"\n Source: {resource.verification_source}" summary += f"\n Quality: {resource.quality_score:.2f}" if resource.download_url: summary += f"\n Downloaded: Yes" if resource.file_format: summary += f" ({resource.file_format})" summary += "\n" if report.failed_verifications: summary += f"\nFAILED VERIFICATIONS\n-------------------\n" for citation in report.failed_verifications: summary += f"- {citation.identifier} ({citation.resource_type})\n" summary += f"\nFILES DOWNLOADED\n----------------\n" for file_path in report.downloaded_files: file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0 summary += f"- {os.path.basename(file_path)} ({file_size} bytes)\n" return summary def get_status(self, process_id: str = None) -> Dict[str, Any]: """Obtener estado del proceso""" pid = process_id or self.current_process_id if pid and pid in self.processing_results: return self.processing_results[pid] return {"error": "Process not found"} def cleanup(self, process_id: str = None): """Limpiar archivos temporales""" import shutil if process_id: # Limpiar proceso específico if process_id in self.processing_results: del self.processing_results[process_id] else: # Limpiar todo self.processing_results.clear() # Limpiar directorios (opcional, descomentar si se necesita) # shutil.rmtree(self.download_dir, ignore_errors=True) # shutil.rmtree(self.report_dir, ignore_errors=True) # ========== INTERFAZ GRADIO ========== def create_gradio_interface(): """Crear interfaz Gradio para el sistema""" system = None def initialize_system(provider, model_id, api_key): """Inicializar sistema con configuración""" nonlocal system config = { "provider": provider, "model_id": model_id, "api_key": api_key } try: system = BibliographyProcessingSystem(config) return "✅ Sistema inicializado correctamente" except Exception as e: return f"❌ Error: {str(e)}" async def process_file(file_obj, progress=gr.Progress()): """Procesar archivo""" if not system: return None, "❌ Sistema no inicializado", "", "" try: progress(0, desc="Iniciando procesamiento...") # Guardar archivo temporalmente import tempfile with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_obj.name)[1]) as tmp: with open(file_obj.name, 'rb') as src: tmp.write(src.read()) tmp_path = tmp.name progress(0.2, desc="Extrayendo texto...") # Procesar archivo result = await system.process_document(tmp_path) if not result.get("success"): return None, f"❌ Error: {result.get('error')}", "", "" # Obtener reporte report_data = result.get("report", {}) summary = result.get("summary", {}) progress(0.8, desc="Generando resultados...") # Preparar resultados para visualización citations_found = summary.get("citations_found", 0) verified = summary.get("resources_verified", 0) downloaded = summary.get("files_downloaded", 0) # Generar HTML para visualización html_output = f"""

            <div>
                <h2>📊 Resultados del Procesamiento</h2>
                <h3>📈 Estadísticas</h3>
                <ul>
                    <li>Referencias encontradas: {citations_found}</li>
                    <li>Recursos verificados: {verified}</li>
                    <li>Archivos descargados: {downloaded}</li>
                </ul>
            """

            # List of verified resources
            if verified > 0:
                html_output += """
                <h3>✅ Recursos Verificados</h3>
                """

            # List of failed verifications
            failed = len(report_data.get("failed_verifications", []))
            if failed > 0:
                html_output += f"""
                <h3>❌ Recursos No Verificados ({failed})</h3>
                <p>Algunos recursos no pudieron ser verificados.
                Revisa el archivo ZIP para más detalles.</p>
                """

            html_output += "</div>
" # Texto plano para exportación text_output = f""" Procesamiento Bibliográfico =========================== Archivo: {file_obj.name} Proceso ID: {result.get('process_id')} Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Resumen: - Referencias encontradas: {citations_found} - Recursos verificados: {verified} - Archivos descargados: {downloaded} - Tasa de éxito: {(verified/max(1, citations_found))*100:.1f}% Para ver el reporte completo, descarga el archivo ZIP. """ progress(1.0, desc="Completado!") # Devolver resultados return ( result.get("zip_path"), f"✅ Procesamiento completado. ID: {result.get('process_id')}", html_output, text_output ) except Exception as e: logger.error(f"Error en procesamiento: {e}") return None, f"❌ Error: {str(e)}", "", "" def get_status(): """Obtener estado del sistema""" if not system or not system.current_process_id: return "⚠️ No hay procesos activos" status = system.get_status() if "error" in status: return f"⚠️ {status['error']}" return f""" 📊 Estado del Sistema --------------------- Proceso activo: {system.current_process_id} Total procesos: {len(system.processing_results)} Último reporte: {status.get('timestamp', 'N/A')} """ # Crear interfaz with gr.Blocks(title="Sistema de Recopilación Bibliográfica", theme=gr.themes.Soft()) as interface: gr.Markdown("# 📚 Sistema de Recopilación Bibliográfica con IA") gr.Markdown("Procesa documentos y extrae referencias bibliográficas automáticamente") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ⚙️ Configuración") provider = gr.Dropdown( choices=["openai", "anthropic", "huggingface"], label="Proveedor de IA", value="openai" ) model_id = gr.Textbox( label="Model ID", value="gpt-4", placeholder="Ej: gpt-4, claude-3-opus-20240229, mistralai/Mixtral-8x7B-Instruct-v0.1" ) api_key = gr.Textbox( label="API Key", type="password", placeholder="Ingresa tu API key" ) init_btn = gr.Button("🚀 Inicializar Sistema", variant="primary") init_status = gr.Markdown("") init_btn.click( initialize_system, inputs=[provider, model_id, api_key], outputs=init_status ) gr.Markdown("---") status_btn = gr.Button("📊 Ver Estado") system_status = gr.Markdown("") status_btn.click(get_status, outputs=system_status) with gr.Column(scale=2): gr.Markdown("### 📄 Procesar Documento") file_input = gr.File( label="Sube tu documento", file_types=[".txt", ".pdf", ".docx", ".html", ".md", ".rtf"] ) process_btn = gr.Button("🔍 Procesar Documento", variant="primary") gr.Markdown("### 📊 Resultados") result_file = gr.File(label="Descargar Resultados (ZIP)") result_status = gr.Markdown("") with gr.Tabs(): with gr.TabItem("📋 Vista HTML"): html_output = gr.HTML(label="Resultados Detallados") with gr.TabItem("📝 Texto Plano"): text_output = gr.Textbox( label="Resumen", lines=20, max_lines=50 ) process_btn.click( process_file, inputs=[file_input], outputs=[result_file, result_status, html_output, text_output] ) # Ejemplos gr.Markdown("### 📖 Ejemplos") gr.Examples( examples=[ ["ejemplo_referencias.txt"], ["ejemplo_bibliografia.pdf"], ["paper_con_referencias.docx"] ], inputs=[file_input], label="Archivos de ejemplo (necesitan ser creados)" ) # Información gr.Markdown(""" ### 📌 Información - **Formatos soportados**: TXT, PDF, DOCX, HTML, MD, RTF - **Recursos detectados**: DOI, ISBN, arXiv, PMID, URLs académicas - **Salida**: Archivo ZIP con reportes y documentos descargados ### ⚠️ Notas 1. Necesitas una API key válida para el proveedor seleccionado 2. Los archivos grandes pueden tardar varios minutos 3. 
La precisión depende del modelo de IA utilizado
    """)

    return interface


# ========== MAIN ENTRY POINT ==========

async def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="Sistema de Recopilación Bibliográfica")
    parser.add_argument("--mode", choices=["gui", "cli"], default="gui", help="Modo de ejecución")
    parser.add_argument("--file", type=str, help="Archivo a procesar (modo CLI)")
    parser.add_argument("--provider", default="openai", help="Proveedor de IA")
    parser.add_argument("--model", default="gpt-4", help="Modelo de IA")
    parser.add_argument("--api-key", help="API Key")

    args = parser.parse_args()

    if args.mode == "gui":
        # Launch the Gradio interface
        interface = create_gradio_interface()
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,
            debug=True
        )

    elif args.mode == "cli":
        # Command-line mode
        if not args.file:
            print("❌ Error: Debes especificar un archivo con --file")
            return

        if not os.path.exists(args.file):
            print(f"❌ Error: Archivo no encontrado: {args.file}")
            return

        # Configure the system
        config = {
            "provider": args.provider,
            "model_id": args.model,
            "api_key": args.api_key or os.getenv(f"{args.provider.upper()}_API_KEY")
        }

        if not config["api_key"]:
            print("❌ Error: Necesitas especificar una API key")
            return

        system = BibliographyProcessingSystem(config)

        print(f"🔍 Procesando archivo: {args.file}")
        print("⏳ Esto puede tardar varios minutos...")

        result = await system.process_document(args.file)

        if result.get("success"):
            print("✅ Procesamiento completado!")
            print(f"📊 ID del proceso: {result.get('process_id')}")

            summary = result.get("summary", {})
            print(f"""
📈 Resultados:
- Referencias encontradas: {summary.get('citations_found', 0)}
- Recursos verificados: {summary.get('resources_verified', 0)}
- Archivos descargados: {summary.get('files_downloaded', 0)}
- Tiempo de procesamiento: {summary.get('processing_time', 0):.2f}s

📦 Archivo ZIP con resultados: {result.get('zip_path')}
""")
        else:
            print(f"❌ Error: {result.get('error')}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
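

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not executed as part of this module): running the
# regex-based extractor on its own, without any LLM agent. This mirrors the
# direct-call fallback used inside BibliographyProcessingSystem.process_document.
# The file name "notes.txt" is only an example.
#
#     tool = BibliographyExtractionTool()
#     with open("notes.txt", encoding="utf-8") as fh:
#         citations = tool.forward(fh.read(), source_name="notes.txt")
#     for c in citations:
#         print(c["resource_type"], c["identifier"], c["confidence"])
# ---------------------------------------------------------------------------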
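
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, assumptions flagged): driving the full pipeline
# programmatically instead of through the Gradio GUI or the CLI. Assumes an
# OpenAI key is available in the environment and that "referencias.txt" is a
# hypothetical local file containing citations.
#
#     config = {
#         "provider": "openai",
#         "model_id": "gpt-4",
#         "api_key": os.environ["OPENAI_API_KEY"],
#     }
#     system = BibliographyProcessingSystem(config)
#     result = asyncio.run(system.process_document("referencias.txt"))
#     if result.get("success"):
#         print(result["summary"])    # citation / verification / download counts
#         print(result["zip_path"])   # ZIP bundle with the report and downloads
# ---------------------------------------------------------------------------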