#Import dependencies #Local from .nextflow_file import Nextflow_File from .ro_crate import RO_Crate from . import constant from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions, extract_conditions, remove_empty_conditions_place_anker from .outils_graph import get_flatten_dico, initia_link_dico_rec, get_number_cycles, generate_graph from .outils_annotate import get_tools_commands_from_user_for_process from .bioflowinsighterror import BioFlowInsightError from .graph import Graph #Outside packages import os import re import json from pathlib import Path import glob import ctypes import time class Workflow: """ This is the main workflow class, from this class, workflow analysis can be done. After analysis, workflow structure reconstruction can be done. Attributes: file: A string indicating the address to the workflow main or the directory containing the workflow duplicate: A boolean indicating if processes are to be duplicated in the structure display_info: A boolean indicating if the analysis information should be printed output_dir: A string indicating where the results will be saved name: A string indicating the name of the workflow processes_2_remove: A string indicating the processes to remove from the workflow """ def __init__(self, file, duplicate=True, display_info=True, output_dir = './results', name = None, processes_2_remove = None): #Getting the main nextflow file if(not os.path.isfile(file)): nextflow_files = glob.glob(f'{file}/*.nf') if(len(nextflow_files)==0): raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1) txt = "" #Try to read the main.nf file -> if this cannot be found then the first nextflow file is used try: file = file+"/main.nf" with open(file, 'r') as f: txt= f.read() except: None #raise BioFlowInsightError("No 'main.nf' file found at the root of the prohject") if(txt==""): if(len(nextflow_files)==1): file = nextflow_files[0] with open(file, 'r') as f: txt= f.read() else: #If there are multiple files and no main -> we just choose one at random file = nextflow_files[0] with open(file, 'r') as f: txt= f.read() #raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select") self.duplicate = duplicate self.display_info = display_info self.processes_2_remove = processes_2_remove self.output_dir = Path(output_dir) self.nextflow_files = [] self.workflow_directory = '/'.join(file.split('/')[:-1]) self.name = name self.graph = None self.conditions_2_ignore = [] self.ternary_operation_dico = {} self.map_element_dico = {} OG_file = Nextflow_File(file, workflow = self, first_file = True) self.DSL = OG_file.find_DSL() self.create_empty_results() if(self.display_info): print(f"Workflow is written in {self.DSL}") def create_empty_results(self): os.makedirs(self.output_dir, exist_ok=True) os.makedirs(self.output_dir / 'debug', exist_ok=True) os.makedirs(self.output_dir / 'graphs', exist_ok=True) with open(self.output_dir / "debug" / "operations.nf",'w') as file: pass with open(self.output_dir / "debug" / "calls.nf",'w') as file: pass with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file: pass def get_root_directory(self): first_file = self.get_first_file() return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/" def get_conditions_2_ignore(self): return self.conditions_2_ignore def get_duplicate_status(self): return self.duplicate def get_output_dir(self): return Path(self.output_dir) def get_DSL(self): return self.DSL def get_display_info_bool(self): return self.display_info def set_DSL(self, DSL): self.DSL = DSL def get_first_file(self): for file in self.nextflow_files: if(file.first_file): return file def get_workflow_main(self): return self.get_first_file().main def add_nextflow_file_2_workflow(self, nextflow_file): self.nextflow_files.append(nextflow_file) self.nextflow_files = list(set(self.nextflow_files)) def initialise(self, conditions_2_ignore = []): """Method that initialises the analysis of the worflow Keyword arguments: """ self.conditions_2_ignore = conditions_2_ignore #Right now i'm just gonna do everything in DSL2 #At this point there should only be one nextflow file if(len(self.nextflow_files)==1): self.nextflow_files[0].initialise() else: raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.") if(self.display_info): citation = """\nTo cite BioFlow-Insight, please use the following publication: George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092""" print(citation) if(self.graph==None): self.graph = Graph(self) def iniatilise_tab_processes_2_remove(self): if(self.processes_2_remove==None): tab_processes_2_remove = [] if(self.processes_2_remove!=None): temp = self.processes_2_remove.split(",") for t in temp: tab_processes_2_remove.append(t.strip()) self.processes_2_remove = tab_processes_2_remove def get_structure(self): dico = {} dico['nodes'] = [] dico['edges'] = [] dico['subworkflows'] = {} if(self.get_DSL() == "DSL1"): main = self.get_workflow_main() if(main!=None): return main.get_structure(dico) elif(self.get_DSL() == "DSL2"): main = self.get_workflow_main() if(main!=None): return main.get_structure(dico) else: return dico #return self.get_structure_DSL2(dico=dico, start = True) else: raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!") ################# # GRAPHS ################# def generate_specification_graph(self, render_graphs = True): self.iniatilise_tab_processes_2_remove() self.graph.initialise(processes_2_remove = self.processes_2_remove) self.graph.get_specification_graph(render_graphs = render_graphs) def generate_process_dependency_graph(self, render_graphs = True): self.iniatilise_tab_processes_2_remove() self.graph.initialise(processes_2_remove = self.processes_2_remove) self.graph.render_process_dependency_graph(render_graphs = render_graphs) #TODO -> update this def generate_all_graphs(self, render_graphs = True): self.generate_specification_graph(render_graphs = render_graphs) self.generate_process_dependency_graph(render_graphs = render_graphs) #Method that checks if a given graph sepcification is an isomorphism with the workflows def check_if_json_equal_to_full_structure(self, file): self.iniatilise_tab_processes_2_remove() return self.graph.check_if_json_equal_to_full_structure(file, processes_2_remove = self.processes_2_remove) ########################### # Generate test data ########################### #These are the methods which generate the test data def generate_test_specification_graph(self): dico = self.graph.get_full_dico() with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_executors(self): executors = self.get_workflow_main().get_all_executors_in_workflow() dico= {} for e in executors: dico[str(e)] = e.get_code(get_OG = True) with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_executors_per_subworkflows(self): subs = self.get_subworkflows_called() dico= {} for s in subs: dico[str(s)]= {} executors = s.get_all_executors_in_workflow() for e in executors: dico[str(s)][str(e)] = e.get_code(get_OG = True) with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_processes(self): processes = self.get_processes_called() dico= {} for p in processes: dico[str(p)] = p.get_code() with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_subworkflows(self): subs = self.get_subworkflows_called() dico= {} for s in subs: dico[str(s)] = s.get_code() with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_test_data(self): self.generate_test_specification_graph() self.generate_all_executors() self.generate_all_processes() self.generate_all_subworkflows() self.generate_executors_per_subworkflows() #Returns a dico of number of processes called per each condition #For example : {condition1: 14, condition2: 10, condition:3} #14 process calls depend on condition1 #10 process calls depend on condition2 #3 process calls depend on condition3 def get_most_influential_conditions(self, show_values = True): if(self.get_duplicate_status()): most_influential_conditions = self.get_workflow_main().get_most_influential_conditions() #If show values then we replace the the conditions ids with their values if(show_values): most_influential_conditions_values = {} for condition in most_influential_conditions: try: t = most_influential_conditions_values[condition.get_value()] except: most_influential_conditions_values[condition.get_value()] = 0 most_influential_conditions_values[condition.get_value()] += most_influential_conditions[condition] most_influential_conditions = most_influential_conditions_values #Sort the dico most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)} return most_influential_conditions else: BioFlowInsightError("Need to activate 'duplicate' mode to use this method.") #When there are multiple emits turn them into one and the end of the call eg, instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch def convert_to_DSL2(self): if(self.get_DSL()=="DSL2"): print("Workflow is already written in DSL2") else: #This tag is used as an identification to safely manipulate the string tag = str(time.time()) nextflow_file = self.get_first_file() code = nextflow_file.get_code() start_code = r"#!/usr/bin/env nextflow" start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow" end_code = "workflow.onComplete" pos_start, pos_end= 0, len(code) if(code.find(end_code)!=-1): pos_end = code.find(end_code) code_to_replace = code[pos_start:pos_end] for match in re.finditer(start_code_pattern, code): pos_start = match.span(0)[1]+1 #if(code.find(start_code)!=-1): # pos_start = code.find(start_code)+len(start_code) body = code[pos_start:pos_end]#.replace('\n', '\n\t') include_section = f"//INCLUDE_SECTION_{tag}" params_section = f"//PARAMS_SECTION_{tag}" function_section = f"//FUNCTION_SECTION_{tag}" process_section = f"//PROCESS_SECTION_{tag}" code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""") ##I've out this in a comment cause since it's a DSL1 #params_list = [] #for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code): # params_list.append(match.group(0)) #for params in params_list: # code = code.replace(params, "") #params_code = "\n".join(params_list) #code = code.replace(params_section, params_code) #Moving Functions functions = [] for f in nextflow_file.functions: function = f.get_code() functions.append(function) for r in functions: code = code.replace(r, "") code = code.replace(function_section, "\n\n".join(functions)) #Moving Processes processes = [] to_replace = [] for p in nextflow_file.get_processes(): new_process, call = p.convert_to_DSL2() processes.append(new_process) to_replace.append((p.get_code(get_OG = True), call)) for r in to_replace: code = code.replace(r[0], r[1]) code = code.replace(process_section, "\n\n".join(processes)) #TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated for o in self.get_workflow_main().get_all_executors_in_workflow(): if(o.get_type()=="Operation"): code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2()) else: raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'") #Putting || back code = code.replace("$OR$", "||") #TODO -> add the other things necessary to reformat code #Somethimes this is incorrect but that's due to the fact that the DSL1 analysis isn't as clean as the DSL2 analyse (concerning the conditions) #What i mean that when searching for channels, DSL1 doesn't consider the conditions when searching from the processes while DSL2 does #The conversion works well but it's just comparing to the old DSL1 workflow doesn't make sense #If you want to put this line back you need #TODO update the DSL1 parsing to consider the blocks when defining the processes #A good example is KevinMenden/hybrid-assembly self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=False, def_check_the_same = False) return code #This methods generates a random set of processes to consider as relavant #It's not a uniform random it's a bit of a gaussian, centered at 0.5 def generate_random_relevant_processes(self, alpha = -1): edges_create_cycles = self.graph.get_edges_that_create_cycle() import random #Random value between 0 and 1, centered at 0.5 def get_value(): #check = True #val = -1 #while(check): # check = False # val = random.gauss(0.5, 0.1) # if(val<0 or val>1): # check = True val = random.random() return val if(self.duplicate): processes_called = [] if(self.get_DSL()=="DSL2"): for c in self.get_workflow_main().get_all_calls_in_workflow(): p = c.get_first_element_called() if(p.get_type()=="Process"): processes_called.append(p) else: processes_called = self.get_first_file().get_processes() searching = True while(searching): searching = False if(alpha == -1): alpha = get_value() else: if(0<=alpha and alpha<=1): None else: raise BioFlowInsightError("alpha is not in the interval [0; 1]") nb_2_select = int(alpha*len(processes_called)) sampled = random.sample(set(processes_called), nb_2_select) sampled_str = [] for s in sampled: sampled_str.append(str(s)) for e in edges_create_cycles: if(e[0] in sampled_str and e[1] in sampled_str): #So that means there are the 2 nodes which form the cycle edge in the relevant processes #-> it means we need to regenerated relevant processes searching = True break name_select = [] for p in sampled: name_select.append(p.get_alias()) return name_select else: raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.") #Method that returns the order of execution for each executor def get_order_execution_executors(self): dico = {} seen = {} dico = self.get_workflow_main().get_order_execution_executors(dico, seen) tab = [] def explore_dico(dico): if(type(dico)!=dict): None else: for val in dico: tab.append(val) explore_dico(dico[val]) explore_dico(dico) return tab def add_to_ternary_operation_dico(self, old, new): self.ternary_operation_dico[new] = old def add_map_element(self, old, new): self.map_element_dico[new] = old def put_back_old_ternary_operations(self, code, ternary_operation_dico): for new in ternary_operation_dico: old = ternary_operation_dico[new] code = code.replace(new.strip(), old) return code def put_modified_operations_back(self, code, dico_operations): searching = True while(searching): searching = False for match in re.finditer(r"\.(\w+)_modified\s*\{\s*(¤[^¤]+¤)\s*\}", code): operator = match.group(1) inside = match.group(2)#Cause we want to remove the extras ...''' code = code.replace(match.group(0), f".{operator} {{ {dico_operations[inside]} }}") searching = True break return code #TODO -> write tests for this method #Function that rewrites the workflow code #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis def simplify_workflow_code(self): code = self.get_first_file().get_code() #This tag is used as an identification to safely manipulate the string tag = str(time.time()) #params_section = f"//PARAMS_SECTION_{tag}" function_section = f"//FUNCTION_SECTION" process_section = f"//PROCESS_SECTION" subworkflow_section = f"//SUBWORKFLOW_SECTION" ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section #Place ankers pos_start = 0 start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow" for match in re.finditer(start_code_pattern, code): pos_start = match.span(0)[1]+1 code = code[:pos_start]+ankers+code[pos_start:] #Remove the includes for match in re.finditer(constant.FULL_INLCUDE_2, code): code = re.sub(fr"{re.escape(match.group(0))}.*", "", code) processes, subworkflows, functions = [], [], [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): processes.append(ele) elif(ele.get_type()=="Subworkflow"): subworkflows.append(ele) elif(ele.get_type()=="Function"): functions.append(ele) else: raise Exception("This shoudn't happen") #Get calls to functions made outside of themain which might have been imported -> so we need to add them for c in self.get_first_file().get_calls_made_outside_of_main(): ele = c.get_first_element_called() if(ele.get_type()=="Function"): functions.append(ele) else: raise Exception("This shoudn't happen -> either a call to a process or subworkflow outside of main or subworkflow") #Simplifying main code = code.replace(self.get_workflow_main().get_code(get_OG = True), self.get_workflow_main().simplify_code()) #Adding processes into code for p in processes: if(p.get_code_with_alias_and_id() not in code): code = code.replace(process_section, '\n'+p.simplify_code()+'\n'+process_section) #Adding subworkflows into code for sub in subworkflows: if(sub.get_code_with_alias_and_id() not in code): code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.simplify_code()+'\n') #Adding functions into code for fun in functions: if(fun.get_code() not in code): code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n') #Remove the ankers #code = code.replace(function_section, "") #code = code.replace(process_section, "") #code = code.replace(subworkflow_section, "") ankers = {"function_section":function_section, "process_section":process_section, "subworkflow_section":subworkflow_section} return code def get_subworkflows_called(self): subs = [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Subworkflow"): subs.append(ele) return subs def get_processes_called(self): subs = [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): subs.append(ele) return subs def rewrite_and_initialise(self, code, processes_2_remove, render_graphs, def_check_the_same = True): temp_process_dependency_graph = self.graph.get_process_dependency_graph() temp_spec_graph = self.graph.full_dico #Remove the "_GG_\d+" #code = re.sub(r"_GG_\d+", "", code) #Write new code in temporary file temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf" with open(temp_file, "w") as file: file.write(code) f = open(self.get_output_dir()/ "debug" / "rewritten.nf", "w") f.write(code) f.close() #Replace old analysis with new analysis (simplified code) self.__init__(str(temp_file), display_info = False, duplicate=True, processes_2_remove=processes_2_remove) self.initialise() os.remove(temp_file) self.graph.initialise(processes_2_remove = self.processes_2_remove) if(def_check_the_same and not self.graph.check_if_process_dependendy_is_equivalent_to_other_without_subworkflows(temp_process_dependency_graph)): if(render_graphs==True): #generate_graph(self.get_output_dir()/ "debug" /"spec_graph_OG", temp_spec_graph, render_graphs = True) generate_graph(self.get_output_dir()/ "debug" /"spec_graph", self.graph.full_dico, render_graphs = True) #generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph_OG", temp_process_dependency_graph, render_graphs = True) generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph", self.graph.get_process_dependency_graph() , render_graphs = True) raise Exception("Something went wrong: The flat dependency graph is not the same!") def check_relevant_processes_in_workflow(self, relevant_processes): #Check all relevat processes are in wf workflow_processes = {} for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): short_name = ele.get_alias().split("_GG_")[0] try: temp = workflow_processes[short_name] except: workflow_processes[short_name] = [] workflow_processes[short_name].append(ele.get_alias()) temporary_relevant = [] for p in relevant_processes: if(p not in workflow_processes): raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24) temporary_relevant+=workflow_processes[p] relevant_processes = temporary_relevant return relevant_processes def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []): self.graph.initialise(processes_2_remove = processes_2_remove) self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs) #I do not recommand that the dev uses the same name for the channels inside and outside the channels #Since the local channels become local at the upper level def rewrite_subworkflow_call(self, code, subworklfow): #Remove the defintion from the code code = code.replace(subworklfow.get_code(get_OG = True), "") OG_call = subworklfow.get_call() OG_body = subworklfow.get_work() #REPLACE HEADER TAKES subworkflow_takes = subworklfow.get_takes() parameters = OG_call.get_parameters() if(len(subworkflow_takes)!=len(parameters)): raise Exception("This shouldn't happen -> the same number of parameters should be kept") #This is to replace the paramters and the takes new_header = "" for i in range(len(parameters)): param = parameters[i] takes = subworkflow_takes[i].get_gives()[0] #Here we're checking that the input inside and outside the subworkflow are the same if(takes.get_code()!=param.get_code(get_OG = True)): new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}\n" temp_code = code code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1) if(temp_code==code): raise Exception("Something went wrong: The code hasn't changed") #REPLACE THE EMITS #TODO admittedly this code below is very moche -> but it's functionnal -> update it emits = subworklfow.get_emit() to_replace = [] all_executors = self.get_workflow_main().get_all_executors_in_workflow() for exe in all_executors: #We don't need to check the case call since the workflow has already been rewriteen -> emits only appear in operations if(exe.get_type()=="Operation"): for emited in exe.get_origins(): if(emited.get_type()=="Emitted"): if(emited.get_emitted_by().get_first_element_called()==subworklfow): if(emited.get_emits() not in emits): raise Exception("This shoudn't happen -> since it is the actual subworkflow") new = exe.get_code(get_OG = True).replace(emited.get_code(), emited.get_emits().get_origins()[0].get_code()) #to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}")) to_replace.append((exe.get_code(get_OG = True), new)) #This dictionnary is used to check if the replacement has already been done (in the case of dupliactes in new) dico_replace = {} for r in to_replace: old, new = r need_to_replace = True try: t = dico_replace[old] if(t==new): need_to_replace = False else: raise Exception("This shouldn't happen") except: dico_replace[old]= new if(need_to_replace): temp_code = code #Case of channel = channel if(new.find("=")!=-1): if(new.split("=")[0].strip()==new.split("=")[1].strip()): new = '' #code = code.replace(old, new) code = replace_group1(code, fr"({re.escape(old)})[^\w]", new) if(temp_code==code): raise Exception("Something went wrong: The code hasn't changed") return code #This Function returns the channels on which the subworkflow (things_added_in_cluster) depend on def get_takes(self, things_added_in_cluster): #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list #This means that the channel is totally definied in the subworkflow -> so we are searching for #Channels which aren't totatly defined in the subworkflow channels_2_sources = {} for ele in things_added_in_cluster: if(ele.get_type() == "Operation"): for o in ele.get_origins(): if(o.get_type() in ["Channel", "Emitted"]): channels_2_sources[o] = replace_thing_by_call(o.get_source()) else: if(o.get_first_element_called().get_type()=="Function"): None else: raise Exception("This shouldn't happen") elif(ele.get_type() == "Call"): for param in ele.get_parameters(): if(param.get_type()=="Channel"): raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") else: for o in param.get_origins(): if(o.get_type()=="Channel"): channels_2_sources[o] = replace_thing_by_call(o.get_source()) else: raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") else: raise Exception("This shouldn't happen") takes = [] names_added = [] for channel in channels_2_sources: if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])): if(channel.get_name() not in names_added): takes.append(channel) names_added.append(channel.get_name()) return takes #This Function returns the channels the subworkflow (things_added_in_cluster) emits (other things depend on) def get_emits(self, things_added_in_cluster): channel_2_sink = {} #Basiccaly this is a deco of channels to opea -> this doesn't really work yetrtions -> when the value is an empty list #This means that the channel is totally definied in the subworkflow -> so we are searching for #Channels which aren't totatly defined in the subworkflow #This means that things outside the subworkflow depend on this channel channel_2_sink = {} for ele in things_added_in_cluster: if(ele.get_type() == "Operation"): for o in ele.get_gives(): channel_2_sink[o] = replace_thing_by_call(o.get_sink()) elif(ele.get_type() == "Call"): #thing = ele.get_first_element_called() for e in ele.get_later_emits(): channel_2_sink[e] = replace_thing_by_call(e.get_sink()) else: raise Exception("This shouldn't happen") emits = [] names_added = [] for channel in channel_2_sink: if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])): if(channel.get_name() not in names_added): emits.append(channel) names_added.append(channel.get_name()) return emits def remove_GG_from_code(self, code): def replacer(match): return match.group(1) return re.sub(f"(\w+)_GG_\d+", replacer, code) #Method which rewrites the workflow follwong the user view #Conert workflow to user_view only makes sense when the option duplicate is activated -> otherwise is doesn't make sense + it makes the analysis way more complicated def convert_workflow_2_user_view(self, relevant_processes = [], render_graphs = True): self.iniatilise_tab_processes_2_remove() self.graph.initialise(processes_2_remove = self.processes_2_remove) def get_object(address): address = int(re.findall(r"\dx\w+", address)[0], base=16) return ctypes.cast(address, ctypes.py_object).value #Check that there are no cycles which will break the creation of the user view: edges_create_cycles = self.graph.get_edges_that_create_cycle() edges_create_cycles_objects = [] for e in edges_create_cycles: edges_create_cycles_objects.append((get_object(e[0]), get_object(e[1]))) for e in edges_create_cycles_objects: n1 = e[0].get_alias() n2 = e[1].get_alias() if(n1 in relevant_processes and n2 in relevant_processes): raise BioFlowInsightError(f"The processes '{n1}' and '{n2}' cannot both be relevant processes since there is a dependency apparant in the workflow between the 2") ternary_operation_dico = self.ternary_operation_dico map_element_dico = self.map_element_dico if(self.get_DSL()=="DSL1"): code = self.convert_to_DSL2() self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs, def_check_the_same = False) if(self.get_DSL()=="DSL2"): code = self.simplify_workflow_code() self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs) if(self.duplicate): #DETERMING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER def get_clusters_with_calls(clusters): #Creating the clusters with calls instead of processes or subworkflows set_clusters_with_calls = [] for c in clusters: tab = [] for ele in c: if(ele.get_type()=="Operation"): if(ele.get_artificial_status()==False): tab.append(ele) else: call = ele.get_call() tab.append(call) set_clusters_with_calls.append(set(tab)) return set_clusters_with_calls #Getting subworkflows to executors def get_subworkflow_2_executors(): subworkflow_2_executors = {} for sub in self.get_subworkflows_called(): executors = sub.get_all_executors_in_workflow() subworkflow_2_executors[sub] = [] for ele in executors: #Cannot add calls to subworkflows -> they are already present by definition if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"): None #We don't add it else: subworkflow_2_executors[sub].append(ele) return subworkflow_2_executors #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys())) #TODO -> write tests to test this function def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls): broken_subworkflows = [] for sub in subworkflow_2_executors: #You want this list (set) to be equal to subworkflow_2_executors[sub] elements_in_sub_with_clusters = [] for cluster in set_clusters_with_calls: if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])): break for c in cluster: if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])): break if(c in subworkflow_2_executors[sub]): elements_in_sub_with_clusters+=list(cluster) break if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])): None #This means that the subworkflow is Intact else: #This means that the subworkflow is broken broken_subworkflows.append(sub) return broken_subworkflows #Get the clusters and the code relevant_processes = self.check_relevant_processes_in_workflow(relevant_processes) self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs) clusters = self.graph.get_clusters_from_user_view() broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters)) #While there still are broken workflows -> need to redo the analysis while(len(broken_subworkflows)>0): #Rewrite broken subworkflows sub = broken_subworkflows[0] code = self.rewrite_subworkflow_call(code, sub) self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs) #Get the clusters and the code #TODO -> remove the generate all_graphs -> it is not necessary if(render_graphs): self.generate_all_graphs() self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs) clusters = self.graph.get_clusters_from_user_view() broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters)) #Get the clsuters with the corresponding operations inside #for i in range(len(clusters)): # c = clusters[i] # if(len(c)>1): # clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c) #Get the topological order executors_in_order = self.get_order_execution_executors() new_clusters = [] for cluster in clusters: tab = [] for e in executors_in_order: if(e in cluster): tab.append(e) new_clusters.append(tab) clusters = new_clusters #This function returns the last executor in the clusters #This is used to place the anker def get_last_executor_in_cluster(executors_in_order, clusters): dico = {} for cluster in clusters: for ele in cluster: dico[ele] = executors_in_order.index(ele) for ele in {k: v for k, v in sorted(dico.items(), key=lambda item: item[1], reverse=True)}: return ele #Replace the last executor in the clusters by the cluster anker last_executor_in_cluster = get_last_executor_in_cluster(executors_in_order, clusters) if(last_executor_in_cluster.get_type()=="Process"): call = last_executor_in_cluster.get_call() code = code.replace(call.get_code(get_OG = True), "\n//Anker_clusters\n") elif(last_executor_in_cluster.get_type()=="Operation"): if(not last_executor_in_cluster.get_artificial_status()): code = code.replace(last_executor_in_cluster.get_code(get_OG = True), "\n//Anker_clusters\n", 1) else: raise Exception("This shoudn't happen") else: raise Exception("This shoudn't happen") #Removing elements from clusters from the code for cluster in clusters: for ele in cluster: if(ele.get_type()=="Process"): call = ele.get_call() code = code.replace(call.get_code(get_OG = True), "") elif(ele.get_type()=="Operation"): if(not ele.get_artificial_status()): code = code.replace(ele.get_code(get_OG = True), "", 1) else: raise Exception("This shoudn't happen") else: raise Exception("This shoudn't happen") #Remove the empty conditions left in the code code = remove_empty_conditions_place_anker(code) #Add the subworkflow defintions #------------------------------------- #Adding the anker subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF" to_replace = "" for match in re.finditer(r"workflow\s*\w*\s*\{", code): to_replace = match.group(0) break if(to_replace==""): raise Exception("No call to a workflow") code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}") #Creating the subworkflows from clusters calls_in_operations = [] non_relevant_name = 1 subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], [] index_cluster = len(clusters) #We replace the last clusters first -> this is cause the outputs of the last clusters aren't used anywhere else in the workflow by definition for elements in list(reversed(clusters)): channels_to_replace_outside_of_cluster = [] #Check that there is at least one process in cluster at_least_one_process = False for e in elements: if(e.get_type()=="Process"): at_least_one_process = True #Only create the subworkflows for clusters with onr more elements (and that element in a process) processes_added = [] things_added_in_cluster = [] if(len(elements)>=1 and at_least_one_process): name, body, take, emit = "", "", "", "" first_element = True for ele in elements: if(ele.get_type()=="Process"): #Determine the name of the created subworkflow cluster if(ele.get_alias() in relevant_processes): name = f"cluster_{ele.get_alias()}" #Get the call of thing (either process or subworkflow) call = ele.get_call() processes_added.append(call.get_first_element_called()) values = [] for condition in call.get_all_conditions(): values.append(condition.get_value()) printed_condition = " && ".join(values) if(printed_condition!=""): body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n" else: body+=f"\n{call.get_code()}\n" things_added_in_cluster.append(call) #Below elif(ele.get_type()=="Operation"): #TODO -> check this verification there might be some "effet de bord" if(not ele.get_artificial_status()): #Ignore these cases #TODO -> you should be able to remove this if(ele.get_code()[:4] not in ["emit", "take"]): origins = ele.get_origins() for o in origins: if(o.get_type()=="Call"): if(o.get_first_element_called().get_type()!="Function"): calls_in_operations.append(o) values = [] for condition in ele.get_all_conditions(): values.append(condition.get_value()) printed_condition = " && ".join(values) if(printed_condition!=""): body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n" else: body+=f"\n{ele.get_code(get_OG = True)}\n" things_added_in_cluster.append(ele) else: raise Exception("This shoudn't happen") #TODO check this part of the code is never seen #Here we removing the Call_12313 thing for call in calls_in_operations: raise Exception("This shouldn't happen since the workflows has been rewritten") body = body.replace(call.get_code(), "") body = body.replace(str(call), call.get_code()) #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes if(name==""): #If there are processes called we are going to use them if(len(processes_added)>0): #TODO find a better naming system name = f"non_relevant_cluster_{processes_added[0].get_alias()}" else: #TODO find a better naming system name = f"non_relevant_cluster_{non_relevant_name}" non_relevant_name+=1 #Check that there is a single condtion in the body body = group_together_ifs(body) conditions_in_subworkflow = [] temp_body = body conditions_in_subworkflow_2 = extract_conditions(temp_body, only_get_inside = False) if(len(conditions_in_subworkflow_2)==1): for condition in conditions_in_subworkflow_2: start, end = conditions_in_subworkflow_2[condition] temp_body = temp_body.replace(temp_body[start: end], "") #This means that there is only one codnition with all the executors in the condition if(temp_body.strip()==""): conditions_in_subworkflow = extract_conditions(body) for condition in conditions_in_subworkflow: start, end = conditions_in_subworkflow[condition] body = body[start: end-1].strip() conditions_in_subworkflow = list(conditions_in_subworkflow.keys()) #TAKE #Adding take parameters on the inside of the subworkflow takes_param = self.get_takes(things_added_in_cluster) new_param_names, index, old_param_names = [], 1, [] for param in takes_param: param_name = f"param_{name}_{index}" #Here if the input is a channel -> we keep the same name for readibility #It also solves a bug described on the 18/02/2025 if(param.get_type()!='Channel'): new_param_names.append(param_name) old_param_names.append(param.get_code()) else: new_param_names.append(param.get_code()) old_param_names.append(param.get_code()) index += 1 if(len(new_param_names)>0): temp = '\n'.join(new_param_names) take = f"\ntake:\n{temp}\n" #EMIT #Adding the emitted outputs emitted_outputs = self.get_emits(things_added_in_cluster) new_output_names, index, old_output_names = [], 0, [] for output in emitted_outputs: output_name = f"{name}.out[{index}]" new_output_names.append(output_name) old_output_names.append(output.get_code()) index += 1 if(len(old_output_names)>0): temp = '\n'.join(old_output_names) emit = f"\nemit:\n{temp}\n" #Adding empty channels if it doesn't exist in the case of a negative condition body = get_channels_to_add_in_false_conditions(body, old_output_names) #Replace names inside subworkflow subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}" #We do this so that the longest thing are rewritten first in the code -> to avoid problems takes_param_2_length = {} takes_param_2_new_param_names = {} for i in range(len(new_param_names)): takes_param_2_new_param_names[takes_param[i].get_code()] = new_param_names[i] takes_param_2_length[takes_param[i].get_code()] = len(takes_param[i].get_code()) sorted_takes_param_2_length = {k: v for k, v in sorted(takes_param_2_length.items(), key=lambda item: item[1], reverse=True)} for take_param in sorted_takes_param_2_length: new_param = takes_param_2_new_param_names[take_param] if(take_param != new_param): #pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]" pattern = fr"({re.escape(take_param)})[\s\,\)\.]" temp = subworkflow_code subworkflow_code = replace_group1(subworkflow_code, pattern, new_param) if(temp==subworkflow_code): print(take_param, new_param) print(pattern) print(f'"{subworkflow_code}"') raise Exception("Something went wrong -> cause the paramter wasn't updated") #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i]) #TODO -> added verification of conditions params = ", ".join(old_param_names) subworkfow_call_case_true = f"{name}({params})" subworkfow_call_case_false = "" for i in range(len(new_output_names)): #In the case of channels, we just add chanel = subworkflow.out[i] if(not bool(re.findall("\.\s*out", old_output_names[i]))): subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}" subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()" #In the case of emitted values we need to replace the code on the outside else: param_out_name= f"{name}_out_{i+1}" subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}" subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()" channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name)) #If there was only one single condition in the subworkflow cluster -> then we add it when the call is done if(len(conditions_in_subworkflow)==1): subworkfow_call = f"if({conditions_in_subworkflow[0].split('$$__$$')[0]}) {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}" else: subworkfow_call = subworkfow_call_case_true ##TODO -> added verification of conditions #params = ", ".join(old_param_names) #subworkfow_call_case_true = f"{name}({params})" #for i in range(len(new_output_names)): # #In the case of channels, we just add chanel = subworkflow.out[i] # if(not bool(re.findall("\.\s*out", old_output_names[i]))): # subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}" # #In the case of emitted values we need to replace the code on the outside # else: # param_out_name= f"{name}_out_{i+1}" # subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}" # channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name)) ##If there was only one single condition in the subworkflow cluster -> then we add it when the call is done #if(len(conditions_in_subworkflow)==1): # subworkfow_call = f"if({conditions_in_subworkflow[0].split('$$__$$')[0]}) {{\n{subworkfow_call_case_true}\n}}" #else: # subworkfow_call = subworkfow_call_case_true #subworkflow_clusters_to_add.append(subworkflow_code) #subworkflow_cluster_calls_to_add.append(subworkfow_call) #Add the subworkflow call new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{subworkfow_call}\n" code = code.replace("//Anker_clusters", new_code) for old, new in channels_to_replace_outside_of_cluster: pattern= fr"[ \(,]({re.escape(old)})[^\w]" code = replace_group1(code, pattern, new) #code = code.replace(old, new) #Add the subworkflow defintions #------------------------------------- code = code.replace(f'{subworkflow_section}', f"{subworkflow_code}\n\n{subworkflow_section}") else: body = "" for ele in elements: values = [] for condition in ele.get_all_conditions(): values.append(condition.get_value()) printed_condition = " && ".join(values) if(printed_condition!=""): body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n" else: body+=f"\n{ele.get_code(get_OG = True)}\n" new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{body}\n" code = code.replace("//Anker_clusters", new_code) index_cluster-=1 #Putting || back code = self.put_back_old_ternary_operations(code, ternary_operation_dico) code = code.replace("$OR$", "||") code = self.put_modified_operations_back(code, map_element_dico) code = remove_extra_jumps(format_with_tabs(code)) #code = self.remove_GG_from_code(code) f = open(self.get_output_dir()/ "debug" / "rewritten.nf", "w") f.write(code) f.close() self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs) return code #return code # ##So basically when retriving a thing (process or subworkflow) ##There is necessarily one call associated with the thing -> since we have the option duplicate activated ##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen) else: raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to reformulated")