#Import dependencies
#Local
from .nextflow_file import Nextflow_File
from .ro_crate import RO_Crate
from . import constant
from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions, extract_conditions, remove_empty_conditions_place_anker
from .outils_graph import get_flatten_dico, initia_link_dico_rec, get_number_cycles, generate_graph
from .outils_annotate import get_tools_commands_from_user_for_process
from .bioflowinsighterror import BioFlowInsightError
from .graph import Graph
#Outside packages
import os
import re
import json
from pathlib import Path
import glob
import ctypes
import time
class Workflow:
"""
This is the main workflow class. From this class, workflow analysis can be done,
and after the analysis, the workflow structure can be reconstructed.
Attributes:
file: A string indicating the address of the main workflow file or of the directory containing the workflow
duplicate: A boolean indicating if processes are to be duplicated in the structure
display_info: A boolean indicating if the analysis information should be printed
output_dir: A string indicating where the results will be saved
name: A string indicating the name of the workflow
processes_2_remove: A comma-separated string indicating the processes to remove from the workflow
"""
def __init__(self, file, duplicate=True, display_info=True, output_dir = './results',
name = None, processes_2_remove = None):
#Getting the main nextflow file
if(not os.path.isfile(file)):
nextflow_files = glob.glob(f'{file}/*.nf')
if(len(nextflow_files)==0):
raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
txt = ""
#Try to read the main.nf file -> if this cannot be found then the first nextflow file is used
try:
file = file+"/main.nf"
with open(file, 'r') as f:
txt= f.read()
except:
None
#raise BioFlowInsightError("No 'main.nf' file found at the root of the prohject")
if(txt==""):
if(len(nextflow_files)==1):
file = nextflow_files[0]
with open(file, 'r') as f:
txt= f.read()
else:
#If there are multiple files and no main -> we just choose the first one found
file = nextflow_files[0]
with open(file, 'r') as f:
txt= f.read()
#raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
self.duplicate = duplicate
self.display_info = display_info
self.processes_2_remove = processes_2_remove
self.output_dir = Path(output_dir)
self.nextflow_files = []
self.workflow_directory = '/'.join(file.split('/')[:-1])
self.name = name
self.graph = None
self.conditions_2_ignore = []
self.ternary_operation_dico = {}
self.map_element_dico = {}
OG_file = Nextflow_File(file, workflow = self, first_file = True)
self.DSL = OG_file.find_DSL()
self.create_empty_results()
if(self.display_info):
print(f"Workflow is written in {self.DSL}")
def create_empty_results(self):
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.output_dir / 'debug', exist_ok=True)
os.makedirs(self.output_dir / 'graphs', exist_ok=True)
with open(self.output_dir / "debug" / "operations.nf",'w') as file:
pass
with open(self.output_dir / "debug" / "calls.nf",'w') as file:
pass
with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file:
pass
def get_root_directory(self):
first_file = self.get_first_file()
return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/"
def get_conditions_2_ignore(self):
return self.conditions_2_ignore
def get_duplicate_status(self):
return self.duplicate
def get_output_dir(self):
return Path(self.output_dir)
def get_DSL(self):
return self.DSL
def get_display_info_bool(self):
return self.display_info
def set_DSL(self, DSL):
self.DSL = DSL
def get_first_file(self):
for file in self.nextflow_files:
if(file.first_file):
return file
def get_workflow_main(self):
return self.get_first_file().main
def add_nextflow_file_2_workflow(self, nextflow_file):
self.nextflow_files.append(nextflow_file)
self.nextflow_files = list(set(self.nextflow_files))
def initialise(self, conditions_2_ignore = []):
"""Method that initialises the analysis of the worflow
Keyword arguments:
"""
self.conditions_2_ignore = conditions_2_ignore
#Right now i'm just gonna do everything in DSL2
#At this point there should only be one nextflow file
if(len(self.nextflow_files)==1):
self.nextflow_files[0].initialise()
else:
raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.")
if(self.display_info):
citation = """\nTo cite BioFlow-Insight, please use the following publication:
George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092"""
print(citation)
if(self.graph==None):
self.graph = Graph(self)
def iniatilise_tab_processes_2_remove(self):
#If the tab of processes to remove has already been built (it's a list), there is nothing more to do
if(isinstance(self.processes_2_remove, list)):
return
tab_processes_2_remove = []
if(self.processes_2_remove!=None):
temp = self.processes_2_remove.split(",")
for t in temp:
tab_processes_2_remove.append(t.strip())
self.processes_2_remove = tab_processes_2_remove
def get_structure(self):
dico = {}
dico['nodes'] = []
dico['edges'] = []
dico['subworkflows'] = {}
if(self.get_DSL() == "DSL1"):
main = self.get_workflow_main()
if(main!=None):
return main.get_structure(dico)
elif(self.get_DSL() == "DSL2"):
main = self.get_workflow_main()
if(main!=None):
return main.get_structure(dico)
else:
return dico
#return self.get_structure_DSL2(dico=dico, start = True)
else:
raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!")
#################
# GRAPHS
#################
def generate_specification_graph(self, render_graphs = True):
self.iniatilise_tab_processes_2_remove()
self.graph.initialise(processes_2_remove = self.processes_2_remove)
self.graph.get_specification_graph(render_graphs = render_graphs)
def generate_process_dependency_graph(self, render_graphs = True):
self.iniatilise_tab_processes_2_remove()
self.graph.initialise(processes_2_remove = self.processes_2_remove)
self.graph.render_process_dependency_graph(render_graphs = render_graphs)
#TODO -> update this
def generate_all_graphs(self, render_graphs = True):
self.generate_specification_graph(render_graphs = render_graphs)
self.generate_process_dependency_graph(render_graphs = render_graphs)
#Method that checks if a given graph specification is isomorphic to the workflow's structure
def check_if_json_equal_to_full_structure(self, file):
self.iniatilise_tab_processes_2_remove()
return self.graph.check_if_json_equal_to_full_structure(file, processes_2_remove = self.processes_2_remove)
###########################
# Generate test data
###########################
#These are the methods which generate the test data
def generate_test_specification_graph(self):
dico = self.graph.get_full_dico()
with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_executors(self):
executors = self.get_workflow_main().get_all_executors_in_workflow()
dico= {}
for e in executors:
dico[str(e)] = e.get_code(get_OG = True)
with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_executors_per_subworkflows(self):
subs = self.get_subworkflows_called()
dico= {}
for s in subs:
dico[str(s)]= {}
executors = s.get_all_executors_in_workflow()
for e in executors:
dico[str(s)][str(e)] = e.get_code(get_OG = True)
with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_processes(self):
processes = self.get_processes_called()
dico= {}
for p in processes:
dico[str(p)] = p.get_code()
with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_subworkflows(self):
subs = self.get_subworkflows_called()
dico= {}
for s in subs:
dico[str(s)] = s.get_code()
with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile:
json.dump(dico, outfile, indent = 4)
def generate_all_test_data(self):
self.generate_test_specification_graph()
self.generate_all_executors()
self.generate_all_processes()
self.generate_all_subworkflows()
self.generate_executors_per_subworkflows()
#Returns a dico of the number of process calls per condition
#For example: {condition1: 14, condition2: 10, condition3: 3}
#14 process calls depend on condition1
#10 process calls depend on condition2
#3 process calls depend on condition3
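#Illustrative usage sketch ('w' is a hypothetical Workflow instance initialised with duplicate=True; the condition strings are made up):
#   top_conditions = w.get_most_influential_conditions(show_values = True)
#   -> e.g. {"params.skip_qc==false": 14, "params.aligner=='bwa'": 3}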
def get_most_influential_conditions(self, show_values = True):
if(self.get_duplicate_status()):
most_influential_conditions = self.get_workflow_main().get_most_influential_conditions()
#If show_values is set, then we replace the condition ids with their values
if(show_values):
most_influential_conditions_values = {}
for condition in most_influential_conditions:
try:
t = most_influential_conditions_values[condition.get_value()]
except:
most_influential_conditions_values[condition.get_value()] = 0
most_influential_conditions_values[condition.get_value()] += most_influential_conditions[condition]
most_influential_conditions = most_influential_conditions_values
#Sort the dico
most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)}
return most_influential_conditions
else:
BioFlowInsightError("Need to activate 'duplicate' mode to use this method.")
#When there are multiple emits, turn them into one at the end of the call, e.g. instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch
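#Rough sketch of the file layout produced by the conversion (illustrative; each section marker is suffixed with a timestamp tag):
#   #!/usr/bin/env nextflow
#   //INCLUDE_SECTION_<tag>
#   //PARAMS_SECTION_<tag>
#   //FUNCTION_SECTION_<tag>   <- the DSL1 functions are moved here
#   //PROCESS_SECTION_<tag>    <- the DSL1 processes, converted to DSL2, are moved here
#   workflow {
#       <original body, with the process definitions replaced by calls>
#   }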
def convert_to_DSL2(self):
if(self.get_DSL()=="DSL2"):
print("Workflow is already written in DSL2")
else:
#This tag is used as an identification to safely manipulate the string
tag = str(time.time())
nextflow_file = self.get_first_file()
code = nextflow_file.get_code()
start_code = r"#!/usr/bin/env nextflow"
start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
end_code = "workflow.onComplete"
pos_start, pos_end= 0, len(code)
if(code.find(end_code)!=-1):
pos_end = code.find(end_code)
code_to_replace = code[pos_start:pos_end]
for match in re.finditer(start_code_pattern, code):
pos_start = match.span(0)[1]+1
#if(code.find(start_code)!=-1):
# pos_start = code.find(start_code)+len(start_code)
body = code[pos_start:pos_end]#.replace('\n', '\n\t')
include_section = f"//INCLUDE_SECTION_{tag}"
params_section = f"//PARAMS_SECTION_{tag}"
function_section = f"//FUNCTION_SECTION_{tag}"
process_section = f"//PROCESS_SECTION_{tag}"
code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""")
##I've put this in a comment because it's a DSL1 workflow
#params_list = []
#for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code):
# params_list.append(match.group(0))
#for params in params_list:
# code = code.replace(params, "")
#params_code = "\n".join(params_list)
#code = code.replace(params_section, params_code)
#Moving Functions
functions = []
for f in nextflow_file.functions:
function = f.get_code()
functions.append(function)
for r in functions:
code = code.replace(r, "")
code = code.replace(function_section, "\n\n".join(functions))
#Moving Processes
processes = []
to_replace = []
for p in nextflow_file.get_processes():
new_process, call = p.convert_to_DSL2()
processes.append(new_process)
to_replace.append((p.get_code(get_OG = True), call))
for r in to_replace:
code = code.replace(r[0], r[1])
code = code.replace(process_section, "\n\n".join(processes))
#TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated
for o in self.get_workflow_main().get_all_executors_in_workflow():
if(o.get_type()=="Operation"):
code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2())
else:
raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'")
#Putting || back
code = code.replace("$OR$", "||")
#put_modified_operations_back
#TODO -> add the other things necessary to reformat code
#Sometimes this is incorrect but that's due to the fact that the DSL1 analysis isn't as clean as the DSL2 analysis (concerning the conditions)
#What I mean is that, when searching for channels, DSL1 doesn't consider the conditions when searching from the processes while DSL2 does
#The conversion works well, it's just that comparing it to the old DSL1 workflow doesn't make sense
#If you want to put this line back you need to #TODO update the DSL1 parsing to consider the blocks when defining the processes
#A good example is KevinMenden/hybrid-assembly
self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=False, def_check_the_same = False)
return code
#This method generates a random set of processes to consider as relevant
#The proportion of processes selected is drawn uniformly at random (the Gaussian draw centred at 0.5 is left commented out below)
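#Illustrative usage sketch ('w' is a hypothetical, already analysed Workflow with duplicate=True):
#   names = w.generate_random_relevant_processes(alpha = 0.3)  #select roughly 30% of the called processes
#   names = w.generate_random_relevant_processes()             #alpha drawn uniformly at random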
def generate_random_relevant_processes(self, alpha = -1):
edges_create_cycles = self.graph.get_edges_that_create_cycle()
import random
#Random value between 0 and 1
def get_value():
#check = True
#val = -1
#while(check):
# check = False
# val = random.gauss(0.5, 0.1)
# if(val<0 or val>1):
# check = True
val = random.random()
return val
if(self.duplicate):
processes_called = []
if(self.get_DSL()=="DSL2"):
for c in self.get_workflow_main().get_all_calls_in_workflow():
p = c.get_first_element_called()
if(p.get_type()=="Process"):
processes_called.append(p)
else:
processes_called = self.get_first_file().get_processes()
searching = True
while(searching):
searching = False
if(alpha == -1):
alpha = get_value()
else:
if(0<=alpha and alpha<=1):
None
else:
raise BioFlowInsightError("alpha is not in the interval [0; 1]")
nb_2_select = int(alpha*len(processes_called))
sampled = random.sample(set(processes_called), nb_2_select)
sampled_str = []
for s in sampled:
sampled_str.append(str(s))
for e in edges_create_cycles:
if(e[0] in sampled_str and e[1] in sampled_str):
#That means the 2 nodes which form the cycle edge are both in the relevant processes
#-> we need to regenerate the relevant processes
searching = True
break
name_select = []
for p in sampled:
name_select.append(p.get_alias())
return name_select
else:
raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
#Method that returns the order of execution for each executor
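#The nested dico of executors is flattened depth-first, e.g. (illustrative) {a: {b: {}, c: {}}, d: {}} -> [a, b, c, d]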
def get_order_execution_executors(self):
dico = {}
seen = {}
dico = self.get_workflow_main().get_order_execution_executors(dico, seen)
tab = []
def explore_dico(dico):
if(type(dico)!=dict):
None
else:
for val in dico:
tab.append(val)
explore_dico(dico[val])
explore_dico(dico)
return tab
def add_to_ternary_operation_dico(self, old, new):
self.ternary_operation_dico[new] = old
def add_map_element(self, old, new):
self.map_element_dico[new] = old
def put_back_old_ternary_operations(self, code, ternary_operation_dico):
for new in ternary_operation_dico:
old = ternary_operation_dico[new]
code = code.replace(new.strip(), old)
return code
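#Hypothetical example for put_modified_operations_back: with dico_operations = {"¤0¤": "it * 2"},
#the snippet ".map_modified { ¤0¤ }" is rewritten back to ".map { it * 2 }"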
def put_modified_operations_back(self, code, dico_operations):
searching = True
while(searching):
searching = False
for match in re.finditer(r"\.(\w+)_modified\s*\{\s*(¤[^¤]+¤)\s*\}", code):
operator = match.group(1)
inside = match.group(2) #Because we want to remove the extras
code = code.replace(match.group(0), f".{operator} {{ {dico_operations[inside]} }}")
searching = True
break
return code
#TODO -> write tests for this method
#Function that rewrites the workflow code
#Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
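#Rough sketch of the resulting single-file layout (illustrative):
#   #!/usr/bin/env nextflow
#   //FUNCTION_SECTION      <- the functions called in the workflow are copied here
#   //PROCESS_SECTION       <- the processes called in the workflow are copied here
#   //SUBWORKFLOW_SECTION   <- the subworkflows called in the workflow are copied here
#   workflow { <simplified main> }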
def simplify_workflow_code(self):
code = self.get_first_file().get_code()
#This tag is used as an identification to safely manipulate the string
tag = str(time.time())
#params_section = f"//PARAMS_SECTION_{tag}"
function_section = f"//FUNCTION_SECTION"
process_section = f"//PROCESS_SECTION"
subworkflow_section = f"//SUBWORKFLOW_SECTION"
ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section
#Place ankers
pos_start = 0
start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
for match in re.finditer(start_code_pattern, code):
pos_start = match.span(0)[1]+1
code = code[:pos_start]+ankers+code[pos_start:]
#Remove the includes
for match in re.finditer(constant.FULL_INLCUDE_2, code):
full_include = match.group(0)
for temp in re.finditer(fr"{re.escape(full_include)} *addParams\(", code):
raise BioFlowInsightError("There is an 'addParams' in an include. BioFlow-Insight doesn not how to rewrite this.")
code = re.sub(fr"{re.escape(full_include)}.*", "", code)
processes, subworkflows, functions = [], [], []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
processes.append(ele)
elif(ele.get_type()=="Subworkflow"):
subworkflows.append(ele)
elif(ele.get_type()=="Function"):
functions.append(ele)
else:
raise Exception("This shoudn't happen")
#Get calls to functions made outside of themain which might have been imported -> so we need to add them
for c in self.get_first_file().get_calls_made_outside_of_main():
ele = c.get_first_element_called()
if(ele.get_type()=="Function"):
functions.append(ele)
else:
raise Exception("This shoudn't happen -> either a call to a process or subworkflow outside of main or subworkflow")
#Simplifying main
code = code.replace(self.get_workflow_main().get_code(get_OG = True), self.get_workflow_main().simplify_code())
#Adding processes into code
for p in processes:
if(p.get_code_with_alias_and_id() not in code):
code = code.replace(process_section, '\n'+p.simplify_code()+'\n'+process_section)
#Adding subworkflows into code
for sub in subworkflows:
if(sub.get_code_with_alias_and_id() not in code):
code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.simplify_code()+'\n')
#Adding functions into code
for fun in functions:
if(fun.get_code() not in code):
code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')
#Remove the ankers
#code = code.replace(function_section, "")
#code = code.replace(process_section, "")
#code = code.replace(subworkflow_section, "")
ankers = {"function_section":function_section,
"process_section":process_section,
"subworkflow_section":subworkflow_section}
return code
def get_subworkflows_called(self):
subs = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Subworkflow"):
subs.append(ele)
return subs
def get_processes_called(self):
subs = []
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
subs.append(ele)
return subs
def rewrite_and_initialise(self, code, processes_2_remove, render_graphs, def_check_the_same = True):
temp_process_dependency_graph = self.graph.get_process_dependency_graph()
temp_spec_graph = self.graph.full_dico
#Remove the "_GG_\d+"
#code = re.sub(r"_GG_\d+", "", code)
#Write new code in temporary file
temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf"
with open(temp_file, "w") as file:
file.write(code)
f = open(self.get_output_dir()/ "debug" / "rewritten.nf", "w")
f.write(code)
f.close()
#Replace old analysis with new analysis (simplified code)
self.__init__(str(temp_file), display_info = False, duplicate=True, processes_2_remove=processes_2_remove)
self.initialise()
os.remove(temp_file)
self.graph.initialise(processes_2_remove = self.processes_2_remove)
if(def_check_the_same and not self.graph.check_if_process_dependendy_is_equivalent_to_other_without_subworkflows(temp_process_dependency_graph)):
if(render_graphs==True):
#generate_graph(self.get_output_dir()/ "debug" /"spec_graph_OG", temp_spec_graph, render_graphs = True)
generate_graph(self.get_output_dir()/ "debug" /"spec_graph", self.graph.full_dico, render_graphs = True)
#generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph_OG", temp_process_dependency_graph, render_graphs = True)
generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph", self.graph.get_process_dependency_graph() , render_graphs = True)
raise Exception("Something went wrong: The flat dependency graph is not the same!")
def check_relevant_processes_in_workflow(self, relevant_processes):
#Check that all relevant processes are in the workflow
workflow_processes = {}
for c in self.get_workflow_main().get_all_calls_in_workflow():
ele = c.get_first_element_called()
if(ele.get_type()=="Process"):
short_name = ele.get_alias().split("_GG_")[0]
try:
temp = workflow_processes[short_name]
except:
workflow_processes[short_name] = []
workflow_processes[short_name].append(ele.get_alias())
temporary_relevant = []
for p in relevant_processes:
if(p not in workflow_processes):
raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24)
temporary_relevant+=workflow_processes[p]
relevant_processes = temporary_relevant
return relevant_processes
def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []):
self.graph.initialise(processes_2_remove = processes_2_remove)
self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs)
#I do not recommend that the dev uses the same name for channels inside and outside the subworkflow
#Since, after the rewrite, the subworkflow's local channels become channels at the upper level
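#Illustrative sketch of the rewrite (MY_SUB, reads, input_ch and results_ch are hypothetical names):
#   Before: MY_SUB(reads)              //where MY_SUB takes 'input_ch' and emits 'results_ch'
#   After:  input_ch = reads           //the take is assigned from the call's parameter
#           <body of MY_SUB pasted in place of the call>
#           //downstream uses of MY_SUB.out.results_ch are replaced by the channel behind 'results_ch'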
def rewrite_subworkflow_call(self, code, subworklfow):
#Remove the definition from the code
code = code.replace(subworklfow.get_code(get_OG = True), "")
OG_call = subworklfow.get_call()
OG_body = subworklfow.get_work()
#REPLACE HEADER TAKES
subworkflow_takes = subworklfow.get_takes()
parameters = OG_call.get_parameters()
if(len(subworkflow_takes)!=len(parameters)):
raise Exception("This shouldn't happen -> the same number of parameters should be kept")
#This is to replace the parameters and the takes
new_header = ""
for i in range(len(parameters)):
param = parameters[i]
takes = subworkflow_takes[i].get_gives()[0]
#Here we're checking that the input inside and outside the subworkflow are the same
if(takes.get_code()!=param.get_code(get_OG = True)):
new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}\n"
temp_code = code
code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1)
if(temp_code==code):
raise Exception("Something went wrong: The code hasn't changed")
#REPLACE THE EMITS
#TODO admittedly this code below is very ugly -> but it's functional -> update it
emits = subworklfow.get_emit()
to_replace = []
all_executors = self.get_workflow_main().get_all_executors_in_workflow()
for exe in all_executors:
#We don't need to check the Call case since the workflow has already been rewritten -> emits only appear in operations
if(exe.get_type()=="Operation"):
added = False
new = exe.get_code(get_OG = True)
for emited in exe.get_origins():
if(emited.get_type()=="Emitted"):
if(emited.get_emitted_by().get_first_element_called()==subworklfow):
if(emited.get_emits() not in emits):
raise Exception("This shoudn't happen -> since it is the actual subworkflow")
new = new.replace(emited.get_code(), emited.get_emits().get_origins()[0].get_code())
added = True
#to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}"))
if(added):
to_replace.append((exe.get_code(get_OG = True), new))
#This dictionary is used to check if the replacement has already been done (in the case of duplicates in new)
dico_replace = {}
for r in to_replace:
old, new = r
need_to_replace = True
try:
t = dico_replace[old]
if(t==new):
need_to_replace = False
else:
raise Exception("This shouldn't happen")
except:
dico_replace[old]= new
if(need_to_replace):
temp_code = code
#Case of channel = channel
if(new.find("=")!=-1):
if(new.split("=")[0].strip()==new.split("=")[1].strip()):
new = ''
#code = code.replace(old, new)
code = replace_group1(code, fr"({re.escape(old)})[^\w]", new)
if(temp_code==code):
#print(code)
#print("old", f'"{old}"')
#print("new", f'"{new}"')
raise Exception("Something went wrong: The code hasn't changed")
return code
#This function returns the channels which the subworkflow (things_added_in_cluster) depends on
def get_takes(self, things_added_in_cluster):
#Basically this is a dico of channels to operations -> when the value is an empty list
#This means that the channel is totally defined in the subworkflow -> so we are searching for
#Channels which aren't totally defined in the subworkflow
channels_2_sources = {}
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_origins():
if(o.get_type() in ["Channel", "Emitted"]):
channels_2_sources[o] = replace_thing_by_call(o.get_source())
else:
if(o.get_first_element_called().get_type()=="Function"):
None
else:
raise Exception("This shouldn't happen")
elif(ele.get_type() == "Call"):
for param in ele.get_parameters():
if(param.get_type()=="Channel"):
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
for o in param.get_origins():
if(o.get_type()=="Channel"):
channels_2_sources[o] = replace_thing_by_call(o.get_source())
else:
raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
else:
raise Exception("This shouldn't happen")
takes = []
names_added = []
for channel in channels_2_sources:
if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])):
if(channel.get_name() not in names_added):
takes.append(channel)
names_added.append(channel.get_name())
return takes
#This function returns the channels the subworkflow (things_added_in_cluster) emits (i.e. channels that other things depend on)
def get_emits(self, things_added_in_cluster):
channel_2_sink = {}
#Basically this is a dico of channels to operations (this doesn't really work yet) -> when the value is an empty list
#This means that the channel is totally defined in the subworkflow -> so we are searching for
#Channels which aren't totally defined in the subworkflow
#This means that things outside the subworkflow depend on this channel
for ele in things_added_in_cluster:
if(ele.get_type() == "Operation"):
for o in ele.get_gives():
channel_2_sink[o] = replace_thing_by_call(o.get_sink())
elif(ele.get_type() == "Call"):
#thing = ele.get_first_element_called()
for e in ele.get_later_emits():
channel_2_sink[e] = replace_thing_by_call(e.get_sink())
else:
raise Exception("This shouldn't happen")
emits = []
names_added = []
for channel in channel_2_sink:
if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])):
if(channel.get_name() not in names_added):
emits.append(channel)
names_added.append(channel.get_name())
return emits
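#Removes the "_GG_<number>" suffixes from identifiers, e.g. (hypothetical) remove_GG_from_code("fastqc_GG_12(reads)") -> "fastqc(reads)"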
def remove_GG_from_code(self, code):
def replacer(match):
return match.group(1)
return re.sub(f"(\w+)_GG_\d+", replacer, code)
#Method which rewrites the workflow following the user view
#Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
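#Illustrative usage sketch (the path and process names below are hypothetical):
#   w = Workflow("path/to/workflow", duplicate = True)
#   w.initialise()
#   rewritten = w.convert_workflow_2_user_view(relevant_processes = ["fastqc", "multiqc"], render_graphs = False)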
def convert_workflow_2_user_view(self, relevant_processes = [], render_graphs = True):
self.iniatilise_tab_processes_2_remove()
self.graph.initialise(processes_2_remove = self.processes_2_remove)
def get_object(address):
address = int(re.findall(r"\dx\w+", address)[0], base=16)
return ctypes.cast(address, ctypes.py_object).value
#Check that there are no cycles which will break the creation of the user view:
edges_create_cycles = self.graph.get_edges_that_create_cycle()
edges_create_cycles_objects = []
for e in edges_create_cycles:
edges_create_cycles_objects.append((get_object(e[0]), get_object(e[1])))
for e in edges_create_cycles_objects:
n1 = e[0].get_alias()
n2 = e[1].get_alias()
if(n1 in relevant_processes and n2 in relevant_processes):
raise BioFlowInsightError(f"The processes '{n1}' and '{n2}' cannot both be relevant processes since there is a dependency apparant in the workflow between the 2")
ternary_operation_dico = self.ternary_operation_dico
map_element_dico = self.map_element_dico
if(self.duplicate):
#First check if there are any duplicate operations
#That check is done in the "get_order_execution_executors" method -> so we just run that first
self.get_order_execution_executors()
if(self.get_DSL()=="DSL1"):
code = self.convert_to_DSL2()
self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs, def_check_the_same = False)
if(self.get_DSL()=="DSL2"):
code = self.simplify_workflow_code()
self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
#DETERMINING WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
def get_clusters_with_calls(clusters):
#Creating the clusters with calls instead of processes or subworkflows
set_clusters_with_calls = []
for c in clusters:
tab = []
for ele in c:
if(ele.get_type()=="Operation"):
if(ele.get_artificial_status()==False):
tab.append(ele)
else:
call = ele.get_call()
tab.append(call)
set_clusters_with_calls.append(set(tab))
return set_clusters_with_calls
#Getting subworkflows to executors
def get_subworkflow_2_executors():
subworkflow_2_executors = {}
for sub in self.get_subworkflows_called():
executors = sub.get_all_executors_in_workflow()
subworkflow_2_executors[sub] = []
for ele in executors:
#Cannot add calls to subworkflows -> they are already present by definition
if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
None
#We don't add it
else:
subworkflow_2_executors[sub].append(ele)
return subworkflow_2_executors
#subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))
#TODO -> write tests to test this function
def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
broken_subworkflows = []
for sub in subworkflow_2_executors:
#You want this list (set) to be equal to subworkflow_2_executors[sub]
elements_in_sub_with_clusters = []
for cluster in set_clusters_with_calls:
if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
break
for c in cluster:
if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
break
if(c in subworkflow_2_executors[sub]):
elements_in_sub_with_clusters+=list(cluster)
break
if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
None
#This means that the subworkflow is Intact
else:
#This means that the subworkflow is broken
broken_subworkflows.append(sub)
return broken_subworkflows
#Get the clusters and the code
relevant_processes = self.check_relevant_processes_in_workflow(relevant_processes)
self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs)
clusters = self.graph.get_clusters_from_user_view()
broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
#While there still are broken workflows -> need to redo the analysis
while(len(broken_subworkflows)>0):
#Rewrite broken subworkflows
sub = broken_subworkflows[0]
code = self.rewrite_subworkflow_call(code, sub)
self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
#Get the clusters and the code
#TODO -> remove the generate all_graphs -> it is not necessary
if(render_graphs):
self.generate_all_graphs()
self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [], render_graphs=render_graphs)
clusters = self.graph.get_clusters_from_user_view()
broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
#Get the clusters with the corresponding operations inside
#for i in range(len(clusters)):
# c = clusters[i]
# if(len(c)>1):
# clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
#Get the topological order
executors_in_order = self.get_order_execution_executors()
new_clusters = []
for cluster in clusters:
tab = []
for e in executors_in_order:
if(e in cluster):
tab.append(e)
new_clusters.append(tab)
clusters = new_clusters
#This function returns the last executor in the clusters
#This is used to place the anker
def get_last_executor_in_cluster(executors_in_order, clusters):
dico = {}
for cluster in clusters:
for ele in cluster:
dico[ele] = executors_in_order.index(ele)
for ele in {k: v for k, v in sorted(dico.items(), key=lambda item: item[1], reverse=True)}:
return ele
#Replace the last executor in the clusters by the cluster anker
last_executor_in_cluster = get_last_executor_in_cluster(executors_in_order, clusters)
if(last_executor_in_cluster.get_type()=="Process"):
call = last_executor_in_cluster.get_call()
code = code.replace(call.get_code(get_OG = True), "\n//Anker_clusters\n")
elif(last_executor_in_cluster.get_type()=="Operation"):
if(not last_executor_in_cluster.get_artificial_status()):
code = code.replace(last_executor_in_cluster.get_code(get_OG = True), "\n//Anker_clusters\n", 1)
else:
raise Exception("This shoudn't happen")
else:
raise Exception("This shoudn't happen")
#Removing elements from clusters from the code
for cluster in clusters:
for ele in cluster:
if(ele.get_type()=="Process"):
call = ele.get_call()
code = code.replace(call.get_code(get_OG = True), "")
elif(ele.get_type()=="Operation"):
if(not ele.get_artificial_status()):
code = code.replace(ele.get_code(get_OG = True), "", 1)
else:
raise Exception("This shoudn't happen")
else:
raise Exception("This shoudn't happen")
#Remove the empty conditions left in the code
code = remove_empty_conditions_place_anker(code, self)
#Add the subworkflow definitions
#-------------------------------------
#Adding the anker
subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
to_replace = ""
for match in re.finditer(r"workflow\s+\w*\s*\{", code):
to_replace = match.group(0)
break
if(to_replace==""):
for match in re.finditer(r"workflow\s*\{", code):
to_replace = match.group(0)
break
if(to_replace==""):
raise Exception("No call to a workflow")
code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
#Creating the subworkflows from clusters
calls_in_operations = []
non_relevant_name = 1
subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
index_cluster = len(clusters)
#We replace the last clusters first -> this is because the outputs of the last clusters aren't used anywhere else in the workflow by definition
for elements in list(reversed(clusters)):
channels_to_replace_outside_of_cluster = []
#Check that there is at least one process in cluster
at_least_one_process = False
for e in elements:
if(e.get_type()=="Process"):
at_least_one_process = True
#Only create the subworkflows for clusters with one or more elements (and at least one of those elements is a process)
processes_added = []
things_added_in_cluster = []
if(len(elements)>=1 and at_least_one_process):
name, body, take, emit = "", "", "", ""
first_element = True
for ele in elements:
if(ele.get_type()=="Process"):
#Determine the name of the created subworkflow cluster
if(ele.get_alias() in relevant_processes):
name = f"cluster_{ele.get_alias()}"
#Get the call of thing (either process or subworkflow)
call = ele.get_call()
processes_added.append(call.get_first_element_called())
values = []
for condition in call.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
else:
body+=f"\n{call.get_code()}\n"
things_added_in_cluster.append(call)
#Below
elif(ele.get_type()=="Operation"):
#TODO -> check this verification, there might be some side effects
if(not ele.get_artificial_status()):
#Ignore these cases
#TODO -> you should be able to remove this
if(ele.get_code()[:4] not in ["emit", "take"]):
origins = ele.get_origins()
for o in origins:
if(o.get_type()=="Call"):
if(o.get_first_element_called().get_type()!="Function"):
calls_in_operations.append(o)
values = []
for condition in ele.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n"
else:
body+=f"\n{ele.get_code(get_OG = True)}\n"
things_added_in_cluster.append(ele)
else:
raise Exception("This shoudn't happen")
#TODO check that this part of the code is never reached
#Here we are removing the Call_12313 thing
for call in calls_in_operations:
raise Exception("This shouldn't happen since the workflows has been rewritten")
body = body.replace(call.get_code(), "")
body = body.replace(str(call), call.get_code())
#If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes
if(name==""):
#If there are processes called we are going to use them
if(len(processes_added)>0):
#TODO find a better naming system
name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
else:
#TODO find a better naming system
name = f"non_relevant_cluster_{non_relevant_name}"
non_relevant_name+=1
#Check that there is a single condition in the body
body = group_together_ifs(body)
conditions_in_subworkflow = []
temp_body = body
conditions_in_subworkflow_2 = extract_conditions(temp_body, only_get_inside = False)
if(len(conditions_in_subworkflow_2)==1):
for condition in conditions_in_subworkflow_2:
start, end = conditions_in_subworkflow_2[condition]
temp_body = temp_body.replace(temp_body[start: end], "")
#This means that there is only one condition with all the executors inside it
if(temp_body.strip()==""):
conditions_in_subworkflow = extract_conditions(body)
for condition in conditions_in_subworkflow:
start, end = conditions_in_subworkflow[condition]
body = body[start: end-1].strip()
conditions_in_subworkflow = list(conditions_in_subworkflow.keys())
#TAKE
#Adding take parameters on the inside of the subworkflow
takes_param = self.get_takes(things_added_in_cluster)
new_param_names, index, old_param_names = [], 1, []
for param in takes_param:
param_name = f"param_{name}_{index}"
#Here if the input is a channel -> we keep the same name for readability
#It also solves a bug described on the 18/02/2025
if(param.get_type()!='Channel'):
new_param_names.append(param_name)
old_param_names.append(param.get_code())
else:
new_param_names.append(param.get_code())
old_param_names.append(param.get_code())
index += 1
if(len(new_param_names)>0):
temp = '\n'.join(new_param_names)
take = f"\ntake:\n{temp}\n"
#EMIT
#Adding the emitted outputs
emitted_outputs = self.get_emits(things_added_in_cluster)
new_output_names, index, old_output_names = [], 0, []
for output in emitted_outputs:
output_name = f"{name}.out[{index}]"
new_output_names.append(output_name)
old_output_names.append(output.get_code())
index += 1
if(len(old_output_names)>0):
temp = '\n'.join(old_output_names)
emit = f"\nemit:\n{temp}\n"
#Adding empty channels if it doesn't exist in the case of a negative condition
body = get_channels_to_add_in_false_conditions(body, old_output_names)
#Replace names inside subworkflow
subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
#We do this so that the longest things are rewritten first in the code -> to avoid problems
takes_param_2_length = {}
takes_param_2_new_param_names = {}
for i in range(len(new_param_names)):
takes_param_2_new_param_names[takes_param[i].get_code()] = new_param_names[i]
takes_param_2_length[takes_param[i].get_code()] = len(takes_param[i].get_code())
sorted_takes_param_2_length = {k: v for k, v in sorted(takes_param_2_length.items(), key=lambda item: item[1], reverse=True)}
for take_param in sorted_takes_param_2_length:
new_param = takes_param_2_new_param_names[take_param]
if(take_param != new_param):
#pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
pattern = fr"({re.escape(take_param)})[\s\,\)\.]"
temp = subworkflow_code
subworkflow_code = replace_group1(subworkflow_code, pattern, new_param)
if(temp==subworkflow_code):
print(take_param, new_param)
print(pattern)
print(f'"{subworkflow_code}"')
raise Exception("Something went wrong -> cause the paramter wasn't updated")
#subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
#TODO -> added verification of conditions
params = ", ".join(old_param_names)
subworkfow_call_case_true = f"{name}({params})"
subworkfow_call_case_false = ""
for i in range(len(new_output_names)):
#In the case of channels, we just add channel = subworkflow.out[i]
if(not bool(re.findall("\.\s*out", old_output_names[i]))):
subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
#In the case of emitted values we need to replace the code on the outside
else:
param_out_name= f"{name}_out_{i+1}"
subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
#If there was only one single condition in the subworkflow cluster -> then we add it when the call is done
if(len(conditions_in_subworkflow)==1):
subworkfow_call = f"if({conditions_in_subworkflow[0].split('$$__$$')[0]}) {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
else:
subworkfow_call = subworkfow_call_case_true
##TODO -> added verification of conditions
#params = ", ".join(old_param_names)
#subworkfow_call_case_true = f"{name}({params})"
#for i in range(len(new_output_names)):
# #In the case of channels, we just add chanel = subworkflow.out[i]
# if(not bool(re.findall("\.\s*out", old_output_names[i]))):
# subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
# #In the case of emitted values we need to replace the code on the outside
# else:
# param_out_name= f"{name}_out_{i+1}"
# subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
# channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
##If there was only one single condition in the subworkflow cluster -> then we add it when the call is done
#if(len(conditions_in_subworkflow)==1):
# subworkfow_call = f"if({conditions_in_subworkflow[0].split('$$__$$')[0]}) {{\n{subworkfow_call_case_true}\n}}"
#else:
# subworkfow_call = subworkfow_call_case_true
#subworkflow_clusters_to_add.append(subworkflow_code)
#subworkflow_cluster_calls_to_add.append(subworkfow_call)
#Add the subworkflow call
new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{subworkfow_call}\n"
code = code.replace("//Anker_clusters", new_code)
for old, new in channels_to_replace_outside_of_cluster:
pattern= fr"[ \(,]({re.escape(old)})[^\w]"
code = replace_group1(code, pattern, new)
#code = code.replace(old, new)
#Add the subworkflow definitions
#-------------------------------------
code = code.replace(f'{subworkflow_section}', f"{subworkflow_code}\n\n{subworkflow_section}")
else:
body = ""
for ele in elements:
values = []
for condition in ele.get_all_conditions():
values.append(condition.get_value())
printed_condition = " && ".join(values)
if(printed_condition!=""):
body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n"
else:
body+=f"\n{ele.get_code(get_OG = True)}\n"
new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{body}\n"
code = code.replace("//Anker_clusters", new_code)
index_cluster-=1
#Putting || back
code = self.put_back_old_ternary_operations(code, ternary_operation_dico)
code = code.replace("$OR$", "||")
code = self.put_modified_operations_back(code, map_element_dico)
code = remove_extra_jumps(format_with_tabs(code))
#code = self.remove_GG_from_code(code)
f = open(self.get_output_dir()/ "debug" / "rewritten.nf", "w")
f.write(code)
f.close()
self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
return code
#return code
#
##So basically when retrieving a thing (process or subworkflow)
##There is necessarily one call associated with the thing -> since we have the option duplicate activated
##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
else:
raise BioFlowInsightError("Trying to convert the workflow with user view however option 'duplicate' is not activated. -> to reformulated")