Skip to content
Snippets Groups Projects
Commit c2a8f823 authored by Schneider Leo's avatar Schneider Leo
Browse files

add : apex only binary images

parent 3c8721d0
No related branches found
No related tags found
No related merge requests found
......@@ -5,8 +5,8 @@ import re
import numpy as np
import matplotlib.image as mpimg
from build_image import build_image_ms1
from image_processing.build_image import build_image_ms1_wiff
from image_processing.build_image import build_image_ms1_wiff, build_image_ms1_wiff_charge_filtered, build_image_ms1_wiff_charge_filtered_apex_only
"""
find . -name '*.mzML' -exec cp -prv '{}' '/home/leo/PycharmProjects/pseudo_image/data/raw_data' ';'
......@@ -173,15 +173,15 @@ def create_dataset():
name = label[label['path_aer'] == path.split("/")[-1]]['sample_name'].values[0]
analyse = 'AER'
if species is not None: #save image in species specific dir
directory_path_png = '../data/processed_data_wiff/png_image/{}'.format(species)
directory_path_npy = '../data/processed_data_wiff/npy_image/{}'.format(species)
directory_path_png = '../data/processed_data_wiff_clean_005_10000_apex/png_image/{}'.format(species)
directory_path_npy = '../data/processed_data_wiff_clean_005_10000_apex/npy_image/{}'.format(species)
if not os.path.isdir(directory_path_png):
os.makedirs(directory_path_png)
if not os.path.isdir(directory_path_npy):
os.makedirs(directory_path_npy)
if not os.path.isfile(directory_path_png + "/" + name + '_' + analyse + '.png'):
try :
mat = build_image_ms1_wiff(path, 1)
mat = build_image_ms1_wiff_charge_filtered_apex_only(path, bin_mz=1,tolerance=0.005,noise=10000)
mpimg.imsave(directory_path_png + "/" + name + '_' + analyse + '.png', mat)
np.save(directory_path_npy + "/" + name + '_' + analyse + '.npy', mat)
print('image create')
......@@ -202,15 +202,15 @@ def create_dataset():
name = label[label['path_aer'] == path.split("/")[-1]]['sample_name'].values[0]
analyse = 'AER'
if species is not None:
directory_path_png = '../data/processed_data_wiff/png_image/{}'.format(species)
directory_path_npy = '../data/processed_data_wiff/npy_image/{}'.format(species)
directory_path_png = '../data/processed_data_wiff_clean_005_10000_apex/png_image/{}'.format(species)
directory_path_npy = '../data/processed_data_wiff_clean_005_10000_apex/npy_image/{}'.format(species)
if not os.path.isdir(directory_path_png):
os.makedirs(directory_path_png)
if not os.path.isdir(directory_path_npy):
os.makedirs(directory_path_npy)
if not os.path.isfile(directory_path_png + "/" + name + '_' + analyse + '.png'):
try :
mat = build_image_ms1_wiff(path, 1)
mat = build_image_ms1_wiff_charge_filtered_apex_only(path, bin_mz=1,tolerance=0.005,noise=10000)
mpimg.imsave(directory_path_png + "/" + name + '_' + analyse + '.png', mat)
np.save(directory_path_npy + "/" + name + '_' + analyse + '.npy', mat)
print('image create')
......
......@@ -2,6 +2,9 @@ import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import pyopenms as oms
from matplotlib import image as mpimg
from torch.onnx.symbolic_opset12 import argmax
from pyRawMSDataReader.pyRawMSDataReader.WiffFileReader_py import WiffFileReader
......@@ -36,7 +39,7 @@ def build_image_ms1_wiff(path, bin_mz):
return im
def build_image_ms1_wiff_charge_filtered(path, bin_mz=1, tolerance=0.01):
def build_image_ms1_wiff_charge_filtered(path, bin_mz=1, tolerance=0.01, noise=10000):
#load raw data
rawFile = WiffFileReader(path)
max_cycle=0
......@@ -58,36 +61,20 @@ def build_image_ms1_wiff_charge_filtered(path, bin_mz=1, tolerance=0.01):
for scanNumber in range(total_scan_number):
if rawFile.GetMSOrderForScanNum(scanNumber) == 1:
masses, intensities = rawFile.GetCentroidMassListFromScanNum(scanNumber)
is_monocharged = [False for i in range(len(masses))]
is_bicharged = [False for i in range(len(masses))]
is_tricharged = [False for i in range(len(masses))]
for i in range(len(masses)):
m = masses[i]
found_s = True
found_b = True
found_t = True
s = 1
b=1
t=1
if not is_monocharged[i]:
dec = 1
while found_s :
found_s=False
while (i + dec) < len(masses) and masses[i + dec] - m < (s + tolerance):
if masses[i + dec] - m > (1 - tolerance):
is_monocharged[i] = True
is_monocharged[i + dec] = True
found_s=True
dec += 1
s+=1
if not is_bicharged[i]:
dec = 1
while found_b :
found_b=False
while (i + dec) < len(masses) and masses[i + dec] - m < (b / 2 + tolerance):
if masses[i + dec] - m > (1 / 2 - tolerance):
if masses[i + dec] - m > (b / 2 - tolerance):
is_bicharged[i] = True
is_bicharged[i + dec] = True
found_b = True
......@@ -108,13 +95,102 @@ def build_image_ms1_wiff_charge_filtered(path, bin_mz=1, tolerance=0.01):
line = np.zeros(n_bin_ms1)
if len(masses) > 0:
for k in range(len(masses)):
if is_bicharged[k] or is_tricharged[k] :
if (is_bicharged[k] or is_tricharged[k]) and intensities[k]>=noise:
line[int((masses[k] - ms1_start_mz) // size_bin_ms1)] += intensities[k]
im[cycle, :] = line
cycle += 1
rawFile.Close()
return im
#TODO calculer les apex au fil de l'eau, trop long sinon
def build_image_ms1_wiff_charge_filtered_apex_only(path, bin_mz=1, tolerance=0.01, noise=10000):
#load raw data
rawFile = WiffFileReader(path)
max_cycle=0
total_scan_number = rawFile.GetLastSpectrumNumber()
for scanNumber in range (total_scan_number):
if rawFile.GetMSOrderForScanNum(scanNumber) == 1 :
ms1_start_mz = rawFile.source.ScanInfos[scanNumber].LowMz
ms1_end_mz = rawFile.source.ScanInfos[scanNumber].HighMz
max_cycle+=1
print('start', ms1_start_mz, 'end', ms1_end_mz)
total_ms1_mz = ms1_end_mz - ms1_start_mz
n_bin_ms1 = int(total_ms1_mz // bin_mz)
size_bin_ms1 = total_ms1_mz / n_bin_ms1
mass_list_ms1=[]
intensity_list_ms1 = []
for scanNumber in range(total_scan_number):
if rawFile.GetMSOrderForScanNum(scanNumber) == 1:
masses, intensities = rawFile.GetCentroidMassListFromScanNum(scanNumber)
mass_list_ms1.append(masses)
intensity_list_ms1.append(intensities)
rawFile.Close()
cluster_list=[]
cluster_ref_2 = []
cluster_ref_3 = []
for cycle in range(len(mass_list_ms1)):
for i in range(len(mass_list_ms1[cycle])):
is_in_cluster = False
mass, intensity = mass_list_ms1[cycle][i], intensity_list_ms1[cycle][i]
if intensity>=noise:
#check id peak belongs to a previous cluster
for cycle_ref,mass_ref,cluster_id in cluster_ref_2 :
if cycle_ref==cycle:
if mass_ref - tolerance < mass_list_ms1[cycle][i] < mass_ref + tolerance :
if mass - cluster_list[cluster_id][0][1] > 1/(3 - 2 * tolerance) :
cluster_list[cluster_id] = [max([cluster_list[cluster_id][0],(cycle,mass,intensity)],key=lambda x : x[2]),cluster_list[cluster_id][1]+ 1]
else :
cluster_list[cluster_id] = [max([cluster_list[cluster_id][0], (cycle, mass, intensity)], key=lambda x: x[2]),cluster_list[cluster_id][1] + 1]
if not (cycle + 1,mass_ref, cluster_id) in cluster_ref_2:
cluster_ref_2.append((cycle + 1,mass_ref, cluster_id))
is_in_cluster = True
if not (cycle,mass_ref+1/2,cluster_id) in cluster_ref_2 :
cluster_ref_2.append((cycle,mass_ref+1/2,cluster_id))
for cycle_ref,mass_ref,cluster_id in cluster_ref_3 :
if cycle_ref==cycle:
if mass_ref - tolerance < mass_list_ms1[cycle][i] < mass_ref + tolerance :
cluster_list[cluster_id].append((cycle,mass,intensity))
if not ( cycle + 1,mass_ref, cluster_id) in cluster_ref_3:
cluster_ref_3.append(( cycle + 1,mass_ref, cluster_id))
is_in_cluster = True
if not (cycle,mass_ref+1/3,cluster_id) in cluster_ref_3 :
cluster_ref_3.append((cycle,mass_ref+1/3,cluster_id))
#else create a new cluster based on this peak
if not is_in_cluster :
cluster_ref_2.append((cycle,mass+1/2,len(cluster_list)))
cluster_ref_2.append((cycle+1, mass, len(cluster_list)))
cluster_list.append([(0,0,0),0]) #(cycle mass intensity),number of heavy isotop
cluster_ref_3.append((cycle, mass + 1 / 3, len(cluster_list)))
cluster_ref_3.append((cycle + 1, mass, len(cluster_list)))
cluster_list.append([(0,0,0),0]) #(cycle mass intensity),number of heavy isotop
#remove old cluster ref to improve perf (optional)
for cycle_ref,mass_ref,cluster_id in cluster_ref_2 :
if cycle_ref == cycle :
cluster_ref_2= [x for x in cluster_ref_2 if x != (cycle_ref,mass_ref,cluster_id)]
for cycle_ref,mass_ref, cluster_id in cluster_ref_3 :
if cycle_ref == cycle :
cluster_ref_3 = [x for x in cluster_ref_3 if x != (cycle_ref,mass_ref,cluster_id)]
apex_list = []
for cluster in cluster_list : # cluster = (cycle,mass,intensity)
if cluster[1]>=1: #at least a heavy isotop detected (valid istope group)
apex_list.append(cluster[0])
#create image based on detected valid apexes
im = np.zeros([max_cycle, n_bin_ms1])
for cycle, mass, _ in apex_list:
im[int(cycle), int((mass - ms1_start_mz) // size_bin_ms1)] = 1
return im
def build_image_ms1(path, bin_mz):
#load raw data
e = oms.MSExperiment()
......@@ -162,17 +238,5 @@ def build_image_ms1(path, bin_mz):
if __name__ == '__main__':
path = '../data/raw_data/SERMAR-10-AER-d200.wiff'
im_filtered = build_image_ms1_wiff_charge_filtered(path,tolerance=0.0001)
# im_base = build_image_ms1_wiff(path,bin_mz=1)
im_filtered=np.log(im_filtered+1)
# im_base=np.log(im_base+1)
plt.imshow(im_filtered, interpolation='nearest')
plt.savefig('filtered_image_tol_00001.png')
plt.clf()
# plt.imshow(im_base, interpolation='nearest')
# plt.savefig('base_image.png')
# plt.clf()
\ No newline at end of file
mat = build_image_ms1_wiff_charge_filtered_apex_only(path)
mpimg.imsave('test.png', mat)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment