import os
import re
import json
import logging
import zipfile
import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
import gradio as gr
from enum import Enum
import hashlib
import aiohttp
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ========== API CONFIGURATION ==========
class APIProvider:
    """Manages the different AI chat-completion APIs."""
def __init__(self):
self.available_apis = {
"nebius": {
"name": "Nebius AI",
"base_url": "https://api.nebius.ai/v1",
"models": ["neural-chat-7b-v3-1", "llama-2-70b-chat", "mistral-7b-instruct"],
"headers": {"Content-Type": "application/json"}
},
"moonshot": {
"name": "Moonshot AI",
"base_url": "https://api.moonshot.cn/v1",
"models": ["moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k"],
"headers": {"Content-Type": "application/json"}
},
"openai": {
"name": "OpenAI",
"base_url": "https://api.openai.com/v1",
"models": ["gpt-4", "gpt-3.5-turbo", "gpt-4-turbo"],
"headers": {"Content-Type": "application/json"}
},
"anthropic": {
"name": "Anthropic",
"base_url": "https://api.anthropic.com/v1",
"models": ["claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307"],
"headers": {"Content-Type": "application/json", "anthropic-version": "2023-06-01"}
},
"deepseek": {
"name": "DeepSeek",
"base_url": "https://api.deepseek.com/v1",
"models": ["deepseek-chat", "deepseek-coder"],
"headers": {"Content-Type": "application/json"}
}
}
        # Kimi needs a provider-specific endpoint configuration
        self.custom_models = {
            "moonshotai/Kimi-K2-Instruct": {
                "provider": "moonshot",
                "model_id": "moonshot-v1-8k",  # assumed to be the compatible base model
                "requires_special_handling": True
            }
        }
    async def call_api(self, provider: str, api_key: str, model: str,
                       messages: List[Dict], max_tokens: int = 1000) -> Optional[str]:
        """Call the selected provider's chat-completion API."""
        if provider not in self.available_apis and provider != "custom":
            logger.error(f"Unsupported provider: {provider}")
            return None
        try:
            # Special handling for Kimi
            if model == "moonshotai/Kimi-K2-Instruct":
                return await self._call_moonshot_kimi(api_key, messages, max_tokens)
            # Provider-specific configuration
            if provider in ["moonshot", "custom"]:
                base_url = self.available_apis["moonshot"]["base_url"]
                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                }
            else:
                api_config = self.available_apis[provider]
                base_url = api_config["base_url"]
                headers = {**api_config["headers"], "Authorization": f"Bearer {api_key}"}
            # Build the payload
            payload = {
                "model": model,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": 0.7,
                "top_p": 0.95
            }
            # Note: this OpenAI-style endpoint does not apply to Anthropic,
            # whose Messages API uses a different path, headers, and payload
            # (see the _call_anthropic sketch below).
            url = f"{base_url}/chat/completions"
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get("choices", [{}])[0].get("message", {}).get("content", "")
                    else:
                        error_text = await response.text()
                        logger.error(f"API Error {response.status}: {error_text}")
                        return None
        except Exception as e:
            logger.error(f"Error calling API {provider}: {e}")
            return None
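    # Minimal sketch of an Anthropic-specific call, since the generic
    # /chat/completions path above is not valid for that provider. This
    # assumes the Messages API shape (POST /v1/messages, x-api-key header,
    # system prompt as a top-level field, response text in content blocks);
    # verify against the current Anthropic documentation before relying on it.
    async def _call_anthropic(self, api_key: str, model: str,
                              messages: List[Dict], max_tokens: int) -> Optional[str]:
        try:
            # The Messages API takes the system prompt separately from the chat turns
            system_text = " ".join(
                m["content"] for m in messages if m.get("role") == "system"
            )
            chat_messages = [m for m in messages if m.get("role") != "system"]
            payload = {
                "model": model,
                "max_tokens": max_tokens,  # required by the Messages API
                "messages": chat_messages,
            }
            if system_text:
                payload["system"] = system_text
            headers = {
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    "https://api.anthropic.com/v1/messages",
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        # The response text lives in a list of content blocks
                        blocks = data.get("content", [])
                        return "".join(b.get("text", "") for b in blocks)
                    error_text = await response.text()
                    logger.error(f"Anthropic API Error {response.status}: {error_text}")
                    return None
        except Exception as e:
            logger.error(f"Error calling Anthropic API: {e}")
            return None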
    async def _call_moonshot_kimi(self, api_key: str, messages: List[Dict], max_tokens: int) -> Optional[str]:
        """Provider-specific call for Moonshot's Kimi model."""
        try:
            url = "https://api.moonshot.cn/v1/chat/completions"
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": "moonshot-v1-8k",  # base model used for Kimi
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": 0.7,
                "top_p": 0.95
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get("choices", [{}])[0].get("message", {}).get("content", "")
                    else:
                        error_text = await response.text()
                        logger.error(f"Kimi API Error {response.status}: {error_text}")
                        return None
        except Exception as e:
            logger.error(f"Error calling Kimi API: {e}")
            return None
# ========== REFERENCE EXTRACTOR ==========
class ReferenceExtractor:
    """Extracts bibliographic references from raw text."""
    def __init__(self):
        # Non-capturing groups are used where the full match is wanted,
        # because re.findall returns only the group when exactly one is present.
        self.patterns = {
            "doi": [
                r'\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b',
                r'doi:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)',
                r'DOI:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)'
            ],
            "arxiv": [
                r'arXiv:\s*(\d{4}\.\d{4,5}(?:v\d+)?)',
                r'arxiv:\s*([a-z\-]+/\d{7})',
                r'\b\d{4}\.\d{4,5}(?:v\d+)?\b'
            ],
            "isbn": [
                r'ISBN(?:-1[03])?:?\s*(?:97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]',
                r'\b(?:97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9X]\b'
            ],
            "url": [
                r'https?://[^\s<>"]+|www\.[^\s<>"]+'
            ],
            "pmid": [
                r'PMID:\s*(\d+)',
                r'PubMed ID:\s*(\d+)'
            ]
        }
    def extract_from_text(self, text: str) -> Dict[str, List[str]]:
        """Extract every identifier found in the text, grouped by type."""
        results = {}
        for ref_type, patterns in self.patterns.items():
            matches = []
            for pattern in patterns:
                found = re.findall(pattern, text, re.IGNORECASE)
                # Clean up the matches
                for match in found:
                    if isinstance(match, tuple):
                        match = match[0]
                    if match:
                        match = self._clean_identifier(match, ref_type)
                        if match and match not in matches:
                            matches.append(match)
            if matches:
                results[ref_type] = matches
        return results
    def _clean_identifier(self, identifier: str, ref_type: str) -> str:
        """Normalize an extracted identifier."""
        identifier = identifier.strip()
        # Strip known prefixes
        prefixes = ['doi:', 'DOI:', 'arxiv:', 'arXiv:', 'isbn:', 'ISBN:', 'pmid:', 'PMID:']
        for prefix in prefixes:
            if identifier.startswith(prefix):
                identifier = identifier[len(prefix):].strip()
        # Strip surrounding punctuation
        identifier = identifier.strip('"\'<>()[]{}')
        # For URLs, make sure a protocol is present
        if ref_type == "url" and not identifier.startswith(('http://', 'https://')):
            identifier = f"https://{identifier}"
        return identifier
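# The ISBN patterns above are deliberately loose and will also match some
# plain number sequences. A checksum filter is a cheap way to discard most
# false positives; the helper below is an illustrative sketch (not wired into
# the extractor), based on the standard ISBN-10 / ISBN-13 check-digit rules.
def is_valid_isbn(candidate: str) -> bool:
    digits = candidate.replace('-', '').replace(' ', '').upper()
    if len(digits) == 10:
        # ISBN-10: weighted sum (weights 10..1, 'X' = 10) must be 0 mod 11
        if not (digits[:9].isdigit() and (digits[9].isdigit() or digits[9] == 'X')):
            return False
        total = sum((10 - i) * int(d) for i, d in enumerate(digits[:9]))
        total += 10 if digits[9] == 'X' else int(digits[9])
        return total % 11 == 0
    if len(digits) == 13 and digits.isdigit():
        # ISBN-13: alternating weights 1 and 3 must give a sum of 0 mod 10
        total = sum(int(d) * (1 if i % 2 == 0 else 3) for i, d in enumerate(digits))
        return total % 10 == 0
    return False
# Example: is_valid_isbn("978-0262035613") -> True (the ISBN from the sample text below)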
# ========== REFERENCE VERIFIER ==========
class ReferenceVerifier:
    """Verifies references and downloads the associated documents."""
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
    async def verify_doi(self, doi: str) -> Dict[str, Any]:
        """Verify a DOI against Crossref and collect its metadata."""
        # Note: requests is synchronous, so these calls block the event loop;
        # see the aiohttp-based download sketch further below for the async approach.
        import requests
        result = {
            "identifier": doi,
            "type": "doi",
            "verified": False,
            "metadata": {},
            "download_url": None,
            "error": None
        }
        try:
            # Try Crossref first
            url = f"https://api.crossref.org/works/{doi}"
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                data = response.json()
                work = data.get('message', {})
                result["verified"] = True
                # `or ['']` also guards against present-but-empty lists
                result["metadata"] = {
                    "title": (work.get('title') or [''])[0],
                    "authors": work.get('author', []),
                    "journal": (work.get('container-title') or [''])[0],
                    "year": work.get('published', {}).get('date-parts', [[None]])[0][0],
                    "url": work.get('URL')
                }
                # Look for a direct PDF link
                links = work.get('link', [])
                for link in links:
                    if link.get('content-type') == 'application/pdf':
                        result["download_url"] = link.get('URL')
                        break
                # If Crossref has no PDF, fall back to Unpaywall
                if not result["download_url"]:
                    # Unpaywall requires a contact email as a query parameter;
                    # replace the placeholder with a real address.
                    unpaywall_url = f"https://api.unpaywall.org/v2/{doi}?email=your-email@example.com"
                    unpaywall_response = requests.get(unpaywall_url, timeout=10)
                    if unpaywall_response.status_code == 200:
                        unpaywall_data = unpaywall_response.json()
                        if unpaywall_data.get('is_oa'):
                            result["download_url"] = unpaywall_data.get('best_oa_location', {}).get('url')
            else:
                result["error"] = f"Crossref API returned {response.status_code}"
        except Exception as e:
            result["error"] = str(e)
        return result
    async def verify_arxiv(self, arxiv_id: str) -> Dict[str, Any]:
        """Verify an arXiv ID against the arXiv export API."""
        import requests
        result = {
            "identifier": arxiv_id,
            "type": "arxiv",
            "verified": False,
            "metadata": {},
            "download_url": None,
            "error": None
        }
        try:
            # Normalize the ID
            if 'arxiv:' in arxiv_id.lower():
                arxiv_id = arxiv_id.split(':')[-1].strip()
            # Fetch metadata. The API returns 200 even for unknown IDs, so
            # verification is based on finding an entry in the Atom feed.
            api_url = f"http://export.arxiv.org/api/query?id_list={arxiv_id}"
            response = requests.get(api_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                # Parse basic metadata from the Atom XML
                import xml.etree.ElementTree as ET
                root = ET.fromstring(response.text)
                ns = {'atom': 'http://www.w3.org/2005/Atom'}
                entry = root.find('.//atom:entry', ns)
                if entry is not None:
                    result["verified"] = True
                    result["download_url"] = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
                    title = entry.find('atom:title', ns)
                    if title is not None:
                        result["metadata"]["title"] = title.text
                    summary = entry.find('atom:summary', ns)
                    if summary is not None:
                        result["metadata"]["abstract"] = summary.text
            else:
                result["error"] = f"arXiv API returned {response.status_code}"
        except Exception as e:
            result["error"] = str(e)
        return result
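    # The extractor also collects PMIDs, but process_document currently only
    # verifies DOIs and arXiv IDs. Below is a minimal sketch of PMID
    # verification via the NCBI E-utilities esummary endpoint; the exact JSON
    # layout should be checked against the E-utilities documentation.
    async def verify_pmid(self, pmid: str) -> Dict[str, Any]:
        import requests
        result = {
            "identifier": pmid,
            "type": "pmid",
            "verified": False,
            "metadata": {},
            "download_url": None,
            "error": None
        }
        try:
            url = (
                "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                f"?db=pubmed&id={pmid}&retmode=json"
            )
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                data = response.json()
                # esummary nests per-ID records under result[<pmid>]
                record = data.get("result", {}).get(pmid, {})
                if record and "error" not in record:
                    result["verified"] = True
                    result["metadata"] = {
                        "title": record.get("title"),
                        "journal": record.get("fulljournalname"),
                        "year": record.get("pubdate", "").split(" ")[0]
                    }
            else:
                result["error"] = f"E-utilities returned {response.status_code}"
        except Exception as e:
            result["error"] = str(e)
        return result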
    async def download_paper(self, url: str, filename: str) -> Optional[str]:
        """Download a paper from a URL into the downloads directory."""
        import requests
        import os
        try:
            response = requests.get(url, headers=self.headers, stream=True, timeout=30)
            if response.status_code == 200:
                # Create the downloads directory if needed
                os.makedirs("downloads", exist_ok=True)
                # Pick a file extension from the content type
                content_type = response.headers.get('content-type', '')
                if 'application/pdf' in content_type:
                    ext = '.pdf'
                elif 'application/epub' in content_type:
                    ext = '.epub'
                else:
                    ext = '.pdf'  # default
                filepath = os.path.join("downloads", f"{filename}{ext}")
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                return filepath
        except Exception as e:
            logger.error(f"Error downloading {url}: {e}")
        return None
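    # Because download_paper uses the synchronous requests library inside a
    # coroutine, downloads scheduled with asyncio.gather still run one at a
    # time and block the event loop. A non-blocking variant using aiohttp
    # might look like this sketch (same directory layout assumed):
    async def download_paper_async(self, url: str, filename: str) -> Optional[str]:
        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(headers=self.headers, timeout=timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        return None
                    os.makedirs("downloads", exist_ok=True)
                    content_type = response.headers.get('content-type', '')
                    ext = '.epub' if 'application/epub' in content_type else '.pdf'
                    filepath = os.path.join("downloads", f"{filename}{ext}")
                    with open(filepath, 'wb') as f:
                        # Stream the body in chunks instead of loading it whole
                        async for chunk in response.content.iter_chunked(8192):
                            f.write(chunk)
                    return filepath
        except Exception as e:
            logger.error(f"Error downloading {url}: {e}")
            return None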
# ========== MAIN SYSTEM ==========
class BibliographySystem:
    """Main bibliographic processing pipeline."""
    def __init__(self):
        self.extractor = ReferenceExtractor()
        self.verifier = ReferenceVerifier()
        self.api_provider = APIProvider()
        # Working directories
        os.makedirs("downloads", exist_ok=True)
        os.makedirs("reports", exist_ok=True)
    async def process_document(self, text: str, use_ai: bool = False,
                               api_provider: str = "openai", api_key: str = "",
                               api_model: str = "") -> Dict[str, Any]:
        """Process a document: extract, verify, download, and report."""
        start_time = datetime.now()
        # 1. Extract references
        logger.info("Extracting references...")
        references = self.extractor.extract_from_text(text)
        total_refs = sum(len(v) for v in references.values())
        logger.info(f"Found {total_refs} references")
        # 2. Verify references
        logger.info("Verifying references...")
        verified_refs = []
        download_tasks = []
        # Process DOIs
        for doi in references.get("doi", []):
            result = await self.verifier.verify_doi(doi)
            if result["verified"]:
                verified_refs.append(result)
                if result["download_url"]:
                    # Schedule the download
                    filename = hashlib.md5(doi.encode()).hexdigest()[:8]
                    download_tasks.append(
                        self.verifier.download_paper(result["download_url"], filename)
                    )
        # Process arXiv IDs
        for arxiv_id in references.get("arxiv", []):
            result = await self.verifier.verify_arxiv(arxiv_id)
            if result["verified"]:
                verified_refs.append(result)
                if result["download_url"]:
                    filename = hashlib.md5(arxiv_id.encode()).hexdigest()[:8]
                    download_tasks.append(
                        self.verifier.download_paper(result["download_url"], filename)
                    )
        # 3. Run the AI analysis if enabled
        ai_analysis = None
        if use_ai and api_key and api_provider:
            logger.info("Using AI for analysis...")
            ai_analysis = await self._analyze_with_ai(
                text, references, verified_refs,
                api_provider, api_key, api_model
            )
        # 4. Download the files
        logger.info("Downloading files...")
        downloaded_files = []
        if download_tasks:
            download_results = await asyncio.gather(*download_tasks)
            downloaded_files = [r for r in download_results if r]
        # 5. Build the report
        processing_time = (datetime.now() - start_time).total_seconds()
        report = {
            "timestamp": datetime.now().isoformat(),
            "processing_time": processing_time,
            "total_references_found": total_refs,
            "references_by_type": references,
            "verified_references": len(verified_refs),
            "verification_details": verified_refs,
            "downloaded_files": downloaded_files,
            "ai_analysis": ai_analysis,
            "statistics": {
                "verification_rate": len(verified_refs) / max(1, total_refs),
                "download_rate": len(downloaded_files) / max(1, len(verified_refs))
            }
        }
        # 6. Save the report
        report_filename = f"report_{hashlib.md5(text.encode()).hexdigest()[:8]}.json"
        report_path = os.path.join("reports", report_filename)
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        # 7. Build the result ZIP
        zip_path = self._create_zip(report, downloaded_files)
        return {
            "success": True,
            "report": report,
            "zip_path": zip_path,
            "summary": {
                "found": total_refs,
                "verified": len(verified_refs),
                "downloaded": len(downloaded_files),
                "time": f"{processing_time:.2f}s"
            }
        }
    async def _analyze_with_ai(self, text: str, references: Dict,
                               verified_refs: List, api_provider: str,
                               api_key: str, api_model: str) -> Optional[Dict]:
        """Analyze the document and its references with an AI model."""
        try:
            # Build the prompt
            prompt = f"""Analyze the following academic document and its references:

Document (first 2000 characters):
{text[:2000]}...

References found:
{json.dumps(references, indent=2, ensure_ascii=False)}

Verified references: {len(verified_refs)}

Provide an analysis that covers:
1. The document's main topics
2. Reference quality (relevance, recency)
3. Suggestions for missing references
4. An overall assessment of the bibliography's strength

Respond in JSON format with the following keys:
- main_topics (list of topics)
- reference_quality (score 1-10)
- missing_references (suggestions)
- overall_assessment (text)
- recommendations (list)"""
            messages = [
                {"role": "system", "content": "You are an expert in academic bibliographic analysis."},
                {"role": "user", "content": prompt}
            ]
            # Call the API
            analysis_text = await self.api_provider.call_api(
                api_provider, api_key, api_model, messages, max_tokens=1500
            )
            if analysis_text:
                # Try to extract JSON from the response
                try:
                    json_match = re.search(r'\{.*\}', analysis_text, re.DOTALL)
                    if json_match:
                        return json.loads(json_match.group())
                    return {"raw_analysis": analysis_text}
                except json.JSONDecodeError:
                    return {"raw_analysis": analysis_text}
            return None
        except Exception as e:
            logger.error(f"AI analysis error: {e}")
            return None
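    # Models often wrap JSON answers in Markdown code fences, which the greedy
    # regex above tolerates but does not handle explicitly. A small helper
    # like this sketch can strip fences first, making the extraction less fragile:
    @staticmethod
    def _strip_code_fences(response_text: str) -> str:
        # Remove a leading ```json / ``` fence and a trailing ``` if present
        fenced = re.match(r'^\s*```(?:json)?\s*(.*?)\s*```\s*$', response_text, re.DOTALL)
        return fenced.group(1) if fenced else response_text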
    def _create_zip(self, report: Dict, downloaded_files: List[str]) -> str:
        """Bundle the report and downloaded files into a ZIP archive."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"bibliography_results_{timestamp}.zip"
        with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Add the JSON report
            report_path = os.path.join("reports", f"report_{timestamp}.json")
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)
            zipf.write(report_path, "report.json")
            # Add the downloaded files
            for file_path in downloaded_files:
                if os.path.exists(file_path):
                    zipf.write(file_path, f"downloads/{os.path.basename(file_path)}")
            # Add a plain-text summary
            summary = self._generate_summary_text(report)
            zipf.writestr("summary.txt", summary)
        return zip_filename
    def _generate_summary_text(self, report: Dict) -> str:
        """Generate a plain-text summary of the report."""
        return f"""
BIBLIOGRAPHIC PROCESSING SUMMARY
================================
Date: {report.get('timestamp', 'N/A')}
Processing time: {report.get('processing_time', 0):.2f} seconds

STATISTICS:
-----------
• References found: {report.get('total_references_found', 0)}
• References verified: {report.get('verified_references', 0)}
• Files downloaded: {len(report.get('downloaded_files', []))}
• Verification rate: {report.get('statistics', {}).get('verification_rate', 0) * 100:.1f}%
• Download rate: {report.get('statistics', {}).get('download_rate', 0) * 100:.1f}%

REFERENCES BY TYPE:
-------------------
{json.dumps(report.get('references_by_type', {}), indent=2, ensure_ascii=False)}

See the included JSON report for full details.
"""
# ========== SIMPLIFIED GRADIO INTERFACE ==========
def create_simple_interface():
    """Create a simple, working Gradio interface."""
    system = BibliographySystem()
    async def process_text(text_input, use_ai, api_provider, api_key, api_model):
        """Process the submitted text."""
        if not text_input.strip():
            return None, "❌ Error: no text was entered", "", "", ""
        try:
            result = await system.process_document(
                text_input, use_ai, api_provider, api_key, api_model
            )
            if result["success"]:
                summary = result["summary"]
                # Build the HTML view
                html_output = f"""
                <div style="font-family: Arial, sans-serif; padding: 20px;">
                    <h2 style="color: #2c3e50;">📊 Processing Results</h2>
                    <div style="background: #ecf0f1; padding: 15px; border-radius: 10px; margin: 15px 0;">
                        <h3 style="color: #34495e;">📈 Statistics</h3>
                        <div style="display: grid; grid-template-columns: repeat(2, 1fr); gap: 10px;">
                            <div style="background: white; padding: 10px; border-radius: 5px;">
                                <strong>References Found</strong><br>
                                <span style="font-size: 24px; color: #3498db;">{summary['found']}</span>
                            </div>
                            <div style="background: white; padding: 10px; border-radius: 5px;">
                                <strong>Verified</strong><br>
                                <span style="font-size: 24px; color: #2ecc71;">{summary['verified']}</span>
                            </div>
                            <div style="background: white; padding: 10px; border-radius: 5px;">
                                <strong>Downloaded</strong><br>
                                <span style="font-size: 24px; color: #9b59b6;">{summary['downloaded']}</span>
                            </div>
                            <div style="background: white; padding: 10px; border-radius: 5px;">
                                <strong>Time</strong><br>
                                <span style="font-size: 24px; color: #e74c3c;">{summary['time']}</span>
                            </div>
                        </div>
                    </div>
                </div>
                """
                # Build the plain-text view
                text_output = f"""
Processing completed successfully.

• References found: {summary['found']}
• References verified: {summary['verified']}
• Files downloaded: {summary['downloaded']}
• Processing time: {summary['time']}

The ZIP file with the results is ready to download.
"""
                # Report JSON (truncated for display)
                report_json = json.dumps(result["report"], indent=2, ensure_ascii=False)
                if len(report_json) > 5000:
                    report_json = report_json[:5000] + "\n... (report truncated for size)"
                return result["zip_path"], "✅ Processing completed", html_output, text_output, report_json
            else:
                # Return "" (not {}) so the gr.Code output always receives a string
                return None, f"❌ Error: {result.get('error', 'Unknown error')}", "", "", ""
        except Exception as e:
            logger.error(f"Processing error: {e}")
            return None, f"❌ Error: {str(e)}", "", "", ""
    # Build the interface layout
    with gr.Blocks(title="Bibliography Collection System", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 📚 Bibliography Collection System")
        gr.Markdown("Extract, verify, and download academic references from text")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### ⚙️ Settings")
                use_ai = gr.Checkbox(
                    label="Use AI for advanced analysis",
                    value=False
                )
                api_provider = gr.Dropdown(
                    choices=["openai", "moonshot", "nebius", "anthropic", "deepseek"],
                    label="AI Provider",
                    value="moonshot"
                )
                api_key = gr.Textbox(
                    label="API Key",
                    type="password",
                    placeholder="Enter your API key"
                )
                api_model = gr.Textbox(
                    label="Model (optional)",
                    value="moonshotai/Kimi-K2-Instruct",
                    placeholder="Leave empty to use the default model"
                )
                gr.Markdown("""
                ### 🔑 Supported APIs
                - **Moonshot**: moonshotai/Kimi-K2-Instruct
                - **Nebius**: neural-chat-7b-v3-1
                - **OpenAI**: gpt-4, gpt-3.5-turbo
                - **Anthropic**: Claude 3
                - **DeepSeek**: deepseek-chat
                """)
            with gr.Column(scale=2):
                gr.Markdown("### 📄 Input Text")
                text_input = gr.Textbox(
                    label="Text containing bibliographic references",
                    placeholder="Paste your text with academic references here...",
                    lines=15,
                    max_lines=50
                )
                process_btn = gr.Button("🔍 Process Text", variant="primary")
                gr.Markdown("### 📦 Results")
                result_file = gr.File(label="Download Results (ZIP)")
                result_status = gr.Markdown()
                with gr.Tabs():
                    with gr.TabItem("📋 HTML View"):
                        html_output = gr.HTML(label="Visual Results")
                    with gr.TabItem("📝 Text"):
                        text_output = gr.Textbox(
                            label="Summary",
                            lines=10,
                            max_lines=20
                        )
                    with gr.TabItem("🔧 JSON"):
                        json_output = gr.Code(
                            label="Report Data",
                            language="json",
                            lines=15
                        )
        # Wire up the events (Gradio accepts async handlers directly)
        process_btn.click(
            process_text,
            inputs=[text_input, use_ai, api_provider, api_key, api_model],
            outputs=[result_file, result_status, html_output, text_output, json_output]
        )
        # Example input
        gr.Markdown("### 📖 Example Text")
        gr.Examples(
            examples=[["""This is an example text with academic references.
1. The seminal AlexNet paper (Krizhevsky et al., 2012) has DOI: 10.1145/3065386
2. The Transformers paper is on arXiv: arXiv:1706.03762
3. The Deep Learning book has ISBN: 978-0262035613
4. More references:
- DOI: 10.1038/nature14539
- DOI: 10.1109/CVPR.2016.90
- arXiv: 1506.02640
Academic URLs:
- https://arxiv.org/abs/1706.03762
- https://doi.org/10.1145/3065386"""]],
            inputs=[text_input],
            label="Basic example"
        )
    return interface
# ========== MAIN ENTRY POINT ==========
def main():
    """Application entry point."""
    # Build and launch the interface
    interface = create_simple_interface()
    # Configuration for Hugging Face Spaces
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,  # disable share on Spaces
        debug=False
    )

if __name__ == "__main__":
    main()