# dataloader.py
import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd

ALPHABET_UNMOD = {
    "_": 0,
    "A": 1,
    "C": 2,
    "D": 3,
    "E": 4,
    "F": 5,
    "G": 6,
    "H": 7,
    "I": 8,
    "K": 9,
    "L": 10,
    "M": 11,
    "N": 12,
    "P": 13,
    "Q": 14,
    "R": 15,
    "S": 16,
    "T": 17,
    "V": 18,
    "W": 19,
    "Y": 20,
    "CaC": 21,
    "OxM": 22
}
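
# Note: token 0 ('_') is the padding symbol; 'CaC' and 'OxM' encode modified
# residues written as '-CaC-' / '-OxM-' in the raw sequences (presumably
# carbamidomethylated cysteine and oxidized methionine).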


def padding(dataframe, columns, length):
    """Pad sequences in `columns` with '_' up to `length` residues and drop rows
    that are too long. Each '-XXX-' modification (5 characters, 2 dashes) counts
    as a single residue, hence the `2 * count('-')` correction."""

    def effective_len(x):
        return len(x) - 2 * x.count('-')

    def pad(x):
        return x + (length - effective_len(x)) * '_'

    # Drop rows whose effective sequence length already exceeds `length`,
    # then pad the remaining sequences with '_' up to `length` residues.
    too_long = dataframe[columns].map(effective_len) > length
    dataframe.drop(dataframe[too_long].index, inplace=True)
    dataframe[columns] = dataframe[columns].map(pad)
    return dataframe
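
# Illustrative call (hypothetical dataframe and column name):
#   padding(df, 'sequence', 30)
# pads 'ACDEF' to 30 characters with '_' and counts 'AC-OxM-K' as 4 residues,
# since the 5-character '-OxM-' marker encodes a single modified residue.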


def alphabetical_to_numerical(seq):
    """Convert a padded sequence string into a list of integer tokens,
    decoding '-CaC-' / '-OxM-' modification markers into single tokens."""
    num = []
    dec = 0  # extra characters consumed so far by modification markers
    for i in range(len(seq) - 2 * seq.count('-')):
        if seq[i + dec] != '-':
            num.append(ALPHABET_UNMOD[seq[i + dec]])
        else:
            mod = seq[i + dec + 1:i + dec + 4]
            if mod not in ('CaC', 'OxM'):
                raise ValueError('Modification not supported: ' + mod)
            num.append(ALPHABET_UNMOD[mod])
            dec += 4  # skip the rest of the '-XXX-' marker
    return num
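
# Worked example:
#   alphabetical_to_numerical('AC-OxM-K___')  ->  [1, 2, 22, 9, 0, 0, 0]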


class RT_Dataset(Dataset):

    def __init__(self, size, data_source, mode, length, format='iRT'):
        print('Data loader Initialisation')
        self.data = pd.read_csv(data_source)

        self.mode = mode
        self.format = format

        print('Selecting data')
        if mode == 'train':
            self.data = self.data[self.data.state == 'train']
        elif mode == 'test':
            self.data = self.data[self.data.state == 'holdout']
        elif mode == 'validation':
            self.data = self.data[self.data.state == 'validation']
        if size is not None:
            self.data = self.data.sample(size)

        print('Padding')
        self.data['sequence'] = self.data['sequence'].str.pad(length, side='right', fillchar='_')
        self.data = self.data.drop(self.data[self.data['sequence'].map(len) > length].index)

        print('Converting')
        self.data['sequence'] = self.data['sequence'].map(alphabetical_to_numerical)

        self.data = self.data.reset_index(drop=True)

    def __getitem__(self, index: int):
        seq = self.data['sequence'][index]
        if self.format == 'RT':
            label = self.data['retention_time'][index]
        elif self.format == 'iRT':
            label = self.data['irt'][index]
        elif self.format == 'iRT_scaled':
            label = self.data['iRT_scaled'][index]
        elif self.format == 'score':
            label = self.data['score'][index]
        else:
            raise ValueError('Unknown label format: ' + str(self.format))
        return torch.tensor(seq), torch.tensor(label).float()

    def __len__(self) -> int:
        return self.data.shape[0]
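
# Sketch of intended usage (hypothetical CSV path; the file must provide
# 'sequence', 'state' and the requested label column, e.g. 'irt'):
#   ds = RT_Dataset(size=None, data_source='data/rt.csv', mode='train', length=30)
#   seq, label = ds[0]  # seq: 1-D integer tensor, label: scalar float tensor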


def load_data(batch_size, data_sources, n_train=None, n_test=None, length=30):
    print('Loading data')
    train = RT_Dataset(n_train, data_sources[0], 'train', length)
    test = RT_Dataset(n_test, data_sources[1], 'test', length)
    val = RT_Dataset(n_test, data_sources[2], 'validation', length)
    train_loader = DataLoader(train, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val, batch_size=batch_size, shuffle=True)

    return train_loader, val_loader, test_loader
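
# Sketch of intended usage (hypothetical file names; data_sources is ordered
# train, test, validation):
#   train_loader, val_loader, test_loader = load_data(
#       batch_size=64, data_sources=['train.csv', 'holdout.csv', 'validation.csv'])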


class H5ToStorage:
    """Helper that exports individual columns of an HDF5 file to .npy files."""

    def __init__(self, hdf_path):
        self.path = hdf_path

        self.classes = []
        with h5py.File(hdf_path, 'r') as hf:
            for class_ in hf:
                self.classes.append(class_)

    def get_class(self):
        return self.classes

    def make_npy_file(self, f_name, column):
        with h5py.File(self.path, 'r') as hf:
            data = hf[column]
            np.save(f_name, data)


def load_split_intensity(sources, batch_size, split=(0.5, 0.25, 0.25)):
    assert sum(split) == 1, 'split fractions must sum to 1'
    seq = np.load(sources[0])
    intensity = np.load(sources[1])
    energy = np.load(sources[2])
    precursor_charge = np.load(sources[3])

    # Split the arrays into train / validation / test along the first axis.
    n = np.shape(energy)[0]
    ind1 = int(np.floor(n * split[0]))
    ind2 = int(np.floor(n * (split[0] + split[1])))
    train = (seq[:ind1], intensity[:ind1], energy[:ind1], precursor_charge[:ind1])
    validation = (
        seq[ind1:ind2], intensity[ind1:ind2], energy[ind1:ind2], precursor_charge[ind1:ind2])
    test = (seq[ind2:], intensity[ind2:], energy[ind2:], precursor_charge[ind2:])

    train = DataLoader(Intensity_Dataset(train), batch_size=batch_size)
    validation = DataLoader(Intensity_Dataset(validation), batch_size=batch_size)
    test = DataLoader(Intensity_Dataset(test), batch_size=batch_size)

    return train, validation, test
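
# Sketch of intended usage (hypothetical .npy files, e.g. exported with H5ToStorage above):
#   train, val, test = load_split_intensity(
#       ['sequence.npy', 'intensity.npy', 'collision_energy.npy', 'precursor_charge.npy'],
#       batch_size=64)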


def load_intensity_from_files(f_seq, f_intensity, f_energy, f_precursor_charge, batch_size):
    seq = np.load(f_seq)
    intensity = np.load(f_intensity)
    energy = np.load(f_energy)
    precursor_charge = np.load(f_precursor_charge)
    data = (seq, intensity, energy, precursor_charge)
    dataset = Intensity_Dataset(data)
    loader = DataLoader(dataset, batch_size=batch_size)
    return loader


def load_intensity_df_from_files(f_seq, f_intensity, f_energy, f_precursor_charge):
    seq = np.load(f_seq)
    intensity = np.load(f_intensity)
    energy = np.load(f_energy)
    precursor_charge = np.load(f_precursor_charge)
    data = (seq, intensity, energy, precursor_charge)
    dataset = Intensity_Dataset(data)
    return dataset
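
# Sketch of intended usage (hypothetical file names):
#   loader = load_intensity_from_files('sequence.npy', 'intensity.npy',
#                                      'collision_energy.npy', 'precursor_charge.npy',
#                                      batch_size=64)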

class Intensity_Dataset(Dataset):
    """Wraps (sequence, intensity, collision energy, precursor charge) arrays."""

    def __init__(self, data):
        self.data = data
        self.seq = data[0]
        self.intensity = data[1]
        self.energy = data[2]
        self.precursor_charge = data[3]

    def __len__(self):
        return len(self.seq)

    def __getitem__(self, idx):
        return (torch.tensor(self.seq[idx]),
                torch.tensor([self.energy[idx]]).float(),
                torch.tensor(self.precursor_charge[idx]),
                torch.tensor(self.intensity[idx]).float())

# storage = H5ToStorage('database/holdout_hcd.hdf5')
# storage.make_npy_file('data/intensity/method.npy','method')
# storage.make_npy_file('data/intensity/sequence_header.npy','sequence_integer')
# storage.make_npy_file('data/intensity/intensity_header.npy', 'intensities_raw')
# storage.make_npy_file('data/intensity/collision_energy_header.npy', 'collision_energy_aligned_normed')
# storage.make_npy_file('data/intensity/precursor_charge_header.npy', 'precursor_charge_onehot')