# dataloader.py — authored by Schneider Leo (commit 5d66520d)
import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
# Mapping from amino-acid symbols to integer codes. '_' (0) is the padding
# token; 'CaC' and 'OxM' are modified residues, written inside sequence
# strings as the 5-character markers '-CaC-' / '-OxM-'.
ALPHABET_UNMOD = {
    "_": 0,
    "A": 1,
    "C": 2,
    "D": 3,
    "E": 4,
    "F": 5,
    "G": 6,
    "H": 7,
    "I": 8,
    "K": 9,
    "L": 10,
    "M": 11,
    "N": 12,
    "P": 13,
    "Q": 14,
    "R": 15,
    "S": 16,
    "T": 17,
    "V": 18,
    "W": 19,
    "Y": 20,
    "CaC": 21,
    "OxM": 22
}


def padding(dataframe, columns, length):
    """Right-pad sequences in ``dataframe[columns]`` with '_' up to `length`.

    A modification is written as '-Xxx-' (5 characters encoding 1 residue),
    so a sequence's *effective* length is ``len(s) - 2 * s.count('-')``.
    Rows whose effective length exceeds `length` are dropped. The dataframe
    is modified in place; nothing is returned.

    Parameters
    ----------
    dataframe : pandas.DataFrame — modified in place.
    columns : str — name of the sequence column.
    length : int — target effective length.
    """
    def too_long(seq):
        # True when the sequence exceeds `length` even after discounting the
        # extra characters of the '-Xxx-' modification markers.
        return len(seq) > length + 2 * seq.count('-')

    def pad(x):
        return x + (length - len(x) + 2 * x.count('-')) * '_'

    # BUG FIX: the original called `dataframe.drop(i)` and discarded the
    # result (DataFrame.drop is not in-place by default), so no row was ever
    # removed. Collect the offending labels and drop them in place instead.
    dataframe.drop(dataframe.index[dataframe[columns].map(too_long)],
                   inplace=True)
    dataframe[columns] = dataframe[columns].map(pad)
    # Safety net mirroring the original's second pass. It now compares
    # against the *effective* length: after pad() a modified sequence is
    # legitimately longer than `length` by 2 chars per '-', and the raw
    # comparison would have dropped it.
    dataframe.drop(dataframe.index[dataframe[columns].map(too_long)],
                   inplace=True)


def alphabetical_to_numerical(seq):
    """Convert a padded sequence string to a list of integer codes.

    Plain residues map through ALPHABET_UNMOD; modifications are encoded as
    '-CaC-' (-> 21) or '-OxM-' (-> 22), each consuming 5 input characters
    for a single output code.

    Raises
    ------
    ValueError
        If an unknown '-Xxx-' modification marker is encountered.
    """
    num = []
    skipped = 0  # extra input characters consumed by modification markers
    for i in range(len(seq) - 2 * seq.count('-')):
        char = seq[i + skipped]
        if char != '-':
            num.append(ALPHABET_UNMOD[char])
        else:
            mod = seq[i + skipped + 1:i + skipped + 4]
            if mod == 'CaC':
                num.append(ALPHABET_UNMOD['CaC'])
            elif mod == 'OxM':
                num.append(ALPHABET_UNMOD['OxM'])
            else:
                # BUG FIX: `raise '...'` (a plain string) is itself a
                # TypeError in Python 3; raise a real exception instead.
                raise ValueError('Modification not supported')
            skipped += 4
    return num
class RT_Dataset(Dataset):
    """Retention-time dataset backed by a CSV file.

    Each row provides a peptide `sequence` (converted to a list of integer
    codes) and a scalar label selected by `format`.

    Parameters
    ----------
    size : int or None
        If given, randomly subsample this many rows.
    data_source : str
        Path to the CSV file.
    mode : str
        'train', 'test' or 'validation'; selects rows whose `state` column
        equals 'train', 'holdout' or 'validation' respectively.
    length : int
        Target sequence length; shorter sequences are right-padded with '_',
        longer ones are dropped.
    format : str
        Label column selector: 'RT', 'iRT' (default), 'iRT_scaled' or 'score'.
    """

    def __init__(self, size, data_source, mode, length, format='iRT'):
        print('Data loader Initialisation')
        self.data = pd.read_csv(data_source)
        self.mode = mode
        self.format = format
        print('Selecting data')
        if mode == 'train':
            self.data = self.data[self.data.state == 'train']
        elif mode == 'test':
            self.data = self.data[self.data.state == 'holdout']
        elif mode == 'validation':
            self.data = self.data[self.data.state == 'validation']
        if size is not None:
            self.data = self.data.sample(size)
        print('Padding')
        # NOTE(review): unlike the module-level padding() helper, str.pad
        # does not discount '-Xxx-' modification markers, so rows containing
        # modifications may be dropped by the length filter below — confirm
        # this is intended for this data source.
        self.data['sequence'] = self.data['sequence'].str.pad(length, side='right', fillchar='_')
        self.data = self.data.drop(self.data[self.data['sequence'].map(len) > length].index)
        print('Converting')
        self.data['sequence'] = self.data['sequence'].map(alphabetical_to_numerical)
        self.data = self.data.reset_index()

    def __getitem__(self, index: int):
        """Return (sequence tensor, float label tensor) for row `index`.

        Raises ValueError for an unrecognized `format` (the original code
        left `label` unbound and crashed with a NameError instead).
        """
        seq = self.data['sequence'][index]
        if self.format == 'RT':
            label = self.data['retention_time'][index]
        elif self.format == 'iRT':
            label = self.data['irt'][index]
        elif self.format == 'iRT_scaled':
            label = self.data['iRT_scaled'][index]
        elif self.format == 'score':
            label = self.data['score'][index]
        else:
            raise ValueError('Unsupported label format: ' + str(self.format))
        return torch.tensor(seq), torch.tensor(label).float()

    def __len__(self) -> int:
        """Number of rows in the (filtered) dataset."""
        return self.data.shape[0]
def load_data(batch_size, data_sources, n_train=None, n_test=None, length=30):
    """Build shuffled train/validation/test DataLoaders from three CSVs.

    Parameters
    ----------
    batch_size : int
    data_sources : sequence of 3 paths — (train, test, validation) CSV files.
    n_train, n_test : int or None — optional subsample sizes.
    length : int — target padded sequence length.

    Returns
    -------
    (train_loader, val_loader, test_loader)
    """
    print('Loading data')
    train = RT_Dataset(n_train, data_sources[0], 'train', length)
    test = RT_Dataset(n_test, data_sources[1], 'test', length)
    # BUG FIX: the validation set was built with mode='test', which selects
    # rows with state == 'holdout'; it now selects state == 'validation'.
    val = RT_Dataset(n_test, data_sources[2], 'validation', length)
    train_loader = DataLoader(train, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val, batch_size=batch_size, shuffle=True)
    return train_loader, val_loader, test_loader
class H5ToStorage():
    """Helper that exports datasets from an HDF5 file to .npy files."""

    def __init__(self, hdf_path):
        # Remember the path and record the file's top-level keys.
        self.path = hdf_path
        with h5py.File(hdf_path, 'r') as hf:
            self.classes = [key for key in hf]

    def get_class(self):
        """Return the list of top-level keys found in the HDF5 file."""
        return self.classes

    def make_npy_file(self, f_name, column):
        """Save the HDF5 dataset named `column` to `f_name` in .npy format."""
        with h5py.File(self.path, 'r') as hf:
            np.save(f_name, hf[column])
def load_split_intensity(sources, batch_size, split=(0.5, 0.25, 0.25)):
    """Load intensity arrays from .npy files and split into three loaders.

    Parameters
    ----------
    sources : sequence of 4 paths — row-aligned (sequence, intensity, energy,
        precursor_charge) .npy files.
    batch_size : int
    split : 3-tuple of fractions summing to 1 — (train, validation, test).

    Returns
    -------
    (train_loader, validation_loader, test_loader)
    """
    # BUG FIX: exact float equality (`sum(split) == 1`) rejects valid splits
    # such as (0.6, 0.2, 0.2) due to rounding error; compare with a tolerance.
    assert abs(sum(split) - 1.0) < 1e-9, 'Wrong split argument'
    seq = np.load(sources[0])
    intensity = np.load(sources[1])
    energy = np.load(sources[2])
    precursor_charge = np.load(sources[3])
    n_samples = np.shape(energy)[0]  # was named `len`, shadowing the builtin
    ind1 = int(np.floor(n_samples * split[0]))
    ind2 = int(np.floor(n_samples * (split[0] + split[1])))
    train = (seq[:ind1], intensity[:ind1], energy[:ind1], precursor_charge[:ind1])
    validation = (
        seq[ind1:ind2], intensity[ind1:ind2], energy[ind1:ind2], precursor_charge[ind1:ind2])
    test = (seq[ind2:], intensity[ind2:], energy[ind2:], precursor_charge[ind2:])
    train = DataLoader(Intentsity_Dataset(train), batch_size=batch_size)
    test = DataLoader(Intentsity_Dataset(test), batch_size=batch_size)
    validation = DataLoader(Intentsity_Dataset(validation), batch_size=batch_size)
    return train, validation, test
def load_intensity_from_files(f_seq, f_intentsity, f_energy, f_percursor_charge, batch_size):
    """Build a DataLoader over intensity data stored in four .npy files."""
    arrays = (
        np.load(f_seq),
        np.load(f_intentsity),
        np.load(f_energy),
        np.load(f_percursor_charge),
    )
    return DataLoader(Intentsity_Dataset(arrays), batch_size=batch_size)
def load_intensity_df_from_files(f_seq, f_intentsity, f_energy, f_percursor_charge):
    """Build an Intentsity_Dataset (no DataLoader) from four .npy files."""
    arrays = (
        np.load(f_seq),
        np.load(f_intentsity),
        np.load(f_energy),
        np.load(f_percursor_charge),
    )
    return Intentsity_Dataset(arrays)
class Intentsity_Dataset(Dataset):
    """Dataset over row-aligned (sequence, intensity, energy,
    precursor_charge) arrays; yields (seq, [energy], charge, intensity)
    tensors per sample."""

    def __init__(self, data):
        self.data = data
        # Unpack the 4-tuple into named fields.
        self.seq, self.intensity, self.energy, self.precursor_charge = data

    def __len__(self):
        return len(self.seq)

    def __getitem__(self, idx):
        seq_t = torch.tensor(self.seq[idx])
        energy_t = torch.tensor([self.energy[idx]]).float()
        charge_t = torch.tensor(self.precursor_charge[idx])
        intensity_t = torch.tensor(self.intensity[idx]).float()
        return seq_t, energy_t, charge_t, intensity_t
# Example usage: export HDF5 columns of a holdout file to .npy files.
# storage = H5ToStorage('database/holdout_hcd.hdf5')
# storage.make_npy_file('data/intensity/method.npy','method')
# storage.make_npy_file('data/intensity/sequence_header.npy','sequence_integer')
# storage.make_npy_file('data/intensity/intensity_header.npy', 'intensities_raw')
# storage.make_npy_file('data/intensity/collision_energy_header.npy', 'collision_energy_aligned_normed')
# storage.make_npy_file('data/intensity/precursor_charge_header.npy', 'precursor_charge_onehot')