"""
The evaluation function.
"""
from argparse import Namespace
from logging import Logger
from typing import List
import numpy as np
import torch
import torch.utils.data.distributed
from grover.data.scaler import StandardScaler
from grover.util.utils import get_class_sizes, get_data, split_data, get_task_names, get_loss_func
from grover.util.utils import load_checkpoint
from grover.util.metrics import get_metric_func
from grover.util.nn_utils import param_count
from task.predict import evaluate_predictions, predict


def run_evaluation(args: Namespace, logger: Logger = None) -> List[float]:
"""
Trains a model and returns test scores on the model checkpoint with the highest validation score.
:param args: Arguments.
:param logger: Logger.
:return: A list of ensemble scores for each task.
"""
if logger is not None:
debug, info = logger.debug, logger.info
else:
debug = info = print
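    # Run on the first visible GPU when CUDA is enabled.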
    if args.cuda:
        torch.cuda.set_device(0)
# Get data
debug('Loading data')
args.task_names = get_task_names(args.data_path)
data = get_data(path=args.data_path, args=args, logger=logger)
args.num_tasks = data.num_tasks()
args.features_size = data.features_size()
debug(f'Number of tasks = {args.num_tasks}')
# Split data
debug(f'Splitting data with seed {args.seed}')
train_data, val_data, test_data = split_data(data=data,
split_type=args.split_type,
sizes=[0.8, 0.1, 0.1],
seed=args.seed,
args=args,
logger=logger)
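    # For classification, log the class balance of every task across the full dataset.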
if args.dataset_type == 'classification':
class_sizes = get_class_sizes(data)
debug('Class sizes')
for i, task_class_sizes in enumerate(class_sizes):
debug(f'{args.task_names[i]} '
f'{", ".join(f"{cls}: {size * 100:.2f}%" for cls, size in enumerate(task_class_sizes))}')
if args.features_scaling:
features_scaler = train_data.normalize_features(replace_nan_token=0)
val_data.normalize_features(features_scaler)
test_data.normalize_features(features_scaler)
else:
features_scaler = None
args.train_data_size = len(train_data)
debug(f'Total size = {len(data):,} | '
f'train size = {len(train_data):,} | val size = {len(val_data):,} | test size = {len(test_data):,}')
# Initialize scaler (regression only)
scaler = None
if args.dataset_type == 'regression':
debug('Fitting scaler')
_, train_targets = train_data.smiles(), train_data.targets()
scaler = StandardScaler().fit(train_targets)
scaled_targets = scaler.transform(train_targets).tolist()
train_data.set_targets(scaled_targets)
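        # Apply the same training-set scaling to the validation targets.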
val_targets = val_data.targets()
scaled_val_targets = scaler.transform(val_targets).tolist()
val_data.set_targets(scaled_val_targets)
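    # Metric used for both per-model and ensemble evaluation (chosen via args.metric).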
metric_func = get_metric_func(metric=args.metric)
# Set up test set evaluation
test_smiles, test_targets = test_data.smiles(), test_data.targets()
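    # Accumulator for test-set predictions; averaged into the ensemble predictions below.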
sum_test_preds = np.zeros((len(test_smiles), args.num_tasks))
# Load/build model
    if args.checkpoint_paths is not None:
        # The seed doubles as the fold index: pick the checkpoint whose path
        # contains the matching "fold_<seed>" directory.
        cur_model = args.seed
        target_path = None
        for path in args.checkpoint_paths:
            if "fold_%d" % cur_model in path:
                target_path = path
        debug(f'Loading model {args.seed} from {target_path}')
        model = load_checkpoint(target_path, current_args=args, cuda=args.cuda, logger=logger)
# Get loss and metric functions
loss_func = get_loss_func(args, model)
debug(f'Number of parameters = {param_count(model):,}')
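    # Run inference on the test set; the second value returned by predict() is unused here.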
test_preds, _ = predict(
model=model,
data=test_data,
batch_size=args.batch_size,
loss_func=loss_func,
logger=logger,
shared_dict={},
scaler=scaler,
args=args
)
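    # Score the single model's test-set predictions per task.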
test_scores = evaluate_predictions(
preds=test_preds,
targets=test_targets,
num_tasks=args.num_tasks,
metric_func=metric_func,
dataset_type=args.dataset_type,
logger=logger
)
if len(test_preds) != 0:
sum_test_preds += np.array(test_preds, dtype=float)
# Average test score
avg_test_score = np.nanmean(test_scores)
info(f'Model test {args.metric} = {avg_test_score:.6f}')
if args.show_individual_scores:
# Individual test scores
for task_name, test_score in zip(args.task_names, test_scores):
info(f'Model test {task_name} {args.metric} = {test_score:.6f}')
# Evaluate ensemble on test set
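    # Only one checkpoint is loaded above, so sum_test_preds holds a single model's
    # predictions; dividing by args.ensemble_size is a true ensemble average only
    # when ensemble_size == 1.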
avg_test_preds = (sum_test_preds / args.ensemble_size).tolist()
ensemble_scores = evaluate_predictions(
preds=avg_test_preds,
targets=test_targets,
num_tasks=args.num_tasks,
metric_func=metric_func,
dataset_type=args.dataset_type,
logger=logger
)
    # To save the prediction results, uncomment the lines below
    # (they additionally require `import os` and `import pandas as pd` at the top of the file).
    # ind = [['preds'] * args.num_tasks + ['targets'] * args.num_tasks, args.task_names * 2]
    # ind = pd.MultiIndex.from_tuples(list(zip(*ind)))
    # data = np.concatenate([np.array(avg_test_preds), np.array(test_targets)], 1)
    # test_result = pd.DataFrame(data, index=test_smiles, columns=ind)
    # test_result.to_csv(os.path.join(args.save_dir, 'test_result.csv'))
return ensemble_scores
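

# Example invocation (a minimal sketch, not a definitive recipe): run_evaluation expects
# the full Namespace produced by the GROVER command-line parser, so only the fields read
# directly in this module are shown and the paths below are hypothetical placeholders.
#
#     from argparse import Namespace
#
#     args = Namespace(
#         data_path='data/tox21.csv',                  # hypothetical dataset path
#         dataset_type='classification',
#         split_type='random',
#         seed=0,
#         features_scaling=False,
#         metric='auc',
#         checkpoint_paths=['model/fold_0/model.pt'],  # hypothetical checkpoint path
#         cuda=torch.cuda.is_available(),
#         batch_size=32,
#         ensemble_size=1,
#         show_individual_scores=True,
#     )
#     ensemble_scores = run_evaluation(args)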