###################################### version 4 NER change done #######################################################
import spaces
import gradio as gr
from PIL import Image
import numpy as np
import cv2
import re


def preprocess_image_for_ocr(image):
    """Adaptive-threshold a PIL image to boost OCR contrast (blockSize=85, C=11)."""
    image_rgb = image.convert("RGB")
    img_np = np.array(image_rgb)
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    adaptive_threshold = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 85, 11,
    )
    preprocessed_pil = Image.fromarray(adaptive_threshold)
    return preprocessed_pil


def extract_medication_lines(text):
    """
    Extracts medication lines robustly:
    - Matches form as T./TAB./TAB/TABLET/TABLETS, C./CAP./CAP/CAPSULE/CAPSULES, etc.
    - Floating/slash doses (e.g., 2.5MG, 10/20MG)
    - Optional second form (prefix/suffix/mid)
    - Any case
    """
    # Comprehensive form pattern (optional . or plural S)
    form = r"(T\.?|TAB\.?|TABLET(S)?|C\.?|CAP\.?|CAPSULE(S)?|SYRUP(S)?|SYP|DROP(S)?|INJ\.?|INJECTION(S)?|OINTMENT(S)?|CREAM(S)?|GEL(S)?|PATCH(ES)?|SOL\.?|SOLUTION(S)?|ORAL)"
    name = r"([A-Z0-9\-/]+(?:\s+[A-Z0-9\-/]+){0,4})"
    opt_form = fr"(?:\s+{form})?"  # allow form at end as well
    # Dose: decimal numbers, slash combos, unit, or blank
    opt_dose = r"(?:\s*\d{1,4}(?:\.\d+)?(?:/\d{1,4}(?:\.\d+)?)?\s*(mg|ml|mcg|g|kg|units|iu|%|))?"
    pattern = re.compile(
        fr"\b{form}\s+{name}{opt_form}{opt_dose}\b",
        re.IGNORECASE
    )
    lines = text.split('\n')
    matches = set()
    for line in lines:
        line = line.strip()
        for m in pattern.finditer(line):
            out = m.group(0)
            out = re.sub(r"\s+", " ", out).strip()
            matches.add(out.upper())
    # Sorted so repeated runs give deterministic output (a set has no stable order).
    return '\n'.join(sorted(matches))
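# Illustrative sketch only: expected behaviour of extract_medication_lines on a
# typical OCR transcript. The sample strings are hypothetical, not from a real
# prescription; the helper is defined but never called at import time.
def _demo_extract_medication_lines():
    sample = "TAB AUGMENTIN 625MG\nSYP CROCIN 120 ML\nAdvised bed rest"
    print(extract_medication_lines(sample))
    # Expected (sorted, one match per line; the non-medication line is dropped):
    # SYP CROCIN 120 ML
    # TAB AUGMENTIN 625MG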
def clinical_ner_extract(text, use_gpu=False):
    """
    Uses ClinicalNER for medicine name, then finds form/dose in source sentence.
    Returns clean combinations: form + entity + dose (no unwanted text).
    """
    # Load models inside the GPU context if required (reloaded on each call).
    import torch
    from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
    model = AutoModelForTokenClassification.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
    ner_pipeline = pipeline(
        "ner",
        model=model,
        tokenizer=tokenizer,
        aggregation_strategy="simple",
        device=0 if device == "cuda" else -1
    )
    text_lines = text.split('\n')
    # With aggregation_strategy="simple" each entity is a dict such as
    # {"entity_group": "treatment", "word": ..., "score": ..., "start": ..., "end": ...}
    entities = ner_pipeline(text)
    meds = []
    for ent in entities:
        if ent["entity_group"] == "treatment":
            # For each detected medicine entity, scan lines for context
            entity_name = ent["word"].lower()
            for line in text_lines:
                if entity_name in line.lower():
                    # Find form and dose on the same line
                    form_match = re.search(r"(TAB(LET)?|CAP(SULE)?|SYRUP|SYP|DROP(S)?|INJ(ECTION)?|OINTMENT|CREAM|GEL|PATCH|SOL(UTION)?|ORAL)", line, re.IGNORECASE)
                    dose_match = re.search(r"(\d{1,4} ?(mg|ml|mcg|g|kg|units|IU)|\d{1,2} ?%( ?w\/w| ?w\/v| ?v\/v)?)", line, re.IGNORECASE)
                    tokens = []
                    if form_match:
                        tokens.append(form_match.group(0).upper())
                    tokens.append(ent["word"].upper())
                    if dose_match:
                        tokens.append(dose_match.group(0))
                    meds.append(" ".join(tokens).strip())
                    break
    # Sorted for deterministic output.
    return '\n'.join(sorted(set(meds))) if meds else "None detected"


@spaces.GPU
def run_ocr_and_extract(image, temperature=0.2, extraction_mode="Regex"):
    # Load the OCR model ONLY in the GPU context!
    import torch
    from transformers import LightOnOCRForConditionalGeneration, LightOnOCRProcessor
    device = "cuda" if torch.cuda.is_available() else "cpu"
    attn = "sdpa" if device == "cuda" else "eager"
    dtype = torch.bfloat16 if device == "cuda" else torch.float32
    ocr_model = LightOnOCRForConditionalGeneration.from_pretrained(
        "lightonai/LightOnOCR-1B-1025",
        attn_implementation=attn,
        torch_dtype=dtype,
        trust_remote_code=True,
    ).to(device).eval()
    processor = LightOnOCRProcessor.from_pretrained(
        "lightonai/LightOnOCR-1B-1025",
        trust_remote_code=True,
    )
    processed_img = image
    # processed_img = preprocess_image_for_ocr(image)  # preprocessing currently disabled
    chat = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": processed_img}
            ],
        }
    ]
    inputs = processor.apply_chat_template(
        chat,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    )
    # Move tensors to the device; cast only floating-point tensors to the model
    # dtype so integer tensors (e.g., input_ids) keep their dtype.
    inputs = {
        k: (v.to(device=device, dtype=dtype)
            if isinstance(v, torch.Tensor) and v.dtype in [torch.float32, torch.float16, torch.bfloat16]
            else v.to(device) if isinstance(v, torch.Tensor)
            else v)
        for k, v in inputs.items()
    }
    generation_kwargs = dict(
        **inputs,
        max_new_tokens=2048,
        temperature=temperature if temperature > 0 else 0.0,
        use_cache=True,
        do_sample=temperature > 0,
    )
    with torch.no_grad():
        outputs = ocr_model.generate(**generation_kwargs)
    output_text = processor.decode(outputs[0], skip_special_tokens=True)
    raw_text = output_text.strip()
    # Clean medicines using the selected extraction method
    if extraction_mode == "Clinical NER":
        meds = clinical_ner_extract(raw_text, use_gpu=(device == "cuda"))
    else:  # Regex
        meds = extract_medication_lines(raw_text)
    yield meds, raw_text, processed_img


def process_input(file_input, temperature, extraction_mode):
    if file_input is None:
        yield "Please upload an image/PDF.", "", None
        return
    image_to_process = Image.open(file_input)
    for meds_out, raw_text, processed_img in run_ocr_and_extract(image_to_process, temperature, extraction_mode):
        yield meds_out, raw_text, processed_img


with gr.Blocks(title="💊 Medicine Extraction", theme=gr.themes.Soft()) as demo:
    file_input = gr.File(
        label="Upload Image (or PDF first page for OCR)",
        file_types=[".png", ".jpg", ".jpeg"],  # PDF support: requires rendering the page as an image first
        type="filepath"
    )
    temperature = gr.Slider(
        minimum=0.0,
        maximum=1.0,
        value=0.2,
        step=0.05,
        label="Temperature"
    )
    extraction_mode = gr.Radio(
        choices=["Regex", "Clinical NER"],
        value="Regex",
        label="Extraction Method"
    )
    medicines_output = gr.Textbox(
        label="💊 Cleaned Medicines",
        lines=10,
        interactive=False,
        show_copy_button=True
    )
    raw_output = gr.Textbox(
        label="Raw OCR Output",
        lines=10,
        interactive=False,
        show_copy_button=True
    )
    rendered_image = gr.Image(
        label="Processed Image (Thresholded for OCR)",  # NOTE: thresholding is disabled above, so the raw image is shown
        interactive=False
    )
    submit_btn = gr.Button("Extract Medicines", variant="primary")
    submit_btn.click(
        fn=process_input,
        inputs=[file_input, temperature, extraction_mode],
        outputs=[medicines_output, raw_output, rendered_image]
    )
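# Illustrative sketch only: consuming run_ocr_and_extract outside Gradio. It is
# a generator, so it is iterated exactly as process_input iterates it.
# "prescription.png" is a hypothetical file name; never called at import time.
def _demo_run_without_ui():
    img = Image.open("prescription.png")  # hypothetical sample scan
    for meds, raw_text, _img in run_ocr_and_extract(img, temperature=0.0, extraction_mode="Regex"):
        print("MEDICINES:\n", meds)
        print("RAW OCR:\n", raw_text)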
# """ # form_pattern = r"(TAB(L?ET)?|CAP(SULE)?|SYRUP|SYP|DROP(S)?|INJ(CTION)?|OINTMENT|CREAM|GEL|PATCH|SOL(UTION)?|ORAL)" # # Drug name: starts with a word (alphanumeric, maybe a hyphen), up to 4 words (spaces, hyphens or slash) # name_pattern = r"([A-Z0-9\-/]+(?:\s+[A-Z0-9\-/]+){0,4})" # # Dose: e.g., 250mg, 10ml, 0.5%, 10 mcg, 150mcg, etc. and concentration/w/w/w/v/etc. # dose_pattern = r"(\d{1,4}\s*(mg|ml|mcg|g|kg|units|IU)|\d{1,2}\s*%(\s*w\/w|\s*w\/v|\s*v\/v)?)" # # concentration can appear for creams/gels: e.g. "1% w/w", "2%" # # Main pattern: will attempt to capture form anywhere, then name, then dose/concentration # main_pattern = ( # r"(?:" + form_pattern + r"\s+)?" + # Form prefix optional # name_pattern + r"\s*" + # r"(?:" + form_pattern + r"\s*)?" + # Form mid/suffix optional # r"(?:" + dose_pattern + r")" # Dose/concentration required # ) # med_regex = re.compile(main_pattern, re.IGNORECASE) # meds = [] # for line in text.split('\n'): # line_stripped = line.strip() # match = med_regex.search(line_stripped) # if match: # meds.append(line_stripped) # return '\n'.join(meds) # ########################### added NER modification to be done ################################### # def get_medicine_context(entities, text_lines): # """ # For each medicine entity detected by NER, find its form and dose context from its source line. # Returns list of strings like 'TAB ALDACTONE 25MG'. # """ # output = [] # for ent in entities: # if ent["entity_group"] == "treatment": # # Find line containing the entity's word (robust for multiline output) # for line in text_lines: # if ent["word"].lower() in line.lower(): # # Search line for context # match = re.search(r"((TAB(L?ET)?|CAP(SULE)?|SYRUP|SYP|DROP(S)?|INJ(CTION)?|OINTMENT|CREAM|GEL|PATCH|SOL(UTION)?|ORAL).{0,40})", line, re.IGNORECASE) # dose = re.search(r"\d{1,4}\s*(mg|ml|mcg|g|kg|units|IU)|\d{1,2}\s*%(\s*w\/w|\s*w\/v|\s*v\/v)?", line, re.IGNORECASE) # info = [] # if match: # info.append(match.group(0).strip()) # else: # info.append(ent["word"].strip()) # if dose: # info.append(dose.group(0).strip()) # output.append(" ".join(info)) # break # return "\n".join(set(output)) if output else "None detected" # ################################ # def preprocess_image_for_ocr(image): # image_rgb = image.convert("RGB") # img_np = np.array(image_rgb) # gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) # adaptive_threshold = cv2.adaptiveThreshold( # gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 85,35, # ) # preprocessed_pil = Image.fromarray(adaptive_threshold) # return preprocessed_pil # @spaces.GPU # def extract_text_from_image(image, temperature=0.2, use_ner=False): # # Import and load within GPU context! 
# def preprocess_image_for_ocr(image):
#     image_rgb = image.convert("RGB")
#     img_np = np.array(image_rgb)
#     gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
#     adaptive_threshold = cv2.adaptiveThreshold(
#         gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 85, 35,
#     )
#     preprocessed_pil = Image.fromarray(adaptive_threshold)
#     return preprocessed_pil
#
# @spaces.GPU
# def extract_text_from_image(image, temperature=0.2, use_ner=False):
#     # Import and load within GPU context!
#     import torch
#     from transformers import (
#         LightOnOCRForConditionalGeneration,
#         LightOnOCRProcessor,
#         AutoTokenizer, AutoModelForTokenClassification, pipeline,
#     )
#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     attn_implementation = "sdpa" if device == "cuda" else "eager"
#     dtype = torch.bfloat16 if device == "cuda" else torch.float32
#     ocr_model = LightOnOCRForConditionalGeneration.from_pretrained(
#         "lightonai/LightOnOCR-1B-1025",
#         attn_implementation=attn_implementation,
#         torch_dtype=dtype,
#         trust_remote_code=True,
#     ).to(device).eval()
#     processor = LightOnOCRProcessor.from_pretrained(
#         "lightonai/LightOnOCR-1B-1025",
#         trust_remote_code=True,
#     )
#     # NER only if requested
#     if use_ner:
#         ner_tokenizer = AutoTokenizer.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
#         ner_model = AutoModelForTokenClassification.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
#         ner_pipeline = pipeline(
#             "ner", model=ner_model, tokenizer=ner_tokenizer, aggregation_strategy="simple"
#         )
#     processed_img = preprocess_image_for_ocr(image)
#     chat = [
#         {
#             "role": "user",
#             "content": [
#                 {"type": "image", "image": processed_img}
#             ],
#         }
#     ]
#     inputs = processor.apply_chat_template(
#         chat,
#         add_generation_prompt=True,
#         tokenize=True,
#         return_dict=True,
#         return_tensors="pt",
#     )
#     inputs = {
#         k: (v.to(device=device, dtype=dtype)
#             if isinstance(v, torch.Tensor) and v.dtype in [torch.float32, torch.float16, torch.bfloat16]
#             else v.to(device)
#             if isinstance(v, torch.Tensor)
#             else v)
#         for k, v in inputs.items()
#     }
#     generation_kwargs = dict(
#         **inputs,
#         max_new_tokens=2048,
#         temperature=temperature if temperature > 0 else 0.0,
#         use_cache=True,
#         do_sample=temperature > 0,
#     )
#     with torch.no_grad():
#         outputs = ocr_model.generate(**generation_kwargs)
#     output_text = processor.decode(outputs[0], skip_special_tokens=True)
#     cleaned_text = output_text.strip()
#     # Extract medicines
#     if use_ner:
#         entities = ner_pipeline(cleaned_text)
#         meds = []
#         for ent in entities:
#             if ent["entity_group"] == "treatment":
#                 word = ent["word"]
#                 if word.startswith("##") and meds:
#                     meds[-1] += word[2:]
#                 else:
#                     meds.append(word)
#         result_meds = ", ".join(set(meds)) if meds else "None detected"
#     else:
#         result_meds = extract_medication_lines(cleaned_text) or "None detected"
#     yield result_meds, processed_img  # Only medicines and processed image
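# Illustrative sketch only (kept commented like the archived code around it):
# how the "##" branch above re-joins WordPiece subword pieces. With
# aggregation_strategy="simple" the pipeline usually merges subwords already,
# so the branch is a fallback for pieces that still arrive split:
#
#     words = ["para", "##cetamol", "ibuprofen"]
#     meds = []
#     for word in words:
#         if word.startswith("##") and meds:
#             meds[-1] += word[2:]   # "para" + "cetamol" -> "paracetamol"
#         else:
#             meds.append(word)
#     # meds == ["paracetamol", "ibuprofen"]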
# def process_input(file_input, temperature, page_num, extraction_mode):
#     if file_input is None:
#         yield "Please upload an image or PDF first.", None
#         return
#     image_to_process = Image.open(file_input) if not str(file_input).lower().endswith(".pdf") else None  # simplify to image only
#     use_ner = extraction_mode == "Clinical NER"
#     for meds_out, processed_img in extract_text_from_image(image_to_process, temperature, use_ner):
#         yield meds_out, processed_img
#
# with gr.Blocks(title="💊 Medicine Extraction", theme=gr.themes.Soft()) as demo:
#     file_input = gr.File(
#         label="🖼️ Upload Image",
#         file_types=[".png", ".jpg", ".jpeg"],
#         type="filepath"
#     )
#     temperature = gr.Slider(
#         minimum=0.0,
#         maximum=1.0,
#         value=0.2,
#         step=0.05,
#         label="Temperature"
#     )
#     extraction_mode = gr.Radio(
#         choices=["Clinical NER", "Regex"],
#         value="Regex",
#         label="Extraction Method",
#         info="Clinical NER uses ML, Regex uses rules"
#     )
#     medicines_output = gr.Textbox(
#         label="💊 Extracted Medicines/Drugs",
#         placeholder="Medicine/drug names will appear here...",
#         lines=2,
#         max_lines=10,
#         interactive=False,
#         show_copy_button=True
#     )
#     rendered_image = gr.Image(
#         label="Processed Image (Adaptive Thresholded for OCR)",
#         interactive=False
#     )
#     submit_btn = gr.Button("Extract Medicines", variant="primary")
#     page_slider = gr.Slider(minimum=1, maximum=20, value=1, step=1, label="Page Number")
#     submit_btn.click(
#         fn=process_input,
#         inputs=[file_input, temperature, page_slider, extraction_mode],
#         outputs=[medicines_output, rendered_image]
#     )
#
# if __name__ == "__main__":
#     demo.launch()

#################################################### running code only NER #######################
#!/usr/bin/env python3
# import subprocess
# import sys
# import spaces
# import torch
# import gradio as gr
# from PIL import Image
# import numpy as np
# import cv2
# import pypdfium2 as pdfium
# from transformers import (
#     LightOnOCRForConditionalGeneration,
#     LightOnOCRProcessor,
# )
# from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
#
# device = "cuda" if torch.cuda.is_available() else "cpu"
# if device == "cuda":
#     attn_implementation = "sdpa"
#     dtype = torch.bfloat16
# else:
#     attn_implementation = "eager"
#     dtype = torch.float32
#
# ocr_model = LightOnOCRForConditionalGeneration.from_pretrained(
#     "lightonai/LightOnOCR-1B-1025",
#     attn_implementation=attn_implementation,
#     torch_dtype=dtype,
#     trust_remote_code=True,
# ).to(device).eval()
# processor = LightOnOCRProcessor.from_pretrained(
#     "lightonai/LightOnOCR-1B-1025",
#     trust_remote_code=True,
# )
# ner_tokenizer = AutoTokenizer.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
# ner_model = AutoModelForTokenClassification.from_pretrained("samrawal/bert-base-uncased_clinical-ner")
# ner_pipeline = pipeline(
#     "ner",
#     model=ner_model,
#     tokenizer=ner_tokenizer,
#     aggregation_strategy="simple",
# )
#
# def render_pdf_page(page, max_resolution=1540, scale=2.77):
#     width, height = page.get_size()
#     pixel_width = width * scale
#     pixel_height = height * scale
#     resize_factor = min(1, max_resolution / pixel_width, max_resolution / pixel_height)
#     target_scale = scale * resize_factor
#     return page.render(scale=target_scale, rev_byteorder=True).to_pil()
#
# def process_pdf(pdf_path, page_num=1):
#     pdf = pdfium.PdfDocument(pdf_path)
#     total_pages = len(pdf)
#     page_idx = min(max(int(page_num) - 1, 0), total_pages - 1)
#     page = pdf[page_idx]
#     img = render_pdf_page(page)
#     pdf.close()
#     return img, total_pages, page_idx + 1
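# Worked example (illustrative, assuming an A4 page of 595 x 842 points): at the
# base scale 2.77 the page would render at about 1648 x 2332 px, so
# resize_factor = min(1, 1540/1648, 1540/2332) ≈ 0.66 and the effective scale
# becomes 2.77 * 0.66 ≈ 1.83, i.e. roughly 1088 x 1540 px — the long edge is
# capped at max_resolution.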
# def clean_output_text(text):
#     markers_to_remove = ["system", "user", "assistant"]
#     lines = text.split('\n')
#     cleaned_lines = []
#     for line in lines:
#         stripped = line.strip()
#         if stripped.lower() not in markers_to_remove:
#             cleaned_lines.append(line)
#     cleaned = '\n'.join(cleaned_lines).strip()
#     if "assistant" in text.lower():
#         parts = text.split("assistant", 1)
#         if len(parts) > 1:
#             cleaned = parts[1].strip()
#     return cleaned
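# Illustrative sketch only: what clean_output_text does with chat-template
# residue (hypothetical strings). Bare role markers on their own lines are
# dropped, and if "assistant" occurs anywhere, everything after its first
# occurrence wins:
#
#     clean_output_text("system\nuser\nassistant\nTAB ABC 10MG")
#     # -> "TAB ABC 10MG"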
# def preprocess_image_for_ocr(image):
#     """Convert PIL.Image to adaptive thresholded image for OCR."""
#     image_rgb = image.convert("RGB")
#     img_np = np.array(image_rgb)
#     gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
#     adaptive_threshold = cv2.adaptiveThreshold(
#         gray,
#         255,
#         cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
#         cv2.THRESH_BINARY,
#         85,
#         35,
#     )
#     preprocessed_pil = Image.fromarray(adaptive_threshold)
#     return preprocessed_pil
#
# @spaces.GPU
# def extract_text_from_image(image, temperature=0.2):
#     """OCR + clinical NER, with preprocessing."""
#     processed_img = preprocess_image_for_ocr(image)
#     chat = [
#         {
#             "role": "user",
#             "content": [
#                 {"type": "image", "image": processed_img}
#             ],
#         }
#     ]
#     inputs = processor.apply_chat_template(
#         chat,
#         add_generation_prompt=True,
#         tokenize=True,
#         return_dict=True,
#         return_tensors="pt",
#     )
#     # Move inputs to device
#     inputs = {
#         k: (
#             v.to(device=device, dtype=dtype)
#             if isinstance(v, torch.Tensor) and v.dtype in [torch.float32, torch.float16, torch.bfloat16]
#             else v.to(device)
#             if isinstance(v, torch.Tensor)
#             else v
#         )
#         for k, v in inputs.items()
#     }
#     generation_kwargs = dict(
#         **inputs,
#         max_new_tokens=2048,
#         temperature=temperature if temperature > 0 else 0.0,
#         use_cache=True,
#         do_sample=temperature > 0,
#     )
#     with torch.no_grad():
#         outputs = ocr_model.generate(**generation_kwargs)
#     output_text = processor.decode(outputs[0], skip_special_tokens=True)
#     cleaned_text = clean_output_text(output_text)
#     entities = ner_pipeline(cleaned_text)
#     medications = []
#     for ent in entities:
#         if ent["entity_group"] == "treatment":
#             word = ent["word"]
#             if word.startswith("##") and medications:
#                 medications[-1] += word[2:]
#             else:
#                 medications.append(word)
#     medications_str = ", ".join(set(medications)) if medications else "None detected"
#     yield cleaned_text, medications_str, output_text, processed_img
#
# def process_input(file_input, temperature, page_num):
#     if file_input is None:
#         yield "Please upload an image or PDF first.", "", "", "", "No file!", 1
#         return
#     image_to_process = None
#     page_info = ""
#     slider_value = page_num
#     file_path = file_input if isinstance(file_input, str) else file_input.name
#     if file_path.lower().endswith(".pdf"):
#         try:
#             image_to_process, total_pages, actual_page = process_pdf(file_path, int(page_num))
#             page_info = f"Processing page {actual_page} of {total_pages}"
#             slider_value = actual_page
#         except Exception as e:
#             msg = f"Error processing PDF: {str(e)}"
#             yield msg, "", msg, "", None, slider_value
#             return
#     else:
#         try:
#             image_to_process = Image.open(file_path)
#             page_info = "Processing image"
#         except Exception as e:
#             msg = f"Error opening image: {str(e)}"
#             yield msg, "", msg, "", None, slider_value
#             return
#     try:
#         for cleaned_text, medications, raw_md, processed_img in extract_text_from_image(
#             image_to_process, temperature
#         ):
#             yield cleaned_text, medications, raw_md, page_info, processed_img, slider_value
#     except Exception as e:
#         error_msg = f"Error during text extraction: {str(e)}"
#         yield error_msg, "", error_msg, page_info, image_to_process, slider_value
#
# def update_slider(file_input):
#     if file_input is None:
#         return gr.update(maximum=20, value=1)
#     file_path = file_input if isinstance(file_input, str) else file_input.name
#     if file_path.lower().endswith('.pdf'):
#         try:
#             pdf = pdfium.PdfDocument(file_path)
#             total_pages = len(pdf)
#             pdf.close()
#             return gr.update(maximum=total_pages, value=1)
#         except:
#             return gr.update(maximum=20, value=1)
#     else:
#         return gr.update(maximum=1, value=1)
#
# with gr.Blocks(title="💊 Medicine Extraction", theme=gr.themes.Soft()) as demo:
#     file_input = gr.File(
#         label="🖼️ Upload Image or PDF",
#         file_types=[".pdf", ".png", ".jpg", ".jpeg"],
#         type="filepath"
#     )
#     temperature = gr.Slider(
#         minimum=0.0,
#         maximum=1.0,
#         value=0.2,
#         step=0.05,
#         label="Temperature"
#     )
#     page_slider = gr.Slider(
#         minimum=1, maximum=20, value=1, step=1,
#         label="Page Number (PDF only)",
#         interactive=True
#     )
#     output_text = gr.Textbox(
#         label="📝 Extracted Text",
#         lines=4,
#         max_lines=10,
#         interactive=False,
#         show_copy_button=True
#     )
#     medicines_output = gr.Textbox(
#         label="💊 Extracted Medicines/Drugs",
#         placeholder="Medicine/drug names will appear here...",
#         lines=2,
#         max_lines=5,
#         interactive=False,
#         show_copy_button=True
#     )
#     raw_output = gr.Textbox(
#         label="Raw Model Output",
#         lines=2,
#         max_lines=5,
#         interactive=False
#     )
#     page_info = gr.Markdown(
#         value=""  # Info of PDF page
#     )
#     rendered_image = gr.Image(
#         label="Processed Image (Thresholded for OCR)",
#         interactive=False
#     )
#     num_pages = gr.Number(
#         value=1, label="Current Page (slider)", visible=False
#     )
#     submit_btn = gr.Button("Extract Medicines", variant="primary")
#     submit_btn.click(
#         fn=process_input,
#         inputs=[file_input, temperature, page_slider],
#         outputs=[output_text, medicines_output, raw_output, page_info, rendered_image, num_pages]
#     )
#     file_input.change(
#         fn=update_slider,
#         inputs=[file_input],
#         outputs=[page_slider]
#     )
#
# if __name__ == "__main__":
#     demo.launch()

#############################################################
# Create Gradio interface
# with gr.Blocks(title="📖 Image/PDF OCR with LightOnOCR", theme=gr.themes.Soft()) as demo:
#     gr.Markdown(f"""
#     # 📖 Image/PDF to Text Extraction with LightOnOCR
#     **💡 How to use:**
#     1. Upload an image or PDF
#     2. For PDFs: select which page to extract (1-20)
#     3. Adjust temperature if needed
#     4. Click "Extract Text"
#     **Note:** The Markdown rendering for tables may not always be perfect. Check the raw output for complex tables!
#     **Model:** LightOnOCR-1B-1025 by LightOn AI
#     **Device:** {device.upper()}
#     **Attention:** {attn_implementation}
#     """)
#     with gr.Row():
#         with gr.Column(scale=1):
#             file_input = gr.File(
#                 label="🖼️ Upload Image or PDF",
#                 file_types=[".pdf", ".png", ".jpg", ".jpeg"],
#                 type="filepath"
#             )
#             rendered_image = gr.Image(
#                 label="📄 Preview",
#                 type="pil",
#                 height=400,
#                 interactive=False
#             )
#             num_pages = gr.Slider(
#                 minimum=1,
#                 maximum=20,
#                 value=1,
#                 step=1,
#                 label="PDF: Page Number",
#                 info="Select which page to extract"
#             )
#             page_info = gr.Textbox(
#                 label="Processing Info",
#                 value="",
#                 interactive=False
#             )
#             temperature = gr.Slider(
#                 minimum=0.0,
#                 maximum=1.0,
#                 value=0.2,
#                 step=0.05,
#                 label="Temperature",
#                 info="0.0 = deterministic, Higher = more varied"
#             )
#             submit_btn = gr.Button("Extract Text", variant="primary")
#             clear_btn = gr.Button("Clear", variant="secondary")
#         with gr.Column(scale=2):
#             output_text = gr.Markdown(
#                 label="📄 Extracted Text (Rendered)",
#                 value="*Extracted text will appear here...*"
#             )
#             medications_output = gr.Textbox(
#                 label="💊 Extracted Medicines/Drugs",
#                 placeholder="Medicine/drug names will appear here...",
#                 lines=2,
#                 max_lines=5,
#                 interactive=False,
#                 show_copy_button=True
#             )
#     with gr.Row():
#         with gr.Column():
#             raw_output = gr.Textbox(
#                 label="Raw Markdown Output",
#                 placeholder="Raw text will appear here...",
#                 lines=20,
#                 max_lines=30,
#                 show_copy_button=True
#             )
#     # Event handlers
#     submit_btn.click(
#         fn=process_input,
#         inputs=[file_input, temperature, num_pages],
#         outputs=[output_text, medications_output, raw_output, page_info, rendered_image, num_pages]
#     )
#################################### old code to be checked #############################################
# import sys
# import threading
# import spaces
# import torch
# import gradio as gr
# from PIL import Image
# from io import BytesIO
# import pypdfium2 as pdfium
# from transformers import (
#     LightOnOCRForConditionalGeneration,
#     LightOnOCRProcessor,
#     TextIteratorStreamer,
# )
# # ---- CLINICAL NER IMPORTS ----
# import spacy
#
# device = "cuda" if torch.cuda.is_available() else "cpu"
# # Choose best attention implementation based on device
# if device == "cuda":
#     attn_implementation = "sdpa"
#     dtype = torch.bfloat16
#     print("Using sdpa for GPU")
# else:
#     attn_implementation = "eager"  # Best for CPU
#     dtype = torch.float32
#     print("Using eager attention for CPU")
#
# # Initialize the LightOnOCR model and processor
# print(f"Loading model on {device} with {attn_implementation} attention...")
# model = LightOnOCRForConditionalGeneration.from_pretrained(
#     "lightonai/LightOnOCR-1B-1025",
#     attn_implementation=attn_implementation,
#     torch_dtype=dtype,
#     trust_remote_code=True
# ).to(device).eval()
# processor = LightOnOCRProcessor.from_pretrained(
#     "lightonai/LightOnOCR-1B-1025",
#     trust_remote_code=True
# )
# print("Model loaded successfully!")
#
# # ---- LOAD CLINICAL NER MODEL (BC5CDR) ----
# print("Loading clinical NER model (bc5cdr)...")
# nlp_ner = spacy.load("en_ner_bc5cdr_md")
# print("Clinical NER loaded.")
#
# def render_pdf_page(page, max_resolution=1540, scale=2.77):
#     """Render a PDF page to PIL Image."""
#     width, height = page.get_size()
#     pixel_width = width * scale
#     pixel_height = height * scale
#     resize_factor = min(1, max_resolution / pixel_width, max_resolution / pixel_height)
#     target_scale = scale * resize_factor
#     return page.render(scale=target_scale, rev_byteorder=True).to_pil()
#
# def process_pdf(pdf_path, page_num=1):
#     """Extract a specific page from PDF."""
#     pdf = pdfium.PdfDocument(pdf_path)
#     total_pages = len(pdf)
#     page_idx = min(max(int(page_num) - 1, 0), total_pages - 1)
#     page = pdf[page_idx]
#     img = render_pdf_page(page)
#     pdf.close()
#     return img, total_pages, page_idx + 1
#
# def clean_output_text(text):
#     """Remove chat template artifacts from output."""
#     markers_to_remove = ["system", "user", "assistant"]
#     lines = text.split('\n')
#     cleaned_lines = []
#     for line in lines:
#         stripped = line.strip()
#         # Skip lines that are just template markers
#         if stripped.lower() not in markers_to_remove:
#             cleaned_lines.append(line)
#     cleaned = '\n'.join(cleaned_lines).strip()
#     if "assistant" in text.lower():
#         parts = text.split("assistant", 1)
#         if len(parts) > 1:
#             cleaned = parts[1].strip()
#     return cleaned
#
# def extract_medication_names(text):
#     """Extract medication names using clinical NER (spacy: bc5cdr CHEMICAL)."""
#     doc = nlp_ner(text)
#     meds = [ent.text for ent in doc.ents if ent.label_ == "CHEMICAL"]
#     meds_unique = list(dict.fromkeys(meds))
#     return meds_unique
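# Illustrative sketch only: the shape of the scispaCy bc5cdr output consumed
# above (hypothetical sentence). The model tags CHEMICAL and DISEASE spans;
# extract_medication_names keeps CHEMICAL and de-duplicates preserving order:
#
#     doc = nlp_ner("Patient was given paracetamol and ibuprofen for fever.")
#     [(ent.text, ent.label_) for ent in doc.ents]
#     # e.g. [("paracetamol", "CHEMICAL"), ("ibuprofen", "CHEMICAL"), ("fever", "DISEASE")]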
# @spaces.GPU
# def extract_text_from_image(image, temperature=0.2, stream=False):
#     """Extract text from image using LightOnOCR model."""
#     chat = [
#         {
#             "role": "user",
#             "content": [
#                 {"type": "image", "url": image},
#             ],
#         }
#     ]
#     inputs = processor.apply_chat_template(
#         chat,
#         add_generation_prompt=True,
#         tokenize=True,
#         return_dict=True,
#         return_tensors="pt"
#     )
#     inputs = {
#         k: v.to(device=device, dtype=dtype) if isinstance(v, torch.Tensor) and v.dtype in [torch.float32, torch.float16, torch.bfloat16]
#         else v.to(device) if isinstance(v, torch.Tensor)
#         else v
#         for k, v in inputs.items()
#     }
#     generation_kwargs = dict(
#         **inputs,
#         max_new_tokens=2048,
#         temperature=temperature if temperature > 0 else 0.0,
#         use_cache=True,
#         do_sample=temperature > 0,
#     )
#     if stream:
#         # Streaming generation
#         streamer = TextIteratorStreamer(
#             processor.tokenizer,
#             skip_prompt=True,
#             skip_special_tokens=True
#         )
#         generation_kwargs["streamer"] = streamer
#         thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
#         thread.start()
#         full_text = ""
#         for new_text in streamer:
#             full_text += new_text
#             cleaned_text = clean_output_text(full_text)
#             yield cleaned_text
#         thread.join()
#     else:
#         # Non-streaming generation
#         with torch.no_grad():
#             outputs = model.generate(**generation_kwargs)
#         output_text = processor.decode(outputs[0], skip_special_tokens=True)
#         cleaned_text = clean_output_text(output_text)
#         yield cleaned_text
#
# def process_input(file_input, temperature, page_num, enable_streaming):
#     """Process uploaded file (image or PDF) and extract medication names via OCR+NER."""
#     if file_input is None:
#         yield "Please upload an image or PDF first.", "", "", None, gr.update()
#         return
#     image_to_process = None
#     page_info = ""
#     file_path = file_input if isinstance(file_input, str) else file_input.name
#     # Handle PDF files
#     if file_path.lower().endswith('.pdf'):
#         try:
#             image_to_process, total_pages, actual_page = process_pdf(file_path, int(page_num))
#             page_info = f"Processing page {actual_page} of {total_pages}"
#         except Exception as e:
#             yield f"Error processing PDF: {str(e)}", "", "", None, gr.update()
#             return
#     # Handle image files
#     else:
#         try:
#             image_to_process = Image.open(file_path)
#             page_info = "Processing image"
#         except Exception as e:
#             yield f"Error opening image: {str(e)}", "", "", None, gr.update()
#             return
#     try:
#         for extracted_text in extract_text_from_image(image_to_process, temperature, stream=enable_streaming):
#             meds = extract_medication_names(extracted_text)
#             meds_str = "\n".join(meds) if meds else "No medications found."
#             yield meds_str, meds_str, page_info, image_to_process, gr.update()
#     except Exception as e:
#         error_msg = f"Error during text extraction: {str(e)}"
#         yield error_msg, error_msg, page_info, image_to_process, gr.update()
#
# def update_slider(file_input):
#     """Update page slider based on PDF page count."""
#     if file_input is None:
#         return gr.update(maximum=20, value=1)
#     file_path = file_input if isinstance(file_input, str) else file_input.name
#     if file_path.lower().endswith('.pdf'):
#         try:
#             pdf = pdfium.PdfDocument(file_path)
#             total_pages = len(pdf)
#             pdf.close()
#             return gr.update(maximum=total_pages, value=1)
#         except:
#             return gr.update(maximum=20, value=1)
#     else:
#         return gr.update(maximum=1, value=1)
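# Illustrative sketch only: update_slider returns a gr.update(...) patch rather
# than a new component, and Gradio applies it to the existing page slider, e.g.
#
#     update_slider("report.pdf")   # hypothetical 7-page file
#     # -> gr.update(maximum=7, value=1)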
Click "Extract Medications" # **Output:** Only medication names found in text (via NER) # **Model:** LightOnOCR-1B-1025 by LightOn AI # **Device:** {device.upper()} # **Attention:** {attn_implementation} # """) # with gr.Row(): # with gr.Column(scale=1): # file_input = gr.File( # label="🖼️ Upload Image or PDF", # file_types=[".pdf", ".png", ".jpg", ".jpeg"], # type="filepath" # ) # rendered_image = gr.Image( # label="📄 Preview", # type="pil", # height=400, # interactive=False # ) # num_pages = gr.Slider( # minimum=1, # maximum=20, # value=1, # step=1, # label="PDF: Page Number", # info="Select which page to extract" # ) # page_info = gr.Textbox( # label="Processing Info", # value="", # interactive=False # ) # temperature = gr.Slider( # minimum=0.0, # maximum=1.0, # value=0.2, # step=0.05, # label="Temperature", # info="0.0 = deterministic, Higher = more varied" # ) # enable_streaming = gr.Checkbox( # label="Enable Streaming", # value=True, # info="Show text progressively as it's generated" # ) # submit_btn = gr.Button("Extract Medications", variant="primary") # clear_btn = gr.Button("Clear", variant="secondary") # with gr.Column(scale=2): # output_text = gr.Markdown( # label="🩺 Extracted Medication Names", # value="*Medication names will appear here...*" # ) # with gr.Row(): # with gr.Column(): # raw_output = gr.Textbox( # label="Extracted Medication Names (Raw)", # placeholder="Medication list will appear here...", # lines=20, # max_lines=30, # show_copy_button=True # ) # # Event handlers # submit_btn.click( # fn=process_input, # inputs=[file_input, temperature, num_pages, enable_streaming], # outputs=[output_text, raw_output, page_info, rendered_image, num_pages] # ) # file_input.change( # fn=update_slider, # inputs=[file_input], # outputs=[num_pages] # ) # clear_btn.click( # fn=lambda: (None, "*Medication names will appear here...*", "", "", None, 1), # outputs=[file_input, output_text, raw_output, page_info, rendered_image, num_pages] # ) # if __name__ == "__main__": # demo.launch()