Commit a34c9b51 authored by George Marchment

What looks like a stable version

parent 4df87025
Pipeline #14380 failed in 2 minutes and 25 seconds
@@ -19,7 +19,7 @@ class Block(Root):
#This method returns all the conditions above the block
#Basically everything which needs to be true for the block to exist
def get_all_conditions(self, conditions = {}):
def get_all_conditions(self, conditions):
conditions[self.condition] = ''
self.origin.get_all_conditions(conditions = conditions)
return conditions
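For context, the dropped default argument `conditions = {}` is the classic Python mutable-default pitfall: the same dict object is shared across calls, so conditions could leak between unrelated blocks. A minimal sketch of the accumulation logic, using hypothetical stand-ins rather than the real Block/Root classes:

# Hedged sketch: FakeRoot/FakeBlock are illustrative stand-ins, not the classes in this repo.
class FakeRoot:
    def get_all_conditions(self, conditions):
        # top of the chain: nothing left to add
        return conditions

class FakeBlock:
    def __init__(self, condition, origin):
        self.condition = condition
        self.origin = origin  # enclosing Block, or the Root

    def get_all_conditions(self, conditions):
        # record this block's condition, then climb towards the root
        conditions[self.condition] = ''
        self.origin.get_all_conditions(conditions=conditions)
        return conditions

root = FakeRoot()
outer = FakeBlock("params.run_qc", root)
inner = FakeBlock("!params.skip_trim", outer)
print(list(inner.get_all_conditions({}).keys()))
# -> ['!params.skip_trim', 'params.run_qc']

The caller is now responsible for passing in a fresh dict each time, which is exactly what the new Executor.get_all_conditions further down in this commit does for the "Block" case.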
@@ -42,6 +42,9 @@ class Call(Executor):
def add_to_emits(self, emitted):
self.emits.append(emitted)
def get_later_emits(self):
return self.emits
def __str__(self):
return f"Call_{id(self)}"
@@ -78,22 +81,26 @@ class Call(Executor):
#Case the param is an operation
elif(param.get_type()=="Operation"):
#If it's an artificial operation -> we don't need to do anything
if(not param.get_artificial_status()):
code = code.replace(param.get_code(get_OG=True), param_new_name)
lines = param.simplify_code().split('\n')
if(len(lines)==1):
new_bit = f"{param_new_name} = {lines[0]}"
else:
head = '\n'.join(lines[:-1])
new_bit = f"{head}\n{param_new_name} = {lines[-1]}"
code = code.replace(tag_to_add, f"{tag_to_add}\n{new_bit}")
code = code.replace(param.get_code(get_OG=True), param_new_name)
lines = param.simplify_code().split('\n')
if(len(lines)==1):
new_bit = f"{param_new_name} = {lines[0]}"
else:
head = '\n'.join(lines[:-1])
new_bit = f"{head}\n{param_new_name} = {lines[-1]}"
code = code.replace(tag_to_add, f"{tag_to_add}\n{new_bit}")
#Case Channel
elif(param.get_type()=="Channel"):
raise Exception("This shouldn't happen")
None
elif(param.get_type()=="Emitted"):
None
else:
print(param)
raise Exception("This shouldn't happen")
index+=1
return code.replace(tag_to_add, "").strip()
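A hedged sketch of the string surgery this hunk performs, with plain strings standing in for the real Call/Operation objects (the marker and generated names below are hypothetical, not taken from the repo):

# Illustrative only: an operation used inline as a call parameter is hoisted
# into its own named assignment, spliced in just after the tag marker.
tag_to_add = "//Anker_param"                       # hypothetical marker
code = f"{tag_to_add}\nMY_PROCESS(reads.map{{ it[1] }})"
param_code = "reads.map{ it[1] }"                  # the inline operation
param_new_name = "MY_PROCESS_param_1"              # hypothetical generated name
simplified = param_code                            # stands in for param.simplify_code()

# 1) swap the inline operation for the generated name inside the call
code = code.replace(param_code, param_new_name)

# 2) build the assignment; if simplify_code() spans several lines, only the
#    last line gets assigned to the new name (the head/tail split above)
lines = simplified.split('\n')
if len(lines) == 1:
    new_bit = f"{param_new_name} = {lines[0]}"
else:
    head = '\n'.join(lines[:-1])
    new_bit = f"{head}\n{param_new_name} = {lines[-1]}"

# 3) splice the assignment in right after the tag, then drop the tag
code = code.replace(tag_to_add, f"{tag_to_add}\n{new_bit}")
print(code.replace(tag_to_add, "").strip())
# MY_PROCESS_param_1 = reads.map{ it[1] }
# MY_PROCESS(MY_PROCESS_param_1)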
@@ -166,6 +173,7 @@ class Call(Executor):
channels = [channel]
from .operation import Operation
ope = Operation(f"{param}", self)
ope.set_as_artificial()
for channel in channels:
channel.add_sink(self)
ope.add_element_origins(channel)
@@ -31,6 +31,14 @@ class Executor(Nextflow_Building_Blocks):
def get_list_name_processes(self):
return self.origin.get_list_name_processes()
def get_all_conditions(self):
if(self.origin.get_type()=="Root"):
return []
elif(self.origin.get_type()=="Block"):
conditions = {}
return self.origin.get_all_conditions(conditions)
else:
return self.origin.get_all_conditions()
def get_subworkflow_from_name(self, name):
@@ -93,8 +101,6 @@ class Executor(Nextflow_Building_Blocks):
def get_executors(self):
return self.origin.get_executors()
def get_condition(self):
return self.condition
def add_element_to_elements_being_called(self, element):
self.origin.add_element_to_elements_being_called(element)
@@ -446,7 +446,6 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
#Replace old analysis with new analysis (simplified code)
self.__init__(str(temp_file), display_info = False, duplicate=True)
self.initialise()
self.generate_all_graphs()
def check_relevant_processes_in_workflow(self, relevant_processes):
#Check all relevant processes are in the workflow
@@ -547,6 +546,68 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
return code
#This function returns the channels on which the subworkflow (things_added_in_cluster) depends
def get_takes(self, things_added_in_cluster):
#Basically this is a mapping of channels to operations -> when the value is an empty list
#This means that the channel is totally defined in the subworkflow -> so we are searching for
#Channels which aren't totally defined in the subworkflow
channels_2_sources = {}
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_origins():
channels_2_sources[o] = replace_thing_by_call(o.get_source())
elif(ele.get_type() == "Call"):
for param in ele.get_parameters():
if(param.get_type()=="Channel"):
print(param, param.get_code())
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
for o in param.get_origins():
if(o.get_type()=="Channel"):
channels_2_sources[o] = replace_thing_by_call(o.get_source())
else:
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
raise Exception("This shouldn't happen")
takes = []
for channel in channels_2_sources:
if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])):
takes.append(channel)
#print(things_added_in_cluster)
#print(channels_2_operations_needed)
return takes
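A hedged sketch of the "take" test above, with plain strings standing in for the real Operation/Call objects (all names below are hypothetical). A channel becomes a take: of the extracted subworkflow when at least one of its sources lives outside the cluster, i.e. its source set is not fully contained in things_added_in_cluster:

things_added_in_cluster = ["op_trim", "call_FASTQC"]

# channel -> things writing to it (stands in for replace_thing_by_call(o.get_source()))
channels_2_sources = {
    "reads_ch":   ["op_reads_definition"],  # defined outside the cluster
    "trimmed_ch": ["op_trim"],              # fully defined inside the cluster
}

takes = []
for channel, sources in channels_2_sources.items():
    if set(sources).intersection(things_added_in_cluster) != set(sources):
        takes.append(channel)
print(takes)  # -> ['reads_ch']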
#This function returns the channels the subworkflow (things_added_in_cluster) emits (i.e. the channels other things depend on)
def get_emits(self, things_added_in_cluster):
emits = []
channel_2_sink = {}
#Basically this is a mapping of channels to operations -> when the value is an empty list
#This means that the channel is totally defined in the subworkflow -> so we are searching for
#Channels which aren't totally defined in the subworkflow
#This means that things outside the subworkflow depend on this channel
channel_2_sink = {}
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_gives():
channel_2_sink[o] = replace_thing_by_call(o.get_sink())
elif(ele.get_type() == "Call"):
#thing = ele.get_first_element_called()
for e in ele.get_later_emits():
channel_2_sink[e] = replace_thing_by_call(e.get_sink())
else:
print(ele)
raise Exception("This shouldn't happen")
for channel in channel_2_sink:
if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])):
emits.append(channel)
return emits
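The emit test is the mirror image: a channel must be emitted when something outside the cluster reads from it. A short sketch in the same hypothetical stand-in style as the take example above:

things_added_in_cluster = ["op_trim", "call_FASTQC"]

# channel -> things reading from it (stands in for replace_thing_by_call(e.get_sink()))
channel_2_sink = {
    "trimmed_ch": ["call_MULTIQC"],  # consumed outside the cluster -> emitted
    "qc_html_ch": ["call_FASTQC"],   # only consumed inside -> stays internal
}

emits = [c for c, sinks in channel_2_sink.items()
         if set(sinks).intersection(things_added_in_cluster) != set(sinks)]
print(emits)  # -> ['trimmed_ch']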
#Method which rewrites the workflow following the user view
#Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
def convert_workflow_2_user_view(self, relevant_processes = []):
@@ -623,7 +684,6 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
#Get the clusters and the code
self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
clusters = self.graph.get_clusters_from_user_view()
print(clusters)
#Get the clusters with the corresponding operations inside
@@ -637,231 +697,239 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
##Creating the subworkflows from clusters
#calls_in_operations = []
#non_relevant_name = 1
#channels_to_replace_outside_of_cluster = []
#subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
#index_cluster = 0
#for elements in clusters:
# #Only create the subworkflows for clusters with more than one element
# processes_added = []
# things_added_in_cluster = []
# if(len(elements)>1):
# name, body, take, emit = "", "", "", ""
# first_element = True
# for ele in elements:
#
# if(ele.get_type()=="Process"):
#
# #Determine the name of the created subworkflow cluster
# if(ele.get_alias() in relevant_processes):
# name = f"cluster_{ele.get_alias()}"
# #Get the call of thing (either process or subworkflow)
# #TODO -> check it works with subworkflows
# call = ele.get_call()
#
# #This verification is really important
# if(len(call)!=1):
# for c in call:
# print(c.get_code(get_OG=True))
#
# raise Exception("This shoudn't happen since duplicate mode is activated")
# call = call[0]
#
# #If first element -> add marker for the subworkflow call
# if(first_element):
# code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
# first_element = False
# else:
# code = code.replace(call.get_code(get_OG = True), "")
#
#
# processes_added.append(call.get_first_element_called())
# printed_condition = " && ".join(call.get_condition().get_conditions())
# if(printed_condition!=""):
# body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
# else:
# body+=f"\n{call.get_code()}\n"
# things_added_in_cluster.append(call)
# elif(ele.get_type()=="Operation"):
#
# #If first element -> add marker for the subworkflow call
# if(first_element):
# code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
# first_element = False
# else:
# code = code.replace(ele.get_code(get_OG = True), "")
#
# #Ignore these cases
# if(ele.get_code()[:4] not in ["emit", "take"]):
# origins = ele.get_origins()
# for o in origins:
# if(o.get_type()=="Call"):
# calls_in_operations.append(o)
# printed_condition = " && ".join(ele.get_condition().get_conditions())
# if(printed_condition!=""):
# body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
# else:
# body+=f"\n{ele.get_code()}\n"
# things_added_in_cluster.append(ele)
# else:
# raise Exception("This shoudn't happen")
#
# #TODO check that this part of the code is never reached
# #Here we're removing the Call_12313 thing
# for call in calls_in_operations:
# raise Exception("This shouldn't happen since the workflows has been rewritten")
# body = body.replace(call.get_code(), "")
# body = body.replace(str(call), call.get_code())
#
# #If the name=="" -> it means there aren't any relevant processes in the cluster -> it means it's a cluster of non-relevant nodes
# if(name==""):
# #If there are processes called we are going to use them
# if(len(processes_added)>0):
# #TODO find a better naming system
# name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
# else:
# #TODO find a better naming system
# name = f"non_relevant_cluster_{non_relevant_name}"
# non_relevant_name+=1
#
# #Check that there is a single condition in the body
# body = group_together_ifs(body)
# conditions_in_subworkflow = []
# end = -1
# for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
# conditions_in_subworkflow.append(match.group(1).strip())
# _, start = match.span(0)
#
# if(len(conditions_in_subworkflow)==1):
# end = extract_curly(body, start)
# body = body[start: end-1].strip()
#
#
# #TAKE
# #Adding take parameters on the inside of the subworkflow
# takes_param = self.get_takes(things_added_in_cluster)
# new_param_names, index, old_param_names = [], 1, []
# for param in takes_param:
# param_name = f"param_{name}_{index}"
# new_param_names.append(param_name)
# old_param_names.append(param.get_code())
# index += 1
# if(len(new_param_names)>0):
# temp = '\n'.join(new_param_names)
# take = f"\ntake:\n{temp}\n"
#
# #EMIT
# #Adding the emitted outputs
# emitted_outputs = self.get_emits(things_added_in_cluster)
# new_output_names, index, old_output_names = [], 0, []
# for output in emitted_outputs:
# output_name = f"{name}.out[{index}]"
# new_output_names.append(output_name)
# old_output_names.append(output.get_code())
# index += 1
#
# if(len(old_output_names)>0):
# temp = '\n'.join(old_output_names)
# emit = f"\nemit:\n{temp}\n"
#
# #Adding empty channels if it doesn't exist in the case of a negative condition
# body = get_channels_to_add_in_false_conditions(body, old_output_names)
#
#
#
# #Replace names inside subworkflow
# subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
# for i in range(len(new_param_names)):
# pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
# subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
# #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
#
# #TODO -> added verification of conditions
# params = ", ".join(old_param_names)
# subworkfow_call_case_true = f"{name}({params})"
# subworkfow_call_case_false = ""
# for i in range(len(new_output_names)):
# #In the case of channels, we just add channel = subworkflow.out[i]
# if(not bool(re.findall("\.\s*out", old_output_names[i]))):
# subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
# subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
# #In the case of emitted values we need to replace the code on the outside
# else:
# param_out_name= f"{name}_out_{i+1}"
# subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
# subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
# channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
# #If there was only a single condition in the subworkflow cluster -> then we add it when the call is done
# if(len(conditions_in_subworkflow)==1):
# subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
# None
# else:
# subworkfow_call = subworkfow_call_case_true
#
#
# subworkflow_clusters_to_add.append(subworkflow_code)
# subworkflow_cluster_calls_to_add.append(subworkfow_call)
# index_cluster+=1
#
#
##TODO -> removing the conditions which are problematic
##This might not be the problem -> when rerunning, the analysis isn't totally robust
#still_simplifying_conditions = True
#while(still_simplifying_conditions):
# still_simplifying_conditions = False
# to_replace, anker1, anker2 = "", "", ""
# #Replace if/else
# for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
# to_replace = match.group(0)
# anker1, anker2 = match.group(1), match.group(2)
# still_simplifying_conditions = True
# break
# #Replace empty if on its own
# if(not still_simplifying_conditions):
# for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
# to_replace = match.group(1)
# anker1 = match.group(2)
# still_simplifying_conditions = True
# break
# if(still_simplifying_conditions):
# code = code.replace(to_replace, f"{anker1}\n{anker2}")
#
#
##Replace the ankers by the calls of the subworkflows
#for i in range(len(subworkflow_clusters_to_add)):
# #print(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
# code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
#
#for old, new in channels_to_replace_outside_of_cluster:
# pattern= fr"[ \(,]({re.escape(old)})"
# code = replace_group1(code, pattern, new)
# #code = code.replace(old, new)
#
#
##Add the subworkflow definitions
##-------------------------------------
##Add anker
#subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
#to_replace = ""
#for match in re.finditer(r"workflow\s+\w*\s*\{", code):
# to_replace = match.group(0)
# break
#if(to_replace==""):
# raise Exception("No call to a workflow")
#
#code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
#
#for sub in subworkflow_clusters_to_add:
# code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
#
#
##Putting || back
#code = code.replace("$OR$", "||")
#
#return remove_extra_jumps(format_with_tabs(code))
return code
#Creating the subworkflows from clusters
calls_in_operations = []
non_relevant_name = 1
channels_to_replace_outside_of_cluster = []
subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
index_cluster = 0
for elements in clusters:
#Only create the subworkflows for clusters with more than one element
processes_added = []
things_added_in_cluster = []
if(len(elements)>1):
name, body, take, emit = "", "", "", ""
first_element = True
for ele in elements:
if(ele.get_type()=="Process"):
#Determine the name of the created subworkflow cluster
if(ele.get_alias() in relevant_processes):
name = f"cluster_{ele.get_alias()}"
#Get the call of thing (either process or subworkflow)
call = ele.get_call()
#If first element -> add marker for the subworkflow call
if(first_element):
code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
first_element = False
else:
code = code.replace(call.get_code(get_OG = True), "")
processes_added.append(call.get_first_element_called())
values = []
for condition in call.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
else:
body+=f"\n{call.get_code()}\n"
things_added_in_cluster.append(call)
#Below
elif(ele.get_type()=="Operation"):
#TODO -> check this verification, there might be some side effects
if(not ele.get_artificial_status()):
#If first element -> add marker for the subworkflow call
print(ele.get_code(get_OG = True))
if(first_element):
code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}", 1)
first_element = False
else:
code = code.replace(ele.get_code(get_OG = True), "", 1)
#Ignore these cases
if(ele.get_code()[:4] not in ["emit", "take"]):
origins = ele.get_origins()
for o in origins:
if(o.get_type()=="Call"):
calls_in_operations.append(o)
values = []
for condition in ele.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
else:
body+=f"\n{ele.get_code()}\n"
things_added_in_cluster.append(ele)
else:
raise Exception("This shoudn't happen")
#TODO check that this part of the code is never reached
#Here we're removing the Call_12313 thing
for call in calls_in_operations:
raise Exception("This shouldn't happen since the workflows has been rewritten")
body = body.replace(call.get_code(), "")
body = body.replace(str(call), call.get_code())
#If the name=="" -> it means there aren't any relevant processes in the cluster -> it means it's a cluster of non-relevant nodes
if(name==""):
#If there are processes called we are going to use them
if(len(processes_added)>0):
#TODO find a better naming system
name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
else:
#TODO find a better naming system
name = f"non_relevant_cluster_{non_relevant_name}"
non_relevant_name+=1
#Check that there is a single condition in the body
body = group_together_ifs(body)
conditions_in_subworkflow = []
end = -1
for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
conditions_in_subworkflow.append(match.group(1).strip())
_, start = match.span(0)
if(len(conditions_in_subworkflow)==1):
end = extract_curly(body, start)
body = body[start: end-1].strip()
#TAKE
#Adding take parameters on the inside of the subworkflow
takes_param = self.get_takes(things_added_in_cluster)
new_param_names, index, old_param_names = [], 1, []
for param in takes_param:
param_name = f"param_{name}_{index}"
new_param_names.append(param_name)
old_param_names.append(param.get_code())
index += 1
if(len(new_param_names)>0):
temp = '\n'.join(new_param_names)
take = f"\ntake:\n{temp}\n"
#EMIT
#Adding the emitted outputs
emitted_outputs = self.get_emits(things_added_in_cluster)
new_output_names, index, old_output_names = [], 0, []
for output in emitted_outputs:
output_name = f"{name}.out[{index}]"
new_output_names.append(output_name)
old_output_names.append(output.get_code())
index += 1
if(len(old_output_names)>0):
temp = '\n'.join(old_output_names)
emit = f"\nemit:\n{temp}\n"
#Adding empty channels if it doesn't exist in the case of a negative condition
body = get_channels_to_add_in_false_conditions(body, old_output_names)
#Replace names inside subworkflow
subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
for i in range(len(new_param_names)):
pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
#subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
#TODO -> added verification of conditions
params = ", ".join(old_param_names)
subworkfow_call_case_true = f"{name}({params})"
subworkfow_call_case_false = ""
for i in range(len(new_output_names)):
#In the case of channels, we just add channel = subworkflow.out[i]
if(not bool(re.findall("\.\s*out", old_output_names[i]))):
subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
#In the case of emitted values we need to replace the code on the outside
else:
param_out_name= f"{name}_out_{i+1}"
subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
#If there was only a single condition in the subworkflow cluster -> then we add it when the call is done
if(len(conditions_in_subworkflow)==1):
subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
None
else:
subworkfow_call = subworkfow_call_case_true
subworkflow_clusters_to_add.append(subworkflow_code)
subworkflow_cluster_calls_to_add.append(subworkfow_call)
index_cluster+=1
#TODO -> removing the conditions which are problematic
#This might not be the problem -> when rerunning, the analysis isn't totally robust
still_simplifying_conditions = True
while(still_simplifying_conditions):
still_simplifying_conditions = False
to_replace, anker1, anker2 = "", "", ""
#Replace if/else
for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
to_replace = match.group(0)
anker1, anker2 = match.group(1), match.group(2)
still_simplifying_conditions = True
break
#Replace empty if on its own
if(not still_simplifying_conditions):
for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
to_replace = match.group(1)
anker1 = match.group(2)
still_simplifying_conditions = True
break
if(still_simplifying_conditions):
code = code.replace(to_replace, f"{anker1}\n{anker2}")
#Replace the ankers by the calls of the subworkflows
for i in range(len(subworkflow_clusters_to_add)):
#print(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
for old, new in channels_to_replace_outside_of_cluster:
pattern= fr"[ \(,]({re.escape(old)})"
code = replace_group1(code, pattern, new)
#code = code.replace(old, new)
#Add the subworkflow definitions
#-------------------------------------
#Add anker
subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
to_replace = ""
for match in re.finditer(r"workflow\s+\w*\s*\{", code):
to_replace = match.group(0)
break
if(to_replace==""):
raise Exception("No call to a workflow")
code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
for sub in subworkflow_clusters_to_add:
code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
#Putting || back
code = code.replace("$OR$", "||")
return remove_extra_jumps(format_with_tabs(code))
#return code
#
##So basically when retrieving a thing (process or subworkflow)
##There is necessarily one call associated with the thing -> since we have the option duplicate activated
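A hedged sketch of the anchor mechanics used in the rewrite above, run on a toy piece of workflow code (the subworkflow name and channel below are hypothetical). Calls absorbed into a cluster are replaced by //Anker_clusterN markers, empty conditional shells left around a lone marker are collapsed with the same regex as in the simplification loop, and each marker is finally swapped for the generated subworkflow call:

import re

code = """workflow {
if (params.run_qc) {
//Anker_cluster0
} else {
}
}"""

# collapse `if (...) { //Anker_clusterN } else { }` down to its content
pattern = r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}"
m = re.search(pattern, code)
if m:
    code = code.replace(m.group(0), f"{m.group(1)}\n{m.group(2)}")

# replace the marker by the call of the generated subworkflow
code = code.replace("//Anker_cluster0", "cluster_FASTQC(reads_ch)")
print(code)
# prints the workflow with the call inlined (plus leftover blank lines,
# which remove_extra_jumps cleans up at the end of the real rewrite)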