import os
from typing import Optional
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import torch.nn as nn
# Define a custom model that wraps a causal LM and adds a regression head
class CausalLMForRegression(nn.Module):
    def __init__(self, model_name):
        super().__init__()
        # Load the causal LM with hidden states enabled
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            output_hidden_states=True,
        )
        self.base_model = "Qwen/Qwen3-8B"  # WARNING: used only to fetch a fallback tokenizer; hardcoded for now!
        # Project the pooled hidden state to a single scalar
        self.regression_head = nn.Linear(self.model.config.hidden_size, 1)
        try:
            regression_head_path = os.path.join(model_name, "regression_head.bin")
            state = torch.load(regression_head_path, map_location="cpu")
            self.regression_head.load_state_dict(state)
        except FileNotFoundError:
            print(f"No regression head found at {regression_head_path}; initializing it with random weights!")
        self._keys_to_ignore_on_save = []
    def forward(self, input_ids, attention_mask=None, labels=None):
        # Flatten extra dimensions if present,
        # e.g. from (accum_steps, batch_size, seq_length) to (accum_steps * batch_size, seq_length)
        if input_ids.dim() == 3:
            input_ids = input_ids.view(-1, input_ids.size(-1))
        if attention_mask is not None and attention_mask.dim() == 3:
            attention_mask = attention_mask.view(-1, attention_mask.size(-1))

        outputs = self.model(input_ids, attention_mask=attention_mask)
        hidden_states = outputs.hidden_states[-1]  # shape: (batch, seq_length, hidden_size)

        # Mean-pool over non-padding tokens
        if attention_mask is not None:
            mask = attention_mask.unsqueeze(-1).expand_as(hidden_states).to(hidden_states.dtype)
            hidden_sum = torch.sum(hidden_states * mask, dim=1)
            lengths = mask.sum(dim=1)
            pooled = hidden_sum / lengths
        else:
            pooled = hidden_states.mean(dim=1)

        logits = self.regression_head(pooled).squeeze(-1)

        loss = None
        if labels is not None:
            loss_fn = nn.HuberLoss()  # alternative: nn.MSELoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
    def get_input_embeddings(self):
        # Delegate to the underlying causal LM's get_input_embeddings method.
        return self.model.get_input_embeddings()
    def save_pretrained(self, output_dir, safe_serialization=False):
        os.makedirs(output_dir, exist_ok=True)
        # Sanity check: flag any empty tensors before serializing
        model_state_dict = self.model.state_dict()
        for key, value in model_state_dict.items():
            if value.numel() == 0:
                print(f"Warning: Tensor {key} has shape {value.shape}, which may be problematic.")
        # Save the base model (weight tying is handled by save_pretrained itself)
        self.model.save_pretrained(output_dir, safe_serialization=safe_serialization)
        torch.save(self.regression_head.state_dict(), os.path.join(output_dir, "regression_head.bin"))
    def get_tokenizer(self):
        try:
            tokenizer = AutoTokenizer.from_pretrained(self.model.name_or_path)
            print(f"Loaded tokenizer from {self.model.name_or_path}")
        except Exception:
            # Fall back to the hardcoded base model's tokenizer
            tokenizer = AutoTokenizer.from_pretrained(self.base_model)
            print(f"Loaded tokenizer from {self.base_model}")
        return tokenizer
    @classmethod
    def from_pretrained(cls, output_dir):
        from_local = os.path.exists(output_dir)
        loading_kwargs = {"use_safetensors": False} if from_local else {}
        model = AutoModelForCausalLM.from_pretrained(output_dir, **loading_kwargs)
        # Explicitly enable `output_hidden_states` after loading
        model.config.output_hidden_states = True

        # Create an uninitialized instance of CausalLMForRegression
        instance = cls.__new__(cls)
        nn.Module.__init__(instance)
        instance._keys_to_ignore_on_save = []
        instance.model = model
        instance.base_model = "Qwen/Qwen3-8B"  # fallback tokenizer source, mirrors __init__

        # Load the regression head separately
        instance.regression_head = nn.Linear(model.config.hidden_size, 1)
        try:
            regression_head_path = os.path.join(output_dir, "regression_head.bin")
            state = torch.load(regression_head_path, map_location="cpu")
            instance.regression_head.load_state_dict(state)
        except FileNotFoundError:
            print(f"No regression head found at {regression_head_path}; initializing it with random weights!")
        return instance