# src/graph.py -- module-level helper
def get_object(address):
    """Return the live Python object designated by *address*.

    Args:
        address: text containing a hexadecimal memory address, typically the
            default ``repr``/``str`` of an object
            (``"<pkg.Class object at 0x7f...>"``).

    Returns:
        The object located at that address.

    Raises:
        IndexError: if *address* contains no ``0x...`` hexadecimal address.
    """
    # Match a real hex literal only. The previous pattern (\dx\w+) also matched
    # any "<digit>x<word>" run such as "2xtool" inside a module or class name,
    # and the conversion to int then failed on it.
    matches = re.findall(r"0x[0-9a-fA-F]+", address)
    if not matches:
        raise IndexError(f"no memory address found in {address!r}")
    # The default object repr ends with the address, so use the last match.
    pointer = int(matches[-1], base=16)
    # NOTE(review): this dereferences a raw pointer and relies on CPython's
    # id()-is-address behaviour. The target object must still be alive (held
    # by a strong reference elsewhere), otherwise the result is undefined.
    return ctypes.cast(pointer, ctypes.py_object).value


# src/graph.py -- method of class Graph (class header is outside this view)
def get_clusters_from_user_view(self):
    """Return the user-view clusters as lists of objects.

    ``self.new_nodes_user_view`` is read as an iterable of clusters, each
    cluster being an iterable of node strings that ``get_object`` can resolve.

    Returns:
        list[list]: one list of resolved objects per cluster, same order.
    """
    return [
        [get_object(node) for node in cluster]
        for cluster in self.new_nodes_user_view
    ]
# src/graph.py -- methods of class Graph (class header is outside this view)
def get_topogical_order(self, clusters):
    """Return every cluster with its members sorted in topological order.

    Args:
        clusters: iterable of collections of objects (as returned by
            ``get_clusters_from_user_view`` / ``get_induced_subgraph``).

    Returns:
        list[list]: for each cluster, the objects of the topologically sorted
        link dictionary that belong to that cluster, in sorted order.
    """
    # NOTE(review): relies on self.link_dico already being initialised.
    link_dico = copy.deepcopy(self.link_dico)
    sorted_nodes = topological_sort(link_dico)
    if not clusters:
        return []
    # Resolve the node strings to objects once instead of once per cluster.
    sorted_objects = [get_object(node) for node in sorted_nodes]
    return [
        [obj for obj in sorted_objects if obj in elements]
        for elements in clusters
    ]


def get_induced_subgraph(self, processes):
    """Return the objects lying on the paths that connect *processes*.

    All other process nodes are dropped from a copy of the link dictionary.
    For every ordered pair of the given processes, the nodes visited while
    searching a path between them are collected, and the search is repeated
    after breaking an edge into the target until no path remains.

    Args:
        processes: list of process objects; their ``str()`` form is the key
            used in the link dictionary.

    Returns:
        list: the resolved objects of every conserved node, without
        duplicates, in first-seen order.
    """
    self.intia_link_dico()

    # Graph dictionaries are keyed by the string form of the objects.
    processes_strings = [str(p) for p in processes]
    wanted = set(processes_strings)

    # Remove the unwanted processes from (a copy of) the link dictionary.
    link_dico = copy.deepcopy(self.link_dico)
    removed = {node for node in link_dico if is_process(node) and node not in wanted}
    for node in removed:
        link_dico.pop(node)

    # Edge list restricted to the surviving nodes.
    edges = [
        {'A': A, 'B': B}
        for A, successors in link_dico.items()
        for B in successors
        if B not in removed
    ]

    nodes_to_conserve = []
    for A in processes_strings:
        for B in processes_strings:
            if A == B:
                continue
            temp_edges = copy.deepcopy(edges)
            exists = True
            # Keep searching while a path from A to B still exists.
            while exists:
                exists, visited = exist_path(A, B, temp_edges)
                nodes_visited = [n for n in visited if visited[n]]
                # Break the path just found by removing one edge that enters B
                # from a visited node.
                removed_edge = False
                for n in nodes_visited:
                    edge = {'A': n, 'B': B}
                    if edge in temp_edges:
                        temp_edges.remove(edge)
                        removed_edge = True
                        break
                if not removed_edge:
                    # Nothing left to break: another iteration would repeat
                    # the same search forever.
                    exists = False
                nodes_to_conserve += nodes_visited

    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return [get_object(n) for n in dict.fromkeys(nodes_to_conserve)]


# src/main_DSL2.py -- methods of class Main_DSL2 (class header is outside this view)
def get_subworkflows_called(self, defined=None):
    """Return the subworkflows called by this element, recursively.

    Args:
        defined: dict used as an ordered set and filled in place; pass the
            same dict through recursive calls. A fresh dict is created when
            omitted, so results never leak from one call to the next.

    Returns:
        list: the keys of ``defined`` (subworkflow objects, first-seen order).
    """
    if defined is None:
        defined = {}
    for c in self.get_all_called():
        if c.get_type() == "Subworkflow":
            defined[c] = []
            c.get_subworkflows_called(defined=defined)
    return list(defined.keys())


def get_functions_called(self, defined=None):
    """Return the functions called by this element, recursively.

    Functions called directly are recorded; called subworkflows are asked to
    add their own functions to the same ``defined`` dict.

    Args:
        defined: dict used as an ordered set and filled in place. A fresh
            dict is created when omitted.

    Returns:
        list: the keys of ``defined`` (function objects, first-seen order).
    """
    if defined is None:
        defined = {}
    for c in self.get_all_called():
        called_type = c.get_type()
        if called_type == "Function":
            defined[c] = []
        elif called_type == "Subworkflow":
            c.get_functions_called(defined=defined)
    return list(defined.keys())
# src/outils_graph.py -- module-level graph helpers
def exist_path_dico(A, B, dico):
    """Search a path from A to B using the edges collected from *dico*.

    Args:
        A: start node.
        B: target node.
        dico: graph dictionary handed to ``get_edges``.

    Returns:
        tuple: ``(exists, visited)`` exactly as returned by ``exist_path``.
    """
    # get_edges() declares ``val=[]`` and extends that list in place, so
    # relying on its default would accumulate the edges of every earlier call.
    # Always hand it a fresh accumulator.
    edges = get_edges(dico, val=[])
    exists, visited = exist_path(A, B, edges)
    return exists, visited


def nr_path_succ(n, r, dico, R):
    """Tell whether a path n -> r exists that avoids the other members of R.

    The edges of ``dico["edges"]`` are first filtered with
    ``remove_edges_with_node`` for every element of R except r
    (presumably dropping the edges touching those nodes -- confirm against
    that helper).

    Returns:
        bool: True when ``exist_path`` reports a path from n to r.
    """
    rest_of_R = set(R) - {r}
    edges = remove_edges_with_node(dico["edges"], rest_of_R)
    exists, _ = exist_path(n, r, edges)
    return bool(exists)


def nr_path_pred(r, n, dico, R):
    """Tell whether a path r -> n exists that avoids the other members of R.

    Mirror of ``nr_path_succ`` with the search direction reversed.

    Returns:
        bool: True when ``exist_path`` reports a path from r to n.
    """
    rest_of_R = set(R) - {r}
    edges = remove_edges_with_node(dico["edges"], rest_of_R)
    exists, _ = exist_path(r, n, edges)
    return bool(exists)
# src/workflow.py -- method of class Workflow (class header is outside this view)
def get_functions_called(self):
    """Return the functions called/used during the workflow execution.

    The lookup is delegated to the workflow's Nextflow file object.

    Returns:
        Whatever ``self.nextflow_file.get_functions_called()`` returns.
    """
    nextflow_file = self.nextflow_file
    called_functions = nextflow_file.get_functions_called()
    return called_functions
# src/workflow.py -- methods of class Workflow (class header is outside this view)
def write_workflow_into_one_file(self):
    """Assemble the main file and every called element into one code string.

    The include statements of the main file are replaced by three marker
    comments (functions, processes, subworkflows). The code of each called
    process, subworkflow and function that is not already present is then
    inserted next to its marker.

    Returns:
        tuple: ``(code, ankers)`` where ``code`` is the assembled text and
        ``ankers`` maps ``"function_section"``, ``"process_section"`` and
        ``"subworkflow_section"`` to the marker comments, which are left in
        ``code`` for the caller.
    """
    # The timestamp makes the markers unique so they can be searched and
    # replaced safely inside the code string.
    tag = str(time.time())

    code = self.nextflow_file.get_code()

    function_section = f"//FUNCTION_SECTION_{tag}"
    process_section = f"//PROCESS_SECTION_{tag}"
    subworkflow_section = f"//SUBWORKFLOW_SECTION_{tag}"

    markers = function_section + "\n" * 3 + process_section + "\n" * 3 + subworkflow_section

    includes = [match.group(0) for match in re.finditer(constant.FULL_INLCUDE_2, code)]
    for include in includes:
        # Only the first occurrence of the first include receives the markers.
        # Replacing every occurrence would insert the marker block several
        # times when the same include text is repeated, and the definitions
        # below would then be inlined once per copy.
        code = code.replace(include, markers, 1).replace(include, "")
        markers = ""

    # Processes are inserted before their marker.
    for p in self.get_processes_called():
        if p.get_code() not in code:
            code = code.replace(process_section, '\n'+p.get_code()+'\n'+process_section)

    # Subworkflows are inserted after their marker.
    for sub in self.get_subworkflows_called():
        if sub.get_code() not in code:
            code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code()+'\n')

    # Functions are inserted after their marker.
    for fun in self.get_functions_called():
        if fun.get_code() not in code:
            code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')

    ankers = {"function_section":function_section,
              "process_section":process_section,
              "subworkflow_section":subworkflow_section}

    return code, ankers


def convert_workflow_2_user_view(self, relevant_processes=None):
    """Build (and currently only print) one subworkflow per user-view cluster.

    Only meaningful when the ``duplicate`` option is active: every process
    then has exactly one call, which is what the conversion relies on.

    Args:
        relevant_processes: names of the processes the user view is built
            around; defaults to an empty list.

    Raises:
        BioFlowInsightError: if the ``duplicate`` option is not activated.
        Exception: if a process of a cluster does not have exactly one call.
    """
    if relevant_processes is None:
        relevant_processes = []

    if not self.duplicate:
        raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to reformulated")

    def render(executor):
        # Code of the executor, wrapped in an ``if`` when it has conditions.
        # NOTE(review): get_condition() may return None (Executor initialises
        # its condition to None); confirm it is always set before this runs.
        printed_condition = " && ".join(executor.get_condition().get_conditions())
        if printed_condition != "":
            return f"if({printed_condition}) {{\n{executor.get_code()}\n}}\n"
        return f"\n{executor.get_code()}\n"

    graph = self.nextflow_file.graph
    self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
    clusters = graph.get_clusters_from_user_view()
    print(clusters)
    # The single-file code is not used yet by the steps below.
    code, ankers = self.write_workflow_into_one_file()

    # Clusters with several elements are widened to their induced subgraph.
    for i, c in enumerate(clusters):
        if len(c) > 1:
            clusters[i] = graph.get_induced_subgraph(c)
    print(clusters)
    clusters = graph.get_topogical_order(clusters)
    print(clusters)

    # Creating the subworkflows from clusters
    calls_in_operations = []
    for elements in clusters:
        # NOTE(review): name stays empty when the cluster holds no relevant process.
        name, body, take, emit = "", "", "", ""
        for ele in elements:
            ele_type = ele.get_type()
            if ele_type == "Process":
                # The relevant process gives its name to the cluster.
                if ele.get_name() in relevant_processes:
                    name = f"cluster_{ele.get_name()}"
                # TODO -> check it works with subworkflows
                call = ele.get_call()
                # With duplicate mode there must be exactly one call.
                if len(call) != 1:
                    raise Exception("This shoudn't happen since duplicate mode is activated")
                body += render(call[0])
            elif ele_type == "Operation":
                for o in ele.get_origins():
                    if o.get_type() == "Call":
                        calls_in_operations.append(o)
                body += render(ele)
        # A call embedded in an operation: drop its standalone code and put
        # the call code where the operation refers to the call object.
        for call in calls_in_operations:
            body = body.replace(call.get_code(), "")
            body = body.replace(str(call), call.get_code())

        subworkflow_code = f"subworkflow {name} {{\n{take}\n{body}\n{emit}\n}}"
        print(subworkflow_code)
        print()