From dd319afcf81e6b9c05e168bccc1de98a58297ad3 Mon Sep 17 00:00:00 2001
From: George Marchment <georgemarchment@yahoo.fr>
Date: Fri, 14 Feb 2025 15:36:34 +0100
Subject: [PATCH] Started adding workflow rewrite

---
 src/main.py     |  43 +++++
 src/root.py     |   8 +-
 src/workflow.py | 447 +++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 489 insertions(+), 9 deletions(-)

diff --git a/src/main.py b/src/main.py
index fc011ee..2834d5b 100644
--- a/src/main.py
+++ b/src/main.py
@@ -48,6 +48,49 @@ class Main(Nextflow_Building_Blocks):
         dico = {}
         self.root.get_all_calls_in_subworkflow(calls = dico)
         return list(dico.keys())
+
+    def get_all_executors_in_subworkflow(self):
+        dico = {}
+        self.root.get_all_executors_in_subworkflow(calls = dico)
+        return list(dico.keys())
+
+
+    #TODO -> write tests to test this method
+    def get_all_calls_in_workflow(self):
+        all_calls = self.get_all_calls_in_subworkflow()
+        dico = {}
+        for c in all_calls:
+            sub = c.get_first_element_called()
+            if(sub.get_type()=="Subworkflow"):
+                if(c not in dico):
+                    sub_calls = sub.get_all_calls_in_workflow()
+                    for sub_c in sub_calls:
+                        dico[sub_c] = ""
+        for c in all_calls:
+            dico[c] = ""
+
+        return list(dico.keys())
+
+    #TODO -> write tests to test this method
+    def get_all_executors_in_workflow(self):
+        all_executors = self.get_all_executors_in_subworkflow()
+        dico = {}
+        for e in all_executors:
+            if(e.get_type()=="Call"):
+                for c in e.get_all_calls():
+                    sub = c.get_first_element_called()
+                    if(sub.get_type()=="Subworkflow"):
+                        if(c not in dico):
+                            sub_calls = sub.get_all_executors_in_workflow()
+                            for sub_c in sub_calls:
+                                dico[sub_c] = ""
+            #Case it's an operation
+            else:
+                dico[e] = ""
+        for e in all_executors:
+            dico[e] = ""
+
+        return list(dico.keys())
 
 
     def check_includes(self):
diff --git a/src/root.py b/src/root.py
index a7e3af1..2e92e8d 100644
--- a/src/root.py
+++ b/src/root.py
@@ -220,7 +220,13 @@ class Root(Nextflow_Building_Blocks):
             calls[c] = ''
             #if(c.get_first_element_called().get_type()=="Subworkflow"):
             #    c.get_first_element_called().root.get_all_calls(calls = calls)
-    
+
+
+    def get_all_executors_in_subworkflow(self, calls = {}):
+        all_executors = self.get_executors_same_level()+self.get_inside_executors()
+        for e in all_executors:
+            calls[e] = ''
+
 
     #############
     # PROCESSES
diff --git a/src/workflow.py b/src/workflow.py
index d4f43d0..c4d8dc0 100644
--- a/src/workflow.py
+++ b/src/workflow.py
@@ -43,20 +43,23 @@ class Workflow:
             nextflow_files = glob.glob(f'{file}/*.nf')
             if(len(nextflow_files)==0):
                 raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
+            txt = ""
             #Try to read the main.nf file -> if this cannot be found then the first nextflow file is used
             try:
-                file = '/'.join(nextflow_files[0].split('/')[:-1])+"/main.nf"
+
+                file = file+"/main.nf"
                 with open(file, 'r') as f:
                     txt= f.read()
             except:
                 None
                 #raise BioFlowInsightError("No 'main.nf' file found at the root of the project")
-            if(len(nextflow_files)==1):
-                file = nextflow_files[0]
-                with open(file, 'r') as f:
-                    txt= f.read()
-            else:
-                raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
+            if(txt==""):
+                if(len(nextflow_files)==1):
+                    file = nextflow_files[0]
+                    with open(file, 'r') as f:
+                        txt= f.read()
+                else:
+                    raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
 
 
 
@@ -272,7 +275,6 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         for f in nextflow_file.functions:
             function = f.get_code()
             functions.append(function)
-        print(functions)
         for r in functions:
             code = code.replace(r, "")
         code = code.replace(function_section, "\n\n".join(functions))
@@ -301,6 +303,435 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
         code = code.replace("$OR$", "||")
         return code
+
+
+    #This method generates a random set of processes to consider as relevant
+    #It's not uniformly random: the ratio is drawn from a Gaussian centered at 0.5
+    def generate_random_relevant_processes(self, alpha = -1):
+        import random
+
+        #Random value between 0 and 1, centered at 0.5
+        def get_value():
+            check = True
+            val = -1
+            while(check):
+                check = False
+                val = random.gauss(0.5, 0.1)
+                if(val<0 or val>1):
+                    check = True
+            return val
+
+        if(self.duplicate):
+            if(alpha == -1):
+                alpha = get_value()
+            else:
+                if(0<=alpha and alpha<=1):
+                    None
+                else:
+                    raise BioFlowInsightError("alpha is not in the interval [0; 1]")
+
+            processes_called = []
+            for c in self.get_workflow_main().get_all_calls_in_workflow():
+                p = c.get_first_element_called()
+                if(p.get_type()=="Process"):
+                    processes_called.append(p)
+            nb_2_select = int(alpha*len(processes_called))
+            #random.sample needs a sequence (sampling from a set is no longer supported)
+            sampled = random.sample(list(set(processes_called)), nb_2_select)
+            #name_select = []
+            #for p in sampled:
+            #    name_select.append(p.get_alias())
+            return sampled
+        else:
+            raise BioFlowInsightError("Trying to generate random relevant processes; however, the option 'duplicate' is not activated.")
+
+    #This method rewrites the entire workflow into one single file
+    def write_workflow_into_one_file(self):
+        #This tag is used as an identifier to safely manipulate the string
+        tag = str(time.time())
+
+        code = self.get_first_file().get_code()
+
+        #params_section = f"//PARAMS_SECTION_{tag}"
+        function_section = f"//FUNCTION_SECTION"
+        process_section = f"//PROCESS_SECTION"
+        subworkflow_section = f"//SUBWORKFLOW_SECTION"
+
+        anchors = function_section + "\n"*3 + process_section + "\n"*3 + subworkflow_section
+
+        to_replace = []
+        for match in re.finditer(constant.FULL_INLCUDE_2, code):
+            to_replace.append(match.group(0))
+        for r in to_replace:
+            code = code.replace(r, anchors)
+            anchors = ""
+
+        #Adding processes into code
+        for p in self.get_processes_called():
+            if(p.get_code() not in code):
+                code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section)
+
+        #Adding subworkflows into code
+        for sub in self.get_subworkflows_called():
+            if(sub.get_code() not in code):
+                code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n')
+
+        #Adding functions into code
+        for fun in self.get_functions_called():
+            if(fun.get_code() not in code):
+                code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')
+
+        #Remove the anchors
+        #code = code.replace(function_section, "")
+        #code = code.replace(process_section, "")
+        #code = code.replace(subworkflow_section, "")
+        anchors = {"function_section":function_section,
+                   "process_section":process_section,
+                   "subworkflow_section":subworkflow_section}
+
+        return code, anchors
+
+
+
+    #TODO -> write tests for this method
+    #Function that rewrites the workflow code
+    #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
+    def simplify_workflow_code(self):
+        code = self.get_first_file().get_code()
+        #code, anchors = self.write_workflow_into_one_file()
+        #TODO -> update method get_all_executors_in_workflow -> right now it's not searching through the subworkflows
+        for exe in self.get_workflow_main().get_all_executors_in_workflow():
+            if(exe.get_type()=="Call" or exe.get_type()=="Operation"):
+                code = code.replace(exe.get_code(get_OG = True), exe.simplify_code())
+            else:
+                print(exe.get_code(), exe.get_type())
+                raise Exception("This shouldn't happen")
+        return code
+
+
+    #Method which rewrites the workflow following the user view
+    #Converting the workflow to the user view only makes sense when the option 'duplicate' is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
+    def convert_workflow_2_user_view(self, relevant_processes = []):
+        if(self.duplicate):
+            None
+            code = self.simplify_workflow_code()
+            print(code)
+            #self.rewrite_and_initialise(code)
+            #
+            ##Get the clusters and the code
+            #self.check_relevant_processes_in_workflow(relevant_processes)
+            #self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
+            #clusters = self.nextflow_file.graph.get_clusters_from_user_view()
+            #
+            ##DETERMINING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER
+            ##Creating the clusters with calls instead of processes or subworkflows
+            #set_clusters_with_calls = []
+            #for c in clusters:
+            #    tab = []
+            #    for ele in c:
+            #        if(ele.get_type()=="Operation"):
+            #            if(ele.get_artificial_status()==False):
+            #                tab.append(ele)
+            #        else:
+            #            call = ele.get_call()
+            #            if(len(call)!=1):
+            #                raise Exception("This shouldn't happen")
+            #            tab.append(call[0])
+            #    set_clusters_with_calls.append(set(tab))
+            #
+            ##Mapping subworkflows to their executors
+            #subworkflow_2_executors = {}
+            #for sub in self.get_subworkflows_called():
+            #    dico = {}
+            #    sub.get_all_executors(dico)
+            #    temp = set(list(dico.keys()))
+            #    subworkflow_2_executors[sub] = []
+            #    for ele in temp:
+            #        #Cannot add calls to subworkflows -> they are already present by definition
+            #        if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
+            #            None
+            #            #We don't add it
+            #        else:
+            #            subworkflow_2_executors[sub].append(ele)
+            #    #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
+            #
+            #def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
+            #    broken_subworkflows = []
+            #    for sub in subworkflow_2_executors:
+            #        #You want this list (set) to be equal to subworkflow_2_executors[sub]
+            #        elements_in_sub_with_clusters = []
+            #        for cluster in set_clusters_with_calls:
+            #            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+            #                break
+            #            for c in cluster:
+            #                if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+            #                    break
+            #                if(c in subworkflow_2_executors[sub]):
+            #                    elements_in_sub_with_clusters+=list(cluster)
+            #                    break
+            #        if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
+            #            None
+            #            #This means that the subworkflow is intact
+            #        else:
+            #            #This means that the subworkflow is broken
+            #            broken_subworkflows.append(sub)
+            #    return broken_subworkflows
+            #
+            #broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
+            ##Rewrite broken subworkflows
+            #for sub in broken_subworkflows:
+            #    code = self.rewrite_subworkflow_call(code, sub)
+            #
+            ##TODO -> this needs to be optimised
+            #self.rewrite_and_initialise(code)
+            ##Get the clusters and the code
+            #self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
+            #clusters = self.nextflow_file.graph.get_clusters_from_user_view()
+            #
+            #
+            #
+            ##Get the clusters with the corresponding operations inside
+            ##for i in range(len(clusters)):
+            ##    c = clusters[i]
+            ##    if(len(c)>1):
+            ##        clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
+            ##print(clusters)
+            ##Get the topological order
+            #clusters = self.nextflow_file.graph.get_topogical_order(clusters)
+            ##print(clusters)
+            #
+            #
+            ##Creating the subworkflows from clusters
+            #calls_in_operations = []
+            #non_relevant_name = 1
+            #channels_to_replace_outside_of_cluster = []
+            #subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
+            #index_cluster = 0
+            #for elements in clusters:
+            #    #Only create the subworkflows for clusters with more than one element
+            #    processes_added = []
+            #    things_added_in_cluster = []
+            #    if(len(elements)>1):
+            #        name, body, take, emit = "", "", "", ""
+            #        first_element = True
+            #        for ele in elements:
+            #
+            #            if(ele.get_type()=="Process"):
+            #
+            #                #Determine the name of the created subworkflow cluster
+            #                if(ele.get_alias() in relevant_processes):
+            #                    name = f"cluster_{ele.get_alias()}"
+            #                #Get the call of the thing (either process or subworkflow)
+            #                #TODO -> check it works with subworkflows
+            #                call = ele.get_call()
+            #
+            #                #This verification is really important
+            #                if(len(call)!=1):
+            #                    for c in call:
+            #                        print(c.get_code(get_OG=True))
+            #
+            #                    raise Exception("This shouldn't happen since duplicate mode is activated")
+            #                call = call[0]
+            #
+            #                #If first element -> add marker for the subworkflow call
+            #                if(first_element):
+            #                    code = code.replace(call.get_code(get_OG = True), f"//Anchor_cluster{index_cluster}")
+            #                    first_element = False
+            #                else:
+            #                    code = code.replace(call.get_code(get_OG = True), "")
+            #
+            #
+            #                processes_added.append(call.get_first_element_called())
+            #                printed_condition = " && ".join(call.get_condition().get_conditions())
+            #                if(printed_condition!=""):
+            #                    body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
+            #                else:
+            #                    body+=f"\n{call.get_code()}\n"
+            #                things_added_in_cluster.append(call)
+            #            elif(ele.get_type()=="Operation"):
+            #
+            #                #If first element -> add marker for the subworkflow call
+            #                if(first_element):
+            #                    code = code.replace(ele.get_code(get_OG = True), f"//Anchor_cluster{index_cluster}")
+            #                    first_element = False
+            #                else:
+            #                    code = code.replace(ele.get_code(get_OG = True), "")
+            #
+            #                #Ignore these cases
+            #                if(ele.get_code()[:4] not in ["emit", "take"]):
+            #                    origins = ele.get_origins()
+            #                    for o in origins:
+            #                        if(o.get_type()=="Call"):
+            #                            calls_in_operations.append(o)
+            #                    printed_condition = " && ".join(ele.get_condition().get_conditions())
+            #                    if(printed_condition!=""):
+            #                        body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
+            #                    else:
+            #                        body+=f"\n{ele.get_code()}\n"
+            #                things_added_in_cluster.append(ele)
+            #            else:
+            #                raise Exception("This shouldn't happen")
+            #
+            #        #TODO -> check that this part of the code is never reached
+            #        #Here we remove the Call_12313 thing
+            #        for call in calls_in_operations:
+            #            raise Exception("This shouldn't happen since the workflow has been rewritten")
+            #            body = body.replace(call.get_code(), "")
+            #            body = body.replace(str(call), call.get_code())
+            #
+            #        #If the name=="" -> it means there aren't any relevant processes in the cluster -> it means it's a cluster of non-relevant nodes
+            #        if(name==""):
+            #            #If there are processes called we are going to use them
+            #            if(len(processes_added)>0):
+            #                #TODO find a better naming system
+            #                name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
+            #            else:
+            #                #TODO find a better naming system
+            #                name = f"non_relevant_cluster_{non_relevant_name}"
+            #                non_relevant_name+=1
+            #
+            #        #Check that there is a single condition in the body
+            #        body = group_together_ifs(body)
+            #        conditions_in_subworkflow = []
+            #        end = -1
+            #        for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
+            #            conditions_in_subworkflow.append(match.group(1).strip())
+            #            _, start = match.span(0)
+            #
+            #        if(len(conditions_in_subworkflow)==1):
+            #            end = extract_curly(body, start)
+            #            body = body[start: end-1].strip()
+            #
+            #
+            #        #TAKE
+            #        #Adding take parameters on the inside of the subworkflow
+            #        takes_param = self.get_takes(things_added_in_cluster)
+            #        new_param_names, index, old_param_names = [], 1, []
+            #        for param in takes_param:
+            #            param_name = f"param_{name}_{index}"
+            #            new_param_names.append(param_name)
+            #            old_param_names.append(param.get_code())
+            #            index += 1
+            #        if(len(new_param_names)>0):
+            #            temp = '\n'.join(new_param_names)
+            #            take = f"\ntake:\n{temp}\n"
+            #
+            #        #EMIT
+            #        #Adding the emitted outputs
+            #        emitted_outputs = self.get_emits(things_added_in_cluster)
+            #        new_output_names, index, old_output_names = [], 0, []
+            #        for output in emitted_outputs:
+            #            output_name = f"{name}.out[{index}]"
+            #            new_output_names.append(output_name)
+            #            old_output_names.append(output.get_code())
+            #            index += 1
+            #
+            #        if(len(old_output_names)>0):
+            #            temp = '\n'.join(old_output_names)
+            #            emit = f"\nemit:\n{temp}\n"
+            #
+            #        #Adding empty channels if they don't exist in the case of a negative condition
+            #        body = get_channels_to_add_in_false_conditions(body, old_output_names)
+            #
+            #
+            #
+            #        #Replace names inside subworkflow
+            #        subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
+            #        for i in range(len(new_param_names)):
+            #            pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
+            #            subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
+            #            #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
+            #
+            #        #TODO -> add verification of conditions
+            #        params = ", ".join(old_param_names)
+            #        subworkflow_call_case_true = f"{name}({params})"
+            #        subworkflow_call_case_false = ""
+            #        for i in range(len(new_output_names)):
+            #            #In the case of channels, we just add channel = subworkflow.out[i]
+            #            if(not bool(re.findall(r"\.\s*out", old_output_names[i]))):
+            #                subworkflow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
+            #                subworkflow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
+            #            #In the case of emitted values we need to replace the code on the outside
+            #            else:
+            #                param_out_name = f"{name}_out_{i+1}"
+            #                subworkflow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
+            #                subworkflow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
+            #                channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
+            #        #If there was only one condition in the subworkflow cluster -> then we add it when the call is done
+            #        if(len(conditions_in_subworkflow)==1):
+            #            subworkflow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkflow_call_case_true}\n}} else {{\n{subworkflow_call_case_false}\n}}"
+            #            None
+            #        else:
+            #            subworkflow_call = subworkflow_call_case_true
+            #
+            #
+            #        subworkflow_clusters_to_add.append(subworkflow_code)
+            #        subworkflow_cluster_calls_to_add.append(subworkflow_call)
+            #        index_cluster+=1
+            #
+            #
+            ##TODO -> removing the conditions which are problematic
+            ##This might not be the problem -> when rerunning, the analysis isn't totally robust
+            #still_simplifying_conditions = True
+            #while(still_simplifying_conditions):
+            #    still_simplifying_conditions = False
+            #    to_replace, anchor1, anchor2 = "", "", ""
+            #    #Replace if/else
+            #    for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anchor_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anchor_cluster\d|\s)\s*\}", code):
+            #        to_replace = match.group(0)
+            #        anchor1, anchor2 = match.group(1), match.group(2)
+            #        still_simplifying_conditions = True
+            #        break
+            #    #Replace empty if on its own
+            #    if(not still_simplifying_conditions):
+            #        for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anchor_cluster\d|\s)\s*\})\s*[^e]", code):
+            #            to_replace = match.group(1)
+            #            anchor1 = match.group(2)
+            #            still_simplifying_conditions = True
+            #            break
+            #    if(still_simplifying_conditions):
+            #        code = code.replace(to_replace, f"{anchor1}\n{anchor2}")
+            #
+            #
+            ##Replace the anchors with the calls of the subworkflows
+            #for i in range(len(subworkflow_clusters_to_add)):
+            #    #print(f"//Anchor_cluster{i}", subworkflow_cluster_calls_to_add[i])
+            #    code = code.replace(f"//Anchor_cluster{i}", subworkflow_cluster_calls_to_add[i])
+            #
+            #for old, new in channels_to_replace_outside_of_cluster:
+            #    pattern = fr"[ \(,]({re.escape(old)})"
+            #    code = replace_group1(code, pattern, new)
+            #    #code = code.replace(old, new)
+            #
+            #
+            ##Add the subworkflow definitions
+            ##-------------------------------------
+            ##Add anchor
+            #subworkflow_section = f"//ANCHOR 4 SUBWORKFLOW DEF"
+            #to_replace = ""
+            #for match in re.finditer(r"workflow\s+\w*\s*\{", code):
+            #    to_replace = match.group(0)
+            #    break
+            #if(to_replace==""):
+            #    raise Exception("No call to a workflow")
+            #
+            #code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
+            #
+            #for sub in subworkflow_clusters_to_add:
+            #    code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
+            #
+            #
+            ##Putting || back
+            #code = code.replace("$OR$", "||")
+            #
+            #return remove_extra_jumps(format_with_tabs(code))
+            #
+            #
+            ##So basically when retrieving a thing (process or subworkflow)
+            ##There is necessarily one call associated with the thing -> since we have the option duplicate activated
+            ##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
+        else:
+            raise BioFlowInsightError("Trying to convert the workflow to the user view; however, the option 'duplicate' is not activated.")
+
+
 
 
 
-- 
GitLab
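
The new get_all_calls_in_workflow and get_all_executors_in_workflow methods rely on a dict whose keys act as an insertion-ordered set: every element found while recursing into subworkflows is stored as a key with a dummy value, so duplicates collapse while discovery order is preserved. Below is a minimal standalone sketch of that pattern; the Node class and its fields are hypothetical stand-ins for the Call/Subworkflow objects, not part of the patch.

#Sketch of the dict-as-ordered-set traversal pattern used above.
#'Node' is a hypothetical stand-in for Call/Subworkflow objects.
class Node:
    def __init__(self, name, children=None):
        self.name = name
        self.children = children or []  #e.g. calls made inside a subworkflow

def collect_recursively(node, seen=None):
    #Dict keys deduplicate while preserving insertion order (Python 3.7+)
    if seen is None:
        seen = {}
    for child in node.children:
        collect_recursively(child, seen)  #Recurse into nested elements first
        seen[child] = ""                  #Then record the element itself
    return list(seen.keys())

if __name__ == "__main__":
    leaf = Node("process_a")
    sub = Node("subworkflow", [leaf, leaf])  #Duplicate reference on purpose
    main = Node("main", [sub])
    print([n.name for n in collect_recursively(main)])  #['process_a', 'subworkflow']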
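
generate_random_relevant_processes draws its sampling ratio alpha from a Gaussian (mean 0.5, sigma 0.1) and rejects values outside [0, 1], then samples that fraction of the called processes. A self-contained sketch of the same rejection-sampling idea; the process names are invented for the example.

import random

def truncated_gauss(mu=0.5, sigma=0.1):
    #Resample until the value falls inside [0, 1] (rejection sampling)
    while True:
        val = random.gauss(mu, sigma)
        if 0 <= val <= 1:
            return val

processes = ["fastqc", "trim", "align", "sort", "call_variants"]  #Hypothetical names
alpha = truncated_gauss()
nb_2_select = int(alpha * len(processes))
#random.sample needs a sequence, hence the list (sets are rejected in recent Python)
relevant = random.sample(processes, nb_2_select)
print(alpha, relevant)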
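
write_workflow_into_one_file works by replacing every include statement with placeholder section markers and then splicing each missing definition in next to the matching marker. A reduced sketch of that splice-at-marker technique on plain strings; the Nextflow snippets below are invented, not taken from a real pipeline.

#Reduced sketch of the marker-based inlining used by write_workflow_into_one_file.
PROCESS_SECTION = "//PROCESS_SECTION"

main_nf = """include { FASTQC } from './modules/fastqc'

workflow {
    FASTQC(reads)
}
"""

fastqc_def = "process FASTQC {\n    script:\n    \"fastqc $reads\"\n}"

#1. Replace the include with the marker (in the patch, the first include receives
#   all the markers and subsequent includes are replaced with the empty string)
code = main_nf.replace("include { FASTQC } from './modules/fastqc'", PROCESS_SECTION)

#2. Splice the definition in next to the marker if it is not already present
if fastqc_def not in code:
    code = code.replace(PROCESS_SECTION, fastqc_def + "\n" + PROCESS_SECTION)

print(code)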