import os
from typing import Optional

import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, AutoTokenizer


class CausalLMForRegression(nn.Module):
    """Causal LM backbone with a scalar regression head over mean-pooled hidden states."""

    def __init__(self, model_name):
        super().__init__()

        # Load the backbone with hidden states exposed so forward() can pool them.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            output_hidden_states=True,
        )
        self.base_model = "Qwen/Qwen3-8B"

        # Scalar regression head; load saved weights if they exist alongside the model.
        self.regression_head = nn.Linear(self.model.config.hidden_size, 1)
        try:
            regression_head_path = os.path.join(model_name, "regression_head.bin")
            state = torch.load(regression_head_path, map_location="cpu")
            self.regression_head.load_state_dict(state)
        except FileNotFoundError:
            print("No regression head found. Initializing with random weights!")
        self._keys_to_ignore_on_save = []

    def forward(self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, labels: Optional[torch.Tensor] = None):
        # Collapse an extra leading dimension (e.g. [batch, 1, seq_len]) if present.
        if input_ids.dim() == 3:
            input_ids = input_ids.view(-1, input_ids.size(-1))
        if attention_mask is not None and attention_mask.dim() == 3:
            attention_mask = attention_mask.view(-1, attention_mask.size(-1))

        outputs = self.model(input_ids, attention_mask=attention_mask)
        hidden_states = outputs.hidden_states[-1]

        # Mean-pool the final hidden layer over non-padding tokens.
        if attention_mask is not None:
            mask = attention_mask.unsqueeze(-1).expand_as(hidden_states).to(hidden_states.dtype)
            hidden_sum = torch.sum(hidden_states * mask, dim=1)
            lengths = mask.sum(dim=1).clamp(min=1.0)  # guard against all-padding rows
            pooled = hidden_sum / lengths
        else:
            pooled = hidden_states.mean(dim=1)

        logits = self.regression_head(pooled).squeeze(-1)

        loss = None
        if labels is not None:
            loss_fn = nn.HuberLoss()
            loss = loss_fn(logits, labels)

        return {"loss": loss, "logits": logits}

    def get_input_embeddings(self):
        return self.model.get_input_embeddings()

    def save_pretrained(self, output_dir, safe_serialization=False):
        os.makedirs(output_dir, exist_ok=True)

        # Sanity-check for empty tensors, which can break serialization.
        model_state_dict = self.model.state_dict()
        for key, value in model_state_dict.items():
            if value.numel() == 0:
                print(f"Warning: Tensor {key} has shape {value.shape}, which may be problematic.")

        # Always write .bin weights so from_pretrained (use_safetensors=False) can reload them.
        self.model.save_pretrained(output_dir, safe_serialization=False)
        torch.save(self.regression_head.state_dict(), os.path.join(output_dir, "regression_head.bin"))

    def get_tokenizer(self):
        # Prefer the tokenizer stored with this checkpoint; fall back to the base model.
        try:
            tokenizer = AutoTokenizer.from_pretrained(self.model.name_or_path)
            print(f"Loaded tokenizer from {self.model.name_or_path}")
        except Exception:
            tokenizer = AutoTokenizer.from_pretrained(self.base_model)
            print(f"Loaded tokenizer from {self.base_model}")
        return tokenizer

    @classmethod
    def from_pretrained(cls, output_dir):
        # Local checkpoints are saved as .bin files, so disable safetensors when loading them.
        from_local = os.path.exists(output_dir)
        loading_kwargs = {"use_safetensors": False} if from_local else {}

        model = AutoModelForCausalLM.from_pretrained(output_dir, **loading_kwargs)
        model.config.output_hidden_states = True

        # Build the wrapper without calling __init__, which would reload the backbone from disk.
        instance = cls.__new__(cls)
        nn.Module.__init__(instance)
        instance._keys_to_ignore_on_save = []
        instance.model = model
        instance.base_model = "Qwen/Qwen3-8B"  # mirror __init__ so get_tokenizer() can fall back

        instance.regression_head = nn.Linear(model.config.hidden_size, 1)
        try:
            regression_head_path = os.path.join(output_dir, "regression_head.bin")
            state = torch.load(regression_head_path, map_location="cpu")
            instance.regression_head.load_state_dict(state)
        except FileNotFoundError:
            print("No regression head found. Initializing with random weights!")
        return instance
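

# Minimal usage sketch (not part of the original module). It assumes network or cached
# access to the "Qwen/Qwen3-8B" checkpoint named above and a writable hypothetical
# "./regression_ckpt" directory, and simply round-trips save_pretrained / from_pretrained
# around one dummy forward pass.
if __name__ == "__main__":
    model = CausalLMForRegression("Qwen/Qwen3-8B")
    tokenizer = model.get_tokenizer()

    batch = tokenizer(["rate this answer"], return_tensors="pt")
    out = model(batch["input_ids"], attention_mask=batch["attention_mask"],
                labels=torch.tensor([0.5]))
    print(out["loss"], out["logits"].shape)

    model.save_pretrained("./regression_ckpt")
    reloaded = CausalLMForRegression.from_pretrained("./regression_ckpt")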