"""Z-Image-Turbo Gradio demo with verbose startup diagnostics.

At import time the script logs environment and memory statistics, loads the
4-bit quantized transformer and text encoder, assembles the pipeline, fuses
a LoRA adapter, wraps ``prepare_latents`` with a logging shim and finally
builds and launches the Gradio UI.
"""

import os
import platform
import sys
import time
import types

import diffusers
import gradio as gr
import psutil
import spaces
import torch
import transformers
from diffusers import AutoModel, ZImagePipeline
from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig
from PIL import Image
from transformers import BitsAndBytesConfig as TransformersBitsAndBytesConfig

latent_history = []

# ============================================================
# LOGGING BUFFER
# ============================================================
LOGS = ""


def log(msg):
    """Print ``msg``, append it to the global ``LOGS`` buffer and return it."""
    global LOGS
    text = str(msg)  # tolerate non-string messages instead of raising on "+"
    print(text)
    LOGS += text + "\n"
    return msg


# ============================================================
# SYSTEM METRICS — LIVE GPU + CPU MONITORING
# ============================================================
def log_system_stats(tag=""):
    """Log GPU memory (when CUDA is available), CPU usage and RAM usage.

    Args:
        tag: Free-form label included in the header line.

    Any failure while collecting the numbers is logged, never raised.
    """
    try:
        log(f"\n===== šŸ”„ SYSTEM STATS {tag} =====")

        # ============= GPU STATS =============
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated(0) / 1e9
            reserved = torch.cuda.memory_reserved(0) / 1e9
            total = torch.cuda.get_device_properties(0).total_memory / 1e9
            free = total - allocated
            log(f"šŸ’  GPU Total : {total:.2f} GB")
            log(f"šŸ’  GPU Allocated : {allocated:.2f} GB")
            log(f"šŸ’  GPU Reserved : {reserved:.2f} GB")
            log(f"šŸ’  GPU Free : {free:.2f} GB")

        # ============= CPU STATS ============
        cpu = psutil.cpu_percent()
        memory = psutil.virtual_memory()  # one snapshot for both figures
        ram_used = memory.used / 1e9
        ram_total = memory.total / 1e9
        log(f"🧠 CPU Usage : {cpu}%")
        log(f"🧠 RAM Used : {ram_used:.2f} GB / {ram_total:.2f} GB")
    except Exception as e:
        log(f"āš ļø Failed to log system stats: {e}")


# ============================================================
# ENVIRONMENT INFO
# ============================================================
log("===================================================")
log("šŸ” Z-IMAGE-TURBO DEBUGGING + LIVE METRIC LOGGER")
log("===================================================\n")
log(f"šŸ“Œ PYTHON VERSION : {sys.version.replace(chr(10),' ')}")
log(f"šŸ“Œ PLATFORM : {platform.platform()}")
log(f"šŸ“Œ TORCH VERSION : {torch.__version__}")
log(f"šŸ“Œ TRANSFORMERS VERSION : {transformers.__version__}")
log(f"šŸ“Œ DIFFUSERS VERSION : {diffusers.__version__}")
log(f"šŸ“Œ CUDA AVAILABLE : {torch.cuda.is_available()}")
log_system_stats("AT STARTUP")

if not torch.cuda.is_available():
    raise RuntimeError("āŒ CUDA Required")

device = "cuda"
gpu_id = 0

# ============================================================
# MODEL SETTINGS
# ============================================================
model_cache = "./weights/"
model_id = "Tongyi-MAI/Z-Image-Turbo"
torch_dtype = torch.bfloat16
USE_CPU_OFFLOAD = False

log("\n===================================================")
log("🧠 MODEL CONFIGURATION")
log("===================================================")
log(f"Model ID : {model_id}")
log(f"Model Cache Directory : {model_cache}")
log(f"torch_dtype : {torch_dtype}")
log(f"USE_CPU_OFFLOAD : {USE_CPU_OFFLOAD}")
log_system_stats("BEFORE TRANSFORMER LOAD")


# ============================================================
# FUNCTION TO CONVERT LATENTS TO IMAGE
# ============================================================
def latent_to_image(latent):
    """Decode ``latent`` with the pipeline VAE and return a PIL image.

    Reads the module-level ``pipe``. Returns ``None`` (after logging) when
    decoding fails for any reason.
    """
    try:
        with torch.no_grad():
            decoded = pipe.vae.decode(latent)
        # decode() may hand back an output object carrying ``.sample`` rather
        # than a bare tensor; accept either form.
        img_tensor = getattr(decoded, "sample", decoded)
        img_tensor = (img_tensor / 2 + 0.5).clamp(0, 1)
        # Convert CHW float in [0, 1] to HWC uint8 without torchvision, which
        # this file never imports. ``.float()`` is needed because bfloat16
        # tensors cannot be turned into numpy arrays.
        pixels = (
            img_tensor[0]
            .permute(1, 2, 0)
            .float()
            .mul(255)
            .round()
            .to(torch.uint8)
            .cpu()
            .numpy()
        )
        return Image.fromarray(pixels)
    except Exception as e:
        log(f"āš ļø Failed to decode latent: {e}")
        return None


# Attribute names probed, in order, when looking for a model's block list.
_BLOCK_CONTAINER_ATTRS = ("transformer_blocks", "blocks", "layers", "encoder", "model")


def _find_block_container(model):
    """Return ``(attr_name, value)`` for the first known block attribute.

    Returns ``(None, None)`` when ``model`` has none of the attributes.
    """
    for attr in _BLOCK_CONTAINER_ATTRS:
        if hasattr(model, attr):
            return attr, getattr(model, attr)
    return None, None


# ============================================================
# SAFE TRANSFORMER INSPECTION
# ============================================================
def inspect_transformer(model, name):
    """Log class, dtype, device, quantization flags and up to 20 blocks of ``model``.

    Args:
        model: The module to inspect.
        name: Label used in the dump header.

    All errors are caught and logged; nothing is returned.
    """
    log(f"\nšŸ”šŸ” FULL TRANSFORMER DEBUG DUMP: {name}")
    log("=" * 80)
    try:
        log(f"Model class : {model.__class__.__name__}")
        log(f"DType : {getattr(model, 'dtype', 'unknown')}")
        log(f"Device : {next(model.parameters()).device}")
        log(f"Requires Grad? : {any(p.requires_grad for p in model.parameters())}")

        # Check quantization
        if hasattr(model, "is_loaded_in_4bit"):
            log(f"4bit Quantization : {model.is_loaded_in_4bit}")
        if hasattr(model, "is_loaded_in_8bit"):
            log(f"8bit Quantization : {model.is_loaded_in_8bit}")

        # Find blocks
        chosen_attr, blocks = _find_block_container(model)
        log(f"Block container attr : {chosen_attr}")
        if blocks is None:
            log("āš ļø No valid block container found.")
            return
        if not hasattr(blocks, "__len__"):
            log("āš ļø Blocks exist but not iterable.")
            return

        total = len(blocks)
        log(f"Total Blocks : {total}")
        log("-" * 80)

        # Inspect first N blocks
        for i in range(min(20, total)):
            block = blocks[i]
            log(f"\n🧩 Block [{i}/{total-1}]")
            log(f"Class: {block.__class__.__name__}")

            # Print submodules
            for n, m in block.named_children():
                log(f" ā”œā”€ {n}: {m.__class__.__name__}")

            # Print attention related
            if hasattr(block, "attn"):
                attn = block.attn
                log(f" ā”œā”€ Attention: {attn.__class__.__name__}")
                log(f" │ Heads : {getattr(attn, 'num_heads', 'unknown')}")
                log(f" │ Dim : {getattr(attn, 'hidden_size', 'unknown')}")
                log(f" │ Backend : {getattr(attn, 'attention_backend', 'unknown')}")

            # Device + dtype info (skipped for parameter-less blocks)
            first_param = next(block.parameters(), None)
            if first_param is not None:
                log(f" ā”œā”€ Device : {first_param.device}")
                log(f" ā”œā”€ DType : {first_param.dtype}")

        log("\nšŸ”š END TRANSFORMER DEBUG DUMP")
        log("=" * 80)
    except Exception as e:
        log(f"āŒ ERROR IN INSPECTOR: {e}")


# ---------- UTILITY ----------
def pretty_header(title):
    """Log ``title`` framed by two 80-character rules."""
    log("\n\n" + "=" * 80)
    log(f"šŸŽ›ļø {title}")
    log("=" * 80 + "\n")


# ---------- MEMORY ----------
def get_vram(prefix=""):
    """Log allocated and reserved CUDA memory in MB, each line led by ``prefix``."""
    try:
        allocated = torch.cuda.memory_allocated() / 1024**2
        reserved = torch.cuda.memory_reserved() / 1024**2
        log(f"{prefix}Allocated VRAM : {allocated:.2f} MB")
        log(f"{prefix}Reserved VRAM : {reserved:.2f} MB")
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        log(f"{prefix}VRAM: CUDA not available")


# ---------- MODULE INSPECT ----------
def inspect_module(name, module):
    """Log a summary of ``module`` and of its first 15 blocks.

    Args:
        name: Label used in the section header.
        module: The module to inspect.

    All errors are caught and logged; nothing is returned.
    """
    pretty_header(f"šŸ”¬ Inspecting {name}")
    try:
        log(f"šŸ“¦ Class : {module.__class__.__name__}")
        log(f"šŸ”¢ DType : {getattr(module, 'dtype', 'unknown')}")
        log(f"šŸ’» Device : {next(module.parameters()).device}")
        log(f"🧮 Params : {sum(p.numel() for p in module.parameters()):,}")

        # Quantization state
        if hasattr(module, "is_loaded_in_4bit"):
            log(f"āš™ļø 4-bit QLoRA : {module.is_loaded_in_4bit}")
        if hasattr(module, "is_loaded_in_8bit"):
            log(f"āš™ļø 8-bit load : {module.is_loaded_in_8bit}")

        # Attention backend (DiT)
        if hasattr(module, "set_attention_backend"):
            attn = getattr(module, "attention_backend", None)
            log(f"šŸš€ Attention Backend: {attn}")

        # Search for blocks
        chosen_attr, blocks = _find_block_container(module)
        log(f"\nšŸ“š Block Container : {chosen_attr}")
        if blocks is None:
            log("āš ļø No block structure found")
            return
        if not hasattr(blocks, "__len__"):
            log("āš ļø Blocks exist but are not iterable")
            return

        total = len(blocks)
        log(f"šŸ”¢ Total Blocks : {total}\n")

        # Inspect first 15 blocks
        for i in range(min(15, total)):
            blk = blocks[i]
            log(f"\n🧩 Block [{i}/{total-1}] — {blk.__class__.__name__}")
            for n, m in blk.named_children():
                log(f" ā”œā”€ {n:<15} {m.__class__.__name__}")

            # Attention details
            if hasattr(blk, "attn"):
                a = blk.attn
                log(f" ā”œā”€ Attention")
                log(f" │ Heads : {getattr(a, 'num_heads', 'unknown')}")
                log(f" │ Dim : {getattr(a, 'hidden_size', 'unknown')}")
                log(f" │ Backend : {getattr(a, 'attention_backend', 'unknown')}")

            # Device / dtype (skipped for parameter-less blocks)
            first_param = next(blk.parameters(), None)
            if first_param is not None:
                log(f" ā”œā”€ Device : {first_param.device}")
                log(f" ā”œā”€ DType : {first_param.dtype}")

        get_vram(" ā–¶ ")
    except Exception as e:
        log(f"āŒ Module inspect error: {e}")


# ---------- LORA INSPECTION ----------
def inspect_loras(pipe):
    """Log whatever LoRA adapter attributes ``pipe`` exposes.

    Also lists every transformer submodule whose name contains "lora".
    Errors are caught and logged.
    """
    pretty_header("🧩 LoRA ADAPTERS")
    try:
        if not hasattr(pipe, "lora_state_dict") and not hasattr(pipe, "adapter_names"):
            log("āš ļø No LoRA system detected.")
            return

        if hasattr(pipe, "adapter_names"):
            names = pipe.adapter_names
            log(f"Available Adapters: {names}")
        if hasattr(pipe, "active_adapters"):
            log(f"Active Adapters : {pipe.active_adapters}")
        if hasattr(pipe, "lora_scale"):
            log(f"LoRA Scale : {pipe.lora_scale}")

        # LoRA modules
        if hasattr(pipe, "transformer") and hasattr(pipe.transformer, "modules"):
            for name, module in pipe.transformer.named_modules():
                if "lora" in name.lower():
                    log(f" šŸ”§ LoRA Module: {name} ({module.__class__.__name__})")
    except Exception as e:
        log(f"āŒ LoRA inspect error: {e}")


# ---------- PIPELINE INSPECTOR ----------
def debug_pipeline(pipe):
    """Log a full report: pipeline summary, VRAM, each sub-model, LoRA state."""
    pretty_header("šŸš€ FULL PIPELINE DEBUGGING")
    try:
        log(f"Pipeline Class : {pipe.__class__.__name__}")
        log(f"Attention Impl : {getattr(pipe, 'attn_implementation', 'unknown')}")
        log(f"Device : {pipe.device}")
    except Exception as e:
        # Previously a silent bare ``except: pass``; keep going but say why.
        log(f"Pipeline summary unavailable: {e}")

    get_vram("ā–¶ ")

    # Inspect TRANSFORMER
    if hasattr(pipe, "transformer"):
        inspect_module("Transformer", pipe.transformer)

    # Inspect TEXT ENCODER
    if hasattr(pipe, "text_encoder") and pipe.text_encoder is not None:
        inspect_module("Text Encoder", pipe.text_encoder)

    # Inspect UNET (if ZImage pipeline has it)
    if hasattr(pipe, "unet"):
        inspect_module("UNet", pipe.unet)

    # LoRA adapters
    inspect_loras(pipe)
    pretty_header("šŸŽ‰ END DEBUG REPORT")


# ============================================================
# LOAD TRANSFORMER — WITH LIVE STATS
# ============================================================
log("\n===================================================")
log("šŸ”§ LOADING TRANSFORMER BLOCK")
log("===================================================")
log("šŸ“Œ Logging memory before load:")
log_system_stats("START TRANSFORMER LOAD")

try:
    quant_cfg = DiffusersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    transformer = AutoModel.from_pretrained(
        model_id,
        cache_dir=model_cache,
        subfolder="transformer",
        quantization_config=quant_cfg,
        torch_dtype=torch_dtype,
        device_map=device,
    )
    log("āœ… Transformer loaded successfully.")
except Exception as e:
    log(f"āŒ Transformer load failed: {e}")
    transformer = None

log_system_stats("AFTER TRANSFORMER LOAD")
if transformer:
    inspect_transformer(transformer, "Transformer")

# ============================================================
# LOAD TEXT ENCODER
# ============================================================
log("\n===================================================")
log("šŸ”§ LOADING TEXT ENCODER")
log("===================================================")
log_system_stats("START TEXT ENCODER LOAD")

try:
    quant_cfg2 = TransformersBitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch_dtype,
        bnb_4bit_use_double_quant=True,
    )
    text_encoder = AutoModel.from_pretrained(
        model_id,
        cache_dir=model_cache,
        subfolder="text_encoder",
        quantization_config=quant_cfg2,
        torch_dtype=torch_dtype,
        device_map=device,
    )
    log("āœ… Text encoder loaded successfully.")
except Exception as e:
    log(f"āŒ Text encoder load failed: {e}")
    text_encoder = None

log_system_stats("AFTER TEXT ENCODER LOAD")
if text_encoder:
    inspect_transformer(text_encoder, "Text Encoder")

# ============================================================
# BUILD PIPELINE
# ============================================================
log("\n===================================================")
log("šŸ”§ BUILDING PIPELINE")
log("===================================================")
log_system_stats("START PIPELINE BUILD")

try:
    pipe = ZImagePipeline.from_pretrained(
        model_id,
        transformer=transformer,
        text_encoder=text_encoder,
        torch_dtype=torch_dtype,
    )
    pipe.transformer.set_attention_backend("_flash_3")
    pipe.load_lora_weights(
        "rahul7star/ZImageLora",
        weight_name="NSFW/doggystyle_pov.safetensors",
        adapter_name="lora",
    )
    pipe.set_adapters(["lora",], adapter_weights=[1.])
    pipe.fuse_lora(adapter_names=["lora"], lora_scale=0.75)
    debug_pipeline(pipe)
    pipe.to(device)
    # NOTE: the original ended this block with ``LOGS.append(log)``. LOGS is
    # a str, so that raised AttributeError and the handler below discarded a
    # perfectly good pipeline. The call is removed.
    log("āœ… Pipeline built successfully.")
except Exception as e:
    log(f"āŒ Pipeline build failed: {e}")
    pipe = None

log_system_stats("AFTER PIPELINE BUILD")

# -----------------------------
# Monkey-patch prepare_latents
# -----------------------------
if pipe is not None:
    original_prepare_latents = pipe.prepare_latents

    def logged_prepare_latents(self, batch_size, num_channels_latents, height, width,
                               dtype, device, generator, latents=None):
        """Call the original ``prepare_latents`` and record what it returned."""
        result_latents = original_prepare_latents(
            batch_size, num_channels_latents, height, width, dtype, device, generator, latents
        )
        log_msg = f"šŸ”¹ prepare_latents called | shape={result_latents.shape}, dtype={result_latents.dtype}, device={result_latents.device}"
        if hasattr(self, "_latents_log"):
            self._latents_log.append(log_msg)
        else:
            self._latents_log = [log_msg]
        return result_latents

    # Bind to this pipeline instance only. The original repeated this
    # assignment outside the ``if``, which raised NameError whenever the
    # pipeline had failed to build.
    pipe.prepare_latents = types.MethodType(logged_prepare_latents, pipe)
else:
    log("āŒ WARNING: Pipe not initialized; skipping prepare_latents patch")


def _latent_channels(pipeline, default=None):
    """Return the latent channel count of the pipeline's denoiser.

    Looks for ``in_channels`` on ``pipeline.transformer`` and then
    ``pipeline.unet``, checking the module itself and its ``config``.
    Returns ``default`` when nothing is found.
    """
    for attr in ("transformer", "unet"):
        denoiser = getattr(pipeline, attr, None)
        if denoiser is None:
            continue
        channels = getattr(denoiser, "in_channels", None)
        if channels is None:
            channels = getattr(getattr(denoiser, "config", None), "in_channels", None)
        if channels is not None:
            return channels
    return default


def _run_pipeline(prompt, height, width, steps, guidance_scale, generator, latents=None):
    """Run the module-level ``pipe`` once and return the first generated image.

    ``latents`` is forwarded only when given, so the pipeline samples its
    own initial noise otherwise.
    """
    call_kwargs = {
        "prompt": prompt,
        "height": height,
        "width": width,
        "num_inference_steps": steps,
        "guidance_scale": guidance_scale,
        "generator": generator,
    }
    if latents is not None:
        call_kwargs["latents"] = latents
    return pipe(**call_kwargs).images[0]


# --------------------------
# Helper: Safe latent extractor
# --------------------------
def safe_get_latents(pipe, height, width, generator, device, LOGS):
    """Ask the pipeline for initial latents, falling back to plain noise.

    Args:
        pipe: Pipeline exposing ``prepare_latents``.
        height: Requested image height in pixels.
        width: Requested image width in pixels.
        generator: ``torch.Generator`` used for sampling.
        device: Device on which to create the latents.
        LOGS: List that receives human-readable progress messages.

    Returns:
        The tensor from ``pipe.prepare_latents``. If that call fails, a
        ``(1, 4, height // 8, width // 8)`` random tensor is returned instead.
    """
    try:
        latents = pipe.prepare_latents(
            batch_size=1,
            # The keyword must match the patched signature above; the
            # original passed ``num_channels`` and read ``pipe.unet``
            # unguarded, so this call could never succeed.
            num_channels_latents=_latent_channels(pipe, default=4),
            height=height,
            width=width,
            dtype=torch.float32,
            device=device,
            generator=generator,
        )
        LOGS.append(f"šŸ”¹ Latents shape: {latents.shape}, dtype: {latents.dtype}, device: {latents.device}")
        return latents
    except Exception as e:
        LOGS.append(f"āš ļø Latent extraction failed: {e}")
        return torch.randn((1, 4, height // 8, width // 8), generator=generator, device=device)


# --------------------------
# Main generation function
# --------------------------
@spaces.GPU
def generate_image(prompt, height, width, steps, seed, guidance_scale=0.0):
    """Generate one image, preferring explicitly prepared latents.

    Tries the latent-driven path first and falls back to a plain pipeline
    call; a white placeholder is returned when both fail.

    Args:
        prompt: Text prompt.
        height: Image height in pixels.
        width: Image width in pixels.
        steps: Number of inference steps.
        seed: Seed for the random generator.
        guidance_scale: Guidance scale forwarded to the pipeline.

    Returns:
        ``(image, gallery, logs)`` where ``gallery`` is a one-element list
        holding ``image`` and ``logs`` is a newline-joined string, so the UI
        textbox shows readable lines rather than a Python list repr.
    """
    logs = []
    # UI widgets may deliver floats; PIL and the generator need ints.
    height, width, steps, seed = int(height), int(width), int(steps), int(seed)

    # placeholder image if all fails
    placeholder = Image.new("RGB", (width, height), color=(255, 255, 255))
    print(prompt)

    if pipe is None:
        logs.append("Pipeline is not initialized; returning placeholder image.")
        return placeholder, [placeholder], "\n".join(logs)

    try:
        generator = torch.Generator(device).manual_seed(seed)
        # Start each request with an empty shim log so messages from earlier
        # requests are not repeated.
        pipe._latents_log = []

        # -------------------------------
        # Try advanced latent extraction
        # -------------------------------
        try:
            latents = safe_get_latents(pipe, height, width, generator, device, logs)
            image = _run_pipeline(prompt, height, width, steps, guidance_scale,
                                  generator, latents=latents)
            logs.extend(getattr(pipe, "_latents_log", []))
            logs.append("āœ… Advanced latent pipeline succeeded.")
        except Exception as e:
            logs.append(f"āš ļø Latent mode failed: {e}")
            logs.append("šŸ” Switching to standard pipeline...")
            # The failed attempt already drew from the generator; re-seed so
            # the fallback result depends only on ``seed``.
            generator.manual_seed(seed)
            try:
                image = _run_pipeline(prompt, height, width, steps, guidance_scale, generator)
                logs.append("āœ… Standard pipeline succeeded.")
            except Exception as e2:
                logs.append(f"āŒ Standard pipeline failed: {e2}")
                image = placeholder

        return image, [image], "\n".join(logs)
    except Exception as e:
        logs.append(f"āŒ Total failure: {e}")
        return placeholder, [placeholder], "\n".join(logs)


@spaces.GPU
def generate_image_backup(prompt, height, width, steps, seed, guidance_scale=0.0, return_latents=False):
    """
    Robust dual pipeline:
    - Advanced latent generation first
    - Fallback to standard pipeline if latent fails
    - Always returns final image
    - Returns gallery (latents or final image) and logs

    ``return_latents`` is accepted for compatibility and is not used.
    Logs are returned as a list of strings.
    """
    logs = []
    height, width, steps, seed = int(height), int(width), int(steps), int(seed)

    # Keep a placeholder original image (white) in case everything fails
    original_image = Image.new("RGB", (width, height), color=(255, 255, 255))

    try:
        generator = torch.Generator(device).manual_seed(seed)

        # -------------------------------
        # Try advanced latent generation
        # -------------------------------
        try:
            num_channels_latents = _latent_channels(pipe)
            if num_channels_latents is None:
                raise AttributeError("pipe.unet.in_channels not found, fallback to standard pipeline")

            latents = pipe.prepare_latents(
                batch_size=1,
                num_channels_latents=num_channels_latents,
                height=height,
                width=width,
                dtype=torch.float32,
                device=device,
                generator=generator,
            )
            logs.append(f"āœ… Latents prepared: {latents.shape}")

            image = _run_pipeline(prompt, height, width, steps, guidance_scale,
                                  generator, latents=latents)
            gallery = [image] if image else []
            logs.append("āœ… Advanced latent generation succeeded.")

        # -------------------------------
        # Fallback to standard pipeline
        # -------------------------------
        except Exception as e_latent:
            logs.append(f"āš ļø Advanced latent generation failed: {e_latent}")
            logs.append("šŸ” Falling back to standard pipeline...")
            generator.manual_seed(seed)  # undo draws made by the failed attempt
            try:
                image = _run_pipeline(prompt, height, width, steps, guidance_scale, generator)
                gallery = [image] if image else []
                logs.append("āœ… Standard pipeline generation succeeded.")
            except Exception as e_standard:
                logs.append(f"āŒ Standard pipeline generation failed: {e_standard}")
                image = original_image  # Always return some image
                gallery = [image]

        # -------------------------------
        # Return all 3 outputs
        # -------------------------------
        return image, gallery, logs
    except Exception as e:
        logs.append(f"āŒ Inference failed entirely: {e}")
        return original_image, [original_image], logs


# ============================================================
# UI
# ============================================================
with gr.Blocks(title="Z-Image-Turbo") as demo:
    with gr.Tabs():
        with gr.TabItem("Image & Latents"):
            with gr.Row():
                with gr.Column(scale=1):
                    prompt = gr.Textbox(label="Prompt", value="boat in Ocean")
                    height = gr.Slider(256, 2048, value=1024, step=8, label="Height")
                    width = gr.Slider(256, 2048, value=1024, step=8, label="Width")
                    steps = gr.Slider(1, 50, value=20, step=1, label="Inference Steps")
                    seed = gr.Number(value=42, label="Seed")
                    run_btn = gr.Button("Generate Image")
                with gr.Column(scale=1):
                    final_image = gr.Image(label="Final Image")
                    latent_gallery = gr.Gallery(
                        label="Latent Steps",
                        columns=4,
                        height=256,
                        preview=True,
                    )
        with gr.TabItem("Logs"):
            logs_box = gr.Textbox(label="All Logs", lines=25)

    # Wire the button AFTER all components exist
    run_btn.click(
        generate_image,
        inputs=[prompt, height, width, steps, seed],
        outputs=[final_image, latent_gallery, logs_box],
    )

demo.launch()