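# Likely third-party dependencies for this script (inferred from the imports below
# and from the optional imports inside the tools; exact package set is an assumption):
# gradio, smolagents, pydantic, requests, and optionally PyPDF2, python-docx and
# beautifulsoup4 for PDF/DOCX/HTML text extraction.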
import os
import re
import json
import logging
import zipfile
import asyncio
import tempfile
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
import gradio as gr
from enum import Enum
import hashlib
import urllib.parse

# Import smolagents
from smolagents import CodeAgent, ToolCallingAgent, LiteLLMModel
from smolagents.tools import Tool, tool
from pydantic import BaseModel, Field

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bibliography_system.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# ========== DATA MODELS ==========

class ResourceType(str, Enum):
    DOI = "doi"
    ISBN = "isbn"
    ARXIV = "arxiv"
    URL = "url"
    PMID = "pmid"
    BIBTEX = "bibtex"
    CITATION = "citation"
    UNKNOWN = "unknown"


class CitationModel(BaseModel):
    id: str
    raw_text: str
    resource_type: ResourceType
    identifier: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    confidence: float = 0.0
    extracted_from: str
    position: Tuple[int, int] = (0, 0)


class VerificationResult(BaseModel):
    citation: CitationModel
    verified: bool
    verification_source: str
    download_url: Optional[str]
    file_format: Optional[str]
    file_size: Optional[int]
    quality_score: float
    notes: List[str] = Field(default_factory=list)


class ProcessingReport(BaseModel):
    input_file: str
    total_citations: int
    verified_resources: List[VerificationResult]
    downloaded_files: List[str]
    failed_verifications: List[CitationModel]
    processing_time: float
    summary: Dict[str, Any] = Field(default_factory=dict)
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
# ========== AGENT TOOLS ==========

class BibliographyExtractionTool(Tool):
    name = "extract_bibliography"
    description = """
    Extract bibliographic references from text. Identifies DOIs, ISBNs, arXiv IDs, URLs,
    and other academic identifiers from unstructured text.

    Args:
        text (str): The text to analyze
        source_name (str): Name of the source document

    Returns:
        List[CitationModel]: List of extracted citations
    """

    def __init__(self):
        super().__init__()
        # Patterns for the different resource types
        self.patterns = {
            ResourceType.DOI: [
                r'\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b',
                r'doi:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)',
                r'DOI:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)'
            ],
            ResourceType.ISBN: [
                r'ISBN(?:-1[03])?:?\s*(?=[0-9X]{10}|(?=(?:[0-9]+[- ]){3})[- 0-9X]{13}|97[89][0-9]{10}|(?=(?:[0-9]+[- ]){4})[- 0-9]{17})(?:97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]'
            ],
            ResourceType.ARXIV: [
                r'arXiv:\s*(\d{4}\.\d{4,5}(v\d+)?)',
                r'arxiv:\s*([a-z\-]+/\d{7})'
            ],
            ResourceType.PMID: [
                r'PMID:\s*(\d+)',
                r'PubMed ID:\s*(\d+)'
            ]
        }

    def forward(self, text: str, source_name: str = "unknown") -> List[Dict[str, Any]]:
        """Extract citations from text"""
        citations = []
        text_lower = text.lower()

        # Search per resource type
        for resource_type, patterns in self.patterns.items():
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    identifier = match.group(1) if match.groups() else match.group(0)
                    # Clean the identifier
                    identifier = self._clean_identifier(identifier, resource_type)
                    if identifier:
                        # Compute a confidence score based on context
                        confidence = self._calculate_confidence(
                            identifier, resource_type, text_lower, match.start()
                        )
                        citation = CitationModel(
                            id=hashlib.md5(
                                f"{identifier}_{source_name}".encode()
                            ).hexdigest()[:12],
                            raw_text=match.group(0),
                            resource_type=resource_type,
                            identifier=identifier,
                            metadata={
                                "found_at": match.start(),
                                "context": self._get_context(text, match.start(), match.end())
                            },
                            confidence=confidence,
                            extracted_from=source_name,
                            position=(match.start(), match.end())
                        )
                        citations.append(citation.dict())

        # Extract generic URLs (only if they look academic)
        url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
        url_matches = re.finditer(url_pattern, text)
        for match in url_matches:
            url = match.group(0)
            if self._is_academic_url(url):
                citation = CitationModel(
                    id=hashlib.md5(f"{url}_{source_name}".encode()).hexdigest()[:12],
                    raw_text=url,
                    resource_type=ResourceType.URL,
                    identifier=url,
                    metadata={
                        "found_at": match.start(),
                        "context": self._get_context(text, match.start(), match.end())
                    },
                    confidence=0.6,
                    extracted_from=source_name,
                    position=(match.start(), match.end())
                )
                citations.append(citation.dict())
        return citations

    def _clean_identifier(self, identifier: str, resource_type: ResourceType) -> str:
        """Clean identifier"""
        identifier = identifier.strip()
        # Strip known prefixes
        prefixes = ['doi:', 'DOI:', 'arxiv:', 'arXiv:', 'isbn:', 'ISBN:', 'pmid:', 'PMID:']
        for prefix in prefixes:
            if identifier.startswith(prefix):
                identifier = identifier[len(prefix):].strip()
        # Strip unwanted surrounding characters
        identifier = identifier.strip('"\'<>()[]{}')
        return identifier

    def _calculate_confidence(self, identifier: str, resource_type: ResourceType,
                              text: str, position: int) -> float:
        """Calculate confidence score for extracted citation"""
        confidence = 0.7  # Base confidence

        # Check DOI format
        if resource_type == ResourceType.DOI:
            if re.match(r'^10\.\d{4,9}/.+', identifier):
                confidence += 0.2

        # Check the surrounding context
        context_words = ['paper', 'article', 'journal', 'conference', 'published',
                         'reference', 'bibliography', 'cite', 'doi', 'url']
        context = text[max(0, position-100):min(len(text), position+100)]
        for word in context_words:
            if word in context.lower():
                confidence += 0.05
        return min(confidence, 1.0)

    def _is_academic_url(self, url: str) -> bool:
        """Check if URL looks academic"""
        academic_domains = [
            'arxiv.org', 'doi.org', 'springer.com', 'ieee.org', 'acm.org',
            'sciencedirect.com', 'wiley.com', 'tandfonline.com', 'nature.com',
            'science.org', 'pnas.org', 'plos.org', 'bmc.com', 'frontiersin.org',
            'mdpi.com', 'researchgate.net', 'semanticscholar.org'
        ]
        url_lower = url.lower()
        return any(domain in url_lower for domain in academic_domains)

    def _get_context(self, text: str, start: int, end: int, window: int = 50) -> str:
        """Get context around match"""
        context_start = max(0, start - window)
        context_end = min(len(text), end + window)
        return text[context_start:context_end]
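
# Illustrative sketch (not part of the original flow): exercising the extraction
# tool directly on a small text snippet. The sample text and the helper name are
# made up for demonstration; only classes defined above are used.
def _demo_extract_citations() -> None:
    sample = (
        "See doi:10.1000/xyz123 and the preprint arXiv:2101.00001, "
        "also available at https://arxiv.org/abs/2101.00001 (PMID: 123456)."
    )
    extractor = BibliographyExtractionTool()
    for entry in extractor.forward(sample, source_name="demo"):
        print(entry["resource_type"], entry["identifier"], f"confidence={entry['confidence']:.2f}")
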
class ResourceVerificationTool(Tool):
    name = "verify_resource"
    description = """
    Verify the existence and accessibility of academic resources.

    Args:
        citation (Dict[str, Any]): Citation to verify
        timeout (int): Timeout in seconds

    Returns:
        VerificationResult: Verification result with metadata
    """

    def __init__(self):
        super().__init__()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

    def forward(self, citation: Dict[str, Any], timeout: int = 10) -> Dict[str, Any]:
        """Verify a citation"""
        citation_obj = CitationModel(**citation)
        # Prepare the default result
        result = {
            "citation": citation_obj.dict(),
            "verified": False,
            "verification_source": "none",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            if citation_obj.resource_type == ResourceType.DOI:
                return self._verify_doi(citation_obj, timeout)
            elif citation_obj.resource_type == ResourceType.ARXIV:
                return self._verify_arxiv(citation_obj, timeout)
            elif citation_obj.resource_type == ResourceType.URL:
                return self._verify_url(citation_obj, timeout)
            elif citation_obj.resource_type == ResourceType.ISBN:
                return self._verify_isbn(citation_obj, timeout)
            elif citation_obj.resource_type == ResourceType.PMID:
                return self._verify_pmid(citation_obj, timeout)
            else:
                result["notes"].append(f"Unsupported resource type: {citation_obj.resource_type}")
        except Exception as e:
            result["notes"].append(f"Verification error: {str(e)}")
        return result

    def _verify_doi(self, citation: CitationModel, timeout: int) -> Dict[str, Any]:
        """Verify DOI"""
        import requests
        result = {
            "citation": citation.dict(),
            "verified": False,
            "verification_source": "crossref",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            # Try Crossref API
            url = f"https://api.crossref.org/works/{citation.identifier}"
            response = requests.get(url, headers=self.headers, timeout=timeout)
            if response.status_code == 200:
                data = response.json()
                work = data.get('message', {})
                result["verified"] = True
                result["quality_score"] = 0.9
                # Check for open access
                if work.get('license'):
                    result["notes"].append("Open access available")
                    result["quality_score"] += 0.1
                # Try to find PDF URL
                links = work.get('link', [])
                for link in links:
                    if link.get('content-type') == 'application/pdf':
                        result["download_url"] = link.get('URL')
                        result["file_format"] = "pdf"
                        break
                # Try Unpaywall (the API requires a contact email; the address below
                # is a placeholder — the original value was mangled in the source)
                if not result["download_url"]:
                    unpaywall_url = f"https://api.unpaywall.org/v2/{citation.identifier}?email=your_email@example.com"
                    unpaywall_response = requests.get(unpaywall_url, timeout=timeout)
                    if unpaywall_response.status_code == 200:
                        unpaywall_data = unpaywall_response.json()
                        if unpaywall_data.get('is_oa'):
                            result["download_url"] = unpaywall_data.get('best_oa_location', {}).get('url')
                            result["verification_source"] = "unpaywall"
            else:
                result["notes"].append(f"Crossref API returned {response.status_code}")
        except Exception as e:
            result["notes"].append(f"DOI verification error: {str(e)}")
        return result

    def _verify_arxiv(self, citation: CitationModel, timeout: int) -> Dict[str, Any]:
        """Verify arXiv ID"""
        import requests
        result = {
            "citation": citation.dict(),
            "verified": False,
            "verification_source": "arxiv",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            # Clean arXiv ID
            arxiv_id = citation.identifier
            if 'arxiv:' in arxiv_id.lower():
                arxiv_id = arxiv_id.split(':')[-1].strip()
            # Check arXiv API
            api_url = f"http://export.arxiv.org/api/query?id_list={arxiv_id}"
            response = requests.get(api_url, headers=self.headers, timeout=timeout)
            if response.status_code == 200:
                result["verified"] = True
                result["quality_score"] = 0.95
                result["download_url"] = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
                result["file_format"] = "pdf"
                result["notes"].append("arXiv paper available")
        except Exception as e:
            result["notes"].append(f"arXiv verification error: {str(e)}")
        return result

    def _verify_url(self, citation: CitationModel, timeout: int) -> Dict[str, Any]:
        """Verify URL"""
        import requests
        result = {
            "citation": citation.dict(),
            "verified": False,
            "verification_source": "direct",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            response = requests.head(
                citation.identifier,
                headers=self.headers,
                timeout=timeout,
                allow_redirects=True
            )
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '')
                result["verified"] = True
                result["quality_score"] = 0.7
                result["download_url"] = citation.identifier
                # Check if it's a PDF
                if 'application/pdf' in content_type:
                    result["file_format"] = "pdf"
                    result["quality_score"] += 0.2
                # Try to get file size
                content_length = response.headers.get('content-length')
                if content_length:
                    result["file_size"] = int(content_length)
                result["notes"].append(f"Content-Type: {content_type}")
        except Exception as e:
            result["notes"].append(f"URL verification error: {str(e)}")
        return result

    def _verify_isbn(self, citation: CitationModel, timeout: int) -> Dict[str, Any]:
        """Verify ISBN"""
        import requests
        result = {
            "citation": citation.dict(),
            "verified": False,
            "verification_source": "openlibrary",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            # Try Open Library API
            url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{citation.identifier}&format=json"
            response = requests.get(url, headers=self.headers, timeout=timeout)
            if response.status_code == 200:
                data = response.json()
                if data:
                    result["verified"] = True
                    result["quality_score"] = 0.8
                    result["notes"].append("ISBN found in Open Library")
        except Exception as e:
            result["notes"].append(f"ISBN verification error: {str(e)}")
        return result

    def _verify_pmid(self, citation: CitationModel, timeout: int) -> Dict[str, Any]:
        """Verify PMID"""
        import requests
        result = {
            "citation": citation.dict(),
            "verified": False,
            "verification_source": "pubmed",
            "download_url": None,
            "file_format": None,
            "file_size": None,
            "quality_score": 0.0,
            "notes": []
        }
        try:
            # Try PubMed API
            url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=pubmed&id={citation.identifier}&retmode=json"
            response = requests.get(url, headers=self.headers, timeout=timeout)
            if response.status_code == 200:
                data = response.json()
                if data.get('result', {}).get(citation.identifier):
                    result["verified"] = True
                    result["quality_score"] = 0.85
                    result["notes"].append("PMID found in PubMed")
        except Exception as e:
            result["notes"].append(f"PMID verification error: {str(e)}")
        return result
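
# Illustrative sketch (assumption: network access is available): chaining the
# extraction and verification tools directly, without going through an agent.
# The helper name is made up for demonstration.
def _demo_verify_extracted(text: str) -> None:
    extractor = BibliographyExtractionTool()
    verifier = ResourceVerificationTool()
    for citation in extractor.forward(text, source_name="demo"):
        outcome = verifier.forward(citation, timeout=10)
        status = "ok" if outcome["verified"] else "failed"
        print(f"{citation['identifier']}: {status} via {outcome['verification_source']}")
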
class PaperDownloadTool(Tool):
    name = "download_paper"
    description = """
    Download academic paper from verified source.

    Args:
        verification_result (Dict[str, Any]): Verified resource to download
        output_dir (str): Directory to save downloaded file

    Returns:
        Dict[str, Any]: Download result with file path and metadata
    """

    def __init__(self):
        super().__init__()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

    def forward(self, verification_result: Dict[str, Any],
                output_dir: str = "downloads") -> Dict[str, Any]:
        """Download paper"""
        import requests
        import os
        result = {
            "success": False,
            "file_path": None,
            "file_size": 0,
            "download_time": 0,
            "error": None,
            "metadata": verification_result
        }
        try:
            # Create output directory
            os.makedirs(output_dir, exist_ok=True)
            download_url = verification_result.get("download_url")
            if not download_url:
                result["error"] = "No download URL available"
                return result
            # Generate filename
            citation = verification_result.get("citation", {})
            identifier = citation.get("identifier", "unknown")
            file_ext = verification_result.get("file_format", "pdf")
            # Clean filename
            filename = re.sub(r'[^\w\-\.]', '_', identifier)
            if not filename.endswith(f'.{file_ext}'):
                filename = f"{filename}.{file_ext}"
            file_path = os.path.join(output_dir, filename)
            # Download file
            start_time = datetime.now()
            response = requests.get(
                download_url,
                headers=self.headers,
                stream=True,
                timeout=30
            )
            if response.status_code == 200:
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                download_time = (datetime.now() - start_time).total_seconds()
                file_size = os.path.getsize(file_path)
                result["success"] = True
                result["file_path"] = file_path
                result["file_size"] = file_size
                result["download_time"] = download_time
                logger.info(f"Downloaded {filename} ({file_size} bytes)")
            else:
                result["error"] = f"HTTP {response.status_code}"
        except Exception as e:
            result["error"] = str(e)
            logger.error(f"Download error: {e}")
        return result
class FileProcessingTool(Tool):
    name = "process_file"
    description = """
    Process different file types to extract text for bibliography extraction.

    Args:
        file_path (str): Path to the file
        file_type (str): Type of file (auto-detected if None)

    Returns:
        Dict[str, Any]: Extracted text and metadata
    """

    def __init__(self):
        super().__init__()

    def forward(self, file_path: str, file_type: str = None) -> Dict[str, Any]:
        """Process file and extract text"""
        import os
        result = {
            "success": False,
            "text": "",
            "file_type": file_type,
            "file_size": 0,
            "error": None,
            "metadata": {}
        }
        try:
            if not os.path.exists(file_path):
                result["error"] = "File not found"
                return result
            file_size = os.path.getsize(file_path)
            result["file_size"] = file_size
            # Determine file type
            if not file_type:
                file_type = self._detect_file_type(file_path)
                result["file_type"] = file_type
            # Process based on file type
            if file_type == "txt":
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    result["text"] = f.read()
                result["success"] = True
            elif file_type == "pdf":
                result["text"] = self._extract_from_pdf(file_path)
                result["success"] = True
            elif file_type == "docx":
                result["text"] = self._extract_from_docx(file_path)
                result["success"] = True
            elif file_type == "html":
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    html_content = f.read()
                result["text"] = self._extract_from_html(html_content)
                result["success"] = True
            else:
                # Try as text file
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        result["text"] = f.read()
                    result["success"] = True
                except Exception:
                    result["error"] = f"Unsupported file type: {file_type}"
        except Exception as e:
            result["error"] = str(e)
        return result

    def _detect_file_type(self, file_path: str) -> str:
        """Detect file type from extension"""
        ext = os.path.splitext(file_path)[1].lower()
        type_mapping = {
            '.txt': 'txt',
            '.pdf': 'pdf',
            '.docx': 'docx',
            '.doc': 'doc',
            '.html': 'html',
            '.htm': 'html',
            '.md': 'markdown',
            '.rtf': 'rtf'
        }
        return type_mapping.get(ext, 'unknown')

    def _extract_from_pdf(self, file_path: str) -> str:
        """Extract text from PDF"""
        try:
            # Try PyPDF2
            import PyPDF2
            text = ""
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text()
            return text
        except ImportError:
            logger.warning("PyPDF2 not installed, using fallback")
            # Fallback: use pdftotext command if available
            import subprocess
            try:
                result = subprocess.run(
                    ['pdftotext', file_path, '-'],
                    capture_output=True,
                    text=True
                )
                if result.returncode == 0:
                    return result.stdout
            except Exception:
                pass
            return ""

    def _extract_from_docx(self, file_path: str) -> str:
        """Extract text from DOCX"""
        try:
            from docx import Document
            doc = Document(file_path)
            return "\n".join([paragraph.text for paragraph in doc.paragraphs])
        except ImportError:
            logger.warning("python-docx not installed")
            return ""
        except Exception as e:
            logger.error(f"Error reading DOCX: {e}")
            return ""

    def _extract_from_html(self, html_content: str) -> str:
        """Extract text from HTML"""
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(html_content, 'html.parser')
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            return soup.get_text()
        except ImportError:
            # Simple regex-based extraction
            import re
            text = re.sub(r'<[^>]+>', ' ', html_content)
            text = re.sub(r'\s+', ' ', text)
            return text
# ========== MAIN AGENTS ==========

class BibliographyProcessingSystem:
    """Main bibliography-processing system built on smolagents"""

    def __init__(self, model_config: Dict[str, Any] = None):
        self.model_config = model_config or {
            "model_id": "gpt-4",
            "api_key": os.getenv("OPENAI_API_KEY", ""),
            "provider": "openai"
        }
        # Initialize tools
        self.extraction_tool = BibliographyExtractionTool()
        self.verification_tool = ResourceVerificationTool()
        self.download_tool = PaperDownloadTool()
        self.file_tool = FileProcessingTool()
        # Create agents
        self.extraction_agent = self._create_extraction_agent()
        self.verification_agent = self._create_verification_agent()
        self.download_agent = self._create_download_agent()
        # Directories
        self.output_dir = "bibliography_output"
        self.download_dir = os.path.join(self.output_dir, "downloads")
        self.report_dir = os.path.join(self.output_dir, "reports")
        # Create directories
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.download_dir, exist_ok=True)
        os.makedirs(self.report_dir, exist_ok=True)
        # State
        self.current_process_id = None
        self.processing_results = {}

    def _create_extraction_agent(self) -> ToolCallingAgent:
        """Create the extraction agent"""
        model = self._create_model()
        agent = ToolCallingAgent(
            tools=[self.extraction_tool, self.file_tool],
            model=model,
            name="ExtractionAgent",
            description="Extract bibliographic references from documents",
            max_steps=10
        )
        return agent

    def _create_verification_agent(self) -> ToolCallingAgent:
        """Create the verification agent"""
        model = self._create_model()
        agent = ToolCallingAgent(
            tools=[self.verification_tool],
            model=model,
            name="VerificationAgent",
            description="Verify the existence and accessibility of academic resources",
            max_steps=15
        )
        return agent

    def _create_download_agent(self) -> ToolCallingAgent:
        """Create the download agent"""
        model = self._create_model()
        agent = ToolCallingAgent(
            tools=[self.download_tool],
            model=model,
            name="DownloadAgent",
            description="Download academic papers from verified sources",
            max_steps=20
        )
        return agent

    def _create_model(self):
        """Create the model according to the configuration"""
        provider = self.model_config.get("provider", "openai")
        if provider == "openai":
            return LiteLLMModel(
                model_id=self.model_config.get("model_id", "gpt-4"),
                api_key=self.model_config.get("api_key")
            )
        elif provider == "anthropic":
            return LiteLLMModel(
                model_id="claude-3-opus-20240229",
                api_key=self.model_config.get("api_key")
            )
        elif provider == "huggingface":
            from smolagents import InferenceClientModel
            return InferenceClientModel(
                model_id=self.model_config.get("model_id", "mistralai/Mixtral-8x7B-Instruct-v0.1")
            )
        else:
            # Default to OpenAI
            return LiteLLMModel(model_id="gpt-4")
    async def process_document(self, file_path: str, process_id: str = None) -> Dict[str, Any]:
        """Process a document end to end"""
        import time
        start_time = time.time()

        # Generate a process ID
        self.current_process_id = process_id or hashlib.md5(
            f"{file_path}_{datetime.now().isoformat()}".encode()
        ).hexdigest()[:8]
        logger.info(f"Starting process {self.current_process_id} for {file_path}")

        # 1. Extract text from the file
        extraction_prompt = f"""
        Process the file at {file_path} to extract all text content.
        Focus on extracting any bibliographic references, citations, or academic resources.

        Steps:
        1. Use process_file tool to extract text
        2. Return the extracted text for further analysis
        """
        try:
            # Run the file-extraction agent
            file_result = await self.extraction_agent.run_async(extraction_prompt)
            if not file_result or "text" not in str(file_result):
                return {
                    "success": False,
                    "error": "Failed to extract text from file",
                    "process_id": self.current_process_id
                }

            # 2. Extract bibliographic references
            text_content = str(file_result)
            extraction_prompt2 = f"""
            Analyze the following text and extract all bibliographic references:

            {text_content[:5000]}... (truncated to limit prompt size)

            Extract:
            1. DOIs (Digital Object Identifiers)
            2. ISBNs
            3. arXiv IDs
            4. PubMed IDs (PMID)
            5. Academic URLs
            6. Any other academic references

            Return a comprehensive list of all found references.
            """
            extraction_result = await self.extraction_agent.run_async(extraction_prompt2)

            # Parse the result (assuming the agent returns JSON-like text)
            citations = []
            try:
                # Try to extract JSON from the result
                import json
                result_str = str(extraction_result)
                # Look for a JSON pattern
                json_match = re.search(r'\{.*\}', result_str, re.DOTALL)
                if json_match:
                    citations_data = json.loads(json_match.group())
                    if isinstance(citations_data, list):
                        citations = [CitationModel(**c) for c in citations_data]
            except Exception:
                # Fallback: call the tool directly
                citations_data = self.extraction_tool.forward(text_content, os.path.basename(file_path))
                citations = [CitationModel(**c) for c in citations_data]
            logger.info(f"Found {len(citations)} citations")

            # 3. Verify resources
            verified_resources = []
            failed_verifications = []
            for citation in citations:
                verification_prompt = f"""
                Verify the following academic resource:
                Type: {citation.resource_type}
                Identifier: {citation.identifier}
                Source: {citation.extracted_from}

                Check if this resource exists and is accessible.
                """
                try:
                    verification_result = await self.verification_agent.run_async(verification_prompt)
                    # Parse the result
                    if verification_result:
                        verification_dict = self.verification_tool.forward(citation.dict())
                        verified_resource = VerificationResult(**verification_dict)
                        if verified_resource.verified:
                            verified_resources.append(verified_resource)
                        else:
                            failed_verifications.append(citation)
                except Exception as e:
                    logger.error(f"Verification error for {citation.identifier}: {e}")
                    failed_verifications.append(citation)

            # 4. Download verified resources
            downloaded_files = []
            for verified_resource in verified_resources:
                if verified_resource.download_url:
                    download_prompt = f"""
                    Download the academic paper from:
                    URL: {verified_resource.download_url}
                    Format: {verified_resource.file_format}

                    Save it to: {self.download_dir}
                    """
                    try:
                        download_result = await self.download_agent.run_async(download_prompt)
                        if download_result:
                            download_dict = self.download_tool.forward(
                                verified_resource.dict(),
                                self.download_dir
                            )
                            if download_dict.get("success"):
                                downloaded_files.append(download_dict.get("file_path"))
                    except Exception as e:
                        logger.error(f"Download error: {e}")

            # 5. Generate the report
            processing_time = time.time() - start_time
            report = ProcessingReport(
                input_file=file_path,
                total_citations=len(citations),
                verified_resources=verified_resources,
                downloaded_files=downloaded_files,
                failed_verifications=failed_verifications,
                processing_time=processing_time,
                summary={
                    "success_rate": len(verified_resources) / max(1, len(citations)),
                    "download_rate": len(downloaded_files) / max(1, len(verified_resources)),
                    "file_count": len(downloaded_files)
                }
            )
            # Save the report
            report_path = os.path.join(
                self.report_dir,
                f"report_{self.current_process_id}.json"
            )
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(report.dict(), f, indent=2, default=str)

            # 6. Create a ZIP file with the results
            zip_path = self._create_results_zip(report)

            # Keep the results in state
            self.processing_results[self.current_process_id] = {
                "report": report.dict(),
                "zip_path": zip_path,
                "timestamp": datetime.now().isoformat()
            }
            logger.info(f"Process {self.current_process_id} completed in {processing_time:.2f}s")
            return {
                "success": True,
                "process_id": self.current_process_id,
                "report": report.dict(),
                "zip_path": zip_path,
                "summary": {
                    "citations_found": len(citations),
                    "resources_verified": len(verified_resources),
                    "files_downloaded": len(downloaded_files),
                    "processing_time": processing_time
                }
            }
        except Exception as e:
            logger.error(f"Processing error: {e}")
            return {
                "success": False,
                "error": str(e),
                "process_id": self.current_process_id
            }
    def _create_results_zip(self, report: ProcessingReport) -> str:
        """Create a ZIP file with the results"""
        import zipfile
        from datetime import datetime

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"bibliography_results_{timestamp}.zip"
        zip_path = os.path.join(self.output_dir, zip_filename)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Add the report
            report_path = os.path.join(
                self.report_dir,
                f"report_{self.current_process_id}.json"
            )
            if os.path.exists(report_path):
                zipf.write(report_path, "report.json")
            # Add the downloaded files
            for file_path in report.downloaded_files:
                if os.path.exists(file_path):
                    arcname = os.path.join("downloads", os.path.basename(file_path))
                    zipf.write(file_path, arcname)
            # Add a plain-text summary
            summary_content = self._generate_summary_text(report)
            zipf.writestr("summary.txt", summary_content)
        return zip_path

    def _generate_summary_text(self, report: ProcessingReport) -> str:
        """Generate a plain-text summary"""
        summary = f"""
BIBLIOGRAPHY PROCESSING REPORT
==============================
Process ID: {self.current_process_id}
Input File: {report.input_file}
Processing Time: {report.processing_time:.2f} seconds
Timestamp: {report.timestamp}

STATISTICS
----------
Total Citations Found: {report.total_citations}
Resources Verified: {len(report.verified_resources)}
Files Downloaded: {len(report.downloaded_files)}
Failed Verifications: {len(report.failed_verifications)}
Success Rate: {(len(report.verified_resources) / max(1, report.total_citations)) * 100:.1f}%
Download Rate: {(len(report.downloaded_files) / max(1, len(report.verified_resources))) * 100:.1f}%

VERIFIED RESOURCES
------------------
"""
        for i, resource in enumerate(report.verified_resources, 1):
            summary += f"\n{i}. {resource.citation.identifier}"
            summary += f"\n Type: {resource.citation.resource_type}"
            summary += f"\n Source: {resource.verification_source}"
            summary += f"\n Quality: {resource.quality_score:.2f}"
            if resource.download_url:
                summary += f"\n Downloaded: Yes"
                if resource.file_format:
                    summary += f" ({resource.file_format})"
            summary += "\n"

        if report.failed_verifications:
            summary += f"\nFAILED VERIFICATIONS\n-------------------\n"
            for citation in report.failed_verifications:
                summary += f"- {citation.identifier} ({citation.resource_type})\n"

        summary += f"\nFILES DOWNLOADED\n----------------\n"
        for file_path in report.downloaded_files:
            file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
            summary += f"- {os.path.basename(file_path)} ({file_size} bytes)\n"
        return summary

    def get_status(self, process_id: str = None) -> Dict[str, Any]:
        """Get the status of a process"""
        pid = process_id or self.current_process_id
        if pid and pid in self.processing_results:
            return self.processing_results[pid]
        return {"error": "Process not found"}

    def cleanup(self, process_id: str = None):
        """Clean up temporary files"""
        import shutil
        if process_id:
            # Clean up a specific process
            if process_id in self.processing_results:
                del self.processing_results[process_id]
        else:
            # Clean up everything
            self.processing_results.clear()
            # Clean the directories (optional, uncomment if needed)
            # shutil.rmtree(self.download_dir, ignore_errors=True)
            # shutil.rmtree(self.report_dir, ignore_errors=True)
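
# Illustrative sketch (assumption: OPENAI_API_KEY is set in the environment):
# driving the system programmatically, outside the Gradio UI and the CLI below.
# The helper name is made up for demonstration.
def _demo_process_document(path: str) -> None:
    config = {
        "provider": "openai",
        "model_id": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY", ""),
    }
    system = BibliographyProcessingSystem(config)
    result = asyncio.run(system.process_document(path))
    if result.get("success"):
        print("Results ZIP:", result.get("zip_path"))
    else:
        print("Processing failed:", result.get("error"))
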
# ========== GRADIO INTERFACE ==========

def create_gradio_interface():
    """Create the Gradio interface for the system"""
    system = None

    def initialize_system(provider, model_id, api_key):
        """Initialize the system with the given configuration"""
        nonlocal system
        config = {
            "provider": provider,
            "model_id": model_id,
            "api_key": api_key
        }
        try:
            system = BibliographyProcessingSystem(config)
            return "✅ Sistema inicializado correctamente"
        except Exception as e:
            return f"❌ Error: {str(e)}"

    async def process_file(file_obj, progress=gr.Progress()):
        """Process an uploaded file"""
        if not system:
            return None, "❌ Sistema no inicializado", "", ""
        try:
            progress(0, desc="Iniciando procesamiento...")
            # Save the upload to a temporary file
            import tempfile
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_obj.name)[1]) as tmp:
                with open(file_obj.name, 'rb') as src:
                    tmp.write(src.read())
                tmp_path = tmp.name
            progress(0.2, desc="Extrayendo texto...")
            # Process the file
            result = await system.process_document(tmp_path)
            if not result.get("success"):
                return None, f"❌ Error: {result.get('error')}", "", ""
            # Get the report
            report_data = result.get("report", {})
            summary = result.get("summary", {})
            progress(0.8, desc="Generando resultados...")
            # Prepare results for display
            citations_found = summary.get("citations_found", 0)
            verified = summary.get("resources_verified", 0)
            downloaded = summary.get("files_downloaded", 0)
            # Build the HTML view
            html_output = f"""
            <div style="font-family: Arial, sans-serif; padding: 20px;">
                <h2>📊 Resultados del Procesamiento</h2>
                <div style="background: #f5f5f5; padding: 15px; border-radius: 10px; margin: 20px 0;">
                    <h3>📈 Estadísticas</h3>
                    <ul>
                        <li><strong>Referencias encontradas:</strong> {citations_found}</li>
                        <li><strong>Recursos verificados:</strong> {verified}</li>
                        <li><strong>Archivos descargados:</strong> {downloaded}</li>
                        <li><strong>Tasa de éxito:</strong> {(verified/max(1, citations_found))*100:.1f}%</li>
                        <li><strong>ID del proceso:</strong> {result.get('process_id')}</li>
                    </ul>
                </div>
            """
            # List of verified resources
            if verified > 0:
                html_output += """
                <div style="background: #e8f5e9; padding: 15px; border-radius: 10px; margin: 20px 0;">
                    <h3>✅ Recursos Verificados</h3>
                    <ul>
                """
                resources = report_data.get("verified_resources", [])
                for i, resource in enumerate(resources[:10], 1):  # Show the first 10
                    citation = resource.get("citation", {})
                    html_output += f"""
                    <li>
                        <strong>{citation.get('identifier', 'Unknown')}</strong><br>
                        <small>Tipo: {citation.get('resource_type', 'unknown')} |
                        Fuente: {resource.get('verification_source', 'unknown')} |
                        Calidad: {resource.get('quality_score', 0):.2f}</small>
                    </li>
                    """
                if verified > 10:
                    html_output += f"<li>... y {verified - 10} más</li>"
                html_output += "</ul></div>"
            # List of failures
            failed = len(report_data.get("failed_verifications", []))
            if failed > 0:
                html_output += f"""
                <div style="background: #ffebee; padding: 15px; border-radius: 10px; margin: 20px 0;">
                    <h3>❌ Recursos No Verificados ({failed})</h3>
                    <p>Algunos recursos no pudieron ser verificados. Revisa el archivo ZIP para más detalles.</p>
                </div>
                """
            html_output += "</div>"
            # Plain-text output for export
            text_output = f"""
Procesamiento Bibliográfico
===========================
Archivo: {file_obj.name}
Proceso ID: {result.get('process_id')}
Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Resumen:
- Referencias encontradas: {citations_found}
- Recursos verificados: {verified}
- Archivos descargados: {downloaded}
- Tasa de éxito: {(verified/max(1, citations_found))*100:.1f}%

Para ver el reporte completo, descarga el archivo ZIP.
"""
            progress(1.0, desc="Completado!")
            # Return the results
            return (
                result.get("zip_path"),
                f"✅ Procesamiento completado. ID: {result.get('process_id')}",
                html_output,
                text_output
            )
        except Exception as e:
            logger.error(f"Error en procesamiento: {e}")
            return None, f"❌ Error: {str(e)}", "", ""

    def get_status():
        """Get the system status"""
        if not system or not system.current_process_id:
            return "⚠️ No hay procesos activos"
        status = system.get_status()
        if "error" in status:
            return f"⚠️ {status['error']}"
        return f"""
📊 Estado del Sistema
---------------------
Proceso activo: {system.current_process_id}
Total procesos: {len(system.processing_results)}
Último reporte: {status.get('timestamp', 'N/A')}
"""
    # Build the interface
    with gr.Blocks(title="Sistema de Recopilación Bibliográfica", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 📚 Sistema de Recopilación Bibliográfica con IA")
        gr.Markdown("Procesa documentos y extrae referencias bibliográficas automáticamente")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### ⚙️ Configuración")
                provider = gr.Dropdown(
                    choices=["openai", "anthropic", "huggingface"],
                    label="Proveedor de IA",
                    value="openai"
                )
                model_id = gr.Textbox(
                    label="Model ID",
                    value="gpt-4",
                    placeholder="Ej: gpt-4, claude-3-opus-20240229, mistralai/Mixtral-8x7B-Instruct-v0.1"
                )
                api_key = gr.Textbox(
                    label="API Key",
                    type="password",
                    placeholder="Ingresa tu API key"
                )
                init_btn = gr.Button("🚀 Inicializar Sistema", variant="primary")
                init_status = gr.Markdown("")
                init_btn.click(
                    initialize_system,
                    inputs=[provider, model_id, api_key],
                    outputs=init_status
                )
                gr.Markdown("---")
                status_btn = gr.Button("📊 Ver Estado")
                system_status = gr.Markdown("")
                status_btn.click(get_status, outputs=system_status)

            with gr.Column(scale=2):
                gr.Markdown("### 📄 Procesar Documento")
                file_input = gr.File(
                    label="Sube tu documento",
                    file_types=[".txt", ".pdf", ".docx", ".html", ".md", ".rtf"]
                )
                process_btn = gr.Button("🔍 Procesar Documento", variant="primary")
                gr.Markdown("### 📊 Resultados")
                result_file = gr.File(label="Descargar Resultados (ZIP)")
                result_status = gr.Markdown("")
                with gr.Tabs():
                    with gr.TabItem("📋 Vista HTML"):
                        html_output = gr.HTML(label="Resultados Detallados")
                    with gr.TabItem("📝 Texto Plano"):
                        text_output = gr.Textbox(
                            label="Resumen",
                            lines=20,
                            max_lines=50
                        )
                process_btn.click(
                    process_file,
                    inputs=[file_input],
                    outputs=[result_file, result_status, html_output, text_output]
                )

        # Examples
        gr.Markdown("### 📖 Ejemplos")
        gr.Examples(
            examples=[
                ["ejemplo_referencias.txt"],
                ["ejemplo_bibliografia.pdf"],
                ["paper_con_referencias.docx"]
            ],
            inputs=[file_input],
            label="Archivos de ejemplo (necesitan ser creados)"
        )

        # Information
        gr.Markdown("""
        ### 📌 Información
        - **Formatos soportados**: TXT, PDF, DOCX, HTML, MD, RTF
        - **Recursos detectados**: DOI, ISBN, arXiv, PMID, URLs académicas
        - **Salida**: Archivo ZIP con reportes y documentos descargados

        ### ⚠️ Notas
        1. Necesitas una API key válida para el proveedor seleccionado
        2. Los archivos grandes pueden tardar varios minutos
        3. La precisión depende del modelo de IA utilizado
        """)

    return interface
# ========== MAIN ENTRY POINT ==========

async def main():
    """Main entry point"""
    import argparse
    parser = argparse.ArgumentParser(description="Sistema de Recopilación Bibliográfica")
    parser.add_argument("--mode", choices=["gui", "cli"], default="gui",
                        help="Modo de ejecución")
    parser.add_argument("--file", type=str, help="Archivo a procesar (modo CLI)")
    parser.add_argument("--provider", default="openai", help="Proveedor de IA")
    parser.add_argument("--model", default="gpt-4", help="Modelo de IA")
    parser.add_argument("--api-key", help="API Key")
    args = parser.parse_args()

    if args.mode == "gui":
        # Launch the Gradio interface
        interface = create_gradio_interface()
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True,
            debug=True
        )
    elif args.mode == "cli":
        # Command-line mode
        if not args.file:
            print("❌ Error: Debes especificar un archivo con --file")
            return
        if not os.path.exists(args.file):
            print(f"❌ Error: Archivo no encontrado: {args.file}")
            return
        # Configure the system
        config = {
            "provider": args.provider,
            "model_id": args.model,
            "api_key": args.api_key or os.getenv(f"{args.provider.upper()}_API_KEY")
        }
        if not config["api_key"]:
            print("❌ Error: Necesitas especificar una API key")
            return
        system = BibliographyProcessingSystem(config)
        print(f"🔍 Procesando archivo: {args.file}")
        print("⏳ Esto puede tardar varios minutos...")
        result = await system.process_document(args.file)
        if result.get("success"):
            print("✅ Procesamiento completado!")
            print(f"📊 ID del proceso: {result.get('process_id')}")
            summary = result.get("summary", {})
            print(f"""
📈 Resultados:
- Referencias encontradas: {summary.get('citations_found', 0)}
- Recursos verificados: {summary.get('resources_verified', 0)}
- Archivos descargados: {summary.get('files_downloaded', 0)}
- Tiempo de procesamiento: {summary.get('processing_time', 0):.2f}s

📦 Archivo ZIP con resultados: {result.get('zip_path')}
""")
        else:
            print(f"❌ Error: {result.get('error')}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
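
# Example invocations (sketch; the script filename "app.py" and the sample
# input file name are assumptions, not taken from the source):
#   python app.py --mode gui
#   python app.py --mode cli --file referencias.txt --provider openai --api-key sk-...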