#Import dependencies #Local from .nextflow_file import Nextflow_File from .ro_crate import RO_Crate from . import constant from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles from .outils_annotate import get_tools_commands_from_user_for_process from .bioflowinsighterror import BioFlowInsightError from .graph import Graph #Outside packages import os import re import json from pathlib import Path import glob import ctypes import time class Workflow: """ This is the main workflow class, from this class, workflow analysis can be done. After analysis, workflow structure reconstruction can be done. Attributes: file: A string indicating the address to the workflow main or the directory containing the workflow duplicate: A boolean indicating if processes are to be duplicated in the structure display_info: A boolean indicating if the analysis information should be printed output_dir: A string indicating where the results will be saved name: A string indicating the name of the workflow processes_2_remove: A string indicating the processes to remove from the workflow """ def __init__(self, file, duplicate=False, display_info=True, output_dir = './results', name = None, processes_2_remove = None): #Getting the main nextflow file if(not os.path.isfile(file)): nextflow_files = glob.glob(f'{file}/*.nf') if(len(nextflow_files)==0): raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1) txt = "" #Try to read the main.nf file -> if this cannot be found then the first nextflow file is used try: file = file+"/main.nf" with open(file, 'r') as f: txt= f.read() except: None #raise BioFlowInsightError("No 'main.nf' file found at the root of the prohject") if(txt==""): if(len(nextflow_files)==1): file = nextflow_files[0] with open(file, 'r') as f: txt= f.read() else: raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select") self.duplicate = duplicate self.display_info = display_info self.processes_2_remove = processes_2_remove self.output_dir = Path(output_dir) self.nextflow_files = [] self.workflow_directory = '/'.join(file.split('/')[:-1]) self.name = name self.graph = None self.conditions_2_ignore = [] OG_file = Nextflow_File(file, workflow = self, first_file = True) self.DSL = OG_file.find_DSL() self.create_empty_results() if(self.display_info): print(f"Workflow is written in {self.DSL}") def create_empty_results(self): os.makedirs(self.output_dir, exist_ok=True) os.makedirs(self.output_dir / 'debug', exist_ok=True) os.makedirs(self.output_dir / 'graphs', exist_ok=True) with open(self.output_dir / "debug" / "operations.nf",'w') as file: pass with open(self.output_dir / "debug" / "calls.nf",'w') as file: pass with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file: pass def get_root_directory(self): first_file = self.get_first_file() return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/" def get_conditions_2_ignore(self): return self.conditions_2_ignore def get_duplicate_status(self): return self.duplicate def get_output_dir(self): return Path(self.output_dir) def get_DSL(self): return self.DSL def get_display_info_bool(self): return self.display_info def set_DSL(self, DSL): self.DSL = DSL def get_first_file(self): for file in self.nextflow_files: if(file.first_file): return file def get_workflow_main(self): return self.get_first_file().main def add_nextflow_file_2_workflow(self, nextflow_file): self.nextflow_files.append(nextflow_file) self.nextflow_files = list(set(self.nextflow_files)) def initialise(self, conditions_2_ignore = []): """Method that initialises the analysis of the worflow Keyword arguments: """ self.conditions_2_ignore = conditions_2_ignore #Right now i'm just gonna do everything in DSL2 #At this point there should only be one nextflow file if(len(self.nextflow_files)==1): self.nextflow_files[0].initialise() else: raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.") if(self.display_info): citation = """\nTo cite BioFlow-Insight, please use the following publication: George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092""" print(citation) if(self.graph==None): self.graph = Graph(self) def iniatilise_tab_processes_2_remove(self): if(self.processes_2_remove==None): tab_processes_2_remove = [] if(self.processes_2_remove!=None): temp = self.processes_2_remove.split(",") for t in temp: tab_processes_2_remove.append(t.strip()) self.processes_2_remove = tab_processes_2_remove def get_structure(self): dico = {} dico['nodes'] = [] dico['edges'] = [] dico['subworkflows'] = {} if(self.get_DSL() == "DSL1"): main = self.get_workflow_main() if(main!=None): return main.get_structure(dico) elif(self.get_DSL() == "DSL2"): main = self.get_workflow_main() if(main!=None): return main.get_structure(dico) else: return dico #return self.get_structure_DSL2(dico=dico, start = True) else: raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!") ################# # GRAPHS ################# def generate_specification_graph(self, render_graphs = True): self.iniatilise_tab_processes_2_remove() self.graph.initialise(processes_2_remove = self.processes_2_remove) self.graph.get_specification_graph(render_graphs = render_graphs) #TODO -> update this def generate_all_graphs(self, render_graphs = True): self.generate_specification_graph(render_graphs = render_graphs) #Method that checks if a given graph sepcification is an isomorphism with the workflows def check_if_equal(self, file): self.iniatilise_tab_processes_2_remove() return self.graph.check_if_equal(file, processes_2_remove = self.processes_2_remove) ########################### # Generate test data ########################### #These are the methods which generate the test data def generate_test_specification_graph(self): dico = self.graph.get_full_dico() with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_executors(self): executors = self.get_workflow_main().get_all_executors_in_workflow() dico= {} for e in executors: dico[str(e)] = e.get_code(get_OG = True) with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_executors_per_subworkflows(self): subs = self.get_subworkflows_called() dico= {} for s in subs: dico[str(s)]= {} executors = s.get_all_executors_in_workflow() for e in executors: dico[str(s)][str(e)] = e.get_code(get_OG = True) with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_processes(self): processes = self.get_processes_called() dico= {} for p in processes: dico[str(p)] = p.get_code() with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_subworkflows(self): subs = self.get_subworkflows_called() dico= {} for s in subs: dico[str(s)] = s.get_code() with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile: json.dump(dico, outfile, indent = 4) def generate_all_test_data(self): self.generate_test_specification_graph() self.generate_all_executors() self.generate_all_processes() self.generate_all_subworkflows() self.generate_executors_per_subworkflows() #Returns a dico of number of processes called per each condition #For example : {condition1: 14, condition2: 10, condition:3} #14 process calls depend on condition1 #10 process calls depend on condition2 #3 process calls depend on condition3 def get_most_influential_conditions(self, show_values = True): if(self.get_duplicate_status()): most_influential_conditions = self.get_workflow_main().get_most_influential_conditions() #If show values then we replace the the conditions ids with their values if(show_values): most_influential_conditions_values = {} for condition in most_influential_conditions: try: t = most_influential_conditions_values[condition.get_value()] except: most_influential_conditions_values[condition.get_value()] = 0 most_influential_conditions_values[condition.get_value()] += most_influential_conditions[condition] most_influential_conditions = most_influential_conditions_values #Sort the dico most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)} return most_influential_conditions else: BioFlowInsightError("Need to activate 'duplicate' mode to use this method.") #When there are multiple emits turn them into one and the end of the call eg, instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch def convert_to_DSL2(self): if(self.get_DSL()=="DSL2"): print("Workflow is already written in DSL2") else: #This tag is used as an identification to safely manipulate the string tag = str(time.time()) nextflow_file = self.get_first_file() code = nextflow_file.get_code() start_code = r"#!/usr/bin/env nextflow" start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow" end_code = "workflow.onComplete" pos_start, pos_end= 0, len(code) if(code.find(end_code)!=-1): pos_end = code.find(end_code) code_to_replace = code[pos_start:pos_end] for match in re.finditer(start_code_pattern, code): pos_start = match.span(0)[1]+1 #if(code.find(start_code)!=-1): # pos_start = code.find(start_code)+len(start_code) body = code[pos_start:pos_end]#.replace('\n', '\n\t') include_section = f"//INCLUDE_SECTION_{tag}" params_section = f"//PARAMS_SECTION_{tag}" function_section = f"//FUNCTION_SECTION_{tag}" process_section = f"//PROCESS_SECTION_{tag}" code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""") ##I've out this in a comment cause since it's a DSL1 #params_list = [] #for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code): # params_list.append(match.group(0)) #for params in params_list: # code = code.replace(params, "") #params_code = "\n".join(params_list) #code = code.replace(params_section, params_code) #Moving Functions functions = [] for f in nextflow_file.functions: function = f.get_code() functions.append(function) for r in functions: code = code.replace(r, "") code = code.replace(function_section, "\n\n".join(functions)) #Moving Processes processes = [] to_replace = [] for p in nextflow_file.get_processes(): new_process, call = p.convert_to_DSL2() processes.append(new_process) to_replace.append((p.get_code(), call)) for r in to_replace: code = code.replace(r[0], r[1]) code = code.replace(process_section, "\n\n".join(processes)) #TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated for o in self.get_workflow_main().get_all_executors_in_workflow(): if(o.get_type()=="Operation"): code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2()) else: raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'") #Putting || back code = code.replace("$OR$", "||") return code #This methods generates a random set of processes to consider as relavant #It's not a uniform random it's a bit of a gaussian, centered at 0.5 def generate_random_relevant_processes(self, alpha = -1): import random #Random value between 0 and 1, centered at 0.5 def get_value(): check = True val = -1 while(check): check = False val = random.gauss(0.5, 0.1) if(val<0 or val>1): check = True return val if(self.duplicate): if(alpha == -1): alpha = get_value() else: if(0<=alpha and alpha<=1): None else: raise BioFlowInsightError("alpha is not in the interval [0; 1]") processes_called = [] if(self.get_DSL()=="DSL2"): for c in self.get_workflow_main().get_all_calls_in_workflow(): p = c.get_first_element_called() if(p.get_type()=="Process"): processes_called.append(p) else: processes_called = self.get_first_file().get_processes() nb_2_select = int(alpha*len(processes_called)) sampled = random.sample(set(processes_called), nb_2_select) name_select = [] for p in sampled: name_select.append(p.get_alias()) return name_select else: raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.") #This method rewrites the entire workflow into one single file def write_workflow_into_one_file(self): #This tag is used as an identification to safely manipulate the string tag = str(time.time()) self.get_first_file code = self.get_first_file().get_code() #params_section = f"//PARAMS_SECTION_{tag}" function_section = f"//FUNCTION_SECTION" process_section = f"//PROCESS_SECTION" subworkflow_section = f"//SUBWORKFLOW_SECTION" ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section to_replace = [] for match in re.finditer(constant.FULL_INLCUDE_2, code): to_replace.append(match.group(0)) for r in to_replace: code = code.replace(r, ankers) ankers = "" processes, subworkflows, functions = [], [], [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): processes.append(ele) elif(ele.get_type()=="Subworkflow"): subworkflows.append(ele) elif(ele.get_type()=="Function"): functions.append(ele) else: raise Exception("This shoudn't happen") #Adding processes into code for p in processes: if(p.get_code() not in code): code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section) #Adding subworkflows into code for sub in subworkflows: if(sub.get_code() not in code): code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n') #Adding functions into code for fun in functions: if(fun.get_code() not in code): code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n') #Remove the ankers #code = code.replace(function_section, "") #code = code.replace(process_section, "") #code = code.replace(subworkflow_section, "") ankers = {"function_section":function_section, "process_section":process_section, "subworkflow_section":subworkflow_section} return code, ankers #TODO -> write tests for this method #Function that rewrites the workflow code #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis def simplify_workflow_code(self): code = self.get_first_file().get_code() code, ankers = self.write_workflow_into_one_file() all_executors = self.get_workflow_main().get_all_executors_in_workflow() #We do this so that the longest operation and calls are rewritten first in the code -> to avoid problems executor_2_length = {} for e in all_executors: executor_2_length[e] = len(e.get_code(get_OG = True)) sorted_executor_2_length = {k: v for k, v in sorted(executor_2_length.items(), key=lambda item: item[1], reverse=True)} for exe in sorted_executor_2_length: if(exe.get_type()=="Call" or exe.get_type()=="Operation"): code = code.replace(exe.get_code(get_OG = True), exe.simplify_code(), 1) else: raise Exception("This shouldn't happen") return code def get_subworkflows_called(self): subs = [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Subworkflow"): subs.append(ele) return subs def get_processes_called(self): subs = [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): subs.append(ele) return subs def rewrite_and_initialise(self, code): #Write new code in temporary file temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf" with open(temp_file, "w") as file: file.write(code) #Replace old analysis with new analysis (simplified code) self.__init__(str(temp_file), display_info = False, duplicate=True) self.initialise() def check_relevant_processes_in_workflow(self, relevant_processes): #Check all relevat processes are in wf workflow_processes = [] for c in self.get_workflow_main().get_all_calls_in_workflow(): ele = c.get_first_element_called() if(ele.get_type()=="Process"): workflow_processes.append(ele.get_alias()) for p in relevant_processes: if(p not in workflow_processes): raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24) def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []): self.graph.initialise(processes_2_remove = processes_2_remove) self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs) #I do not recommand that the dev uses the same name for the channels inside and outside the channels #Since the local channels become local at the upper level def rewrite_subworkflow_call(self, code, subworklfow): #Remove the defintion from the code code = code.replace(subworklfow.get_code(), "") OG_call = subworklfow.get_call() OG_body = subworklfow.get_work() #REPLACE HEADER TAKES subworkflow_takes = subworklfow.get_takes() parameters = OG_call.get_parameters() if(len(subworkflow_takes)!=len(parameters)): raise Exception("This shouldn't happen -> the same number of parameters should be kept") #This is to replace the paramters and the takes new_header = "" for i in range(len(parameters)): param = parameters[i] takes = subworkflow_takes[i].get_gives()[0] #Here we're checking that the input inside and outside the subworkflow are the same if(takes.get_code()!=param.get_code(get_OG = True)): new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}" code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1) #REPLACE THE EMITS #TODO admittedly this code below is very moche -> but it's functionnal -> update it emits = subworklfow.get_emit() to_replace = [] all_executors = self.get_workflow_main().get_all_executors_in_workflow() for exe in all_executors: #We don't need to check the case call since the workflow has already been rewriteen -> emits only appear in operations if(exe.get_type()=="Operation"): emited = exe.get_origins() if(len(emited)==1): emited = emited[0] if(emited.get_type()=="Emitted"): if(emited.get_emitted_by().get_first_element_called()==subworklfow): if(emited.get_emits() not in emits): raise Exception("This shoudn't happen -> since it is the actual subworkflow") to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}")) for r in to_replace: old, new = r #Case of channel == channel if(new.split("=")[0].strip()==new.split("=")[1].strip()): new = '' code = code.replace(old, new) return code #This Function returns the channels on which the subworkflow (things_added_in_cluster) depend on def get_takes(self, things_added_in_cluster): #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list #This means that the channel is totally definied in the subworkflow -> so we are searching for #Channels which aren't totatly defined in the subworkflow channels_2_sources = {} for ele in things_added_in_cluster: if(ele.get_type() == "Operation"): for o in ele.get_origins(): channels_2_sources[o] = replace_thing_by_call(o.get_source()) elif(ele.get_type() == "Call"): for param in ele.get_parameters(): if(param.get_type()=="Channel"): raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") else: for o in param.get_origins(): if(o.get_type()=="Channel"): channels_2_sources[o] = replace_thing_by_call(o.get_source()) else: raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") else: raise Exception("This shouldn't happen") takes = [] for channel in channels_2_sources: if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])): takes.append(channel) return takes #This Function returns the channels the subworkflow (things_added_in_cluster) emits (other things depend on) def get_emits(self, things_added_in_cluster): emits = [] channel_2_sink = {} #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list #This means that the channel is totally definied in the subworkflow -> so we are searching for #Channels which aren't totatly defined in the subworkflow #This means that things outside the subworkflow depend on this channel channel_2_sink = {} for ele in things_added_in_cluster: if(ele.get_type() == "Operation"): for o in ele.get_gives(): channel_2_sink[o] = replace_thing_by_call(o.get_sink()) elif(ele.get_type() == "Call"): #thing = ele.get_first_element_called() for e in ele.get_later_emits(): channel_2_sink[e] = replace_thing_by_call(e.get_sink()) else: raise Exception("This shouldn't happen") for channel in channel_2_sink: if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])): emits.append(channel) return emits #Method which rewrites the workflow follwong the user view #Conert workflow to user_view only makes sense when the option duplicate is activated -> otherwise is doesn't make sense + it makes the analysis way more complicated def convert_workflow_2_user_view(self, relevant_processes = []): if(self.get_DSL()=="DSL1"): code = self.convert_to_DSL2() self.rewrite_and_initialise(code) if(self.duplicate): code = self.simplify_workflow_code() self.rewrite_and_initialise(code) #Get the clusters and the code self.check_relevant_processes_in_workflow(relevant_processes) self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = []) clusters = self.graph.get_clusters_from_user_view() #DETERMING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER #Creating the clusters with calls instead of processes or subworkflows set_clusters_with_calls = [] for c in clusters: tab = [] for ele in c: if(ele.get_type()=="Operation"): if(ele.get_artificial_status()==False): tab.append(ele) else: call = ele.get_call() tab.append(call) set_clusters_with_calls.append(set(tab)) #Getting subworkflows to executors subworkflow_2_executors = {} for sub in self.get_subworkflows_called(): executors = sub.get_all_executors_in_workflow() subworkflow_2_executors[sub] = [] for ele in executors: #Cannot add calls to subworkflows -> they are already present by definition if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"): None #We don't add it else: subworkflow_2_executors[sub].append(ele) #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys())) #TODO -> write tests to test this function def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls): broken_subworkflows = [] for sub in subworkflow_2_executors: #You want this list (set) to be equal to subworkflow_2_executors[sub] elements_in_sub_with_clusters = [] for cluster in set_clusters_with_calls: if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])): break for c in cluster: if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])): break if(c in subworkflow_2_executors[sub]): elements_in_sub_with_clusters+=list(cluster) break if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])): None #This means that the subworkflow is Intact else: #This means that the subworkflow is broken broken_subworkflows.append(sub) return broken_subworkflows broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls) #Rewrite broken subworkflows for sub in broken_subworkflows: code = self.rewrite_subworkflow_call(code, sub) #TODO -> this needs to be optimised self.rewrite_and_initialise(code) #Get the clusters and the code self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = []) clusters = self.graph.get_clusters_from_user_view() #Get the clsuters with the corresponding operations inside #for i in range(len(clusters)): # c = clusters[i] # if(len(c)>1): # clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c) #Get the topological order clusters = self.graph.get_topogical_order(clusters) #Creating the subworkflows from clusters calls_in_operations = [] non_relevant_name = 1 channels_to_replace_outside_of_cluster = [] subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], [] index_cluster = 0 for elements in clusters: #Check that there is at least one process in cluster at_least_one_process = False for e in elements: if(e.get_type()=="Process"): at_least_one_process = True #Only create the subworkflows for clusters with more than one element processes_added = [] things_added_in_cluster = [] if(len(elements)>1 and at_least_one_process): name, body, take, emit = "", "", "", "" first_element = True for ele in elements: if(ele.get_type()=="Process"): #Determine the name of the created subworkflow cluster if(ele.get_alias() in relevant_processes): name = f"cluster_{ele.get_alias()}" #Get the call of thing (either process or subworkflow) call = ele.get_call() #If first element -> add marker for the subworkflow call if(first_element): code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}") first_element = False else: code = code.replace(call.get_code(get_OG = True), "") processes_added.append(call.get_first_element_called()) values = [] for condition in call.get_all_conditions(): values.append(condition.get_value()) printed_condition = " && ".join(values) if(printed_condition!=""): body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n" else: body+=f"\n{call.get_code()}\n" things_added_in_cluster.append(call) #Below elif(ele.get_type()=="Operation"): #TODO -> check this verification there might be some "effet de bord" if(not ele.get_artificial_status()): #If first element -> add marker for the subworkflow call if(first_element): code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}", 1) first_element = False else: code = code.replace(ele.get_code(get_OG = True), "", 1) #Ignore these cases #TODO -> you should be able to remove this if(ele.get_code()[:4] not in ["emit", "take"]): origins = ele.get_origins() for o in origins: if(o.get_type()=="Call"): calls_in_operations.append(o) values = [] for condition in ele.get_all_conditions(): values.append(condition.get_value()) printed_condition = " && ".join(values) if(printed_condition!=""): body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n" else: body+=f"\n{ele.get_code()}\n" things_added_in_cluster.append(ele) else: raise Exception("This shoudn't happen") #TODO check this part of the code is never seen #Here we removing the Call_12313 thing for call in calls_in_operations: raise Exception("This shouldn't happen since the workflows has been rewritten") body = body.replace(call.get_code(), "") body = body.replace(str(call), call.get_code()) #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes if(name==""): #If there are processes called we are going to use them if(len(processes_added)>0): #TODO find a better naming system name = f"non_relevant_cluster_{processes_added[0].get_alias()}" else: #TODO find a better naming system name = f"non_relevant_cluster_{non_relevant_name}" non_relevant_name+=1 #Check that there is a single condtion in the body body = group_together_ifs(body) conditions_in_subworkflow = [] end = -1 for match in re.finditer(r"if\s*(\([^\{]+)\{", body): conditions_in_subworkflow.append(match.group(1).strip()) _, start = match.span(0) if(len(conditions_in_subworkflow)==1): end = extract_curly(body, start) body = body[start: end-1].strip() #TAKE #Adding take parameters on the inside of the subworkflow takes_param = self.get_takes(things_added_in_cluster) new_param_names, index, old_param_names = [], 1, [] for param in takes_param: param_name = f"param_{name}_{index}" #Here if the input is a channel -> we keep the same name for readibility #It also solves a bug described on the 18/02/2025 if(param.get_type()!='Channel'): new_param_names.append(param_name) old_param_names.append(param.get_code()) else: new_param_names.append(param.get_code()) old_param_names.append(param.get_code()) index += 1 if(len(new_param_names)>0): temp = '\n'.join(new_param_names) take = f"\ntake:\n{temp}\n" #EMIT #Adding the emitted outputs emitted_outputs = self.get_emits(things_added_in_cluster) new_output_names, index, old_output_names = [], 0, [] for output in emitted_outputs: output_name = f"{name}.out[{index}]" new_output_names.append(output_name) old_output_names.append(output.get_code()) index += 1 if(len(old_output_names)>0): temp = '\n'.join(old_output_names) emit = f"\nemit:\n{temp}\n" #Adding empty channels if it doesn't exist in the case of a negative condition body = get_channels_to_add_in_false_conditions(body, old_output_names) #Replace names inside subworkflow subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}" for i in range(len(new_param_names)): pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]" subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i]) #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i]) #TODO -> added verification of conditions params = ", ".join(old_param_names) subworkfow_call_case_true = f"{name}({params})" subworkfow_call_case_false = "" for i in range(len(new_output_names)): #In the case of channels, we just add chanel = subworkflow.out[i] if(not bool(re.findall("\.\s*out", old_output_names[i]))): subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}" subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()" #In the case of emitted values we need to replace the code on the outside else: param_out_name= f"{name}_out_{i+1}" subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}" subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()" channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name)) #If there was only one single condition in the subworkflow cluster -> then we add it when the call is done if(len(conditions_in_subworkflow)==1): subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}" None else: subworkfow_call = subworkfow_call_case_true subworkflow_clusters_to_add.append(subworkflow_code) subworkflow_cluster_calls_to_add.append(subworkfow_call) index_cluster+=1 #TODO -> rmoving the conditions which are problematic #This might not be the probleme -> when rerunnung the analysis isn't totally robust still_simplifying_conditions = True while(still_simplifying_conditions): still_simplifying_conditions = False to_replace, anker1, anker2 = "", "", "" #Replace if/else for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code): to_replace = match.group(0) anker1, anker2 = match.group(1), match.group(2) still_simplifying_conditions = True break #Replace empty if on its own if(not still_simplifying_conditions): for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code): to_replace = match.group(1) anker1 = match.group(2) still_simplifying_conditions = True break if(still_simplifying_conditions): code = code.replace(to_replace, f"{anker1}\n{anker2}") #Replace the ankers by the calls of the subworkflows for i in range(len(subworkflow_clusters_to_add)): code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i]) for old, new in channels_to_replace_outside_of_cluster: pattern= fr"[ \(,]({re.escape(old)})" code = replace_group1(code, pattern, new) #code = code.replace(old, new) #Add the subworkflow defintions #------------------------------------- #Add anker subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF" to_replace = "" for match in re.finditer(r"workflow\s*\w*\s*\{", code): to_replace = match.group(0) break if(to_replace==""): raise Exception("No call to a workflow") code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}") for sub in subworkflow_clusters_to_add: code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}") #Putting || back code = code.replace("$OR$", "||") return remove_extra_jumps(format_with_tabs(code)) #return code # ##So basically when retriving a thing (process or subworkflow) ##There is necessarily one call associated with the thing -> since we have the option duplicate activated ##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen) else: raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to reformulated")