#Import dependencies
#Local
from .nextflow_file import Nextflow_File
from .ro_crate import RO_Crate
from . import constant
from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions
from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles
from .outils_annotate import get_tools_commands_from_user_for_process
from .bioflowinsighterror import BioFlowInsightError
from .graph import Graph
#Outside packages
import os
import re
import json
from pathlib import Path
import glob
import ctypes
import time
class Workflow:
"""
This is the main workflow class; from this class, workflow analysis can be done.
After analysis, the workflow structure can be reconstructed.

Attributes:
file: A string indicating the path to the workflow's main file or to the directory containing the workflow
duplicate: A boolean indicating if the processes are to be duplicated in the structure
display_info: A boolean indicating if the analysis information should be printed
output_dir: A string indicating where the results will be saved
name: A string indicating the name of the workflow
processes_2_remove: A comma-separated string indicating the processes to remove from the workflow
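
Example:
A minimal usage sketch (the './wf' directory is hypothetical):
    w = Workflow('./wf', duplicate=True, output_dir='./results')
    w.initialise()
    w.generate_all_graphs()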
"""
def __init__(self, file, duplicate=False, display_info=True, output_dir = './results',
name = None, processes_2_remove = None):
#Getting the main nextflow file
if(not os.path.isfile(file)):
nextflow_files = glob.glob(f'{file}/*.nf')
if(len(nextflow_files)==0):
raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
txt = ""
#Try to read the main.nf file -> if it cannot be found then the only Nextflow file at the root is used
try:
file = file+"/main.nf"
with open(file, 'r') as f:
txt = f.read()
except IOError:
pass
#raise BioFlowInsightError("No 'main.nf' file found at the root of the project")
if(txt==""):
if(len(nextflow_files)==1):
file = nextflow_files[0]
with open(file, 'r') as f:
txt= f.read()
else:
raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
self.duplicate = duplicate
self.display_info = display_info
self.processes_2_remove = processes_2_remove
self.output_dir = Path(output_dir)
self.nextflow_files = []
self.workflow_directory = '/'.join(file.split('/')[:-1])
self.name = name
self.graph = None
self.conditions_2_ignore = []
OG_file = Nextflow_File(file, workflow = self, first_file = True)
self.DSL = OG_file.find_DSL()
self.create_empty_results()
if(self.display_info):
print(f"Workflow is written in {self.DSL}")
def create_empty_results(self):
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.output_dir / 'debug', exist_ok=True)
os.makedirs(self.output_dir / 'graphs', exist_ok=True)
with open(self.output_dir / "debug" / "operations.nf",'w') as file:
pass
with open(self.output_dir / "debug" / "calls.nf",'w') as file:
pass
with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file:
pass
def get_root_directory(self):
first_file = self.get_first_file()
return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/"
def get_conditions_2_ignore(self):
return self.conditions_2_ignore
def get_duplicate_status(self):
return self.duplicate
def get_output_dir(self):
return Path(self.output_dir)
def get_DSL(self):
return self.DSL
def get_display_info_bool(self):
return self.display_info
def set_DSL(self, DSL):
self.DSL = DSL
def get_first_file(self):
for file in self.nextflow_files:
if(file.first_file):
return file
def get_workflow_main(self):
return self.get_first_file().main
def add_nextflow_file_2_workflow(self, nextflow_file):
self.nextflow_files.append(nextflow_file)
self.nextflow_files = list(set(self.nextflow_files))
def initialise(self, conditions_2_ignore = []):
"""Method that initialises the analysis of the worflow
Keyword arguments:
"""
self.conditions_2_ignore = conditions_2_ignore
#Right now I'm just going to do everything in DSL2
#At this point there should only be one nextflow file
if(len(self.nextflow_files)==1):
self.nextflow_files[0].initialise()
else:
raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.")
if(self.display_info):
citation = """\nTo cite BioFlow-Insight, please use the following publication:
George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092"""
print(citation)
if(self.graph==None):
self.graph = Graph(self)
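#Note: processes_2_remove is expected as a comma-separated string,
#e.g. (with hypothetical process names) "fastqc, multiqc" -> ["fastqc", "multiqc"]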
def iniatilise_tab_processes_2_remove(self):
tab_processes_2_remove = []
if(self.processes_2_remove!=None):
temp = self.processes_2_remove.split(",")
for t in temp:
tab_processes_2_remove.append(t.strip())
self.processes_2_remove = tab_processes_2_remove
def get_structure(self):
dico = {}
dico['nodes'] = []
dico['edges'] = []
dico['subworkflows'] = {}
if(self.get_DSL() == "DSL1"):
main = self.get_workflow_main()
if(main!=None):
return main.get_structure(dico)
elif(self.get_DSL() == "DSL2"):
main = self.get_workflow_main()
if(main!=None):
return main.get_structure(dico)
else:
return dico
#return self.get_structure_DSL2(dico=dico, start = True)
else:
raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!")
#################
# GRAPHS
#################
def generate_specification_graph(self, render_graphs = True):
self.iniatilise_tab_processes_2_remove()
self.graph.initialise(processes_2_remove = self.processes_2_remove)
self.graph.get_specification_graph(render_graphs = render_graphs)
#TODO -> update this
def generate_all_graphs(self, render_graphs = True):
self.generate_specification_graph(render_graphs = render_graphs)
#Method that checks if a given graph specification is isomorphic to the workflow's graph
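#A minimal usage sketch (the JSON path is hypothetical; such a file would
#typically be produced by generate_test_specification_graph):
# equal = w.check_if_equal("results/test/specification_graph.json")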
def check_if_equal(self, file):
self.iniatilise_tab_processes_2_remove()
return self.graph.check_if_equal(file, processes_2_remove = self.processes_2_remove)
###########################
# Generate test data
###########################
#These are the methods which generate the test data
def generate_test_specification_graph(self):
dico = self.graph.get_full_dico()
with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_executors(self):
executors = self.get_workflow_main().get_all_executors_in_workflow()
dico= {}
for e in executors:
dico[str(e)] = e.get_code(get_OG = True)
with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_executors_per_subworkflows(self):
subs = self.get_subworkflows_called()
dico= {}
for s in subs:
dico[str(s)]= {}
executors = s.get_all_executors_in_workflow()
for e in executors:
dico[str(s)][str(e)] = e.get_code(get_OG = True)
with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_processes(self):
processes = self.get_processes_called()
dico= {}
for p in processes:
dico[str(p)] = p.get_code()
with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_subworkflows(self):
subs = self.get_subworkflows_called()
dico= {}
for s in subs:
dico[str(s)] = s.get_code()
with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_test_data(self):
self.generate_test_specification_graph()
self.generate_all_executors()
self.generate_all_processes()
self.generate_all_subworkflows()
self.generate_executors_per_subworkflows()
#Returns a dico of the number of process calls per condition
#For example : {condition1: 14, condition2: 10, condition3: 3}
#14 process calls depend on condition1
#10 process calls depend on condition2
#3 process calls depend on condition3
def get_most_influential_conditions(self, show_values = True):
if(self.get_duplicate_status()):
most_influential_conditions = self.get_workflow_main().get_most_influential_conditions()
#If show values then we replace the condition ids with their values
if(show_values):
most_influential_conditions_values = {}
for condition in most_influential_conditions:
if(condition.get_value() not in most_influential_conditions_values):
most_influential_conditions_values[condition.get_value()] = 0
most_influential_conditions_values[condition.get_value()] += most_influential_conditions[condition]
most_influential_conditions = most_influential_conditions_values
#Sort the dico
most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)}
return most_influential_conditions
else:
BioFlowInsightError("Need to activate 'duplicate' mode to use this method.")
#When there are multiple emits, turn them into one at the end of the call, e.g. instead of 'fastp_ch2 = fastp.out.fastp_ch2' -> have 'fastp_ch2 = fastp_ch'
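#A minimal usage sketch (the 'wf_dsl1/' directory is hypothetical):
# w = Workflow("wf_dsl1/", duplicate=True)
# w.initialise()
# dsl2_code = w.convert_to_DSL2()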
def convert_to_DSL2(self):
if(self.get_DSL()=="DSL2"):
print("Workflow is already written in DSL2")
else:
#This tag is used as an identification to safely manipulate the string
tag = str(time.time())
nextflow_file = self.get_first_file()
code = nextflow_file.get_code()
start_code = r"#!/usr/bin/env nextflow"
start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
end_code = "workflow.onComplete"
pos_start, pos_end= 0, len(code)
if(code.find(end_code)!=-1):
pos_end = code.find(end_code)
code_to_replace = code[pos_start:pos_end]
for match in re.finditer(start_code_pattern, code):
pos_start = match.span(0)[1]+1
#if(code.find(start_code)!=-1):
# pos_start = code.find(start_code)+len(start_code)
body = code[pos_start:pos_end]#.replace('\n', '\n\t')
include_section = f"//INCLUDE_SECTION_{tag}"
params_section = f"//PARAMS_SECTION_{tag}"
function_section = f"//FUNCTION_SECTION_{tag}"
process_section = f"//PROCESS_SECTION_{tag}"
code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""")
#I've put this in a comment since it's DSL1
#params_list = []
#for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code):
# params_list.append(match.group(0))
#for params in params_list:
# code = code.replace(params, "")
#params_code = "\n".join(params_list)
#code = code.replace(params_section, params_code)
#Moving Functions
functions = []
for f in nextflow_file.functions:
function = f.get_code()
functions.append(function)
for r in functions:
code = code.replace(r, "")
code = code.replace(function_section, "\n\n".join(functions))
#Moving Processes
processes = []
to_replace = []
for p in nextflow_file.get_processes():
new_process, call = p.convert_to_DSL2()
processes.append(new_process)
to_replace.append((p.get_code(), call))
for r in to_replace:
code = code.replace(r[0], r[1])
code = code.replace(process_section, "\n\n".join(processes))
#TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated
for o in self.get_workflow_main().get_all_executors_in_workflow():
if(o.get_type()=="Operation"):
code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2())
else:
raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'")
#Putting || back
code = code.replace("$OR$", "||")
return code
#This method generates a random set of processes to consider as relevant
#The ratio isn't uniformly random: it's drawn from a Gaussian centered at 0.5
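#A minimal usage sketch (assumes an initialised workflow 'w' with duplicate=True;
#the returned aliases depend on the workflow):
# names = w.generate_random_relevant_processes(alpha = 0.25)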
def generate_random_relevant_processes(self, alpha = -1):
import random
#Random value between 0 and 1, centered at 0.5
def get_value():
check = True
val = -1
while(check):
check = False
val = random.gauss(0.5, 0.1)
if(val<0 or val>1):
check = True
return val
if(self.duplicate):
if(alpha == -1):
alpha = get_value()
else:
if(not (0<=alpha and alpha<=1)):
raise BioFlowInsightError("alpha is not in the interval [0; 1]")
processes_called = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
p = c.get_first_element_called()
if(p.get_type()=="Process"):
processes_called.append(p)
nb_2_select = int(alpha*len(processes_called))
sampled = random.sample(list(set(processes_called)), nb_2_select) #random.sample requires a sequence, not a set
name_select = []
for p in sampled:
name_select.append(p.get_alias())
return name_select
else:
raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
#This method rewrites the entire workflow into one single file
def write_workflow_into_one_file(self):
#This tag is used as an identification to safely manipulate the string
tag = str(time.time())
code = self.get_first_file().get_code()
#params_section = f"//PARAMS_SECTION_{tag}"
function_section = f"//FUNCTION_SECTION"
process_section = f"//PROCESS_SECTION"
subworkflow_section = f"//SUBWORKFLOW_SECTION"
ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section
to_replace = []
for match in re.finditer(constant.FULL_INLCUDE_2, code):
to_replace.append(match.group(0))
for r in to_replace:
code = code.replace(r, ankers)
ankers = ""
processes, subworkflows, functions = [], [], []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
processes.append(ele)
elif(ele.get_type()=="Subworkflow"):
subworkflows.append(ele)
elif(ele.get_type()=="Function"):
functions.append(ele)
else:
raise Exception("This shoudn't happen")
#Adding processes into code
for p in processes:
if(p.get_code() not in code):
code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section)
#Adding subworkflows into code
for sub in subworkflows:
if(sub.get_code() not in code):
code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n')
#Adding functions into code
for fun in functions:
if(fun.get_code() not in code):
code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')
#Remove the anchors
#code = code.replace(function_section, "")
#code = code.replace(process_section, "")
#code = code.replace(subworkflow_section, "")
ankers = {"function_section":function_section,
"process_section":process_section,
"subworkflow_section":subworkflow_section}
return code, ankers
#TODO -> write tests for this method
#Function that rewrites the workflow code
#Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
def simplify_workflow_code(self):
code, ankers = self.write_workflow_into_one_file()
all_executors = self.get_workflow_main().get_all_executors_in_workflow()
#We do this so that the longest operations and calls are rewritten first in the code -> to avoid replacement conflicts
executor_2_length = {}
for e in all_executors:
executor_2_length[e] = len(e.get_code(get_OG = True))
sorted_executor_2_length = {k: v for k, v in sorted(executor_2_length.items(), key=lambda item: item[1], reverse=True)}
for exe in sorted_executor_2_length:
if(exe.get_type()=="Call" or exe.get_type()=="Operation"):
code = code.replace(exe.get_code(get_OG = True), exe.simplify_code(), 1)
else:
print(exe.get_code(), exe.get_type())
raise Exception("This shouldn't happen")
return code
def get_subworkflows_called(self):
subs = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Subworkflow"):
subs.append(ele)
return subs
def get_processes_called(self):
processes = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
processes.append(ele)
return processes
def rewrite_and_initialise(self, code):
#Write new code in temporary file
temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf"
with open(temp_file, "w") as file:
file.write(code)
#Replace old analysis with new analysis (simplified code)
self.__init__(str(temp_file), display_info = False, duplicate=True)
self.initialise()
def check_relevant_processes_in_workflow(self, relevant_processes):
#Check that all the relevant processes are in the workflow
workflow_processes = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
workflow_processes.append(ele.get_alias())
for p in relevant_processes:
if(p not in workflow_processes):
raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24)
def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []):
self.graph.initialise(processes_2_remove = processes_2_remove)
self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs)
#I do not recommend that the developer uses the same names for the channels inside and outside the subworkflow
#Since the local channels become local at the upper level
def rewrite_subworkflow_call(self, code, subworklfow):
#Remove the definition from the code
code = code.replace(subworklfow.get_code(), "")
OG_call = subworklfow.get_call()
OG_body = subworklfow.get_work()
#REPLACE HEADER TAKES
subworkflow_takes = subworklfow.get_takes()
parameters = OG_call.get_parameters()
if(len(subworkflow_takes)!=len(parameters)):
raise Exception("This shouldn't happen -> the same number of parameters should be kept")
#This is to replace the parameters and the takes
new_header = ""
for i in range(len(parameters)):
param = parameters[i]
takes = subworkflow_takes[i].get_gives()[0]
#Here we're checking that the inputs inside and outside the subworkflow are the same
#If they are, we're going to remove the redundant assignment to avoid the case below
"""
param_1 = fastq
sub(param_1)
AND in the subworkflow
sub{
take:
fastq
}
----------
This would mean when removing the subworkflow def -> we would get this:
param_1 = fastq
fastq = param_1
"""
#Obviously we want to avoid this case
input_val = ""
try:
input_val = param.origins[0].get_source()[0].get_origins()[0].get_name()
except Exception:
input_val = param.get_code(get_OG = True)
if(takes.get_code()!=input_val):
new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}"
else:
#In the case they are the same -> we remove the remaining operation (which doesn't serve a purpose)
#The "param_1 = fastq" operation
operation_code = param.origins[0].get_source()[0].get_code()
code = code.replace(operation_code, "", 1)
code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1)
#REPLACE THE EMITS
#TODO admittedly the code below is very ugly -> but it's functional -> update it
emits = subworklfow.get_emit()
to_replace = []
all_executors = self.get_workflow_main().get_all_executors_in_workflow()
for exe in all_executors:
#We don't need to check the case of a call since the workflow has already been rewritten -> emits only appear in operations
if(exe.get_type()=="Operation"):
emited = exe.get_origins()
if(len(emited)==1):
emited = emited[0]
if(emited.get_type()=="Emitted"):
if(emited.get_emitted_by().get_first_element_called()==subworklfow):
if(emited.get_emits() not in emits):
raise Exception("This shoudn't happen -> since it is the actual subworkflow")
to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}"))
for r in to_replace:
old, new = r
#Case of channel == channel
if(new.split("=")[0].strip()==new.split("=")[1].strip()):
new = ''
code = code.replace(old, new)
return code
#This function returns the channels which the subworkflow (things_added_in_cluster) depends on
def get_takes(self, things_added_in_cluster):
#Basically this is a dico of channels to operations -> when the value is an empty list
#This means that the channel is totally defined in the subworkflow -> so we are searching for
#channels which aren't totally defined in the subworkflow
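#Illustrative example (hypothetical channel names): if the cluster contains
#"ch2 = ch1.map{...}" but ch1 is produced outside of the cluster, then not all
#of ch1's sources are in things_added_in_cluster -> ch1 becomes a take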
channels_2_sources = {}
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_origins():
channels_2_sources[o] = replace_thing_by_call(o.get_source())
elif(ele.get_type() == "Call"):
for param in ele.get_parameters():
if(param.get_type()=="Channel"):
print(param, param.get_code())
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
for o in param.get_origins():
if(o.get_type()=="Channel"):
channels_2_sources[o] = replace_thing_by_call(o.get_source())
else:
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
raise Exception("This shouldn't happen")
takes = []
for channel in channels_2_sources:
if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])):
takes.append(channel)
#print(things_added_in_cluster)
#print(channels_2_operations_needed)
return takes
#This function returns the channels which the subworkflow (things_added_in_cluster) emits (i.e. channels which other things depend on)
def get_emits(self, things_added_in_cluster):
emits = []
channel_2_sink = {}
#Basically this is a dico of channels to operations -> when the value is an empty list
#This means that the channel is totally consumed in the subworkflow -> so we are searching for
#channels which aren't totally consumed in the subworkflow
#This means that things outside the subworkflow depend on this channel
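#Illustrative example (hypothetical channel names): if the cluster produces
#"ch3" and "ch3.view()" appears outside of the cluster, then not all of ch3's
#sinks are in things_added_in_cluster -> ch3 becomes an emit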
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_gives():
channel_2_sink[o] = replace_thing_by_call(o.get_sink())
elif(ele.get_type() == "Call"):
#thing = ele.get_first_element_called()
for e in ele.get_later_emits():
channel_2_sink[e] = replace_thing_by_call(e.get_sink())
else:
print(ele)
raise Exception("This shouldn't happen")
for channel in channel_2_sink:
if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])):
emits.append(channel)
return emits
#Method which rewrites the workflow following the user view
#Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
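#A minimal usage sketch (the directory and process names are hypothetical):
# w = Workflow("wf/", duplicate=True)
# w.initialise()
# new_code = w.convert_workflow_2_user_view(relevant_processes = ["fastqc", "multiqc"])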
def convert_workflow_2_user_view(self, relevant_processes = []):
if(self.duplicate):
code = self.simplify_workflow_code()
self.rewrite_and_initialise(code)
#Get the clusters and the code
self.check_relevant_processes_in_workflow(relevant_processes)
self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
clusters = self.graph.get_clusters_from_user_view()
#DETERMINING WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
#Creating the clusters with calls instead of processes or subworkflows
set_clusters_with_calls = []
for c in clusters:
tab = []
for ele in c:
if(ele.get_type()=="Operation"):
if(ele.get_artificial_status()==False):
tab.append(ele)
else:
call = ele.get_call()
tab.append(call)
set_clusters_with_calls.append(set(tab))
#Getting subworkflows to executors
subworkflow_2_executors = {}
for sub in self.get_subworkflows_called():
executors = sub.get_all_executors_in_workflow()
subworkflow_2_executors[sub] = []
for ele in executors:
#Cannot add calls to subworkflows -> they are already present by definition
if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
None
#We don't add it
else:
subworkflow_2_executors[sub].append(ele)
#subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
#TODO -> write tests to test this function
def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
broken_subworkflows = []
for sub in subworkflow_2_executors:
#You want this list (set) to be equal to subworkflow_2_executors[sub]
elements_in_sub_with_clusters = []
for cluster in set_clusters_with_calls:
if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
break
for c in cluster:
if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
break
if(c in subworkflow_2_executors[sub]):
elements_in_sub_with_clusters+=list(cluster)
break
#If the two sets are equal, the subworkflow is intact; otherwise it is broken
if(set(elements_in_sub_with_clusters)!=set(subworkflow_2_executors[sub])):
broken_subworkflows.append(sub)
return broken_subworkflows
broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
#Rewrite broken subworkflows
for sub in broken_subworkflows:
code = self.rewrite_subworkflow_call(code, sub)
#TODO -> this needs to be optimised
self.rewrite_and_initialise(code)
#Get the clusters and the code
self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
clusters = self.graph.get_clusters_from_user_view()
#Get the clusters with the corresponding operations inside
#for i in range(len(clusters)):
# c = clusters[i]
# if(len(c)>1):
# clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
#print(clusters)
#Get the topological order
clusters = self.graph.get_topogical_order(clusters)
#Creating the subworkflows from clusters
calls_in_operations = []
non_relevant_name = 1
channels_to_replace_outside_of_cluster = []
subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
index_cluster = 0
for elements in clusters:
#Only create the subworkflows for clusters with more than one element
processes_added = []
things_added_in_cluster = []
if(len(elements)>1):
name, body, take, emit = "", "", "", ""
first_element = True
for ele in elements:
if(ele.get_type()=="Process"):
#Determine the name of the created subworkflow cluster
if(ele.get_alias() in relevant_processes):
name = f"cluster_{ele.get_alias()}"
#Get the call of thing (either process or subworkflow)
call = ele.get_call()
#If first element -> add marker for the subworkflow call
if(first_element):
code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
first_element = False
else:
code = code.replace(call.get_code(get_OG = True), "")
processes_added.append(call.get_first_element_called())
values = []
for condition in call.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
else:
body+=f"\n{call.get_code()}\n"
things_added_in_cluster.append(call)
#Below
elif(ele.get_type()=="Operation"):
#TODO -> check this verification, there might be some side effects
if(not ele.get_artificial_status()):
#If first element -> add marker for the subworkflow call
if(first_element):
code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}", 1)
first_element = False
else:
code = code.replace(ele.get_code(get_OG = True), "", 1)
#Ignore these cases
if(ele.get_code()[:4] not in ["emit", "take"]):
origins = ele.get_origins()
for o in origins:
if(o.get_type()=="Call"):
calls_in_operations.append(o)
values = []
for condition in ele.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
else:
body+=f"\n{ele.get_code()}\n"
things_added_in_cluster.append(ele)
else:
raise Exception("This shoudn't happen")
#TODO check that this part of the code is never reached
#Here we're removing the Call_12313 thing
for call in calls_in_operations:
raise Exception("This shouldn't happen since the workflows has been rewritten")
body = body.replace(call.get_code(), "")
body = body.replace(str(call), call.get_code())
#If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes
if(name==""):
#If there are processes called we are going to use them
if(len(processes_added)>0):
#TODO find a better naming system
name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
else:
#TODO find a better naming system
name = f"non_relevant_cluster_{non_relevant_name}"
non_relevant_name+=1
#Check if there is a single condition in the body
body = group_together_ifs(body)
conditions_in_subworkflow = []
end = -1
for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
conditions_in_subworkflow.append(match.group(1).strip())
_, start = match.span(0)
if(len(conditions_in_subworkflow)==1):
end = extract_curly(body, start)
body = body[start: end-1].strip()
#TAKE
#Adding take parameters on the inside of the subworkflow
takes_param = self.get_takes(things_added_in_cluster)
new_param_names, index, old_param_names = [], 1, []
for param in takes_param:
param_name = f"param_{name}_{index}"
new_param_names.append(param_name)
old_param_names.append(param.get_code())
index += 1
if(len(new_param_names)>0):
temp = '\n'.join(new_param_names)
take = f"\ntake:\n{temp}\n"
#EMIT
#Adding the emitted outputs
emitted_outputs = self.get_emits(things_added_in_cluster)
new_output_names, index, old_output_names = [], 0, []
for output in emitted_outputs:
output_name = f"{name}.out[{index}]"
new_output_names.append(output_name)
old_output_names.append(output.get_code())
index += 1
if(len(old_output_names)>0):
temp = '\n'.join(old_output_names)
emit = f"\nemit:\n{temp}\n"
#Adding empty channels if they don't already exist in the case of a negative condition
body = get_channels_to_add_in_false_conditions(body, old_output_names)
#Replace names inside subworkflow
subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
for i in range(len(new_param_names)):
pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
#subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
#TODO -> add verification of the conditions
params = ", ".join(old_param_names)
subworkfow_call_case_true = f"{name}({params})"
subworkfow_call_case_false = ""
for i in range(len(new_output_names)):
#In the case of channels, we just add channel = subworkflow.out[i]
if(not bool(re.findall(r"\.\s*out", old_output_names[i]))):
subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
#In the case of emitted values we need to replace the code on the outside
else:
param_out_name= f"{name}_out_{i+1}"
subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
#If there was a single condition in the subworkflow cluster -> then we add it when the call is done
if(len(conditions_in_subworkflow)==1):
subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
else:
subworkfow_call = subworkfow_call_case_true
subworkflow_clusters_to_add.append(subworkflow_code)
subworkflow_cluster_calls_to_add.append(subworkfow_call)
index_cluster+=1
#TODO -> removing the conditions which are problematic
#This might not be the problem -> rerunning the analysis isn't totally robust
still_simplifying_conditions = True
while(still_simplifying_conditions):
still_simplifying_conditions = False
to_replace, anker1, anker2 = "", "", ""
#Replace if/else
for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
to_replace = match.group(0)
anker1, anker2 = match.group(1), match.group(2)
still_simplifying_conditions = True
break
#Replace empty if on its own
if(not still_simplifying_conditions):
for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
to_replace = match.group(1)
anker1 = match.group(2)
still_simplifying_conditions = True
break
if(still_simplifying_conditions):
code = code.replace(to_replace, f"{anker1}\n{anker2}")
#Replace the ankers by the calls of the subworkflows
for i in range(len(subworkflow_clusters_to_add)):
#print(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
for old, new in channels_to_replace_outside_of_cluster:
pattern= fr"[ \(,]({re.escape(old)})"
code = replace_group1(code, pattern, new)
#code = code.replace(old, new)
#Add the subworkflow definitions
#-------------------------------------
#Add anchor
subworkflow_section = "//ANKER 4 SUBWORKFLOW DEF"
to_replace = ""
for match in re.finditer(r"workflow\s+\w*\s*\{", code):
to_replace = match.group(0)
break
if(to_replace==""):
raise Exception("No call to a workflow")
code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
for sub in subworkflow_clusters_to_add:
code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
#Putting || back
code = code.replace("$OR$", "||")
return remove_extra_jumps(format_with_tabs(code))
#return code
#
##So basically when retrieving a thing (process or subworkflow)
##There is necessarily one call associated with the thing -> since we have the option duplicate activated
##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
else:
raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to reformulated")