Source code for libmultilabel.linear.linear

from __future__ import annotations

import logging
import os

import numpy as np
import scipy.sparse as sparse
from liblinear.liblinearutil import train
from tqdm import tqdm

__all__ = [
    "train_1vsrest",
    "train_thresholding",
    "train_cost_sensitive",
    "train_cost_sensitive_micro",
    "train_binary_and_multiclass",
    "predict_values",
    "get_topk_labels",
    "get_positive_labels",
    "FlatModel",
]


[docs]class FlatModel: """A model returned from a training function.""" def __init__( self, name: str, weights: np.matrix, bias: float, thresholds: float | np.ndarray, multiclass: bool, ): self.name = name self.weights = weights self.bias = bias self.thresholds = thresholds self.multiclass = multiclass
[docs] def predict_values(self, x: sparse.csr_matrix) -> np.ndarray: """Calculates the decision values associated with x. Args: x (sparse.csr_matrix): A matrix with dimension number of instances * number of features. Returns: np.ndarray: A matrix with dimension number of instances * number of classes. """ bias = self.bias bias_col = np.full((x.shape[0], 1 if bias > 0 else 0), bias) num_feature = self.weights.shape[0] num_feature -= 1 if bias > 0 else 0 if x.shape[1] < num_feature: x = sparse.hstack( [ x, np.zeros((x.shape[0], num_feature - x.shape[1])), bias_col, ], "csr", ) else: x = sparse.hstack( [ x[:, :num_feature], bias_col, ], "csr", ) return (x * self.weights).A + self.thresholds
[docs]def train_1vsrest( y: sparse.csr_matrix, x: sparse.csr_matrix, multiclass: bool = False, options: str = "", verbose: bool = True, ) -> FlatModel: """Trains a linear model for multi-label data using a one-vs-rest strategy. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. multiclass (bool, optional): A flag indicating if the dataset is multiclass. options (str, optional): The option string passed to liblinear. Defaults to ''. verbose (bool, optional): Output extra progress information. Defaults to True. Returns: A model which can be used in predict_values. """ # Follows the MATLAB implementation at https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/multilabel/ x, options, bias = _prepare_options(x, options) y = y.tocsc() num_class = y.shape[1] num_feature = x.shape[1] weights = np.zeros((num_feature, num_class), order="F") if verbose: logging.info(f"Training one-vs-rest model on {num_class} labels") for i in tqdm(range(num_class), disable=not verbose): yi = y[:, i].toarray().reshape(-1) weights[:, i] = _do_train(2 * yi - 1, x, options).ravel() return FlatModel( name="1vsrest", weights=np.asmatrix(weights), bias=bias, thresholds=0, multiclass=multiclass, )
def _prepare_options(x: sparse.csr_matrix, options: str) -> tuple[sparse.csr_matrix, str, float]: """Prepare options and x for multi-label training. Called in the first line of any training function. Args: x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. Returns: tuple[sparse.csr_matrix, str, float]: Transformed x, transformed options and bias parsed from options. """ if options is None: options = "" if any(o in options for o in ["-R", "-C", "-v"]): raise ValueError("-R, -C and -v are not supported") options_split = options.split() if "-s" in options_split: i = options_split.index("-s") solver_type = int(options_split[i + 1]) if solver_type < 0 or solver_type > 7: raise ValueError("Invalid LIBLINEAR solver type. Only classification solvers are allowed.") else: # workaround for liblinear warning about unspecified solver options_split.extend(["-s", "1"]) bias = -1.0 if "-B" in options_split: i = options_split.index("-B") bias = float(options_split[i + 1]) options_split = options_split[:i] + options_split[i + 2 :] x = sparse.hstack( [ x, np.full((x.shape[0], 1), bias), ], "csr", ) if not "-q" in options_split: options_split.append("-q") if not "-m" in options: options_split.append(f"-m {int(os.cpu_count() / 2)}") options = " ".join(options_split) return x, options, bias
[docs]def train_thresholding( y: sparse.csr_matrix, x: sparse.csr_matrix, multiclass: bool = False, options: str = "", verbose: bool = True, ) -> FlatModel: """Trains a linear model for multi-label data using a one-vs-rest strategy and cross-validation to pick decision thresholds optimizing the sum of Macro-F1 and Micro-F1. Outperforms train_1vsrest in most aspects at the cost of higher time complexity due to an internal cross-validation. This method is the micromacro-freq approach from this CIKM 2023 paper: `"On the Thresholding Strategy for Infrequent Labels in Multi-label Classification" <https://www.csie.ntu.edu.tw/~cjlin/papers/thresholding/smooth_acm.pdf>`_ (see Section 4.3 and Supplementary D). Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. multiclass (bool, optional): A flag indicating if the dataset is multiclass. options (str, optional): The option string passed to liblinear. Defaults to ''. verbose (bool, optional): Output extra progress information. Defaults to True. Returns: A model which can be used in predict_values. """ x, options, bias = _prepare_options(x, options) y = y.tocsc() num_class = y.shape[1] num_feature = x.shape[1] weights = np.zeros((num_feature, num_class), order="F") thresholds = np.zeros(num_class) if verbose: logging.info("Training thresholding model on %s labels", num_class) num_positives = np.sum(y, 2) label_order = np.flip(np.argsort(num_positives)).flat # accumulated counts for micro stats = {"tp": 0, "fp": 0, "fn": 0, "labels": 0} for i in tqdm(label_order, disable=not verbose): yi = y[:, i].toarray().reshape(-1) w, t, stats = _micromacro_one_label(2 * yi - 1, x, options, stats) weights[:, i] = w.ravel() thresholds[i] = t return FlatModel( name="thresholding", weights=np.asmatrix(weights), bias=bias, thresholds=thresholds, multiclass=multiclass, )
def _micromacro_one_label( y: np.ndarray, x: sparse.csr_matrix, options: str, stats: dict ) -> tuple[np.ndarray, float, dict]: """Perform cross-validation to select the threshold for a label. Args: y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. stats (dict): A dictionary containing information needed to calculate Micro-F1. It includes the accumulated number of true positives, false positives, false negatives, and the number of labels processed. Returns: tuple[np.ndarray, float, dict]: the weights, threshold, and the updated stats for calculating Micro-F1. """ nr_fold = 3 thresholds = np.zeros(nr_fold) tp_sum = 0 fp_sum = 0 fn_sum = 0 stats["labels"] += 1 def micro_plus_macro(tp, fp, fn): # Because the F-measure of other labels are constants and thus does not affect optimization, # we ignore them when calculating macro-F. macro = np.nan_to_num((2 * tp) / (2 * tp + fp + fn)) / stats["labels"] micro = np.nan_to_num((2 * (tp + stats["tp"])) / (2 * (tp + stats["tp"]) + fp + fn + stats["fp"] + stats["fn"])) return micro + macro l = y.shape[0] perm = np.random.permutation(l) for fold in range(nr_fold): mask = np.zeros_like(perm, dtype="?") mask[np.arange(int(fold * l / nr_fold), int((fold + 1) * l / nr_fold))] = True val_idx = perm[mask] train_idx = perm[np.logical_not(mask)] w = _do_train(y[train_idx], x[train_idx], options) wTx = (x[val_idx] * w).A1 sorted_wTx_index = np.argsort(wTx, kind="stable") sorted_wTx = wTx[sorted_wTx_index] # ignore warning for 0/0 when calculating F-measures prev_settings = np.seterr("ignore") tp = np.sum(y[val_idx] == 1) fp = val_idx.size - tp fn = 0 best_obj = micro_plus_macro(tp, fp, fn) best_tp, best_fp, best_fn = tp, fp, fn cut = -1 y_val = y[val_idx] for i in range(val_idx.size): if y_val[sorted_wTx_index[i]] == -1: fp -= 1 else: tp -= 1 fn += 1 obj = micro_plus_macro(tp, fp, fn) if obj >= best_obj: best_obj = obj best_tp, best_fp, best_fn = tp, fp, fn cut = i np.seterr(**prev_settings) if cut == -1: # i.e. all 1 in scut thresholds[fold] = np.nextafter(sorted_wTx[0], -np.inf) # predict all 1 elif cut == val_idx.size - 1: thresholds[fold] = np.nextafter(sorted_wTx[-1], np.inf) else: thresholds[fold] = (sorted_wTx[cut] + sorted_wTx[cut + 1]) / 2 tp_sum += best_tp fp_sum += best_fp fn_sum += best_fn # In FlatModel.predict_values, the threshold is added to the decision value. # Therefore, we need to make it negative here. threshold = -thresholds.mean() stats["tp"] += tp_sum stats["fp"] += fp_sum stats["fn"] += fn_sum return _do_train(y, x, options), threshold, stats def _do_train(y: np.ndarray, x: sparse.csr_matrix, options: str) -> np.matrix: """Wrapper around liblinear.liblinearutil.train. Forcibly suppresses all IO regardless of options. Args: y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. Returns: np.matrix: the weights. """ if y.shape[0] == 0: return np.matrix(np.zeros((x.shape[1], 1))) with silent_stderr(): model = train(y, x, options) w = np.ctypeslib.as_array(model.w, (x.shape[1], 1)) w = np.asmatrix(w) # Liblinear flips +1/-1 labels so +1 is always the first label, # but not if all labels are -1. # For our usage, we need +1 to always be the first label, # so the check is necessary. if model.get_labels()[0] == -1: return -w else: # The memory is freed on model deletion so we make a copy. return w.copy() class silent_stderr: """Context manager that suppresses stderr. Liblinear emits warnings on missing classes with specified weight, which may happen during cross-validation. Since this information is useless to the user, we suppress it. """ def __init__(self): self.stderr = os.dup(2) self.devnull = os.open(os.devnull, os.O_WRONLY) def __enter__(self): os.dup2(self.devnull, 2) def __exit__(self, type, value, traceback): os.dup2(self.stderr, 2) os.close(self.devnull) os.close(self.stderr) def _fmeasure(y_true: np.ndarray, y_pred: np.ndarray) -> float: """Calculate F1 score. Args: y_true (np.ndarray): array of +1/-1. y_pred (np.ndarray): array of +1/-1. Returns: float: the F1 score. """ tp = np.sum(np.logical_and(y_true == 1, y_pred == 1)) fn = np.sum(np.logical_and(y_true == 1, y_pred == -1)) fp = np.sum(np.logical_and(y_true == -1, y_pred == 1)) F = 0 if tp != 0 or fp != 0 or fn != 0: F = 2 * tp / (2 * tp + fp + fn) return F
[docs]def train_cost_sensitive( y: sparse.csr_matrix, x: sparse.csr_matrix, multiclass: bool = False, options: str = "", verbose: bool = True, ) -> FlatModel: """Trains a linear model for multi-label data using a one-vs-rest strategy and cross-validation to pick an optimal asymmetric misclassification cost for Macro-F1. Outperforms train_1vsrest in most aspects at the cost of higher time complexity. See user guide for more details. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. multiclass (bool, optional): A flag indicating if the dataset is multiclass. options (str, optional): The option string passed to liblinear. Defaults to ''. verbose (bool, optional): Output extra progress information. Defaults to True. Returns: A model which can be used in predict_values. """ # Follows the MATLAB implementation at https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/multilabel/ x, options, bias = _prepare_options(x, options) y = y.tocsc() num_class = y.shape[1] num_feature = x.shape[1] weights = np.zeros((num_feature, num_class), order="F") if verbose: logging.info(f"Training cost-sensitive model for Macro-F1 on {num_class} labels") for i in tqdm(range(num_class), disable=not verbose): yi = y[:, i].toarray().reshape(-1) w = _cost_sensitive_one_label(2 * yi - 1, x, options) weights[:, i] = w.ravel() return FlatModel( name="cost_sensitive", weights=np.asmatrix(weights), bias=bias, thresholds=0, multiclass=multiclass, )
def _cost_sensitive_one_label(y: np.ndarray, x: sparse.csr_matrix, options: str) -> np.ndarray: """Loop over parameter space for cost-sensitive on a single label. Args: y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. Returns: np.ndarray: the weights. """ l = y.shape[0] perm = np.random.permutation(l) param_space = [1, 1.33, 1.8, 2.5, 3.67, 6, 13] bestScore = -np.Inf for a in param_space: cv_options = f"{options} -w1 {a}" pred = _cross_validate(y, x, cv_options, perm) score = _fmeasure(y, pred) if bestScore < score: bestScore = score bestA = a final_options = f"{options} -w1 {bestA}" return _do_train(y, x, final_options) def _cross_validate(y: np.ndarray, x: sparse.csr_matrix, options: str, perm: np.ndarray) -> np.ndarray: """Cross-validation for cost-sensitive. Args: y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. Returns: np.ndarray: Cross-validation result as a +1/-1 array. """ l = y.shape[0] nr_fold = 3 pred = np.zeros_like(y) for fold in range(nr_fold): mask = np.zeros_like(perm, dtype="?") mask[np.arange(int(fold * l / nr_fold), int((fold + 1) * l / nr_fold))] = 1 val_idx = perm[mask] train_idx = perm[mask != True] w = _do_train(y[train_idx], x[train_idx], options) pred[val_idx] = (x[val_idx] * w).A1 > 0 return 2 * pred - 1
[docs]def train_cost_sensitive_micro( y: sparse.csr_matrix, x: sparse.csr_matrix, multiclass: bool = False, options: str = "", verbose: bool = True, ) -> FlatModel: """Trains a linear model for multi-label data using a one-vs-rest strategy and cross-validation to pick an optimal asymmetric misclassification cost for Micro-F1. Outperforms train_1vsrest in most aspects at the cost of higher time complexity. See user guide for more details. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. multiclass (bool, optional): A flag indicating if the dataset is multiclass. options (str, optional): The option string passed to liblinear. Defaults to ''. verbose (bool, optional): Output extra progress information. Defaults to True. Returns: A model which can be used in predict_values. """ # Follows the MATLAB implementation at https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/multilabel/ x, options, bias = _prepare_options(x, options) y = y.tocsc() num_class = y.shape[1] num_feature = x.shape[1] weights = np.zeros((num_feature, num_class), order="F") l = y.shape[0] perm = np.random.permutation(l) param_space = [1, 1.33, 1.8, 2.5, 3.67, 6, 13] bestScore = -np.Inf if verbose: logging.info(f"Training cost-sensitive model for Micro-F1 on {num_class} labels") for a in param_space: tp = fn = fp = 0 for i in tqdm(range(num_class), disable=not verbose): yi = y[:, i].toarray().reshape(-1) yi = 2 * yi - 1 cv_options = f"{options} -w1 {a}" pred = _cross_validate(yi, x, cv_options, perm) tp = tp + np.sum(np.logical_and(yi == 1, pred == 1)) fn = fn + np.sum(np.logical_and(yi == 1, pred == -1)) fp = fp + np.sum(np.logical_and(yi == -1, pred == 1)) score = 2 * tp / (2 * tp + fn + fp) if bestScore < score: bestScore = score bestA = a final_options = f"{options} -w1 {bestA}" for i in range(num_class): yi = y[:, i].toarray().reshape(-1) w = _do_train(2 * yi - 1, x, final_options) weights[:, i] = w.ravel() return FlatModel( name="cost_sensitive_micro", weights=np.asmatrix(weights), bias=bias, thresholds=0, multiclass=multiclass, )
[docs]def train_binary_and_multiclass( y: sparse.csr_matrix, x: sparse.csr_matrix, multiclass: bool = True, options: str = "", verbose: bool = True, ) -> FlatModel: """Trains a linear model for binary and multi-class data. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. multiclass (bool, optional): A flag indicating if the dataset is multiclass. options (str, optional): The option string passed to liblinear. Defaults to ''. verbose (bool, optional): Output extra progress information. Defaults to True. Returns: A model which can be used in predict_values. """ x, options, bias = _prepare_options(x, options) num_instances, num_labels = y.shape nonzero_instance_ids, nonzero_label_ids = y.nonzero() assert ( len(set(nonzero_instance_ids)) == num_instances ), """ Invalid dataset. Only multi-class dataset is allowed.""" y = np.squeeze(nonzero_label_ids) with silent_stderr(): model = train(y, x, options) # Labels appeared in training set; length may be smaller than num_labels train_labels = np.array(model.get_labels(), dtype="int") weights = np.zeros((x.shape[1], num_labels)) if num_labels == 2 and "-s 4" not in options: # For binary classification, liblinear returns weights # with shape (number of features * 1) except '-s 4'. w = np.ctypeslib.as_array(model.w, (x.shape[1], 1)) weights[:, train_labels[0]] = w[:, 0] weights[:, train_labels[1]] = -w[:, 0] else: # Map label to the original index w = np.ctypeslib.as_array(model.w, (x.shape[1], len(train_labels))) weights[:, train_labels] = w # For labels not appeared in training, assign thresholds to -inf so they won't be predicted. thresholds = np.full(num_labels, -np.inf) thresholds[train_labels] = 0 return FlatModel( name="binary_and_multiclass", weights=np.asmatrix(weights), bias=bias, thresholds=thresholds, multiclass=multiclass, )
[docs]def predict_values(model, x: sparse.csr_matrix) -> np.ndarray: """Calculates the decision values associated with x, equivalent to model.predict_values(x). Args: model: A model returned from a training function. x (sparse.csr_matrix): A matrix with dimension number of instances * number of features. Returns: np.ndarray: A matrix with dimension number of instances * number of classes. """ return model.predict_values(x)
[docs]def get_topk_labels(preds: np.ndarray, label_mapping: np.ndarray, top_k: int = 5) -> tuple[np.ndarray, np.ndarray]: """Get labels and scores of top k predictions from decision values. Args: preds (np.ndarray): A matrix of decision values with dimension (number of instances * number of classes). label_mapping (np.ndarray): A ndarray of class labels that maps each index (from 0 to ``num_class-1``) to its label. top_k (int): Determine how many classes per instance should be predicted. Returns: Two 2d ndarray with first one containing predicted labels and the other containing corresponding scores. Both have dimension (num_instances * top_k). """ idx = np.argpartition(preds, -top_k)[:, : -top_k - 1 : -1] row_idx = np.arange(preds.shape[0])[:, None] sorted_idx = idx[row_idx, np.argsort(-preds[row_idx, idx])] scores = preds[row_idx, sorted_idx] return label_mapping[sorted_idx], scores
[docs]def get_positive_labels(preds: np.ndarray, label_mapping: np.ndarray) -> tuple[list[list[str]], list[list[float]]]: """Get all labels and scores with positive decision value. Args: preds (np.ndarray): A matrix of decision values with dimension number of instances * number of classes. label_mapping (np.ndarray): A ndarray of class labels that maps each index (from 0 to ``num_class-1``) to its label. Returns: Two 2d lists with first one containing predicted labels and the other containing corresponding scores. """ labels = [] scores = [] for ipred in preds: pos_idx = np.where(ipred > 0) labels.append(label_mapping[pos_idx[0]].tolist()) scores.append(ipred[pos_idx].tolist()) return labels, scores