Skip to content
Snippets Groups Projects
Commit 5c0c1c1f authored by George Marchment's avatar George Marchment
Browse files

Add function get get locally take and emits for subworkflows

parent d85ffdf1
No related merge requests found
Pipeline #14258 failed with stage
in 2 minutes and 13 seconds
......@@ -26,6 +26,8 @@ class Call(Executor):
def __str__(self):
    """Return the textual label of this call: "Call_" followed by id(self)."""
    label = "Call_" + str(id(self))
    return label
def get_parameters(self):
    """Return the object stored in `self.parameters` (no copy is made)."""
    params = self.parameters
    return params
def get_code(self, clean_pipe = False, get_OG=False):
if(get_OG):
......
......@@ -12,6 +12,8 @@ class Emitted(Channel):
self.emitted_by = emitted_by
if(not emitted_by.is_initialised()):
emitted_by.initialise()
emitted_by.add_to_emits(self)
self.source.append(emitted_by)
self.emits = None #->this is the channel it's emits -> in the case of a subworkflow
......
......@@ -1167,3 +1167,24 @@ def format_with_tabs(code):
return code
#This function takes a list of processes/subworkflows and operations
#and replaces the processes and subworkflows by their calls
def replace_thing_by_call(tab):
    """Return a new list where every non-call, non-operation element is replaced by its call.

    Elements whose `get_type()` is "Operation" or "Call" are kept as they are
    (in their original relative order). Every other element is replaced by the
    single call returned by its `get_call()`; these calls are placed after the
    kept elements, in the order their owners appeared in `tab`.

    The input list is left untouched: callers pass lists owned by other
    objects (e.g. the result of `get_source()` / `get_sink()`), and rewriting
    those in place would silently change them.

    Raises:
        Exception: if an element to replace does not have exactly one call.
            The original code of every candidate call is printed first.
    """
    kept = []
    calls = []
    for ele in tab:
        if ele.get_type() in ("Operation", "Call"):
            kept.append(ele)
            continue
        candidates = ele.get_call()
        #This verification is really important
        if len(candidates) != 1:
            for c in candidates:
                print(c.get_code(get_OG=True))
            raise Exception("This shoudn't happen since duplicate mode is activated")
        calls.append(candidates[0])
    return kept + calls
\ No newline at end of file
......@@ -31,9 +31,16 @@ class Process(Nextflow_Building_Blocks):
self.initialised = True
self.call = []
self.number_times_called = 0
self.later_emits = []
##It's important this is last
#self.condition = Condition(self)
def add_to_emits(self, emit):
    """Append `emit` to the list kept in `self.later_emits`."""
    registered = self.later_emits
    registered.append(emit)
def get_later_emits(self):
    """Return the `later_emits` list itself (not a copy)."""
    recorded = self.later_emits
    return recorded
def set_alias(self, alias):
self.alias = alias
......
......@@ -20,7 +20,14 @@ class Subworkflow(Main_DSL2):
self.call = []
self.initialised = False
self.later_emits = []
def add_to_emits(self, emit):
    """Register `emit` by appending it to `self.later_emits`."""
    collected = self.later_emits
    collected.append(emit)
def get_later_emits(self):
    """Give back the list stored in `self.later_emits` (same object, no copy)."""
    collected = self.later_emits
    return collected
def set_call(self, call):
self.call.append(call)
......
......@@ -3,7 +3,7 @@
from .nextflow_file import Nextflow_File
from .ro_crate import RO_Crate
from . import constant
from .outils import is_git_directory, format_with_tabs
from .outils import is_git_directory, format_with_tabs, replace_thing_by_call
from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles
from .outils_annotate import get_tools_commands_from_user_for_process
from .bioflowinsighterror import BioFlowInsightError
......@@ -629,6 +629,61 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
def generate_user_and_process_metadata(self):
    """Delegate to `generate_user_and_process_metadata()` of `self.nextflow_file`.

    Nothing is returned; whatever the delegate returns is discarded.
    """
    nextflow_file = self.nextflow_file
    nextflow_file.generate_user_and_process_metadata()
#This function returns the channels on which the subworkflow (things_added_in_cluster) depends
def get_takes(self, things_added_in_cluster):
    """Return the channels the cluster consumes but does not fully produce itself.

    Args:
        things_added_in_cluster: the operations and calls that make up the
            cluster. Every element must report "Operation" or "Call" from
            `get_type()`.

    Returns:
        A list of channels for which at least one source (after
        `replace_thing_by_call`) is not a member of `things_added_in_cluster`.

    Raises:
        Exception: if an element is neither an operation nor a call, or if a
            call has a parameter whose type is not "Channel".
    """
    #Maps each consumed channel to its sources (processes/subworkflows swapped for their calls)
    sources_by_channel = {}
    for element in things_added_in_cluster:
        kind = element.get_type()
        if kind == "Operation":
            for origin in element.get_origins():
                sources_by_channel[origin] = replace_thing_by_call(origin.get_source())
            continue
        if kind != "Call":
            raise Exception("This shouldn't happen")
        for param in element.get_parameters():
            if param.get_type() != "Channel":
                raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
            sources_by_channel[param] = replace_thing_by_call(param.get_source())

    #A channel whose sources all live inside the cluster is fully defined there;
    #only the channels with at least one outside source are takes
    return [
        channel
        for channel, sources in sources_by_channel.items()
        if not set(sources).issubset(things_added_in_cluster)
    ]
#This function returns the channels the subworkflow (things_added_in_cluster) emits (i.e. the channels other things depend on)
def get_emits(self, things_added_in_cluster):
    """Return the channels produced in the cluster that something outside of it uses.

    Args:
        things_added_in_cluster: the operations and calls that make up the
            cluster. Every element must report "Operation" or "Call" from
            `get_type()`.

    Returns:
        A list of channels for which at least one sink (after
        `replace_thing_by_call`) is not a member of `things_added_in_cluster`.

    Raises:
        Exception: if an element is neither an operation nor a call (the
            offending element is printed first).
    """
    #Maps each produced channel to its sinks (processes/subworkflows swapped for their calls)
    sinks_by_channel = {}
    for element in things_added_in_cluster:
        kind = element.get_type()
        if kind == "Operation":
            produced = element.get_gives()
        elif kind == "Call":
            #For a call, the produced channels are the emits recorded on the thing it calls
            produced = element.get_first_element_called().get_later_emits()
        else:
            print(element)
            raise Exception("This shouldn't happen")
        for channel in produced:
            sinks_by_channel[channel] = replace_thing_by_call(channel.get_sink())

    #A channel whose sinks all live inside the cluster stays internal;
    #only the channels with at least one outside sink are emits
    return [
        channel
        for channel, sinks in sinks_by_channel.items()
        if not set(sinks).issubset(things_added_in_cluster)
    ]
def convert_to_DSL2(self):
if(self.get_DSL()=="DSL2"):
print("Workflow is already written in DSL2")
......@@ -775,6 +830,10 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
clusters = self.nextflow_file.graph.get_clusters_from_user_view()
print(clusters)
#TODO -> need to break clusters here
#And redo analysis
#Get the clusters with the corresponding operations inside
#for i in range(len(clusters)):
# c = clusters[i]
......@@ -789,52 +848,79 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
#Creating the subworkflows from clusters
calls_in_operations = []
non_relevant_name = 1
for elements in clusters:
name, body, take, emit = "", "", "", ""
for ele in elements:
if(ele.get_type()=="Process"):
#Determine the name of the created subworkflow cluster
if(ele.get_name() in relevant_processes):
name = f"cluster_{ele.get_name()}"
#Get the call of thing (either process or subworkflow)
#TODO -> check it works with subworkflows
call = ele.get_call()
#This verification is really important
if(len(call)!=1):
for c in call:
print(c.get_code(get_OG=True))
#Only create the subworkflows for clusters with more than one element
processes_added = []
things_added_in_cluster = []
if(len(elements)>1):
name, body, take, emit = "", "", "", ""
for ele in elements:
if(ele.get_type()=="Process"):
raise Exception("This shoudn't happen since duplicate mode is activated")
call = call[0]
printed_condition = " && ".join(call.get_condition().get_conditions())
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
else:
body+=f"\n{call.get_code()}\n"
elif(ele.get_type()=="Operation"):
#Ignore these cases
if(ele.get_code()[:4] not in ["emit", "take"]):
origins = ele.get_origins()
for o in origins:
if(o.get_type()=="Call"):
calls_in_operations.append(o)
printed_condition = " && ".join(ele.get_condition().get_conditions())
#Determine the name of the created subworkflow cluster
if(ele.get_name() in relevant_processes):
name = f"cluster_{ele.get_name()}"
#Get the call of thing (either process or subworkflow)
#TODO -> check it works with subworkflows
call = ele.get_call()
#This verification is really important
if(len(call)!=1):
for c in call:
print(c.get_code(get_OG=True))
raise Exception("This shoudn't happen since duplicate mode is activated")
call = call[0]
processes_added.append(call.get_first_element_called())
printed_condition = " && ".join(call.get_condition().get_conditions())
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
else:
body+=f"\n{ele.get_code()}\n"
#Here we removing the Call_12313 thing
for call in calls_in_operations:
body = body.replace(call.get_code(), "")
body = body.replace(str(call), call.get_code())
subworkflow_code = f"subworkflow {name} {{\n{take}\n{body}\n{emit}\n}}"
print(format_with_tabs(subworkflow_code))
print("-----------")
body+=f"\n{call.get_code()}\n"
things_added_in_cluster.append(call)
elif(ele.get_type()=="Operation"):
#Ignore these cases
if(ele.get_code()[:4] not in ["emit", "take"]):
origins = ele.get_origins()
for o in origins:
if(o.get_type()=="Call"):
calls_in_operations.append(o)
printed_condition = " && ".join(ele.get_condition().get_conditions())
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
else:
body+=f"\n{ele.get_code()}\n"
things_added_in_cluster.append(ele)
#TODO check this part of the code is never seen
#Here we removing the Call_12313 thing
for call in calls_in_operations:
raise Exception("This shouldn't happen since the workflows has been rewritten")
body = body.replace(call.get_code(), "")
body = body.replace(str(call), call.get_code())
#If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes
if(name==""):
#If there are processes called we are going to use them
if(len(processes_added)>0):
#TODO find a better naming system
name = f"non_relevant_cluster_{processes_added[0].get_name()}"
else:
#TODO find a better naming system
name = f"non_relevant_cluster_{non_relevant_name}"
non_relevant_name+=1
subworkflow_code = f"subworkflow {name} {{\n{take}\n{body}\n{emit}\n}}"
print(format_with_tabs(subworkflow_code))
for t in self.get_takes(things_added_in_cluster):
print("*", t.get_code())
for t in self.get_emits(things_added_in_cluster):
print("-", t.get_code())
print("-----------")
#So basically when retriving a thing (process or subworkflow)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment