Murder.Ai / app.py
Justxd22's picture
fix height
8e1c709
import gradio as gr
import os
import json
import uvicorn
import time
import base64
from fastapi import FastAPI, Response
from fastapi.staticfiles import StaticFiles
from game import game_engine
from pydantic import BaseModel
# --- Setup FastAPI for Static Files ---
app = FastAPI()
# Ensure directories exist
os.makedirs("ui/static", exist_ok=True)
app.mount("/static", StaticFiles(directory="ui/static"), name="static")
# --- Global Logging ---
LOG_BUFFER = []
def add_log(message):
timestamp = time.strftime("%H:%M:%S")
entry = f"[{timestamp}] {message}\n" + "-"*40 + "\n"
LOG_BUFFER.append(entry)
# Keep last 50 logs
if len(LOG_BUFFER) > 50:
LOG_BUFFER.pop(0)
# --- API Bridge ---
class BridgeRequest(BaseModel):
action: str
data: dict = {}
class TTSRequest(BaseModel):
text: str
voice_id: str
@app.post("/api/bridge")
async def api_bridge(request: BridgeRequest):
"""Direct API endpoint for game logic communication."""
input_data = json.dumps({"action": request.action, "data": request.data})
# Log Request
add_log(f"IN: {input_data}")
print(f"API Bridge Received: {input_data}")
response = session.handle_input(input_data)
# Log Response
if response:
add_log(f"OUT: {json.dumps(response)}")
return response or {}
# --- Game Logic Wrapper ---
class GameSession:
def __init__(self):
self.session_id = None
self.game = None
self.voice_enabled = False
self.game_mode = "interactive"
def start(self, difficulty="medium", mode="interactive", voice=True):
self.session_id, self.game = game_engine.start_game(difficulty)
self.voice_enabled = voice
self.game_mode = mode
return self._get_init_data()
def _get_init_data(self):
if not self.game:
return None
# Prepare static data for tools
cameras = list(self.game.scenario["evidence"]["footage_data"].keys())
dna_map = {}
for k, v in self.game.scenario["evidence"]["dna_evidence"].items():
dna_map[k] = v.get("label", k) # Fallback to ID if no label
return {
"action": "init_game",
"data": {
"scenario": self.game.scenario,
"round": self.game.round,
"points": self.game.points,
"available_cameras": cameras,
"dna_map": dna_map,
"unlocked_evidence": self.game.unlocked_evidence,
"mode": self.game_mode
}
}
def handle_input(self, input_json):
if not input_json:
return None
try:
data = json.loads(input_json)
except:
return None
action = data.get("action")
payload = data.get("data", {})
if action == "ready":
# Wait for explicit start from Gradio UI, or return existing state
if self.game:
return self._get_init_data()
return None # Wait for user to pick case
if not self.game:
return None
if action == "ai_step":
step_data = self.game.run_ai_step()
# Format result for frontend
# We need to map the AI's internal result to frontend actions
actions = []
# 1. Thought Bubble
actions.append({
"action": "ai_thought",
"data": {"thought": step_data["thought"]}
})
res = step_data["result"]
res_type = step_data["action"] # use_tool, chat, accuse
if res_type == "use_tool":
# We need to construct the same evidence payload as handle_input
tool_name = step_data["result"].get("location") # Wait, result of use_tool is the raw dict
# I need the tool name from decision
# Re-running logic here is messy.
# Better: run_ai_step should return enough info.
# It returns "result" which is the output of use_tool.
# But we don't know which tool it was easily unless we parse "action" or store it.
# In run_ai_step I did:
# tool_name = decision.get("tool_name")
# kwargs = decision.get("args", {})
# Let's update run_ai_step to include tool_name in result wrapper?
# No, let's just infer or update run_ai_step.
# Actually, the loop in JS will call this.
# I'll handle formatting here.
pass
return {
"action": "ai_step_result",
"data": step_data
}
if action == "select_suspect":
return None
if action == "next_round":
if self.game.advance_round():
return {
"action": "update_status",
"data": {
"round": self.game.round,
"points": self.game.points
}
}
else:
return {
"action": "game_over",
"data": {
"message": "COLD CASE. You ran out of time.",
"verdict": False
}
}
if action == "chat_message":
suspect_id = payload.get("suspect_id")
message = payload.get("message")
response = self.game.question_suspect(suspect_id, message)
suspect = next((s for s in self.game.scenario["suspects"] if s["id"] == suspect_id), None)
suspect_name = next((s["name"] for s in self.game.scenario["suspects"] if s["id"] == suspect_id), "Suspect")
# Clean text for ElevenLabs
cleaned_response = response
# Remove text within leading parentheses (e.g., "(A bit defensively)")
if cleaned_response.strip().startswith('(') and ')' in cleaned_response:
end_paren_idx = cleaned_response.find(')')
if end_paren_idx != -1:
cleaned_response = cleaned_response[end_paren_idx + 1:].strip()
# Remove all asterisks
cleaned_response = cleaned_response.replace('*', '')
# Trim leading/trailing whitespace
cleaned_response = cleaned_response.strip()
# Generate Audio
audio_b64 = None
# Check Voice Enabled
if self.voice_enabled and suspect and "voice_id" in suspect and cleaned_response:
audio_bytes = self.game.voice_manager.generate_audio(cleaned_response, suspect["voice_id"])
if audio_bytes:
audio_b64 = "data:audio/mpeg;base64," + base64.b64encode(audio_bytes).decode('utf-8')
return {
"action": "update_chat",
"data": {
"role": "suspect",
"name": suspect_name,
"content": response,
"audio": audio_b64
}
}
if action == "use_tool":
tool_name = payload.get("tool")
arg = payload.get("input") # Default for single-input tools
if tool_name == "accuse":
result = self.game.make_accusation(payload.get("suspect_id"))
if result["result"] == "win":
return {
"action": "game_over",
"data": {
"message": result["message"],
"verdict": True
}
}
elif result["result"] == "loss":
return {
"action": "game_over",
"data": {
"message": result["message"],
"verdict": False
}
}
else:
return {
"action": "round_failure",
"data": {
"message": result["message"],
"eliminated_id": result["eliminated_id"],
"round": result["new_round"],
"points": result["new_points"]
}
}
kwargs = {}
if tool_name == "get_location":
kwargs = {"phone_number": arg}
elif tool_name == "call_alibi":
# Support both simple string (old) and structured (new)
if "alibi_id" in payload:
arg = payload.get("alibi_id") # Update arg for formatter
kwargs = {
"alibi_id": arg,
"question": payload.get("question")
}
else:
kwargs = {"phone_number": arg} # Fallback
elif tool_name == "get_dna_test":
kwargs = {"evidence_id": arg}
elif tool_name == "get_footage":
kwargs = {"location": arg}
result = self.game.use_tool(tool_name, **kwargs)
if "error" in result:
return {
"action": "tool_error",
"data": {"message": result["error"]}
}
# Format the result nicely
evidence_data = format_tool_response(tool_name, arg, result, self.game.scenario)
# Include updated points and unlocks in response
evidence_data["updated_points"] = self.game.points
evidence_data["unlocked_evidence"] = self.game.unlocked_evidence
if "newly_unlocked" in result and result["newly_unlocked"]:
evidence_data["newly_unlocked"] = result["newly_unlocked"]
return {
"action": "add_evidence",
"data": evidence_data
}
return None
def format_tool_response(tool_name, arg, result, scenario):
"""Formats tool output into HTML and finds associated suspect."""
suspect_id = None
suspect_name = None
html = ""
title = f"Tool: {tool_name}"
# Helpers to find suspect
def find_by_phone(phone):
clean_input = "".join(filter(str.isdigit, str(phone)))
for s in scenario["suspects"]:
s_phone = "".join(filter(str.isdigit, str(s.get("phone_number", ""))))
if clean_input and s_phone.endswith(clean_input):
return s
return None
def find_by_name(name):
for s in scenario["suspects"]:
if s["name"].lower() == name.lower():
return s
return None
def find_by_alibi_id(aid):
for s in scenario["suspects"]:
if s.get("alibi_id") == aid:
return s
return None
# Logic per tool
if tool_name == "get_location":
suspect = find_by_phone(arg)
if suspect:
suspect_id = suspect["id"]
suspect_name = suspect["name"]
title = f"πŸ“ Location Data"
if "history" in result:
html += "<ul>"
for entry in result["history"]:
html += f"<li>{entry}</li>"
html += "</ul>"
elif "description" in result:
html += f"<div><strong>Time:</strong> {result.get('timestamp')}</div>"
html += f"<div><strong>Loc:</strong> {result.get('description')}</div>"
elif "error" in result:
html += f"<div style='color:red'>{result['error']}</div>"
else:
html += str(result)
elif tool_name == "call_alibi":
suspect = find_by_phone(arg) # Try phone first
if not suspect:
suspect = find_by_alibi_id(arg) # Try ID
if suspect:
suspect_id = suspect["id"]
suspect_name = suspect["name"]
title = f"πŸ“ž Alibi Check"
if "error" in result:
html += f"<div style='color:red'>{result['error']}</div>"
else:
html += f"<div><strong>Contact:</strong> {result.get('contact_name')}</div>"
html += f"<div style='margin-top:5px; font-style:italic;'>\"{result.get('response')}\"</div>"
html += f"<div style='font-size:0.8em; color:#555'>Confidence: {result.get('confidence')}</div>"
elif tool_name == "get_dna_test":
# Get the label for the evidence item
evidence_label = scenario["evidence"]["dna_evidence"].get(arg, {}).get("label", arg)
title = f"🧬 DNA Result for {evidence_label}"
if "matches" in result:
# Multiple matches
html += f"<div><strong>Mixed Sample:</strong></div><ul>"
for name in result["matches"]:
html += f"<li>{name}</li>"
html += "</ul>"
html += f"<div><strong>Notes:</strong> {result.get('notes')}</div>"
# Note: We don't auto-assign a suspect_id for mixed results to avoid cluttering one card
else:
# Single match
if "primary_match" in result and result["primary_match"] != "Unknown":
suspect = find_by_name(result["primary_match"])
if suspect:
suspect_id = suspect["id"]
suspect_name = suspect["name"]
if "error" in result:
html += f"<div style='color:red'>{result['error']}</div>"
else:
html += f"<div><strong>Match:</strong> {result.get('primary_match')}</div>"
html += f"<div><strong>Confidence:</strong> {result.get('confidence')}</div>"
html += f"<div><strong>Notes:</strong> {result.get('notes')}</div>"
elif tool_name == "get_footage":
title = "πŸ“Ή Security Footage"
if "error" in result:
html += f"<div style='color:red'>{result['error']}</div>"
else:
html += f"<div><strong>Cam:</strong> {result.get('location')}</div>"
html += f"<div><strong>Time:</strong> {result.get('time_range')}</div>"
html += f"<div><strong>Visible:</strong> {', '.join(result.get('visible_people', []))}</div>"
html += f"<div><strong>Detail:</strong> {result.get('key_details')}</div>"
else:
html = str(result)
return {
"title": title,
"html_content": html,
"suspect_id": suspect_id,
"suspect_name": suspect_name
}
session = GameSession()
# --- Gradio App ---
def get_game_iframe():
with open("ui/templates/game_interface.html", "r") as f:
html_content = f.read()
html_content = html_content.replace('../static/', '/static/')
html_content_escaped = html_content.replace('"', '&quot;')
# Iframe is hidden initially
iframe = f"""
<iframe
id="game-iframe"
srcdoc="{html_content_escaped}"
style="width: 100%; height: 50vh; border: none;"
allow="autoplay; fullscreen"
></iframe>
"""
return iframe
def start_game_from_ui(case_name, mode, voice):
difficulty = "medium"
if "Coffee" in case_name: difficulty = "easy"
if "Gallery" in case_name: difficulty = "hard"
mode_slug = "spectator" if "Spectator" in mode else "interactive"
init_data = session.start(difficulty, mode_slug, voice)
# Extract data for tools
phones = [s["phone_number"] for s in init_data["data"]["scenario"]["suspects"]]
cameras = init_data["data"]["available_cameras"]
suspects = [s["name"] for s in init_data["data"]["scenario"]["suspects"]]
# Return visible updates
return (
gr.update(visible=False), # Hide selector row
gr.update(visible=True), # Show game frame
json.dumps(init_data), # Send init data to bridge
gr.update(choices=phones, value=phones[0] if phones else None),
gr.update(choices=cameras, value=cameras[0] if cameras else None),
gr.update(choices=suspects, value=suspects[0] if suspects else None)
)
css = """
#bridge-input, #bridge-output, #log-input { display: none !important; }
/* Remove viewport-based height constraints */
.gradio-container {
padding: 0 !important;
max-width: 100% !important;
display: flex;
flex-direction: column;
}
/* Fixed height for game container */
#game-frame-container {
height: 70vh !important;
min-height: 500px;
max-height: 800px;
border: none;
overflow: hidden;
padding: 0;
margin-bottom: 2rem;
}
#game-frame-container > .html-container {
height: 100% !important;
display: flex;
flex-direction: column;
}
#game-frame-container .prose {
height: 100% !important;
max-width: 100% !important;
}
/* Setup screen styling */
#setup-container {
max-width: 800px;
margin: 2rem auto;
padding: 2rem;
}
footer { display: none !important; }
"""
with gr.Blocks(title="Murder.Ai") as demo:
gr.HTML(f"<style>{css}</style>")
# --- Initial Setup Screen ---
with gr.Row(elem_id="setup-container", visible=True) as setup_col:
with gr.Column():
gr.Markdown("# πŸ•΅οΈ MURDER.AI")
gr.Markdown("### 1. Select Case File")
case_dropdown = gr.Dropdown(
choices=["The Silicon Valley Incident (Medium)", "The Coffee Shop Murder (Easy)", "The Gallery Heist (Hard)"],
value="The Silicon Valley Incident (Medium)",
show_label=False
)
gr.Markdown("### 2. Game Configuration")
with gr.Row():
game_mode = gr.Radio(["Interactive", "AI Spectator (Beta)"], value="Interactive", label="Game Mode")
voice_toggle = gr.Checkbox(value=True, label="Enable Voice (ElevenLabs)")
gr.Markdown("### 3. Investigation")
start_btn = gr.Button("πŸ“‚ OPEN CASE FILE", variant="primary", size="lg")
# Game Frame (Hidden Initially)
with gr.Group(visible=False, elem_id="game-frame-container") as game_group:
game_html = gr.HTML(value=get_game_iframe())
bridge_input = gr.Textbox(elem_id="bridge-input", visible=True)
bridge_output = gr.Textbox(elem_id="bridge-output", visible=True)
log_input = gr.Textbox(elem_id="log-input", visible=True) # Input from JS for logs
# Log Box
with gr.Accordion("System Logs (MCP Traffic)", open=False):
with gr.Row():
refresh_logs_btn = gr.Button("πŸ”„ Refresh Logs", scale=0)
auto_refresh = gr.Checkbox(label="Auto-refresh (1s)", value=False, scale=0)
log_box = gr.Textbox(label="Traffic", lines=10, max_lines=10, interactive=False, autoscroll=True, elem_id="visible-log-box")
# Log Polling
log_timer = gr.Timer(1, active=False)
def poll_logs():
return "".join(LOG_BUFFER)
log_timer.tick(fn=poll_logs, outputs=log_box)
refresh_logs_btn.click(fn=poll_logs, outputs=log_box)
def toggle_timer(active):
return gr.Timer(active=active)
auto_refresh.change(fn=toggle_timer, inputs=auto_refresh, outputs=log_timer)
# --- Footer / Showcase ---
gr.Markdown("---")
gr.Markdown("### πŸŽ₯ How to Play (Demo)")
gr.HTML('<iframe width="100%" height="400" src="https://www.youtube.com/embed/uPPwrhlSzdA" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>')
gr.Markdown("# πŸ› οΈ Try MCP Tools Directly (Click Open Case Files first)")
gr.Markdown("*Note: Start a game first to populate these tools.*")
with gr.Tabs():
with gr.Tab("πŸ“ Location"):
loc_phone = gr.Radio(label="Select Phone Number", choices=[], interactive=True)
loc_btn = gr.Button("Get Location")
loc_out = gr.JSON(label="Result")
with gr.Tab("πŸ“Ή Footage"):
foot_cam = gr.Radio(label="Select Camera", choices=[], interactive=True)
foot_btn = gr.Button("Get Footage")
foot_out = gr.JSON(label="Result")
with gr.Tab("🧬 DNA"):
dna_id = gr.Textbox(label="Evidence ID")
dna_btn = gr.Button("Test DNA")
dna_out = gr.JSON(label="Result")
with gr.Tab("πŸ“ž Alibi"):
alibi_id = gr.Textbox(label="Alibi ID")
alibi_q = gr.Textbox(label="Question", value="Where were they?")
alibi_btn = gr.Button("Call Alibi")
alibi_out = gr.JSON(label="Result")
with gr.Tab("πŸ’¬ Interrogate"):
int_suspect = gr.Radio(label="Select Suspect", choices=[], interactive=True)
int_q = gr.Textbox(label="Question", value="Where were you?")
int_btn = gr.Button("Interrogate")
int_out_text = gr.Markdown(label="Response")
int_out_audio = gr.Audio(label="Voice Response", autoplay=True)
# --- Event Handlers for Tools ---
def wrap_tool(tool_name, *args):
if not session.game: return {"error": "Start game first"}
kwargs = {}
if tool_name == "get_location": kwargs = {"phone_number": args[0]}
elif tool_name == "get_footage": kwargs = {"location": args[0]}
elif tool_name == "get_dna_test": kwargs = {"evidence_id": args[0]}
elif tool_name == "call_alibi": kwargs = {"alibi_id": args[0], "question": args[1]}
return session.game.use_tool(tool_name, **kwargs)
loc_btn.click(lambda p: wrap_tool("get_location", p), inputs=loc_phone, outputs=loc_out)
foot_btn.click(lambda c: wrap_tool("get_footage", c), inputs=foot_cam, outputs=foot_out)
dna_btn.click(lambda e: wrap_tool("get_dna_test", e), inputs=dna_id, outputs=dna_out)
alibi_btn.click(lambda i, q: wrap_tool("call_alibi", i, q), inputs=[alibi_id, alibi_q], outputs=alibi_out)
def wrap_chat(suspect_name, question):
if not session.game: return "Start game first", None
# Find ID from name
s_id = next((s["id"] for s in session.game.scenario["suspects"] if s["name"] == suspect_name), None)
if not s_id: return "Suspect not found", None
resp = session.game.question_suspect(s_id, question)
# Audio
audio_path = None
suspect = next((s for s in session.game.scenario["suspects"] if s["id"] == s_id), None)
# Clean text
cleaned = resp
if cleaned.strip().startswith('(') and ')' in cleaned:
cleaned = cleaned[cleaned.find(')')+1:].strip()
cleaned = cleaned.replace('*', '').strip()
if suspect and "voice_id" in suspect and cleaned:
audio_bytes = session.game.voice_manager.generate_audio(cleaned, suspect["voice_id"])
if audio_bytes:
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
fp.write(audio_bytes)
audio_path = fp.name
return resp, audio_path
int_btn.click(wrap_chat, inputs=[int_suspect, int_q], outputs=[int_out_text, int_out_audio])
# Start Game Event
start_btn.click(
fn=start_game_from_ui,
inputs=[case_dropdown, game_mode, voice_toggle],
outputs=[setup_col, game_group, bridge_output, loc_phone, foot_cam, int_suspect]
)
# Bridge Logic with Logging (Legacy/Fallback)
def bridge_logic_with_log(input_data, current_log):
# ... existing logic ...
return None, None # Disabled
# Bridge Logic (Python -> JS)
bridge_output.change(
None,
inputs=[bridge_output],
js="""
(data) => {
if (!data) return;
const iframe = document.querySelector('#game-frame-container iframe');
if (iframe && iframe.contentWindow) {
iframe.contentWindow.postMessage(JSON.parse(data), '*');
}
}
"""
)
app = gr.mount_gradio_app(app, demo, path="/")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)