import os
import re
import json
import logging
import zipfile
import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
import gradio as gr
from enum import Enum
import hashlib
import aiohttp

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# ========== API CONFIGURATION ==========
class APIProvider:
    """Manager for the different AI APIs"""

    def __init__(self):
        self.available_apis = {
            "nebius": {
                "name": "Nebius AI",
                "base_url": "https://api.nebius.ai/v1",
                "models": ["neural-chat-7b-v3-1", "llama-2-70b-chat", "mistral-7b-instruct"],
                "headers": {"Content-Type": "application/json"}
            },
            "moonshot": {
                "name": "Moonshot AI",
                "base_url": "https://api.moonshot.cn/v1",
                "models": ["moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k"],
                "headers": {"Content-Type": "application/json"}
            },
            "openai": {
                "name": "OpenAI",
                "base_url": "https://api.openai.com/v1",
                "models": ["gpt-4", "gpt-3.5-turbo", "gpt-4-turbo"],
                "headers": {"Content-Type": "application/json"}
            },
            # NOTE: Anthropic's native API uses /v1/messages with an x-api-key
            # header, so the generic chat/completions call below will not work
            # for this provider without adaptation.
            "anthropic": {
                "name": "Anthropic",
                "base_url": "https://api.anthropic.com/v1",
                "models": ["claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307"],
                "headers": {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
            },
            "deepseek": {
                "name": "DeepSeek",
                "base_url": "https://api.deepseek.com/v1",
                "models": ["deepseek-chat", "deepseek-coder"],
                "headers": {"Content-Type": "application/json"}
            }
        }
        # Kimi needs a dedicated endpoint configuration
        self.custom_models = {
            "moonshotai/Kimi-K2-Instruct": {
                "provider": "moonshot",
                "model_id": "moonshot-v1-8k",  # Assumed to be compatible
                "requires_special_handling": True
            }
        }

    async def call_api(self, provider: str, api_key: str, model: str,
                       messages: List[Dict], max_tokens: int = 1000) -> Optional[str]:
        """Call the selected provider's API"""
        if provider not in self.available_apis and provider not in ["custom", "moonshot"]:
            logger.error(f"Proveedor no soportado: {provider}")
            return None

        try:
            # Special handling for Kimi
            if model == "moonshotai/Kimi-K2-Instruct":
                return await self._call_moonshot_kimi(api_key, messages, max_tokens)

            # Provider-specific configuration
            if provider in ["moonshot", "custom"]:
                base_url = self.available_apis["moonshot"]["base_url"]
                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                }
            else:
                api_config = self.available_apis[provider]
                base_url = api_config["base_url"]
                headers = {**api_config["headers"], "Authorization": f"Bearer {api_key}"}

            # Build the payload
            payload = {
                "model": model,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": 0.7,
                "top_p": 0.95
            }

            # Make the request
            url = f"{base_url}/chat/completions"
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url, headers=headers, json=payload, timeout=30
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get("choices", [{}])[0].get("message", {}).get("content", "")
                    else:
                        error_text = await response.text()
                        logger.error(f"API Error {response.status}: {error_text}")
                        return None

        except Exception as e:
            logger.error(f"Error calling API {provider}: {e}")
            return None

    async def _call_moonshot_kimi(self, api_key: str, messages: List[Dict],
                                  max_tokens: int) -> Optional[str]:
        """Dedicated call for Moonshot's Kimi"""
        try:
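            # Kimi is reached through Moonshot's OpenAI-compatible
            # chat/completions endpoint, so the request below mirrors the
            # generic payload built in call_api(); the mapping of
            # "moonshotai/Kimi-K2-Instruct" onto "moonshot-v1-8k" is an
            # assumption carried over from self.custom_models.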
"Content-Type": "application/json" } payload = { "model": "moonshot-v1-8k", # Modelo base para Kimi "messages": messages, "max_tokens": max_tokens, "temperature": 0.7, "top_p": 0.95 } async with aiohttp.ClientSession() as session: async with session.post( url, headers=headers, json=payload, timeout=30 ) as response: if response.status == 200: data = await response.json() return data.get("choices", [{}])[0].get("message", {}).get("content", "") else: error_text = await response.text() logger.error(f"Kimi API Error {response.status}: {error_text}") return None except Exception as e: logger.error(f"Error calling Kimi API: {e}") return None # ========== EXTRACTOR DE REFERENCIAS ========== class ReferenceExtractor: """Extrae referencias bibliográficas de texto""" def __init__(self): self.patterns = { "doi": [ r'\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b', r'doi:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)', r'DOI:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)' ], "arxiv": [ r'arXiv:\s*(\d{4}\.\d{4,5}(v\d+)?)', r'arxiv:\s*([a-z\-]+/\d{7})', r'\b\d{4}\.\d{4,5}(v\d+)?\b' ], "isbn": [ r'ISBN(?:-1[03])?:?\s*(97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]', r'\b(?:97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]\b' ], "url": [ r'https?://[^\s<>"]+|www\.[^\s<>"]+' ], "pmid": [ r'PMID:\s*(\d+)', r'PubMed ID:\s*(\d+)' ] } def extract_from_text(self, text: str) -> Dict[str, List[str]]: """Extrae todos los identificadores del texto""" results = {} for ref_type, patterns in self.patterns.items(): matches = [] for pattern in patterns: found = re.findall(pattern, text, re.IGNORECASE) # Limpiar los resultados for match in found: if isinstance(match, tuple): match = match[0] if match: match = self._clean_identifier(match, ref_type) if match and match not in matches: matches.append(match) if matches: results[ref_type] = matches return results def _clean_identifier(self, identifier: str, ref_type: str) -> str: """Limpia el identificador""" identifier = identifier.strip() # Eliminar prefijos prefixes = ['doi:', 'DOI:', 'arxiv:', 'arXiv:', 'isbn:', 'ISBN:', 'pmid:', 'PMID:'] for prefix in prefixes: if identifier.startswith(prefix): identifier = identifier[len(prefix):].strip() # Limpiar caracteres identifier = identifier.strip('"\'<>()[]{}') # Para URLs, asegurar protocolo if ref_type == "url" and not identifier.startswith(('http://', 'https://')): identifier = f"https://{identifier}" return identifier # ========== VERIFICADOR DE REFERENCIAS ========== class ReferenceVerifier: """Verifica y descarga referencias""" def __init__(self): self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } async def verify_doi(self, doi: str) -> Dict[str, Any]: """Verifica un DOI y obtiene metadatos""" import requests result = { "identifier": doi, "type": "doi", "verified": False, "metadata": {}, "download_url": None, "error": None } try: # Intentar con Crossref url = f"https://api.crossref.org/works/{doi}" response = requests.get(url, headers=self.headers, timeout=10) if response.status_code == 200: data = response.json() work = data.get('message', {}) result["verified"] = True result["metadata"] = { "title": work.get('title', [''])[0], "authors": work.get('author', []), "journal": work.get('container-title', [''])[0], "year": work.get('published', {}).get('date-parts', [[None]])[0][0], "url": work.get('URL') } # Buscar PDF links = work.get('link', []) for link in links: if link.get('content-type') == 'application/pdf': result["download_url"] = link.get('URL') break # Si no hay PDF en Crossref, probar 
                if not result["download_url"]:
                    unpaywall_url = f"https://api.unpaywall.org/v2/{doi}?email=user@example.com"
                    unpaywall_response = requests.get(unpaywall_url, timeout=10)
                    if unpaywall_response.status_code == 200:
                        unpaywall_data = unpaywall_response.json()
                        if unpaywall_data.get('is_oa'):
                            # best_oa_location can be null in Unpaywall responses
                            result["download_url"] = (unpaywall_data.get('best_oa_location') or {}).get('url')
            else:
                result["error"] = f"Crossref API returned {response.status_code}"

        except Exception as e:
            result["error"] = str(e)

        return result

    async def verify_arxiv(self, arxiv_id: str) -> Dict[str, Any]:
        """Verify an arXiv ID"""
        import requests

        result = {
            "identifier": arxiv_id,
            "type": "arxiv",
            "verified": False,
            "metadata": {},
            "download_url": None,
            "error": None
        }

        try:
            # Normalize the ID
            if 'arxiv:' in arxiv_id.lower():
                arxiv_id = arxiv_id.split(':')[-1].strip()

            # Fetch the metadata
            api_url = f"http://export.arxiv.org/api/query?id_list={arxiv_id}"
            response = requests.get(api_url, headers=self.headers, timeout=10)

            if response.status_code == 200:
                result["verified"] = True
                result["download_url"] = f"https://arxiv.org/pdf/{arxiv_id}.pdf"

                # Parse basic metadata from the Atom XML
                import xml.etree.ElementTree as ET
                root = ET.fromstring(response.text)

                ns = {'atom': 'http://www.w3.org/2005/Atom'}
                entry = root.find('.//atom:entry', ns)
                if entry is not None:
                    title = entry.find('atom:title', ns)
                    if title is not None:
                        result["metadata"]["title"] = title.text
                    summary = entry.find('atom:summary', ns)
                    if summary is not None:
                        result["metadata"]["abstract"] = summary.text
            else:
                result["error"] = f"arXiv API returned {response.status_code}"

        except Exception as e:
            result["error"] = str(e)

        return result

    async def download_paper(self, url: str, filename: str) -> Optional[str]:
        """Download a paper from a URL"""
        import requests

        try:
            response = requests.get(url, headers=self.headers, stream=True, timeout=30)

            if response.status_code == 200:
                # Create the downloads directory if needed
                os.makedirs("downloads", exist_ok=True)

                # Pick a file extension from the content type
                content_type = response.headers.get('content-type', '')
                if 'application/pdf' in content_type:
                    ext = '.pdf'
                elif 'application/epub' in content_type:
                    ext = '.epub'
                else:
                    ext = '.pdf'  # Default

                filepath = os.path.join("downloads", f"{filename}{ext}")
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

                return filepath

        except Exception as e:
            logger.error(f"Error downloading {url}: {e}")

        return None


# ========== MAIN SYSTEM ==========
class BibliographySystem:
    """Main bibliography-processing system"""

    def __init__(self):
        self.extractor = ReferenceExtractor()
        self.verifier = ReferenceVerifier()
        self.api_provider = APIProvider()

        # Working directories
        os.makedirs("downloads", exist_ok=True)
        os.makedirs("reports", exist_ok=True)

    async def process_document(self, text: str, use_ai: bool = False,
                               api_provider: str = "openai", api_key: str = "",
                               api_model: str = "") -> Dict[str, Any]:
        """Process a document and extract its references"""
        start_time = datetime.now()

        # 1. Extract references
        logger.info("Extracting references...")
        references = self.extractor.extract_from_text(text)
        total_refs = sum(len(v) for v in references.values())
        logger.info(f"Found {total_refs} references")

        # 2. Verify references
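        # Identifiers are verified one at a time below; the PDF downloads are
        # only scheduled here as coroutines and run concurrently later with
        # asyncio.gather() in step 4.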
        logger.info("Verifying references...")
        verified_refs = []
        download_tasks = []

        # Process DOIs
        for doi in references.get("doi", []):
            result = await self.verifier.verify_doi(doi)
            if result["verified"]:
                verified_refs.append(result)
                if result["download_url"]:
                    # Schedule the download
                    filename = hashlib.md5(doi.encode()).hexdigest()[:8]
                    download_tasks.append(
                        self.verifier.download_paper(result["download_url"], filename)
                    )

        # Process arXiv IDs
        for arxiv_id in references.get("arxiv", []):
            result = await self.verifier.verify_arxiv(arxiv_id)
            if result["verified"]:
                verified_refs.append(result)
                if result["download_url"]:
                    filename = hashlib.md5(arxiv_id.encode()).hexdigest()[:8]
                    download_tasks.append(
                        self.verifier.download_paper(result["download_url"], filename)
                    )

        # 3. Run the AI analysis if enabled
        ai_analysis = None
        if use_ai and api_key and api_provider:
            logger.info("Using AI for analysis...")
            ai_analysis = await self._analyze_with_ai(
                text, references, verified_refs, api_provider, api_key, api_model
            )

        # 4. Download the files
        logger.info("Downloading files...")
        downloaded_files = []
        if download_tasks:
            download_results = await asyncio.gather(*download_tasks)
            downloaded_files = [r for r in download_results if r]

        # 5. Build the report
        processing_time = (datetime.now() - start_time).total_seconds()

        report = {
            "timestamp": datetime.now().isoformat(),
            "processing_time": processing_time,
            "total_references_found": total_refs,
            "references_by_type": references,
            "verified_references": len(verified_refs),
            "verification_details": verified_refs,
            "downloaded_files": downloaded_files,
            "ai_analysis": ai_analysis,
            "statistics": {
                "verification_rate": len(verified_refs) / max(1, total_refs),
                "download_rate": len(downloaded_files) / max(1, len(verified_refs))
            }
        }

        # 6. Save the report
        report_filename = f"report_{hashlib.md5(text.encode()).hexdigest()[:8]}.json"
        report_path = os.path.join("reports", report_filename)
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        # 7. Create the ZIP archive
        zip_path = self._create_zip(report, downloaded_files)

        return {
            "success": True,
            "report": report,
            "zip_path": zip_path,
            "summary": {
                "found": total_refs,
                "verified": len(verified_refs),
                "downloaded": len(downloaded_files),
                "time": f"{processing_time:.2f}s"
            }
        }

    async def _analyze_with_ai(self, text: str, references: Dict, verified_refs: List,
                               api_provider: str, api_key: str, api_model: str) -> Optional[Dict]:
        """Analyze the document with AI"""
        try:
            # Build the analysis prompt
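            # The prompt asks the model to reply in JSON; further down the
            # response is scanned with a permissive regex so a plain-text
            # answer still comes back as {"raw_analysis": ...}.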
            prompt = f"""Analiza el siguiente documento académico y sus referencias:

Documento (primeros 2000 caracteres):
{text[:2000]}...

Referencias encontradas:
{json.dumps(references, indent=2, ensure_ascii=False)}

Referencias verificadas: {len(verified_refs)}

Proporciona un análisis que incluya:
1. Temas principales del documento
2. Calidad de las referencias (relevancia, actualidad)
3. Sugerencias de referencias faltantes
4. Evaluación general de la solidez bibliográfica

Responde en formato JSON con las siguientes claves:
- main_topics (lista de temas)
- reference_quality (score 1-10)
- missing_references (sugerencias)
- overall_assessment (texto)
- recommendations (lista)"""

            messages = [
                {"role": "system", "content": "Eres un experto en análisis bibliográfico académico."},
                {"role": "user", "content": prompt}
            ]

            # Call the API
            analysis_text = await self.api_provider.call_api(
                api_provider, api_key, api_model, messages, max_tokens=1500
            )

            if analysis_text:
                # Try to extract JSON from the response
                try:
                    # Look for a JSON object in the reply
                    json_match = re.search(r'\{.*\}', analysis_text, re.DOTALL)
                    if json_match:
                        return json.loads(json_match.group())
                    else:
                        return {"raw_analysis": analysis_text}
                except Exception:
                    # Fall back to the raw text if the reply is not valid JSON
                    return {"raw_analysis": analysis_text}

        except Exception as e:
            logger.error(f"AI analysis error: {e}")

        return None

    def _create_zip(self, report: Dict, downloaded_files: List[str]) -> str:
        """Create a ZIP archive with the results"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"bibliography_results_{timestamp}.zip"

        with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Add the JSON report
            report_path = os.path.join("reports", f"report_{timestamp}.json")
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)
            zipf.write(report_path, "report.json")

            # Add the downloaded files
            for file_path in downloaded_files:
                if os.path.exists(file_path):
                    zipf.write(file_path, f"downloads/{os.path.basename(file_path)}")

            # Add a plain-text summary
            summary = self._generate_summary_text(report)
            zipf.writestr("summary.txt", summary)

        return zip_filename

    def _generate_summary_text(self, report: Dict) -> str:
        """Generate the plain-text summary"""
        return f"""
RESUMEN DE PROCESAMIENTO BIBLIOGRÁFICO
======================================

Fecha: {report.get('timestamp', 'N/A')}
Tiempo de procesamiento: {report.get('processing_time', 0):.2f} segundos

ESTADÍSTICAS:
------------
• Referencias encontradas: {report.get('total_references_found', 0)}
• Referencias verificadas: {report.get('verified_references', 0)}
• Archivos descargados: {len(report.get('downloaded_files', []))}
• Tasa de verificación: {report.get('statistics', {}).get('verification_rate', 0) * 100:.1f}%
• Tasa de descarga: {report.get('statistics', {}).get('download_rate', 0) * 100:.1f}%

REFERENCIAS POR TIPO:
---------------------
{json.dumps(report.get('references_by_type', {}), indent=2, ensure_ascii=False)}

Para más detalles, consulte el reporte JSON incluido.
"""


# ========== SIMPLIFIED GRADIO INTERFACE ==========
def create_simple_interface():
    """Create a simple, functional Gradio interface"""
    system = BibliographySystem()

    async def process_text(text_input, use_ai, api_provider, api_key, api_model):
        """Process the submitted text"""
        if not text_input.strip():
            return None, "❌ Error: No se ingresó texto", "", "", {}

        try:
            result = await system.process_document(
                text_input, use_ai, api_provider, api_key, api_model
            )

            if result["success"]:
                summary = result["summary"]

                # Build the HTML output for display
                html_output = f"""