""" The evaluation function. """ from argparse import Namespace from logging import Logger from typing import List import numpy as np import torch import torch.utils.data.distributed from grover.data.scaler import StandardScaler from grover.util.utils import get_class_sizes, get_data, split_data, get_task_names, get_loss_func from grover.util.utils import load_checkpoint from task.predict import evaluate_predictions from grover.util.metrics import get_metric_func from grover.util.nn_utils import param_count from task.predict import predict def run_evaluation(args: Namespace, logger: Logger = None) -> List[float]: """ Trains a model and returns test scores on the model checkpoint with the highest validation score. :param args: Arguments. :param logger: Logger. :return: A list of ensemble scores for each task. """ if logger is not None: debug, info = logger.debug, logger.info else: debug = info = print torch.cuda.set_device(0) # Get data debug('Loading data') args.task_names = get_task_names(args.data_path) data = get_data(path=args.data_path, args=args, logger=logger) args.num_tasks = data.num_tasks() args.features_size = data.features_size() debug(f'Number of tasks = {args.num_tasks}') # Split data debug(f'Splitting data with seed {args.seed}') train_data, val_data, test_data = split_data(data=data, split_type=args.split_type, sizes=[0.8, 0.1, 0.1], seed=args.seed, args=args, logger=logger) if args.dataset_type == 'classification': class_sizes = get_class_sizes(data) debug('Class sizes') for i, task_class_sizes in enumerate(class_sizes): debug(f'{args.task_names[i]} ' f'{", ".join(f"{cls}: {size * 100:.2f}%" for cls, size in enumerate(task_class_sizes))}') if args.features_scaling: features_scaler = train_data.normalize_features(replace_nan_token=0) val_data.normalize_features(features_scaler) test_data.normalize_features(features_scaler) else: features_scaler = None args.train_data_size = len(train_data) debug(f'Total size = {len(data):,} | ' f'train size = {len(train_data):,} | val size = {len(val_data):,} | test size = {len(test_data):,}') # Initialize scaler (regression only) scaler = None if args.dataset_type == 'regression': debug('Fitting scaler') _, train_targets = train_data.smiles(), train_data.targets() scaler = StandardScaler().fit(train_targets) scaled_targets = scaler.transform(train_targets).tolist() train_data.set_targets(scaled_targets) val_targets = val_data.targets() scaled_val_targets = scaler.transform(val_targets).tolist() val_data.set_targets(scaled_val_targets) metric_func = get_metric_func(metric=args.metric) # Set up test set evaluation test_smiles, test_targets = test_data.smiles(), test_data.targets() sum_test_preds = np.zeros((len(test_smiles), args.num_tasks)) # Load/build model if args.checkpoint_paths is not None: cur_model = args.seed target_path = [] for path in args.checkpoint_paths: if "fold_%d" % cur_model in path: target_path = path debug(f'Loading model {args.seed} from {target_path}') model = load_checkpoint(target_path, current_args=args, cuda=args.cuda, logger=logger) # Get loss and metric functions loss_func = get_loss_func(args, model) debug(f'Number of parameters = {param_count(model):,}') test_preds, _ = predict( model=model, data=test_data, batch_size=args.batch_size, loss_func=loss_func, logger=logger, shared_dict={}, scaler=scaler, args=args ) test_scores = evaluate_predictions( preds=test_preds, targets=test_targets, num_tasks=args.num_tasks, metric_func=metric_func, dataset_type=args.dataset_type, logger=logger ) if len(test_preds) != 0: 
        sum_test_preds += np.array(test_preds, dtype=float)

    # Average test score
    avg_test_score = np.nanmean(test_scores)
    info(f'Model test {args.metric} = {avg_test_score:.6f}')

    if args.show_individual_scores:
        # Individual test scores
        for task_name, test_score in zip(args.task_names, test_scores):
            info(f'Model test {task_name} {args.metric} = {test_score:.6f}')

    # Evaluate ensemble on test set
    avg_test_preds = (sum_test_preds / args.ensemble_size).tolist()

    ensemble_scores = evaluate_predictions(
        preds=avg_test_preds,
        targets=test_targets,
        num_tasks=args.num_tasks,
        metric_func=metric_func,
        dataset_type=args.dataset_type,
        logger=logger
    )

    # If you want to save the prediction result, uncomment these lines.
    # ind = [['preds'] * args.num_tasks + ['targets'] * args.num_tasks, args.task_names * 2]
    # ind = pd.MultiIndex.from_tuples(list(zip(*ind)))
    # data = np.concatenate([np.array(avg_test_preds), np.array(test_targets)], 1)
    # test_result = pd.DataFrame(data, index=test_smiles, columns=ind)
    # test_result.to_csv(os.path.join(args.save_dir, 'test_result.csv'))

    return ensemble_scores
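

# Usage sketch (illustrative only): `run_evaluation` expects an argparse.Namespace populated by
# the repository's command-line parser. The attribute values below are assumptions chosen to show
# the fields this function reads (paths, split/metric names, etc.), not a verified configuration;
# the parser may require additional fields for `get_data`, `split_data`, and `load_checkpoint`.
#
# from argparse import Namespace
#
# args = Namespace(
#     data_path='exampledata/finetune/bbbp.csv',   # assumed CSV of SMILES and target columns
#     dataset_type='classification',
#     split_type='scaffold_balanced',
#     metric='auc',
#     seed=0,
#     features_scaling=True,
#     checkpoint_paths=['model/fold_0/model.pt'],  # the path must contain "fold_<seed>"
#     cuda=torch.cuda.is_available(),
#     batch_size=32,
#     ensemble_size=1,
#     show_individual_scores=False,
# )
# ensemble_scores = run_evaluation(args)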