Commit dd319afc authored by George Marchment

Started adding workflow rewrite

parent 470cb7c2
Pipeline #14372 failed in 2 minutes and 33 seconds
@@ -48,6 +48,49 @@ class Main(Nextflow_Building_Blocks):
        dico = {}
        self.root.get_all_calls_in_subworkflow(calls = dico)
        return list(dico.keys())

    def get_all_executors_in_subworkflow(self):
        dico = {}
        self.root.get_all_executors_in_subworkflow(calls = dico)
        return list(dico.keys())
    #TODO -> write tests to test this method
    def get_all_calls_in_workflow(self):
        all_calls = self.get_all_calls_in_subworkflow()
        dico = {}
        for c in all_calls:
            sub = c.get_first_element_called()
            if(sub.get_type()=="Subworkflow"):
                if(c not in dico):
                    sub_calls = sub.get_all_calls_in_workflow()
                    for sub_c in sub_calls:
                        dico[sub_c] = ""
        for c in all_calls:
            dico[c] = ""
        return list(dico.keys())
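
    # A sketch of the test the TODO above asks for (hypothetical fixture path;
    # assumes a main workflow that calls at least one subworkflow):
    #   w = Workflow("tests/ressources/sub_example/main.nf", duplicate = True)
    #   w.initialise()
    #   calls = w.get_workflow_main().get_all_calls_in_workflow()
    #   #Calls nested inside subworkflows must be returned too
    #   assert all(c.get_type()=="Call" for c in calls)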
    #TODO -> write tests to test this method
    def get_all_executors_in_workflow(self):
        all_executors = self.get_all_executors_in_subworkflow()
        dico = {}
        for e in all_executors:
            if(e.get_type()=="Call"):
                for c in e.get_all_calls():
                    sub = c.get_first_element_called()
                    if(sub.get_type()=="Subworkflow"):
                        if(c not in dico):
                            sub_calls = sub.get_all_executors_in_workflow()
                            for sub_c in sub_calls:
                                dico[sub_c] = ""
            #Case it's an operation
            else:
                dico[e] = ""
        for e in all_executors:
            dico[e] = ""
        return list(dico.keys())
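
    # Companion sketch (same hypothetical fixture as above): since every call is
    # an executor, the calls should be a subset of the executors:
    #   main = w.get_workflow_main()
    #   assert set(main.get_all_calls_in_workflow()) <= set(main.get_all_executors_in_workflow())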
    def check_includes(self):
...
@@ -220,7 +220,13 @@ class Root(Nextflow_Building_Blocks):
            calls[c] = ''
            #if(c.get_first_element_called().get_type()=="Subworkflow"):
            #    c.get_first_element_called().root.get_all_calls(calls = calls)

    def get_all_executors_in_subworkflow(self, calls = {}):
        all_executors = self.get_executors_same_level()+self.get_inside_executors()
        for e in all_executors:
            calls[e] = ''
    #############
    # PROCESSES
...
@@ -43,20 +43,23 @@ class Workflow:
        nextflow_files = glob.glob(f'{file}/*.nf')
        if(len(nextflow_files)==0):
            raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
        txt = ""
        #Try to read the main.nf file -> if this cannot be found then the first nextflow file is used
        try:
            file = '/'.join(nextflow_files[0].split('/')[:-1])+"/main.nf"
            with open(file, 'r') as f:
                txt = f.read()
        except:
            None
            #raise BioFlowInsightError("No 'main.nf' file found at the root of the project")
        if(txt==""):
            if(len(nextflow_files)==1):
                file = nextflow_files[0]
                with open(file, 'r') as f:
                    txt = f.read()
            else:
                raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
@@ -272,7 +275,6 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
        for f in nextflow_file.functions:
            function = f.get_code()
            functions.append(function)
        for r in functions:
            code = code.replace(r, "")
        code = code.replace(function_section, "\n\n".join(functions))
@@ -301,6 +303,436 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
        code = code.replace("$OR$", "||")
        return code
    #This method generates a random set of processes to consider as relevant
    #It's not uniformly random: it's roughly Gaussian, centered at 0.5
    def generate_random_relevant_processes(self, alpha = -1):
        import random

        #Random value between 0 and 1, centered at 0.5 (redraw until it lands in [0; 1])
        def get_value():
            check = True
            val = -1
            while(check):
                check = False
                val = random.gauss(0.5, 0.1)
                if(val<0 or val>1):
                    check = True
            return val
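
        #Note: with mean 0.5 and sigma 0.1, the bounds 0 and 1 sit 5 sigma away,
        #so a redraw happens only ~once in 1.7 million draws; the loop is cheap.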
        if(self.duplicate):
            if(alpha == -1):
                alpha = get_value()
            elif(not (0<=alpha<=1)):
                raise BioFlowInsightError("alpha is not in the interval [0; 1]")
            processes_called = []
            for c in self.get_workflow_main().get_all_calls_in_workflow():
                p = c.get_first_element_called()
                if(p.get_type()=="Process"):
                    processes_called.append(p)
            nb_2_select = int(alpha*len(processes_called))
            #random.sample needs a sequence -> deduplicate, then turn back into a list
            sampled = random.sample(list(set(processes_called)), nb_2_select)
            #name_select = []
            #for p in sampled:
            #    name_select.append(p.get_alias())
            return sampled
        else:
            raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
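
    # Usage sketch (hypothetical path; assumes the workflow was built with
    # duplicate = True):
    #   w = Workflow("wf/main.nf", duplicate = True)
    #   w.initialise()
    #   relevant = w.generate_random_relevant_processes(alpha = 0.3)  #~30% of the called processes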
    #This method rewrites the entire workflow into one single file
    def write_workflow_into_one_file(self):
        #This tag is used as an identification to safely manipulate the string
        tag = str(time.time())
        code = self.get_first_file().get_code()

        #params_section = f"//PARAMS_SECTION_{tag}"
        function_section = f"//FUNCTION_SECTION"
        process_section = f"//PROCESS_SECTION"
        subworkflow_section = f"//SUBWORKFLOW_SECTION"
        ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section

        #Replace the include statements with the ankers (only the first include keeps them)
        to_replace = []
        for match in re.finditer(constant.FULL_INLCUDE_2, code):
            to_replace.append(match.group(0))
        for r in to_replace:
            code = code.replace(r, ankers)
            ankers = ""
        #Adding processes into code
        for p in self.get_processes_called():
            if(p.get_code() not in code):
                code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section)

        #Adding subworkflows into code
        for sub in self.get_subworkflows_called():
            if(sub.get_code() not in code):
                code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n')

        #Adding functions into code
        for fun in self.get_functions_called():
            if(fun.get_code() not in code):
                code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')

        #Remove the ankers
        #code = code.replace(function_section, "")
        #code = code.replace(process_section, "")
        #code = code.replace(subworkflow_section, "")
        ankers = {"function_section":function_section,
                  "process_section":process_section,
                  "subworkflow_section":subworkflow_section}
        return code, ankers
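
    # Illustration of the anker mechanism (hypothetical Nextflow include): a line like
    #   include { FASTQC } from './modules/fastqc'
    # is replaced by the three section markers, and each process, subworkflow and
    # function the workflow calls is then spliced in once at its marker, so the
    # returned code is a single self-contained file.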
    #TODO -> write tests for this method
    #Function that rewrites the workflow code
    #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
    def simplify_workflow_code(self):
        code = self.get_first_file().get_code()
        #code, ankers = self.write_workflow_into_one_file()
        #TODO -> update method get_all_executors_from_workflow -> right now it's not searching through the subworkflows
        for exe in self.get_workflow_main().get_all_executors_in_workflow():
            if(exe.get_type()=="Call" or exe.get_type()=="Operation"):
                code = code.replace(exe.get_code(get_OG = True), exe.simplify_code())
            else:
                print(exe.get_code(), exe.get_type())
                raise Exception("This shouldn't happen")
        return code
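
    # Sketch of the test the TODO above asks for (hypothetical workflow file):
    #   w = Workflow("wf/main.nf", duplicate = True)
    #   w.initialise()
    #   simplified = w.simplify_workflow_code()
    #   #Simplification rewrites executors but should not lose any process call
    #   for p in w.get_processes_called():
    #       assert p.get_alias() in simplified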
    #Method which rewrites the workflow following the user view
    #Converting the workflow to the user view only makes sense when the option 'duplicate' is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
    def convert_workflow_2_user_view(self, relevant_processes = []):
        if(self.duplicate):
            code = self.simplify_workflow_code()
            print(code)
        #self.rewrite_and_initialise(code)
        #
        ##Get the clusters and the code
        #self.check_relevant_processes_in_workflow(relevant_processes)
        #self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
        #clusters = self.nextflow_file.graph.get_clusters_from_user_view()
        #
        ##DETERMINING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER
        ##Creating the clusters with calls instead of processes or subworkflows
        #set_clusters_with_calls = []
        #for c in clusters:
        #    tab = []
        #    for ele in c:
        #        if(ele.get_type()=="Operation"):
        #            if(ele.get_artificial_status()==False):
        #                tab.append(ele)
        #        else:
        #            call = ele.get_call()
        #            if(len(call)!=1):
        #                raise Exception("This shouldn't happen")
        #            tab.append(call[0])
        #    set_clusters_with_calls.append(set(tab))
        #
        ##Getting subworkflows to executors
        #subworkflow_2_executors = {}
        #for sub in self.get_subworkflows_called():
        #    dico = {}
        #    sub.get_all_executors(dico)
        #    temp = set(list(dico.keys()))
        #    subworkflow_2_executors[sub] = []
        #    for ele in temp:
        #        #Cannot add calls to subworkflows -> they are already present by definition
        #        if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
        #            None
        #            #We don't add it
        #        else:
        #            subworkflow_2_executors[sub].append(ele)
        #    #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
        #
        #def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
        #    broken_subworkflows = []
        #    for sub in subworkflow_2_executors:
        #        #You want this list (set) to end up equal to subworkflow_2_executors[sub]
        #        elements_in_sub_with_clusters = []
        #        for cluster in set_clusters_with_calls:
        #            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
        #                break
        #            for c in cluster:
        #                if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
        #                    break
        #                if(c in subworkflow_2_executors[sub]):
        #                    elements_in_sub_with_clusters+=list(cluster)
        #                    break
        #        if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
        #            None
        #            #This means that the subworkflow is intact
        #        else:
        #            #This means that the subworkflow is broken
        #            broken_subworkflows.append(sub)
        #    return broken_subworkflows
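        ##Micro-example of the check above (hypothetical): if subworkflow S
        ##contains executors {a, b} but the user-view clusters are {a, x} and
        ##{b, y}, the union of the clusters touching S is {a, x, b, y} != {a, b},
        ##so S is reported as broken and its call has to be rewritten.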
        #
        #broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
        ##Rewrite broken subworkflows
        #for sub in broken_subworkflows:
        #    code = self.rewrite_subworkflow_call(code, sub)
        #
        ##TODO -> this needs to be optimised
        #self.rewrite_and_initialise(code)
        ##Get the clusters and the code
        #self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
        #clusters = self.nextflow_file.graph.get_clusters_from_user_view()
        #
        ##Get the clusters with the corresponding operations inside
        ##for i in range(len(clusters)):
        ##    c = clusters[i]
        ##    if(len(c)>1):
        ##        clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
        ##print(clusters)
        ##Get the topological order
        #clusters = self.nextflow_file.graph.get_topogical_order(clusters)
        ##print(clusters)
        #
        ##Creating the subworkflows from clusters
        #calls_in_operations = []
        #non_relevant_name = 1
        #channels_to_replace_outside_of_cluster = []
        #subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
        #index_cluster = 0
        #for elements in clusters:
        #    #Only create the subworkflows for clusters with more than one element
        #    processes_added = []
        #    things_added_in_cluster = []
        #    if(len(elements)>1):
        #        name, body, take, emit = "", "", "", ""
        #        first_element = True
        #        for ele in elements:
        #
        #            if(ele.get_type()=="Process"):
        #
        #                #Determine the name of the created subworkflow cluster
        #                if(ele.get_alias() in relevant_processes):
        #                    name = f"cluster_{ele.get_alias()}"
        #                #Get the call of the thing (either process or subworkflow)
        #                #TODO -> check it works with subworkflows
        #                call = ele.get_call()
        #
        #                #This verification is really important
        #                if(len(call)!=1):
        #                    for c in call:
        #                        print(c.get_code(get_OG=True))
        #                    raise Exception("This shouldn't happen since duplicate mode is activated")
        #                call = call[0]
        #
        #                #If first element -> add marker for the subworkflow call
        #                if(first_element):
        #                    code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
        #                    first_element = False
        #                else:
        #                    code = code.replace(call.get_code(get_OG = True), "")
        #
        #                processes_added.append(call.get_first_element_called())
        #                printed_condition = " && ".join(call.get_condition().get_conditions())
        #                if(printed_condition!=""):
        #                    body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
        #                else:
        #                    body+=f"\n{call.get_code()}\n"
        #                things_added_in_cluster.append(call)
        #            elif(ele.get_type()=="Operation"):
        #
        #                #If first element -> add marker for the subworkflow call
        #                if(first_element):
        #                    code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
        #                    first_element = False
        #                else:
        #                    code = code.replace(ele.get_code(get_OG = True), "")
        #
        #                #Ignore these cases
        #                if(ele.get_code()[:4] not in ["emit", "take"]):
        #                    origins = ele.get_origins()
        #                    for o in origins:
        #                        if(o.get_type()=="Call"):
        #                            calls_in_operations.append(o)
        #                    printed_condition = " && ".join(ele.get_condition().get_conditions())
        #                    if(printed_condition!=""):
        #                        body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
        #                    else:
        #                        body+=f"\n{ele.get_code()}\n"
        #                things_added_in_cluster.append(ele)
        #            else:
        #                raise Exception("This shouldn't happen")
        #
        #        #TODO check this part of the code is never seen
        #        #Here we remove the Call_12313 placeholders
        #        for call in calls_in_operations:
        #            raise Exception("This shouldn't happen since the workflows have been rewritten")
        #            body = body.replace(call.get_code(), "")
        #            body = body.replace(str(call), call.get_code())
        #
        #        #If name=="" -> there aren't any relevant processes in the cluster -> it's a cluster of non-relevant nodes
        #        if(name==""):
        #            #If there are processes called we are going to use them
        #            if(len(processes_added)>0):
        #                #TODO find a better naming system
        #                name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
        #            else:
        #                #TODO find a better naming system
        #                name = f"non_relevant_cluster_{non_relevant_name}"
        #                non_relevant_name+=1
        #
        #        #Check that there is a single condition in the body
        #        body = group_together_ifs(body)
        #        conditions_in_subworkflow = []
        #        end = -1
        #        for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
        #            conditions_in_subworkflow.append(match.group(1).strip())
        #            _, start = match.span(0)
        #
        #        if(len(conditions_in_subworkflow)==1):
        #            end = extract_curly(body, start)
        #            body = body[start: end-1].strip()
        #
        #        #TAKE
        #        #Adding take parameters on the inside of the subworkflow
        #        takes_param = self.get_takes(things_added_in_cluster)
        #        new_param_names, index, old_param_names = [], 1, []
        #        for param in takes_param:
        #            param_name = f"param_{name}_{index}"
        #            new_param_names.append(param_name)
        #            old_param_names.append(param.get_code())
        #            index += 1
        #        if(len(new_param_names)>0):
        #            temp = '\n'.join(new_param_names)
        #            take = f"\ntake:\n{temp}\n"
        #
        #        #EMIT
        #        #Adding the emitted outputs
        #        emitted_outputs = self.get_emits(things_added_in_cluster)
        #        new_output_names, index, old_output_names = [], 0, []
        #        for output in emitted_outputs:
        #            output_name = f"{name}.out[{index}]"
        #            new_output_names.append(output_name)
        #            old_output_names.append(output.get_code())
        #            index += 1
        #        if(len(old_output_names)>0):
        #            temp = '\n'.join(old_output_names)
        #            emit = f"\nemit:\n{temp}\n"
        #
        #        #Adding empty channels if they don't exist in the case of a negative condition
        #        body = get_channels_to_add_in_false_conditions(body, old_output_names)
        #
        #        #Replace names inside subworkflow
        #        subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
        #        for i in range(len(new_param_names)):
        #            pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
        #            subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
        #            #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
        #
        #        #TODO -> add verification of conditions
        #        params = ", ".join(old_param_names)
        #        subworkfow_call_case_true = f"{name}({params})"
        #        subworkfow_call_case_false = ""
        #        for i in range(len(new_output_names)):
        #            #In the case of channels, we just add channel = subworkflow.out[i]
        #            if(not bool(re.findall("\.\s*out", old_output_names[i]))):
        #                subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
        #                subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
        #            #In the case of emitted values we need to replace the code on the outside
        #            else:
        #                param_out_name = f"{name}_out_{i+1}"
        #                subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
        #                subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
        #                channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
        #        #If there was only a single condition in the subworkflow cluster -> then we add it when the call is done
        #        if(len(conditions_in_subworkflow)==1):
        #            subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
        #        else:
        #            subworkfow_call = subworkfow_call_case_true
        #
        #        subworkflow_clusters_to_add.append(subworkflow_code)
        #        subworkflow_cluster_calls_to_add.append(subworkfow_call)
        #        index_cluster+=1
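        ##Example of a generated call site (hypothetical cluster "cluster_FASTQC"
        ##with one condition and one emitted output):
        ##  if(params.run_qc) {
        ##      cluster_FASTQC(reads)
        ##      qc_ch = cluster_FASTQC.out[0]
        ##  } else {
        ##      qc_ch = Channel.empty()
        ##  }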
        #
        ##TODO -> removing the conditions which are problematic
        ##This might not be the problem -> when rerunning, the analysis isn't totally robust
        #still_simplifying_conditions = True
        #while(still_simplifying_conditions):
        #    still_simplifying_conditions = False
        #    to_replace, anker1, anker2 = "", "", ""
        #    #Replace if/else
        #    for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
        #        to_replace = match.group(0)
        #        anker1, anker2 = match.group(1), match.group(2)
        #        still_simplifying_conditions = True
        #        break
        #    #Replace empty if on its own
        #    if(not still_simplifying_conditions):
        #        for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
        #            to_replace = match.group(1)
        #            anker1 = match.group(2)
        #            still_simplifying_conditions = True
        #            break
        #    if(still_simplifying_conditions):
        #        code = code.replace(to_replace, f"{anker1}\n{anker2}")
        #
        ##Replace the ankers by the calls of the subworkflows
        #for i in range(len(subworkflow_clusters_to_add)):
        #    #print(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
        #    code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
        #
        #for old, new in channels_to_replace_outside_of_cluster:
        #    pattern = fr"[ \(,]({re.escape(old)})"
        #    code = replace_group1(code, pattern, new)
        #    #code = code.replace(old, new)
        #
        ##Add the subworkflow definitions
        ##-------------------------------------
        ##Add anker
        #subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
        #to_replace = ""
        #for match in re.finditer(r"workflow\s+\w*\s*\{", code):
        #    to_replace = match.group(0)
        #    break
        #if(to_replace==""):
        #    raise Exception("No call to a workflow")
        #
        #code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
        #
        #for sub in subworkflow_clusters_to_add:
        #    code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
        #
        ##Putting || back
        #code = code.replace("$OR$", "||")
        #
        #return remove_extra_jumps(format_with_tabs(code))
        #
        ##So basically when retrieving a thing (process or subworkflow)
        ##there is necessarily one call associated with the thing -> since we have the option duplicate activated
        ##You can check that len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
        else:
            raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to be reformulated")