diff --git a/src/call.py b/src/call.py index bdc880c4eab8ece4c070e938433dc29e031c92ae..445872851fa732f8225b905d4f7260536fc27a3b 100644 --- a/src/call.py +++ b/src/call.py @@ -26,6 +26,8 @@ class Call(Executor): def __str__(self): return f"Call_{id(self)}" + def get_parameters(self): + return self.parameters def get_code(self, clean_pipe = False, get_OG=False): if(get_OG): diff --git a/src/emitted.py b/src/emitted.py index 3779c95585e84426590e67d4bbac6fd4c72c89a5..d764792e1a388803809a52e5f2894f48812abf7c 100644 --- a/src/emitted.py +++ b/src/emitted.py @@ -12,6 +12,8 @@ class Emitted(Channel): self.emitted_by = emitted_by if(not emitted_by.is_initialised()): emitted_by.initialise() + emitted_by.add_to_emits(self) + self.source.append(emitted_by) self.emits = None #->this is the channel it's emits -> in the case of a subworkflow diff --git a/src/outils.py b/src/outils.py index e33d6a0e1fd79c05c5d3fcd4d53f7d39cc812e02..9415f31d5b8be5dd27653e8d602d00c71260106d 100644 --- a/src/outils.py +++ b/src/outils.py @@ -1167,3 +1167,24 @@ def format_with_tabs(code): return code +#This function takes a list of processes/subworkflows and opeartions +#Add replaces the processes an d subworkflows by their calls +def replace_thing_by_call(tab): + to_remove = [] + to_add = [] + for ele in tab: + if(ele.get_type() not in ["Operation", "Call"]): + to_remove.append(ele) + call = ele.get_call() + #This verification is really important + if(len(call)!=1): + for c in call: + print(c.get_code(get_OG=True)) + raise Exception("This shoudn't happen since duplicate mode is activated") + call = call[0] + to_add.append(call) + + for r in to_remove: + tab.remove(r) + tab+=to_add + return tab \ No newline at end of file diff --git a/src/process.py b/src/process.py index 44c2ad818f65a5893ff2b2600568910cbeb20f90..4359ea669ee48a86d1968665e3903a58e2b1f30d 100644 --- a/src/process.py +++ b/src/process.py @@ -31,9 +31,16 @@ class Process(Nextflow_Building_Blocks): self.initialised = True self.call = [] self.number_times_called = 0 + self.later_emits = [] ##It's important this is last #self.condition = Condition(self) + def add_to_emits(self, emit): + self.later_emits.append(emit) + + def get_later_emits(self): + return self.later_emits + def set_alias(self, alias): self.alias = alias diff --git a/src/subworkflow.py b/src/subworkflow.py index 1c1ef2c72f85c0fe5b938477db161e597acf2a2d..9a9f857aef57d3c5bd10e5ce27984df79884552f 100644 --- a/src/subworkflow.py +++ b/src/subworkflow.py @@ -20,7 +20,14 @@ class Subworkflow(Main_DSL2): self.call = [] self.initialised = False - + self.later_emits = [] + + + def add_to_emits(self, emit): + self.later_emits.append(emit) + + def get_later_emits(self): + return self.later_emits def set_call(self, call): self.call.append(call) diff --git a/src/workflow.py b/src/workflow.py index 9d0f6639b63ee76647abfc416abf6cf0863a7066..18a10ad62734b68e3009086ef40ed48872c93191 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -3,7 +3,7 @@ from .nextflow_file import Nextflow_File from .ro_crate import RO_Crate from . import constant -from .outils import is_git_directory, format_with_tabs +from .outils import is_git_directory, format_with_tabs, replace_thing_by_call from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles from .outils_annotate import get_tools_commands_from_user_for_process from .bioflowinsighterror import BioFlowInsightError @@ -629,6 +629,61 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen def generate_user_and_process_metadata(self): self.nextflow_file.generate_user_and_process_metadata() + #This Function returns the channels on which the subworkflow (things_added_in_cluster) depend on + def get_takes(self, things_added_in_cluster): + #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list + #This means that the channel is totally definied in the subworkflow -> so we are searching for + #Channels which aren't totatly defined in the subworkflow + channels_2_sources = {} + + for ele in things_added_in_cluster: + if(ele.get_type() == "Operation"): + for o in ele.get_origins(): + channels_2_sources[o] = replace_thing_by_call(o.get_source()) + elif(ele.get_type() == "Call"): + for param in ele.get_parameters(): + if(param.get_type()!="Channel"): + raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") + else: + channels_2_sources[param] = replace_thing_by_call(param.get_source()) + else: + raise Exception("This shouldn't happen") + + takes = [] + for channel in channels_2_sources: + if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])): + takes.append(channel) + #print(things_added_in_cluster) + #print(channels_2_operations_needed) + return takes + + #This Function returns the channels the subworkflow (things_added_in_cluster) emits (other things depend on) + def get_emits(self, things_added_in_cluster): + emits = [] + channel_2_sink = {} + #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list + #This means that the channel is totally definied in the subworkflow -> so we are searching for + #Channels which aren't totatly defined in the subworkflow + #This means that things outside the subworkflow depend on this channel + channel_2_sink = {} + + for ele in things_added_in_cluster: + if(ele.get_type() == "Operation"): + for o in ele.get_gives(): + channel_2_sink[o] = replace_thing_by_call(o.get_sink()) + elif(ele.get_type() == "Call"): + thing = ele.get_first_element_called() + for e in thing.get_later_emits(): + channel_2_sink[e] = replace_thing_by_call(e.get_sink()) + else: + print(ele) + raise Exception("This shouldn't happen") + + for channel in channel_2_sink: + if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])): + emits.append(channel) + return emits + def convert_to_DSL2(self): if(self.get_DSL()=="DSL2"): print("Workflow is already written in DSL2") @@ -775,6 +830,10 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen clusters = self.nextflow_file.graph.get_clusters_from_user_view() print(clusters) + #TODO -> need to break clusters here + #And redo analysis + + #Get the clsuters with the corresponding operations inside #for i in range(len(clusters)): # c = clusters[i] @@ -789,52 +848,79 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen #Creating the subworkflows from clusters calls_in_operations = [] + non_relevant_name = 1 for elements in clusters: - - name, body, take, emit = "", "", "", "" - for ele in elements: - if(ele.get_type()=="Process"): - - #Determine the name of the created subworkflow cluster - if(ele.get_name() in relevant_processes): - name = f"cluster_{ele.get_name()}" - #Get the call of thing (either process or subworkflow) - #TODO -> check it works with subworkflows - call = ele.get_call() - - #This verification is really important - if(len(call)!=1): - for c in call: - print(c.get_code(get_OG=True)) + #Only create the subworkflows for clusters with more than one element + processes_added = [] + things_added_in_cluster = [] + if(len(elements)>1): + name, body, take, emit = "", "", "", "" + for ele in elements: + if(ele.get_type()=="Process"): - raise Exception("This shoudn't happen since duplicate mode is activated") - call = call[0] - printed_condition = " && ".join(call.get_condition().get_conditions()) - if(printed_condition!=""): - body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n" - else: - body+=f"\n{call.get_code()}\n" - elif(ele.get_type()=="Operation"): - - #Ignore these cases - if(ele.get_code()[:4] not in ["emit", "take"]): - origins = ele.get_origins() - for o in origins: - if(o.get_type()=="Call"): - calls_in_operations.append(o) - printed_condition = " && ".join(ele.get_condition().get_conditions()) + #Determine the name of the created subworkflow cluster + if(ele.get_name() in relevant_processes): + name = f"cluster_{ele.get_name()}" + #Get the call of thing (either process or subworkflow) + #TODO -> check it works with subworkflows + call = ele.get_call() + + #This verification is really important + if(len(call)!=1): + for c in call: + print(c.get_code(get_OG=True)) + + raise Exception("This shoudn't happen since duplicate mode is activated") + call = call[0] + processes_added.append(call.get_first_element_called()) + printed_condition = " && ".join(call.get_condition().get_conditions()) if(printed_condition!=""): - body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n" + body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n" else: - body+=f"\n{ele.get_code()}\n" - #Here we removing the Call_12313 thing - for call in calls_in_operations: - body = body.replace(call.get_code(), "") - body = body.replace(str(call), call.get_code()) - - subworkflow_code = f"subworkflow {name} {{\n{take}\n{body}\n{emit}\n}}" - print(format_with_tabs(subworkflow_code)) - print("-----------") + body+=f"\n{call.get_code()}\n" + things_added_in_cluster.append(call) + elif(ele.get_type()=="Operation"): + + #Ignore these cases + if(ele.get_code()[:4] not in ["emit", "take"]): + origins = ele.get_origins() + for o in origins: + if(o.get_type()=="Call"): + calls_in_operations.append(o) + printed_condition = " && ".join(ele.get_condition().get_conditions()) + if(printed_condition!=""): + body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n" + else: + body+=f"\n{ele.get_code()}\n" + things_added_in_cluster.append(ele) + + #TODO check this part of the code is never seen + #Here we removing the Call_12313 thing + for call in calls_in_operations: + raise Exception("This shouldn't happen since the workflows has been rewritten") + body = body.replace(call.get_code(), "") + body = body.replace(str(call), call.get_code()) + + #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes + if(name==""): + #If there are processes called we are going to use them + if(len(processes_added)>0): + #TODO find a better naming system + name = f"non_relevant_cluster_{processes_added[0].get_name()}" + else: + #TODO find a better naming system + name = f"non_relevant_cluster_{non_relevant_name}" + non_relevant_name+=1 + + subworkflow_code = f"subworkflow {name} {{\n{take}\n{body}\n{emit}\n}}" + print(format_with_tabs(subworkflow_code)) + + for t in self.get_takes(things_added_in_cluster): + print("*", t.get_code()) + + for t in self.get_emits(things_added_in_cluster): + print("-", t.get_code()) + print("-----------") #So basically when retriving a thing (process or subworkflow)