From 282e563e9740e5177a1978e7cd5e87a1091489a8 Mon Sep 17 00:00:00 2001
From: lschneider <leo.schneider@univ-lyon1.fr>
Date: Thu, 25 Jan 2024 15:49:18 +0100
Subject: [PATCH] dlomix data

---
 .gitignore                          |   2 +-
 dlomix/data/AbstractDataset.py      | 398 ++++++++++++++++++++++++++++
 dlomix/data/IntensityDataset.py     | 385 +++++++++++++++++++++++++++
 dlomix/data/RetentionTimeDataset.py | 370 ++++++++++++++++++++++++++
 dlomix/data/__init__.py             |  15 ++
 dlomix/data/feature_extractors.py   | 198 ++++++++++++++
 dlomix/data/parsers.py              | 106 ++++++++
 dlomix/data/reader_utils.py         |  52 ++++
 8 files changed, 1525 insertions(+), 1 deletion(-)
 create mode 100644 dlomix/data/AbstractDataset.py
 create mode 100644 dlomix/data/IntensityDataset.py
 create mode 100644 dlomix/data/RetentionTimeDataset.py
 create mode 100644 dlomix/data/__init__.py
 create mode 100644 dlomix/data/feature_extractors.py
 create mode 100644 dlomix/data/parsers.py
 create mode 100644 dlomix/data/reader_utils.py

diff --git a/.gitignore b/.gitignore
index a704ac2..a583428 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,4 @@
 /dataset/
 /test.py
 /database/
-/dlomix/data/
+
diff --git a/dlomix/data/AbstractDataset.py b/dlomix/data/AbstractDataset.py
new file mode 100644
index 0000000..1117ce7
--- /dev/null
+++ b/dlomix/data/AbstractDataset.py
@@ -0,0 +1,398 @@
+import abc
+from os.path import abspath, dirname
+
+import numpy as np
+import pandas as pd
+import tensorflow as tf
+
+from ..constants import DEFAULT_PARQUET_ENGINE
+from ..utils import lower_and_trim_strings
+from .parsers import ProformaParser
+from .reader_utils import read_json_file, read_parquet_file_pandas
+
+# what characterizes a dataset -->
+#   1. reading mode (string, CSV, JSON, parquet, in-memory, etc.)
+#   2. inputs (define sequence column name and additional existing feature names)
+#   3. features to extract --> abstracted out in the feature extractors list
+#   4. outputs --> targets to use (names of columns or key names in a dict)
+
+# 1. identify reading mode
+#    and call a static reader class that takes a data source and returns a DataFrame (later consider other data structures)
+# 2. pick inputs from the data after reader has finished, maintain the inputs dict
+# 3. pick targets from the data after the reader has finished, maintain the targets dict
+# 4. run feature extractors based on input sequences, maintain features dict
+# 5. build TF Datasets accordingly
+
+# Consider collecting member variables related to the sequences in a named tuple (sequence, mod, n_term, c_term, etc..)
+
+# consider making the dataset object iterable --> iterate over main split tf dataset
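+
+# A rough sketch (for orientation only) of the flow a concrete subclass wires up in
+# `load_data`; the method names are the ones defined below and in the subclasses
+# (RetentionTimeDataset, IntensityDataset):
+#
+#   self._read_data()              # resolve the reading mode and load numpy/pandas data
+#   self._parse_sequences()        # optional: split amino acids from modifications
+#   self._extract_features()       # optional: run the configured feature extractors
+#   self._split_data()             # compute train/val split indices
+#   self._build_tf_dataset()       # build a tf.data.Dataset per split
+#   self._preprocess_tf_dataset()  # map parsing/padding functions, batch, prefetch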
+
+
+class AbstractDataset(abc.ABC):
+    r"""Base class for datasets.
+
+    Parameters
+    -----------
+    data_source : str, tuple of two numpy.ndarray, numpy.ndarray, optional
+        source can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a str with a file path to a CSV file. Defaults to None.
+    sep : str, optional
+        separator to be used if the data source is a CSV file. Defaults to ",".
+    sequence_col :  str, optional
+        name of the column containing the sequences in the provided CSV. Defaults to "sequence".
+    target_col : str, optional
+        name of the column containing the targets (indexed retention time). Defaults to "irt".
+    feature_cols : list, optional
+        a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
+    seq_length : int, optional
+        the sequence length to be used; all sequences will be padded to this length, and longer sequences will be removed rather than truncated. Defaults to 0.
+    parser : str, optional
+        name of the parser to use. Available parsers are in `dlomix.data.parsers.py`. Defaults to None; no parsing to be done on the sequence (works for unmodified sequences).
+    features_to_extract: list(dlomix.data.feature_extractors.SequenceFeatureExtractor), optional
+        List of feature extractor objects. Defaults to None; no features to extract.
+    batch_size : int, optional
+        the batch size to be used for consuming the dataset in training a model. Defaults to 32.
+    val_ratio : float, optional
+        a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
+    path_aminoacid_atomcounts : str, optional
+        a string with a path to a CSV table with the atom counts of the different amino acids (can be used for feature extraction). Defaults to None.
+    seed : int, optional
+        a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
+    test : bool, optional
+        a boolean whether the dataset is a test dataset or not. Defaults to False.
+    sample_run : bool, optional
+        a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
+    """
+
+    ATOM_TABLE = None
+    SPLIT_NAMES = ["train", "val", "test"]
+    BATCHES_TO_PREFETCH = tf.data.AUTOTUNE
+
+    SAMPLE_RUN_N = 100
+    METADATA_KEY = "metadata"
+    PARAMS_KEY = "parameters"
+    ANNOTATIONS_KEY = "annotations"
+    TARGET_NAME_KEY = "target_column_key"
+    SEQUENCE_COLUMN_KEY = "sequence_column_key"
+
+    def __init__(
+        self,
+        data_source,
+        sep,
+        sequence_col,
+        target_col,
+        feature_cols=None,
+        seq_length=0,
+        parser=None,
+        features_to_extract=None,
+        batch_size=32,
+        val_ratio=0,
+        path_aminoacid_atomcounts=None,
+        seed=21,
+        test=False,
+        sample_run=False,
+    ):
+        super(AbstractDataset, self).__init__()
+
+        self.seed = seed
+        np.random.seed(self.seed)
+
+        self.data_source = data_source
+        self.sep = sep
+        self.sequence_col = sequence_col.lower()
+        self.target_col = target_col.lower()
+
+        if feature_cols:
+            self.feature_cols = lower_and_trim_strings(feature_cols)
+        else:
+            self.feature_cols = []
+
+        self.sample_run = sample_run
+
+        # if seq_length is 0 (default) -> no padding
+        self.seq_length = seq_length
+        self.parser = parser
+        self.features_to_extract = features_to_extract
+
+        self._data_mean = 0
+        self._data_std = 1
+
+        self.batch_size = batch_size
+        self.val_ratio = val_ratio
+        self.testing_mode = test
+
+        # main split is "train" if not in testing mode, otherwise "test"
+        self.main_split = (
+            AbstractDataset.SPLIT_NAMES[0]
+            if not self.testing_mode
+            else AbstractDataset.SPLIT_NAMES[2]
+        )
+
+        # initialize TF Datasets dict
+        self.tf_dataset = (
+            {self.main_split: None, AbstractDataset.SPLIT_NAMES[1]: None}
+            if val_ratio != 0
+            else {self.main_split: None}
+        )
+
+        self.indicies_dict = (
+            {self.main_split: None, AbstractDataset.SPLIT_NAMES[1]: None}
+            if val_ratio != 0
+            else {self.main_split: None}
+        )
+
+        # if path to counts lookup table is provided, include count features, otherwise not
+        self.include_count_features = bool(path_aminoacid_atomcounts)
+
+        if self.include_count_features:
+            self.aminoacid_atom_counts_csv_path = (
+                path_aminoacid_atomcounts  # "../lookups/aa_comp_rel.csv"
+            )
+            self._init_atom_table()
+
+        self._resolve_parser()
+
+        self.sequences = None
+        self.unmodified_sequences = None
+        self.modifications = None
+        self.n_term_modifications = None
+        self.c_term_modifications = None
+
+        self.sequence_features = None
+        self.sequence_features_names = None
+
+    def _resolve_parser(self):
+        if self.parser is None:
+            return
+        elif self.parser == "proforma":
+            self.parser = ProformaParser()
+        else:
+            raise ValueError(
+                f"Invalid parser provided {self.parser}. For a list of available parsers, check dlomix.data.parsers.py"
+            )
+
+    def _parse_sequences(self):
+        (
+            self.sequences,
+            self.modifications,
+            self.n_term_modifications,
+            self.c_term_modifications,
+        ) = self.parser.parse_sequences(self.sequences)
+
+    def _resolve_string_data_path(self):
+        is_json_file = self.data_source.endswith(".json")
+
+        if is_json_file:
+            json_file_base_dir = dirname(abspath(self.data_source))
+            self.data_source = read_json_file(self.data_source)
+            self._update_data_loading_for_json_format(json_file_base_dir)
+
+        is_parquet_url = ".parquet" in self.data_source and self.data_source.startswith(
+            "http"
+        )
+        is_parquet_file = self.data_source.endswith(".parquet")
+        is_csv_file = self.data_source.endswith(".csv")
+
+        if is_parquet_url or is_parquet_file:
+            df = read_parquet_file_pandas(self.data_source, DEFAULT_PARQUET_ENGINE)
+            return df
+        elif is_csv_file:
+            df = pd.read_csv(self.data_source)
+            return df
+        else:
+            raise ValueError(
+                "Invalid data source provided as a string, please provide a path to a csv, parquet, "
+                "or a json file."
+            )
+
+    def _extract_features(self):
+        if self.features_to_extract:
+            self.sequence_features = []
+            self.sequence_features_names = []
+            for feature_class in self.features_to_extract:
+                print("Extracting feature: ", feature_class)
+                extractor_class = feature_class
+                feature_array = np.array(
+                    extractor_class.extract_all(
+                        self.sequences,
+                        self.modifications,
+                        self.seq_length if extractor_class.pad_to_seq_length else 0,
+                    ),
+                    dtype=np.float32,
+                )
+                # ensure an extra (1) dimension is added for later concatenation
+                # this could also be done later in TensorFlow inside the model
+                # what shapes of features could exist (BATCH X SEQ_LENGTH X 6), (BATCH X SEQ_LENGTH X 1)
+                if (
+                    feature_array.ndim < 3
+                    and feature_array.shape[-1] == self.seq_length
+                ):
+                    feature_array = np.expand_dims(feature_array, axis=-1)
+                self.sequence_features.append(feature_array)
+                self.sequence_features_names.append(
+                    extractor_class.__class__.__name__.lower()
+                )
+
+    def _reshape_sequence_feature_arrays(self):
+        pass
+
+    def get_examples_at_indices(self, examples, split):
+        if isinstance(examples, np.ndarray):
+            return examples[self.indicies_dict[split]]
+        # to handle features
+        if isinstance(examples, list):
+            return [
+                examples_single[self.indicies_dict[split]]
+                for examples_single in examples
+            ]
+        raise ValueError(
+            f"Provided data structure to subset for examples at split indices is neither a list nor a numpy array, but rather a {type(examples)}."
+        )
+
+    def _init_atom_table(self):
+        atom_counts = pd.read_csv(self.aminoacid_atom_counts_csv_path)
+        atom_counts = atom_counts.astype(str)
+
+        keys_tensor = tf.constant(atom_counts["aa"].values)
+        values_tensor = tf.constant(
+            ["_".join(c) for c in list(atom_counts.iloc[:, 1:].values)]
+        )
+        init = tf.lookup.KeyValueTensorInitializer(keys_tensor, values_tensor)
+        AbstractDataset.ATOM_TABLE = tf.lookup.StaticHashTable(
+            init, default_value="0_0_0_0_0"
+        )
+
+    @abc.abstractmethod
+    def load_data(self, data):
+        """load data from source and populate numpy arrays to use for tf.Dataset
+
+        Args:
+            data (str, tuple, dict): Path to csv or parquet file, tuple with numpy arrays, or a dict with keys
+            `AbstractDataset.METADATA_KEY`, `AbstractDataset.PARAMS_KEY`,
+            `AbstractDataset.TARGET_NAME_KEY`, `AbstractDataset.SEQUENCE_COLUMN_KEY`.
+        """
+        raise NotImplementedError("Not implemented")
+
+    @abc.abstractmethod
+    def _update_data_loading_for_json_format(self, base_dir):
+        raise NotImplementedError("Not implemented")
+
+    @abc.abstractmethod
+    def _build_tf_dataset(self):
+        """Build the tf.Dataset object for available splits using the data loaded by `load_data`.
+        Example:
+        `for split in self.tf_dataset.keys():
+            self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(
+                (self.inputs, self.outputs)
+            )`
+        """
+        raise NotImplementedError("Not implemented")
+
+    @abc.abstractmethod
+    def _preprocess_tf_dataset(self):
+        """Add processing logic (tensorflow functions) to apply to all tf.Datasets."""
+        raise NotImplementedError("Not implemented")
+
+    @abc.abstractmethod
+    def get_split_targets(self, split="val"):
+        """Retrieve all targets (original labels) for a specific split (dependent on the task at hand)
+        Args:
+            split (str, optional): Name of the split, check `AbstractDataset.SPLIT_NAMES`. Defaults to "val".
+        """
+        raise NotImplementedError("Not implemented")
+
+    @staticmethod
+    @abc.abstractmethod
+    def _convert_inputs_to_dict(inputs, target):
+        """Collect all inputs into a python dict with corresponding keys.
+            When multiple inputs are used, this function is used at the beginning of the pre-processing
+            of TF.Datasets.
+
+        Args:
+            inputs (tuple(tf.Tensor)): tuple of input tensors
+            target (tf.Tensor): target label tensor
+        """
+        raise NotImplementedError("Not implemented")
+
+    def _pad_sequences(self, inputs, target):
+        if isinstance(inputs, dict):
+            inputs["sequence"] = self._pad_seq(inputs["sequence"])
+            return inputs, target
+        else:
+            return self._pad_seq(inputs), target
+
+    def _pad_seq(self, seq):
+        pad_len = tf.abs(self.seq_length - tf.size(seq))
+        paddings = tf.concat([[0], [pad_len]], axis=0)
+        seq = tf.pad(seq, [paddings], "CONSTANT")
+        seq.set_shape([self.seq_length])
+        return seq
+
+    def _split_sequence(self, inputs, target):
+        if isinstance(inputs, dict):
+            inputs["sequence"] = tf.strings.bytes_split(inputs["sequence"])
+            return inputs, target
+        else:
+            inputs = tf.strings.bytes_split(inputs)
+        return inputs, target
+
+    def _generate_single_counts(self, inputs, target):
+        inputs["counts"] = tf.map_fn(
+            lambda x: AbstractDataset.ATOM_TABLE.lookup(x), inputs["sequence"]
+        )
+        inputs["counts"] = tf.map_fn(
+            lambda x: tf.strings.split(x, sep="_"), inputs["counts"]
+        )
+        inputs["counts"] = tf.strings.to_number(inputs["counts"])
+        inputs["counts"].set_shape([self.seq_length, 5])
+
+        return inputs, target
+
+    def _generate_di_counts(self, inputs, target):
+        # add every two neighboring elements without overlap [0 0 1 1 2 2 .... pad_length/2 pad_length/2]
+        segments_to_add = [i // 2 for i in range(self.seq_length)]
+        inputs["di_counts"] = tf.math.segment_sum(
+            inputs["counts"], tf.constant(segments_to_add)
+        )
+        inputs["di_counts"].set_shape([self.seq_length // 2, 5])
+
+        return inputs, target
+
+    def _get_tf_dataset(self, split=None):
+        if split is None:
+            return self.tf_dataset
+        assert (
+            split in self.tf_dataset.keys()
+        ), f"Requested data split {split} is not available, available splits are {self.tf_dataset.keys()}"
+        return self.tf_dataset[split]
+
+    @property
+    def train_data(self):
+        """TensorFlow Dataset object for the training data"""
+        return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[0])
+
+    @property
+    def val_data(self):
+        """TensorFlow Dataset object for the validation data"""
+        return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[1])
+
+    @property
+    def test_data(self):
+        """TensorFlow Dataset object for the test data"""
+        return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[2])
+
+    @property
+    def data_mean(self):
+        """Mean value of the targets"""
+        return self._data_mean
+
+    @property
+    def data_std(self):
+        """Standard deviation value of the targets"""
+        return self._data_std
+
+    @data_mean.setter
+    def data_mean(self, value):
+        self._data_mean = value
+
+    @data_std.setter
+    def data_std(self, value):
+        self._data_std = value
diff --git a/dlomix/data/IntensityDataset.py b/dlomix/data/IntensityDataset.py
new file mode 100644
index 0000000..1ad9e7d
--- /dev/null
+++ b/dlomix/data/IntensityDataset.py
@@ -0,0 +1,385 @@
+from os.path import dirname, join
+
+import numpy as np
+import tensorflow as tf
+
+from ..utils import convert_nested_list_to_numpy_array, flatten_dict_for_values
+from .AbstractDataset import AbstractDataset
+
+# take into consideration if the pandas dataframe is pickled or not and then call read_pickle instead of read_csv
+# allow the possibility to have three different dataset objects, one for train, val, and test
+
+
+class IntensityDataset(AbstractDataset):
+    r"""A dataset class for Intensity prediction tasks. It initialize a dataset object wrapping tf.Dataset and some relevant preprocessing steps.
+
+    Parameters
+    -----------
+    data_source : str, tuple of numpy.ndarray, optional
+        source can be a tuple of three or four arrays (sequences, collision_energy, precursor_charge, and optionally intensities), or a str with a file path to a csv/parquet/json file. Defaults to None.
+    sep : str, optional
+        separator to be used if the data source is a CSV file. Defaults to ",".
+    sequence_col :  str, optional
+        name of the column containing the sequences in the provided CSV. Defaults to "sequence".
+    collision_energy_col : str, optional
+        name of the column containing the collision energy values. Defaults to "collision_energy_aligned_normed".
+    precursor_charge_col : str, optional
+        name of the column containing the one-hot encoded precursor charge. Defaults to "precursor_charge_onehot".
+    intensities_col : str, optional
+        name of the column containing the targets (vector of intensities). Defaults to "intensities".
+    feature_cols : list, optional
+        a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
+    normalize_targets : bool, optional
+        a boolean whether to normalize the targets or not (subtract the mean and divide by the standard deviation). Defaults to False.
+    seq_length : int, optional
+        the sequence length to be used; all sequences will be padded to this length, and longer sequences will be removed rather than truncated. Defaults to 0.
+    parser: Subclass of AbstractParser, optional
+        the parser to use to split amino acids and modifications. For more information, please see `dlomix.data.parsers`
+    batch_size : int, optional
+        the batch size to be used for consuming the dataset in training a model. Defaults to 32.
+    val_ratio : float, optional
+        a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
+    seed : int, optional
+        a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
+    test : bool, optional
+        a boolean whether the dataset is a test dataset or not. Defaults to False.
+    path_aminoacid_atomcounts : str, optional
+        a string with a path to a CSV table with the atom counts of the different amino acids (can be used for feature extraction). Defaults to None.
+    sample_run : bool, optional
+        a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
+    metadata_filtering_criteria : dict, optional
+        a dictionary with the filtering criteria (column names and conditions) to be used to filter the metadata. Defaults to None.
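+
+    Examples
+    --------
+    A minimal usage sketch (the file path is hypothetical; the CSV/parquet file is
+    assumed to contain the columns named by the defaults above):
+
+    >>> intdata = IntensityDataset(
+    ...     data_source="fragment_intensity_pool.csv",
+    ...     seq_length=30,
+    ...     batch_size=64,
+    ...     val_ratio=0.2,
+    ... )
+    >>> train_ds, val_ds = intdata.train_data, intdata.val_data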
+    """
+
+    # TODO: For test dataset --> examples with longer sequences --> do not drop, add NaN for prediction
+
+    def __init__(
+        self,
+        data_source=None,
+        sep=",",
+        sequence_col="sequence",
+        collision_energy_col="collision_energy_aligned_normed",
+        precursor_charge_col="precursor_charge_onehot",
+        intensities_col="intensities",
+        feature_cols=None,
+        normalize_targets=False,
+        seq_length=0,
+        parser=None,
+        features_to_extract=None,
+        batch_size=32,
+        val_ratio=0,
+        seed=21,
+        test=False,
+        path_aminoacid_atomcounts=None,
+        sample_run=False,
+        metadata_filtering_criteria=None,
+    ):
+        super().__init__(
+            data_source,
+            sep,
+            sequence_col,
+            intensities_col,
+            feature_cols,
+            seq_length,
+            parser,
+            features_to_extract,
+            batch_size,
+            val_ratio,
+            path_aminoacid_atomcounts,
+            seed,
+            test,
+            sample_run,
+        )
+
+        self.collision_energy_col = collision_energy_col.lower()
+        self.precursor_charge_col = precursor_charge_col.lower()
+        self.intensities_col = self.target_col
+
+        self.metadata_filtering_criteria = metadata_filtering_criteria
+
+        self.normalize_targets = normalize_targets
+
+        self.no_intensities = self.testing_mode
+
+        self.sequences = None
+        self.collision_energy = None
+        self.precursor_charge = None
+        self.intensities = None
+
+        self.features_df = None
+        self.example_id = None
+
+        # if data is provided with the constructor call --> load, otherwise --> done
+        if self.data_source is not None:
+            self.load_data(data=data_source)
+
+    def load_data(self, data):
+        """Load data into the dataset object, can be used to load data at a later point after initialization.
+        This function triggers the whole pipeline of: data loading, validation (against sequence length), splitting, building TensorFlow dataset objects, and applying preprocessing.
+
+        :param data: a tuple of numpy arrays (sequences, collision_energy, precursor_charge, and optionally intensities) or a `str` with a file path to a csv/parquet/json file
+        :return: None
+        """
+        self.data_source = data
+
+        self._read_data()
+        # consider removing lengthy sequences when no parser is passed
+
+        # Numpy & Pandas
+        if self.parser:
+            self._parse_sequences()
+            self._validate_remove_long_sequences()
+        if self.features_to_extract:
+            self._extract_features()
+        self._split_data()
+
+        # TF.Dataset
+        self._build_tf_dataset()
+        self._preprocess_tf_dataset()
+
+    """
+    numpy array --> either a tuple or a single array
+        - Tuple --> means (sequences, collision_energy, precursor_charge, intensities)
+        - single ndarray --> means sequences only, useful for test dataset
+    str --> path to csv file or compressed csv file
+    """
+
+    def _read_data(self):
+        if isinstance(self.data_source, tuple):
+            tuple_size_is_three_or_four = (
+                len(self.data_source) == 3 or len(self.data_source) == 4
+            )
+            if tuple_size_is_three_or_four:
+                tuple_elements_are_ndarray = all(
+                    [isinstance(x, np.ndarray) for x in self.data_source]
+                )
+                if tuple_elements_are_ndarray:
+                    self.sequences = self.data_source[0]
+                    self.collision_energy = self.data_source[1]
+                    self.precursor_charge = self.data_source[2]
+                    if len(self.data_source) == 4:
+                        self.intensities = self.data_source[3]
+                        self.no_intensities = False
+
+                    else:
+                        self.intensities = np.zeros(self.sequences.shape[0])
+                        self.no_intensities = True
+            else:
+                raise ValueError(
+                    "If a tuple is provided, it has to have a length of 3 or 4 and all elements should be numpy arrays."
+                )
+        elif isinstance(self.data_source, str):
+            df = self._resolve_string_data_path()
+
+            # used only for testing with a smaller sample from a csv file
+            if self.sample_run:
+                df = df.head(IntensityDataset.SAMPLE_RUN_N)
+
+            # lower all column names
+            df.columns = [col_name.lower() for col_name in df.columns]
+
+            # retrieve columns from the dataframe
+            self.sequences = df[self.sequence_col]
+            self.collision_energy = df[self.collision_energy_col]
+            self.precursor_charge = df[self.precursor_charge_col]
+            self.intensities = df[self.intensities_col]
+
+            # parse strings into lists, for precursor charge and intensities
+            if isinstance(self.precursor_charge.iloc[0], str):
+                self.precursor_charge = self.precursor_charge.apply(eval)
+
+            if isinstance(self.intensities.iloc[0], str):
+                self.intensities = self.intensities.apply(eval)
+
+            # get numpy arrays with .values for all inputs and intensities
+
+            self.sequences = self.sequences.values
+
+            # for concatenation later, we expand dimensions
+            self.collision_energy = self.collision_energy.values.reshape(-1, 1)
+
+            self.precursor_charge = convert_nested_list_to_numpy_array(
+                self.precursor_charge.values, dtype=np.float32
+            )
+            self.intensities = convert_nested_list_to_numpy_array(
+                self.intensities.values
+            )
+
+            self.features_df = df[self.feature_cols]
+        else:
+            raise ValueError(
+                "Data source has to be either a tuple of three or four numpy arrays, "
+                "or a string path to a csv file."
+            )
+
+        # give the index of the element as an ID for later reference if needed
+        self.example_id = list(range(len(self.sequences)))
+
+    def _update_data_loading_for_json_format(self, base_dir=None):
+        import prospectdataset as prospect
+
+        json_dict = self.data_source
+        meta_data_filepath = json_dict.get(IntensityDataset.METADATA_KEY, "")
+        annotation_data_value = json_dict.get(IntensityDataset.ANNOTATIONS_KEY, "")
+        annotations_filepaths = flatten_dict_for_values(annotation_data_value)
+
+        # meta data file is assumed to be in the same path as the json input file
+        if base_dir:
+            meta_data_filepath = join(base_dir, meta_data_filepath)
+            annotations_filepaths = [
+                join(base_dir, file) for file in annotations_filepaths
+            ]
+
+        # all annotation files are assumed to be in the same directory
+        if len(annotations_filepaths) > 0:
+            annotations_dir = dirname(annotations_filepaths[0])
+        else:
+            raise ValueError(
+                "No paths to annotation files were provided in the JSON file."
+            )
+
+        # ToDo: consider options to check if the files were processed earlier and skip this step since it is time consuming
+
+        # to pass metadata_filtering_criteria
+
+        print("Optionally Downloading and processing the data...")
+        print("Annotations directory: ", annotations_dir)
+
+        # fix directory path, use file names from the json file ???
+        print("Metadata filepath: ", meta_data_filepath)
+        print("Base directory: ", base_dir)
+
+        self.data_source = prospect.download_process_pool(
+            annotations_data_dir=annotations_dir,
+            metadata_path=meta_data_filepath,
+            save_filepath=join(base_dir, "processed_pool.parquet"),
+            metadata_filtering_criteria=self.metadata_filtering_criteria,
+        )
+
+        self.intensities_col = json_dict.get(IntensityDataset.PARAMS_KEY, {}).get(
+            IntensityDataset.TARGET_NAME_KEY, self.intensities_col
+        )
+        # ToDo: make dynamic based on parameters
+        self.sequence_col = "modified_sequence"
+
+    def _validate_remove_long_sequences(self) -> None:
+        """
+        Validate if all sequences are shorter than the padding length, otherwise drop them.
+        """
+        assert self.sequences.shape[0] > 0, "No sequences in the provided data."
+
+        # check if count of examples matches for all provided inputs
+        lengths = [
+            len(self.sequences),
+            len(self.collision_energy),
+            len(self.precursor_charge),
+        ]
+        if not self.no_intensities:
+            lengths = lengths + [len(self.intensities)]
+
+        assert np.all(
+            lengths == np.array(lengths[0])
+        ), "Count of examples does not match for sequences and targets."
+
+        limit = self.seq_length
+        vectorized_len = np.vectorize(lambda x: len(x))
+        mask = vectorized_len(self.sequences) <= limit
+        self.sequences = self.sequences[mask]
+        self.collision_energy = self.collision_energy[mask]
+        self.precursor_charge = self.precursor_charge[mask]
+        self.intensities = self.intensities[mask]
+
+        # once feature columns are introduced, apply the mask to the feature columns (subset the dataframe as well)
+
+    def _split_data(self):
+        n = len(self.sequences)
+
+        if self.val_ratio != 0 and (not self.testing_mode):
+            # add randomization for now and later consider the splitting logic
+            self.indicies_dict[IntensityDataset.SPLIT_NAMES[1]] = np.arange(n)[
+                : int(n * self.val_ratio)
+            ]
+            self.indicies_dict[self.main_split] = np.arange(n)[
+                int(n * self.val_ratio) :
+            ]
+        else:
+            self.indicies_dict[self.main_split] = np.arange(n)
+
+    def _build_tf_dataset(self):
+        input_dict = {}
+
+        for split in self.tf_dataset.keys():
+            input_dict["sequence"] = self.get_examples_at_indices(self.sequences, split)
+            if self.features_to_extract:
+                for feature_name, feature_values in zip(
+                    self.sequence_features_names, self.sequence_features
+                ):
+                    input_dict[feature_name] = self.get_examples_at_indices(
+                        feature_values, split
+                    )
+
+            input_dict["collision_energy"] = self.get_examples_at_indices(
+                self.collision_energy, split
+            )
+            input_dict["precursor_charge"] = self.get_examples_at_indices(
+                self.precursor_charge, split
+            )
+            input_dict["target"] = self.get_examples_at_indices(self.intensities, split)
+
+            self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(input_dict)
+
+    def _preprocess_tf_dataset(self):
+        # ToDo: convert input to dict and assume this as the general case --> abstract out in parent class
+
+        for split in self.tf_dataset.keys():
+            self.tf_dataset[split] = (
+                self.tf_dataset[split]
+                .map(
+                    IntensityDataset._convert_inputs_to_dict,
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+                .map(
+                    lambda i, t: self._split_sequence(i, t),
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+                .map(
+                    lambda i, t: self._pad_sequences(i, t),
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+            )
+
+            # Here: feature engineering on the fly if needed (atom counts, etc...)
+
+            self.tf_dataset[split] = (
+                self.tf_dataset[split]
+                .batch(self.batch_size)
+                .prefetch(IntensityDataset.BATCHES_TO_PREFETCH)
+            )
+
+    def get_split_targets(self, split="val"):
+        """Retrieve all targets (original labels) for a specific split.
+
+        :param split: a string specifying the split name (train, val, test)
+        :return: nd.array with the targets
+        """
+        if split not in self.indicies_dict.keys():
+            raise ValueError(
+                "requested split does not exist, availabe splits are: "
+                + list(self.indicies_dict.keys())
+            )
+
+        return self.intensities[self.indicies_dict[split]]
+
+    def denormalize_targets(self, targets):
+        """Denormalize the given targets (can also be predictions) by multiplying the standard deviation and adding the mean.
+
+        :param targets: an nd.array with targets or predictions
+        :return: a denormalized nd.array with the targets or the predictions
+        """
+        return targets * self._data_std + self._data_mean
+
+    def _normalize_target(self, seq, target):
+        target = tf.math.divide(
+            tf.math.subtract(target, self._data_mean), self._data_std
+        )
+        return seq, target
+
+    @staticmethod
+    def _convert_inputs_to_dict(inputs):
+        return inputs, inputs.pop("target")
diff --git a/dlomix/data/RetentionTimeDataset.py b/dlomix/data/RetentionTimeDataset.py
new file mode 100644
index 0000000..542927a
--- /dev/null
+++ b/dlomix/data/RetentionTimeDataset.py
@@ -0,0 +1,370 @@
+from os.path import join
+
+import numpy as np
+import pandas as pd
+import tensorflow as tf
+
+from .AbstractDataset import AbstractDataset
+
+# take into consideration if the pandas dataframe is pickled or not and then call read_pickle instead of read_csv
+# allow the possibility to have three different dataset objects, one for train, val, and test
+
+
+class RetentionTimeDataset(AbstractDataset):
+    r"""A dataset class for Retention Time prediction tasks. It initialize a dataset object wrapping tf.Dataset and some relevant preprocessing steps.
+
+    Parameters
+    -----------
+    data_source : str, tuple of two numpy.ndarray, numpy.ndarray, optional
+        source can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a str with a file path to a CSV file. Defaults to None.
+    sep : str, optional
+        separator to be used if the data source is a CSV file. Defaults to ",".
+    sequence_col :  str, optional
+        name of the column containing the sequences in the provided CSV. Defaults to "sequence".
+    target_col : str, optional
+        name of the column containing the targets (indexed retention time). Defaults to "irt".
+    feature_cols : list, optional
+        a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
+    normalize_targets : bool, optional
+        a boolean whether to normalize the targets or not (subtract the mean and divide by the standard deviation). Defaults to False.
+    seq_length : int, optional
+        the sequence length to be used; all sequences will be padded to this length, and longer sequences will be removed rather than truncated. Defaults to 0.
+    parser: Subclass of AbstractParser, optional
+        the parser to use to split amino acids and modifications. For more information, please see `dlomix.data.parsers`
+    batch_size : int, optional
+        the batch size to be used for consuming the dataset in training a model. Defaults to 32.
+    val_ratio : float, optional
+        a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
+    seed : int, optional
+        a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
+    test : bool, optional
+        a boolean whether the dataset is a test dataset or not. Defaults to False.
+    path_aminoacid_atomcounts : str, optional
+        a string with a path to a CSV table with the atom counts of the different amino acids (can be used for feature extraction). Defaults to None.
+    sample_run : bool, optional
+        a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
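+
+    Examples
+    --------
+    A minimal usage sketch (the file path is hypothetical; the CSV is assumed to
+    contain the "sequence" and "irt" columns named by the defaults above):
+
+    >>> rtdata = RetentionTimeDataset(
+    ...     data_source="retention_times.csv",
+    ...     seq_length=30,
+    ...     batch_size=64,
+    ...     val_ratio=0.2,
+    ... )
+    >>> train_ds, val_ds = rtdata.train_data, rtdata.val_data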
+    """
+
+    # TODO: For test dataset --> examples with longer sequences --> do not drop, add NaN for prediction
+
+    def __init__(
+        self,
+        data_source=None,
+        sep=",",
+        sequence_col="sequence",
+        target_col="irt",
+        feature_cols=None,
+        normalize_targets=False,
+        seq_length=0,
+        parser=None,
+        features_to_extract=None,
+        batch_size=32,
+        val_ratio=0,
+        seed=21,
+        test=False,
+        path_aminoacid_atomcounts=None,
+        sample_run=False,
+    ):
+        super().__init__(
+            data_source,
+            sep,
+            sequence_col,
+            target_col,
+            feature_cols,
+            seq_length,
+            parser,
+            features_to_extract,
+            batch_size,
+            val_ratio,
+            path_aminoacid_atomcounts,
+            seed,
+            test,
+            sample_run,
+        )
+
+        self.normalize_targets = normalize_targets
+
+        self.sequences = None
+        self.targets = None
+        self.features_df = None
+        self.example_id = None
+
+        # if data is provided with the constructor call --> load, otherwise --> done
+        if self.data_source is not None:
+            self.load_data(data=data_source)
+
+    def load_data(self, data):
+        """Load data into the dataset object, can be used to load data at a later point after initialization.
+        This function triggers the whole pipeline of: data loading, validation (against sequence length), splitting, building TensorFlow dataset objects, and applying preprocessing.
+
+        :param data: can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a `str` with a file path to a csv file
+        :return: None
+        """
+        self.data_source = data
+
+        self._read_data()
+        if self.parser:
+            self._parse_sequences()
+            self._validate_remove_long_sequences()
+        if self.features_to_extract:
+            self._extract_features()
+        self._split_data()
+        self._build_tf_dataset()
+        self._preprocess_tf_dataset()
+
+    """
+    numpy array --> either a tuple or a single array
+        - Tuple --> means (sequences, targets)
+        - single ndarray --> means sequences only, useful for test dataset
+    str --> path to csv file or compressed csv file
+    """
+
+    def _read_data(self):
+        if isinstance(self.data_source, dict):
+            self._update_data_loading_for_json_format()
+
+        if isinstance(self.data_source, tuple):
+            tuple_size_is_two = len(self.data_source) == 2
+            if tuple_size_is_two:
+                tuple_elements_are_ndarray = isinstance(
+                    self.data_source[0], np.ndarray
+                ) and isinstance(self.data_source[1], np.ndarray)
+                if tuple_elements_are_ndarray:
+                    self.sequences = self.data_source[0]
+                    self.targets = self.data_source[1]
+            else:
+                raise ValueError(
+                    "If a tuple is provided, it has to have a length of 2 and both elements should be numpy arrays."
+                )
+
+        elif isinstance(self.data_source, np.ndarray):
+            self.sequences = self.data_source
+            self.targets = np.zeros(self.sequences.shape[0])
+            self._data_mean, self._data_std = 0, 1
+
+        elif isinstance(self.data_source, (str, dict)):
+            if isinstance(self.data_source, dict):
+                #  a dict is passed in-memory via the json
+                df = pd.DataFrame(self.data_source)
+            else:
+                # a string path is passed via the json or as a constructor argument
+                df = self._resolve_string_data_path()
+
+            # consider sorting to leverage caching when extracting features
+            # df.sort_values(by=self.sequence_col, inplace=True)
+
+            # used only for testing with a smaller sample from a csv file
+            if self.sample_run:
+                df = df.head(RetentionTimeDataset.SAMPLE_RUN_N)
+
+            # lower all column names
+            df.columns = [col_name.lower() for col_name in df.columns]
+
+            self.sequences, self.targets = (
+                df[self.sequence_col].values,
+                df[self.target_col].values,
+            )
+            self._data_mean, self._data_std = np.mean(self.targets), np.std(
+                self.targets
+            )
+
+            self.features_df = df[self.feature_cols]
+        else:
+            raise ValueError(
+                "Data source has to be either a tuple of two numpy arrays, a single numpy array, "
+                "or a string with a path to a csv/parquet/json file."
+            )
+
+        # give the index of the element as an ID for later reference if needed
+        self.example_id = list(range(len(self.sequences)))
+
+    def _update_data_loading_for_json_format(self, base_dir=None):
+        json_dict = self.data_source
+
+        self.data_source = json_dict.get(RetentionTimeDataset.METADATA_KEY, "")
+
+        # meta data file is assumed to be in the same path as the json input file
+        if base_dir:
+            self.data_source = join(
+                base_dir, json_dict.get(RetentionTimeDataset.METADATA_KEY, "")
+            )
+
+        self.target_col = json_dict.get(RetentionTimeDataset.PARAMS_KEY, {}).get(
+            RetentionTimeDataset.TARGET_NAME_KEY, self.target_col
+        )
+        # ToDo: make dynamic based on parameters
+        self.sequence_col = "modified_sequence"
+
+    def _validate_remove_long_sequences(self) -> None:
+        """
+        Validate if all sequences are shorter than the padding length, otherwise drop them.
+        """
+        if self.sequences.shape[0] <= 0:
+            raise ValueError(
+                "No sequences in the provided data or sequences were not parsed correctly."
+            )
+
+        if len(self.sequences) != len(self.targets):
+            raise ValueError(
+                "Count of examples does not match for sequences and targets."
+            )
+
+        limit = self.seq_length
+        vectorized_len = np.vectorize(lambda x: len(x))
+        mask = vectorized_len(self.sequences) <= limit
+        self.sequences, self.targets = self.sequences[mask], self.targets[mask]
+        self.modifications = self.modifications[mask]
+
+        self.n_term_modifications, self.c_term_modifications = (
+            self.n_term_modifications[mask],
+            self.c_term_modifications[mask],
+        )
+
+        # once feature columns are introduced, apply the mask to the feature columns (subset the dataframe as well)
+
+    def _split_data(self):
+        n = len(self.sequences)
+
+        if self.val_ratio != 0 and (not self.testing_mode):
+            # add randomization for now and later consider the splitting logic
+            self.indicies_dict[RetentionTimeDataset.SPLIT_NAMES[1]] = np.arange(n)[
+                : int(n * self.val_ratio)
+            ]
+            self.indicies_dict[self.main_split] = np.arange(n)[
+                int(n * self.val_ratio) :
+            ]
+        else:
+            self.indicies_dict[self.main_split] = np.arange(n)
+
+    def _build_tf_dataset(self):
+        input_dict = {}
+
+        for split in self.tf_dataset.keys():
+            input_dict["sequence"] = self.get_examples_at_indices(self.sequences, split)
+
+            if self.features_to_extract:
+                for feature_name, feature_values in zip(
+                    self.sequence_features_names, self.sequence_features
+                ):
+                    input_dict[feature_name] = self.get_examples_at_indices(
+                        feature_values, split
+                    )
+
+            input_dict["target"] = self.get_examples_at_indices(self.targets, split)
+
+            self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(input_dict)
+
+    def _preprocess_tf_dataset(self):
+        for split in self.tf_dataset.keys():
+            self.tf_dataset[split] = self.tf_dataset[split].map(
+                RetentionTimeDataset._convert_inputs_to_dict,
+                num_parallel_calls=tf.data.AUTOTUNE,
+            )
+
+            # avoid normalizing targets for test data --> should not be needed
+            if self.normalize_targets and not self.testing_mode:
+                self.tf_dataset[split] = self.tf_dataset[split].map(
+                    lambda s, t: self._normalize_target(s, t),
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+
+            self.tf_dataset[split] = (
+                self.tf_dataset[split]
+                .map(
+                    lambda s, t: self._split_sequence(s, t),
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+                .map(
+                    lambda s, t: self._pad_sequences(s, t),
+                    num_parallel_calls=tf.data.AUTOTUNE,
+                )
+            )
+
+            if self.include_count_features:
+                self.tf_dataset[split] = (
+                    self.tf_dataset[split]
+                    .map(
+                        RetentionTimeDataset._convert_inputs_to_dict,
+                        num_parallel_calls=tf.data.AUTOTUNE,
+                    )
+                    .map(
+                        lambda s, t: self._generate_single_counts(s, t),
+                        num_parallel_calls=tf.data.AUTOTUNE,
+                    )
+                    .map(
+                        lambda s, t: self._generate_di_counts(s, t),
+                        num_parallel_calls=tf.data.AUTOTUNE,
+                    )
+                )
+
+            self.tf_dataset[split] = (
+                self.tf_dataset[split]
+                .batch(self.batch_size)
+                .prefetch(RetentionTimeDataset.BATCHES_TO_PREFETCH)
+            )
+
+    def get_split_targets(self, split="val"):
+        """Retrieve all targets (original labels) for a specific split.
+
+        :param split: a string specifying the split name (train, val, test)
+        :return: nd.array with the targets
+        """
+        if split not in self.indicies_dict.keys():
+            raise ValueError(
+                "requested split does not exist, availabe splits are: "
+                + list(self.indicies_dict.keys())
+            )
+
+        return self.targets[self.indicies_dict[split]]
+
+    def denormalize_targets(self, targets):
+        """Denormalize the given targets (can also be predictions) by multiplying the standard deviation and adding the mean.
+
+        :param targets: an nd.array with targets or predictions
+        :return: a denormalized nd.array with the targets or the predictions
+        """
+        if self.normalize_targets:
+            return targets * self._data_std + self._data_mean
+        else:
+            return targets
+
+    def _normalize_target(self, seq, target):
+        target = tf.math.divide(
+            tf.math.subtract(target, self._data_mean), self._data_std
+        )
+        return seq, target
+
+    """
+    if more than one input is added, inputs are collected in a python dict; the following methods assume that
+    """
+
+    @staticmethod
+    def _convert_inputs_to_dict(inputs):
+        return inputs, inputs.pop("target")
+
+
+if __name__ == "__main__":
+    test_data_dict = {
+        "metadata": {
+            "linear rt": [1, 2, 3],
+            "modified_sequence": ["ABC", "ABC", "ABC"],
+        },
+        "annotations": {},
+        "parameters": {"target_column_key": "linear rt"},
+    }
+
+    pd.DataFrame(test_data_dict["metadata"]).to_parquet("metadata.parquet")
+
+    test_data_dict_file = {
+        "metadata": "metadata.parquet",
+        "annotations": {},
+        "parameters": {"target_column_key": "linear rt"},
+    }
+
+    rtdataset = RetentionTimeDataset(data_source=test_data_dict, seq_length=20)
+    print(rtdataset.sequences)
+    print(rtdataset.targets)
+
+    rtdataset = RetentionTimeDataset(data_source=test_data_dict_file, seq_length=20)
+    print(rtdataset.sequences)
+    print(rtdataset.targets)
diff --git a/dlomix/data/__init__.py b/dlomix/data/__init__.py
new file mode 100644
index 0000000..5d9bcfb
--- /dev/null
+++ b/dlomix/data/__init__.py
@@ -0,0 +1,15 @@
+from .AbstractDataset import *
+from .feature_extractors import *
+from .IntensityDataset import *
+from .RetentionTimeDataset import *
+
+__all__ = [
+    "RetentionTimeDataset",
+    "IntensityDataset",
+    "AbstractDataset",
+    "LengthFeature",
+    "SequenceFeatureExtractor",
+    "ModificationLocationFeature",
+    "ModificationLossFeature",
+    "ModificationGainFeature",
+]
diff --git a/dlomix/data/feature_extractors.py b/dlomix/data/feature_extractors.py
new file mode 100644
index 0000000..98f5b59
--- /dev/null
+++ b/dlomix/data/feature_extractors.py
@@ -0,0 +1,198 @@
+import abc
+
+from ..utils import get_constructor_call_object_creation
+
+
+class SequenceFeatureExtractor(abc.ABC):
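+    """Base class for sequence feature extractors.
+
+    Subclasses implement `extract(seq, mods, **kwargs)` for a single sequence and its
+    modifications; `extract_all` applies it to a batch and pads each feature with
+    `padding_element` when a nonzero `seq_length` is passed (the dataset passes
+    `seq_length` only when `pad_to_seq_length` is True).
+    """
+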
+    def __init__(self, pad_to_seq_length=False, padding_element=-1):
+        super(SequenceFeatureExtractor, self).__init__()
+        self.pad_to_seq_length = pad_to_seq_length
+        self.padding_element = padding_element
+
+    @abc.abstractmethod
+    def extract(self, seq, mods, **kwargs):
+        pass
+
+    def extract_all(self, sequences, modifications, seq_length=0):
+        features = []
+        for seq, mods in zip(sequences, modifications):
+            feature = self.extract(seq, mods, seq_length=seq_length)
+            if seq_length:
+                feature = self.pad_feature_to_seq_length(feature, seq_length)
+            features.append(feature)
+        return features
+
+    def pad_feature_to_seq_length(self, single_feature, seq_length=0):
+        feature_length = len(single_feature)
+
+        if feature_length > seq_length:
+            raise ValueError(
+                f"Feature length ({len(single_feature)}) is longer than sequence length provided ({seq_length})."
+            )
+
+        padding_length = seq_length - feature_length
+        single_feature += [self.padding_element] * padding_length
+
+        return single_feature
+
+    def __repr__(self) -> str:
+        return get_constructor_call_object_creation(self)
+
+
+class LengthFeature(SequenceFeatureExtractor):
+    def __init__(self):
+        super(LengthFeature, self).__init__()
+
+    def extract(self, seq, mods, **kwargs):
+        return len(seq)
+
+
+class ModificationLocationFeature(SequenceFeatureExtractor):
+    DICT_PTM_MOD_ATOM = {
+        "M[UNIMOD:35]": 4,
+        "S[UNIMOD:21]": 3,
+        "T[UNIMOD:21]": 3,
+        "Y[UNIMOD:21]": 3,
+        "R[UNIMOD:7]": 1,
+        "K[UNIMOD:1]": 2,
+        "K[UNIMOD:121]": 2,
+        "Q(gl)": 1,
+        "R[UNIMOD:34]": 2,
+        "K[UNIMOD:34]": 2,
+        "T(ga)": 3,
+        "S(ga)": 3,
+        "T(gl)": 3,
+        "S(gl)": 3,
+        "C[UNIMOD:4]": 4,
+        "[ac]-": 2,
+        "E(gl)": 1,
+        "K[UNIMOD:36]": 2,
+        "K[UNIMOD:37]": 2,
+        "K[UNIMOD:122]": 2,
+        "K[UNIMOD:58]": 2,
+        "K[UNIMOD:1289]": 2,
+        "K[UNIMOD:747]": 2,
+        "K[UNIMOD:64]": 2,
+        "K[UNIMOD:1848]": 2,
+        "K[UNIMOD:1363]": 2,
+        "K[UNIMOD:1849]": 2,
+        "K[UNIMOD:3]": 2,
+        "unknown": 1,
+        "R[UNIMOD:36]": 2,
+        "P[UNIMOD:35]": 1,
+        "Y[UNIMOD:354]": 1,
+    }
+
+    def __init__(self):
+        super(ModificationLocationFeature, self).__init__(pad_to_seq_length=True)
+
+    def extract(self, seq, mods, seq_length):
+        modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
+        feature = [
+            ModificationLocationFeature.DICT_PTM_MOD_ATOM.get(i, 0)
+            for i in modified_aas
+        ]
+
+        return feature
+
+
+class ModificationLossFeature(SequenceFeatureExtractor):
+    PTM_LOSS_LOOKUP = {
+        "M[UNIMOD:35]": [0, 0, 0, 0, 0, 0],
+        "S[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
+        "T[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
+        "Y[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
+        "R[UNIMOD:7]": [1, 0, 1, 0, 0, 0],
+        "K[UNIMOD:1]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:121]": [1, 0, 0, 0, 0, 0],
+        "Q(gl)": [9, 4, 2, 1, 0, 0],
+        "R[UNIMOD:34]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:34]": [1, 0, 0, 0, 0, 0],
+        "T(ga)": [1, 0, 0, 0, 0, 0],
+        "S(ga)": [1, 0, 0, 0, 0, 0],
+        "T(gl)": [1, 0, 0, 0, 0, 0],
+        "S(gl)": [1, 0, 0, 0, 0, 0],
+        "C[UNIMOD:4]": [1, 0, 0, 0, 0, 0],
+        "[ac]-": [1, 0, 0, 0, 0, 0],
+        "E(gl)": [8, 4, 1, 2, 0, 0],
+        "K[UNIMOD:36]": [2, 0, 0, 0, 0, 0],
+        "K[UNIMOD:37]": [3, 0, 0, 0, 0, 0],
+        "K[UNIMOD:122]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:58]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:1289]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:747]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:64]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:1848]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:1363]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:1849]": [1, 0, 0, 0, 0, 0],
+        "K[UNIMOD:3]": [1, 0, 0, 0, 0, 0],
+        "unknown": [3, 0, 2, 0, 0, 0],
+        "R[UNIMOD:36]": [2, 0, 0, 0, 0, 0],
+        "P[UNIMOD:35]": [1, 0, 0, 0, 0, 0],
+        "Y[UNIMOD:354]": [1, 0, 0, 0, 0, 0],
+    }
+
+    def __init__(self):
+        super(ModificationLossFeature, self).__init__(
+            pad_to_seq_length=True, padding_element=[0, 0, 0, 0, 0, 0]
+        )
+
+    def extract(self, seq, mods, seq_length):
+        modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
+        feature = [
+            ModificationLossFeature.PTM_LOSS_LOOKUP.get(i, [0] * 6)
+            for i in modified_aas
+        ]
+
+        return feature
+
+
+class ModificationGainFeature(SequenceFeatureExtractor):
+    PTM_GAIN_LOOKUP = {
+        "M[UNIMOD:35]": [0, 0, 0, 1, 0, 0],
+        "S[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
+        "T[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
+        "Y[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
+        "R[UNIMOD:7]": [0, 0, 0, 1, 0, 0],
+        "K[UNIMOD:1]": [3, 2, 0, 1, 0, 0],
+        "K[UNIMOD:121]": [7, 4, 2, 2, 0, 0],
+        "Q(gl)": [6, 4, 1, 1, 0, 0],
+        "R[UNIMOD:34]": [3, 1, 0, 0, 0, 0],
+        "K[UNIMOD:34]": [3, 1, 0, 0, 0, 0],
+        "T(ga)": [14, 8, 1, 5, 0, 0],
+        "S(ga)": [14, 8, 1, 5, 0, 0],
+        "T(gl)": [14, 8, 1, 5, 0, 0],
+        "S(gl)": [14, 8, 1, 5, 0, 0],
+        "C[UNIMOD:4]": [4, 2, 1, 1, 0, 0],
+        "[ac]-": [3, 2, 0, 1, 0, 0],
+        "E(gl)": [6, 4, 1, 1, 0, 0],
+        "K[UNIMOD:36]": [6, 2, 0, 0, 0, 0],
+        "K[UNIMOD:37]": [9, 3, 0, 0, 0, 0],
+        "K[UNIMOD:122]": [0, 1, 0, 1, 0, 0],
+        "K[UNIMOD:58]": [5, 3, 0, 1, 0, 0],
+        "K[UNIMOD:1289]": [7, 4, 0, 1, 0, 0],
+        "K[UNIMOD:747]": [3, 3, 0, 3, 0, 0],
+        "K[UNIMOD:64]": [5, 4, 0, 3, 0, 0],
+        "K[UNIMOD:1848]": [7, 5, 0, 3, 0, 0],
+        "K[UNIMOD:1363]": [5, 4, 0, 1, 0, 0],
+        "K[UNIMOD:1849]": [7, 4, 0, 2, 0, 0],
+        "K[UNIMOD:3]": [15, 10, 2, 2, 0, 1],
+        "unknown": [7, 2, 2, 0, 0, 0],
+        "R[UNIMOD:36]": [6, 2, 0, 0, 0, 0],
+        "P[UNIMOD:35]": [1, 0, 0, 1, 0, 0],
+        "Y[UNIMOD:354]": [0, 0, 1, 2, 0, 0],
+    }
+
+    def __init__(self):
+        super(ModificationGainFeature, self).__init__(
+            pad_to_seq_length=True, padding_element=[0, 0, 0, 0, 0, 0]
+        )
+
+    def extract(self, seq, mods, seq_length):
+        modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
+        feature = [
+            ModificationGainFeature.PTM_GAIN_LOOKUP.get(i, [0] * 6)
+            for i in modified_aas
+        ]
+
+        return feature
diff --git a/dlomix/data/parsers.py b/dlomix/data/parsers.py
new file mode 100644
index 0000000..e3256fa
--- /dev/null
+++ b/dlomix/data/parsers.py
@@ -0,0 +1,106 @@
+import abc
+
+import numpy as np
+from pyteomics.proforma import parse
+
+
+class AbstractParser(abc.ABC):
+    """
+    Abstract class for Parsers that read sequences and split the modification information from the amino acids.
+    The abstract method `_parse_sequence(self, sequence)` is to be implemented by child classes.
+    """
+
+    @abc.abstractmethod
+    def _parse_sequence(self, sequence: str):
+        """parse a single sequence and return amino acids and modifications as separate data structures.
+
+        Args:
+            sequence (str): a modified sequence
+        """
+        raise NotImplementedError("Not implemented.")
+
+    def _take_first_modification_proforma_output(self, mods):
+        # # take first non-null element (modification only) (applied to all modifications including n and c terminal)
+        # # ensure it is a single element and not a string
+        # return next(filter(lambda x: x is not None, mods), None)
+        return [m[0].id if m is not None else -1 for m in mods]
+
+    def _flatten_seq_mods(self, parsed_sequence: list):
+        """helper function to flatten a list of tuples to two lists.
+
+        Args:
+            parsed_sequence (list): a list of tuples (Amino Acids, Modification) `[('A', None), ('B', Unimod:1), ('C', None)]`
+
+        Returns:
+            list: a list of two lists or tuples (one for Amino acids and the other for modifications). `[['A', 'B', 'C'], [None, Unimod:1, None]]`
+        """
+        seq, mods = [list(i) for i in zip(*parsed_sequence)]
+        return seq, mods
+
+    def parse_sequences(self, sequences):
+        """a generic function to apply the implementation of `_parse_sequence` to a list of sequencens.
+
+        Args:
+            sequences (list): list of string sequences, possibly with modifications.
+
+        Returns:
+            tuple(list, list, list, list): sequences, modifications, n_terminal modifications, c_terminal modifications
+        """
+        seqs = []
+        mods = []
+        n_terms = []
+        c_terms = []
+        for seq in sequences:
+            seq, mod, n, c = self._parse_sequence(seq)
+
+            # build sequence as a string from Amino Acid list
+            seq = "".join(seq)
+            seqs.append(seq)
+
+            mods.append(mod)
+
+            n_terms.append(n)
+            c_terms.append(c)
+        seqs = np.array(seqs)
+
+        mods = np.array(mods, dtype=object)
+        n_terms = np.array(n_terms)
+        c_terms = np.array(c_terms)
+        return seqs, mods, n_terms, c_terms
+
+
+class ProformaParser(AbstractParser):
+    def __init__(self):
+        super().__init__()
+
+    def _parse_sequence(self, sequence):
+        """Implementation for parsing sequences according to the Proforma notation based on the Unimod representation.
+
+        Args:
+            sequence (str): sequence of amino acids, possibly with modifications.
+            N-term and C-term modifications have to be separated with a `-`. Example: `[UNIMOD:1]-ABC`
+
+        Returns:
+            tuple(list, list, list): output of `pyteomics.proforma.parse` with the n-term and c-term modifications
+            extracted from the originally returned modifiers dict.
+            More information: https://pyteomics.readthedocs.io/en/latest/api/proforma.html#pyteomics.proforma.parse
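+
+        Example (a hedged sketch; the exact modification objects depend on pyteomics, but
+        `_take_first_modification_proforma_output` maps them to Unimod accession ids, with
+        -1 for unmodified positions and terminals):
+            `seq, mods, n_term, c_term = ProformaParser()._parse_sequence("AC[UNIMOD:4]DEK")`
+            `# seq -> ['A', 'C', 'D', 'E', 'K'], mods -> [-1, 4, -1, -1, -1], n_term/c_term -> -1`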
+        """
+        # returns tuple (list of tuples (AA, mods), and a dict with properties)
+        parsed_sequence, terminal_mods_dict = parse(sequence)
+
+        n_term_mods = terminal_mods_dict.get("n_term")
+        c_term_mods = terminal_mods_dict.get("c_term")
+
+        if n_term_mods:
+            n_term_mods = n_term_mods.pop().id
+        else:
+            n_term_mods = -1
+        if c_term_mods:
+            c_term_mods = c_term_mods.pop().id
+        else:
+            c_term_mods = -1
+
+        seq, mod = self._flatten_seq_mods(parsed_sequence)
+        mod = self._take_first_modification_proforma_output(mod)
+
+        return seq, mod, n_term_mods, c_term_mods
diff --git a/dlomix/data/reader_utils.py b/dlomix/data/reader_utils.py
new file mode 100644
index 0000000..3f3ed47
--- /dev/null
+++ b/dlomix/data/reader_utils.py
@@ -0,0 +1,52 @@
+import json
+
+import pandas as pd
+
+
+def read_parquet_file_pandas(filepath, parquet_engine):
+    """
+    Reads a Parquet file located at the given filepath using pandas and the specified Parquet engine.
+
+    Parameters:
+    -----------
+    filepath : str
+        The file path of the Parquet file to read.
+    parquet_engine : str
+        The name of the Parquet engine to use for reading the file.
+
+    Returns:
+    --------
+    pandas.DataFrame
+        A pandas DataFrame containing the data from the Parquet file.
+
+    Raises:
+    -------
+    ImportError
+        If the specified Parquet engine is not installed (e.g. fastparquet).
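+
+    Examples:
+    --------
+    A hedged sketch (the file path is hypothetical; `DEFAULT_PARQUET_ENGINE` is imported
+    from `dlomix.constants` by the dataset classes):
+
+    >>> df = read_parquet_file_pandas("pool_metadata.parquet", "fastparquet")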
+    """
+    try:
+        df = pd.read_parquet(filepath, engine=parquet_engine)
+    except ImportError:
+        raise ImportError(
+            "Parquet engine is missing, please install fastparquet using pip or conda."
+        )
+    return df
+
+
+def read_json_file(filepath):
+    """
+    Reads a JSON file located at the given filepath and returns its contents as a dictionary.
+
+    Parameters:
+    -----------
+    filepath : str
+        The file path of the JSON file to read.
+
+    Returns:
+    --------
+    dict
+        A dictionary containing the contents of the JSON file.
+    """
+    with open(filepath, "r") as j:
+        json_dict = json.loads(j.read())
+    return json_dict
-- 
GitLab