Commit 282e563e authored by Léo Schneider, committed by Schneider Leo

dlomix data

parent 2b438448
@@ -4,4 +4,4 @@
/dataset/
/test.py
/database/
/dlomix/data/
import abc
from os.path import abspath, dirname
import numpy as np
import pandas as pd
import tensorflow as tf
from ..constants import DEFAULT_PARQUET_ENGINE
from ..utils import lower_and_trim_strings
from .parsers import ProformaParser
from .reader_utils import read_json_file, read_parquet_file_pandas
# What characterizes a dataset:
# 1. reading mode (string, CSV, JSON, parquet, in-memory, etc.)
# 2. inputs (define the sequence column name and additional existing feature names)
# 3. features to extract --> abstracted out in a list of feature extractors
# 4. outputs --> targets to use (names of columns or key names in a dict)

# 1. identify the reading mode
#    and call a static reader class that takes a data source and returns a DataFrame (later consider other data structures)
# 2. pick inputs from the data after the reader has finished, maintain the inputs dict
# 3. pick targets from the data after the reader has finished, maintain the targets dict
# 4. run feature extractors based on input sequences, maintain the features dict
# 5. build TF Datasets accordingly

# Consider collecting member variables related to the sequences in a named tuple (sequence, mod, n_term, c_term, etc.)
# Consider making the dataset object iterable --> iterate over the main split's tf.data.Dataset
class AbstractDataset(abc.ABC):
r"""Base class for datasets.
Parameters
-----------
data_source : str, tuple of two numpy.ndarray, numpy.ndarray, optional
source can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a str with a file path to a CSV file. Defaults to None.
sep : str, optional
separator to be used if the data source is a CSV file. Defaults to ",".
sequence_col : str, optional
name of the column containing the sequences in the provided CSV. Defaults to "sequence".
target_col : str, optional
name of the column containing the targets (indexed retention time). Defaults to "irt".
feature_cols : list, optional
a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
seq_length : int, optional
the sequence length to be used, where all sequences will be padded to this length, longer sequences will be removed and not truncated. Defaults to 0.
parser : str, optional
name of the parser to use. Available parsers are in `dlomix.data.parsers.py`. Defaults to None; no parsing to be done on the sequence (works for unmodified sequences).
features_to_extract: list(dlomix.data.feature_extractors.SequenceFeatureExtractor), optional
List of feature extractor objects. Defaults to None; no features to extract.
batch_size : int, optional
the batch size to be used for consuming the dataset in training a model. Defaults to 32.
val_ratio : float, optional
a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
seed : int, optional
a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
test : bool, optional
a boolean whether the dataset is a test dataset or not. Defaults to False.
sample_run : bool, optional
a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
"""
ATOM_TABLE = None
SPLIT_NAMES = ["train", "val", "test"]
BATCHES_TO_PREFETCH = tf.data.AUTOTUNE
SAMPLE_RUN_N = 100
METADATA_KEY = "metadata"
PARAMS_KEY = "parameters"
ANNOTATIONS_KEY = "annotations"
TARGET_NAME_KEY = "target_column_key"
SEQUENCE_COLUMN_KEY = "sequence_column_key"
def __init__(
self,
data_source,
sep,
sequence_col,
target_col,
feature_cols=None,
seq_length=0,
parser=None,
features_to_extract=None,
batch_size=32,
val_ratio=0,
path_aminoacid_atomcounts=None,
seed=21,
test=False,
sample_run=False,
):
super(AbstractDataset, self).__init__()
self.seed = seed
np.random.seed(self.seed)
self.data_source = data_source
self.sep = sep
self.sequence_col = sequence_col.lower()
self.target_col = target_col.lower()
if feature_cols:
self.feature_cols = lower_and_trim_strings(feature_cols)
else:
self.feature_cols = []
self.sample_run = sample_run
# if seq_length is 0 (default) -> no padding
self.seq_length = seq_length
self.parser = parser
self.features_to_extract = features_to_extract
self._data_mean = 0
self._data_std = 1
self.batch_size = batch_size
self.val_ratio = val_ratio
self.testing_mode = test
# main split is "train" if not in testing mode, otherwise "test"
self.main_split = (
AbstractDataset.SPLIT_NAMES[0]
if not self.testing_mode
else AbstractDataset.SPLIT_NAMES[2]
)
# initialize TF Datasets dict
self.tf_dataset = (
{self.main_split: None, AbstractDataset.SPLIT_NAMES[1]: None}
if val_ratio != 0
else {self.main_split: None}
)
self.indicies_dict = (
{self.main_split: None, AbstractDataset.SPLIT_NAMES[1]: None}
if val_ratio != 0
else {self.main_split: None}
)
# if path to counts lookup table is provided, include count features, otherwise not
self.include_count_features = True if path_aminoacid_atomcounts else False
if self.include_count_features:
self.aminoacid_atom_counts_csv_path = (
path_aminoacid_atomcounts # "../lookups/aa_comp_rel.csv"
)
self._init_atom_table()
self._resolve_parser()
self.sequences = None
self.unmodified_sequences = None
self.modifications = None
self.n_term_modifications = None
self.c_term_modifications = None
self.sequence_features = None
self.sequence_features_names = None
def _resolve_parser(self):
if self.parser is None:
return
elif self.parser == "proforma":
self.parser = ProformaParser()
else:
raise ValueError(
f"Invalid parser provided {self.parser}. For a list of available parsers, check dlomix.data.parsers.py"
)
def _parse_sequences(self):
(
self.sequences,
self.modifications,
self.n_term_modifications,
self.c_term_modifications,
) = self.parser.parse_sequences(self.sequences)
def _resolve_string_data_path(self):
is_json_file = self.data_source.endswith(".json")
if is_json_file:
json_file_base_dir = dirname(abspath(self.data_source))
self.data_source = read_json_file(self.data_source)
self._update_data_loading_for_json_format(json_file_base_dir)
is_parquet_url = ".parquet" in self.data_source and self.data_source.startswith(
"http"
)
is_parquet_file = self.data_source.endswith(".parquet")
is_csv_file = self.data_source.endswith(".csv")
if is_parquet_url or is_parquet_file:
df = read_parquet_file_pandas(self.data_source, DEFAULT_PARQUET_ENGINE)
return df
elif is_csv_file:
df = pd.read_csv(self.data_source)
return df
else:
raise ValueError(
"Invalid data source provided as a string, please provide a path to a csv, parquet, "
"or a json file."
)
def _extract_features(self):
if self.features_to_extract:
self.sequence_features = []
self.sequence_features_names = []
for feature_class in self.features_to_extract:
print("Extracting feature: ", feature_class)
extractor_class = feature_class
feature_array = np.array(
extractor_class.extract_all(
self.sequences,
self.modifications,
self.seq_length if extractor_class.pad_to_seq_length else 0,
),
dtype=np.float32,
)
# ensure an extra (1) dimension is added for later concatenation
# this could also be done later in TensorFlow inside the model; is that better?
# what shapes of features could exist (BATCH X SEQ_LENGTH X 6), (BATCH X SEQ_LENGTH X 1)
if (
feature_array.ndim < 3
and feature_array.shape[-1] == self.seq_length
):
feature_array = np.expand_dims(feature_array, axis=-1)
self.sequence_features.append(feature_array)
self.sequence_features_names.append(
extractor_class.__class__.__name__.lower()
)
def _reshape_sequence_feature_arrays(self):
pass
def get_examples_at_indices(self, examples, split):
if isinstance(examples, np.ndarray):
return examples[self.indicies_dict[split]]
# to handle features
if isinstance(examples, list):
return [
examples_single[self.indicies_dict[split]]
for examples_single in examples
]
raise ValueError(
f"Provided data structure to subset for examples at split indices is neither a list nor a numpy array, but rather a {type(examples)}."
)
def _init_atom_table(self):
atom_counts = pd.read_csv(self.aminoacid_atom_counts_csv_path)
atom_counts = atom_counts.astype(str)
keys_tensor = tf.constant(atom_counts["aa"].values)
values_tensor = tf.constant(
["_".join(c) for c in list(atom_counts.iloc[:, 1:].values)]
)
init = tf.lookup.KeyValueTensorInitializer(keys_tensor, values_tensor)
AbstractDataset.ATOM_TABLE = tf.lookup.StaticHashTable(
init, default_value="0_0_0_0_0"
)
@abc.abstractmethod
def load_data(self, data):
"""load data from source and populate numpy arrays to use for tf.Dataset
Args:
data (str, tuple, dict): Path to csv or parquet file, tuple with numpy arrays, or a dict with keys
`AbstractDataset.METADATA_KEY`, `AbstractDataset.PARAMS_KEY`,
`AbstractDataset.TARGET_NAME_KEY`, `AbstractDataset.SEQUENCE_COLUMN_KEY`.
"""
raise NotImplementedError("Not implemented")
@abc.abstractmethod
def _update_data_loading_for_json_format(self, base_dir):
raise NotImplementedError("Not implemented")
@abc.abstractmethod
def _build_tf_dataset(self):
"""Build the tf.Dataset object for available splits using the data loaded by `load_data`.
Example:
`for split in self.tf_dataset.keys():
self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(
(self.inputs, self.outputs)
)`
"""
raise NotImplementedError("Not implemented")
@abc.abstractmethod
def _preprocess_tf_dataset(self):
"""Add processing logic (tensorflow functions) to apply to all tf.Datasets."""
raise NotImplementedError("Not implemented")
@abc.abstractmethod
def get_split_targets(self, split="val"):
"""Retrieve all targets (original labels) for a specific split (dependent on the task at hand)
Args:
split (str, optional): Name of the split, check `AbstractDataset.SPLIT_NAMES`. Defaults to "val".
"""
raise NotImplementedError("Not implemented")
@staticmethod
@abc.abstractmethod
def _convert_inputs_to_dict(inputs, target):
"""Collect all inputs into a python dict with corresponding keys.
When multiple inputs are used, this function is used at the beginning of the pre-processing
of TF.Datasets.
Args:
inputs (tuple(tf.Tensor)): tuple of input tensors
target (tf.Tensor): target label tensor
"""
raise NotImplementedError("Not implemented")
def _pad_sequences(self, inputs, target):
if isinstance(inputs, dict):
inputs["sequence"] = self._pad_seq(inputs["sequence"])
return inputs, target
else:
return self._pad_seq(inputs), target
def _pad_seq(self, seq):
pad_len = tf.abs(self.seq_length - tf.size(seq))
paddings = tf.concat([[0], [pad_len]], axis=0)
seq = tf.pad(seq, [paddings], "CONSTANT")
seq.set_shape([self.seq_length])
return seq
def _split_sequence(self, inputs, target):
if isinstance(inputs, dict):
inputs["sequence"] = tf.strings.bytes_split(inputs["sequence"])
return inputs, target
else:
inputs = tf.strings.bytes_split(inputs)
return inputs, target
def _generate_single_counts(self, inputs, target):
inputs["counts"] = tf.map_fn(
lambda x: AbstractDataset.ATOM_TABLE.lookup(x), inputs["sequence"]
)
inputs["counts"] = tf.map_fn(
lambda x: tf.strings.split(x, sep="_"), inputs["counts"]
)
inputs["counts"] = tf.strings.to_number(inputs["counts"])
inputs["counts"].set_shape([self.seq_length, 5])
return inputs, target
def _generate_di_counts(self, inputs, target):
# add every two neighboring elements without overlap [0 0 1 1 2 2 .... pad_length/2 pad_length/2]
segments_to_add = [i // 2 for i in range(self.seq_length)]
inputs["di_counts"] = tf.math.segment_sum(
inputs["counts"], tf.constant(segments_to_add)
)
inputs["di_counts"].set_shape([self.seq_length // 2, 5])
return inputs, target
def _get_tf_dataset(self, split=None):
assert (
split in self.tf_dataset.keys()
), f"Requested data split {split} is not available, available splits are {self.tf_dataset.keys()}"
if split in self.tf_dataset.keys():
return self.tf_dataset[split]
return self.tf_dataset
@property
def train_data(self):
"""TensorFlow Dataset object for the training data"""
return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[0])
@property
def val_data(self):
"""TensorFlow Dataset object for the validation data"""
return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[1])
@property
def test_data(self):
"""TensorFlow Dataset object for the test data"""
return self._get_tf_dataset(AbstractDataset.SPLIT_NAMES[2])
@property
def data_mean(self):
"""Mean value of the targets"""
return self._data_mean
@property
def data_std(self):
"""Standard deviation value of the targets"""
return self._data_std
@data_mean.setter
def data_mean(self, value):
self._data_mean = value
@data_std.setter
def data_std(self, value):
self._data_std = value
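# --- Usage sketch (not part of the library) ---
# A minimal illustration of the contract a concrete subclass of AbstractDataset has to
# fulfil: implement the abstract methods and reuse the generic preprocessing helpers.
# Assumptions: the dlomix package is importable; "ToyDataset" and its (sequences, targets)
# tuple input are purely illustrative names, not part of the library.
import numpy as np
import tensorflow as tf
from dlomix.data import AbstractDataset


class ToyDataset(AbstractDataset):
    def __init__(self, data_source=None, seq_length=0, batch_size=32):
        super().__init__(
            data_source, ",", "sequence", "target",
            seq_length=seq_length, batch_size=batch_size,
        )
        self.targets = None
        if data_source is not None:
            self.load_data(data_source)

    def load_data(self, data):
        # expects a (sequences, targets) tuple of numpy arrays
        self.sequences, self.targets = data
        self.indicies_dict[self.main_split] = np.arange(len(self.sequences))
        self._build_tf_dataset()
        self._preprocess_tf_dataset()

    def _update_data_loading_for_json_format(self, base_dir):
        pass  # JSON input is not supported in this toy example

    def _build_tf_dataset(self):
        for split in self.tf_dataset.keys():
            self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(
                {
                    "sequence": self.get_examples_at_indices(self.sequences, split),
                    "target": self.get_examples_at_indices(self.targets, split),
                }
            )

    def _preprocess_tf_dataset(self):
        for split in self.tf_dataset.keys():
            self.tf_dataset[split] = (
                self.tf_dataset[split]
                .map(ToyDataset._convert_inputs_to_dict)
                .map(self._split_sequence)
                .map(self._pad_sequences)
                .batch(self.batch_size)
                .prefetch(AbstractDataset.BATCHES_TO_PREFETCH)
            )

    def get_split_targets(self, split="val"):
        return self.targets[self.indicies_dict[split]]

    @staticmethod
    def _convert_inputs_to_dict(inputs):
        return inputs, inputs.pop("target")


# toy data: two unmodified sequences padded to length 5
toy = ToyDataset((np.array(["ABC", "ABCD"]), np.array([1.0, 2.0])), seq_length=5, batch_size=2)
for inputs, targets in toy.train_data:
    print(inputs["sequence"].shape, targets.numpy())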
from os.path import dirname, join
import numpy as np
import tensorflow as tf
from ..utils import convert_nested_list_to_numpy_array, flatten_dict_for_values
from .AbstractDataset import AbstractDataset
# take into consideration if the pandas dataframe is pickled or not and then call read_pickle instead of read_csv
# allow the possibility to have three different dataset objects, one for train, val, and test
class IntensityDataset(AbstractDataset):
r"""A dataset class for Intensity prediction tasks. It initialize a dataset object wrapping tf.Dataset and some relevant preprocessing steps.
Parameters
-----------
data_source : str, tuple of numpy.ndarray, optional
source can be a tuple of three or four arrays (sequences, collision energy, precursor charge, and optionally intensities), or a str with a file path to a CSV, Parquet, or JSON file. Defaults to None.
sep : str, optional
separator to be used if the data source is a CSV file. Defaults to ",".
sequence_col : str, optional
name of the column containing the sequences in the provided CSV. Defaults to "sequence".
collision_energy_col : str, optional
name of the column containing the collision energy. Defaults to "collision_energy_aligned_normed".
precursor_charge_col : str, optional
name of the column containing the one-hot encoded precursor charge. Defaults to "precursor_charge_onehot".
intensities_col : str, optional
name of the column containing the targets (vector of fragment ion intensities). Defaults to "intensities".
feature_cols : list, optional
a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
normalize_targets : bool, optional
a boolean whether to normalize the targets or not (subtract the mean and divide by the standard deviation). Defaults to False.
seq_length : int, optional
the sequence length to be used, where all sequences will be padded to this length, longer sequences will be removed and not truncated. Defaults to 0.
parser: Subclass of AbstractParser, optional
the parser to use to split amino acids and modifications. For more information, please see `dlomix.data.parsers`
batch_size : int, optional
the batch size to be used for consuming the dataset in training a model. Defaults to 32.
val_ratio : float, optional
a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
seed : int, optional
a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
test : bool, optional
a boolean whether the dataset is a test dataset or not. Defaults to False.
path_aminoacid_atomcounts : str, optional
a string with a path to a CSV table with the atom counts of the different amino acids (can be used for feature extraction). Defaults to None.
sample_run : bool, optional
a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
metadata_filtering_criteria : dict, optional
a dictionary with the filtering criteria (column names and conditions) to be used to filter the metadata. Defaults to None.
"""
# TODO: For test dataset --> examples with longer sequences --> do not drop, add NaN for prediction
def __init__(
self,
data_source=None,
sep=",",
sequence_col="sequence",
collision_energy_col="collision_energy_aligned_normed",
precursor_charge_col="precursor_charge_onehot",
intensities_col="intensities",
feature_cols=None,
normalize_targets=False,
seq_length=0,
parser=None,
features_to_extract=None,
batch_size=32,
val_ratio=0,
seed=21,
test=False,
path_aminoacid_atomcounts=None,
sample_run=False,
metadata_filtering_criteria=None,
):
super().__init__(
data_source,
sep,
sequence_col,
intensities_col,
feature_cols,
seq_length,
parser,
features_to_extract,
batch_size,
val_ratio,
path_aminoacid_atomcounts,
seed,
test,
sample_run,
)
self.collision_energy_col = collision_energy_col.lower()
self.precursor_charge_col = precursor_charge_col.lower()
self.intensities_col = self.target_col
self.metadata_filtering_criteria = metadata_filtering_criteria
self.normalize_targets = normalize_targets
self.no_intensities = self.testing_mode
self.sequences = None
self.collision_energy = None
self.precursor_charge = None
self.intensities = None
self.features_df = None
self.example_id = None
# if data is provided with the constructor call --> load, otherwise --> done
if self.data_source is not None:
self.load_data(data=data_source)
def load_data(self, data):
"""Load data into the dataset object, can be used to load data at a later point after initialization.
This function triggers the whole pipeline: data loading, validation (against sequence length), splitting, building TensorFlow dataset objects, and applying preprocessing.
:param data: a `str` with a file path to csv file
:return: None
"""
self.data_source = data
self._read_data()
# consider removing lengthy sequences when no parser is passed
# Numpy & Pandas
if self.parser:
self._parse_sequences()
self._validate_remove_long_sequences()
if self.features_to_extract:
self._extract_features()
self._split_data()
# TF.Dataset
self._build_tf_dataset()
self._preprocess_tf_dataset()
"""
numpy array --> either a tuple or a single array
- Tuple --> means (sequences, collision_energy, precursor_charge, intensities)
- single ndarray --> means sequences only, useful for test dataset
str --> path to csv file or compressed csv file
"""
def _read_data(self):
if isinstance(self.data_source, tuple):
tuple_size_is_three_or_four = (
len(self.data_source) == 3 or len(self.data_source) == 4
)
if tuple_size_is_three_or_four:
tuple_elements_are_ndarray = all(
[isinstance(x, np.ndarray) for x in self.data_source]
)
if tuple_elements_are_ndarray:
self.sequences = self.data_source[0]
self.collision_energy = self.data_source[1]
self.precursor_charge = self.data_source[2]
if len(self.data_source) == 4:
self.intensities = self.data_source[3]
self.no_intensities = False
else:
self.intensities = np.zeros(self.sequences.shape[0])
self.no_intensities = True
else:
raise ValueError(
"If a tuple is provided, it has to have a length of 4 and all elements should be numpy arrays."
)
elif isinstance(self.data_source, str):
df = self._resolve_string_data_path()
# used only for testing with a smaller sample from a csv file
if self.sample_run:
df = df.head(IntensityDataset.SAMPLE_RUN_N)
# lower all column names
df.columns = [col_name.lower() for col_name in df.columns]
# retrieve columns from the dataframe
self.sequences = df[self.sequence_col]
self.collision_energy = df[self.collision_energy_col]
self.precursor_charge = df[self.precursor_charge_col]
self.intensities = df[self.intensities_col]
# parse strings into lists, for precursor charge and intensities
if isinstance(self.precursor_charge.iloc[0], str):
self.precursor_charge = self.precursor_charge.apply(eval)
if isinstance(self.intensities.iloc[0], str):
self.intensities = self.intensities.apply(eval)
# get numpy arrays with .values for all inputs and intensities
self.sequences = self.sequences.values
# for concatenation later, we expand dimensions
self.collision_energy = self.collision_energy.values.reshape(-1, 1)
self.precursor_charge = convert_nested_list_to_numpy_array(
self.precursor_charge.values, dtype=np.float32
)
self.intensities = convert_nested_list_to_numpy_array(
self.intensities.values
)
self.features_df = df[self.feature_cols]
else:
raise ValueError(
"Data source has to be either a tuple of four numpy arrays,"
"or a string path to a csv file."
)
# give the index of the element as an ID for later reference if needed
self.example_id = list(range(len(self.sequences)))
def _update_data_loading_for_json_format(self, base_dir=None):
import prospectdataset as prospect
json_dict = self.data_source
meta_data_filepath = json_dict.get(IntensityDataset.METADATA_KEY, "")
annotation_data_value = json_dict.get(IntensityDataset.ANNOTATIONS_KEY, "")
annotations_filepaths = flatten_dict_for_values(annotation_data_value)
# meta data file is assumed to be in the same path as the json input file
if base_dir:
meta_data_filepath = join(base_dir, meta_data_filepath)
annotations_filepaths = [
join(base_dir, file) for file in annotations_filepaths
]
# all annotation files are assumed to be in the same directory
if len(annotations_filepaths) > 0:
annotations_dir = dirname(annotations_filepaths[0])
else:
raise ValueError(
"No paths to annotation files were provided in the JSON file."
)
# ToDo: consider options to check if the files were processed earlier and skip this step since it is time consuming
# to pass metadata_filtering_criteria
print("Optionally Downloading and processing the data...")
print("Annotations directory: ", annotations_dir)
# fix directory path, use file names from the json file ???
print("Metadata filepath: ", meta_data_filepath)
print("Base directory: ", base_dir)
self.data_source = prospect.download_process_pool(
annotations_data_dir=annotations_dir,
metadata_path=meta_data_filepath,
save_filepath=join(base_dir, "processed_pool.parquet"),
metadata_filtering_criteria=self.metadata_filtering_criteria,
)
self.intensities_col = json_dict.get(IntensityDataset.PARAMS_KEY, {}).get(
IntensityDataset.TARGET_NAME_KEY, self.intensities_col
)
# ToDo: make dynamic based on parameters
self.sequence_col = "modified_sequence"
def _validate_remove_long_sequences(self) -> None:
"""
Validate that sequences are not longer than the padding length, and drop those that are.
"""
assert self.sequences.shape[0] > 0, "No sequences in the provided data."
# check if count of examples matches for all provided inputs
lengths = [
len(self.sequences),
len(self.collision_energy),
len(self.precursor_charge),
]
if not self.no_intensities:
lengths = lengths + [len(self.intensities)]
assert np.all(
lengths == np.array(lengths[0])
), "Count of examples does not match for sequences and targets."
limit = self.seq_length
vectorized_len = np.vectorize(lambda x: len(x))
mask = vectorized_len(self.sequences) <= limit
self.sequences = self.sequences[mask]
self.collision_energy = self.collision_energy[mask]
self.precursor_charge = self.precursor_charge[mask]
self.intensities = self.intensities[mask]
# once feature columns are introduced, apply the mask to the feature columns (subset the dataframe as well)
def _split_data(self):
n = len(self.sequences)
if self.val_ratio != 0 and (not self.testing_mode):
# no randomization/shuffling for now; revisit the splitting logic later
self.indicies_dict[IntensityDataset.SPLIT_NAMES[1]] = np.arange(n)[
: int(n * self.val_ratio)
]
self.indicies_dict[self.main_split] = np.arange(n)[
int(n * self.val_ratio) :
]
else:
self.indicies_dict[self.main_split] = np.arange(n)
def _build_tf_dataset(self):
input_dict = {}
for split in self.tf_dataset.keys():
input_dict["sequence"] = self.get_examples_at_indices(self.sequences, split)
if self.features_to_extract:
for feature_name, feature_values in zip(
self.sequence_features_names, self.sequence_features
):
input_dict[feature_name] = self.get_examples_at_indices(
feature_values, split
)
input_dict["collision_energy"] = self.get_examples_at_indices(
self.collision_energy, split
)
input_dict["precursor_charge"] = self.get_examples_at_indices(
self.precursor_charge, split
)
input_dict["target"] = self.get_examples_at_indices(self.intensities, split)
self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(input_dict)
def _preprocess_tf_dataset(self):
# ToDo: convert input to dict and assume this as the general case --> abstract out in parent class
for split in self.tf_dataset.keys():
self.tf_dataset[split] = (
self.tf_dataset[split]
.map(
IntensityDataset._convert_inputs_to_dict,
num_parallel_calls=tf.data.AUTOTUNE,
)
.map(
lambda i, t: self._split_sequence(i, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
.map(
lambda i, t: self._pad_sequences(i, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
)
# Here: feature engineering on the fly if needed (atom counts, etc...)
self.tf_dataset[split] = (
self.tf_dataset[split]
.batch(self.batch_size)
.prefetch(IntensityDataset.BATCHES_TO_PREFETCH)
)
def get_split_targets(self, split="val"):
"""Retrieve all targets (original labels) for a specific split.
:param split: a string specifying the split name (train, val, test)
:return: nd.array with the targets
"""
if split not in self.indicies_dict.keys():
raise ValueError(
"requested split does not exist, availabe splits are: "
+ list(self.indicies_dict.keys())
)
return self.intensities[self.indicies_dict[split]]
def denormalize_targets(self, targets):
"""Denormalize the given targets (can also be predictions) by multiplying the standard deviation and adding the mean.
:param targets: an nd.array with targets or predictions
:return: a denormalized nd.array with the targets or the predictions
"""
return targets * self._data_std + self._data_mean
def _normalize_target(self, seq, target):
target = tf.math.divide(
tf.math.subtract(target, self._data_mean), self._data_std
)
return seq, target
@staticmethod
def _convert_inputs_to_dict(inputs):
return inputs, inputs.pop("target")
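# --- Usage sketch (not part of the library) ---
# A minimal example of constructing an IntensityDataset from in-memory numpy arrays,
# following the (sequences, collision_energy, precursor_charge, intensities) tuple layout
# handled in _read_data above. Assumptions: dlomix is importable; all values below are
# toy placeholders (the intensity vector length is arbitrary here).
import numpy as np
from dlomix.data import IntensityDataset

seqs = np.array(["AKPLM", "GHVNPK", "LMNQE", "PEPTIDE"])
ce = np.array([[0.25], [0.30], [0.28], [0.27]], dtype=np.float32)
charge = np.eye(6, dtype=np.float32)[[1, 2, 1, 3]]           # one-hot precursor charges
intensities = np.random.rand(4, 174).astype(np.float32)      # toy fragment intensity vectors

int_data = IntensityDataset(
    data_source=(seqs, ce, charge, intensities),
    seq_length=30,   # sequences longer than 30 would be dropped, not truncated
    batch_size=2,
)
for inputs, targets in int_data.train_data:
    print(inputs["sequence"].shape, inputs["precursor_charge"].shape, targets.shape)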
from os.path import join
import numpy as np
import pandas as pd
import tensorflow as tf
from .AbstractDataset import AbstractDataset
# take into consideration if the pandas dataframe is pickled or not and then call read_pickle instead of read_csv
# allow the possibility to have three different dataset objects, one for train, val, and test
class RetentionTimeDataset(AbstractDataset):
r"""A dataset class for Retention Time prediction tasks. It initialize a dataset object wrapping tf.Dataset and some relevant preprocessing steps.
Parameters
-----------
data_source : str, tuple of two numpy.ndarray, numpy.ndarray, optional
source can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a str with a file path to a CSV file. Defaults to None.
sep : str, optional
separator to be used if the data source is a CSV file. Defaults to ",".
sequence_col : str, optional
name of the column containing the sequences in the provided CSV. Defaults to "sequence".
target_col : str, optional
name of the column containing the targets (indexed retention time). Defaults to "irt".
feature_cols : list, optional
a list of columns containing other features that can be used later as inputs to a model. Defaults to None.
normalize_targets : bool, optional
a boolean whether to normalize the targets or not (subtract the mean and divide by the standard deviation). Defaults to False.
seq_length : int, optional
the sequence length to be used, where all sequences will be padded to this length, longer sequences will be removed and not truncated. Defaults to 0.
parser: Subclass of AbstractParser, optional
the parser to use to split amino acids and modifications. For more information, please see `dlomix.data.parsers`
batch_size : int, optional
the batch size to be used for consuming the dataset in training a model. Defaults to 32.
val_ratio : float, optional
a fraction to determine the size of the validation data (0.2 = 20%). Defaults to 0.
seed : int, optional
a seed to use for splitting the data to allow for a reproducible split. Defaults to 21.
test : bool, optional
a boolean whether the dataset is a test dataset or not. Defaults to False.
path_aminoacid_atomcounts : str, optional
a string with a path to a CSV table with the atom counts of the different amino acids (can be used for feature extraction). Defaults to None.
sample_run : bool, optional
a boolean to limit the number of examples to a small number, SAMPLE_RUN_N, for testing and debugging purposes. Defaults to False.
"""
# TODO: For test dataset --> examples with longer sequences --> do not drop, add NaN for prediction
def __init__(
self,
data_source=None,
sep=",",
sequence_col="sequence",
target_col="irt",
feature_cols=None,
normalize_targets=False,
seq_length=0,
parser=None,
features_to_extract=None,
batch_size=32,
val_ratio=0,
seed=21,
test=False,
path_aminoacid_atomcounts=None,
sample_run=False,
):
super().__init__(
data_source,
sep,
sequence_col,
target_col,
feature_cols,
seq_length,
parser,
features_to_extract,
batch_size,
val_ratio,
path_aminoacid_atomcounts,
seed,
test,
sample_run,
)
self.normalize_targets = normalize_targets
self.sequences = None
self.targets = None
self.features_df = None
self.example_id = None
# if data is provided with the constructor call --> load, otherwise --> done
if self.data_source is not None:
self.load_data(data=data_source)
def load_data(self, data):
"""Load data into the dataset object, can be used to load data at a later point after initialization.
This function triggers the whole pipeline: data loading, validation (against sequence length), splitting, building TensorFlow dataset objects, and applying preprocessing.
:param data: can be a tuple of two arrays (sequences, targets), a single array (sequences, useful for test data), or a `str` with a file path to a csv file
:return: None
"""
self.data_source = data
self._read_data()
if self.parser:
self._parse_sequences()
self._validate_remove_long_sequences()
if self.features_to_extract:
self._extract_features()
self._split_data()
self._build_tf_dataset()
self._preprocess_tf_dataset()
"""
numpy array --> either a tuple or a single array
- Tuple --> means (sequences, targets)
- single ndarray --> means sequences only, useful for test dataset
str --> path to csv file or compressed csv file
"""
def _read_data(self):
if isinstance(self.data_source, dict):
self._update_data_loading_for_json_format()
if isinstance(self.data_source, tuple):
tuple_size_is_two = len(self.data_source) == 2
if tuple_size_is_two:
tuple_elements_are_ndarray = isinstance(
self.data_source[0], np.ndarray
) and isinstance(self.data_source[1], np.ndarray)
if tuple_elements_are_ndarray:
self.sequences = self.data_source[0]
self.targets = self.data_source[1]
else:
raise ValueError(
"If a tuple is provided, it has to have a length of 2 and both elements should be numpy arrays."
)
elif isinstance(self.data_source, np.ndarray):
self.sequences = self.data_source
self.targets = np.zeros(self.sequences.shape[0])
self._data_mean, self._data_std = 0, 1
elif isinstance(self.data_source, (str, dict)):
if isinstance(self.data_source, dict):
# a dict is passed in-memory via the json
df = pd.DataFrame(self.data_source)
else:
# a string path is passed via the json or as a constructor argument
df = self._resolve_string_data_path()
# consider sorting to leverage caching when extracting features
# df.sort_values(by=self.sequence_col, inplace=True)
# used only for testing with a smaller sample from a csv file
if self.sample_run:
df = df.head(RetentionTimeDataset.SAMPLE_RUN_N)
# lower all column names
df.columns = [col_name.lower() for col_name in df.columns]
self.sequences, self.targets = (
df[self.sequence_col].values,
df[self.target_col].values,
)
self._data_mean, self._data_std = np.mean(self.targets), np.std(
self.targets
)
self.features_df = df[self.feature_cols]
else:
raise ValueError(
"Data source has to be either a tuple of two numpy arrays, a single numpy array, "
"or a string with a path to a csv/parquet/json file."
)
# give the index of the element as an ID for later reference if needed
self.example_id = list(range(len(self.sequences)))
def _update_data_loading_for_json_format(self, base_dir=None):
json_dict = self.data_source
self.data_source = json_dict.get(RetentionTimeDataset.METADATA_KEY, "")
# meta data file is assumed to be in the same path as the json input file
if base_dir:
self.data_source = join(
base_dir, json_dict.get(RetentionTimeDataset.METADATA_KEY, "")
)
self.target_col = json_dict.get(RetentionTimeDataset.PARAMS_KEY, {}).get(
RetentionTimeDataset.TARGET_NAME_KEY, self.target_col
)
# ToDo: make dynamic based on parameters
self.sequence_col = "modified_sequence"
def _validate_remove_long_sequences(self) -> None:
"""
Validate that sequences are not longer than the padding length, and drop those that are.
"""
if self.sequences.shape[0] <= 0:
raise ValueError(
"No sequences in the provided data or sequences were not parsed correctly."
)
if len(self.sequences) != len(self.targets):
raise ValueError(
"Count of examples does not match for sequences and targets."
)
limit = self.seq_length
vectorized_len = np.vectorize(lambda x: len(x))
mask = vectorized_len(self.sequences) <= limit
self.sequences, self.targets = self.sequences[mask], self.targets[mask]
# modification-related arrays exist only when a parser was used
if self.parser is not None:
    self.modifications = self.modifications[mask]
    self.n_term_modifications, self.c_term_modifications = (
        self.n_term_modifications[mask],
        self.c_term_modifications[mask],
    )
# once feature columns are introduced, apply the mask to the feature columns (subset the dataframe as well)
def _split_data(self):
n = len(self.sequences)
if self.val_ratio != 0 and (not self.testing_mode):
# no randomization/shuffling for now; revisit the splitting logic later
self.indicies_dict[RetentionTimeDataset.SPLIT_NAMES[1]] = np.arange(n)[
: int(n * self.val_ratio)
]
self.indicies_dict[self.main_split] = np.arange(n)[
int(n * self.val_ratio) :
]
else:
self.indicies_dict[self.main_split] = np.arange(n)
def _build_tf_dataset(self):
input_dict = {}
for split in self.tf_dataset.keys():
input_dict["sequence"] = self.get_examples_at_indices(self.sequences, split)
if self.features_to_extract:
for feature_name, feature_values in zip(
self.sequence_features_names, self.sequence_features
):
input_dict[feature_name] = self.get_examples_at_indices(
feature_values, split
)
input_dict["target"] = self.get_examples_at_indices(self.targets, split)
self.tf_dataset[split] = tf.data.Dataset.from_tensor_slices(input_dict)
def _preprocess_tf_dataset(self):
for split in self.tf_dataset.keys():
self.tf_dataset[split] = self.tf_dataset[split].map(
RetentionTimeDataset._convert_inputs_to_dict,
num_parallel_calls=tf.data.AUTOTUNE,
)
# avoid normalizing targets for test data --> should not be needed
if self.normalize_targets and not self.testing_mode:
self.tf_dataset[split] = self.tf_dataset[split].map(
lambda s, t: self._normalize_target(s, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
self.tf_dataset[split] = (
self.tf_dataset[split]
.map(
lambda s, t: self._split_sequence(s, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
.map(
lambda s, t: self._pad_sequences(s, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
)
if self.include_count_features:
self.tf_dataset[split] = (
self.tf_dataset[split]
# elements are already (inputs_dict, target) tuples at this point; no second conversion needed
.map(
lambda s, t: self._generate_single_counts(s, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
.map(
lambda s, t: self._generate_di_counts(s, t),
num_parallel_calls=tf.data.AUTOTUNE,
)
)
self.tf_dataset[split] = (
self.tf_dataset[split]
.batch(self.batch_size)
.prefetch(RetentionTimeDataset.BATCHES_TO_PREFETCH)
)
def get_split_targets(self, split="val"):
"""Retrieve all targets (original labels) for a specific split.
:param split: a string specifying the split name (train, val, test)
:return: nd.array with the targets
"""
if split not in self.indicies_dict.keys():
raise ValueError(
"requested split does not exist, availabe splits are: "
+ list(self.indicies_dict.keys())
)
return self.targets[self.indicies_dict[split]]
def denormalize_targets(self, targets):
"""Denormalize the given targets (can also be predictions) by multiplying the standard deviation and adding the mean.
:param targets: an nd.array with targets or predictions
:return: a denormalized nd.array with the targets or the predictions
"""
if self.normalize_targets:
return targets * self._data_std + self._data_mean
else:
return targets
def _normalize_target(self, seq, target):
target = tf.math.divide(
tf.math.subtract(target, self._data_mean), self._data_std
)
return seq, target
"""
if more than one input is added, inputs are added to a python dict, the following methods assume that
"""
@staticmethod
def _convert_inputs_to_dict(inputs):
return inputs, inputs.pop("target")
if __name__ == "__main__":
test_data_dict = {
"metadata": {
"linear rt": [1, 2, 3],
"modified_sequence": ["ABC", "ABC", "ABC"],
},
"annotations": {},
"parameters": {"target_column_key": "linear rt"},
}
pd.DataFrame(test_data_dict["metadata"]).to_parquet("metadata.parquet")
test_data_dict_file = {
"metadata": "metadata.parquet",
"annotations": {},
"parameters": {"target_column_key": "linear rt"},
}
rtdataset = RetentionTimeDataset(data_source=test_data_dict, seq_length=20)
print(rtdataset.sequences)
print(rtdataset.targets)
rtdataset = RetentionTimeDataset(data_source=test_data_dict_file, seq_length=20)
print(rtdataset.sequences)
print(rtdataset.targets)
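# A second, hedged example continuing the demo above: constructing the dataset from a
# (sequences, targets) tuple of numpy arrays, as described in the class docstring. The
# toy sequences below are unmodified, so no parser is passed; with val_ratio=0.2 a
# validation split is created alongside the training split.
seqs = np.array(["AKPLM", "GHVNPK", "LMNQE", "PEPTIDE", "VNKPLE"])
irts = np.array([10.5, 23.1, 17.8, 31.0, 12.4])
rtdataset = RetentionTimeDataset(
    data_source=(seqs, irts), seq_length=20, batch_size=2, val_ratio=0.2
)
for inputs, targets in rtdataset.train_data:
    print(inputs["sequence"].shape, targets.numpy())
print(rtdataset.get_split_targets("val"))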
from .AbstractDataset import *
from .feature_extractors import *
from .IntensityDataset import *
from .RetentionTimeDataset import *
__all__ = [
"RetentionTimeDataset",
"IntensityDataset",
"AbstractDataset",
"LengthFeature",
"SequenceFeatureExtractor",
"ModificationLocationFeature",
"ModificationLossFeature",
"ModificationGainFeature",
]
import abc
from ..utils import get_constructor_call_object_creation
class SequenceFeatureExtractor(abc.ABC):
def __init__(self, pad_to_seq_length=False, padding_element=-1):
super(SequenceFeatureExtractor, self).__init__()
self.pad_to_seq_length = pad_to_seq_length
self.padding_element = padding_element
@abc.abstractmethod
def extract(self, seq, mods, **kwargs):
pass
def extract_all(self, sequences, modifications, seq_length=0):
features = []
for seq, mods in zip(sequences, modifications):
feature = self.extract(seq, mods, seq_length=seq_length)
if seq_length:
feature = self.pad_feature_to_seq_length(feature, seq_length)
features.append(feature)
return features
def pad_feature_to_seq_length(self, single_feature, seq_length=0):
feature_length = len(single_feature)
if feature_length > seq_length:
raise ValueError(
f"Feature length ({len(single_feature)}) is longer than sequence length provided ({seq_length})."
)
padding_length = seq_length - feature_length
single_feature += [self.padding_element] * padding_length
return single_feature
def __repr__(self) -> str:
return get_constructor_call_object_creation(self)
class LengthFeature(SequenceFeatureExtractor):
def __init__(self):
super(LengthFeature, self).__init__()
def extract(self, seq, mods, **kwargs):
return len(seq)
class ModificationLocationFeature(SequenceFeatureExtractor):
DICT_PTM_MOD_ATOM = {
"M[UNIMOD:35]": 4,
"S[UNIMOD:21]": 3,
"T[UNIMOD:21]": 3,
"Y[UNIMOD:21]": 3,
"R[UNIMOD:7]": 1,
"K[UNIMOD:1]": 2,
"K[UNIMOD:121]": 2,
"Q(gl)": 1,
"R[UNIMOD:34]": 2,
"K[UNIMOD:34]": 2,
"T(ga)": 3,
"S(ga)": 3,
"T(gl)": 3,
"S(gl)": 3,
"C[UNIMOD:4]": 4,
"[ac]-": 2,
"E(gl)": 1,
"K[UNIMOD:36]": 2,
"K[UNIMOD:37]": 2,
"K[UNIMOD:122]": 2,
"K[UNIMOD:58]": 2,
"K[UNIMOD:1289]": 2,
"K[UNIMOD:747]": 2,
"K[UNIMOD:64]": 2,
"K[UNIMOD:1848]": 2,
"K[UNIMOD:1363]": 2,
"K[UNIMOD:1849]": 2,
"K[UNIMOD:3]": 2,
"unknown": 1,
"R[UNIMOD:36]": 2,
"P[UNIMOD:35]": 1,
"Y[UNIMOD:354]": 1,
}
def __init__(self):
super(ModificationLocationFeature, self).__init__(pad_to_seq_length=True)
def extract(self, seq, mods, seq_length):
modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
feature = [
ModificationLocationFeature.DICT_PTM_MOD_ATOM.get(i, 0)
for i in modified_aas
]
return feature
class ModificationLossFeature(SequenceFeatureExtractor):
PTM_LOSS_LOOKUP = {
"M[UNIMOD:35]": [0, 0, 0, 0, 0, 0],
"S[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
"T[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
"Y[UNIMOD:21]": [1, 0, 0, 0, 0, 0],
"R[UNIMOD:7]": [1, 0, 1, 0, 0, 0],
"K[UNIMOD:1]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:121]": [1, 0, 0, 0, 0, 0],
"Q(gl)": [9, 4, 2, 1, 0, 0],
"R[UNIMOD:34]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:34]": [1, 0, 0, 0, 0, 0],
"T(ga)": [1, 0, 0, 0, 0, 0],
"S(ga)": [1, 0, 0, 0, 0, 0],
"T(gl)": [1, 0, 0, 0, 0, 0],
"S(gl)": [1, 0, 0, 0, 0, 0],
"C[UNIMOD:4]": [1, 0, 0, 0, 0, 0],
"[ac]-": [1, 0, 0, 0, 0, 0],
"E(gl)": [8, 4, 1, 2, 0, 0],
"K[UNIMOD:36]": [2, 0, 0, 0, 0, 0],
"K[UNIMOD:37]": [3, 0, 0, 0, 0, 0],
"K[UNIMOD:122]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:58]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:1289]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:747]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:64]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:1848]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:1363]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:1849]": [1, 0, 0, 0, 0, 0],
"K[UNIMOD:3]": [1, 0, 0, 0, 0, 0],
"unknown": [3, 0, 2, 0, 0, 0],
"R[UNIMOD:36]": [2, 0, 0, 0, 0, 0],
"P[UNIMOD:35]": [1, 0, 0, 0, 0, 0],
"Y[UNIMOD:354]": [1, 0, 0, 0, 0, 0],
}
def __init__(self):
super(ModificationLossFeature, self).__init__(
pad_to_seq_length=True, padding_element=[0, 0, 0, 0, 0, 0]
)
def extract(self, seq, mods, seq_length):
modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
feature = [
ModificationLossFeature.PTM_LOSS_LOOKUP.get(i, [0] * 6)
for i in modified_aas
]
return feature
class ModificationGainFeature(SequenceFeatureExtractor):
PTM_GAIN_LOOKUP = {
"M[UNIMOD:35]": [0, 0, 0, 1, 0, 0],
"S[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
"T[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
"Y[UNIMOD:21]": [2, 0, 0, 3, 1, 0],
"R[UNIMOD:7]": [0, 0, 0, 1, 0, 0],
"K[UNIMOD:1]": [3, 2, 0, 1, 0, 0],
"K[UNIMOD:121]": [7, 4, 2, 2, 0, 0],
"Q(gl)": [6, 4, 1, 1, 0, 0],
"R[UNIMOD:34]": [3, 1, 0, 0, 0, 0],
"K[UNIMOD:34]": [3, 1, 0, 0, 0, 0],
"T(ga)": [14, 8, 1, 5, 0, 0],
"S(ga)": [14, 8, 1, 5, 0, 0],
"T(gl)": [14, 8, 1, 5, 0, 0],
"S(gl)": [14, 8, 1, 5, 0, 0],
"C[UNIMOD:4]": [4, 2, 1, 1, 0, 0],
"[ac]-": [3, 2, 0, 1, 0, 0],
"E(gl)": [6, 4, 1, 1, 0, 0],
"K[UNIMOD:36]": [6, 2, 0, 0, 0, 0],
"K[UNIMOD:37]": [9, 3, 0, 0, 0, 0],
"K[UNIMOD:122]": [0, 1, 0, 1, 0, 0],
"K[UNIMOD:58]": [5, 3, 0, 1, 0, 0],
"K[UNIMOD:1289]": [7, 4, 0, 1, 0, 0],
"K[UNIMOD:747]": [3, 3, 0, 3, 0, 0],
"K[UNIMOD:64]": [5, 4, 0, 3, 0, 0],
"K[UNIMOD:1848]": [7, 5, 0, 3, 0, 0],
"K[UNIMOD:1363]": [5, 4, 0, 1, 0, 0],
"K[UNIMOD:1849]": [7, 4, 0, 2, 0, 0],
"K[UNIMOD:3]": [15, 10, 2, 2, 0, 1],
"unknown": [7, 2, 2, 0, 0, 0],
"R[UNIMOD:36]": [6, 2, 0, 0, 0, 0],
"P[UNIMOD:35]": [1, 0, 0, 1, 0, 0],
"Y[UNIMOD:354]": [0, 0, 1, 2, 0, 0],
}
def __init__(self):
super(ModificationGainFeature, self).__init__(
pad_to_seq_length=True, padding_element=[0, 0, 0, 0, 0, 0]
)
def extract(self, seq, mods, seq_length):
modified_aas = [f"{s}[UNIMOD:{m}]" for s, m in zip(seq, mods)]
feature = [
ModificationGainFeature.PTM_GAIN_LOOKUP.get(i, [0] * 6)
for i in modified_aas
]
return feature
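# --- Usage sketch (not part of the library) ---
# A minimal custom feature extractor, illustrating the extract/extract_all contract of
# SequenceFeatureExtractor above. "HydrophobicFeature" is an illustrative name and not
# part of the library; per-residue values are padded up to seq_length with the padding element.
class HydrophobicFeature(SequenceFeatureExtractor):
    HYDROPHOBIC = set("AVILMFWY")

    def __init__(self):
        super().__init__(pad_to_seq_length=True)

    def extract(self, seq, mods, **kwargs):
        # one value per amino acid: 1 for hydrophobic residues, 0 otherwise
        return [1 if aa in HydrophobicFeature.HYDROPHOBIC else 0 for aa in seq]


# example: features are padded with the default padding element (-1) up to seq_length
features = HydrophobicFeature().extract_all(
    sequences=["AKV", "GG"],
    modifications=[[-1, -1, -1], [-1, -1]],
    seq_length=5,
)
print(features)  # [[1, 0, 1, -1, -1], [0, 0, -1, -1, -1]]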
import abc
import numpy as np
from pyteomics.proforma import parse
class AbstractParser(abc.ABC):
"""
Abstract class for Parsers that read sequences and split the modification information from the amino acids.
The abstract method `_parse_sequence(self, sequence)` is to be implemented by child classes.
"""
@abc.abstractmethod
def _parse_sequence(self, sequence: str):
"""parse a single sequence and return amino acids and modifications as separate data structures.
Args:
sequence (str): a modified sequence
"""
raise NotImplementedError("Not implemented.")
def _take_first_modification_proforma_output(self, mods):
# # take first non-null element (modification only) (applied to all modifications including n and c terminal)
# # ensure it is a single element and not a string
# return next(filter(lambda x: x is not None, mods), None)
return [m[0].id if m is not None else -1 for m in mods]
def _flatten_seq_mods(self, parsed_sequence: list):
"""helper function to flatten a list of tuples to two lists.
Args:
parsed_sequence (list): a list of tuples (Amino Acids, Modification) `[('A', None), ('B', Unimod:1), ('C', None)]`
Returns:
list: a list of two lists or tuples (one for Amino acids and the other for modifications). `[['A', 'B', 'C'], [None, Unimod:1, None]]`
"""
seq, mods = [list(i) for i in zip(*parsed_sequence)]
return seq, mods
def parse_sequences(self, sequences):
"""a generic function to apply the implementation of `_parse_sequence` to a list of sequencens.
Args:
sequences (list): list of string sequences, possibly with modifications.
Returns:
tuple(list, list, list, list): sequences, modifications, n_terminal modifications, c_terminal modifications
"""
seqs = []
mods = []
n_terms = []
c_terms = []
for seq in sequences:
seq, mod, n, c = self._parse_sequence(seq)
# build sequence as a string from Amino Acid list
seq = "".join(seq)
seqs.append(seq)
mods.append(mod)
n_terms.append(n)
c_terms.append(c)
seqs = np.array(seqs)
mods = np.array(mods, dtype=object)
n_terms = np.array(n_terms)
c_terms = np.array(c_terms)
return seqs, mods, n_terms, c_terms
class ProformaParser(AbstractParser):
def __init__(self):
super().__init__()
def _parse_sequence(self, sequence):
"""Implementation for parsing sequences according to the Proforma notation based on the Unimod representation.
Args:
sequence (str): sequence of amino acids, possibly with modifications.
N-term and C-term modifications have to be separated with a `-`. Example: `[Unimod:1]-ABC`
Returns:
tuple(list, list, list): output of `pyteomics.proforma.parse` with the n-term and c-term modifications
extracted from the originally returned modifiers dict.
More information: https://pyteomics.readthedocs.io/en/latest/api/proforma.html#pyteomics.proforma.parse
"""
# returns tuple (list of tuples (AA, mods), and a dict with properties)
parsed_sequence, terminal_mods_dict = parse(sequence)
n_term_mods = terminal_mods_dict.get("n_term")
c_term_mods = terminal_mods_dict.get("c_term")
if n_term_mods:
n_term_mods = n_term_mods.pop().id
else:
n_term_mods = -1
if c_term_mods:
c_term_mods = c_term_mods.pop().id
else:
c_term_mods = -1
seq, mod = self._flatten_seq_mods(parsed_sequence)
mod = self._take_first_modification_proforma_output(mod)
return seq, mod, n_term_mods, c_term_mods
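# --- Usage sketch (not part of the library) ---
# Parsing ProForma sequences with UNIMOD accessions (assumption: pyteomics is installed,
# which the import at the top of this module already requires).
parser = ProformaParser()
seqs, mods, n_terms, c_terms = parser.parse_sequences(["[UNIMOD:1]-AC[UNIMOD:4]DEK", "PEPTIDE"])
# seqs    -> ['ACDEK', 'PEPTIDE'] (amino acids joined back into plain strings)
# mods    -> per-residue UNIMOD ids, -1 where a residue is unmodified
# n_terms -> the UNIMOD id of the N-terminal modification of each sequence, -1 if absent
# c_terms -> analogous for C-terminal modifications (-1 here)
print(seqs, mods, n_terms, c_terms)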
import json
import pandas as pd
def read_parquet_file_pandas(filepath, parquet_engine):
"""
Reads a Parquet file located at the given filepath using pandas and the specified Parquet engine.
Parameters:
-----------
filepath : str
The file path of the Parquet file to read.
parquet_engine : str
The name of the Parquet engine to use for reading the file.
Returns:
--------
pandas.DataFrame
A pandas DataFrame containing the data from the Parquet file.
Raises:
-------
ImportError
If the specified Parquet engine is not installed (e.g. fastparquet).
"""
try:
df = pd.read_parquet(filepath, engine=parquet_engine)
except ImportError:
raise ImportError(
"Parquet engine is missing, please install fastparquet using pip or conda."
)
return df
def read_json_file(filepath):
"""
Reads a JSON file located at the given filepath and returns its contents as a dictionary.
Parameters:
-----------
filepath : str
The file path of the JSON file to read.
Returns:
--------
dict
A dictionary containing the contents of the JSON file.
"""
with open(filepath, "r") as j:
json_dict = json.loads(j.read())
return json_dict
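# --- Usage sketch (not part of the library) ---
# Reading a dataset description JSON and the metadata Parquet file it points to.
# Assumptions: "dataset_config.json" is a hypothetical file following the structure used by
# the dataset classes (a "metadata" key holding a Parquet path); the engine name passed
# here ("fastparquet") mirrors the error message above.
config = read_json_file("dataset_config.json")
metadata_df = read_parquet_file_pandas(config["metadata"], "fastparquet")
print(metadata_df.head())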