diff --git a/src/graph.py b/src/graph.py
index b9cd44248ed3b3fd90fb3739aa73aa3842729bc9..d132cd50576a871b6363bb76462ab9fe43117ee9 100644
--- a/src/graph.py
+++ b/src/graph.py
@@ -430,7 +430,7 @@ class Graph():
     #GENERATE USER VIEW
     #============================
 
-    def get_user_view_graph(self, relevant_processes = [], use_process_dependency_graph = False):
+    def get_user_view_graph(self, relevant_processes = [], use_process_dependency_graph = False, alias_2_tools = {}):
         #For now i'm only gonna work from the flattened dico
         if(use_process_dependency_graph):
             self.initialise_flattened_dico(self.dico_process_dependency_graph)
@@ -438,7 +438,7 @@ class Graph():
             self.initialise_flattened_dico(self.full_dico)
         dico = remove_artificial_nodes(self.dico_flattened)
 
-        self.user_view, self.new_nodes_user_view = relev_user_view_builder(dico, relevant_modules=relevant_processes)
+        self.user_view, self.new_nodes_user_view = relev_user_view_builder(dico, relevant_modules=relevant_processes, alias_2_tools = alias_2_tools)
 
         with open(self.get_output_dir()/ "graphs/user_view.json", 'w') as output_file :
             json.dump(self.user_view, output_file, indent=4)
@@ -451,9 +451,9 @@ class Graph():
         #return self.user_view, user_view_with_subworkflows
         return self.user_view
 
-    def generate_user_view(self, relevant_processes = [], render_graphs = True, use_process_dependency_graph = False):
+    def generate_user_view(self, relevant_processes = [], render_graphs = True, use_process_dependency_graph = False, alias_2_tools = {}):
         #user_view, user_view_with_subworkflows = self.get_user_view_graph(relevant_processes = relevant_processes)
-        user_view = self.get_user_view_graph(relevant_processes = relevant_processes, use_process_dependency_graph = use_process_dependency_graph)
+        user_view = self.get_user_view_graph(relevant_processes = relevant_processes, use_process_dependency_graph = use_process_dependency_graph, alias_2_tools = alias_2_tools)
         #self.user_view_with_subworkflows = user_view_with_subworkflows
         generate_graph(self.get_output_dir()/'graphs'/"user_view", user_view, label_edge=True, label_node=True, render_graphs = render_graphs, root = False, relevant_nodes = copy.deepcopy(relevant_processes))
         #generate_graph(self.get_output_dir()/'graphs'/"user_view_with_subworkflows", user_view_with_subworkflows, label_edge=True, label_node=True, render_graphs = render_graphs, root = False, relevant_nodes = copy.deepcopy(relevant_processes))
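The graph.py half of this change is pure plumbing: the optional alias_2_tools mapping travels from Graph.generate_user_view through get_user_view_graph into relev_user_view_builder, where it drives the naming of merged nodes. A minimal sketch of the intended call shape (here `w` is a hypothetical, already-initialised Workflow object and the mapping content is invented):

    # alias_2_tools maps a process alias to the list of tools its script uses
    alias_2_tools = {"FASTQC": ["fastqc"], "ALIGN": ["bwa", "samtools"]}
    w.graph.generate_user_view(
        relevant_processes=["ALIGN"],
        render_graphs=False,
        use_process_dependency_graph=False,
        alias_2_tools=alias_2_tools,  # {} -> falls back to the longest-name heuristic
    )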
which has the "rarest" tool process + names = [] + for name in new_nodes: + process_name = name[0].split(tag)[0] + if(process_name!=""): + names.append(process_name) + if(names==[]): + return new_nodes[0][0] + else: + OG_path = os.getcwd() + #Change working directory to the one of the file + os.chdir("/".join((str(__file__).split("/")[:-1]))) + with open("../ressources/tool_2_nb_usage.json", 'r') as file: + tool_2_nb_usage = json.load(file) + os.chdir(OG_path) + + min_tool, min_process = np.inf, names[0] + for alias in names: + try: + tools = alias_2_tools[alias] + except: + tools = [""] + for t in tools: + try: + val = tool_2_nb_usage[t] + if(t in ['python', 'r', 'perl', 'julia']):#Cause in this case it is a custom script -> one should hope that it is important in this case + val = 1 + except: + val = 1 + if(val<min_tool): + min_tool = val + min_process = alias + return min_process + def check_same_elements(list1, list2): return set(list1)==set(list2) @@ -835,7 +872,7 @@ def get_color_node(node, new_nodes): prop = 256- int(127*len(node)/max) return rgb_to_hex(prop, prop, prop) -def relev_user_view_builder(dico_param, relevant_modules): +def relev_user_view_builder(dico_param, relevant_modules, alias_2_tools): import time dico = copy.deepcopy(dico_param) tag = str(time.time()) @@ -963,7 +1000,7 @@ def relev_user_view_builder(dico_param, relevant_modules): new_dico["subworkflows"] = [] for i in range(len(new_nodes)): new_nodes[i].sort() - new_name = get_name_new_node(get_names_tab(dico, new_nodes[i]), relevant_modules) + new_name = get_name_new_node(get_names_tab(dico, new_nodes[i]), relevant_modules, tag, alias_2_tools) name_printed = new_name.split(tag)[0] shape = "ellipse" if(name_printed==""): diff --git a/src/workflow.py b/src/workflow.py index 3ce11d064a076f47867bac8b0fe4d246130ba980..4ca2a7cf33bcb382278037e1382aee1ec3fe8aad 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -88,6 +88,8 @@ class Workflow: if(self.display_info): print(f"Workflow is written in {self.DSL}") self.cycle_in_workflow = False + self.alias_2_tools = {} + self.scripts_2_tools = {} def create_empty_results(self): @@ -139,7 +141,7 @@ class Workflow: self.nextflow_files.append(nextflow_file) self.nextflow_files = list(set(self.nextflow_files)) - def initialise(self, conditions_2_ignore = []): + def initialise(self, conditions_2_ignore = [], extract_tools = False): """Method that initialises the analysis of the worflow Keyword arguments: @@ -162,6 +164,12 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen if(self.graph==None): self.graph = Graph(self) + + if(extract_tools): + for p in self.get_processes_called(): + tools = p.get_tools() + self.alias_2_tools[p.get_alias()] = tools + self.scripts_2_tools[p.get_script_code()] = tools def iniatilise_tab_processes_2_remove(self): @@ -474,7 +482,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.") - def get_random_relevant_processes_which_use_bioinformatics_tools(self, processes_2_tools = {}): + def get_random_relevant_processes_which_use_bioinformatics_tools(self, scripts_2_tools = {}): if(self.duplicate): processes_called = [] if(self.get_DSL()=="DSL2"): @@ -486,8 +494,8 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen processes_called = self.get_first_file().get_processes() processes_with_bioinfo_tools = [] for p in processes_called: - 
diff --git a/src/workflow.py b/src/workflow.py
index 3ce11d064a076f47867bac8b0fe4d246130ba980..4ca2a7cf33bcb382278037e1382aee1ec3fe8aad 100644
--- a/src/workflow.py
+++ b/src/workflow.py
@@ -88,6 +88,8 @@ class Workflow:
             if(self.display_info):
                 print(f"Workflow is written in {self.DSL}")
         self.cycle_in_workflow = False
+        self.alias_2_tools = {}
+        self.scripts_2_tools = {}
 
 
     def create_empty_results(self):
@@ -139,7 +141,7 @@ class Workflow:
                 self.nextflow_files.append(nextflow_file)
             self.nextflow_files = list(set(self.nextflow_files))
 
-    def initialise(self, conditions_2_ignore = []):
+    def initialise(self, conditions_2_ignore = [], extract_tools = False):
         """Method that initialises the analysis of the worflow
 
         Keyword arguments:
@@ -162,6 +164,12 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
         if(self.graph==None):
             self.graph = Graph(self)
+
+        if(extract_tools):
+            for p in self.get_processes_called():
+                tools = p.get_tools()
+                self.alias_2_tools[p.get_alias()] = tools
+                self.scripts_2_tools[p.get_script_code()] = tools
 
 
     def iniatilise_tab_processes_2_remove(self):
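With extract_tools=True, initialise now caches two lookups built from the same extraction pass: alias_2_tools keyed by process alias (used later for user-view naming) and scripts_2_tools keyed by script code (used to avoid re-extracting tools from deep copies of the workflow). Roughly, assuming get_tools() returns a list of tool names (the constructor arguments below are illustrative):

    w = Workflow("path/to/main.nf", duplicate=True)  # illustrative arguments
    w.initialise(extract_tools=True)
    # Both caches cover every called process
    assert set(w.alias_2_tools) == {p.get_alias() for p in w.get_processes_called()}
    # e.g. w.alias_2_tools   -> {"FASTQC": ["fastqc"], "ALIGN": ["bwa"]}
    # e.g. w.scripts_2_tools -> {"fastqc ${reads}": ["fastqc"], ...}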
@@ -474,7 +482,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
 
 
-    def get_random_relevant_processes_which_use_bioinformatics_tools(self, processes_2_tools = {}):
+    def get_random_relevant_processes_which_use_bioinformatics_tools(self, scripts_2_tools = {}):
         if(self.duplicate):
             processes_called = []
             if(self.get_DSL()=="DSL2"):
@@ -486,8 +494,8 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 processes_called = self.get_first_file().get_processes()
             processes_with_bioinfo_tools = []
             for p in processes_called:
-                if(processes_2_tools!={}):
-                    tools = processes_2_tools[p.get_code()]
+                if(scripts_2_tools!={}):
+                    tools = scripts_2_tools[p.get_script_code()]
                 else:
                     tools = p.get_tools()
                 if(len(tools)>0):
@@ -497,7 +505,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
 
 
-    def get_random_relevant_processes_which_use_bioinformatics_tools_considering_their_frequency(self, processes_2_tools = {}):
+    def get_random_relevant_processes_which_use_bioinformatics_tools_considering_their_frequency(self, scripts_2_tools = {}):
 
         OG_path = os.getcwd()
         #Change working directory to the one of the file
@@ -517,8 +525,8 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 processes_called = self.get_first_file().get_processes()
             process_to_min_frequency = {}
             for p in processes_called:
-                if(processes_2_tools!={}):
-                    tools = processes_2_tools[p.get_code()]
+                if(scripts_2_tools!={}):
+                    tools = scripts_2_tools[p.get_script_code()]
                 else:
                     tools = p.get_tools()
 
@@ -547,119 +555,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
 
 
-    #The reduction alpha is the minimun number cluster depending on the percentage ofprocesses
-    #For example if there are 10 processes and reduction_alpha = 0.2 -> we want at least 2 clusters
-    #In the same idea if reduction_alpha = 0.4 -> we want at least 4 clusters
-    def get_relevant_processes_which_minimize_nb_of_clusters(self, reduction_alpha = 0.2, number_of_tries = 50):
-
-        min_nb_clusters, min_relevant_processes = np.inf, []
-        already_tried = []
-        print('-'*number_of_tries+">")
-        for i in range(number_of_tries):
-            print('.', end='')
-            random_relevant_processes = self.generate_random_relevant_processes()
-            escape = 0
-            while(escape<100 and set(random_relevant_processes) in already_tried):
-                escape+=1
-                random_relevant_processes = self.generate_random_relevant_processes()
-            #Cause it means we've already searched the majority of the possibilities
-            if(escape>=100):
-                return min_relevant_processes
-            already_tried.append(set(random_relevant_processes))
-            #Get the clusters and the code
-            self.generate_user_view(relevant_processes = random_relevant_processes, render_graphs=False)
-            clusters = self.graph.get_clusters_from_user_view()
-            #We want the number of clusters to be at least x% of the size of the workflows
-            if(len(clusters)>=reduction_alpha*len(self.get_processes_called()) and
-               len(clusters)<min_nb_clusters):
-                min_relevant_processes = random_relevant_processes
-                min_nb_clusters = len(clusters)
-        return min_relevant_processes
-
-    #reduction_beta is the maximum number of clusters depending on the number of processes given in a percentage
-    #For example if there are 10 processes and reduction_beta = 0.8 -> we want a maximum of 8 clusters
-    #In the same idea if reduction_beta = 0.6 -> we want a maximum of 6 clusters
-    #reduction_alpha is the same as above
-    def get_relevant_processes_which_uniformizes_cluster_distribution(self, reduction_alpha = 0.2, reduction_beta = 0.8, number_of_tries = 50):
-
-        min_uniform_score, min_relevant_processes = np.inf, []
-        already_tried = []
-        print('-'*number_of_tries+">")
-        for i in range(number_of_tries):
-            print('.', end='')
-            random_relevant_processes = self.generate_random_relevant_processes()
-            escape = 0
-            while(escape<100 and set(random_relevant_processes) in already_tried):
-                escape+=1
-                random_relevant_processes = self.generate_random_relevant_processes()
-            #Cause it means we've already searched the majority of the possibilities
-            if(escape>=100):
-                return min_relevant_processes
-            already_tried.append(set(random_relevant_processes))
-            #Get the clusters and the code
-            self.generate_user_view(relevant_processes = random_relevant_processes, render_graphs=False)
-            clusters = self.graph.get_clusters_from_user_view()
-            clusters_2_size = []
-            for c in clusters:
-                nb_processes = 0
-                for ele in c:
-                    if(ele.get_type()=="Process"):
-                        nb_processes+=1
-                clusters_2_size.append(nb_processes)
-            score = 0
-            average = np.mean(clusters_2_size)
-            #La variance
-            #https://fr.wikipedia.org/wiki/Variance_(math%C3%A9matiques)
-            for x in clusters_2_size:
-                score += (average-x)**2/len(clusters_2_size)
-            if(len(clusters)>=reduction_alpha*len(self.get_processes_called()) and
-               len(clusters)<=reduction_beta*len(self.get_processes_called()) and
-               score<min_uniform_score):
-                min_relevant_processes = random_relevant_processes
-                min_uniform_score = score
-        return min_relevant_processes
-
-
-    #reduction_alpha is the same as above
-    #reduction_beta is the same as above
-    def get_relevant_which_minizes_the_number_of_conditions(self, reduction_alpha = 0.2, reduction_beta = 0.8, number_of_tries = 50):
-        import copy
-        min_condition_score, min_relevant_processes = np.inf, []
-        already_tried = []
-        w_save = copy.deepcopy(self)
-        number_processes_called = len(self.get_processes_called())
-        print('-'*number_of_tries+">")
-        for i in range(number_of_tries):
-            print('.', end='')
-            w = copy.deepcopy(w_save)
-            random_relevant_processes = w.generate_random_relevant_processes()
-            escape = 0
-            while(escape<100 and set(random_relevant_processes) in already_tried):
-                escape+=1
-                random_relevant_processes = w.generate_random_relevant_processes()
-            #Cause it means we've already searched the majority of the possibilities
-            if(escape>=100):
-                return min_relevant_processes
-            already_tried.append(set(random_relevant_processes))
-            _, cluster_organisation = w.convert_workflow_2_user_view(relevant_processes=random_relevant_processes, render_graphs = False)
-
-            tab_nb_executors_per_cluster, tab_nb_conditions_per_cluster = [], []
-            for c in cluster_organisation:
-                tab_nb_executors_per_cluster.append(cluster_organisation[c]["nb_executors"])
-                tab_nb_conditions_per_cluster.append(cluster_organisation[c]["nb_conditions"])
-
-            score = np.max(tab_nb_conditions_per_cluster)
-            #score = np.mean(tab_nb_conditions_per_cluster)
-            #score = np.median(tab_nb_conditions_per_cluster)
-            #Ratio
-            #score = np.max(np.array(tab_nb_conditions_per_cluster)/np.array(tab_nb_executors_per_cluster))
-            if(len(cluster_organisation)>=reduction_alpha*number_processes_called and
-               len(cluster_organisation)<=reduction_beta*number_processes_called and
-               score<min_condition_score):
-                min_relevant_processes = random_relevant_processes
-                min_condition_score = score
-        return min_relevant_processes
 
     #TODO -> add excpetion Channel exists in multiple forms -> check with 132
     #Cycle exists in workflow too -> 667
@@ -676,27 +572,37 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         import copy
         min_score, min_processes = np.inf, []
         already_tried = []
-        w_save = copy.deepcopy(self)
-        processes_called = self.get_processes_called()
+        working_workflow = copy.deepcopy(self)
+
+        processes_called = working_workflow.get_processes_called()
         number_processes_called = len(processes_called)
-        processes_2_tools = {}
+        all_process_as_relevant = []
+        for p in processes_called:
+            all_process_as_relevant.append(p.get_alias())
+        all_process_as_relevant = list(set(all_process_as_relevant))
+        working_workflow.rewrite_workflow_remove_subworkflows(relevant_processes = all_process_as_relevant, render_graphs = False)
+        w_save = copy.deepcopy(working_workflow)
+        scripts_2_tools = {}
         print("Extracting the tools from the processes")
         print('-'*len(processes_called)+">")
         for p in processes_called:
             print('.', end='')
-            processes_2_tools[p.get_code()] = p.get_tools()
+            try:
+                scripts_2_tools[p.get_script_code()] = self.scripts_2_tools[p.get_script_code()]
+            except KeyError:
+                scripts_2_tools[p.get_script_code()] = p.get_tools()
         print("\n")
-        #print("Testing different combinations")
-        #print('-'*number_of_tries+">")
+        print("Testing different combinations")
+        print('-'*number_of_tries+">")
         for i in range(number_of_tries):
-            #print('.', end='')
-            print(i/number_of_tries*100)
+            print('.', end='')
+            #print(i/number_of_tries*100)
             w = copy.deepcopy(w_save)
             if(process_pre_selection == "bioinfo"):
-                random_relevant_processes = w.get_random_relevant_processes_which_use_bioinformatics_tools(processes_2_tools = processes_2_tools)
+                random_relevant_processes = w.get_random_relevant_processes_which_use_bioinformatics_tools(scripts_2_tools = scripts_2_tools)
             elif(process_pre_selection == "bioinfo_freq"):
-                random_relevant_processes = w.get_random_relevant_processes_which_use_bioinformatics_tools_considering_their_frequency(processes_2_tools = processes_2_tools)
+                random_relevant_processes = w.get_random_relevant_processes_which_use_bioinformatics_tools_considering_their_frequency(scripts_2_tools = scripts_2_tools)
             elif(process_pre_selection == "None"):
                 random_relevant_processes = w.generate_random_relevant_processes()
             else:
@@ -709,19 +615,21 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             if(escape>=100):
                 return min_processes
             already_tried.append(set(random_relevant_processes))
-            #Here the nb of conditions returned is the number of conditions in the clusters after the rewrite
 
             def get_nb_conditions_in_clusters(clusters):
                 nb_conditions_in_clusters = []
                 for cluster in clusters:
                     all_conditions_cluster = []
                     for c in cluster:
+                        flat_condition_for_element, flat_condition_for_element_tab = "", []
                         conditions_for_element = c.get_all_conditions()
                         if(len(conditions_for_element)==0):
                             all_conditions_cluster.append("no value")
                         else:
                             for condition in conditions_for_element:
-                                all_conditions_cluster.append(condition.get_value())
+                                flat_condition_for_element_tab.append(condition.get_value())
+                            flat_condition_for_element = " && ".join(flat_condition_for_element_tab)
+                            all_conditions_cluster.append(flat_condition_for_element)
 
                     all_conditions_cluster = list(set(all_conditions_cluster))
 
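The change inside get_nb_conditions_in_clusters flattens all conditions attached to a single element into one " && "-joined string before deduplication, so an element guarded by several conditions now contributes one combined value instead of several independent ones. A minimal sketch with plain strings standing in for the condition objects:

    def nb_distinct_conditions(elements_2_conditions):
        # One condition-value list per cluster element
        all_conditions_cluster = []
        for conditions_for_element in elements_2_conditions:
            if not conditions_for_element:
                all_conditions_cluster.append("no value")
            else:
                # One flat entry per element instead of one entry per condition
                all_conditions_cluster.append(" && ".join(conditions_for_element))
        return len(set(all_conditions_cluster))

    print(nb_distinct_conditions([["a", "b"], ["a", "b"], []]))  # -> 2, not 3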
@@ -748,7 +656,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             #        tab_nb_conditions_per_cluster_1.append(cluster_organisation[c]["nb_conditions"])
 
             w = copy.deepcopy(w_save)
-            w.generate_user_view(relevant_processes = random_relevant_processes, render_graphs=False)
+            w.generate_user_view(relevant_processes = random_relevant_processes, render_graphs=False, use_process_dependency_graph = False)
             clusters = w.graph.get_clusters_from_user_view()
             cluster_with_processes = []
             for cluster in clusters:
@@ -763,6 +671,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
             for cluster in cluster_with_processes:
                 tab_nb_executors_per_cluster.append(len(cluster))
+            #print(np.array(tab_nb_executors_per_cluster).sum())
             #Number condtions per cluster
             tab_nb_conditions_per_cluster = get_nb_conditions_in_clusters(cluster_with_processes)
             #Number of processes per cluster
@@ -791,14 +700,16 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 average_number_of_process_per_cluster = np.mean(tab_nb_processes_per_cluster)
                 for x in tab_nb_processes_per_cluster:
                     uniformity_variance += (average_number_of_process_per_cluster-x)**2/nb_clusters
-
+
+                dico_results = {"min_nb_clusters":(nb_clusters / number_processes_called), "min_nb_non_relevant_cluster":(nb_non_relevant_clusters / nb_clusters), "uniformity":(uniformity_variance / number_processes_called), "concordance":np.max(np.array(tab_nb_conditions_per_cluster)/np.array(tab_nb_executors_per_cluster)) }
                 score = concordance_factor * np.max(np.array(tab_nb_conditions_per_cluster)/np.array(tab_nb_executors_per_cluster)) + \
                         uniformity_factor * (uniformity_variance / number_processes_called) + \
                         min_nb_clusters_factor * (nb_clusters / number_processes_called) + \
                         min_nb_non_relevant_cluster_factor * (nb_non_relevant_clusters / nb_clusters)
-                return score, cluster_with_processes
+                return score, cluster_with_processes, dico_results
 
-            score, cluster_organisation = get_score_from_set_relevant_processes(w_save, random_relevant_processes)
+
+            score, cluster_organisation, dico_results = get_score_from_set_relevant_processes(w_save, random_relevant_processes)
+            #print(dico_results)
             if(len(cluster_organisation)>=reduction_alpha*number_processes_called and
                len(cluster_organisation)<=reduction_beta*number_processes_called and
                score<min_score):
@@ -808,9 +719,19 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 #print("min_nb_clusters", (nb_clusters / number_processes_called) )
                 #print("min_nb_non_relevant_cluster", (nb_non_relevant_clusters / nb_clusters))
                 #print("score", score)
+                #print()
+                #print(random_relevant_processes)
+                #print("-->", dico_results)
+                #print(score)
                 min_processes = random_relevant_processes
                 min_score = score
-        return min_processes
+
+        #Remove the _GG_ suffix since we're working on the rewritten workflow
+        processes_returned = []
+        for p in min_processes:
+            processes_returned.append(p.split('_GG_')[0])
+
+        return processes_returned
 
     #Method that returns the order of execution for each executor
     def get_order_execution_executors(self):
@@ -1020,8 +941,9 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
 
     def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = [], use_process_dependency_graph = False):
+        alias_2_tools = self.alias_2_tools
         self.graph.initialise(processes_2_remove = processes_2_remove)
-        self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs, use_process_dependency_graph = use_process_dependency_graph)
+        self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs, use_process_dependency_graph = use_process_dependency_graph, alias_2_tools = alias_2_tools)
 
 
     #I do not recommand that the dev uses the same name for the channels inside and outside the channels
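The score above is a weighted sum of four normalised terms, and dico_results simply exposes the unweighted terms for inspection. Packaged as a standalone function (the packaging and the default factor values are illustrative; the term definitions mirror the diff):

    import numpy as np

    def combined_score(conds, execs, uniformity_variance, nb_clusters,
                       nb_non_relevant_clusters, number_processes_called,
                       concordance_factor=1.0, uniformity_factor=1.0,
                       min_nb_clusters_factor=1.0, min_nb_non_relevant_cluster_factor=1.0):
        # The four unweighted terms, as exposed in dico_results
        dico_results = {
            "min_nb_clusters": nb_clusters / number_processes_called,
            "min_nb_non_relevant_cluster": nb_non_relevant_clusters / nb_clusters,
            "uniformity": uniformity_variance / number_processes_called,
            "concordance": np.max(np.array(conds) / np.array(execs)),
        }
        score = (concordance_factor * dico_results["concordance"]
                 + uniformity_factor * dico_results["uniformity"]
                 + min_nb_clusters_factor * dico_results["min_nb_clusters"]
                 + min_nb_non_relevant_cluster_factor * dico_results["min_nb_non_relevant_cluster"])
        return score, dico_results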
@@ -1216,6 +1138,95 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
         self.check_that_a_channel_is_not_defined_used_and_redefined_used_in_another_block()
 
+    def rewrite_workflow_remove_subworkflows(self, relevant_processes, render_graphs):
+        if(self.get_DSL()=="DSL1"):
+            code = self.convert_to_DSL2()
+            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs, def_check_the_same = False)
+
+        if(self.get_DSL()=="DSL2"):
+            code = self.simplify_workflow_code()
+            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
+
+        #DETERMINING WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
+        def get_clusters_with_calls(clusters):
+            #Creating the clusters with calls instead of processes or subworkflows
+            set_clusters_with_calls = []
+            for c in clusters:
+                tab = []
+                for ele in c:
+                    if(ele.get_type()=="Operation"):
+                        if(ele.get_create_loop_with_call_status()):
+                            raise BioFlowInsightError(f'BioFlow-Insight cannot rewrite the workflows since the operation "{ele.get_code(get_OG = True)}" defines the channel on which it depends. Simply try renaming the channel.', type="Rewrite Error")
+                        if(ele.get_artificial_status()==False):
+                            tab.append(ele)
+                    else:
+                        call = ele.get_call()
+                        tab.append(call)
+                set_clusters_with_calls.append(set(tab))
+            return set_clusters_with_calls
+
+
+        #Mapping each subworkflow to its executors
+        def get_subworkflow_2_executors():
+            subworkflow_2_executors = {}
+            for sub in self.get_subworkflows_called():
+                executors = sub.get_all_executors_in_workflow()
+                subworkflow_2_executors[sub] = []
+                for ele in executors:
+                    #Cannot add calls to subworkflows -> they are already present by definition
+                    if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
+                        #We don't add it
+                        pass
+                    else:
+                        subworkflow_2_executors[sub].append(ele)
+            return subworkflow_2_executors
+            #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
+
+        #TODO -> write tests to test this function
+        def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
+            broken_subworkflows = []
+            for sub in subworkflow_2_executors:
+                #You want this list (set) to be equal to subworkflow_2_executors[sub]
+                elements_in_sub_with_clusters = []
+                for cluster in set_clusters_with_calls:
+                    if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+                        break
+                    for c in cluster:
+                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
+                            break
+                        if(c in subworkflow_2_executors[sub]):
+                            elements_in_sub_with_clusters+=list(cluster)
+                            break
+                if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
+                    #This means that the subworkflow is intact
+                    pass
+                else:
+                    #This means that the subworkflow is broken
+                    broken_subworkflows.append(sub)
+            return broken_subworkflows
+
+        #Get the clusters and the code
+        relevant_processes = self.check_relevant_processes_in_workflow(relevant_processes)
+        self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=False)
+        clusters = self.graph.get_clusters_from_user_view()
+
+        broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
+        #While there are still broken subworkflows -> redo the analysis
+        while(len(broken_subworkflows)>0):
+            #Rewrite broken subworkflows
+            sub = broken_subworkflows[0]
+            code = self.rewrite_subworkflow_call(code, sub)
+            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
+            #Get the clusters and the code
+            #TODO -> remove the generate all_graphs -> it is not necessary
+            if(render_graphs):
+                self.generate_all_graphs()
+            self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=False)
+            clusters = self.graph.get_clusters_from_user_view()
+            broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
+
+        return code, clusters
+
 
     #Method which rewrites the workflow follwong the user view
     #Conert workflow to user_view only makes sense when the option duplicate is activated -> otherwise is doesn't make sense + it makes the analysis way more complicated
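The loop closing rewrite_workflow_remove_subworkflows terminates because each iteration inlines one broken subworkflow call, and a workflow only contains finitely many subworkflow calls. A hedged usage sketch (the path, constructor arguments and process names are invented; the workflow must be initialised first):

    w = Workflow("example/main.nf", duplicate=True)  # illustrative arguments
    w.initialise()
    # Inline every subworkflow whose executors would be split across clusters
    code, clusters = w.rewrite_workflow_remove_subworkflows(
        relevant_processes=["ALIGN", "CALL"],
        render_graphs=False,
    )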
@@ -1262,92 +1273,8 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         self.check_multiple_subworkflow()
         self.get_order_execution_executors()
 
-        if(self.get_DSL()=="DSL1"):
-            code = self.convert_to_DSL2()
-            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs, def_check_the_same = False)
-
-        if(self.get_DSL()=="DSL2"):
-            code = self.simplify_workflow_code()
-            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
-
-        #DETERMING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER
-        def get_clusters_with_calls(clusters):
-            #Creating the clusters with calls instead of processes or subworkflows
-            set_clusters_with_calls = []
-            for c in clusters:
-                tab = []
-                for ele in c:
-                    if(ele.get_type()=="Operation"):
-                        if(ele.get_create_loop_with_call_status()):
-                            raise BioFlowInsightError(f'BioFlow-Insight cannot rewrite the workflows since the operation "{ele.get_code(get_OG = True)}" defines the channel on which it depends. Simply try to renaming the channel.', type="Rewrite Error")
-                        if(ele.get_artificial_status()==False):
-                            tab.append(ele)
-                    else:
-                        call = ele.get_call()
-                        tab.append(call)
-                set_clusters_with_calls.append(set(tab))
-            return set_clusters_with_calls
-
-
-        #Getting subworkflows to executors
-        def get_subworkflow_2_executors():
-            subworkflow_2_executors = {}
-            for sub in self.get_subworkflows_called():
-                executors = sub.get_all_executors_in_workflow()
-                subworkflow_2_executors[sub] = []
-                for ele in executors:
-                    #Cannot add calls to subworkflows -> they are already present by definition
-                    if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
-                        None
-                        #We don't add it
-                    else:
-                        subworkflow_2_executors[sub].append(ele)
-            return subworkflow_2_executors
-            #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
-
-        #TODO -> write tests to test this function
-        def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
-            broken_subworkflows = []
-            for sub in subworkflow_2_executors:
-                #You want this list (set) to be equal to subworkflow_2_executors[sub]
-                elements_in_sub_with_clusters = []
-                for cluster in set_clusters_with_calls:
-                    if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-                        break
-                    for c in cluster:
-                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-                            break
-                        if(c in subworkflow_2_executors[sub]):
-                            elements_in_sub_with_clusters+=list(cluster)
-                            break
-                if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
-                    None
-                    #This means that the subworkflow is Intact
-                else:
-                    #This means that the subworkflow is broken
-                    broken_subworkflows.append(sub)
-            return broken_subworkflows
-
-        #Get the clusters and the code
-        relevant_processes = self.check_relevant_processes_in_workflow(relevant_processes)
-        self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs)
-        clusters = self.graph.get_clusters_from_user_view()
-
-        broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
-        #While there still are broken workflows -> need to redo the analysis
-        while(len(broken_subworkflows)>0):
-            #Rewrite broken subworkflows
-            sub = broken_subworkflows[0]
-            code = self.rewrite_subworkflow_call(code, sub)
-            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
-            #Get the clusters and the code
-            #TODO -> remove the generate all_graphs -> it is not necessary
-            if(render_graphs):
-                self.generate_all_graphs()
-            self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs)
-            clusters = self.graph.get_clusters_from_user_view()
-            broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
-
+        code, clusters = self.rewrite_workflow_remove_subworkflows(relevant_processes, render_graphs)
 
         #Get the clsuters with the corresponding operations inside
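The processes_returned loop earlier in the diff and the cluster-naming hunk below rely on the same convention: processes duplicated during rewriting carry a _GG_ marker in their alias, and comparisons against user-supplied relevant processes are made on the stripped base name. Assuming an alias of the form "ALIGN_GG_2" (the exact suffix content is an assumption here):

    alias = "ALIGN_GG_2"
    print(alias.split("_GG_")[0])  # -> ALIGN, the name the user knows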
@@ -1465,7 +1392,8 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 if(ele.get_type()=="Process"):
                     nb_processes+=1
                 #Determine the name of the created subworkflow cluster
-                if(ele.get_alias() in relevant_processes):
+                name_of_process_without_GG = ele.get_alias().split("_GG_")[0]
+                if(name_of_process_without_GG in relevant_processes):
                     name = f"cluster_{ele.get_alias()}"
                 #Get the call of thing (either process or subworkflow)
                 call = ele.get_call()
@@ -1528,7 +1456,11 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             #If the tools have been extracted they can be used to name the none relevant processes
             if(extract_tools):
                 for p in processes_added:
-                    for t in p.get_tools():
+                    try:
+                        tools = self.scripts_2_tools[p.get_script_code()]
+                    except KeyError:
+                        tools = p.get_tools()
+                    for t in tools:
                         try:
                             val = tool_2_nb_usage[t]
                             if(t in ['python', 'r', 'perl', 'julia']):#Cause in this case it is a custom script -> one should hope that it is important in this case