# Image2Video / app_quant_latent.py
# rahul7star's picture
# Update app_quant_latent.py
# 02db00d verified
# raw
# history blame
# 24.8 kB
import torch
import spaces
import gradio as gr
import sys
import platform
import diffusers
import transformers
import psutil
import os
import time
from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig
from diffusers import ZImagePipeline, AutoModel
from transformers import BitsAndBytesConfig as TransformersBitsAndBytesConfig
# Module-level list; nothing in the visible code appends to or reads from it.
latent_history = []
# ============================================================
# LOGGING BUFFER
# ============================================================
# Accumulated text of every message passed to log(), one message per line.
LOGS = ""


def log(msg):
    """Print *msg* and append it, newline-terminated, to the global ``LOGS`` text.

    Args:
        msg: Message to record. Values that are not strings (for example an
            exception instance) are converted with ``str()`` before being
            stored, so they no longer raise ``TypeError`` on concatenation.

    Returns:
        The *msg* argument exactly as it was passed in.
    """
    global LOGS
    print(msg)
    LOGS += str(msg) + "\n"
    return msg
# ============================================================
# SYSTEM METRICS — LIVE GPU + CPU MONITORING
# ============================================================
def log_system_stats(tag=""):
    """Log GPU memory (when CUDA is available), CPU load and RAM usage.

    Args:
        tag: Label interpolated into the section header line.

    Returns:
        None. Any exception raised while collecting or logging the numbers is
        caught and reported through ``log`` instead of propagating.
    """
    bytes_per_gb = 1e9
    try:
        log(f"\n===== 🔥 SYSTEM STATS {tag} =====")
        # ============= GPU STATS =============
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated(0) / bytes_per_gb
            reserved = torch.cuda.memory_reserved(0) / bytes_per_gb
            total = torch.cuda.get_device_properties(0).total_memory / bytes_per_gb
            # "Free" is derived from allocated memory, not from reserved memory.
            free = total - allocated
            gpu_rows = (
                ("Total", total),
                ("Allocated", allocated),
                ("Reserved", reserved),
                ("Free", free),
            )
            for label, gigabytes in gpu_rows:
                log(f"💠 GPU {label} : {gigabytes:.2f} GB")
        # ============= CPU STATS ============
        cpu = psutil.cpu_percent()
        ram_used = psutil.virtual_memory().used / bytes_per_gb
        ram_total = psutil.virtual_memory().total / bytes_per_gb
        log(f"🧠 CPU Usage : {cpu}%")
        log(f"🧠 RAM Used : {ram_used:.2f} GB / {ram_total:.2f} GB")
    except Exception as err:
        log(f"⚠️ Failed to log system stats: {err}")
# ============================================================
# ENVIRONMENT INFO
# ============================================================
# Startup banner followed by one log line per environment fact.
_RULE = "==================================================="
log(_RULE)
log("🔍 Z-IMAGE-TURBO DEBUGGING + LIVE METRIC LOGGER")
log(_RULE + "\n")
# sys.version can span several lines; flatten it so it fits on one log line.
_python_version = sys.version.replace("\n", " ")
for _env_line in (
    f"📌 PYTHON VERSION : {_python_version}",
    f"📌 PLATFORM : {platform.platform()}",
    f"📌 TORCH VERSION : {torch.__version__}",
    f"📌 TRANSFORMERS VERSION : {transformers.__version__}",
    f"📌 DIFFUSERS VERSION : {diffusers.__version__}",
    f"📌 CUDA AVAILABLE : {torch.cuda.is_available()}",
):
    log(_env_line)
log_system_stats("AT STARTUP")
# The rest of the module targets "cuda" unconditionally, so stop here without it.
if not torch.cuda.is_available():
    raise RuntimeError("❌ CUDA Required")
device = "cuda"
gpu_id = 0
# ============================================================
# MODEL SETTINGS
# ============================================================
model_cache = "./weights/"
model_id = "Tongyi-MAI/Z-Image-Turbo"
torch_dtype = torch.bfloat16
USE_CPU_OFFLOAD = False
log("\n" + _RULE)
log("🧠 MODEL CONFIGURATION")
log(_RULE)
for _setting_label, _setting_value in (
    ("Model ID", model_id),
    ("Model Cache Directory", model_cache),
    ("torch_dtype", torch_dtype),
    ("USE_CPU_OFFLOAD", USE_CPU_OFFLOAD),
):
    log(f"{_setting_label} : {_setting_value}")
log_system_stats("BEFORE TRANSFORMER LOAD")
# ============================================================
# FUNCTION TO CONVERT LATENTS TO IMAGE
# ============================================================
def latent_to_image(latent):
    """Decode *latent* with the pipeline VAE and return the first image as PIL.

    The previous implementation converted through ``T.ToPILImage()``, but no
    name ``T`` is defined in this module, so every call ended in the except
    branch. The conversion is now done with torch and PIL only.

    Args:
        latent: Tensor handed unchanged to ``pipe.vae.decode``.
            NOTE(review): no scaling/shift factor is applied here — confirm
            callers pass latents already in the range the VAE expects.

    Returns:
        A ``PIL.Image.Image`` for the first decoded sample, or ``None`` when
        decoding or conversion fails (the error is logged).
    """
    try:
        # Imported locally because the module-level PIL import appears only
        # further down the file.
        from PIL import Image

        with torch.no_grad():
            decoded = pipe.vae.decode(latent)
        # decode() may return an output object exposing ``.sample``, a tuple,
        # or a plain tensor; accept all three.
        if hasattr(decoded, "sample"):
            img_tensor = decoded.sample
        elif isinstance(decoded, (tuple, list)):
            img_tensor = decoded[0]
        else:
            img_tensor = decoded
        # Map from [-1, 1] to [0, 1].
        img_tensor = (img_tensor / 2 + 0.5).clamp(0, 1)
        # First sample, channels-first float -> height x width x channels uint8.
        # assumes a 3-channel (RGB) decode — TODO confirm
        frame = img_tensor[0].detach().float().cpu()
        pixels = frame.mul(255).to(torch.uint8).permute(1, 2, 0).contiguous().numpy()
        return Image.fromarray(pixels)
    except Exception as e:
        log(f"⚠️ Failed to decode latent: {e}")
        return None
# ============================================================
# SAFE TRANSFORMER INSPECTION
# ============================================================
def inspect_transformer(model, name):
    """Write a structural debug dump of *model* to the log.

    Logs the class name, dtype, device of the first parameter, whether any
    parameter requires grad, and the 4-bit/8-bit flags when the model has
    them. It then looks for the first attribute among a fixed list of
    container names and describes up to 20 of its entries.

    Args:
        model: Object exposing ``parameters()``.
        name: Label used in the dump header.

    Returns:
        None. Exceptions are caught and logged instead of raised.
    """
    rule = "=" * 80
    log(f"\n🔍🔍 FULL TRANSFORMER DEBUG DUMP: {name}")
    log(rule)
    try:
        log(f"Model class : {model.__class__.__name__}")
        log(f"DType : {getattr(model, 'dtype', 'unknown')}")
        log(f"Device : {next(model.parameters()).device}")
        log(f"Requires Grad? : {any(p.requires_grad for p in model.parameters())}")

        # Quantization flags are only reported when the attribute exists.
        quant_flags = (
            ("is_loaded_in_4bit", "4bit Quantization"),
            ("is_loaded_in_8bit", "8bit Quantization"),
        )
        for flag_attr, flag_label in quant_flags:
            if hasattr(model, flag_attr):
                log(f"{flag_label} : {getattr(model, flag_attr)}")

        # The first matching attribute name wins, even if its value is None.
        container_names = ("transformer_blocks", "blocks", "layers", "encoder", "model")
        chosen_attr = next(
            (attr for attr in container_names if hasattr(model, attr)), None
        )
        blocks = None if chosen_attr is None else getattr(model, chosen_attr)
        log(f"Block container attr : {chosen_attr}")

        if blocks is None:
            log("⚠️ No valid block container found.")
            return
        if not hasattr(blocks, "__len__"):
            log("⚠️ Blocks exist but not iterable.")
            return

        total = len(blocks)
        log(f"Total Blocks : {total}")
        log("-" * 80)

        last_index = total - 1
        attn_fields = (
            ("Heads", "num_heads"),
            ("Dim", "hidden_size"),
            ("Backend", "attention_backend"),
        )
        # Only the first 20 blocks are described.
        for idx in range(min(20, total)):
            block = blocks[idx]
            log(f"\n🧩 Block [{idx}/{last_index}]")
            log(f"Class: {block.__class__.__name__}")

            for child_name, child in block.named_children():
                log(f" ├─ {child_name}: {child.__class__.__name__}")

            if hasattr(block, "attn"):
                attn = block.attn
                log(f" ├─ Attention: {attn.__class__.__name__}")
                for field_label, field_attr in attn_fields:
                    log(f" │ {field_label} : {getattr(attn, field_attr, 'unknown')}")

            # Blocks without parameters simply get no device/dtype lines.
            first_param = next(block.parameters(), None)
            if first_param is not None:
                log(f" ├─ Device : {first_param.device}")
                log(f" ├─ DType : {first_param.dtype}")

        log("\n🔚 END TRANSFORMER DEBUG DUMP")
        log(rule)
    except Exception as err:
        log(f"❌ ERROR IN INSPECTOR: {err}")
import torch
import time
# ---------- UTILITY ----------
def pretty_header(title):
    """Log *title* framed by two 80-character ``=`` rules.

    Args:
        title: Text shown between the rules.
    """
    rule = "=" * 80
    framed = ("\n\n" + rule, f"🎛️ {title}", rule + "\n")
    for line in framed:
        log(line)
# ---------- MEMORY ----------
def get_vram(prefix=""):
    """Log the currently allocated and reserved CUDA memory in MB.

    Args:
        prefix: Text placed at the start of every logged line.

    Returns:
        None. If querying or logging the figures raises an ordinary exception,
        a single "CUDA not available" line is logged instead. The handler is
        limited to ``Exception`` so ``KeyboardInterrupt``/``SystemExit`` are no
        longer swallowed as they were by the former bare ``except``.
    """
    mb = 1024**2
    try:
        allocated = torch.cuda.memory_allocated() / mb
        reserved = torch.cuda.memory_reserved() / mb
        log(f"{prefix}Allocated VRAM : {allocated:.2f} MB")
        log(f"{prefix}Reserved VRAM : {reserved:.2f} MB")
    except Exception:
        log(f"{prefix}VRAM: CUDA not available")
# ---------- MODULE INSPECT ----------
def inspect_module(name, module):
    """Log a debug report for one pipeline component.

    Logs class, dtype, device of the first parameter, total parameter count,
    the 4-bit/8-bit flags when present, and the attention backend when the
    module offers ``set_attention_backend``. It then describes up to 15
    entries of the first block container found on the module.

    Args:
        name: Label used in the section header.
        module: Object exposing ``parameters()``.

    Returns:
        None. Exceptions are caught and logged instead of raised.
    """
    pretty_header(f"🔬 Inspecting {name}")
    try:
        log(f"📦 Class : {module.__class__.__name__}")
        log(f"🔢 DType : {getattr(module, 'dtype', 'unknown')}")
        log(f"💻 Device : {next(module.parameters()).device}")
        log(f"🧮 Params : {sum(p.numel() for p in module.parameters()):,}")

        # Quantization state
        if hasattr(module, "is_loaded_in_4bit"):
            log(f"⚙️ 4-bit QLoRA : {module.is_loaded_in_4bit}")
        if hasattr(module, "is_loaded_in_8bit"):
            log(f"⚙️ 8-bit load : {module.is_loaded_in_8bit}")

        # Attention backend (DiT)
        if hasattr(module, "set_attention_backend"):
            try:
                attn = getattr(module, "attention_backend", None)
                log(f"🚀 Attention Backend: {attn}")
            except Exception as backend_err:
                # Formerly a bare ``except: pass``; keep going but say why the
                # backend line is missing.
                log(f"⚠️ Could not read attention backend: {backend_err}")

        # Search for blocks: the first attribute name that exists wins.
        candidates = ["transformer_blocks", "blocks", "layers", "encoder", "model"]
        blocks = None
        chosen_attr = None
        for attr in candidates:
            if hasattr(module, attr):
                blocks = getattr(module, attr)
                chosen_attr = attr
                break
        log(f"\n📚 Block Container : {chosen_attr}")

        if blocks is None:
            log("⚠️ No block structure found")
            return
        if not hasattr(blocks, "__len__"):
            log("⚠️ Blocks exist but are not iterable")
            return

        total = len(blocks)
        log(f"🔢 Total Blocks : {total}\n")

        # Inspect first 15 blocks
        for i in range(min(15, total)):
            blk = blocks[i]
            log(f"\n🧩 Block [{i}/{total-1}] — {blk.__class__.__name__}")
            for child_name, child in blk.named_children():
                log(f" ├─ {child_name:<15} {child.__class__.__name__}")

            # Attention details
            if hasattr(blk, "attn"):
                a = blk.attn
                log(" ├─ Attention")
                log(f" │ Heads : {getattr(a, 'num_heads', 'unknown')}")
                log(f" │ Dim : {getattr(a, 'hidden_size', 'unknown')}")
                log(f" │ Backend : {getattr(a, 'attention_backend', 'unknown')}")

            # Device / dtype: a block without parameters raises StopIteration
            # on the first lookup and gets neither line.
            try:
                log(f" ├─ Device : {next(blk.parameters()).device}")
                log(f" ├─ DType : {next(blk.parameters()).dtype}")
            except StopIteration:
                pass

            get_vram(" ▶ ")
    except Exception as e:
        log(f"❌ Module inspect error: {e}")
# ---------- LORA INSPECTION ----------
def inspect_loras(pipe):
    """Log whatever LoRA adapter information *pipe* exposes.

    If the pipeline has neither ``lora_state_dict`` nor ``adapter_names`` a
    warning is logged and nothing else happens. Otherwise the adapter names,
    active adapters and LoRA scale are logged when those attributes exist,
    followed by every transformer sub-module whose name contains "lora".

    Args:
        pipe: Pipeline object to inspect.

    Returns:
        None. Exceptions are caught and logged instead of raised.
    """
    pretty_header("🧩 LoRA ADAPTERS")
    try:
        has_lora_api = hasattr(pipe, "lora_state_dict") or hasattr(pipe, "adapter_names")
        if not has_lora_api:
            log("⚠️ No LoRA system detected.")
            return

        # Each entry: attribute to probe, and the label to print before it.
        optional_fields = (
            ("adapter_names", "Available Adapters: "),
            ("active_adapters", "Active Adapters : "),
            ("lora_scale", "LoRA Scale : "),
        )
        for attr_name, label in optional_fields:
            if hasattr(pipe, attr_name):
                log(f"{label}{getattr(pipe, attr_name)}")

        # LoRA modules
        if hasattr(pipe, "transformer") and hasattr(pipe.transformer, "modules"):
            lora_modules = (
                (mod_name, mod)
                for mod_name, mod in pipe.transformer.named_modules()
                if "lora" in mod_name.lower()
            )
            for mod_name, mod in lora_modules:
                log(f" 🔧 LoRA Module: {mod_name} ({mod.__class__.__name__})")
    except Exception as err:
        log(f"❌ LoRA inspect error: {err}")
# ---------- PIPELINE INSPECTOR ----------
def debug_pipeline(pipe):
    """Log a full debug report for *pipe* and its main components.

    The report covers pipeline-level facts, current VRAM figures, the
    transformer, the text encoder (when not ``None``), a UNet (when the
    pipeline has one) and LoRA adapter information.

    Args:
        pipe: Pipeline object to inspect.
    """
    pretty_header("🚀 FULL PIPELINE DEBUGGING")
    try:
        log(f"Pipeline Class : {pipe.__class__.__name__}")
        log(f"Attention Impl : {getattr(pipe, 'attn_implementation', 'unknown')}")
        log(f"Device : {pipe.device}")
    except Exception as err:
        # Formerly a bare ``except: pass``. The report still continues, but the
        # reason for the missing lines is now visible in the log.
        log(f"⚠️ Pipeline summary unavailable: {err}")
    get_vram("▶ ")

    # Inspect TRANSFORMER
    if hasattr(pipe, "transformer"):
        inspect_module("Transformer", pipe.transformer)
    # Inspect TEXT ENCODER
    if hasattr(pipe, "text_encoder") and pipe.text_encoder is not None:
        inspect_module("Text Encoder", pipe.text_encoder)
    # Inspect UNET (if ZImage pipeline has it)
    if hasattr(pipe, "unet"):
        inspect_module("UNet", pipe.unet)

    # LoRA adapters
    inspect_loras(pipe)
    pretty_header("🎉 END DEBUG REPORT")
# ============================================================
# LOAD TRANSFORMER — WITH LIVE STATS
# ============================================================
for _banner_line in (
    "\n===================================================",
    "🔧 LOADING TRANSFORMER BLOCK",
    "===================================================",
    "📌 Logging memory before load:",
):
    log(_banner_line)
log_system_stats("START TRANSFORMER LOAD")
# Load the transformer sub-folder with 4-bit NF4 quantization. A failure is
# logged and leaves ``transformer`` set to None instead of aborting the script.
try:
    quant_cfg = DiffusersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    _transformer_load_kwargs = {
        "cache_dir": model_cache,
        "subfolder": "transformer",
        "quantization_config": quant_cfg,
        "torch_dtype": torch_dtype,
        "device_map": device,
    }
    transformer = AutoModel.from_pretrained(model_id, **_transformer_load_kwargs)
except Exception as _load_err:
    log(f"❌ Transformer load failed: {_load_err}")
    transformer = None
else:
    log("✅ Transformer loaded successfully.")
log_system_stats("AFTER TRANSFORMER LOAD")
if transformer:
    inspect_transformer(transformer, "Transformer")
# ============================================================
# LOAD TEXT ENCODER
# ============================================================
for _banner_line in (
    "\n===================================================",
    "🔧 LOADING TEXT ENCODER",
    "===================================================",
):
    log(_banner_line)
log_system_stats("START TEXT ENCODER LOAD")
# Load the text-encoder sub-folder with 4-bit NF4 quantization. A failure is
# logged and leaves ``text_encoder`` set to None instead of aborting the script.
try:
    quant_cfg2 = TransformersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    _text_encoder_load_kwargs = {
        "cache_dir": model_cache,
        "subfolder": "text_encoder",
        "quantization_config": quant_cfg2,
        "torch_dtype": torch_dtype,
        "device_map": device,
    }
    text_encoder = AutoModel.from_pretrained(model_id, **_text_encoder_load_kwargs)
except Exception as _load_err:
    log(f"❌ Text encoder load failed: {_load_err}")
    text_encoder = None
else:
    log("✅ Text encoder loaded successfully.")
log_system_stats("AFTER TEXT ENCODER LOAD")
if text_encoder:
    inspect_transformer(text_encoder, "Text Encoder")
# ============================================================
# BUILD PIPELINE
# ============================================================
log("\n===================================================")
log("🔧 BUILDING PIPELINE")
log("===================================================")
log_system_stats("START PIPELINE BUILD")
# Assemble the pipeline from the pre-loaded transformer and text encoder, then
# load, activate and fuse the LoRA adapter. Any failure is logged and leaves
# ``pipe`` set to None so later code can detect it.
try:
    pipe = ZImagePipeline.from_pretrained(
        model_id,
        transformer=transformer,
        text_encoder=text_encoder,
        torch_dtype=torch_dtype,
    )
    pipe.transformer.set_attention_backend("_flash_3")
    # pipe.load_lora_weights("bdsqlsz/qinglong_DetailedEyes_Z-Image", weight_name="qinglong_detailedeye_z-imageV2(comfy).safetensors", adapter_name="lora")
    pipe.load_lora_weights(
        "rahul7star/ZImageLora",
        weight_name="NSFW/doggystyle_pov.safetensors",
        adapter_name="lora",
    )
    pipe.set_adapters(["lora"], adapter_weights=[1.])
    pipe.fuse_lora(adapter_names=["lora"], lora_scale=0.75)
    debug_pipeline(pipe)
    # pipe.unload_lora_weights()
    pipe.to("cuda")
    log("✅ Pipeline built successfully.")
    # NOTE: a former ``LOGS.append(log)`` call was removed here. ``LOGS`` is a
    # str, so the call raised AttributeError and the handler below discarded
    # the pipeline that had just been built.
except Exception as e:
    log(f"❌ Pipeline build failed: {e}")
    pipe = None
log_system_stats("AFTER PIPELINE BUILD")
# -----------------------------
# Monkey-patch prepare_latents
# -----------------------------
# -----------------------------
# Monkey-patch prepare_latents
# -----------------------------
if pipe is not None:
    # Keep a handle on the bound original so the wrapper can delegate to it.
    original_prepare_latents = pipe.prepare_latents

    def logged_prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        """Call the original ``prepare_latents`` and record what it returned.

        A one-line summary (shape, dtype, device) of the returned latents is
        appended to ``self._latents_log``, which is created on first use.

        Returns:
            The latents produced by the original method, unchanged.
        """
        result_latents = original_prepare_latents(
            batch_size, num_channels_latents, height, width, dtype, device, generator, latents
        )
        log_msg = f"🔹 prepare_latents called | shape={result_latents.shape}, dtype={result_latents.dtype}, device={result_latents.device}"
        if hasattr(self, "_latents_log"):
            self._latents_log.append(log_msg)
        else:
            self._latents_log = [log_msg]
        return result_latents

    # Bind the wrapper to this pipeline instance only.
    pipe.prepare_latents = logged_prepare_latents.__get__(pipe)
else:
    # The patch used to be re-applied unconditionally after this branch, which
    # raised NameError here because the wrapper is never defined without a pipe.
    log("❌ WARNING: Pipe not initialized; skipping prepare_latents patch")
from PIL import Image
import torch
# --------------------------
# Helper: Safe latent extractor
# --------------------------
def safe_get_latents(pipe, height, width, generator, device, LOGS):
    """Return initial latents from ``pipe.prepare_latents``, or random noise on failure.

    The channel count is taken from ``in_channels`` on ``pipe.transformer``
    (attribute first, then its ``config``), then on ``pipe.unet`` in the same
    way, and defaults to 4. The earlier version read ``pipe.unet`` directly
    and passed the count as ``num_channels``, which does not match the
    ``num_channels_latents`` parameter name used by ``prepare_latents``.

    Args:
        pipe: Pipeline exposing ``prepare_latents``.
        height: Requested image height in pixels.
        width: Requested image width in pixels.
        generator: ``torch.Generator`` used for sampling.
        device: Device on which the latents are created.
        LOGS: List that receives human-readable progress messages.

    Returns:
        The tensor from ``prepare_latents``. If that call raises, a
        ``torch.randn`` tensor of shape
        ``(1, channels, height // 8, width // 8)`` is returned instead.
    """
    # Resolve the latent channel count without assuming which backbone exists.
    num_channels = None
    for owner_name in ("transformer", "unet"):
        owner = getattr(pipe, owner_name, None)
        if owner is None:
            continue
        num_channels = getattr(owner, "in_channels", None)
        if num_channels is None:
            num_channels = getattr(getattr(owner, "config", None), "in_channels", None)
        if num_channels is not None:
            break
    if num_channels is None:
        num_channels = 4

    try:
        latents = pipe.prepare_latents(
            batch_size=1,
            num_channels_latents=num_channels,
            height=height,
            width=width,
            dtype=torch.float32,
            device=device,
            generator=generator,
        )
        LOGS.append(f"🔹 Latents shape: {latents.shape}, dtype: {latents.dtype}, device: {latents.device}")
        return latents
    except Exception as e:
        LOGS.append(f"⚠️ Latent extraction failed: {e}")
        # int() guards against float sizes coming from UI widgets.
        # NOTE(review): the //8 factor is hard-coded — confirm it matches the
        # pipeline's VAE scale factor.
        fallback_shape = (1, num_channels, int(height) // 8, int(width) // 8)
        return torch.randn(fallback_shape, generator=generator, device=device)
# --------------------------
# Main generation function
# --------------------------
@spaces.GPU
def generate_image(prompt, height, width, steps, seed, guidance_scale=0.0):
    """Generate one image, trying explicit latents first and a plain call second.

    Flow:
      1. Build latents with ``safe_get_latents`` and run the pipeline with them.
      2. If that raises, run the pipeline again without ``latents``, using a
         freshly seeded generator so the result depends only on *seed*.
      3. If that also raises, return a white placeholder image.

    Args:
        prompt: Text prompt.
        height: Image height in pixels (coerced to ``int``).
        width: Image width in pixels (coerced to ``int``).
        steps: Number of inference steps (coerced to ``int``).
        seed: Seed for the ``torch.Generator`` (coerced to ``int``).
        guidance_scale: Guidance scale forwarded to the pipeline.

    Returns:
        ``(image, gallery, LOGS)``: the final (or placeholder) image, a
        one-element list holding that same image, and the list of log
        messages collected during this call.
    """
    LOGS = []
    height, width, steps = int(height), int(width), int(steps)
    # placeholder image if all fails
    placeholder = Image.new("RGB", (width, height), color=(255, 255, 255))
    print(prompt)

    if pipe is None:
        # The pipeline build failed at start-up; there is nothing to run.
        LOGS.append("❌ Total failure: pipeline is not initialized")
        return placeholder, [placeholder], LOGS

    try:
        generator = torch.Generator(device).manual_seed(int(seed))
        call_kwargs = {
            "prompt": prompt,
            "height": height,
            "width": width,
            "num_inference_steps": steps,
            "guidance_scale": guidance_scale,
        }
        # -------------------------------
        # Try advanced latent extraction
        # -------------------------------
        try:
            # Start from an empty per-call log so messages from earlier
            # requests are not repeated and the list does not grow forever.
            pipe._latents_log = []
            latents = safe_get_latents(pipe, height, width, generator, device, LOGS)
            output = pipe(generator=generator, latents=latents, **call_kwargs)
            image = output.images[0]
            LOGS.extend(getattr(pipe, "_latents_log", []))
            LOGS.append("✅ Advanced latent pipeline succeeded.")
        except Exception as e:
            LOGS.append(f"⚠️ Latent mode failed: {e}")
            LOGS.append("🔁 Switching to standard pipeline...")
            try:
                # The first attempt may already have drawn numbers from the
                # generator; reseed so the fallback is reproducible.
                generator = torch.Generator(device).manual_seed(int(seed))
                output = pipe(generator=generator, **call_kwargs)
                image = output.images[0]
                LOGS.append("✅ Standard pipeline succeeded.")
            except Exception as e2:
                LOGS.append(f"❌ Standard pipeline failed: {e2}")
                image = placeholder
        return image, [image], LOGS
    except Exception as e:
        LOGS.append(f"❌ Total failure: {e}")
        return placeholder, [placeholder], LOGS
@spaces.GPU
def generate_image_backup(prompt, height, width, steps, seed, guidance_scale=0.0, return_latents=False):
    """
    Robust dual pipeline:
    - Advanced latent generation first
    - Fallback to standard pipeline if latent fails
    - Always returns final image
    - Returns gallery (final image) and logs

    Args:
        prompt: Text prompt.
        height: Image height in pixels (coerced to ``int``).
        width: Image width in pixels (coerced to ``int``).
        steps: Number of inference steps (coerced to ``int``).
        seed: Seed for the ``torch.Generator`` (coerced to ``int``).
        guidance_scale: Guidance scale forwarded to the pipeline.
        return_latents: Accepted for compatibility; not used by this function.

    Returns:
        ``(image, gallery, LOGS)``: the final (or white placeholder) image, a
        list containing that image, and the list of log messages.
    """
    LOGS = []
    height, width, steps = int(height), int(width), int(steps)
    # Keep a placeholder original image (white) in case everything fails
    original_image = Image.new("RGB", (width, height), color=(255, 255, 255))
    try:
        generator = torch.Generator(device).manual_seed(int(seed))
        call_kwargs = {
            "prompt": prompt,
            "height": height,
            "width": width,
            "num_inference_steps": steps,
            "guidance_scale": guidance_scale,
        }
        # -------------------------------
        # Try advanced latent generation
        # -------------------------------
        try:
            # Look for the latent channel count on the transformer first, then
            # on a UNet. Reading ``pipe.unet`` directly raised on pipelines
            # that have no UNet.
            num_channels_latents = None
            for owner_name in ("transformer", "unet"):
                owner = getattr(pipe, owner_name, None)
                if owner is None:
                    continue
                num_channels_latents = getattr(owner, "in_channels", None)
                if num_channels_latents is None:
                    num_channels_latents = getattr(
                        getattr(owner, "config", None), "in_channels", None
                    )
                if num_channels_latents is not None:
                    break
            if num_channels_latents is None:
                raise AttributeError(
                    "in_channels not found on pipe.transformer or pipe.unet, fallback to standard pipeline"
                )
            # The keyword must be ``num_channels_latents`` to match the
            # prepare_latents signature.
            latents = pipe.prepare_latents(
                batch_size=1,
                num_channels_latents=num_channels_latents,
                height=height,
                width=width,
                dtype=torch.float32,
                device=device,
                generator=generator,
            )
            LOGS.append(f"✅ Latents prepared: {latents.shape}")
            output = pipe(generator=generator, latents=latents, **call_kwargs)
            image = output.images[0]
            gallery = [image] if image is not None else []
            LOGS.append("✅ Advanced latent generation succeeded.")
        # -------------------------------
        # Fallback to standard pipeline
        # -------------------------------
        except Exception as e_latent:
            LOGS.append(f"⚠️ Advanced latent generation failed: {e_latent}")
            LOGS.append("🔁 Falling back to standard pipeline...")
            try:
                # Reseed: the failed attempt may have consumed random numbers.
                generator = torch.Generator(device).manual_seed(int(seed))
                output = pipe(generator=generator, **call_kwargs)
                image = output.images[0]
                gallery = [image] if image is not None else []
                LOGS.append("✅ Standard pipeline generation succeeded.")
            except Exception as e_standard:
                LOGS.append(f"❌ Standard pipeline generation failed: {e_standard}")
                image = original_image  # Always return some image
                gallery = [image]
        # -------------------------------
        # Return all 3 outputs
        # -------------------------------
        return image, gallery, LOGS
    except Exception as e:
        LOGS.append(f"❌ Inference failed entirely: {e}")
        return original_image, [original_image], LOGS
# ============================================================
# UI
# ============================================================
# with gr.Blocks(title="Z-Image- experiment - dont run")as demo:
# gr.Markdown("# **🚀 do not run Z-Image-Turbo — Final Image & Latents**")
# with gr.Row():
# with gr.Column(scale=1):
# prompt = gr.Textbox(label="Prompt", value="boat in Ocean")
# height = gr.Slider(256, 2048, value=1024, step=8, label="Height")
# width = gr.Slider(256, 2048, value=1024, step=8, label="Width")
# steps = gr.Slider(1, 50, value=20, step=1, label="Inference Steps")
# seed = gr.Number(value=42, label="Seed")
# run_btn = gr.Button("Generate Image")
# with gr.Column(scale=1):
# final_image = gr.Image(label="Final Image")
# latent_gallery = gr.Gallery(
# label="Latent Steps",
# columns=4,
# height=256,
# preview=True
# )
# logs_box = gr.Textbox(label="Logs", lines=15)
# run_btn.click(
# generate_image,
# inputs=[prompt, height, width, steps, seed],
# outputs=[final_image, latent_gallery, logs_box]
# )
# Two-tab UI: inputs and image outputs on the first tab, the log text on the second.
with gr.Blocks(title="Z-Image-Turbo") as demo:
    with gr.Tabs():
        with gr.TabItem("Image & Latents"):
            with gr.Row():
                # Left column: generation inputs.
                with gr.Column(scale=1):
                    prompt = gr.Textbox(label="Prompt", value="boat in Ocean")
                    height = gr.Slider(
                        minimum=256, maximum=2048, value=1024, step=8, label="Height"
                    )
                    width = gr.Slider(
                        minimum=256, maximum=2048, value=1024, step=8, label="Width"
                    )
                    steps = gr.Slider(
                        minimum=1, maximum=50, value=20, step=1, label="Inference Steps"
                    )
                    seed = gr.Number(value=42, label="Seed")
                    run_btn = gr.Button("Generate Image")
                # Right column: generation outputs.
                with gr.Column(scale=1):
                    final_image = gr.Image(label="Final Image")
                    latent_gallery = gr.Gallery(
                        label="Latent Steps",
                        columns=4,
                        height=256,
                        preview=True,
                    )
        with gr.TabItem("Logs"):
            logs_box = gr.Textbox(label="All Logs", lines=25)
    # Wire the button AFTER all components exist
    run_btn.click(
        fn=generate_image,
        inputs=[prompt, height, width, steps, seed],
        outputs=[final_image, latent_gallery, logs_box],
    )
demo.launch()