# Hugging Face Space status: Running on Zero (ZeroGPU)
| import math, json | |
| import gradio as gr | |
| import torch, pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
# ZeroGPU support
try:
    import spaces
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU support enabled")
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU not available, running in standard mode")

    # Create dummy decorator for local development
    def spaces_gpu_decorator(duration=60):
        """No-op stand-in for ``spaces.GPU`` when the package is not installed.

        Works in both decorator forms:

        * ``@spaces.GPU(duration=120)`` - returns a pass-through decorator.
        * ``@spaces.GPU`` (bare) - the decorated function arrives in the
          ``duration`` slot and is returned unchanged.

        Args:
            duration: Ignored placeholder for the GPU time budget, or the
                decorated function itself when the decorator is used bare.

        Returns:
            The function unchanged (bare form) or a decorator that returns
            its argument unchanged (call form).
        """
        # Bare usage passes the function positionally; hand it straight back
        # instead of wrapping it in the inner decorator.
        if callable(duration):
            return duration

        def decorator(func):
            return func

        return decorator

    # staticmethod keeps the stand-in unbound whether it is reached through
    # the class or through an instance.
    spaces = type('spaces', (), {'GPU': staticmethod(spaces_gpu_decorator)})
# Model configuration - Foundation-Sec-8B only
MODEL_NAME = "fdtn-ai/Foundation-Sec-8B"

# Load the model once at import time through a text-generation pipeline.
# First attempt: explicit tokenizer, bfloat16 weights and automatic device
# placement. Fallback: pipeline defaults only.
# NOTE: trust_remote_code=True lets transformers execute Python code shipped
# with the model repository; keep it only for repositories you trust.
print(f"Loading model: {MODEL_NAME}")
try:
    print("Initializing Foundation-Sec-8B model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    text_pipeline = pipeline(
        "text-generation",
        model=MODEL_NAME,
        tokenizer=tokenizer,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
    )
    print("Foundation-Sec-8B model initialized successfully")
except Exception as e:
    print(f"Error initializing Foundation-Sec-8B model: {e}")
    print("Trying with simplified parameters...")
    try:
        # Try with simpler parameters
        text_pipeline = pipeline(
            "text-generation",
            model=MODEL_NAME,
            trust_remote_code=True,
        )
        print("Foundation-Sec-8B model loaded with simplified parameters")
    except Exception as e2:
        print(f"Failed to load Foundation-Sec-8B model: {e2}")
        # Chain the cause explicitly so the traceback points at the loader error.
        raise RuntimeError(
            "Could not load Foundation-Sec-8B model. Please ensure the model "
            f"is accessible and try again. Error: {e2}"
        ) from e2

# Extract model and tokenizer from whichever pipeline loaded, for direct access
model = text_pipeline.model
tok = text_pipeline.tokenizer
# Report where the model weights live and which CUDA hardware is visible.
if not hasattr(model, 'device'):
    # No ``device`` attribute: fall back to the device of the first parameter.
    device_info = next(model.parameters()).device
    print(f"Model parameters on device: {device_info}")
else:
    print(f"Model loaded on device: {model.device}")

print(f"CUDA available: {torch.cuda.is_available()}")

# The remaining CUDA queries are only made when CUDA reports as available.
if torch.cuda.is_available():
    print(f"CUDA device count: {torch.cuda.device_count()}")
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"CUDA device name: {torch.cuda.get_device_name()}")
# Configuration parameters
# Length correction factor (0=no correction, 1=full average logP); used as the
# exponent on the phrase token count in binary_assoc_score.
LEN_ALPHA = 0.7

# Sample data for testing
CAMPAIGN_LIST = ["Operation Aurora", "Dust Storm", "ShadowHammer", "NotPetya", "SolarWinds"]

ACTOR_LIST = [
    "APT1",
    "APT28",
    "APT33",
    "APT38",
    "FIN8",
]

# Sample ATT&CK technique IDs with names ("<ID> <name>" per entry)
TECHNIQUE_LIST = [
    "T1059 Command and Scripting Interpreter",
    "T1566 Phishing",
    "T1027 Obfuscated/Stored Files",
    "T1036 Masquerading",
    "T1105 Ingress Tool Transfer",
    "T1018 Remote System Discovery",
    "T1568 Dynamic Resolution",
]
def phrase_log_prob(prompt, phrase):
    """Return log P(phrase | prompt) under the module-level language model.

    The prompt is tokenized with the tokenizer's default settings and the
    phrase with ``add_special_tokens=False``; the phrase ids are appended
    directly to the prompt ids. A single forward pass over the combined
    sequence yields the next-token distribution at every position, and the
    log-probabilities of the phrase tokens are summed.

    NOTE(review): the phrase is tokenized on its own, so no separator is
    inserted between prompt and phrase beyond what the tokenizer itself
    produces - confirm this matches the intended text.

    Args:
        prompt: Context text the phrase is conditioned on.
        phrase: Continuation text to score.

    Returns:
        Sum of the per-token log-probabilities as a Python float, or ``0.0``
        when the phrase produces no tokens.

    Raises:
        ValueError: If the prompt produces no tokens (nothing to condition on).
        Exception: Any tokenizer/model error is logged and re-raised unchanged.
    """
    try:
        # Log GPU usage information
        device_info = next(model.parameters()).device
        print(f"Running phrase_log_prob on device: {device_info}")

        ids_prompt = tok(prompt, return_tensors="pt").to(model.device)["input_ids"][0]
        ids_phrase = tok(phrase, add_special_tokens=False)["input_ids"]
        if len(ids_phrase) == 0:
            # Nothing to score: the empty continuation has log-probability 0.
            return 0.0

        n_prompt = ids_prompt.shape[0]
        if n_prompt == 0:
            raise ValueError("prompt produced no tokens; cannot score a continuation")

        phrase_ids = torch.tensor(ids_phrase, dtype=ids_prompt.dtype, device=ids_prompt.device)
        full_ids = torch.cat([ids_prompt, phrase_ids]).unsqueeze(0)

        # Inference only: skip autograd bookkeeping for the forward pass.
        with torch.no_grad():
            # Position i predicts token i + 1, so the rows that predict the
            # phrase tokens are n_prompt - 1 .. len(full_ids) - 2.
            logits = model(full_ids).logits[0, n_prompt - 1:-1].float()
            log_probs = torch.log_softmax(logits, -1)

        # The logits may live on a different device than the inputs when the
        # model is sharded, so move the index tensor next to them.
        picked = log_probs.gather(1, phrase_ids.to(log_probs.device).unsqueeze(1)).squeeze(1)
        return sum(picked.tolist(), 0.0)
    except Exception as e:
        print(f"Error in phrase_log_prob: {e}")
        raise
def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float:
    """Score how strongly ``phrase`` follows ``prompt`` versus a negated prompt.

    Computes p = P(use) / (P(use) + P(not use)) from the two phrase
    log-probabilities (a logistic function of their difference) and divides
    it by ``n_tokens ** LEN_ALPHA`` to penalize longer phrases.

    The negated prompt is built by swapping the last occurrence of
    ``prompt_template`` in ``prompt`` for ``neg``. If the template does not
    occur in the prompt, a warning is printed and the prompt is used
    unchanged, so the logistic term is 0.5 regardless of the phrase.

    Args:
        prompt: Base prompt string containing ``prompt_template``.
        phrase: Phrase to evaluate.
        neg: Negative template that replaces the positive template.
        prompt_template: Positive template to be replaced.

    Returns:
        Length-normalized association score between 0 and 1.

    Raises:
        ValueError: If ``prompt_template`` is empty (there is nothing
            meaningful to replace).
    """
    if not prompt_template:
        raise ValueError("prompt_template must be a non-empty string")

    # Replace only the last occurrence so text earlier in the prompt (for
    # example a campaign name) that happens to contain the template is kept.
    head, found, tail = prompt.rpartition(prompt_template)
    if found:
        neg_prompt = f"{head}{neg}{tail}"
    else:
        print(
            f"Warning: prompt_template {prompt_template!r} not found in prompt; "
            "negative prompt equals the positive prompt"
        )
        neg_prompt = prompt

    lp_pos = phrase_log_prob(prompt, phrase)
    lp_neg = phrase_log_prob(neg_prompt, phrase)

    # Logistic transformation, arranged so math.exp only ever sees a
    # non-positive argument and cannot overflow.
    diff = lp_pos - lp_neg
    if diff >= 0:
        prob = 1.0 / (1.0 + math.exp(-diff))
    else:
        odds = math.exp(diff)
        prob = odds / (1.0 + odds)

    # Length normalization; clamp to one token so an empty phrase cannot
    # cause a division by zero.
    n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"])
    return prob / (max(n_tok, 1) ** LEN_ALPHA)
def campaign_actor_associations(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Compute Campaign x Actor association scores and rank actors per campaign.

    Each prompt is built inline as ``"<campaign> <prompt_template>"``, the
    same way campaign_technique_matrix builds its prompts, and both templates
    are passed to binary_assoc_score so the negated prompt is actually formed.

    Args:
        campaigns: Iterable of campaign names.
        actors: Iterable of actor names.
        prompt_template: Template for the positive association.
        neg_template: Template for the negative association.

    Returns:
        Dict mapping each campaign to a list of ``(actor, score)`` tuples
        sorted by score, highest first.
    """
    results = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        actor_scores = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
        # Sort by score, best match first
        results[camp] = sorted(actor_scores.items(), key=lambda item: item[1], reverse=True)
    return results
def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"):
    """Build the Campaign x Technique table of binary association scores.

    Every cell is ``binary_assoc_score`` evaluated on the prompt
    ``"<campaign> <prompt_template>"`` with the technique as the phrase.

    Args:
        campaigns: List of campaign names (become the row index).
        techniques: List of technique names (become the columns).
        prompt_template: Template for positive association.
        neg_template: Template for negative association.

    Returns:
        DataFrame with campaigns as rows, techniques as columns, scores as values.
    """
    def score_row(campaign_name):
        # One prompt per campaign, shared by all of that campaign's techniques.
        base_prompt = f"{campaign_name} {prompt_template}"
        row = {}
        for technique in techniques:
            row[technique] = binary_assoc_score(
                base_prompt,
                technique,
                neg=neg_template,
                prompt_template=prompt_template,
            )
        return row

    score_table = {campaign_name: score_row(campaign_name) for campaign_name in campaigns}
    return pd.DataFrame.from_dict(score_table, orient="index")
def campaign_actor_matrix(campaigns, actors, prompt_template="is associated with", neg_template="is NOT associated with"):
    """Build the Campaign x Actor table of binary association scores.

    Each prompt is built inline as ``"<campaign> <prompt_template>"``, the
    same way campaign_technique_matrix builds its prompts, and both templates
    are passed to binary_assoc_score so the negated prompt is actually formed.

    Args:
        campaigns: Iterable of campaign names (become the row index).
        actors: Iterable of actor names (become the columns).
        prompt_template: Template for the positive association.
        neg_template: Template for the negative association.

    Returns:
        DataFrame with campaigns as rows, actors as columns, scores as values.
    """
    rows = {}
    for camp in campaigns:
        prompt_base = f"{camp} {prompt_template}"
        rows[camp] = {
            actor: binary_assoc_score(
                prompt_base, actor, neg=neg_template, prompt_template=prompt_template
            )
            for actor in actors
        }
    return pd.DataFrame.from_dict(rows, orient="index")
def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"):
    """Build the Campaign x Actor probability table via a per-campaign softmax.

    For each campaign the phrase log-probability of every actor is computed
    after the prompt ``"<campaign> <prompt_template>"``; the values are then
    exponentiated and normalized so each row sums to 1.

    Args:
        campaigns: List of campaign names (become the row index).
        actors: List of actor names (become the columns).
        prompt_template: Template for actor association prompt.

    Returns:
        DataFrame with campaigns as rows, actors as columns, probabilities as values.
    """
    table = {}
    for campaign_name in campaigns:
        context = f"{campaign_name} {prompt_template}"

        log_probs = []
        for actor in actors:
            log_probs.append(phrase_log_prob(context, actor))

        # Shift by the largest log-probability before exponentiating to keep
        # the softmax numerically stable.
        peak = max(log_probs)
        weights = [math.exp(value - peak) for value in log_probs]
        total = sum(weights)

        table[campaign_name] = dict(zip(actors, (weight / total for weight in weights)))
    return pd.DataFrame.from_dict(table, orient="index")
def _parse_csv_list(text):
    """Split comma-separated text into stripped, non-empty, unique items (order kept)."""
    stripped = (part.strip() for part in (text or "").split(","))
    # dict.fromkeys drops repeats while preserving first-seen order; repeated
    # names would otherwise collapse into one column after being scored twice.
    return list(dict.fromkeys(item for item in stripped if item))


def _message_figure(message, fontsize=16, color=None):
    """Return a blank 8x6 figure showing ``message`` centered, axes hidden."""
    # Build the figure without pyplot so nothing is registered in pyplot's
    # global figure list (which would keep it alive until explicitly closed).
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 6))
    ax = fig.subplots()
    text_kwargs = {"ha": "center", "va": "center", "fontsize": fontsize}
    if color is not None:
        text_kwargs["color"] = color
    ax.text(0.5, 0.5, message, **text_kwargs)
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis('off')
    return fig


def _heatmap_figure(df, cmap, cbar_label, title, xlabel, ylabel):
    """Return an annotated seaborn heatmap of ``df`` sized to its shape."""
    from matplotlib.figure import Figure

    n_rows, n_cols = df.shape
    fig = Figure(figsize=(max(8, n_cols * 1.2), max(6, n_rows * 0.8)))
    ax = fig.subplots()
    sns.heatmap(df, annot=True, cmap=cmap, fmt='.3f',
                cbar_kws={'label': cbar_label}, ax=ax)
    ax.set_title(title, fontsize=14, pad=20)
    ax.set_xlabel(xlabel, fontsize=12)
    ax.set_ylabel(ylabel, fontsize=12)
    # Adjust label rotation
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    plt.setp(ax.get_yticklabels(), rotation=0)
    # Lay out this specific figure rather than pyplot's "current" one.
    fig.tight_layout()
    return fig


def _log_compute_device():
    """Print whether CUDA is available for the upcoming computation."""
    if torch.cuda.is_available():
        print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}")
    else:
        print("Running on CPU")


# NOTE(review): this file prepares ``spaces.GPU`` but never applies it to the
# two entry points below; confirm whether they should be decorated when the
# app runs on ZeroGPU hardware.
def generate_actor_heatmap(c_list, a_list, actor_prompt_template):
    """Generate Campaign-Actor association heatmap with probability visualization.

    Args:
        c_list: Comma-separated campaign names.
        a_list: Comma-separated actor names.
        actor_prompt_template: Text placed between campaign and actor when
            building the prompt (passed to campaign_actor_probs).

    Returns:
        A matplotlib Figure: the heatmap on success, a message figure when an
        input list is empty, or a red error-message figure if anything raises.
    """
    try:
        campaigns = _parse_csv_list(c_list)
        actors = _parse_csv_list(a_list)
        if not campaigns or not actors:
            return _message_figure('Please enter both Campaigns and Actors')

        print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...")
        print(f"Using prompt template: '{actor_prompt_template}'")
        # Check GPU availability
        _log_compute_device()

        # Calculate probability matrix
        df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template)
        print(f"Actor probability matrix shape: {df_ca.shape}")
        print("Actor probability matrix:")
        print(df_ca.round(4))

        fig = _heatmap_figure(
            df_ca,
            cmap='plasma',
            cbar_label='P(actor)',
            title='Campaign-Actor Probabilities (softmax normalized)',
            xlabel='Actor',
            ylabel='Campaign',
        )
        print("Actor heatmap generated successfully!")
        return fig
    except Exception as e:
        # UI boundary: report the failure in the plot instead of crashing.
        print(f"Error in generate_actor_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')


def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template):
    """Generate Campaign-Technique association heatmap with binary scoring visualization.

    Args:
        c_list: Comma-separated campaign names.
        t_list: Comma-separated technique names.
        technique_prompt_template: Positive template (passed to
            campaign_technique_matrix).
        technique_neg_template: Negative template (passed to
            campaign_technique_matrix).

    Returns:
        A matplotlib Figure: the heatmap on success, a message figure when an
        input list is empty, or a red error-message figure if anything raises.
    """
    try:
        campaigns = _parse_csv_list(c_list)
        techniques = _parse_csv_list(t_list)
        if not campaigns or not techniques:
            return _message_figure('Please enter both Campaigns and Techniques')

        print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...")
        print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'")
        # Check GPU availability
        _log_compute_device()

        # Calculate score matrix
        df_ct = campaign_technique_matrix(campaigns, techniques, technique_prompt_template, technique_neg_template)
        print(f"Score matrix shape: {df_ct.shape}")
        print("Score matrix:")
        print(df_ct.round(4))

        fig = _heatmap_figure(
            df_ct,
            cmap='viridis',
            cbar_label='Association Score',
            title='Campaign-Technique Associations (len-norm, independent)',
            xlabel='Technique',
            ylabel='Campaign',
        )
        print("Technique heatmap generated successfully!")
        return fig
    except Exception as e:
        # UI boundary: report the failure in the plot instead of crashing.
        print(f"Error in generate_technique_heatmap: {e}")
        import traceback
        traceback.print_exc()
        return _message_figure(f'Error occurred: {str(e)}', fontsize=12, color='red')
# Gradio front end: one shared campaign box plus an actor section and a
# technique section, each with its own inputs, button and plot.
# NOTE(review): the source's indentation was not recoverable; the nesting of
# components under each gr.Row() below is inferred - confirm against the
# deployed layout. The same applies to leading whitespace inside the
# multi-line Markdown text.
with gr.Blocks(title="LLM Threat Graph Demo") as demo:
    gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*")

    # Common inputs
    with gr.Row():
        campaigns = gr.Textbox(
            value="Operation Aurora, Dust Storm, ShadowHammer, NotPetya, SolarWinds",
            label="Campaigns (comma-separated)",
            placeholder="e.g., Operation Aurora, NotPetya, Stuxnet",
        )

    # Campaign-Actor section (probabilistic)
    gr.Markdown("## 👤 Campaign-Actor Associations")
    gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps")
    gr.Markdown(
        "\n"
        '**Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)`\n'
        '1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor\n'
        "2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign)\n"
        "3. Result: Shows relative likelihood of each Actor conducting each Campaign\n"
    )
    with gr.Row():
        actor_prompt_template = gr.Textbox(
            value="is conducted by",
            label="Actor Prompt Template",
            placeholder="e.g., is conducted by, is attributed to",
        )
    actors = gr.Textbox(
        value="APT1, APT28, APT33, APT38, FIN8",
        label="Actors (comma-separated)",
        placeholder="e.g., APT1, Lazarus Group, Cozy Bear",
    )
    btn_actor = gr.Button("Generate Actor Heatmap", variant="primary")
    plot_actor = gr.Plot(label="Campaign-Actor Heatmap")
    # Clicking the button renders the actor heatmap from the current inputs.
    btn_actor.click(
        fn=generate_actor_heatmap,
        inputs=[campaigns, actors, actor_prompt_template],
        outputs=plot_actor,
        show_progress=True,
    )

    # Campaign-Technique section (independent scoring)
    gr.Markdown("## 🛠️ Campaign-Technique Associations")
    gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores")
    gr.Markdown(
        "\n"
        "**Calculation Method**: `Binary Association Score (length-normalized, independent)`\n"
        "1. For each Technique, calculate:\n"
        '- `lp_pos = phrase_log_prob("{campaign} typically uses", technique)`\n'
        '- `lp_neg = phrase_log_prob("{campaign} typically does NOT use", technique)`\n'
        "2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))`\n"
        "3. Length normalization: `score = prob / (n_tokens^0.7)` (penalty for longer phrases)\n"
        "4. Result: Independent association scores (0-1) for each Campaign-Technique pair\n"
    )
    with gr.Row():
        technique_prompt_template = gr.Textbox(
            value="typically uses",
            label="Technique Prompt Template (positive)",
            placeholder="e.g., typically uses, commonly employs",
        )
        technique_neg_template = gr.Textbox(
            value="typically does NOT use",
            label="Technique Prompt Template (negative)",
            placeholder="e.g., typically does NOT use, never employs",
        )
    techniques = gr.Textbox(
        value="T1059 Command and Scripting Interpreter, T1566 Phishing, T1027 Obfuscated/Stored Files, T1036 Masquerading, T1105 Ingress Tool Transfer, T1018 Remote System Discovery, T1568 Dynamic Resolution",
        label="Techniques (comma-separated)",
        placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing",
    )
    btn_technique = gr.Button("Generate Technique Heatmap", variant="primary")
    plot_technique = gr.Plot(label="Campaign-Technique Heatmap")
    # Clicking the button renders the technique heatmap from the current inputs.
    btn_technique.click(
        fn=generate_technique_heatmap,
        inputs=[campaigns, techniques, technique_prompt_template, technique_neg_template],
        outputs=plot_technique,
        show_progress=True,
    )

# Start the web server as soon as the module is executed.
demo.launch()