SkyNait committed
Commit 908672e · 1 Parent(s): 6ab0ae7

fix RabbitMQ

__pycache__/inference_svm_model.cpython-310.pyc DELETED
Binary file (1.24 kB)
 
__pycache__/mineru_single.cpython-310.pyc DELETED
Binary file (10.6 kB)
 
__pycache__/table_row_extraction.cpython-310.pyc DELETED
Binary file (10.9 kB)
 
__pycache__/topic_extraction.cpython-310.pyc DELETED
Binary file (23.3 kB)
 
__pycache__/worker.cpython-310.pyc DELETED
Binary file (6.52 kB)
 
worker.py CHANGED
@@ -132,26 +132,33 @@ class RabbitMQWorker:
         elif pattern == "topic_extraction":
             data = body_dict.get("data")
             input_files = data.get("input_files")
-            logger.info("[Worker %s] Found %d file(s) to process.", thread_id, len(input_files))
-            if not input_files or not isinstance(input_files, list):
-                logger.error("[Worker %s] No input files provided for topic extraction.", thread_id)
+            logger.info("[Worker %s] Found %d file(s) to process for topic extraction.", thread_id, len(input_files))
+
+            topics_contexts = []
+            for file in input_files:
+                try:
+                    pdf_url = file.get("url")
+                    logger.info("[Worker %s] Processing topic extraction for URL: %s", thread_id, pdf_url)
+                    result = self.topic_processor.process(pdf_url)
+                    context = {
+                        "key": file.get("key", ""),
+                        "body": result
+                    }
+                    topics_contexts.append(context)
+                except Exception as e:
+                    err_str = f"Error processing topic extraction for file {file.get('key', '')}: {e}"
+                    logger.error(err_str)
+                    topics_contexts.append({"key": file.get("key", ""), "body": err_str})
+            data["topics_markdown"] = topics_contexts
+            body_dict["pattern"] = "topic_extraction_update_from_gpu_server"
+            body_dict["data"] = data
+            if self.publish_message(body_dict, headers):
+                logger.info("[Worker %s] Successfully published topic extraction results to ml_server.", thread_id)
                 ch.basic_ack(delivery_tag=method.delivery_tag)
-                return
-            # Use the first file's URL for topic extraction
-            pdf_url = input_files[0].get("url")
-            logger.info("[Worker %s] Processing topic extraction for URL: %s", thread_id, pdf_url)
-            try:
-                topics_markdown = self.topic_processor.process(pdf_url)
-                data["topics_markdown"] = topics_markdown
-                body_dict["pattern"] = "topic_extraction_update_from_gpu_server"
-                body_dict["data"] = data
-                if self.publish_message(body_dict, headers):
-                    ch.basic_ack(delivery_tag=method.delivery_tag)
-                else:
-                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
-            except Exception as e:
-                logger.error("Error processing topic extraction: %s", e)
+            else:
                 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+
+            logger.info("[Worker %s] Contexts: %s", thread_id, contexts)
 
         else:
             ch.basic_ack(delivery_tag=method.delivery_tag, requeue=False)
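
For context, the reworked handler consumes and republishes messages of roughly the following shape. This is a minimal sketch inferred only from the fields the diff touches ("pattern", "data", "input_files", "url", "key", "topics_markdown", "body"); the file keys, URLs, and markdown content below are placeholders, and the full schema on the ml_server side is not shown in this commit.

# Illustrative payload shapes only; all concrete values are assumptions.

# Message consumed with pattern "topic_extraction":
incoming = {
    "pattern": "topic_extraction",
    "data": {
        "input_files": [
            {"key": "file-1", "url": "https://example.com/doc1.pdf"},
            {"key": "file-2", "url": "https://example.com/doc2.pdf"},
        ],
    },
}

# Message republished with pattern "topic_extraction_update_from_gpu_server":
# each entry in topics_markdown pairs a file key with either the extracted
# markdown or the error string recorded for that file.
outgoing = {
    "pattern": "topic_extraction_update_from_gpu_server",
    "data": {
        "input_files": incoming["data"]["input_files"],
        "topics_markdown": [
            {"key": "file-1", "body": "# Extracted topics\n..."},
            {"key": "file-2", "body": "Error processing topic extraction for file file-2: ..."},
        ],
    },
}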