# pulse_core_1 / train.py
# Author: Vu Anh
# Commit: Add VLSP2016 dataset support and comprehensive evaluation updates
# Revision: 08bbb4c
#!/usr/bin/env python3
"""
Training script for Vietnamese sentiment classification.

Trains TF-IDF + machine learning models for Vietnamese sentiment analysis on
the VLSP2016 sentiment dataset (default) or the UTS2017_Bank aspect-sentiment
dataset, and can compare several scikit-learn classifiers in one run.
"""
import argparse
import json
import logging
import os
import time
from datetime import datetime
import numpy as np
from datasets import load_dataset
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
import joblib
def setup_logging(run_name):
    """Configure root logging for one run and return that run's directory.

    Creates ``runs/<run_name>/`` if needed and sends INFO-and-above records to
    both ``runs/<run_name>/training.log`` and the console.

    Args:
        run_name: Name of the run directory to create under ``runs/``.

    Returns:
        Path of the run directory (``runs/<run_name>``).
    """
    run_dir = os.path.join("runs", run_name)
    os.makedirs(run_dir, exist_ok=True)
    log_file = os.path.join(run_dir, "training.log")

    # logging.basicConfig() does nothing when the root logger already has
    # handlers, so a second call (e.g. one per model in a comparison run, or
    # inside a notebook whose kernel pre-configured logging) would keep
    # writing to the previous destination. Detach and close the old handlers
    # so this run's log file is actually used and no file handle is leaked.
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            # Explicit UTF-8 so Vietnamese text in log records is written
            # correctly regardless of the platform's default encoding.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return run_dir
def load_uts2017_data(split_ratio=0.2, random_state=42, n_samples=None):
    """Load the UTS2017_Bank aspect-sentiment dataset and split it into train/test.

    Each kept example is labelled ``"<aspect>#<sentiment>"`` using the FIRST
    entry of its ``aspects`` list; examples whose ``aspects`` list is empty
    are skipped.

    Args:
        split_ratio: Fraction of the (possibly subsampled) data used as test set.
        random_state: Seed for both the optional subsampling and the split.
        n_samples: Optional cap on the number of examples. When set and
            smaller than the dataset, a seeded random subset is used.

    Returns:
        Tuple of (X_train, y_train), (X_test, y_test) as NumPy arrays.
    """
    print("Loading UTS2017_Bank aspect sentiment dataset from Hugging Face...")
    dataset = load_dataset("undertheseanlp/UTS2017_Bank", "aspect_sentiment")
    # Only the "train" split is read here; the test set is carved out below.
    train_data = dataset["train"]

    texts = []
    labels = []
    for item in train_data:
        aspect_data = item["aspects"]
        # Multiple aspects per text are possible; only the first one is used.
        if aspect_data and len(aspect_data) > 0:
            first_aspect = aspect_data[0]
            texts.append(item["text"])
            labels.append(f"{first_aspect['aspect']}#{first_aspect['sentiment']}")

    if n_samples and n_samples < len(texts):
        # Take a seeded random subset so the sample roughly follows the
        # overall class distribution. A private RandomState yields the same
        # permutation as seeding the global generator, without clobbering
        # NumPy's process-wide random state for the rest of the program.
        rng = np.random.RandomState(random_state)
        indices = np.arange(len(texts))
        rng.shuffle(indices)
        indices = indices[:n_samples]
        texts = [texts[i] for i in indices]
        labels = [labels[i] for i in indices]

    X = np.array(texts)
    y = np.array(labels)

    # Stratified splitting needs at least two examples of every class.
    min_samples_per_class = 2
    unique_classes, class_counts = np.unique(y, return_counts=True)
    can_stratify = all(count >= min_samples_per_class for count in class_counts)
    if can_stratify:
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=split_ratio, random_state=random_state, stratify=y
        )
    else:
        print(
            f"Warning: Some classes have fewer than {min_samples_per_class} samples. Disabling stratification."
        )
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=split_ratio, random_state=random_state
        )

    print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples")
    print(f"Number of unique labels: {len(unique_classes)}")
    return (X_train, y_train), (X_test, y_test)
def load_vlsp2016_data(use_predefined_split=True, split_ratio=0.2, random_state=42, n_samples=None):
    """Load the VLSP2016 sentiment dataset and return train/test arrays.

    Texts are read from the ``"Data"`` field and labels from the ``"Class"``
    field of each record.

    Args:
        use_predefined_split: If True, keep the dataset's own train/test
            split; otherwise merge both splits and re-split them here.
        split_ratio: Test fraction (only used if use_predefined_split is False).
        random_state: Seed for subsampling and for the custom split.
        n_samples: Optional cap on the number of samples. With the predefined
            split it caps the training set, and the test set is reduced to
            about 20% of ``n_samples`` when ``n_samples`` is smaller than the
            test set. With a custom split it caps the merged data.

    Returns:
        Tuple of (X_train, y_train), (X_test, y_test) as NumPy arrays.
    """
    print("Loading VLSP2016 sentiment dataset from Hugging Face...")
    dataset = load_dataset("ura-hcmut/vlsp2016")

    def _random_subset(size, keep):
        """Return ``keep`` seeded-random indices out of ``range(size)``."""
        # A fresh private RandomState per call reproduces the permutation the
        # global generator would give after seeding, without mutating NumPy's
        # process-wide random state.
        indices = np.arange(size)
        np.random.RandomState(random_state).shuffle(indices)
        return indices[:keep]

    if use_predefined_split:
        train_data = dataset["train"]
        test_data = dataset["test"]
        X_train = [item["Data"] for item in train_data]
        y_train = [item["Class"] for item in train_data]
        X_test = [item["Data"] for item in test_data]
        y_test = [item["Class"] for item in test_data]

        if n_samples:
            if n_samples < len(X_train):
                keep = _random_subset(len(X_train), n_samples)
                X_train = [X_train[i] for i in keep]
                y_train = [y_train[i] for i in keep]
            if n_samples < len(X_test):
                # Shrink the test set to roughly a 20% companion of the
                # capped training set, but never to zero rows: an empty test
                # set cannot be evaluated.
                test_samples = max(1, int(n_samples * 0.2))
                keep = _random_subset(len(X_test), test_samples)
                X_test = [X_test[i] for i in keep]
                y_test = [y_test[i] for i in keep]

        X_train = np.array(X_train)
        y_train = np.array(y_train)
        X_test = np.array(X_test)
        y_test = np.array(y_test)
    else:
        # Merge both published splits, then create a custom split.
        all_data = list(dataset["train"]) + list(dataset["test"])
        texts = [item["Data"] for item in all_data]
        labels = [item["Class"] for item in all_data]

        if n_samples and n_samples < len(texts):
            # Sample at random, like the other sampling paths, instead of
            # taking the head of the concatenated data: a plain head slice
            # would only ever see the start of the train split.
            keep = _random_subset(len(texts), n_samples)
            texts = [texts[i] for i in keep]
            labels = [labels[i] for i in keep]

        X = np.array(texts)
        y = np.array(labels)

        # Stratified splitting needs at least two examples of every class.
        min_samples_per_class = 2
        unique_classes, class_counts = np.unique(y, return_counts=True)
        can_stratify = all(count >= min_samples_per_class for count in class_counts)
        if can_stratify:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=split_ratio, random_state=random_state, stratify=y
            )
        else:
            print(
                f"Warning: Some classes have fewer than {min_samples_per_class} samples. Disabling stratification."
            )
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=split_ratio, random_state=random_state
            )

    print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples")
    print(f"Number of unique labels: {len(set(y_train))}")
    print(f"Labels: {sorted(set(y_train))}")
    return (X_train, y_train), (X_test, y_test)
def get_available_models():
    """Return a fresh mapping of model key -> unfitted scikit-learn classifier.

    A new set of estimator instances is built on every call, and the keys are
    inserted in a fixed order that callers use as the comparison order.
    """
    seed = 42
    models = {}

    # Traditional algorithms
    models["logistic"] = LogisticRegression(max_iter=1000, random_state=seed)
    models["svc_linear"] = SVC(kernel="linear", random_state=seed, probability=True)
    models["svc_rbf"] = SVC(kernel="rbf", random_state=seed, probability=True, gamma="scale")
    models["naive_bayes"] = MultinomialNB()

    # Tree-based algorithms
    models["decision_tree"] = DecisionTreeClassifier(random_state=seed, max_depth=10)
    models["random_forest"] = RandomForestClassifier(
        n_estimators=100, random_state=seed, max_depth=10, n_jobs=-1
    )

    # Boosting algorithms
    models["gradient_boost"] = GradientBoostingClassifier(
        n_estimators=100, random_state=seed, max_depth=5
    )
    models["ada_boost"] = AdaBoostClassifier(n_estimators=100, random_state=seed)

    # Neural network
    models["mlp"] = MLPClassifier(
        hidden_layer_sizes=(100, 50), max_iter=500, random_state=seed, early_stopping=True
    )
    return models
def load_data(dataset_name="vlsp2016", split_ratio=0.2, random_state=42, n_samples=None):
    """Dispatch to the loader for the requested dataset.

    Args:
        dataset_name: 'vlsp2016' or 'uts2017' (matched case-insensitively).
        split_ratio: Ratio for train/test split, forwarded to the loader.
        random_state: Random seed, forwarded to the loader.
        n_samples: Optional limit on number of samples, forwarded to the loader.

    Returns:
        Tuple of (X_train, y_train), (X_test, y_test), dataset_display_name

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported datasets.
    """
    key = dataset_name.lower()

    # Guard clause: reject unsupported names before touching any loader.
    if key not in ("vlsp2016", "uts2017"):
        raise ValueError(f"Unknown dataset: {dataset_name}. Choose 'vlsp2016' or 'uts2017'")

    if key == "vlsp2016":
        # VLSP2016 is always loaded with its published train/test split.
        (X_train, y_train), (X_test, y_test) = load_vlsp2016_data(
            use_predefined_split=True,
            split_ratio=split_ratio,
            random_state=random_state,
            n_samples=n_samples,
        )
        display_name = "VLSP2016_Sentiment"
    else:
        (X_train, y_train), (X_test, y_test) = load_uts2017_data(
            split_ratio=split_ratio,
            random_state=random_state,
            n_samples=n_samples,
        )
        display_name = "UTS2017_Bank_AspectSentiment"

    return (X_train, y_train), (X_test, y_test), display_name
def train_model(
    dataset="vlsp2016",
    model_name="logistic",
    max_features=20000,
    ngram_range=(1, 2),
    split_ratio=0.2,
    n_samples=None,
    export_model=False,
):
    """Train, evaluate and persist one TF-IDF + classifier pipeline.

    Creates a timestamped run directory under ``runs/`` and writes the fitted
    pipeline (twice: ``model.joblib`` and a configuration-named copy), the
    label list and a ``metadata.json`` summary into it.

    Args:
        dataset: Name of the dataset to use ('vlsp2016' or 'uts2017')
        model_name: Key of the classifier to train; must be one of the keys
            returned by ``get_available_models()`` (e.g. 'logistic',
            'svc_linear', 'random_forest').
        max_features: Maximum vocabulary size for the count vectorizer
        ngram_range: (min_n, max_n) n-gram range for feature extraction
        split_ratio: Train/test split ratio, forwarded to the data loader
        n_samples: Optional limit on number of samples
        export_model: Whether to also write a copy of the model to the
            current directory for distribution

    Returns:
        Dictionary containing training results (the same content that is
        written to ``metadata.json``).

    Raises:
        ValueError: If ``model_name`` is not an available model. This is
            checked before any directory is created or data is downloaded.
    """
    # Fail fast: validate the model name before creating the run directory
    # or loading the dataset.
    available_models = get_available_models()
    if model_name not in available_models:
        raise ValueError(
            f"Model '{model_name}' not available. Choose from: {list(available_models.keys())}"
        )
    classifier = available_models[model_name]
    clf_name = classifier.__class__.__name__

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = setup_logging(timestamp)
    logging.info(f"Starting training run: {timestamp}")
    logging.info(f"Dataset: {dataset}")
    logging.info(f"Model: {model_name}")
    logging.info(f"Max features: {max_features}")
    logging.info(f"N-gram range: {ngram_range}")
    if n_samples:
        logging.info(f"Sample limit: {n_samples}")

    # Create output folder for models
    output_folder = os.path.join(run_dir, "models")
    os.makedirs(output_folder, exist_ok=True)

    # Load data
    logging.info(f"Loading {dataset} dataset...")
    (X_train, y_train), (X_test, y_test), dataset_name = load_data(
        dataset_name=dataset, split_ratio=split_ratio, random_state=42, n_samples=n_samples
    )

    # Labels come out of NumPy arrays as NumPy scalars. Convert them (and the
    # counts) to built-in Python values so the logs stay readable under
    # NumPy 2 (no ``np.str_(...)`` / ``np.int64(...)`` reprs) and so
    # metadata.json can be written even when labels are not strings.
    unique_labels = [
        label.item() if isinstance(label, np.generic) else label
        for label in sorted(set(y_train))
    ]
    label_counts_train = {label: int(np.sum(y_train == label)) for label in unique_labels}
    label_counts_test = {label: int(np.sum(y_test == label)) for label in unique_labels}
    logging.info(f"Train samples: {len(X_train)}")
    logging.info(f"Test samples: {len(X_test)}")
    logging.info(f"Unique labels: {len(unique_labels)}")
    logging.info(f"Label distribution (train): {label_counts_train}")
    logging.info(f"Label distribution (test): {label_counts_test}")
    logging.info(f"Selected classifier: {clf_name}")

    # Configuration name
    config_name = f"{dataset_name}_{clf_name}_feat{max_features // 1000}k_ngram{ngram_range[0]}-{ngram_range[1]}"
    logging.info("=" * 60)
    logging.info(f"Training: {config_name}")
    logging.info("=" * 60)

    # Create TF-IDF pipeline: raw counts -> TF-IDF weighting -> classifier
    logging.info(
        f"Creating pipeline with max_features={max_features}, ngram_range={ngram_range}"
    )
    text_clf = Pipeline(
        [
            (
                "vect",
                CountVectorizer(max_features=max_features, ngram_range=ngram_range),
            ),
            ("tfidf", TfidfTransformer(use_idf=True)),
            ("clf", classifier),
        ]
    )

    # Train the model
    logging.info("Training model...")
    start_time = time.time()
    text_clf.fit(X_train, y_train)
    train_time = time.time() - start_time
    logging.info(f"Training completed in {train_time:.2f} seconds")

    # Evaluate on training set
    logging.info("Evaluating on training set...")
    train_predictions = text_clf.predict(X_train)
    train_accuracy = accuracy_score(y_train, train_predictions)
    logging.info(f"Training accuracy: {train_accuracy:.4f}")

    # Evaluate on test set
    logging.info("Evaluating on test set...")
    start_time = time.time()
    test_predictions = text_clf.predict(X_test)
    test_accuracy = accuracy_score(y_test, test_predictions)
    prediction_time = time.time() - start_time
    logging.info(f"Test accuracy: {test_accuracy:.4f}")
    logging.info(f"Prediction time: {prediction_time:.2f} seconds")

    # Classification report (text form for humans)
    logging.info("Classification Report:")
    report = classification_report(y_test, test_predictions, zero_division=0)
    logging.info(report)
    print("\nClassification Report:")
    print(report)

    # Classification report (dict form for metadata.json)
    report_dict = classification_report(
        y_test, test_predictions, zero_division=0, output_dict=True
    )

    # Confusion matrix, with rows/columns ordered like ``unique_labels``
    cm = confusion_matrix(y_test, test_predictions, labels=unique_labels)
    logging.info(f"Confusion Matrix shape: {cm.shape}")

    # Save the model
    model_path = os.path.join(output_folder, "model.joblib")
    joblib.dump(text_clf, model_path)
    logging.info(f"Model saved to {model_path}")
    print(f"Model saved to {model_path}")

    # Save model with config name
    config_model_path = os.path.join(output_folder, f"{config_name}.joblib")
    joblib.dump(text_clf, config_model_path)
    logging.info(f"Model also saved as {config_model_path}")

    # Export model if requested
    if export_model:
        # Use format: <dataset>_sentiment_<timestamp>.joblib
        run_id = os.path.basename(run_dir)
        dataset_prefix = dataset.lower()
        export_filename = f"{dataset_prefix}_sentiment_{run_id}.joblib"
        export_path = os.path.join(".", export_filename)
        joblib.dump(text_clf, export_path)
        logging.info(f"Model exported as {export_path}")
        print(f"Model exported for distribution: {export_filename}")

    # Save label mapping (one label per line, in sorted order)
    label_mapping_path = os.path.join(output_folder, "labels.txt")
    with open(label_mapping_path, "w", encoding="utf-8") as f:
        for label in unique_labels:
            f.write(f"{label}\n")
    logging.info(f"Label mapping saved to {label_mapping_path}")

    # Save metadata
    metadata = {
        "timestamp": timestamp,
        "dataset": dataset,
        "dataset_name": dataset_name,
        "config_name": config_name,
        "model_name": model_name,
        "classifier": clf_name,
        "max_features": max_features,
        "ngram_range": list(ngram_range),
        "split_ratio": split_ratio,
        "n_samples": n_samples,
        "train_samples": len(X_train),
        "test_samples": len(X_test),
        "unique_labels": len(unique_labels),
        "labels": unique_labels,
        "train_accuracy": float(train_accuracy),
        "test_accuracy": float(test_accuracy),
        "train_time": train_time,
        "prediction_time": prediction_time,
        "classification_report": report_dict,
        "confusion_matrix": cm.tolist(),
    }
    metadata_path = os.path.join(run_dir, "metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
    logging.info(f"Metadata saved to {metadata_path}")

    # Print summary
    print("\n" + "=" * 60)
    print("Training Summary")
    print("=" * 60)
    print(f"Model: {clf_name}")
    print(f"Training samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")
    print(f"Number of classes: {len(unique_labels)}")
    print(f"Training accuracy: {train_accuracy:.4f}")
    print(f"Test accuracy: {test_accuracy:.4f}")
    print(f"Training time: {train_time:.2f} seconds")
    print(f"Model saved to: {model_path}")
    print("=" * 60)
    return metadata
def train_all_configurations(dataset="vlsp2016", models=None, num_rows=None):
    """Train several model configurations and print a comparison table.

    Each model is trained through ``train_model``; a failure in one
    configuration is logged and does not stop the remaining ones.

    Args:
        dataset: Name of the dataset to train on.
        models: Model keys to compare; ``None`` means every key returned by
            ``get_available_models()``.
        num_rows: Optional sample limit forwarded to ``train_model``.

    Returns:
        List of result dictionaries (one per configuration that trained
        successfully); empty if none did.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = setup_logging(timestamp)
    logging.info(f"Starting comparison run: {timestamp}")
    logging.info(f"Dataset: {dataset}")
    if num_rows:
        logging.info(f"Sample limit: {num_rows}")

    if models is None:
        models = list(get_available_models().keys())
    logging.info(f"Models to compare: {models}")

    # Computationally expensive models get a smaller vocabulary; the faster
    # ones use the larger one.
    expensive_models = {"svc_rbf", "gradient_boost", "ada_boost", "mlp"}
    configurations = [
        {
            "dataset": dataset,
            "model_name": model_name,
            "max_features": 10000 if model_name in expensive_models else 20000,
            "ngram_range": (1, 2),
            "n_samples": num_rows,
        }
        for model_name in models
    ]

    results = []
    for config in configurations:
        print(f"\nTraining configuration: {config}")
        try:
            result = train_model(**config)
        except Exception as e:
            # Deliberately broad: one broken configuration must not abort the
            # whole comparison. logging.exception keeps the traceback.
            logging.exception(f"Failed to train with config {config}: {e}")
            print(f"Error training configuration: {e}")
        else:
            results.append(result)

    # Save comparison results
    comparison_path = os.path.join(run_dir, "comparison_results.json")
    with open(comparison_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Nothing trained (every configuration failed, or no models were given):
    # there is no table to print and no best model to pick.
    if not results:
        logging.warning("No configuration trained successfully; nothing to compare.")
        print("\nNo configuration trained successfully; nothing to compare.")
        return results

    # Print comparison table, best test accuracy first
    ranked = sorted(results, key=lambda x: x["test_accuracy"], reverse=True)
    print("\n" + "=" * 80)
    print("Model Comparison Results")
    print("=" * 80)
    print(
        f"{'Model':<10} {'Features':<10} {'N-gram':<10} {'Train Acc':<12} {'Test Acc':<12}"
    )
    print("-" * 80)
    for result in ranked:
        model = result["classifier"][:8]
        features = f"{result['max_features'] // 1000}k"
        ngram = f"{result['ngram_range'][0]}-{result['ngram_range'][1]}"
        train_acc = result["train_accuracy"]
        test_acc = result["test_accuracy"]
        print(
            f"{model:<10} {features:<10} {ngram:<10} {train_acc:<12.4f} {test_acc:<12.4f}"
        )
    print("=" * 80)

    # The sort is stable, so on ties this is the earliest-trained best model.
    best_model = ranked[0]
    print(f"\nBest model: {best_model['config_name']}")
    print(f"Test accuracy: {best_model['test_accuracy']:.4f}")
    return results
def train_notebook(dataset="vlsp2016", model_name="logistic", max_features=20000, ngram_min=1, ngram_max=2,
                   split_ratio=0.2, n_samples=None, compare=False, export_model=False):
    """
    Convenience function for training in Jupyter/Colab notebooks without argparse.

    With ``compare=True`` every available model is trained via
    ``train_all_configurations`` and only ``dataset`` and ``n_samples`` are
    used; otherwise a single model is trained via ``train_model`` with all
    the given settings.

    Returns:
        The list of results (compare mode) or the single result dictionary.

    Example usage:
        from train import train_notebook
        train_notebook(dataset="vlsp2016", model_name="logistic", max_features=20000, export_model=True)
    """
    if compare:
        print(f"Training and comparing multiple configurations on {dataset}...")
        # Forward the sample limit so a quick, capped comparison is possible
        # from a notebook (it used to be silently dropped here).
        return train_all_configurations(dataset=dataset, num_rows=n_samples)

    print(f"Training {model_name} model on {dataset} dataset...")
    print(f"Configuration: max_features={max_features}, ngram=({ngram_min}, {ngram_max})")
    return train_model(
        dataset=dataset,
        model_name=model_name,
        max_features=max_features,
        ngram_range=(ngram_min, ngram_max),
        split_ratio=split_ratio,
        n_samples=n_samples,
        export_model=export_model,
    )
def main():
    """Parse command-line arguments and run a single training or a comparison.

    Comparison mode is selected by ``--compare`` or ``--compare-models``;
    otherwise one model is trained with the given settings.
    """
    # Detect if running in Jupyter/Colab
    import sys

    in_notebook = hasattr(sys, 'ps1') or 'ipykernel' in sys.modules or 'google.colab' in sys.modules

    # Single source of truth for the selectable models, so the CLI cannot
    # drift from what get_available_models() actually provides.
    model_choices = list(get_available_models())

    parser = argparse.ArgumentParser(
        description="Train Vietnamese sentiment classification model on various datasets"
    )
    parser.add_argument(
        "--dataset",
        type=str,
        choices=["vlsp2016", "uts2017"],
        default="vlsp2016",
        help="Dataset to use for training (default: vlsp2016)",
    )
    parser.add_argument(
        "--model",
        type=str,
        choices=model_choices,
        default="logistic",
        help="Model type to train (default: logistic)",
    )
    parser.add_argument(
        "--max-features",
        type=int,
        default=20000,
        help="Maximum number of features for TF-IDF (default: 20000)",
    )
    parser.add_argument(
        "--ngram-min", type=int, default=1, help="Minimum n-gram range (default: 1)"
    )
    parser.add_argument(
        "--ngram-max", type=int, default=2, help="Maximum n-gram range (default: 2)"
    )
    parser.add_argument(
        "--split-ratio", type=float, default=0.2, help="Test split ratio (default: 0.2)"
    )
    parser.add_argument(
        "--num-rows",
        type=int,
        default=None,
        help="Limit number of rows/samples for quick testing (default: None - use all data)",
    )
    parser.add_argument(
        "--compare",
        action="store_true",
        help="Train and compare multiple configurations",
    )
    parser.add_argument(
        "--compare-models",
        nargs="+",
        help="List of specific models to compare (e.g., --compare-models logistic random_forest svc_rbf)",
        choices=model_choices,
    )
    parser.add_argument(
        "--export-model",
        action="store_true",
        help="Export a copy of the trained model to project root for distribution/publishing",
    )

    # parse_known_args tolerates the extra arguments a Jupyter/Colab kernel
    # puts on the command line.
    args, unknown = parser.parse_known_args()
    if unknown:
        if in_notebook:
            print(f"Note: Running in Jupyter/Colab environment. Ignoring kernel arguments: {unknown}")
        else:
            # On a real command line an unknown argument is almost certainly
            # a typo; refuse it instead of silently training with defaults.
            parser.error(f"unrecognized arguments: {' '.join(unknown)}")

    # Fail fast on values that would otherwise only be rejected after the
    # dataset has been downloaded.
    if args.ngram_min < 1 or args.ngram_min > args.ngram_max:
        parser.error("--ngram-min must be >= 1 and not greater than --ngram-max")
    if not 0.0 < args.split_ratio < 1.0:
        parser.error("--split-ratio must be strictly between 0 and 1")

    if args.compare or args.compare_models:
        if args.compare_models:
            print(f"Training and comparing selected models: {args.compare_models}")
        else:
            print("Training and comparing all available models...")
        print(f"Dataset: {args.dataset}")
        if args.num_rows:
            print(f"Using {args.num_rows} rows")
        # models=None (no --compare-models given) means "all available models".
        train_all_configurations(
            dataset=args.dataset, models=args.compare_models, num_rows=args.num_rows
        )
    else:
        print(f"Training {args.model} model on {args.dataset} dataset...")
        print(
            f"Configuration: max_features={args.max_features}, ngram=({args.ngram_min}, {args.ngram_max})"
        )
        train_model(
            dataset=args.dataset,
            model_name=args.model,
            max_features=args.max_features,
            ngram_range=(args.ngram_min, args.ngram_max),
            split_ratio=args.split_ratio,
            n_samples=args.num_rows,
            export_model=args.export_model,
        )


if __name__ == "__main__":
    main()