From 481977c4b8992d0b4d3cf45eb1cd4866ab38eeea Mon Sep 17 00:00:00 2001
From: George Marchment <georgemarchment@yahoo.fr>
Date: Tue, 15 Apr 2025 16:29:06 +0200
Subject: [PATCH] Added criteria which calculate the general score -> you can
 ignore certain things depending on the parameters
---
 src/workflow.py | 199 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 144 insertions(+), 55 deletions(-)

diff --git a/src/workflow.py b/src/workflow.py
index 16231fe..7385790 100644
--- a/src/workflow.py
+++ b/src/workflow.py
@@ -389,26 +389,59 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
         return code
 
-    #This methods generates a random set of processes to consider as relavant
-    #It's not a uniform random it's a bit of a gaussian, centered at 0.5
-    def generate_random_relevant_processes(self, alpha = -1):
+    #This function draws processes from a pool and checks if they are directly dependent on each other
+    def draw_pool_and_check_dependencies(self, pool, alpha=-1):
         edges_create_cycles = self.graph.get_edges_that_create_cycle()
         import random
 
         #Random value between 0 and 1, centered at 0.5
         def get_value():
-            #check = True
-            #val = -1
-            #while(check):
-            #    check = False
-            #    val = random.gauss(0.5, 0.1)
-            #    if(val<0 or val>1):
-            #        check = True
             val = random.random()
             return val
 
-        if(self.duplicate):
+        searching = True
+        while(searching):
+            searching = False
+            if(alpha == -1):
+                alpha = get_value()
+            else:
+                if(0<=alpha and alpha<=1):
+                    None
+                else:
+                    raise BioFlowInsightError("alpha is not in the interval [0; 1]")
+            nb_2_select = int(alpha*len(pool))
+            sampled = random.sample(set(pool), nb_2_select)
+
+            def get_object(address):
+                address = int(re.findall(r"\dx\w+", address)[0], base=16)
+                return ctypes.cast(address, ctypes.py_object).value
+
+            edges_create_cycles_names = []
+            for e in edges_create_cycles:
+                n1, n2 = e
+                obj1, obj2 = get_object(n1), get_object(n2)
+                edges_create_cycles_names.append((obj1.get_alias(), obj2.get_alias()))
+
+            sampled_str = []
+            for s in sampled:
+                sampled_str.append(s.get_alias())
+            for e in edges_create_cycles_names:
+                if(e[0] in sampled_str and e[1] in sampled_str):
+                    #This means the 2 nodes which form the cycle edge are both in the relevant processes
+                    #-> we need to regenerate the relevant processes
+                    searching = True
+                    break
+
+        name_select = []
+        for p in sampled:
+            name_select.append(p.get_alias())
+        return name_select
+
+
+    #This method generates a random set of processes to consider as relevant
+    def generate_random_relevant_processes(self, alpha = -1):
+        if(self.duplicate):
             processes_called = []
             if(self.get_DSL()=="DSL2"):
                 for c in self.get_workflow_main().get_all_calls_in_workflow():
@@ -417,49 +450,36 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                         processes_called.append(p)
             else:
                 processes_called = self.get_first_file().get_processes()
-
-            searching = True
-            while(searching):
-                searching = False
-                if(alpha == -1):
-                    alpha = get_value()
-                else:
-                    if(0<=alpha and alpha<=1):
-                        None
-                    else:
-                        raise BioFlowInsightError("alpha is not in the interval [0; 1]")
-                nb_2_select = int(alpha*len(processes_called))
-                sampled = random.sample(set(processes_called), nb_2_select)
-
-
-
-            def get_object(address):
-                address = int(re.findall(r"\dx\w+", address)[0], base=16)
-                return ctypes.cast(address, ctypes.py_object).value
-
-            edges_create_cycles_names = []
-            for e in edges_create_cycles:
-                n1, n2 = e
-                obj1, obj2 = get_object(n1), get_object(n2)
-                edges_create_cycles_names.append((obj1.get_alias(), obj2.get_alias()))
-
-            sampled_str = []
-            for s in sampled:
-                sampled_str.append(s.get_alias())
-            for e in edges_create_cycles_names:
-                if(e[0] in sampled_str and e[1] in sampled_str):
-                    #So that means there are the 2 nodes which form the cycle edge in the relevant processes
-                    #-> it means we need to regenerated relevant processes
-                    searching = True
-                    break
-
-            name_select = []
-            for p in sampled:
-                name_select.append(p.get_alias())
-            return name_select
+            return self.draw_pool_and_check_dependencies(processes_called)
         else:
             raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
+
+
+    #TODO -> do this in a smarter way -> looking at the popularity of the tools
+    def get_random_relevant_processes_which_use_bioinformatics_tools(self, processes_2_tools = {}):
+        if(self.duplicate):
+            processes_called = []
+            if(self.get_DSL()=="DSL2"):
+                for c in self.get_workflow_main().get_all_calls_in_workflow():
+                    p = c.get_first_element_called()
+                    if(p.get_type()=="Process"):
+                        processes_called.append(p)
+            else:
+                processes_called = self.get_first_file().get_processes()
+            processes_with_bioinfo_tools = []
+            for p in processes_called:
+                if(processes_2_tools!={}):
+                    tools = processes_2_tools[p.get_code()]
+                else:
+                    tools = p.get_tools()
+                if(len(tools)>0):
+                    processes_with_bioinfo_tools.append(p)
+            return self.draw_pool_and_check_dependencies(processes_with_bioinfo_tools)
+        else:
+            raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
+
+
     #The reduction alpha is the minimun number cluster depending on the percentage ofprocesses
     #For example if there are 10 processes and reduction_alpha = 0.2 -> we want at least 2 clusters
     #In the same idea if reduction_alpha = 0.4 -> we want at least 4 clusters
@@ -521,8 +541,10 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 clusters_2_size.append(nb_processes)
             score = 0
             average = np.mean(clusters_2_size)
+            #The variance
+            #https://fr.wikipedia.org/wiki/Variance_(math%C3%A9matiques)
             for x in clusters_2_size:
-                score += ((average-x)/average)**2
+                score += (average-x)**2/len(clusters_2_size)
             if(len(clusters)>=reduction_alpha*len(self.get_processes_called()) and
                 len(clusters)<=reduction_beta*len(self.get_processes_called()) and
                 score<min_uniform_score):
@@ -573,6 +595,73 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                 min_condition_score = score
         return min_relevant_processes
 
+    def get_relevant_following_best_general_score(self,
+                                                  reduction_alpha = 0.2,
+                                                  reduction_beta = 0.8,
+                                                  number_of_tries = 50,
+                                                  using_processes_with_bioinfo_tools = True,
+                                                  concordance_factor = 1,
+                                                  uniformity_factor = 1,
+                                                  min_nb_clusters_factor = 1,
+                                                  min_nb_non_relevant_cluster = 1):
+        import numpy as np
+        import copy
+        min_score, min_processes = np.inf, []
+        already_tried = []
+        w_save = copy.deepcopy(self)
+        processes_called = self.get_processes_called()
+        number_processes_called = len(processes_called)
+        processes_2_tools = {}
+        for p in processes_called:
+            processes_2_tools[p.get_code()] = p.get_tools()
+        print('-'*number_of_tries+">")
+        for i in range(number_of_tries):
+            print('.', end='')
+            w = copy.deepcopy(w_save)
+            if(using_processes_with_bioinfo_tools):
+                random_relevant_processes = w.get_random_relevant_processes_which_use_bioinformatics_tools(processes_2_tools = processes_2_tools)
+            else:
+                random_relevant_processes = w.generate_random_relevant_processes()
+            escape = 0
+            while(escape<100 and set(random_relevant_processes) in already_tried):
+                escape+=1
+                random_relevant_processes = w.generate_random_relevant_processes()
+            #Because it means we've already searched the majority of the possibilities
+            if(escape>=100):
+                return min_processes
+            already_tried.append(set(random_relevant_processes))
+            _, cluster_organisation = w.convert_workflow_2_user_view(relevant_processes=random_relevant_processes, render_graphs = False)
+
+            tab_nb_executors_per_cluster, tab_nb_processes_per_cluster, tab_nb_conditions_per_cluster = [], [], []
+            for c in cluster_organisation:
+                tab_nb_executors_per_cluster.append(cluster_organisation[c]["nb_executors"])
+                tab_nb_processes_per_cluster.append(cluster_organisation[c]["nb_processes"])
+                tab_nb_conditions_per_cluster.append(cluster_organisation[c]["nb_conditions"])
+
+            nb_clusters = len(cluster_organisation)
+            nb_non_relevant_clusters = 0
+            for c in cluster_organisation:
+                #This means it's a non-relevant cluster
+                if("non_relevant_cluster_" in c):
+                    nb_non_relevant_clusters+=1
+
+            uniformity_variance = 0
+            average_number_of_process_per_cluster = np.mean(tab_nb_processes_per_cluster)
+            for x in tab_nb_processes_per_cluster:
+                uniformity_variance += (average_number_of_process_per_cluster-x)**2/nb_clusters
+
+            score = concordance_factor * np.mean(np.array(tab_nb_conditions_per_cluster)/np.array(tab_nb_executors_per_cluster)) + \
+                    uniformity_factor * (uniformity_variance / number_processes_called) + \
+                    min_nb_clusters_factor * (nb_clusters / number_processes_called) + \
+                    min_nb_non_relevant_cluster * (nb_non_relevant_clusters / nb_clusters)
+
+            if(len(cluster_organisation)>=reduction_alpha*number_processes_called and
+                len(cluster_organisation)<=reduction_beta*number_processes_called and
+                score<min_score):
+                min_processes = random_relevant_processes
+                min_score = score
+        return min_processes
+
     #Method that returns the order of execution for each executor
     def get_order_execution_executors(self):
         dico = {}
@@ -1142,7 +1231,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
 
         #We replace the last clusters first -> this is cause the outputs of the last clusters aren't used anywhere else in the workflow by definition
         for elements in list(reversed(clusters)):
 
-            nb_executors = 0
+            nb_executors, nb_processes = 0, 0
 
             channels_to_replace_outside_of_cluster = []
@@ -1163,7 +1252,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
             for ele in elements:
                 nb_executors+=1
                 if(ele.get_type()=="Process"):
-
+                    nb_processes+=1
                     #Determine the name of the created subworkflow cluster
                     if(ele.get_alias() in relevant_processes):
                         name = f"cluster_{ele.get_alias()}"
@@ -1364,7 +1453,7 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
                     #code = code.replace(old, new)
 
             #Since i've added the conditions myself -> i can just count them by searching for this simple pattern
-            clusters_2_organisation[subworkflow_code] = {"nb_executors":nb_executors, "nb_conditions":subworkflow_code.count("if(")}
+            clusters_2_organisation[subworkflow_code] = {"nb_executors":nb_executors, "nb_conditions":subworkflow_code.count("if("), "nb_processes":nb_processes}
 
             #Add the subworkflow defintions
             #-------------------------------------
--
GitLab
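
For reference, the general score minimised by get_relevant_following_best_general_score can be reproduced outside the Workflow class. The sketch below is a hypothetical standalone helper (the name general_score and the sample cluster counts are illustrative, not part of the patch); it mirrors the four weighted terms used in the patch: the mean conditions-per-executor ratio (concordance), the variance of the number of processes per cluster (uniformity), the number of clusters, and the share of non-relevant clusters.

import numpy as np

def general_score(nb_executors, nb_processes, nb_conditions, number_processes_called,
                  nb_non_relevant_clusters=0,
                  concordance_factor=1, uniformity_factor=1,
                  min_nb_clusters_factor=1, min_nb_non_relevant_cluster=1):
    #Per-cluster counts are given as lists, one entry per cluster
    nb_clusters = len(nb_processes)
    #Concordance term: average number of conditions per executor in a cluster
    concordance = np.mean(np.array(nb_conditions) / np.array(nb_executors))
    #Uniformity term: variance of the number of processes per cluster
    average = np.mean(nb_processes)
    uniformity_variance = sum((average - x)**2 / nb_clusters for x in nb_processes)
    return (concordance_factor * concordance
            + uniformity_factor * (uniformity_variance / number_processes_called)
            + min_nb_clusters_factor * (nb_clusters / number_processes_called)
            + min_nb_non_relevant_cluster * (nb_non_relevant_clusters / nb_clusters))

#Illustrative values only: 3 clusters in a 10-process workflow, one of them non-relevant
print(general_score([4, 3, 5], [3, 2, 4], [1, 0, 2], 10, nb_non_relevant_clusters=1))

Lower scores are preferred: among the sampled sets of relevant processes whose number of clusters falls within the reduction_alpha/reduction_beta bounds, the method keeps the one with the smallest score.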