import h5py
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader

# Integer encoding of the amino-acid alphabet, the padding token '_' and the
# two supported modifications (carbamidomethylated Cys and oxidized Met).
ALPHABET_UNMOD = {
    "_": 0,
    "A": 1, "C": 2, "D": 3, "E": 4, "F": 5, "G": 6, "H": 7, "I": 8, "K": 9,
    "L": 10, "M": 11, "N": 12, "P": 13, "Q": 14, "R": 15, "S": 16, "T": 17,
    "V": 18, "W": 19, "Y": 20,
    "CaC": 21, "OxM": 22,
}


def padding(dataframe, columns, length):
    """Pad the sequences in `columns` with '_' up to `length` residues and drop
    rows with more than `length` residues. Modifications are written as '-XXX-',
    so each one adds four extra characters that do not count as residues."""

    def pad(x):
        return x + (length - len(x) + 2 * x.count('-')) * '_'

    too_long = [i for i in range(len(dataframe))
                if len(dataframe[columns][i]) > length + 2 * dataframe[columns][i].count('-')]
    dataframe.drop(too_long, inplace=True)
    dataframe[columns] = dataframe[columns].map(pad)


def alphabetical_to_numerical(seq):
    """Convert a (possibly modified) sequence string to a list of integer codes."""
    num = []
    dec = 0  # extra characters consumed by modification tokens so far
    for i in range(len(seq) - 2 * seq.count('-')):
        if seq[i + dec] != '-':
            num.append(ALPHABET_UNMOD[seq[i + dec]])
        else:
            if seq[i + dec + 1:i + dec + 4] == 'CaC':
                num.append(ALPHABET_UNMOD['CaC'])
            elif seq[i + dec + 1:i + dec + 4] == 'OxM':
                num.append(ALPHABET_UNMOD['OxM'])
            else:
                raise ValueError('Modification not supported')
            dec += 4  # skip the '-XXX-' wrapper around the modification
    return num


class RT_Dataset(Dataset):
    """Retention-time dataset built from a CSV file with `sequence`, `state`
    and one label column per supported format."""

    def __init__(self, size, data_source, mode, length, format='iRT'):
        print('Data loader initialisation')
        self.data = pd.read_csv(data_source)
        self.mode = mode
        self.format = format

        print('Selecting data')
        if mode == 'train':
            self.data = self.data[self.data.state == 'train']
        elif mode == 'test':
            self.data = self.data[self.data.state == 'holdout']
        elif mode == 'validation':
            self.data = self.data[self.data.state == 'validation']

        if size is not None:
            self.data = self.data.sample(size)

        print('Padding')
        self.data['sequence'] = self.data['sequence'].str.pad(length, side='right', fillchar='_')
        self.data = self.data.drop(self.data[self.data['sequence'].map(len) > length].index)

        print('Converting')
        self.data['sequence'] = self.data['sequence'].map(alphabetical_to_numerical)
        self.data = self.data.reset_index()

    def __getitem__(self, index: int):
        seq = self.data['sequence'][index]
        if self.format == 'RT':
            label = self.data['retention_time'][index]
        elif self.format == 'iRT':
            label = self.data['irt'][index]
        elif self.format == 'iRT_scaled':
            label = self.data['iRT_scaled'][index]
        elif self.format == 'score':
            label = self.data['score'][index]
        else:
            raise ValueError('Unsupported label format: {}'.format(self.format))
        return torch.tensor(seq), torch.tensor(label).float()

    def __len__(self) -> int:
        return self.data.shape[0]


def load_data(batch_size, data_sources, n_train=None, n_test=None, length=30):
    """Build train/validation/test DataLoaders for retention-time prediction."""
    print('Loading data')
    train = RT_Dataset(n_train, data_sources[0], 'train', length)
    test = RT_Dataset(n_test, data_sources[1], 'test', length)
    val = RT_Dataset(n_test, data_sources[2], 'validation', length)
    train_loader = DataLoader(train, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val, batch_size=batch_size, shuffle=True)
    return train_loader, val_loader, test_loader


class H5ToStorage():
    """Helper that lists the top-level datasets of an HDF5 file and dumps
    individual columns to .npy files."""

    def __init__(self, hdf_path):
        self.path = hdf_path
        self.classes = []
        with h5py.File(hdf_path, 'r') as hf:
            for class_ in hf:
                self.classes.append(class_)

    def get_class(self):
        return self.classes

    def make_npy_file(self, f_name, column):
        with h5py.File(self.path, 'r') as hf:
            data = hf[column][()]  # read the whole dataset into memory
            np.save(f_name, data)


def load_split_intensity(sources, batch_size, split=(0.5, 0.25, 0.25)):
    """Load the .npy intensity files and split them into train/validation/test loaders."""
    assert sum(split) == 1, 'Wrong split argument'
    seq = np.load(sources[0])
    intensity = np.load(sources[1])
    energy = np.load(sources[2])
    precursor_charge = np.load(sources[3])
    n_samples = np.shape(energy)[0]
    ind1 = int(np.floor(n_samples * split[0]))
    ind2 = int(np.floor(n_samples * (split[0] + split[1])))
    train = (seq[:ind1], intensity[:ind1], energy[:ind1], precursor_charge[:ind1])
    validation = (seq[ind1:ind2], intensity[ind1:ind2], energy[ind1:ind2], precursor_charge[ind1:ind2])
    test = (seq[ind2:], intensity[ind2:], energy[ind2:], precursor_charge[ind2:])
    train = Intentsity_Dataset(train)
    test = Intentsity_Dataset(test)
    validation = Intentsity_Dataset(validation)
    train = DataLoader(train, batch_size=batch_size)
    test = DataLoader(test, batch_size=batch_size)
    validation = DataLoader(validation, batch_size=batch_size)
    return train, validation, test
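
# Example (sketch) of building retention-time loaders with load_data. The CSV
# paths below are placeholders; the files are expected to provide the
# `sequence`, `state` and `irt` columns used by RT_Dataset.
#
# train_loader, val_loader, test_loader = load_data(
#     batch_size=64,
#     data_sources=['data/rt/train.csv', 'data/rt/holdout.csv', 'data/rt/validation.csv'],
#     length=30)
# sequences, labels = next(iter(train_loader))  # encoded peptides and float iRT labels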
def load_intensity_from_files(f_seq, f_intentsity, f_energy, f_percursor_charge, batch_size):
    """Load the four .npy intensity files and wrap them in a single DataLoader."""
    seq = np.load(f_seq)
    intensity = np.load(f_intentsity)
    energy = np.load(f_energy)
    precursor_charge = np.load(f_percursor_charge)
    data = (seq, intensity, energy, precursor_charge)
    dataset = Intentsity_Dataset(data)
    loader = DataLoader(dataset, batch_size=batch_size)
    return loader


def load_intensity_df_from_files(f_seq, f_intentsity, f_energy, f_percursor_charge):
    """Same as load_intensity_from_files, but return the Dataset instead of a DataLoader."""
    seq = np.load(f_seq)
    intensity = np.load(f_intentsity)
    energy = np.load(f_energy)
    precursor_charge = np.load(f_percursor_charge)
    data = (seq, intensity, energy, precursor_charge)
    dataset = Intentsity_Dataset(data)
    return dataset


class Intentsity_Dataset(Dataset):
    """Fragment-intensity dataset; each item is
    (sequence, collision energy, precursor charge, intensities)."""

    def __init__(self, data):
        self.data = data
        self.seq = data[0]
        self.intensity = data[1]
        self.energy = data[2]
        self.precursor_charge = data[3]

    def __len__(self):
        return len(self.seq)

    def __getitem__(self, idx):
        return (torch.tensor(self.seq[idx]),
                torch.tensor([self.energy[idx]]).float(),
                torch.tensor(self.precursor_charge[idx]),
                torch.tensor(self.intensity[idx]).float())


# storage = H5ToStorage('database/holdout_hcd.hdf5')
# storage.make_npy_file('data/intensity/method.npy', 'method')
# storage.make_npy_file('data/intensity/sequence_header.npy', 'sequence_integer')
# storage.make_npy_file('data/intensity/intensity_header.npy', 'intensities_raw')
# storage.make_npy_file('data/intensity/collision_energy_header.npy', 'collision_energy_aligned_normed')
# storage.make_npy_file('data/intensity/precursor_charge_header.npy', 'precursor_charge_onehot')
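
# Example (sketch): once the columns above have been exported, the .npy files
# can be split into train/validation/test loaders with load_split_intensity.
# File names follow the commented export calls; the batch size is arbitrary.
#
# sources = ['data/intensity/sequence_header.npy',
#            'data/intensity/intensity_header.npy',
#            'data/intensity/collision_energy_header.npy',
#            'data/intensity/precursor_charge_header.npy']
# train_loader, val_loader, test_loader = load_split_intensity(sources, batch_size=64)
# seq, energy, charge, intensity = next(iter(train_loader))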