Source code for libmultilabel.nn.data_utils

import csv
import gc
import logging
import warnings

import pandas as pd
import torch
import transformers
from nltk.tokenize import RegexpTokenizer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from torchtext.vocab import build_vocab_from_iterator, pretrained_aliases, Vocab
from tqdm import tqdm

transformers.logging.set_verbosity_error()
warnings.simplefilter(action="ignore", category=FutureWarning)

UNK = "<unk>"
PAD = "<pad>"


class TextDataset(Dataset):
    """Class for text dataset.

    Args:
        data (list[dict]): List of instances with index, label, and text.
        classes (list): List of labels.
        max_seq_length (int, optional): The maximum number of tokens of a sample.
        add_special_tokens (bool, optional): Whether to add the special tokens. Defaults to True.
        tokenizer (transformers.PreTrainedTokenizerBase, optional): HuggingFace's tokenizer of
            the transformer-based pretrained language model. Defaults to None.
        word_dict (torchtext.vocab.Vocab, optional): A vocab object for word tokenizer to
            map tokens to indices. Defaults to None.
    """

    def __init__(
        self,
        data,
        classes,
        max_seq_length,
        add_special_tokens=True,
        *,
        tokenizer=None,
        word_dict=None,
    ):
        self.data = data
        self.classes = classes
        self.max_seq_length = max_seq_length
        self.word_dict = word_dict
        self.tokenizer = tokenizer
        self.add_special_tokens = add_special_tokens

        self.num_classes = len(self.classes)
        self.label_binarizer = MultiLabelBinarizer().fit([classes])

        if not isinstance(self.word_dict, Vocab) ^ isinstance(self.tokenizer, transformers.PreTrainedTokenizerBase):
            raise ValueError("Please specify exactly one of word_dict or tokenizer")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        data = self.data[index]
        if self.tokenizer is not None:  # transformers tokenizer
            if self.add_special_tokens:  # tentatively hard code
                input_ids = self.tokenizer.encode(
                    data["text"], padding="max_length", max_length=self.max_seq_length, truncation=True
                )
            else:
                input_ids = self.tokenizer.encode(data["text"], add_special_tokens=False)
        else:
            input_ids = [self.word_dict[word] for word in data["text"]]
        return {
            "text": torch.LongTensor(input_ids[: self.max_seq_length]),
            "label": torch.IntTensor(self.label_binarizer.transform([data["label"]])[0]),
        }


def tokenize(text):
    """Tokenize text.

    Args:
        text (str): Text to tokenize.

    Returns:
        list: A list of tokens.
    """
    tokenizer = RegexpTokenizer(r"\w+")
    return [t.lower() for t in tokenizer.tokenize(text) if not t.isnumeric()]


def generate_batch(data_batch):
    text_list = [data["text"] for data in data_batch]
    label_list = [data["label"] for data in data_batch]
    length_list = [len(data["text"]) for data in data_batch]
    return {
        "text": pad_sequence(text_list, batch_first=True),
        "label": torch.stack(label_list),
        "length": torch.IntTensor(length_list),
    }


[docs]def get_dataset_loader( data, classes, device, max_seq_length=500, batch_size=1, shuffle=False, data_workers=4, add_special_tokens=True, *, tokenizer=None, word_dict=None, ): """Create a pytorch DataLoader. Args: data (list[dict]): List of training instances with index, label, and tokenized text. classes (list): List of labels. device (torch.device): One of cuda or cpu. max_seq_length (int, optional): The maximum number of tokens of a sample. Defaults to 500. batch_size (int, optional): Size of training batches. Defaults to 1. shuffle (bool, optional): Whether to shuffle training data before each epoch. Defaults to False. data_workers (int, optional): Use multi-cpu core for data pre-processing. Defaults to 4. add_special_tokens (bool, optional): Whether to add the special tokens. Defaults to True. tokenizer (transformers.PreTrainedTokenizerBase, optional): HuggingFace's tokenizer of the transformer-based pretrained language model. Defaults to None. word_dict (torchtext.vocab.Vocab, optional): A vocab object for word tokenizer to map tokens to indices. Defaults to None. Returns: torch.utils.data.DataLoader: A pytorch DataLoader. """ dataset = TextDataset( data, classes, max_seq_length, word_dict=word_dict, tokenizer=tokenizer, add_special_tokens=add_special_tokens ) dataset_loader = torch.utils.data.DataLoader( dataset, batch_size=batch_size, shuffle=shuffle, num_workers=data_workers, collate_fn=generate_batch, pin_memory="cuda" in device.type, ) return dataset_loader
def _load_raw_data(data, is_test=False, tokenize_text=True, remove_no_label_data=False): """Load and tokenize raw data in file or dataframe. Args: data (Union[str, pandas,.Dataframe]): Training, test, or validation data in file or dataframe. is_test (bool, optional): Whether the data is for test or not. Defaults to False. remove_no_label_data (bool, optional): Whether to remove training/validation instances that have no labels. This is effective only when is_test=False. Defaults to False. Returns: dict: [{(optional: "index": ..., )"label": ..., "text": ...}, ...] """ assert isinstance(data, str) or isinstance(data, pd.DataFrame), "Data must be from a file or pandas dataframe." if isinstance(data, str): logging.info(f"Load data from {data}.") data = pd.read_csv(data, sep="\t", header=None, on_bad_lines="warn", quoting=csv.QUOTE_NONE).fillna("") data = data.astype(str) if data.shape[1] == 2: data.columns = ["label", "text"] data = data.reset_index() elif data.shape[1] == 3: data.columns = ["index", "label", "text"] else: raise ValueError(f"Expected 2 or 3 columns, got {data.shape[1]}.") data["label"] = data["label"].astype(str).map(lambda s: s.split()) if tokenize_text: data["text"] = data["text"].map(tokenize) data = data.to_dict("records") if not is_test: num_no_label_data = sum(1 for d in data if len(d["label"]) == 0) if num_no_label_data > 0: if remove_no_label_data: logging.info( f"Remove {num_no_label_data} instances that have no labels from data.", extra={"collect": True} ) data = [d for d in data if len(d["label"]) > 0] else: logging.info( f"Keep {num_no_label_data} instances that have no labels from data.", extra={"collect": True} ) return data
[docs]def load_datasets( training_data=None, test_data=None, val_data=None, val_size=0.2, merge_train_val=False, tokenize_text=True, remove_no_label_data=False, ): """Load data from the specified data paths or the given dataframe. If `val_data` does not exist but `val_size` > 0, the validation set will be split from the training dataset. Args: training_data (Union[str, pandas,.Dataframe], optional): Path to training data or a dataframe. test_data (Union[str, pandas,.Dataframe], optional): Path to test data or a dataframe. val_data (Union[str, pandas,.Dataframe], optional): Path to validation data or a dataframe. val_size (float, optional): Training-validation split: a ratio in [0, 1] or an integer for the size of the validation set. Defaults to 0.2. merge_train_val (bool, optional): Whether to merge the training and validation data. Defaults to False. tokenize_text (bool, optional): Whether to tokenize text. Defaults to True. remove_no_label_data (bool, optional): Whether to remove training/validation instances that have no labels. Defaults to False. Returns: dict: A dictionary of datasets. """ if training_data is None and test_data is None: raise ValueError("At least one of `training_data` and `test_data` must be specified.") datasets = {} if training_data is not None: logging.info(f"Loading training data") datasets["train"] = _load_raw_data( training_data, tokenize_text=tokenize_text, remove_no_label_data=remove_no_label_data ) if val_data is not None: datasets["val"] = _load_raw_data( val_data, tokenize_text=tokenize_text, remove_no_label_data=remove_no_label_data ) elif val_size > 0: datasets["train"], datasets["val"] = train_test_split(datasets["train"], test_size=val_size, random_state=42) if test_data is not None: logging.info(f"Loading test data") datasets["test"] = _load_raw_data( test_data, is_test=True, tokenize_text=tokenize_text, remove_no_label_data=remove_no_label_data ) if merge_train_val and "val" in datasets: datasets["train"] = datasets["train"] + datasets["val"] for i in range(len(datasets["train"])): datasets["train"][i]["index"] = i del datasets["val"] gc.collect() msg = " / ".join(f"{k}: {len(v)}" for k, v in datasets.items()) logging.info(f"Finish loading dataset ({msg})") return datasets
[docs]def load_or_build_text_dict( dataset, vocab_file=None, min_vocab_freq=1, embed_file=None, embed_cache_dir=None, silent=False, normalize_embed=False, ): """Build or load the vocabulary from the training dataset or the predefined `vocab_file`. The pretrained embedding can be either from a self-defined `embed_file` or from one of the vectors defined in torchtext.vocab.pretrained_aliases (https://github.com/pytorch/text/blob/main/torchtext/vocab/vectors.py). Args: dataset (list): List of training instances with index, label, and tokenized text. vocab_file (str, optional): Path to a file holding vocabuaries. Defaults to None. min_vocab_freq (int, optional): The minimum frequency needed to include a token in the vocabulary. Defaults to 1. embed_file (str): Path to a file holding pre-trained embeddings. embed_cache_dir (str, optional): Path to a directory for storing cached embeddings. Defaults to None. silent (bool, optional): Enable silent mode. Defaults to False. normalize_embed (bool, optional): Whether the embeddings of each word is normalized to a unit vector. Defaults to False. Returns: tuple[torchtext.vocab.Vocab, torch.Tensor]: A vocab object which maps tokens to indices and the pre-trained word vectors of shape (vocab_size, embed_dim). """ if vocab_file: logging.info(f"Load vocab from {vocab_file}") with open(vocab_file, "r") as fp: vocab_list = [[vocab.strip() for vocab in fp.readlines()]] # Keep PAD index 0 to align `padding_idx` of # class Embedding in libmultilabel.nn.networks.modules. vocabs = build_vocab_from_iterator(vocab_list, min_freq=1, specials=[PAD, UNK]) else: vocab_list = [set(data["text"]) for data in dataset] vocabs = build_vocab_from_iterator(vocab_list, min_freq=min_vocab_freq, specials=[PAD, UNK]) vocabs.set_default_index(vocabs[UNK]) logging.info(f"Read {len(vocabs)} vocabularies.") embedding_weights = get_embedding_weights_from_file(vocabs, embed_file, silent, embed_cache_dir) if normalize_embed: # To have better precision for calculating the normalization, we convert the original # embedding_weights from a torch.FloatTensor to a torch.DoubleTensor. # After the normalization, we will convert the embedding_weights back to a torch.FloatTensor. embedding_weights = embedding_weights.double() for i, vector in enumerate(embedding_weights): # We use the constant 1e-6 by following https://github.com/jamesmullenbach/caml-mimic/blob/44a47455070d3d5c6ee69fb5305e32caec104960/dataproc/extract_wvs.py#L60 # for an internal experiment of reproducing their results. embedding_weights[i] = vector / float(torch.linalg.norm(vector) + 1e-6) embedding_weights = embedding_weights.float() return vocabs, embedding_weights
[docs]def load_or_build_label(datasets, label_file=None, include_test_labels=False): """Obtain the label set from loading a label file or from the given data sets. The label set contains labels in the training and validation sets. Labels in the test set are included only when `include_test_labels` is True. Args: datasets (dict): A dictionary of datasets. Each dataset contains list of instances with index, label, and tokenized text. label_file (str, optional): Path to a file holding all labels. include_test_labels (bool, optional): Whether to include labels in the test dataset. Defaults to False. Returns: list: A list of labels sorted in alphabetical order. """ if label_file is not None: logging.info(f"Load labels from {label_file}.") with open(label_file, "r") as fp: classes = sorted([s.strip() for s in fp.readlines()]) else: if "test" not in datasets and include_test_labels: raise ValueError(f"Specified the inclusion of test labels but test file does not exist") classes = set() for split, data in datasets.items(): if split == "test" and not include_test_labels: continue for instance in data: classes.update(instance["label"]) classes = sorted(classes) logging.info(f"Read {len(classes)} labels.") return classes
def get_embedding_weights_from_file(word_dict, embed_file, silent=False, cache=None): """If the word exists in the embedding file, load the pretrained word embedding. Otherwise, assign a zero vector to that word. Args: word_dict (torchtext.vocab.Vocab): A vocab object which maps tokens to indices. embed_file (str): Path to a file holding pre-trained embeddings. silent (bool, optional): Enable silent mode. Defaults to False. cache (str, optional): Path to a directory for storing cached embeddings. Defaults to None. Returns: torch.Tensor: Embedding weights (vocab_size, embed_size) """ # Load pretrained word embedding load_embedding_from_file = embed_file not in pretrained_aliases if load_embedding_from_file: logging.info(f"Load pretrained embedding from file: {embed_file}.") with open(embed_file) as f: word_vectors = f.readlines() embed_size = len(word_vectors[0].split()) - 1 vector_dict = {} for word_vector in tqdm(word_vectors, disable=silent): word, vector = word_vector.rstrip().split(" ", 1) vector = torch.Tensor(list(map(float, vector.split()))) vector_dict[word] = vector else: logging.info(f"Load pretrained embedding from torchtext.") # Adapted from https://pytorch.org/text/0.9.0/_modules/torchtext/vocab.html#Vocab.load_vectors. if embed_file not in pretrained_aliases: raise ValueError( "Got embed_file {}, but allowed pretrained " "vectors are {}".format(embed_file, list(pretrained_aliases.keys())) ) # Hotfix: Glove URLs are outdated in Torchtext # (https://github.com/pytorch/text/blob/main/torchtext/vocab/vectors.py#L213-L217) pretrained_cls = pretrained_aliases[embed_file] if embed_file.startswith("glove"): for name, url in pretrained_cls.func.url.items(): file_name = url.split("/")[-1] pretrained_cls.func.url[name] = f"https://huggingface.co/stanfordnlp/glove/resolve/main/{file_name}" vector_dict = pretrained_cls(cache=cache) embed_size = vector_dict.dim embedding_weights = torch.zeros(len(word_dict), embed_size) if load_embedding_from_file: # Add UNK embedding # AttentionXML: np.random.uniform(-1.0, 1.0, embed_size) # CAML: np.random.randn(embed_size) unk_vector = torch.randn(embed_size) embedding_weights[word_dict[UNK]] = unk_vector # Store pretrained word embedding vec_counts = 0 for word in word_dict.get_itos(): # The condition can be used to process the word that does not in the embedding file. # Note that torchtext vector object has already dealt with this, # so we can directly make a query without addtional handling. if (load_embedding_from_file and word in vector_dict) or not load_embedding_from_file: embedding_weights[word_dict[word]] = vector_dict[word] vec_counts += 1 logging.info(f"loaded {vec_counts}/{len(word_dict)} word embeddings") return embedding_weights