# References for model evaluation metrics:
# - Chatbot Arena: https://colab.research.google.com/drive/1KdwokPjirkTmpO_P1WByFNFiqxWQquwH
# - Evalica: https://github.com/dustalov/evalica/blob/master/Chatbot-Arena.ipynb
import dotenv
import evalica
import gitlab
import io
import json
import os
import random
import threading
import warnings
import gradio as gr
import pandas as pd
from datetime import datetime
from github import Auth, Github
from urllib.parse import urlparse
from gradio_leaderboard import Leaderboard, ColumnFilter
from huggingface_hub import upload_file, hf_hub_download, HfApi
from openai import OpenAI
# Load environment variables
dotenv.load_dotenv(override=True)
# Initialize OpenAI Client
api_key = os.getenv("OPENROUTER_API_KEY")
base_url = "https://openrouter.ai/api/v1"
openai_client = OpenAI(api_key=api_key, base_url=base_url)
# Hugging Face repository names for data storage
LEADERBOARD_REPO = "SWE-Arena/leaderboard_data"
VOTE_REPO = "SWE-Arena/vote_data"
CONVERSATION_REPO = "SWE-Arena/conversation_data"
LEADERBOARD_FILE = "chatbot_arena"
MODEL_REPO = "SWE-Arena/model_data"
# Timeout in seconds for model responses
TIMEOUT = 90
# Leaderboard update time frame in days
LEADERBOARD_UPDATE_TIME_FRAME_DAYS = 365
# Hint string constant
SHOW_HINT_STRING = True # Set to False to hide the hint string altogether
HINT_STRING = "Once signed in, your votes will be recorded securely."
# Load model metadata from Hugging Face
model_context_window = {}
model_name_to_id = {}
model_organization = {}
available_models = []
_api = HfApi()
for _file in _api.list_repo_files(repo_id=MODEL_REPO, repo_type="dataset"):
if not _file.endswith(".json"):
continue
_local_path = hf_hub_download(repo_id=MODEL_REPO, filename=_file, repo_type="dataset")
with open(_local_path, "r") as f:
_record = json.load(f)
# model_name is derived from the filename (without .json extension)
_model_name = _file.rsplit("/", 1)[-1].replace(".json", "")
available_models.append(_model_name)
model_context_window[_model_name] = _record["context_window"]
model_name_to_id[_model_name] = _record["model_id"]
model_organization[_model_name] = _model_name.split(": ")[0]
# ---------------------------------------------------------------------------
# URL parsing helpers
# ---------------------------------------------------------------------------
def _parse_url_path(url):
"""Parse a URL and return (hostname, path_segments).
Returns:
tuple: (hostname: str, segments: list[str]) where segments
are the non-empty parts of the URL path.
Returns (None, []) if URL cannot be parsed.
"""
try:
parsed = urlparse(url)
hostname = parsed.hostname or ""
segments = [s for s in parsed.path.split("/") if s]
return hostname, segments
except Exception:
return None, []
# ---------------------------------------------------------------------------
# GitHub
# ---------------------------------------------------------------------------
def _classify_github_url(segments):
"""Classify a GitHub URL from its path segments into resource type + params."""
if len(segments) < 2:
return None
owner, repo = segments[0], segments[1]
if repo.endswith(".git"):
repo = repo[:-4]
base = {"owner": owner, "repo": repo}
if len(segments) == 2:
return {**base, "resource": None}
res = segments[2]
if res == "issues" and len(segments) >= 4:
return {**base, "resource": "issues", "id": segments[3]}
elif res == "pull" and len(segments) >= 4:
return {**base, "resource": "pull", "id": segments[3]}
elif res == "commit" and len(segments) >= 4:
return {**base, "resource": "commit", "sha": segments[3]}
elif res == "blob" and len(segments) >= 4:
return {**base, "resource": "blob", "branch": segments[3],
"path": "/".join(segments[4:]) if len(segments) > 4 else ""}
elif res == "tree" and len(segments) >= 4:
return {**base, "resource": "tree", "branch": segments[3],
"path": "/".join(segments[4:]) if len(segments) > 4 else ""}
elif res == "discussions" and len(segments) >= 4:
return {**base, "resource": "discussions", "id": segments[3]}
elif res == "releases" and len(segments) >= 5 and segments[3] == "tag":
return {**base, "resource": "releases", "tag": segments[4]}
elif res == "compare" and len(segments) >= 4:
return {**base, "resource": "compare", "spec": segments[3]}
elif res == "actions" and len(segments) >= 5 and segments[3] == "runs":
return {**base, "resource": "actions", "run_id": segments[4]}
elif res == "wiki":
page = segments[3] if len(segments) >= 4 else None
return {**base, "resource": "wiki", "page": page}
else:
return {**base, "resource": "unknown"}
# -- GitHub formatters -------------------------------------------------------
def _fmt_github_repo(repo):
parts = [f"Repository: {repo.full_name}"]
if repo.description:
parts.append(f"Description: {repo.description}")
try:
readme = repo.get_readme()
content = readme.decoded_content.decode("utf-8", errors="replace")
parts.append(f"README (first 2000 chars):\n{content[:2000]}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_github_issue(repo, issue_id):
issue = repo.get_issue(issue_id)
parts = [
f"Issue #{issue.number}: {issue.title}",
f"State: {issue.state}",
f"Body:\n{issue.body or '(empty)'}",
]
comments = issue.get_comments()
comment_texts = []
for i, c in enumerate(comments):
if i >= 10:
break
comment_texts.append(f" Comment by {c.user.login}:\n {c.body}")
if comment_texts:
parts.append("Comments (first 10):\n" + "\n---\n".join(comment_texts))
return "\n\n".join(parts)
def _fmt_github_pr(repo, pr_id):
pr = repo.get_pull(pr_id)
parts = [
f"Pull Request #{pr.number}: {pr.title}",
f"State: {pr.state} Merged: {pr.merged}",
f"Body:\n{pr.body or '(empty)'}",
]
diff_parts = []
for f in pr.get_files():
header = f"--- {f.filename} ({f.status}, +{f.additions}/-{f.deletions})"
patch = f.patch or "(binary or too large)"
diff_parts.append(f"{header}\n{patch}")
if diff_parts:
diff_text = "\n\n".join(diff_parts)
if len(diff_text) > 5000:
diff_text = diff_text[:5000] + "\n... (diff truncated)"
parts.append(f"Diff:\n{diff_text}")
return "\n\n".join(parts)
def _fmt_github_commit(repo, sha):
commit = repo.get_commit(sha)
parts = [
f"Commit: {commit.sha}",
f"Message: {commit.commit.message}",
f"Author: {commit.commit.author.name}",
f"Stats: +{commit.stats.additions}/-{commit.stats.deletions}",
]
file_patches = []
for f in commit.files:
file_patches.append(f" {f.filename} ({f.status}): {f.patch or '(binary)'}")
if file_patches:
patch_text = "\n".join(file_patches)
if len(patch_text) > 5000:
patch_text = patch_text[:5000] + "\n... (patch truncated)"
parts.append(f"Files changed:\n{patch_text}")
return "\n\n".join(parts)
def _fmt_github_blob(repo, branch, path):
contents = repo.get_contents(path, ref=branch)
if isinstance(contents, list):
listing = "\n".join(f" {c.path} ({c.type})" for c in contents)
return f"Directory listing at {branch}/{path}:\n{listing}"
content = contents.decoded_content.decode("utf-8", errors="replace")
if len(content) > 5000:
content = content[:5000] + "\n... (content truncated)"
return f"File: {path} (branch: {branch})\n\n{content}"
def _fmt_github_tree(repo, branch, path):
if path:
contents = repo.get_contents(path, ref=branch)
if not isinstance(contents, list):
contents = [contents]
else:
contents = repo.get_contents("", ref=branch)
listing = "\n".join(f" {c.path} ({c.type}, {c.size} bytes)" for c in contents)
return f"Tree at {branch}/{path or '(root)'}:\n{listing}"
_DISCUSSION_GRAPHQL_SCHEMA = """
title
body
number
author { login }
comments(first: 10) {
nodes {
body
author { login }
}
}
"""
def _fmt_github_discussion(repo, discussion_id):
try:
discussion = repo.get_discussion(discussion_id, _DISCUSSION_GRAPHQL_SCHEMA)
parts = [
f"Discussion #{discussion.number}: {discussion.title}",
f"Body:\n{discussion.body or '(empty)'}",
]
if hasattr(discussion, "comments") and discussion.comments:
comment_texts = []
for c in discussion.comments:
author = c.author.login if hasattr(c, "author") and c.author else "unknown"
comment_texts.append(f" Comment by {author}: {c.body}")
if comment_texts:
parts.append("Comments:\n" + "\n---\n".join(comment_texts))
return "\n\n".join(parts)
except Exception as e:
print(f"Discussion fetch failed (GraphQL): {e}")
return None
def _fmt_github_release(repo, tag):
release = repo.get_release(tag)
parts = [
f"Release: {release.title or release.tag_name}",
f"Tag: {release.tag_name}",
f"Body:\n{release.body or '(empty)'}",
]
return "\n\n".join(parts)
def _fmt_github_compare(repo, spec):
if "..." in spec:
base, head = spec.split("...", 1)
elif ".." in spec:
base, head = spec.split("..", 1)
else:
return None
comparison = repo.compare(base, head)
parts = [
f"Comparison: {base}...{head}",
f"Status: {comparison.status}",
f"Ahead by: {comparison.ahead_by}, Behind by: {comparison.behind_by}",
f"Total commits: {comparison.total_commits}",
]
commit_summaries = []
for c in comparison.commits[:20]:
commit_summaries.append(f" {c.sha[:8]}: {c.commit.message.splitlines()[0]}")
if commit_summaries:
parts.append("Commits:\n" + "\n".join(commit_summaries))
file_summaries = []
for f in comparison.files[:30]:
file_summaries.append(f" {f.filename} ({f.status}, +{f.additions}/-{f.deletions})")
if file_summaries:
parts.append("Files changed:\n" + "\n".join(file_summaries))
return "\n\n".join(parts)
def _fmt_github_actions(repo, run_id):
run = repo.get_workflow_run(run_id)
parts = [
f"Workflow Run: {run.name} #{run.run_number}",
f"Status: {run.status} Conclusion: {run.conclusion}",
f"SHA: {run.head_sha}",
]
try:
jobs = run.jobs()
for job in jobs:
if job.conclusion == "failure":
parts.append(f"Failed job: {job.name}")
for step in job.steps:
if step.conclusion == "failure":
parts.append(f" Failed step: {step.name}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_github_wiki(owner, repo_name, page):
if page:
return f"Wiki page: {page} (from {owner}/{repo_name}/wiki)\nNote: Wiki content cannot be fetched via API."
return f"Wiki: {owner}/{repo_name}/wiki\nNote: Wiki content cannot be fetched via API."
def fetch_github_content(url):
"""Fetch detailed content from a GitHub URL using PyGithub."""
token = os.getenv("GITHUB_TOKEN")
if not token:
print("GITHUB_TOKEN not set.")
return None
g = Github(auth=Auth.Token(token))
hostname, segments = _parse_url_path(url)
if not hostname or "github.com" not in hostname:
return None
info = _classify_github_url(segments)
if not info:
return None
try:
repo = g.get_repo(f"{info['owner']}/{info['repo']}")
resource = info["resource"]
if resource is None:
return _fmt_github_repo(repo)
elif resource == "issues":
return _fmt_github_issue(repo, int(info["id"]))
elif resource == "pull":
return _fmt_github_pr(repo, int(info["id"]))
elif resource == "commit":
return _fmt_github_commit(repo, info["sha"])
elif resource == "blob":
return _fmt_github_blob(repo, info["branch"], info["path"])
elif resource == "tree":
return _fmt_github_tree(repo, info["branch"], info.get("path", ""))
elif resource == "discussions":
return _fmt_github_discussion(repo, int(info["id"]))
elif resource == "releases":
return _fmt_github_release(repo, info["tag"])
elif resource == "compare":
return _fmt_github_compare(repo, info["spec"])
elif resource == "actions":
return _fmt_github_actions(repo, int(info["run_id"]))
elif resource == "wiki":
return _fmt_github_wiki(info["owner"], info["repo"], info.get("page"))
else:
return None
except Exception as e:
print(f"GitHub API error: {e}")
return None
# ---------------------------------------------------------------------------
# GitLab
# ---------------------------------------------------------------------------
def _classify_gitlab_url(segments):
"""Classify a GitLab URL from its path segments.
GitLab uses /-/ as separator between project path and resource.
Project paths can be nested: group/subgroup/project.
"""
try:
dash_idx = segments.index("-")
except ValueError:
# No /-/ separator -- treat all segments as the project path
if len(segments) >= 2:
return {"project_path": "/".join(segments), "resource": None}
return None
project_path = "/".join(segments[:dash_idx])
res_segments = segments[dash_idx + 1:]
if not project_path or not res_segments:
return {"project_path": project_path, "resource": None}
res = res_segments[0]
if res == "issues" and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "issues", "id": res_segments[1]}
elif res == "merge_requests" and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "merge_requests", "id": res_segments[1]}
elif res in ("commit", "commits") and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "commit", "sha": res_segments[1]}
elif res == "blob" and len(res_segments) >= 2:
branch = res_segments[1]
file_path = "/".join(res_segments[2:]) if len(res_segments) > 2 else ""
return {"project_path": project_path, "resource": "blob", "branch": branch, "path": file_path}
elif res == "tree" and len(res_segments) >= 2:
branch = res_segments[1]
tree_path = "/".join(res_segments[2:]) if len(res_segments) > 2 else ""
return {"project_path": project_path, "resource": "tree", "branch": branch, "path": tree_path}
elif res == "releases" and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "releases", "tag": res_segments[1]}
elif res == "compare" and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "compare", "spec": res_segments[1]}
elif res == "pipelines" and len(res_segments) >= 2:
return {"project_path": project_path, "resource": "pipelines", "id": res_segments[1]}
elif res == "wikis":
page = res_segments[1] if len(res_segments) >= 2 else None
return {"project_path": project_path, "resource": "wikis", "page": page}
else:
return {"project_path": project_path, "resource": "unknown"}
# -- GitLab formatters -------------------------------------------------------
def _fmt_gitlab_repo(project):
parts = [f"Repository: {project.path_with_namespace}"]
if project.description:
parts.append(f"Description: {project.description}")
try:
readme = project.files.get(file_path="README.md", ref=project.default_branch)
content = readme.decode().decode("utf-8", errors="replace")
parts.append(f"README (first 2000 chars):\n{content[:2000]}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_gitlab_issue(project, issue_id):
issue = project.issues.get(issue_id)
parts = [
f"Issue #{issue.iid}: {issue.title}",
f"State: {issue.state}",
f"Body:\n{issue.description or '(empty)'}",
]
notes = issue.notes.list(get_all=False, per_page=10)
note_texts = [f" Comment by {n.author['username']}: {n.body}" for n in notes]
if note_texts:
parts.append("Comments (first 10):\n" + "\n---\n".join(note_texts))
return "\n\n".join(parts)
def _fmt_gitlab_mr(project, mr_id):
mr = project.mergerequests.get(mr_id)
parts = [
f"Merge Request !{mr.iid}: {mr.title}",
f"State: {mr.state}",
f"Body:\n{mr.description or '(empty)'}",
]
try:
changes = mr.changes()
if isinstance(changes, dict) and "changes" in changes:
diff_parts = []
for change in changes["changes"][:30]:
diff_parts.append(f" {change.get('new_path', '?')}: {change.get('diff', '')[:500]}")
if diff_parts:
diff_text = "\n".join(diff_parts)
if len(diff_text) > 5000:
diff_text = diff_text[:5000] + "\n... (diff truncated)"
parts.append(f"Changes:\n{diff_text}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_gitlab_commit(project, sha):
commit = project.commits.get(sha)
parts = [
f"Commit: {commit.id}",
f"Title: {commit.title}",
f"Message: {commit.message}",
f"Author: {commit.author_name}",
]
try:
diffs = commit.diff()
diff_parts = []
for d in diffs[:30]:
diff_parts.append(f" {d.get('new_path', '?')}: {d.get('diff', '')[:500]}")
if diff_parts:
diff_text = "\n".join(diff_parts)
if len(diff_text) > 5000:
diff_text = diff_text[:5000] + "\n... (diff truncated)"
parts.append(f"Diff:\n{diff_text}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_gitlab_blob(project, branch, path):
f = project.files.get(file_path=path, ref=branch)
content = f.decode().decode("utf-8", errors="replace")
if len(content) > 5000:
content = content[:5000] + "\n... (content truncated)"
return f"File: {path} (branch: {branch})\n\n{content}"
def _fmt_gitlab_tree(project, branch, path):
items = project.repository_tree(path=path or "", ref=branch, get_all=False, per_page=100)
listing = "\n".join(f" {item['path']} ({item['type']})" for item in items)
return f"Tree at {branch}/{path or '(root)'}:\n{listing}"
def _fmt_gitlab_release(project, tag):
release = project.releases.get(tag)
parts = [
f"Release: {release.name or release.tag_name}",
f"Tag: {release.tag_name}",
f"Description:\n{release.description or '(empty)'}",
]
return "\n\n".join(parts)
def _fmt_gitlab_compare(project, spec):
if "..." in spec:
base, head = spec.split("...", 1)
elif ".." in spec:
base, head = spec.split("..", 1)
else:
return None
result = project.repository_compare(base, head)
parts = [f"Comparison: {base}...{head}"]
if isinstance(result, dict):
commits = result.get("commits", [])
commit_summaries = []
for c in commits[:20]:
commit_summaries.append(f" {c.get('short_id', '?')}: {c.get('title', '')}")
if commit_summaries:
parts.append("Commits:\n" + "\n".join(commit_summaries))
diffs = result.get("diffs", [])
diff_parts = []
for d in diffs[:30]:
diff_parts.append(f" {d.get('new_path', '?')}: {d.get('diff', '')[:500]}")
if diff_parts:
diff_text = "\n".join(diff_parts)
if len(diff_text) > 5000:
diff_text = diff_text[:5000] + "\n... (diff truncated)"
parts.append(f"Diffs:\n{diff_text}")
return "\n\n".join(parts)
def _fmt_gitlab_pipeline(project, pipeline_id):
pipeline = project.pipelines.get(pipeline_id)
parts = [
f"Pipeline #{pipeline.id}",
f"Status: {pipeline.status}",
f"Ref: {pipeline.ref}",
f"SHA: {pipeline.sha}",
]
try:
jobs = pipeline.jobs.list(get_all=False, per_page=20)
failed_jobs = [j for j in jobs if j.status == "failed"]
if failed_jobs:
parts.append("Failed jobs:")
for j in failed_jobs:
parts.append(f" {j.name}: {j.status} (stage: {j.stage})")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_gitlab_wiki(project, page):
if page:
try:
wiki_page = project.wikis.get(page)
return f"Wiki page: {wiki_page.title}\n\n{wiki_page.content}"
except Exception:
return f"Wiki page: {page}\nNote: Could not fetch wiki page content."
try:
pages = project.wikis.list(get_all=False, per_page=20)
listing = "\n".join(f" {p.slug}: {p.title}" for p in pages)
return f"Wiki pages:\n{listing}"
except Exception:
return "Wiki: Could not fetch wiki pages."
def fetch_gitlab_content(url):
"""Fetch content from GitLab URL using python-gitlab."""
token = os.getenv("GITLAB_TOKEN")
if not token:
print("GITLAB_TOKEN not set.")
return None
gl = gitlab.Gitlab("https://gitlab.com", private_token=token)
hostname, segments = _parse_url_path(url)
if not hostname or "gitlab.com" not in hostname:
return None
info = _classify_gitlab_url(segments)
if not info:
return None
try:
project = gl.projects.get(info["project_path"])
resource = info["resource"]
if resource is None:
return _fmt_gitlab_repo(project)
elif resource == "issues":
return _fmt_gitlab_issue(project, int(info["id"]))
elif resource == "merge_requests":
return _fmt_gitlab_mr(project, int(info["id"]))
elif resource == "commit":
return _fmt_gitlab_commit(project, info["sha"])
elif resource == "blob":
return _fmt_gitlab_blob(project, info["branch"], info["path"])
elif resource == "tree":
return _fmt_gitlab_tree(project, info["branch"], info.get("path", ""))
elif resource == "releases":
return _fmt_gitlab_release(project, info["tag"])
elif resource == "compare":
return _fmt_gitlab_compare(project, info["spec"])
elif resource == "pipelines":
return _fmt_gitlab_pipeline(project, int(info["id"]))
elif resource == "wikis":
return _fmt_gitlab_wiki(project, info.get("page"))
else:
return None
except Exception as e:
print(f"GitLab API error: {e}")
return None
# ---------------------------------------------------------------------------
# HuggingFace
# ---------------------------------------------------------------------------
def _classify_huggingface_url(segments):
"""Classify a HuggingFace URL from its path segments.
HF URLs:
- huggingface.co/{user}/{repo} -> model
- huggingface.co/datasets/{user}/{repo} -> dataset
- huggingface.co/spaces/{user}/{repo} -> space
"""
if not segments:
return None
# Detect repo_type prefix
repo_type = None
segs = list(segments)
if segs[0] in ("datasets", "spaces"):
repo_type = segs[0].rstrip("s") # "dataset" or "space"
segs = segs[1:]
if len(segs) < 2:
return None
repo_id = f"{segs[0]}/{segs[1]}"
base = {"repo_id": repo_id, "repo_type": repo_type}
if len(segs) == 2:
return {**base, "resource": None}
res = segs[2]
if res == "blob" and len(segs) >= 4:
return {**base, "resource": "blob", "revision": segs[3],
"path": "/".join(segs[4:]) if len(segs) > 4 else ""}
elif res == "resolve" and len(segs) >= 4:
return {**base, "resource": "resolve", "revision": segs[3],
"path": "/".join(segs[4:]) if len(segs) > 4 else ""}
elif res == "tree" and len(segs) >= 4:
return {**base, "resource": "tree", "revision": segs[3],
"path": "/".join(segs[4:]) if len(segs) > 4 else ""}
elif res == "commit" and len(segs) >= 4:
return {**base, "resource": "commit", "sha": segs[3]}
elif res == "discussions" and len(segs) >= 4:
return {**base, "resource": "discussions", "num": segs[3]}
else:
return {**base, "resource": "unknown"}
# -- HuggingFace formatters --------------------------------------------------
def _fmt_hf_repo(api, repo_id, repo_type):
info = api.repo_info(repo_id=repo_id, repo_type=repo_type)
parts = [f"Repository: {repo_id}"]
if hasattr(info, "description") and info.description:
parts.append(f"Description: {info.description}")
if hasattr(info, "card_data") and info.card_data:
parts.append(f"Card data: {str(info.card_data)[:1000]}")
try:
readme_path = api.hf_hub_download(
repo_id=repo_id, filename="README.md", repo_type=repo_type
)
with open(readme_path, "r", errors="replace") as f:
content = f.read()[:2000]
parts.append(f"README (first 2000 chars):\n{content}")
except Exception:
pass
return "\n\n".join(parts)
def _fmt_hf_commit(api, repo_id, repo_type, sha):
commits = api.list_repo_commits(repo_id=repo_id, revision=sha, repo_type=repo_type)
if commits:
c = commits[0]
return (
f"Commit: {c.commit_id}\n"
f"Title: {c.title}\n"
f"Message: {c.message}\n"
f"Authors: {', '.join(c.authors) if c.authors else 'unknown'}\n"
f"Date: {c.created_at}"
)
return None
def _fmt_hf_discussion(api, repo_id, repo_type, discussion_num):
discussion = api.get_discussion_details(
repo_id=repo_id, discussion_num=discussion_num, repo_type=repo_type
)
parts = [
f"Discussion #{discussion.num}: {discussion.title}",
f"Status: {discussion.status}",
f"Author: {discussion.author}",
f"Is Pull Request: {discussion.is_pull_request}",
]
comment_texts = []
for event in discussion.events:
if hasattr(event, "content") and event.content:
author = event.author if hasattr(event, "author") else "unknown"
comment_texts.append(f" {author}: {event.content[:500]}")
if len(comment_texts) >= 10:
break
if comment_texts:
parts.append("Comments:\n" + "\n---\n".join(comment_texts))
return "\n\n".join(parts)
def _fmt_hf_file(api, repo_id, repo_type, revision, path):
local_path = api.hf_hub_download(
repo_id=repo_id, filename=path, revision=revision, repo_type=repo_type
)
try:
with open(local_path, "r", errors="replace") as f:
content = f.read()
if len(content) > 5000:
content = content[:5000] + "\n... (content truncated)"
return f"File: {path} (revision: {revision})\n\n{content}"
except Exception:
return f"File: {path} (revision: {revision})\n(binary or unreadable file)"
def _fmt_hf_tree(api, repo_id, repo_type, revision, path):
items = api.list_repo_tree(
repo_id=repo_id, path_in_repo=path or None,
revision=revision, repo_type=repo_type
)
listing = []
for item in items:
if hasattr(item, "size") and item.size is not None:
listing.append(f" {item.rfilename} (file, {item.size} bytes)")
else:
listing.append(f" {item.rfilename} (folder)")
if len(listing) >= 100:
listing.append(" ... (truncated)")
break
return f"Tree at {revision}/{path or '(root)'}:\n" + "\n".join(listing)
def fetch_huggingface_content(url):
"""Fetch detailed content from a Hugging Face URL using huggingface_hub API."""
token = os.getenv("HF_TOKEN")
if not token:
print("HF_TOKEN not set.")
return None
api = HfApi(token=token)
hostname, segments = _parse_url_path(url)
if not hostname or "huggingface.co" not in hostname:
return None
info = _classify_huggingface_url(segments)
if not info:
return None
try:
resource = info["resource"]
repo_id = info["repo_id"]
repo_type = info["repo_type"]
if resource is None:
return _fmt_hf_repo(api, repo_id, repo_type)
elif resource == "commit":
return _fmt_hf_commit(api, repo_id, repo_type, info["sha"])
elif resource == "discussions":
return _fmt_hf_discussion(api, repo_id, repo_type, int(info["num"]))
elif resource in ("blob", "resolve"):
return _fmt_hf_file(api, repo_id, repo_type, info["revision"], info["path"])
elif resource == "tree":
return _fmt_hf_tree(api, repo_id, repo_type, info["revision"], info.get("path", ""))
else:
return None
except Exception as e:
print(f"Hugging Face API error: {e}")
return None
# ---------------------------------------------------------------------------
# URL router
# ---------------------------------------------------------------------------
def fetch_url_content(url):
"""Main URL content fetcher that routes to platform-specific handlers."""
if not url or not url.strip():
return ""
url = url.strip()
try:
hostname, _ = _parse_url_path(url)
if hostname and "github.com" in hostname:
return fetch_github_content(url)
elif hostname and "gitlab.com" in hostname:
return fetch_gitlab_content(url)
elif hostname and "huggingface.co" in hostname:
return fetch_huggingface_content(url)
except Exception as e:
print(f"Error fetching URL content: {e}")
return ""
# Truncate prompt
def truncate_prompt(model_alias, models, conversation_state):
"""
Truncate the conversation history and user input to fit within the model's context window.
Args:
model_alias (str): Alias for the model being used (i.e., "left", "right").
models (dict): Dictionary mapping model aliases to their names.
conversation_state (dict): State containing the conversation history for all models.
Returns:
str: Truncated conversation history and user input.
"""
# Get the full conversation history for the model
full_conversation = conversation_state[f"{model_alias}_chat"]
# Get the context length for the model
context_length = model_context_window[models[model_alias]]
# Single loop to handle both FIFO removal and content truncation
while len(json.dumps(full_conversation)) > context_length:
# If we have more than one message, remove the oldest (FIFO)
if len(full_conversation) > 1:
full_conversation.pop(0)
# If only one message remains, truncate its content
else:
current_length = len(json.dumps(full_conversation))
# Calculate how many characters we need to remove
excess = current_length - context_length
# Add a buffer to ensure we remove enough (accounting for JSON encoding)
truncation_size = min(excess + 10, len(full_conversation[0]["content"]))
if truncation_size <= 0:
break # Can't truncate further
# Truncate the content from the end to fit
full_conversation[0]["content"] = full_conversation[0]["content"][
:-truncation_size
]
return full_conversation
def chat_with_models(model_alias, models, conversation_state, timeout=TIMEOUT):
truncated_input = truncate_prompt(model_alias, models, conversation_state)
response_event = threading.Event() # Event to signal response completion
model_response = {"content": None, "error": None}
def request_model_response():
try:
# Get model_id from the model_name using the mapping
model_name = models[model_alias]
model_id = model_name_to_id.get(model_name, model_name)
request_params = {"model": model_id, "messages": truncated_input}
response = openai_client.chat.completions.create(**request_params)
model_response["content"] = response.choices[0].message.content
except Exception as e:
model_response["error"] = (
f"{models[model_alias]} model is not available. Error: {e}"
)
finally:
response_event.set() # Signal that the response is completed
# Start the model request in a separate thread
response_thread = threading.Thread(target=request_model_response)
response_thread.start()
# Wait for the specified timeout
response_event_occurred = response_event.wait(timeout)
if not response_event_occurred:
raise TimeoutError(
f"The {model_alias} model did not respond within {timeout} seconds."
)
elif model_response["error"]:
raise Exception(model_response["error"])
else:
# Get the full conversation history for the model
model_key = f"{model_alias}_chat"
# Add the model's response to the conversation state
conversation_state[model_key].append(
{"role": "assistant", "content": model_response["content"]}
)
# Format the complete conversation history with different colors
formatted_history = format_conversation_history(
conversation_state[model_key][1:]
)
return formatted_history
def format_conversation_history(conversation_history):
"""
Format the conversation history with different colors for user and model messages.
Args:
conversation_history (list): List of conversation messages with role and content.
Returns:
str: Markdown formatted conversation history.
"""
formatted_text = ""
for message in conversation_history:
if message["role"] == "user":
# Format user messages with blue text
formatted_text += f"
User: {message['content']}
\n\n"
else:
# Format assistant messages with dark green text
formatted_text += f"Model: {message['content']}
\n\n"
return formatted_text
def save_content_to_hf(data, repo_name, file_name, token=None):
"""
Save feedback content to Hugging Face repository.
"""
# Serialize the content to JSON and encode it as bytes
json_content = json.dumps(data, indent=4).encode("utf-8")
# Create a binary file-like object
file_like_object = io.BytesIO(json_content)
# Define the path in the repository
filename = f"{file_name}.json"
# Ensure the user is authenticated with HF
if token is None:
token = HfApi().token
if token is None:
raise ValueError("Please log in to Hugging Face to submit votes.")
# Upload to Hugging Face repository
upload_file(
path_or_fileobj=file_like_object,
path_in_repo=filename,
repo_id=repo_name,
repo_type="dataset",
token=token,
)
def is_file_within_time_frame(file_path, days):
try:
# Extract timestamp from filename
timestamp_str = file_path.split("/")[-1].split(".")[0]
file_datetime = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
time_diff = datetime.now() - file_datetime
return time_diff.days <= days
except:
return False
def load_content_from_hf(repo_name, file_name):
"""
Read feedback content from a Hugging Face repository within the last LEADERBOARD_UPDATE_TIME_FRAME_DAYS days.
Args:
repo_name (str): Hugging Face repository name.
file_name (str): Only load files under this prefix directory.
Returns:
list: Aggregated feedback data read from the repository.
"""
data = []
try:
api = HfApi()
# List all files in the repository, only under the file_name
for file in api.list_repo_files(repo_id=repo_name, repo_type="dataset"):
if not file.startswith(f"{file_name}/"):
continue
# Filter files by last LEADERBOARD_UPDATE_TIME_FRAME_DAYS days
if not is_file_within_time_frame(file, LEADERBOARD_UPDATE_TIME_FRAME_DAYS):
continue
# Download and aggregate data
local_path = hf_hub_download(
repo_id=repo_name, filename=file, repo_type="dataset"
)
with open(local_path, "r") as f:
entry = json.load(f)
entry["timestamp"] = file.split("/")[-1].split(".")[0]
data.append(entry)
return data
except:
raise Exception("Error loading feedback data from Hugging Face repository.")
def get_leaderboard_data(vote_entry=None, use_cache=True):
# Try to load cached leaderboard first
if use_cache:
try:
cached_path = hf_hub_download(
repo_id=LEADERBOARD_REPO,
filename=f'{LEADERBOARD_FILE}.json',
repo_type="dataset",
)
with open(cached_path, "r") as f:
leaderboard_data = pd.read_json(f)
# Round all numeric columns to two decimal places
round_cols = {
"Elo Score": 2,
"Win Rate": 2,
"Conversation Efficiency Index": 2,
"Consistency Score": 2,
"Bradley-Terry Coefficient": 2,
"Eigenvector Centrality Value": 2,
"Newman Modularity Score": 2,
"PageRank Score": 2,
}
for col, decimals in round_cols.items():
if col in leaderboard_data.columns:
leaderboard_data[col] = pd.to_numeric(leaderboard_data[col], errors="coerce").round(decimals)
return leaderboard_data
except Exception as e:
print(f"No cached leaderboard found, computing from votes...")
# Load feedback data from the Hugging Face repository
data = load_content_from_hf(VOTE_REPO, LEADERBOARD_FILE)
vote_df = pd.DataFrame(data)
# Concatenate the new feedback with the existing leaderboard data
if vote_entry is not None:
vote_df = pd.concat([vote_df, pd.DataFrame([vote_entry])], ignore_index=True)
if vote_df.empty:
return pd.DataFrame(
columns=[
"Rank",
"Model",
"Elo Score",
"Win Rate",
"Conversation Efficiency Index",
"Consistency Score",
"Bradley-Terry Coefficient",
"Eigenvector Centrality Value",
"Newman Modularity Score",
"PageRank Score",
]
)
# Load conversation data from the Hugging Face repository
conversation_data = load_content_from_hf(CONVERSATION_REPO, LEADERBOARD_FILE)
conversation_df = pd.DataFrame(conversation_data)
# Merge vote data with conversation data
all_df = pd.merge(
vote_df, conversation_df, on=["timestamp", "left", "right"], how="inner"
)
# Create dictionaries to track scores and match counts
model_stats = {}
# Process each row once and accumulate scores
for _, row in all_df.iterrows():
left_model = row["left"]
right_model = row["right"]
is_self_match = left_model == right_model
# Initialize dictionaries for models if they don't exist yet
for model in [left_model, right_model]:
if model not in model_stats:
model_stats[model] = {
"cei_sum": 0, # Sum of per-round scores
"cei_max": 0, # Sum of per-round maximums
"self_matches": 0, # Count of self-matches
"self_draws": 0, # Count of draws in self-matches
}
# Handle self-matches (same model on both sides)
if is_self_match:
model_stats[left_model]["self_matches"] += 1
if row["winner"] == "both_bad" or row["winner"] == "tie":
model_stats[left_model]["self_draws"] += 1
continue
# Determine scores based on winner for competitive matches
match row["winner"]:
case "left":
left_score = 1
right_score = -1
case "right":
left_score = -1
right_score = 1
case "tie":
left_score = 0.3
right_score = 0.3
case "both_bad":
left_score = -0.3
right_score = -0.3
# Count rounds for each side
left_round = sum(1 for msg in row["left_chat"] if msg["role"] == "assistant")
right_round = sum(1 for msg in row["right_chat"] if msg["role"] == "assistant")
# Update CEI metrics
model_stats[left_model]["cei_max"] += 1 / left_round
model_stats[right_model]["cei_max"] += 1 / right_round
model_stats[left_model]["cei_sum"] += left_score / left_round
model_stats[right_model]["cei_sum"] += right_score / right_round
# map vote to winner
vote_df["winner"] = vote_df["winner"].map(
{
"left": evalica.Winner.X,
"right": evalica.Winner.Y,
"tie": evalica.Winner.Draw,
"both_bad": evalica.Winner.Draw,
}
)
# Calculate scores using various metrics
avr_result = evalica.average_win_rate(
vote_df["left"],
vote_df["right"],
vote_df["winner"],
tie_weight=0, # Chatbot Arena excludes ties
)
bt_result = evalica.bradley_terry(
vote_df["left"], vote_df["right"], vote_df["winner"], tie_weight=0
)
newman_result = evalica.newman(
vote_df["left"], vote_df["right"], vote_df["winner"], tie_weight=0
)
eigen_result = evalica.eigen(
vote_df["left"], vote_df["right"], vote_df["winner"], tie_weight=0
)
elo_result = evalica.elo(
vote_df["left"], vote_df["right"], vote_df["winner"], tie_weight=0
)
pagerank_result = evalica.pagerank(
vote_df["left"], vote_df["right"], vote_df["winner"], tie_weight=0
)
# Clean up potential inf/NaN values in the results by extracting cleaned scores
avr_scores = avr_result.scores.replace([float("inf"), float("-inf")], float("nan"))
bt_scores = bt_result.scores.replace([float("inf"), float("-inf")], float("nan"))
newman_scores = newman_result.scores.replace([float("inf"), float("-inf")], float("nan"))
eigen_scores = eigen_result.scores.replace([float("inf"), float("-inf")], float("nan"))
elo_scores = elo_result.scores.replace([float("inf"), float("-inf")], float("nan"))
pagerank_scores = pagerank_result.scores.replace([float("inf"), float("-inf")], float("nan"))
# Calculate CEI results
cei_result = {}
for model in elo_scores.index:
if model in model_stats and model_stats[model]["cei_max"] > 0:
cei_result[model] = round(
model_stats[model]["cei_sum"] / model_stats[model]["cei_max"], 2
)
else:
cei_result[model] = None
cei_result = pd.Series(cei_result)
# Calculate MCS results
mcs_result = {}
for model in elo_scores.index:
if model in model_stats and model_stats[model]["self_matches"] > 0:
mcs_result[model] = round(
model_stats[model]["self_draws"] / model_stats[model]["self_matches"], 2
)
else:
mcs_result[model] = None
mcs_result = pd.Series(mcs_result)
organization_values = [model_organization.get(model, "") for model in elo_scores.index]
leaderboard_data = pd.DataFrame(
{
"Model": [name.split(": ", 1)[-1] for name in elo_scores.index],
"Organization": organization_values,
"Elo Score": elo_scores.values,
"Win Rate": avr_scores.values,
"Conversation Efficiency Index": cei_result.values,
"Consistency Score": mcs_result.values,
"Bradley-Terry Coefficient": bt_scores.values,
"Eigenvector Centrality Value": eigen_scores.values,
"Newman Modularity Score": newman_scores.values,
"PageRank Score": pagerank_scores.values,
}
)
# Round all numeric columns to two decimal places
round_cols = {
"Elo Score": 2,
"Win Rate": 2,
"Bradley-Terry Coefficient": 2,
"Eigenvector Centrality Value": 2,
"Newman Modularity Score": 2,
"PageRank Score": 2,
}
for col, decimals in round_cols.items():
if col in leaderboard_data.columns:
leaderboard_data[col] = pd.to_numeric(leaderboard_data[col], errors="coerce").round(decimals)
# Add a Rank column based on Elo scores
leaderboard_data["Rank"] = (
leaderboard_data["Elo Score"].rank(method="min", ascending=False).astype(int)
)
# Place rank in the first column
leaderboard_data = leaderboard_data[
["Rank"] + [col for col in leaderboard_data.columns if col != "Rank"]
]
# Save leaderboard data if this is a new vote
if vote_entry is not None:
try:
# Convert DataFrame to JSON and save
json_content = leaderboard_data.to_json(orient="records", indent=4).encode(
"utf-8"
)
file_like_object = io.BytesIO(json_content)
upload_file(
path_or_fileobj=file_like_object,
path_in_repo=f'{LEADERBOARD_FILE}.json',
repo_id=LEADERBOARD_REPO,
repo_type="dataset",
token=HfApi().token,
)
except Exception as e:
print(f"Failed to save leaderboard cache: {e}")
return leaderboard_data
# Function to enable or disable submit buttons based on textbox content
def toggle_submit_button(text):
if not text or text.strip() == "":
return gr.update(interactive=False) # Disable the button
else:
return gr.update(interactive=True) # Enable the button
# Function to check initial authentication status
def check_auth_on_load(request: gr.Request):
"""Check if user is already authenticated when page loads."""
# Try to get token from environment (for Spaces) or HfApi (for local)
token = os.getenv("HF_TOKEN") or HfApi().token
# Check if user is authenticated via OAuth
is_authenticated = (hasattr(request, 'username') and request.username is not None and request.username != "")
if is_authenticated or token:
# User is logged in OR we have a token available
return (
gr.update(interactive=True), # repo_url
gr.update(interactive=True), # shared_input
gr.update(interactive=False), # send_first (disabled until text entered)
gr.update(interactive=True), # feedback
gr.update(interactive=True), # submit_feedback_btn
gr.update(visible=False), # hint_markdown
gr.update(visible=True), # login_button (keep visible for logout)
gr.update(visible=False), # refresh_auth_button (hide when authenticated)
token, # oauth_token
)
else:
# User not logged in
return (
gr.update(interactive=False), # repo_url
gr.update(interactive=False), # shared_input
gr.update(interactive=False), # send_first
gr.update(interactive=False), # feedback
gr.update(interactive=False), # submit_feedback_btn
gr.update(visible=True), # hint_markdown
gr.update(visible=True), # login_button
gr.update(visible=True), # refresh_auth_button (show when not authenticated)
None, # oauth_token
)
# Suppress the deprecation warning for theme parameter until Gradio 6.0 is released
warnings.filterwarnings('ignore', category=DeprecationWarning, message=".*'theme' parameter.*")
with gr.Blocks(title="SWE-Chatbot-Arena", theme=gr.themes.Soft()) as app:
user_authenticated = gr.State(False)
models_state = gr.State({})
conversation_state = gr.State({})
# Add OAuth information state to track user
oauth_token = gr.State(None)
with gr.Tab("🏆Leaderboard"):
# Add title and description as a Markdown component
gr.Markdown("# 🏆 LLM4SE Leaderboard")
gr.Markdown(
"Community-Driven Evaluation of Top Large Language Models (LLMs) in Software Engineering (SE) Tasks"
)
gr.Markdown(
"*The SWE-Chatbot-Arena is an open-source platform designed to evaluate LLMs through human preference, "
"fostering transparency and collaboration. This platform aims to empower the SE community to assess and compare the "
"performance of leading LLMs in related tasks. For technical details, check out our [paper](https://arxiv.org/abs/2502.01860).*"
)
# Initialize the leaderboard with the DataFrame containing the expected columns
leaderboard_component = Leaderboard(
value=get_leaderboard_data(use_cache=True),
select_columns=[
"Rank",
"Model",
"Organization",
"Elo Score",
"Conversation Efficiency Index",
"Consistency Score",
],
search_columns=["Model"],
filter_columns=[
ColumnFilter(
"Elo Score",
min=800,
max=1600,
default=[800, 1600],
type="slider",
label="Elo Score"
),
ColumnFilter(
"Win Rate",
min=0,
max=1,
default=[0, 1],
type="slider",
label="Win Rate"
),
ColumnFilter(
"Conversation Efficiency Index",
min=0,
max=1,
default=[0, 1],
type="slider",
label="Conversation Efficiency Index"
),
ColumnFilter(
"Consistency Score",
min=0,
max=1,
default=[0, 1],
type="slider",
label="Consistency Score"
),
],
datatype=[
"number",
"str",
"str",
"number",
"number",
"number",
"number",
"number",
"number",
"number",
"number",
],
)
# Add a divider
gr.Markdown("---")
# Add a citation block in Markdown
gr.Markdown(
"""
Made with ❤️ for SWE-Chatbot-Arena. If this work is useful to you, please consider citing our vision paper:
```
@inproceedings{zhao2025se,
title={SE Arena: An Interactive Platform for Evaluating Foundation Models in Software Engineering},
author={Zhao, Zhimin},
booktitle={2025 IEEE/ACM Second International Conference on AI Foundation Models and Software Engineering (Forge)},
pages={78--81},
year={2025},
organization={IEEE}
}
```
"""
)
with gr.Tab("⚔️Arena"):
# Add title and description as a Markdown component
gr.Markdown("# ⚔️ SWE-Chatbot-Arena")
gr.Markdown("Explore and Test Top LLMs with SE Tasks by Community Voting")
gr.Markdown("### 📜 How It Works")
gr.Markdown(
f"""
- **Blind Comparison**: Submit a SE-related query to two anonymous LLMs randomly selected from up to {len(available_models)} top models from ChatGPT, Gemini, Grok, Claude, Qwen, Deepseek, Mistral, and others.
- **Interactive Voting**: Engage in multi-turn dialogues with both LLMs and compare their responses. You can continue the conversation until you confidently choose the better model.
- **Fair Play Rules**: Votes are counted only if LLM identities remain anonymous. Revealing a LLM's identity disqualifies the session.
"""
)
gr.Markdown(f"*Note: Due to budget constraints, responses that take longer than {TIMEOUT} seconds to generate will be discarded.*")
# Add a divider
gr.Markdown("---")
# Add Hugging Face Sign In button and message
with gr.Row():
# Define the markdown text with or without the hint string
markdown_text = "### Please sign in first to vote!"
if SHOW_HINT_STRING:
markdown_text += f"\n*{HINT_STRING}*"
hint_markdown = gr.Markdown(markdown_text)
with gr.Column():
login_button = gr.LoginButton(
"Sign in with Hugging Face", elem_id="oauth-button"
)
refresh_auth_button = gr.Button(
"Refresh Login Status",
variant="secondary",
size="sm",
visible=True
)
guardrail_message = gr.Markdown("", visible=False, elem_id="guardrail-message")
# NEW: Add a textbox for the repository URL above the user prompt
repo_url = gr.Textbox(
show_label=False,
placeholder="Optional: Enter any GitHub, GitLab, or Hugging Face URL.",
lines=1,
interactive=False,
)
# Components with initial non-interactive state
shared_input = gr.Textbox(
show_label=False,
placeholder="Enter your query for both models here.",
lines=2,
interactive=False, # Initially non-interactive
)
send_first = gr.Button(
"Submit", visible=True, interactive=False
) # Initially non-interactive
# Add event listener to shared_input to toggle send_first button
shared_input.change(
fn=toggle_submit_button, inputs=shared_input, outputs=send_first
)
user_prompt_md = gr.Markdown(value="", visible=False)
with gr.Column():
shared_input
user_prompt_md
with gr.Row():
response_a_title = gr.Markdown(value="", visible=False)
response_b_title = gr.Markdown(value="", visible=False)
with gr.Row():
response_a = gr.Markdown(label="Response from Model A")
response_b = gr.Markdown(label="Response from Model B")
# Add a popup component for timeout notification
with gr.Row(visible=False) as timeout_popup:
timeout_message = gr.Markdown(
"### Timeout\n\nOne of the models did not respond within 1 minute. Please try again."
)
close_popup_btn = gr.Button("Okay")
def close_timeout_popup():
# Re-enable or disable the submit buttons based on the current textbox content
shared_input_state = gr.update(interactive=True)
send_first_state = toggle_submit_button(shared_input.value)
model_a_input_state = gr.update(interactive=True)
model_a_send_state = toggle_submit_button(model_a_input.value)
model_b_input_state = gr.update(interactive=True)
model_b_send_state = toggle_submit_button(model_b_input.value)
# Keep repo_url in sync with shared_input
repo_url_state = gr.update(interactive=True)
return (
gr.update(visible=False), # Hide the timeout popup
shared_input_state, # Update shared_input
send_first_state, # Update send_first button
model_a_input_state, # Update model_a_input
model_a_send_state, # Update model_a_send button
model_b_input_state, # Update model_b_input
model_b_send_state, # Update model_b_send button
repo_url_state, # Update repo_url button
)
# Multi-round inputs, initially hidden
with gr.Row(visible=False) as multi_round_inputs:
model_a_input = gr.Textbox(label="Model A Input", lines=1)
model_a_send = gr.Button(
"Send to Model A", interactive=False
) # Initially disabled
model_b_input = gr.Textbox(label="Model B Input", lines=1)
model_b_send = gr.Button(
"Send to Model B", interactive=False
) # Initially disabled
# Add event listeners to model_a_input and model_b_input to toggle their submit buttons
model_a_input.change(
fn=toggle_submit_button, inputs=model_a_input, outputs=model_a_send
)
model_b_input.change(
fn=toggle_submit_button, inputs=model_b_input, outputs=model_b_send
)
close_popup_btn.click(
close_timeout_popup,
inputs=[],
outputs=[
timeout_popup,
shared_input,
send_first,
model_a_input,
model_a_send,
model_b_input,
model_b_send,
repo_url,
],
)
def guardrail_check_se_relevance(user_input):
"""
Use openai/gpt-oss-safeguard-20b to check if the user input is SE-related.
Return True if it is SE-related, otherwise False.
"""
# Example instructions for classification — adjust to your needs
system_message = {
"role": "system",
"content": (
"You are a classifier that decides if a user's question is relevant to software engineering. "
"If the question is about software engineering concepts, tools, processes, or code, respond with 'Yes'. "
"Otherwise, respond with 'No'."
),
}
user_message = {"role": "user", "content": user_input}
try:
# Make the chat completion call
response = openai_client.chat.completions.create(
model="openai/gpt-oss-safeguard-20b", messages=[system_message, user_message]
)
classification = response.choices[0].message.content.strip().lower()
# Check if the LLM responded with 'Yes'
return classification.lower().startswith("yes")
except Exception as e:
print(f"Guardrail check failed: {e}")
# If there's an error, you might decide to fail open (allow) or fail closed (block).
# Here we default to fail open, but you can change as needed.
return True
def disable_first_submit_ui():
"""First function to immediately disable UI elements"""
return (
# [0] guardrail_message: hide
gr.update(visible=False),
# [1] shared_input: disable but keep visible
gr.update(interactive=False),
# [2] repo_url: disable but keep visible
gr.update(interactive=False),
# [3] send_first: disable and show loading state
gr.update(interactive=False, value="Processing..."),
)
# Function to update model titles and responses
def update_model_titles_and_responses(
repo_url, user_input, models_state, conversation_state
):
# Guardrail check first
if not repo_url and not guardrail_check_se_relevance(user_input):
# Return updates to show the guardrail message and re-enable UI
return (
# [0] guardrail_message: Show guardrail message
gr.update(
value="### Oops! Try asking something about software engineering. Thanks!",
visible=True,
),
# [1] shared_input: clear and re-enable
gr.update(value="", interactive=True, visible=True),
# [2] repo_url: clear and re-enable
gr.update(value="", interactive=True, visible=True),
# [3] user_prompt_md: clear and hide
gr.update(value="", visible=False),
# [4] response_a_title: clear and hide
gr.update(value="", visible=False),
# [5] response_b_title: clear and hide
gr.update(value="", visible=False),
# [6] response_a: clear response
gr.update(value=""),
# [7] response_b: clear response
gr.update(value=""),
# [8] multi_round_inputs: hide
gr.update(visible=False),
# [9] vote_panel: hide
gr.update(visible=False),
# [10] send_first: re-enable button with original text
gr.update(visible=True, interactive=True, value="Submit"),
# [11] feedback: enable the selection
gr.update(interactive=True),
# [12] models_state: pass state as-is
models_state,
# [13] conversation_state: pass state as-is
conversation_state,
# [14] timeout_popup: hide
gr.update(visible=False),
# [15] model_a_send: disable
gr.update(interactive=False),
# [16] model_b_send: disable
gr.update(interactive=False),
# [17] thanks_message: hide
gr.update(visible=False),
)
# Fetch repository info if a URL is provided
repo_info = fetch_url_content(repo_url)
combined_user_input = (
f"Context: {repo_info}\n\nInquiry: {user_input}"
if repo_info
else user_input
)
# Randomly select two models for the comparison
selected_models = [random.choice(available_models) for _ in range(2)]
models = {"left": selected_models[0], "right": selected_models[1]}
# Create a copy to avoid modifying the original
conversations = models.copy()
conversations.update(
{
"url": repo_url,
"left_chat": [{"role": "user", "content": combined_user_input}],
"right_chat": [{"role": "user", "content": combined_user_input}],
}
)
# Clear previous states
models_state.clear()
conversation_state.clear()
# Update the states
models_state.update(models)
conversation_state.update(conversations)
try:
response_a = chat_with_models("left", models_state, conversation_state)
response_b = chat_with_models("right", models_state, conversation_state)
except TimeoutError as e:
# Handle timeout by resetting components and showing a popup.
return (
# [0] guardrail_message: hide
gr.update(visible=False),
# [1] shared_input: re-enable, preserve user input
gr.update(interactive=True, visible=True),
# [2] repo_url: re-enable, preserve user input
gr.update(interactive=True, visible=True),
# [3] user_prompt_md: hide
gr.update(value="", visible=False),
# [4] response_a_title: hide
gr.update(value="", visible=False),
# [5] response_b_title: hide
gr.update(value="", visible=False),
# [6] response_a: clear
gr.update(value=""),
# [7] response_b: clear
gr.update(value=""),
# [8] multi_round_inputs: hide
gr.update(visible=False),
# [9] vote_panel: hide
gr.update(visible=False),
# [10] send_first: re-enable with original text
gr.update(visible=True, interactive=True, value="Submit"),
# [11] feedback: disable
gr.update(interactive=False),
# [12] models_state: pass state as-is
models_state,
# [13] conversation_state: pass state as-is
conversation_state,
# [14] timeout_popup: show popup
gr.update(visible=True),
# [15] model_a_send: disable
gr.update(interactive=False),
# [16] model_b_send: disable
gr.update(interactive=False),
# [17] thanks_message: hide
gr.update(visible=False),
)
except Exception as e:
# Handle other errors by resetting UI state and showing error message
return (
# [0] guardrail_message: show error message
gr.update(value=f"### Error: {str(e)}", visible=True),
# [1] shared_input: re-enable, preserve user input
gr.update(interactive=True, visible=True),
# [2] repo_url: re-enable, preserve user input
gr.update(interactive=True, visible=True),
# [3] user_prompt_md: hide
gr.update(value="", visible=False),
# [4] response_a_title: hide
gr.update(value="", visible=False),
# [5] response_b_title: hide
gr.update(value="", visible=False),
# [6] response_a: clear
gr.update(value=""),
# [7] response_b: clear
gr.update(value=""),
# [8] multi_round_inputs: hide
gr.update(visible=False),
# [9] vote_panel: hide
gr.update(visible=False),
# [10] send_first: re-enable with original text
gr.update(visible=True, interactive=True, value="Submit"),
# [11] feedback: disable
gr.update(interactive=False),
# [12] models_state: pass state as-is
models_state,
# [13] conversation_state: pass state as-is
conversation_state,
# [14] timeout_popup: hide popup
gr.update(visible=False),
# [15] model_a_send: disable
gr.update(interactive=False),
# [16] model_b_send: disable
gr.update(interactive=False),
# [17] thanks_message: hide
gr.update(visible=False),
)
# Determine the initial state of the multi-round send buttons
model_a_send_state = toggle_submit_button("")
model_b_send_state = toggle_submit_button("")
display_content = f"### Your Query:\n\n{user_input}"
if repo_info:
display_content += f"\n\n### Repo-related URL:\n\n{repo_url}"
# Return the updates for all 18 outputs.
return (
# [0] guardrail_message: hide (since no guardrail issue)
gr.update(visible=False),
# [1] shared_input: re-enable but hide
gr.update(interactive=True, visible=False),
# [2] repo_url: re-enable but hide
gr.update(interactive=True, visible=False),
# [3] user_prompt_md: display the user's query
gr.update(value=display_content, visible=True),
# [4] response_a_title: show anonymized title for Model A
gr.update(value="### Model A", visible=True),
# [5] response_b_title: show anonymized title for Model B
gr.update(value="### Model B", visible=True),
# [6] response_a: display Model A response
gr.update(value=response_a),
# [7] response_b: display Model B response
gr.update(value=response_b),
# [8] multi_round_inputs: show the input section for multi-round dialogues
gr.update(visible=True),
# [9] vote_panel: show vote panel
gr.update(visible=True),
# [10] send_first: hide the submit button but restore label
gr.update(visible=False, value="Submit"),
# [11] feedback: enable the feedback selection
gr.update(interactive=True),
# [12] models_state: pass updated models_state
models_state,
# [13] conversation_state: pass updated conversation_state
conversation_state,
# [14] timeout_popup: hide any timeout popup if visible
gr.update(visible=False),
# [15] model_a_send: set state of the model A send button
model_a_send_state,
# [16] model_b_send: set state of the model B send button
model_b_send_state,
# [17] thanks_message: hide the thank-you message
gr.update(visible=False),
)
# Feedback panel, initially hidden
with gr.Column(visible=False) as vote_panel:
gr.Markdown("### Which model do you prefer?")
with gr.Row():
feedback = gr.Radio(
choices=["Model A", "Model B", "Tie", "Tie (Both Bad)"],
show_label=False,
value="Tie",
interactive=False,
)
submit_feedback_btn = gr.Button("Submit Feedback", interactive=False)
thanks_message = gr.Markdown(
value="## Thanks for your vote!", visible=False
) # Add thank you message
def hide_thanks_message():
return gr.update(visible=False)
# Function to handle login/refresh - uses gr.Request to get OAuth info
def handle_login(request: gr.Request):
"""
Handle user login using Hugging Face OAuth.
When deployed on HF Spaces with OAuth, request contains user info.
This is also used by the refresh button to re-check auth status.
"""
# Try to get token from environment (for Spaces) or HfApi (for local)
token = os.getenv("HF_TOKEN") or HfApi().token
# Check if user is authenticated through HF Spaces OAuth
is_authenticated = hasattr(request, 'username') and request.username
if is_authenticated or token:
# User is logged in
return (
gr.update(interactive=True), # repo_url -> Enable
gr.update(interactive=True), # Enable shared_input
gr.update(interactive=False), # Keep send_first disabled initially
gr.update(interactive=True), # Enable feedback radio buttons
gr.update(interactive=True), # Enable submit_feedback_btn
gr.update(visible=False), # Hide the hint string
gr.update(visible=True), # Keep login button visible for logout
gr.update(visible=False), # Hide refresh button when authenticated
token, # Store the oauth token
)
else:
# User is not logged in - instruct them to use HF login
return (
gr.update(interactive=False), # repo_url -> disable
gr.update(interactive=False), # Keep shared_input disabled
gr.update(interactive=False), # Keep send_first disabled
gr.update(interactive=False), # Keep feedback radio buttons disabled
gr.update(interactive=False), # Keep submit_feedback_btn disabled
gr.update(visible=True, value="## Please sign in with Hugging Face!\nClick the 'Sign in with Hugging Face' button above, then click 'Refresh Login Status' after you return from the auth page."), # Show instructions
gr.update(visible=True), # Keep login button visible
gr.update(visible=True), # Show refresh button
None, # Clear oauth_token
)
# Handle the refresh button click to re-check auth status
# Note: login_button (gr.LoginButton) handles OAuth redirect natively;
# app.load(check_auth_on_load) detects auth after redirect back.
refresh_auth_button.click(
handle_login,
outputs=[
repo_url, # Keep this in sync with shared_input
shared_input, # Enable shared_input
send_first, # Enable send_first button
feedback, # Enable feedback radio buttons
submit_feedback_btn, # Enable submit_feedback_btn
hint_markdown, # Hide the hint string
login_button, # Control login button visibility
refresh_auth_button, # Control refresh button visibility
oauth_token, # Store the OAuth token
],
)
# First round handling
send_first.click(
fn=hide_thanks_message, inputs=[], outputs=[thanks_message]
).then(
fn=disable_first_submit_ui, # First disable UI
inputs=[],
outputs=[
guardrail_message,
shared_input,
repo_url,
send_first, # Just the essential UI elements to update immediately
],
).then(
fn=update_model_titles_and_responses, # Then do the actual processing
inputs=[repo_url, shared_input, models_state, conversation_state],
outputs=[
guardrail_message,
shared_input,
repo_url,
user_prompt_md,
response_a_title,
response_b_title,
response_a,
response_b,
multi_round_inputs,
vote_panel,
send_first,
feedback,
models_state,
conversation_state,
timeout_popup,
model_a_send,
model_b_send,
thanks_message,
],
)
def disable_model_a_ui():
"""First function to immediately disable model A UI elements"""
return (
# [0] model_a_input: disable
gr.update(interactive=False),
# [1] model_a_send: disable and show loading state
gr.update(interactive=False, value="Processing..."),
)
# Handle subsequent rounds
def handle_model_a_send(user_input, models_state, conversation_state):
try:
conversation_state["left_chat"].append(
{"role": "user", "content": user_input}
)
response = chat_with_models("left", models_state, conversation_state)
# Clear the input box and disable the send button
return (
response,
conversation_state,
gr.update(visible=False),
gr.update(
value="", interactive=True
), # Clear and enable model_a_input
gr.update(
interactive=False, value="Send to Model A"
), # Reset button text
)
except TimeoutError as e:
# Disable inputs when timeout occurs
return (
gr.update(value=""), # Clear response
conversation_state,
gr.update(visible=True), # Show the timeout popup
gr.update(interactive=True), # Re-enable model_a_input
gr.update(
interactive=True, value="Send to Model A"
), # Re-enable model_a_send button
)
except Exception as e:
raise gr.Error(str(e))
def disable_model_b_ui():
"""First function to immediately disable model B UI elements"""
return (
# [0] model_b_input: disable
gr.update(interactive=False),
# [1] model_b_send: disable and show loading state
gr.update(interactive=False, value="Processing..."),
)
def handle_model_b_send(user_input, models_state, conversation_state):
try:
conversation_state["right_chat"].append(
{"role": "user", "content": user_input}
)
response = chat_with_models("right", models_state, conversation_state)
# Clear the input box and disable the send button
return (
response,
conversation_state,
gr.update(visible=False),
gr.update(
value="", interactive=True
), # Clear and enable model_b_input
gr.update(
interactive=False, value="Send to Model B"
), # Reset button text
)
except TimeoutError as e:
# Disable inputs when timeout occurs
return (
gr.update(value=""), # Clear response
conversation_state,
gr.update(visible=True), # Show the timeout popup
gr.update(interactive=True), # Re-enable model_b_input
gr.update(
interactive=True, value="Send to Model B"
), # Re-enable model_b_send button
)
except Exception as e:
raise gr.Error(str(e))
model_a_send.click(
fn=disable_model_a_ui, # First disable UI
inputs=[],
outputs=[model_a_input, model_a_send],
).then(
fn=handle_model_a_send, # Then do the actual processing
inputs=[model_a_input, models_state, conversation_state],
outputs=[
response_a,
conversation_state,
timeout_popup,
model_a_input,
model_a_send,
],
)
model_b_send.click(
fn=disable_model_b_ui, # First disable UI
inputs=[],
outputs=[model_b_input, model_b_send],
).then(
fn=handle_model_b_send, # Then do the actual processing
inputs=[model_b_input, models_state, conversation_state],
outputs=[
response_b,
conversation_state,
timeout_popup,
model_b_input,
model_b_send,
],
)
def submit_feedback(vote, models_state, conversation_state, token):
# Map vote to actual model names
match vote:
case "Model A":
winner_model = "left"
case "Model B":
winner_model = "right"
case "Tie":
winner_model = "tie"
case _:
winner_model = "both_bad"
# Create feedback entry
vote_entry = {
"left": models_state["left"],
"right": models_state["right"],
"winner": winner_model,
}
# Get the current datetime for file naming
file_name = f"{LEADERBOARD_FILE}/{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Save feedback back to the Hugging Face dataset
save_content_to_hf(vote_entry, VOTE_REPO, file_name, token)
conversation_state["right_chat"][0]["content"] = conversation_state[
"right_chat"
][0]["content"].split("\n\nInquiry: ")[-1]
conversation_state["left_chat"][0]["content"] = conversation_state[
"left_chat"
][0]["content"].split("\n\nInquiry: ")[-1]
# Save conversations back to the Hugging Face dataset
save_content_to_hf(conversation_state, CONVERSATION_REPO, file_name, token)
# Clear state
models_state.clear()
conversation_state.clear()
# Adjust output count to match the interface definition
return (
gr.update(
value="", interactive=True, visible=True
), # [0] Clear shared_input textbox
gr.update(
value="", interactive=True, visible=True
), # [1] Clear repo_url textbox
gr.update(
value="", visible=False
), # [2] Hide user_prompt_md markdown component
gr.update(
value="", visible=False
), # [3] Hide response_a_title markdown component
gr.update(
value="", visible=False
), # [4] Hide response_b_title markdown component
gr.update(value=""), # [5] Clear Model A response markdown component
gr.update(value=""), # [6] Clear Model B response markdown component
gr.update(visible=False), # [7] Hide multi_round_inputs row
gr.update(visible=False), # [8] Hide vote_panel row
gr.update(
value="Submit", interactive=True, visible=True
), # [9] Reset send_first button
gr.update(
value="Tie", interactive=True
), # [10] Reset feedback radio selection
get_leaderboard_data(vote_entry, use_cache=False), # [11] Updated leaderboard data
gr.update(
visible=True
), # [12] Show the thanks_message markdown component
)
# Update the click event for the submit feedback button
submit_feedback_btn.click(
submit_feedback,
inputs=[feedback, models_state, conversation_state, oauth_token],
outputs=[
shared_input, # Reset shared_input
repo_url, # Show the repo-related URL message
user_prompt_md, # Hide user_prompt_md
response_a_title, # Hide Model A title
response_b_title, # Hide Model B title
response_a, # Clear Model A response
response_b, # Clear Model B response
multi_round_inputs, # Hide multi-round input section
vote_panel, # Hide vote panel
send_first, # Reset and update send_first button
feedback, # Reset feedback selection
leaderboard_component, # Update leaderboard data dynamically
thanks_message, # Show the "Thanks for your vote!" message
],
)
# Add a divider
gr.Markdown("---")
# Add Terms of Service at the bottom
gr.Markdown("### Terms of Service")
gr.Markdown(
"""
*Users are required to agree to the following terms before using the service:*
- The service is a **research preview**. It only provides limited safety measures and may generate offensive content.
- It must not be used for any **illegal, harmful, violent, racist, or sexual** purposes.
- Please do not upload any **private** information.
- The service collects user dialogue data, including both text and images, and reserves the right to distribute it under a **Creative Commons Attribution (CC-BY)** or a similar license.
"""
)
# Check authentication status when the app loads
app.load(
check_auth_on_load,
outputs=[
repo_url,
shared_input,
send_first,
feedback,
submit_feedback_btn,
hint_markdown,
login_button,
refresh_auth_button,
oauth_token,
],
)
app.launch()