From d18218573ee5d6959fe27868af117c8f8fc0a0eb Mon Sep 17 00:00:00 2001
From: George Marchment <georgemarchment@yahoo.fr>
Date: Mon, 17 Feb 2025 11:04:57 +0100
Subject: [PATCH] Continue updating the rewriting -> I've added the detection
 of broken subworkflows and their rewriting

---
 src/call.py        |   2 +
 src/process.py     |   8 +-
 src/subworkflow.py |   9 +++
 src/workflow.py    | 178 ++++++++++++++++++++++++++++++---------------
 4 files changed, 134 insertions(+), 63 deletions(-)

diff --git a/src/call.py b/src/call.py
index 02d68eb..d850f10 100644
--- a/src/call.py
+++ b/src/call.py
@@ -420,6 +420,7 @@ class Call(Executor):
             process.set_printed_name(f"{temp.get_alias()}_{num}")
             process.initialise()
             self.first_element_called = process
+            process.add_to_calls(self)
             self.origin.add_element_to_elements_being_called(process)
             #temp.incremente_number_times_called()
         if(process==None and subworkflow!=None and fun==None):
@@ -430,6 +431,7 @@ class Call(Executor):
             subworkflow.set_printed_name(f"{temp.get_alias()}_{num}")
             subworkflow.initialise()
             self.first_element_called = subworkflow
+            subworkflow.add_to_calls(self)
             self.origin.add_element_to_elements_being_called(subworkflow)
         if(process==None and subworkflow==None and fun!=None):
             self.first_element_called = fun
diff --git a/src/process.py b/src/process.py
index 8da04e5..a6317a7 100644
--- a/src/process.py
+++ b/src/process.py
@@ -84,11 +84,13 @@ class Process(Nextflow_Building_Blocks):
     def incremente_number_times_called(self):
         self.number_times_called+=1
 
-    def set_call(self, call):
-        self.call.append(call)
+    def add_to_calls(self, call):
+        self.called_by.append(call)
 
     def get_call(self):
-        return self.call
+        if(len(self.called_by)!=1):
+            raise Exception("This shouldn't happen")
+        return self.called_by[0]
 
 
     def get_script_code(self):
diff --git a/src/subworkflow.py b/src/subworkflow.py
index 9a5598e..ee80322 100644
--- a/src/subworkflow.py
+++ b/src/subworkflow.py
@@ -48,6 +48,14 @@ class Subworkflow(Main):
         self.number_times_copied+=1
         return sub, num
 
+    def add_to_calls(self, call):
+        self.called_by.append(call)
+
+    def get_call(self):
+        if(len(self.called_by)!=1):
+            raise Exception("This shouldn't happen")
+        return self.called_by[0]
+
     #TODO make sure this is uptodate
     def get_calls_by_name(self, name):
         tab = []
@@ -236,6 +244,7 @@ class Subworkflow(Main):
             operation = Operation(code[i], self)
             operation.initialise()
             operation.change_code(f"emit: {code[i]}")
+            operation.set_as_artificial()
             tab.append(operation)
             #operation.add_gives(channel)
             #for gives in operation.get_gives():
diff --git a/src/workflow.py b/src/workflow.py
index cb1aff1..049e3e6 100644
--- a/src/workflow.py
+++ b/src/workflow.py
@@ -428,6 +428,14 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             raise Exception("This shouldn't happen")
         return code
 
 
+    def get_subworkflows_called(self):
+        subs = []
+        for c in self.get_workflow_main().get_all_calls_in_workflow():
+            ele = c.get_first_element_called()
+            if(ele.get_type()=="Subworkflow"):
+                subs.append(ele)
+        return subs
+
     def rewrite_and_initialise(self, code):
         #Write new code in temporary file
@@ -457,6 +465,56 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs)
 
 
+    #I do not recommend that the dev uses the same name for channels inside and outside the subworkflow
+    #Since, once inlined, the subworkflow's local channels become local channels of the calling workflow
+    def rewrite_subworkflow_call(self, code, subworkflow):
+
+        #Remove the subworkflow definition from the code
+        code = code.replace(subworkflow.get_code(), "")
+        OG_call = subworkflow.get_call()
+        OG_body = subworkflow.get_work()
+
+        #REPLACE HEADER TAKES
+        subworkflow_takes = subworkflow.get_takes()
+        parameters = OG_call.get_parameters()
+        if(len(subworkflow_takes)!=len(parameters)):
+            raise Exception("This shouldn't happen -> the same number of parameters should be kept")
+        #Map each take to its corresponding call parameter (identical names don't need an assignment)
+        new_header = ""
+        for i in range(len(parameters)):
+            param = parameters[i]
+            takes = subworkflow_takes[i].get_gives()[0]
+            if(takes.get_code()!=param.get_code(get_OG = True)):
+                new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}\n"
+
+        code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}")
+
+        #REPLACE THE EMITS
+        #TODO admittedly this code below is very ugly -> but it's functional -> update it
+        emits = subworkflow.get_emit()
+        to_replace = []
+        all_executors = self.get_workflow_main().get_all_executors_in_workflow()
+        for exe in all_executors:
+            #We don't need to check the case call since the workflow has already been rewritten -> emits only appear in operations
+            if(exe.get_type()=="Operation"):
+                emited = exe.get_origins()
+                if(len(emited)==1):
+                    emited = emited[0]
+                    if(emited.get_type()=="Emitted"):
+                        if(emited.get_emitted_by()==subworkflow):
+                            if(emited.get_emits() not in emits):
+                                raise Exception("This shouldn't happen -> since it is the actual subworkflow")
+                            to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}"))
+        for r in to_replace:
+            old, new = r
+            #Case of channel == channel -> the assignment is redundant, simply remove the emit operation
+            if(new.split("=")[0].strip()==new.split("=")[1].strip()):
+                new = ''
+            code = code.replace(old, new)
+
+        return code
+
+
     #Method which rewrites the workflow follwong the user view
     #Conert workflow to user_view only makes sense when the option duplicate is activated -> otherwise is doesn't make sense + it makes the analysis way more complicated
     def convert_workflow_2_user_view(self, relevant_processes = []):
@@ -468,66 +526,66 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         self.check_relevant_processes_in_workflow(relevant_processes)
         self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
         clusters = self.graph.get_clusters_from_user_view()
-        #
-        ##DETERMING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER
-        ##Creating the clusters with calls instead of processes or subworkflows
-        #set_clusters_with_calls = []
-        #for c in clusters:
-        #    tab = []
-        #    for ele in c:
-        #        if(ele.get_type()=="Operation"):
-        #            if(ele.get_artificial_status()==False):
-        #                tab.append(ele)
-        #            else:
-        #                call = ele.get_call()
-        #                if(len(call)!=1):
-        #                    raise Exception("This shouldn't happen")
-        #                tab.append(call[0])
-        #    set_clusters_with_calls.append(set(tab))
-        #
-        ##Getting subworkflows to executors
-        #subworkflow_2_executors = {}
-        #for sub in self.get_subworkflows_called():
-        #    dico = {}
-        #    sub.get_all_executors(dico)
-        #    temp = set(list(dico.keys()))
-        #    subworkflow_2_executors[sub] = []
-        #    for ele in temp:
-        #        #Cannot add calls to subworkflows -> they are already present by definition
-        #        if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
-        #            None
-        #            #We don't add it
-        #        else:
-        #            subworkflow_2_executors[sub].append(ele)
-        #    #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
-        #
-        #def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
-        #    broken_subworkflows = []
-        #    for sub in subworkflow_2_executors:
-        #        #You want this list (set) to be equal to subworkflow_2_executors[sub]
-        #        elements_in_sub_with_clusters = []
-        #        for cluster in set_clusters_with_calls:
-        #            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-        #                break
-        #            for c in cluster:
-        #                if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-        #                    break
-        #                if(c in subworkflow_2_executors[sub]):
-        #                    elements_in_sub_with_clusters+=list(cluster)
-        #                    break
-        #        if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
-        #            None
-        #            #This means that the subworkflow is Intact
-        #        else:
-        #            #This means that the subworkflow is broken
-        #            broken_subworkflows.append(sub)
-        #    return broken_subworkflows
-        #
-        #broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
-        ##Rewrite broken subworkflows
-        #for sub in broken_subworkflows:
-        #    code = self.rewrite_subworkflow_call(code, sub)
-        #
+
+        #DETERMINING WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
+        #Creating the clusters with calls instead of processes or subworkflows
+        set_clusters_with_calls = []
+        for c in clusters:
+            tab = []
+            for ele in c:
+                if(ele.get_type()=="Operation"):
+                    if(ele.get_artificial_status()==False):
+                        tab.append(ele)
+                    else:
+                        call = ele.get_call()
+                        tab.append(call)
+            set_clusters_with_calls.append(set(tab))
+
+
+        #Mapping subworkflows to their executors
+        subworkflow_2_executors = {}
+        for sub in self.get_subworkflows_called():
+            executors = sub.get_all_executors_in_workflow()
+            subworkflow_2_executors[sub] = []
+            for ele in executors:
+                #Cannot add calls to subworkflows -> they are already present by definition
+                if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
+                    None
+                    #We don't add it
+                else:
+                    subworkflow_2_executors[sub].append(ele)
+            #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
+
+        #TODO -> write tests to test this function
+        #A subworkflow is intact when its executors exactly cover whole clusters -> otherwise it is broken
+        def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
+            broken_subworkflows = []
+            for sub in subworkflow_2_executors:
+                #You want this list (set) to be equal to subworkflow_2_executors[sub]
+                elements_in_sub_with_clusters = []
+                for cluster in set_clusters_with_calls:
+                    if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+                        break
+                    for c in cluster:
+                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+                            break
+                        if(c in subworkflow_2_executors[sub]):
+                            elements_in_sub_with_clusters+=list(cluster)
+                            break
+                if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
+                    None
+                    #This means that the subworkflow is intact
+                else:
+                    #This means that the subworkflow is broken
+                    broken_subworkflows.append(sub)
+            return broken_subworkflows
+
+        broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
+
+
+        #Rewrite broken subworkflows
+        for sub in broken_subworkflows:
+            code = self.rewrite_subworkflow_call(code, sub)
+
         ##TODO -> this needs to be optimised
         #self.rewrite_and_initialise(code)
         ##Get the clusters and the code
--
GitLab
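
For reference, the criterion implemented by get_workflows_broken can be restated on plain Python sets, independently of the project's classes: a subworkflow stays intact when the user-view clusters touching its executors contain those executors and nothing else. The subworkflow names, process names and cluster contents below are made up; this is a simplified, order-independent sketch of the same check, not the project's implementation.

def find_broken_subworkflows(subworkflow_2_executors, clusters):
    #A subworkflow is intact when the union of clusters touching its executors
    #equals exactly its executor set; any mixed cluster makes it broken
    broken = []
    for sub, executors in subworkflow_2_executors.items():
        touched = set()
        for cluster in clusters:
            if cluster & executors:
                touched |= cluster
        if touched != executors:
            broken.append(sub)
    return broken

#Toy example: the second cluster mixes an executor of sub_B with an outside process
subworkflow_2_executors = {"sub_A": {"p1", "p2"}, "sub_B": {"p3", "p4"}}
clusters = [{"p1", "p2"}, {"p3", "p5"}, {"p4"}]
print(find_broken_subworkflows(subworkflow_2_executors, clusters))  #['sub_B']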
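
Similarly, the textual rewrite done by rewrite_subworkflow_call boils down to two substitutions: the subworkflow's takes become assignments from the call's parameters (skipping identical names), and each read of an emitted channel is rebound to the channel it originates from (dropping redundant channel = channel assignments). A minimal, self-contained sketch of that string manipulation, with hypothetical channel names:

def build_take_header(takes, params):
    #One "take = parameter" line per pair, skipping identity assignments
    lines = [f"{t} = {p}" for t, p in zip(takes, params) if t != p]
    return "\n".join(lines)

def rebind_emit(target, source):
    #Replacement text for an emit read; empty when the binding is redundant
    return "" if target == source else f"{target} = {source}"

print(build_take_header(["reads", "genome"], ["reads", "ref_ch"]))  #genome = ref_ch
print(rebind_emit("cleaned", "cleaned"))                            #(dropped)
print(rebind_emit("results", "trimmed_ch"))                         #results = trimmed_ch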