diff --git a/src/bioflowinsighterror.py b/src/bioflowinsighterror.py index f7490de97df3fd62e36a47897b55796b10c6700a..d10e7656149010933c7fb5cbf0636d3ee187e7d8 100644 --- a/src/bioflowinsighterror.py +++ b/src/bioflowinsighterror.py @@ -9,12 +9,15 @@ class BioFlowInsightError(Exception): origin: A "Nextflow Building Bloc" derived type object, from this the file address can be given to the user """ - def __init__(self, error, num, origin = None): + def __init__(self, error, num=None, origin = None): self.origin = origin - if(origin!=None): - super().__init__(f"[{num}] Error in the file '{self.origin.get_file_address()}': "+error) + if(num!=None): + if(origin!=None): + super().__init__(f"[{num}] Error in the file '{self.origin.get_file_address()}': "+error) + else: + super().__init__(f"[{num}] {error}") else: - super().__init__(f"[{num}] {error}") + super().__init__(f"{error}") #To handle the different type of errors; I'm gonna add numbers to the errors #Pair numbers if it's the users fault diff --git a/src/block.py b/src/block.py new file mode 100644 index 0000000000000000000000000000000000000000..7d50ec98e38e6d1031fcdefaa95549e7e8314e98 --- /dev/null +++ b/src/block.py @@ -0,0 +1,14 @@ + +from .root import Root +from .condition import Condition + +class Block(Root): + def __init__(self, code, origin, condition, modules_defined): + Root.__init__(self = self, code = code, origin = origin, modules_defined = modules_defined) + self.condition = Condition(origin=self, condition = condition) + + def initialise(self): + return super().initialise() + + def get_channels(self): + return self.channels+self.origin.get_channels() \ No newline at end of file diff --git a/src/call.py b/src/call.py index 65fe210edf79c3131a85a280ab35e360a1354692..931397680998bf75bca729f76c28241485eb5fa1 100644 --- a/src/call.py +++ b/src/call.py @@ -21,7 +21,7 @@ class Call(Executor): self.parameters = []#These are in the order self.OG_code = OG_code #It's important this is last - self.condition = Condition(self) + #self.condition = Condition(self) def __str__(self): return f"Call_{id(self)}" @@ -92,7 +92,6 @@ class Call(Executor): def get_elements_called(self, tab_input = [], first_call = True): tab = tab_input.copy() #if(first_call): - # print(tab) # if(tab!=[]): # raise Exception("herer") # tab = [] @@ -428,10 +427,16 @@ class Call(Executor): # if(process.get_number_times_called()>0): # temp = copy.copy(process) # temp.set_alias(f"{process.get_name()}_{process.get_number_times_called()}") + if(self.get_duplicate_status()): + process = copy.copy(process) self.first_element_called = process + self.origin.add_element_to_elements_being_called(process) #temp.incremente_number_times_called() if(process==None and subworkflow!=None and fun==None): + if(self.get_duplicate_status()): + subworkflow = copy.copy(subworkflow) self.first_element_called = subworkflow + self.origin.add_element_to_elements_being_called(subworkflow) if(process==None and subworkflow==None and fun!=None): self.first_element_called = fun if(process==None and subworkflow==None and fun==None): @@ -485,8 +490,7 @@ class Call(Executor): def initialise(self): self.analyse_call(self.get_code(clean_pipe = True)) self.write_summary() - self.first_element_called.set_call(self) - #self.add_call_count() + diff --git a/src/condition.py b/src/condition.py index bf9ed4cc4c588a554b2f0475b77bf63c90250380..a7de014c1f972527e41319863b9b4a8eaddf1b30 100644 --- a/src/condition.py +++ b/src/condition.py @@ -2,31 +2,31 @@ from .outils import extract_conditions class Condition: - def 
__init__(self, origin): + def __init__(self, origin, condition): self.origin = origin - self.conditions = [] - self.initialise() + self.conditions = condition + #self.initialise() - def get_conditions(self): - return self.conditions + #def get_conditions(self): + # return self.conditions - def initialise(self): - thing_defined = self.origin.get_code(get_OG=True) - code = self.origin.get_workflow_code() - #print(f"'{thing_defined}'") - #print(f"'{code}'") - - - conditions_dico = self.origin.get_file_conditions() - #conditions_dico_temp = extract_conditions(code) - #print(conditions_dico==conditions_dico_temp) - pos = code.find(thing_defined) - for c in conditions_dico: - condition_extend = conditions_dico[c] - if(condition_extend[0]<pos and pos<condition_extend[1]): - self.conditions.append(c) - #print(conditions_dico) - #print(thing_defined, self.conditions) - #print() + #def initialise(self): + # thing_defined = self.origin.get_code(get_OG=True) + # code = self.origin.get_workflow_code() + # #print(f"'{thing_defined}'") + # #print(f"'{code}'") + # + # + # conditions_dico = self.origin.get_file_conditions() + # #conditions_dico_temp = extract_conditions(code) + # #print(conditions_dico==conditions_dico_temp) + # pos = code.find(thing_defined) + # for c in conditions_dico: + # condition_extend = conditions_dico[c] + # if(condition_extend[0]<pos and pos<condition_extend[1]): + # self.conditions.append(c) + # #print(conditions_dico) + # #print(thing_defined, self.conditions) + # #print() diff --git a/src/emitted.py b/src/emitted.py index b58a8690a93786dca9c30b9b4ff4722a8d1e95a5..14acf2e004b506e1d8221e72702b46445cd0b299 100644 --- a/src/emitted.py +++ b/src/emitted.py @@ -69,12 +69,12 @@ class Emitted(Channel): def get_structure(self, dico, B): emits = self.get_emitted_by() - if(not emits.is_called(self)): - end = "in the file" - if(self.origin.get_type()=="Subworkflow"): - end = f"in the subworkflow '{self.origin.get_name()}'" - raise BioFlowInsightError(f"Tried to access the emit '{self.get_code()}' but the {emits.get_type()} '{emits.get_name()}' ({emits}) has not been called {end}.", num = 8, origin=self) - + #if(not emits.is_called(self)): + # end = "in the file" + # if(self.origin.get_type()=="Subworkflow"): + # end = f"in the subworkflow '{self.origin.get_name()}'" + # raise BioFlowInsightError(f"Tried to access the emit '{self.get_code()}' but the {emits.get_type()} '{emits.get_name()}' ({emits}) has not been called {end}.", num = 8, origin=self) + # #Case if the emit emits a process if(emits.get_type()=="Process"): diff --git a/src/executor.py b/src/executor.py index 28f5cff9c6a9de1c1f75faa7596fd41b894e211f..85250d6d2002f12d6a1f62d7d373d444c174b2cb 100644 --- a/src/executor.py +++ b/src/executor.py @@ -31,8 +31,7 @@ class Executor(Nextflow_Building_Blocks): def get_list_name_processes(self): return self.origin.get_list_name_processes() - def get_process_from_name(self, name): - return self.origin.get_process_from_name(name) + def get_subworkflow_from_name(self, name): return self.origin.get_subworkflow_from_name(name) @@ -47,7 +46,11 @@ class Executor(Nextflow_Building_Blocks): return self.origin.get_list_name_includes() def add_channel(self, channel): - self.origin.add_channel(channel) + self.origin.add_channel(channel) + + + def get_channels(self): + return self.origin.get_channels() def check_in_channels(self, channel): return self.origin.check_in_channels(channel) @@ -61,9 +64,8 @@ class Executor(Nextflow_Building_Blocks): def get_condition(self): return self.condition - - def 
get_file_address(self): - return self.origin.get_file_address() + def add_element_to_elements_being_called(self, element): + self.origin.add_element_to_elements_being_called(element) def get_code(self, get_OG=False): diff --git a/src/graph.py b/src/graph.py index 084b061185e3c163ba313360dc1d002d9a42aa6e..bce297469f066ec9506ceb9dc0a4e50804d30d0f 100644 --- a/src/graph.py +++ b/src/graph.py @@ -13,9 +13,9 @@ def get_object(address): return ctypes.cast(address, ctypes.py_object).value class Graph(): - def __init__(self, nextflow_file): - self.workflow = nextflow_file - self.full_dico = nextflow_file.get_structure() + def __init__(self, workflow): + self.workflow = workflow + self.full_dico = workflow.get_structure() with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file : json.dump(self.full_dico, output_file, indent=4) #This dico give for the nodes its sister nodes @@ -343,7 +343,6 @@ class Graph(): max_level = get_max_level(dico) for l in range(max_level+1): new_dico = get_graph_level_l(dico, l) - #print(new_dico) generate_graph(self.get_output_dir()/'graphs'/f"level_{l}", new_dico, label_edge=label_edge, label_node=label_node, render_graphs = render_graphs) #============================ @@ -621,14 +620,14 @@ class Graph(): if(dico['shortest_path']!=nx.shortest_path_length(G_DAG, source=source_node, target=sink_node)): raise Exception(f"{dico['shortest_path']}, {nx.shortest_path_length(G_DAG, source=source_node, target=sink_node)}") - #print("test1") + if(dico['longest_path']+1!=len(nx.dag_longest_path(G_DAG))): raise Exception(f"{dico['longest_path']}, {len(nx.dag_longest_path(G_DAG))}") - #print("test2") + #if(len(list(nx.all_simple_paths(G_DAG, source=source_node, target=sink_node)))!=dico['number_of_paths_source_2_sink']): # raise Exception(f"{len(list(nx.all_simple_paths(G_DAG, source=source_node, target=sink_node)))}, {dico['number_of_paths_source_2_sink']}") - #print("test3")""" + """ return dico diff --git a/src/include.py b/src/include.py index 0b2547d8fa7eb6fab516796430905a267aabb489..0bf8662072a5b32cb2bcbc3cc569d501ce332547 100644 --- a/src/include.py +++ b/src/include.py @@ -20,30 +20,19 @@ def clean_string(txt): return txt class Include(Nextflow_Building_Blocks): - def __init__(self, code, file, importing, origin): - self.origin = origin + def __init__(self, code, file, importing, nextflow_file): + self.nextflow_file_origin = nextflow_file self.importing = importing self.code = Code(code = code, origin = self) - self.file = None - self.address = file + self.nextflow_file = None self.define_file(file) - self.aliases = {} - self.defines = [] + self.defines = {} #self.initialise() - - def get_aliases(self): - return self.aliases - def get_defines(self): - return self.defines - - def get_file(self): - return self.file - - def get_address(self): - return self.address - + def get_duplicate_status(self): + return self.nextflow_file_origin.get_duplicate_status() + def get_root_directory(self): return self.origin.get_root_directory() @@ -60,8 +49,8 @@ class Include(Nextflow_Building_Blocks): def define_file(self, file): from .nextflow_file import Nextflow_File address = clean_string(file) - root = self.origin.get_file_address() - root = '/'.join(root.split('/')[:-1]) + root = self.nextflow_file_origin.get_file_address() + root = '/'.join(str(root).split('/')[:-1]) found_file = False if(os.path.isfile(address)): @@ -85,14 +74,14 @@ class Include(Nextflow_Building_Blocks): found_file = True if(not found_file and os.path.isfile(address[:-3]+"/main.nf")): 
- self.file = Nextflow_File(address[:-3]+"/main.nf", origin=self) + self.nextflow_file = Nextflow_File(address[:-3]+"/main.nf", workflow = self.nextflow_file_origin.get_workflow()) #TODO -> check if the nextflow_file is defined somewhere else? #In the cas the nextflow file is imported multiple times else: if(os.path.isfile(address)): - self.file = Nextflow_File(address, origin=self) + self.nextflow_file = Nextflow_File(address, workflow = self.nextflow_file_origin.get_workflow()) else: address = os.path.normpath(address) raise BioFlowInsightError(f"Something went wrong in an include{self.get_string_line(self.get_code())}. No such file: '{address}'.", num = 10,origin=self) @@ -101,45 +90,36 @@ #If not duplicate -> we need to see if there is another include which has already defined the file #TODO -> if you wanna generalise this to all include (inbetween files -> you just need to update get_include() ) if(not self.get_duplicate_status()): - #other_includes = self.origin.get_all_includes() - other_includes = self.origin.get_includes() + other_includes = self.nextflow_file_origin.get_includes() for other in other_includes: - if(self.get_address()==other.get_address()): - self.file = other.get_file() + if(self.nextflow_file.get_file_address()==other.nextflow_file.get_file_address()): + self.nextflow_file = other.nextflow_file + + def initialise(self): - self.file.initialise() + self.nextflow_file.initialise() for include in self.importing: include = include.strip() found = False if(include!=''): if(re.fullmatch(constant.WORD, include)): - if(self.get_duplicate_status()): - self.defines.append(self.file.get_element_from_name(include)) - else: - self.aliases[include] = self.file.get_element_from_name(include) + self.defines[include] = self.nextflow_file.get_element_from_name(include) found = True else: pattern_as = constant.INCLUDE_AS for match in re.finditer(pattern_as, include): found = True if(self.get_duplicate_status()): - thing_as = copy.copy(self.file.get_element_from_name(match.group(1))) + thing_as = copy.copy(self.nextflow_file.get_element_from_name(match.group(1))) thing_as.set_alias(match.group(3)) - self.defines.append(thing_as) + self.defines[match.group(3)] = thing_as else: - #other_includes = self.origin.get_includes() - #added_from_other = False - #for other in other_includes: - # if(self.get_address()==other.get_address()): - # self.aliases[match.group(3)] = other.file.get_element_from_name(match.group(1)) - # added_from_other = True - #if(not added_from_other): - self.aliases[match.group(3)] = self.file.get_element_from_name(match.group(1)) + self.defines[match.group(3)] = self.nextflow_file.get_element_from_name(match.group(1)) if(not found): - raise Exception(f"I was not able to import '{include}' from {self.file.get_file_address()}") + raise Exception(f"I was not able to import '{include}' from {self.nextflow_file.get_file_address()}") diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000000000000000000000000000000000000..07b762e73ca98ef942eef91c311350909fd94e75 --- /dev/null +++ b/src/main.py @@ -0,0 +1,217 @@ +from .nextflow_building_blocks import Nextflow_Building_Blocks +from .bioflowinsighterror import BioFlowInsightError +from .root import Root +import re +from .outils import * + +from . 
import constant + + + +class Main(Nextflow_Building_Blocks): + def __init__(self, code, nextflow_file): + Nextflow_Building_Blocks.__init__(self, code) + self.nextflow_file = nextflow_file + self.initialised = False + self.root = None + + def get_modules_defined(self): + return self.nextflow_file.get_modules_defined() + + def get_duplicate_status(self): + return self.nextflow_file.get_duplicate_status() + + def get_DSL(self): + return self.nextflow_file.get_DSL() + + def get_file_address(self): + return self.nextflow_file.get_file_address() + + def get_all_executors(self, dico): + for e in self.get_executors(): + dico[e] = 1 + + for exe in self.get_executors(): + if(exe.get_type()=="Call"): + first = exe.get_first_element_called() + if(first.get_type()=="Subworkflow"): + first.get_all_executors(dico) + elif(exe.get_type()=="Operation"): + for o in exe.get_origins(): + if(o.get_type()=="Call"): + first = o.get_first_element_called() + if(first.get_type()=="Subworkflow"): + first.get_all_executors(dico) + else: + raise Exception("This shouldn't happen") + + + def get_output_dir(self): + return self.nextflow_file.get_output_dir() + + + def get_workflow_code(self): + return self.get_code() + + def get_file_conditions(self): + if(self.conditions==None): + self.conditions = extract_conditions(self.get_code()) + return self.conditions + + def get_type(self): + return "Main DSL2" + + + def is_initialised(self): + return self.initialised + + def get_all_called(self): + called = [] + for exe in self.get_executors(): + if(exe.get_type()=="Call"): + called+=exe.get_elements_called() + else: + for o in exe.get_origins(): + if(o.get_type()=="Call"): + called+=o.get_elements_called() + return called + + #def get_processes(self): + # return self.origin.get_processes()+super().get_processes() + + #def get_process_from_name(self, name): + # print("here") + # return self.origin.get_process_from_name(name) + + + def get_processes_called(self, defined = {}): + + for c in self.get_all_called(): + if(c.get_type()=="Process"): + defined[c] = [] + elif(c.get_type()=="Subworkflow"): + _ = c.get_processes_called(defined = defined) + + return list(defined.keys()) + + def get_subworkflows_called(self, defined = {}): + for c in self.get_all_called(): + if(c.get_type()=="Subworkflow"): + defined[c] = [] + _ = c.get_subworkflows_called(defined = defined) + return list(defined.keys()) + + def get_functions_called(self, defined = {}): + for c in self.get_all_called(): + if(c.get_type()=="Function"): + defined[c] = [] + elif(c.get_type()=="Subworkflow"): + _ = c.get_functions_called(defined = defined) + return list(defined.keys()) + + + def get_function_from_name(self, name): + return self.origin.get_function_from_name(name) + + def get_list_name_subworkflows(self): + return self.origin.get_list_name_subworkflows() + + def get_list_name_includes(self): + return self.origin.get_list_name_includes() + + + #def get_channel_from_name(self, name): + # channel_file = self.origin.get_channel_from_name(name) + # if(channel_file!=None): + # return channel_file + # return super().get_channel_from_name(name) + + + """def get_added_operations_structure(self): + return self.origin.get_added_operations_structure()""" + + #def check_in_channels(self, channel): + # found = super().check_in_channels(channel) + # if(not found): + # if(self.origin.get_type()=="Nextflow File"): + # return self.origin.check_in_channels(channel) + # else: + # raise Exception(f"The origin is a '{self.origin.get_type()}' it should be a 'Nextflow File'") + # return 
found + + + def get_subworkflow_from_name(self, name): + return self.origin.get_subworkflow_from_name(name) + + def check_includes(self): + code = self.get_code() + + pattern = constant.FULL_INCLUDE + for match in re.finditer(pattern, code): + if(self.get_type()=="Main DSL2"): + raise BioFlowInsightError(f"An include ('{match.group(0)}') was found in the main in the file '{self.get_file_address()}'. FlowInsight does not support this -> see specification list.", num = 12,origin=self) + elif(self.get_type()=="Subworkflow"): + raise BioFlowInsightError(f"An include ('{match.group(0)}') was found in the subworkflow '{self.get_name()}' in the file '{self.get_file_address()}'. FlowInsight does not support this -> see specification list.", num = 12, origin=self) + else: + raise Exception("This shouldn't happen!") + + + def initialise(self): + if(not self.initialised): + #print(self, self.get_all_processes()) + + self.initialised=True + + #Get the modules (Processes defined for the main/subworkflow) + self.modules_defined = self.nextflow_file.get_modules_defined() + + #Check that includes are not defined in the main or subworkflows + self.check_includes() + + self.root = Root(code=self.get_code(), origin=self, modules_defined=self.modules_defined) + + self.root.initialise() + + ##Extract Executors + #self.extract_executors() + # + ##Analyse Executors + #for e in self.executors: + # e.initialise() + + + + + """def add_channels_structure(self, dot): + return self.add_channels_structure_temp(dot, self.origin.get_added_operations_structure()) + """ + def get_origin(self): + return self.origin + + def check_same_origin(self, sub): + return self.get_origin()== sub.get_origin() + + ##Add "global" channels and operation to the structure defined in the file + #def get_structure_DSL2(self, dico): + # self.origin.get_structure_DSL2(dico) + + + def get_structure(self, dico): + + self.root.get_structure(dico) + return dico + + ##Add "global" channels and operation to the structure defined in the file + #self.get_structure_DSL2(dico) + # + # + #for e in self.executors: + # if(e.get_type()=="Operation"): + # e.get_structure(dico) + # elif(e.get_type()=="Call"): + # e.get_structure(dico) + # else: + # raise Exception(f"Executor of type '{e.get_type()}' was extracted in a DSL2 workflow! I don't know what this is! 
The code is '{e.get_code()}'") + + + \ No newline at end of file diff --git a/src/nextflow_building_blocks.py b/src/nextflow_building_blocks.py index b01127f6fb9c015f8cb6c1bb3420e8650a52d949..1488bf347cd01bf768fad073ae3db8e94aaf64ad 100644 --- a/src/nextflow_building_blocks.py +++ b/src/nextflow_building_blocks.py @@ -14,28 +14,22 @@ class Nextflow_Building_Blocks: def __init__(self, code): self.code = Code(code = code, origin = self) - self.processes = [] - self.channels = [] - #self.DSL = "" - #DSL2 - self.includes = [] - self.main = None - self.executors = [] - self.subworkflows = [] - self.functions=[] #--------------------------------- #AUXILIARY METHODS FOR ALL CLASSES #--------------------------------- - - def get_modules_defined(self): - return self.origin.get_modules_defined() def get_code(self, get_OG = False): return self.code.get_code(get_OG = get_OG) + def get_modules_defined(self): + return self.origin.get_modules_defined() + + def get_process_from_name(self, name): + return self.origin.get_process_from_name(name) + def get_origin(self): return self.origin @@ -60,28 +54,28 @@ class Nextflow_Building_Blocks: def get_name_processes_subworkflows(self): return self.origin.get_list_name_subworkflows()+self.origin.get_list_name_includes()+ self.origin.get_list_name_processes() - #Only used by the process or subworkflow - def is_called(self, called_from): - #if(self.get_type() in ["Subworkflow", "Process"]): - if(self.get_type() in ["Subworkflow"]): - - executors = called_from.origin.get_executors() - for exe in executors: - if(exe.get_type()=="Call"): - if(self in exe.get_elements_called()): - return True - #Case operation - else: - for o in exe.get_origins(): - if(o.get_type()=="Call"): - if(self in o.get_elements_called()): - return True - return False - - elif(self.get_type() in ["Process"]): - if(self.get_number_times_called()>=1): - return True - raise Exception("You can't do this!") + ##Only used by the process or subworkflow + #def is_called(self, called_from): + # #if(self.get_type() in ["Subworkflow", "Process"]): + # if(self.get_type() in ["Subworkflow"]): + # + # executors = called_from.origin.get_executors() + # for exe in executors: + # if(exe.get_type()=="Call"): + # if(self in exe.get_elements_called()): + # return True + # #Case operation + # else: + # for o in exe.get_origins(): + # if(o.get_type()=="Call"): + # if(self in o.get_elements_called()): + # return True + # return False + # + # elif(self.get_type() in ["Process"]): + # if(self.get_number_times_called()>=1): + # return True + # raise Exception("You can't do this!") def get_line(self, bit_of_code): return self.origin.get_line(bit_of_code) @@ -98,27 +92,13 @@ class Nextflow_Building_Blocks: def get_rocrate_key(self, dico): return f"{self.get_file_address()[len(dico['temp_directory'])+1:]}#{self.get_name()}" - def get_address(self): - return self.origin.get_address() + def get_file_address(self): + return self.origin.get_file_address() def get_workflow_address(self): return self.origin.get_workflow_address() - - #---------------------- - #PROCESSES - #---------------------- - def extract_processes(self): - from .process import Process - code = self.get_code() - #Find pattern - for match in re.finditer(constant.PROCESS_HEADER, code): - start = match.span(0)[0] - end = extract_curly(code, match.span(0)[1])#This function is defined in the functions file - p = Process(code=code[start:end], origin=self) - self.processes.append(p) - #def get_process_from_name(self, name): # for p in self.get_processes(): @@ -126,8 
+106,8 @@ class Nextflow_Building_Blocks: # return p # return None - def get_channels(self): - return self.origin.get_channels() + #def get_channels(self): + # return self.origin.get_channels() #def get_processes(self): # return self.processes @@ -136,33 +116,32 @@ class Nextflow_Building_Blocks: return self.origin.get_workflow_code() def get_file_conditions(self): - #print(self) return self.origin.get_file_conditions() #---------------------- #CHANNELS #---------------------- - #Check if a channel given in parameters is already in channels - def check_in_channels(self, channel): - for c in self.channels: - if(c.equal(channel)): - return True - return False - - def get_channel_from_name(self, name): - for c in self.channels: - if(name == c.get_name()): - return c - #raise Exception(f"{name} is not in the list of channels") - return None - - #Method that adds channel into the lists of channels - def add_channel(self, channel): - if(not self.check_in_channels(channel)): - self.channels.append(channel) - else: - raise Exception("This shoudn't happen!") + ##Check if a channel given in parameters is already in channels + #def check_in_channels(self, channel): + # for c in self.channels: + # if(c.equal(channel)): + # return True + # return False + + #def get_channel_from_name(self, name): + # for c in self.channels: + # if(name == c.get_name()): + # return c + # #raise Exception(f"{name} is not in the list of channels") + # return None + + ##Method that adds channel into the lists of channels + #def add_channel(self, channel): + # if(not self.check_in_channels(channel)): + # self.channels.append(channel) + # else: + # raise Exception("This shoudn't happen!") """def add_channels_structure_temp(self, dico, added_operations): diff --git a/src/nextflow_file.py b/src/nextflow_file.py index 2871d1d63ed2762c1dc5e762731aedb77d61cefc..4e591eb85eb06bd9d70265ddbc1d1e64b5f8fb03 100644 --- a/src/nextflow_file.py +++ b/src/nextflow_file.py @@ -21,230 +21,34 @@ from .bioflowinsighterror import BioFlowInsightError class Nextflow_File(Nextflow_Building_Blocks): - def __init__(self, address, origin): - self.file = address - if(self.get_file_address().find('/')==-1): - raise BioFlowInsightError(f"BioFlow-Insight cannot directly analyse a workflow from its directory. 
Please analyse the workflow from the parent directory instead.", num = -1) - - contents = check_file_exists(self.get_file_address(), self) - Nextflow_Building_Blocks.__init__(self, contents) - - self.origin = origin - self.all_includes = [] - - from src.workflow import Workflow - self.first_file = type(origin)==Workflow - if(self.first_file==True): - self.origin.set_DSL(self.which_DSL()) - self.graph = None - self.added_2_rocrate = False - self.conditions=None - self.check_file_correctness() - self.do_start_stuff() - #self.extract_metadata() - self.check_file_correctness_after_DSL() - self.set_null() - - def set_new_code(self, code): - #self.DSL = self.get_DSL() - Nextflow_Building_Blocks.__init__(self, code) - - def get_address(self): - return self.get_file_address() - - def get_name_file(self): - name = self.get_file_address().split('/')[-1] - return name[:-3] - - def get_workflow_code(self): - return self.get_code() - - def get_file_conditions(self): - if(self.conditions==None): - self.conditions = extract_conditions(self.get_code()) - return self.conditions - - - def check_file_correctness(self): - code = self.get_code() - if(code.count("{")!=code.count("}")): - curly_count = get_curly_count(code) - if(curly_count!=0): - raise BioFlowInsightError(f"Not the same number of opening and closing curlies '{'{}'}' in the file.", num = 16,origin=self) - if(code.count("(")!=code.count(")")): - parenthese_count = get_parenthese_count(code) - if(parenthese_count!=0): - raise BioFlowInsightError(f"Not the same number of opening and closing parentheses '()' in the file.", num = 16, origin=self) - - if(code.count('"""')%2!=0): - raise BioFlowInsightError(f"An odd number of '\"\"\"' was found in the code.", num = 16, origin=self) - - #if(code.count("'''")!=code.count("'''")): - # raise BioFlowInsightError(f"Not the same number of ''' in the file '{self.get_file_address()}'") - # - #if(code.count('"""')!=code.count('"""')): - # raise BioFlowInsightError(f'Not the same number of """ in the file "{self.get_file_address()}"') - - #TODO -> finish function - def check_file_correctness_after_DSL(self): - if(self.first_file): - if(self.DSL=="DSL2"): - code = "\n"+self.get_code()+"\n" - found_main = False - for match in re.finditer(constant.WORKFLOW_HEADER_2, code): - found_main = True - if(not found_main): - raise BioFlowInsightError(f"No 'main' workflow was found.", num = 16, origin=self) - - - def get_processes_annotation(self): - if(self.first_file): - return self.origin.get_processes_annotation() - else: - if(self.origin==None): - return None - else: - return self.origin.get_processes_annotation() - - - def get_workflow_address(self): - if(self.origin==None): - return self.origin.get_workflow_directory() - else: - return self.origin.get_workflow_address() - - - - def set_name(self): - if self.first_file and self.origin.get_name() is None: - address = self.get_file_address() - self.origin.set_name(address.split('/')[-2]) - - - - def get_channels(self): - return self.channels - - def set_null(self): - self.processes = [] - self.channels = [] - self.functions = [] - #DSL2 + def __init__(self, address, workflow, first_file = False): + self.address = address + self.workflow = workflow + self.first_file = first_file + self.workflow.add_nextflow_file_2_workflow(self) self.includes = [] - self.main = None - self.executors = [] + self.processes = [] + self.main = None self.subworkflows = [] - self.already_added_structure = False - self.graph = None - self.all_includes = [] - self.added_2_rocrate = False - - def 
get_all_executors(self, dico): - return self.main.get_all_executors(dico) - - def extract_metadata(self): - - #When the start=="" it means it's the first analysis - if(self.first_file): - self.set_null() - self.set_name() - self.set_author() - dico_wf = {} - dico_wf["workflow name"] = self.origin.get_name() - dico_wf["date analysis"] = date.today().strftime("%m/%d/%y")#m/d/y - dico_wf["DSL"] = self.DSL - dico_wf["link"] = "TODO" - dico_wf["publish date"] = "TODO" - dico_wf["file given"] = self.get_file_address() - #dico_wf["processes"] = {} - - if(self.DSL=="DSL1"): - #self.extract_processes() - #dico_wf["processes"]["number defined"] = len(self.processes) - #dico_wf["processes"]["number used"] = len(self.processes) - None - - elif(self.DSL=="DSL2"): - dico_wf["number nextflow files from root"] = "TODO" - - ##Number of process defined - #root = '/'.join(self.get_file_address().split('/')[:-1]) - #nextflow_files = glob.glob(f'{root}/**/*.nf', recursive=True) - #number_defined=0 - # - #for file in nextflow_files: - # - # wf = Nextflow_File(file, DSL="DSL2") - # wf.extract_processes() - # number_defined+=wf.get_number_processes() - #dico_wf["processes"]["number defined"] = number_defined - # - ##Number of process used - processes_used = {} - with open(self.get_output_dir() / "debug" / "processes_used.json", "w") as outfile: - json.dump(processes_used, outfile, indent=4) - - else: - raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!") - - with open(self.get_output_dir() / "general.json", "w") as outfile: - json.dump(dico_wf, outfile, indent=4) - - def get_type(self): - return "Nextflow File" - - - def get_line(self, bit_of_code): - return self.code.get_line(bit_of_code) - - def get_string_line(self, bit_of_code): - return self.code.get_string_line(bit_of_code) - - def do_start_stuff(self): - #Set the DSL - if(self.first_file): - os.makedirs(self.get_output_dir(), exist_ok=True) - os.makedirs(self.get_output_dir() / 'debug', exist_ok=True) - os.makedirs(self.get_output_dir() / 'graphs', exist_ok=True) - with open(self.get_output_dir() / "debug" / "operations.nf",'w') as file: - pass - with open(self.get_output_dir() / "debug" / "calls.nf",'w') as file: - pass - with open(self.get_output_dir() / "debug" / "operations_in_call.nf",'w') as file: - pass - - self.DSL = self.which_DSL() - self.set_null() - if(self.get_display_info()): - print(f"The workflow is written in '{self.get_DSL()}'") - + self.functions = [] + self.initialised = False + contents = check_file_exists(self.get_file_address(), self) + Nextflow_Building_Blocks.__init__(self, contents) #---------------------- #GENERAL #---------------------- + + #Method that returns the address of the file def get_file_address(self): - return os.path.normpath(self.file) - - - - #Returns either a subworkflow or process from the name - def get_element_from_name(self, name): - for process in self.processes: - if(name==process.get_name()): - return process - for subworkflow in self.subworkflows: - if(name==subworkflow.get_name()): - return subworkflow - for fun in self.functions: - if(name==fun.get_name()): - return fun - raise BioFlowInsightError(f"'{name}' is expected to be defined in the file, but it could not be found.", num = 18, origin=self) + return Path(os.path.normpath(self.address)) + def get_DSL(self): + return self.workflow.get_DSL() #Method which returns the DSL of the workflow -> by default it's DSL2 #I use the presence of include, subworkflows and into/from in processes as a proxy - def which_DSL(self): + 
def find_DSL(self): DSL = "DSL2" #If there are include pattern = constant.FULL_INLCUDE_2 @@ -263,129 +67,110 @@ class Nextflow_File(Nextflow_Building_Blocks): if(DSL=="DSL1"): return DSL return DSL + + def get_workflow(self): + return self.workflow + + def get_duplicate_status(self): + return self.workflow.get_duplicate_status() + + #Returns either a subworkflow or process from the name + def get_element_from_name(self, name): + for process in self.processes: + if(name==process.get_alias()): + return process + for subworkflow in self.subworkflows: + if(name==subworkflow.get_alias()): + return subworkflow + for fun in self.functions: + if(name==fun.get_alias()): + return fun + raise BioFlowInsightError(f"'{name}' is expected to be defined in the file, but it could not be found.", num = 18, origin=self) + + def get_modules_defined(self): + return self.get_processes()+self.get_subworkflows()+self.get_functions()+self.get_modules_included() + + def get_output_dir(self): + return self.workflow.get_output_dir() #---------------------- - #PROCESS + #PROCESSES #---------------------- - #def get_process_from_name(self, name): - # for process in self.processes: - # if(process.get_name()==name): - # return process - # if(self.get_duplicate_status()): - # for include in self.includes: - # defines = include.get_defines() - # for d in defines: - # if(d.get_alias()==name and d.get_type()=="Process"): - # return d - # else: - # for include in self.includes: - # aliases = include.get_aliases() - # for a in aliases: - # if(a==name and aliases[a].get_type()=="Process"): - # return aliases[a] - # - # return None - # raise Exception(f"Process '{name}' couldn't be found in '{self.get_file_address()}'") - - - def get_processes_defined(self, dict = {}): - processes = self.get_processes() - for p in processes: - dict[p] = [] - for include in self.includes: - _ = include.get_file().get_processes_defined(dict = dict) - return dict - + def extract_processes(self): + from .process import Process + code = self.get_code() + #Find pattern + for match in re.finditer(constant.PROCESS_HEADER, code): + start = match.span(0)[0] + end = extract_curly(code, match.span(0)[1])#This function is defined in the functions file + p = Process(code=code[start:end], nextflow_file=self) + self.processes.append(p) + def get_processes(self): return self.processes - def get_processes_called(self): - if(self.get_DSL()=="DSL1"): - return self.get_processes() - elif(self.get_DSL()=="DSL2"): - return self.main.get_processes_called(defined={}) - else: - raise Exception("This shouldn't happen!") - - def get_subworkflows_called(self): - if(self.get_DSL()=="DSL1"): - return [] - elif(self.get_DSL()=="DSL2"): - return self.main.get_subworkflows_called(defined={}) - else: - raise Exception("This shouldn't happen!") - - def get_functions_called(self): - if(self.get_DSL()=="DSL1"): - return self.functions - elif(self.get_DSL()=="DSL2"): - return self.main.get_functions_called(defined={}) - else: - raise Exception("This shouldn't happen!") - - + #---------------------- + #SUBWORKFLOW (ones found in the file) + #---------------------- + def extract_subworkflows(self): + from .subworkflow import Subworkflow + #Get code without comments + code = self.get_code() + #Find pattern + for match in re.finditer(constant.SUBWORKFLOW_HEADER, code): + start = match.span(0)[0] + end = extract_curly(code, match.span(0)[1])#This function is defined in the functions file + sub = Subworkflow(code=code[start:end], nextflow_file=self, name=match.group(1)) + 
self.subworkflows.append(sub) + def get_subworkflows(self): + return self.subworkflows #---------------------- #MAIN WORKFLOW #---------------------- #This method extracts the "main" workflow from the file def extract_main(self): - from .main_DSL2 import Main_DSL2 + from .main import Main #This returns the code without the comments code = "\n"+self.get_code()+"\n" #Find pattern twice = False for match in re.finditer(constant.WORKFLOW_HEADER_2, code): - start = match.span(1)[0] - end = extract_curly(code, match.span(1)[1])#This function is defined in the functions file - self.main = Main_DSL2(code= code[start:end], origin=self) - if(twice): - raise Exception(f"Found multiple 'main workflows' in {self.get_file_address()}") - twice = True + if(self.first_file): + start = match.span(1)[0] + end = extract_curly(code, match.span(1)[1])#This function is defined in the functions file + self.main = Main(code= code[start:end], nextflow_file=self) + if(twice): + #TODO turn into a BioFlowInsightError + raise Exception(f"Found multiple 'main workflows' in {self.get_file_address()}") + twice = True + else: + #TODO add num + raise BioFlowInsightError("A 'main' workflow was found in the Nextflow file") #---------------------- - #SUBWORKFLOW (ones found in the file) + #FUNCTIONS #---------------------- - def extract_subworkflows(self): - from .subworkflow import Subworkflow - #Get code without comments + + #Method that extracts the functions from a file -> we don't analyse them + #since they don't structurally change the workflow + def extract_functions(self): + from .function import Function + #pattern_function = r"(def|String|void|Void|byte|short|int|long|float|double|char|Boolean) *(\w+) *\([^,)]*(,[^,)]+)*\)\s*{" + pattern_function = constant.HEADER_FUNCTION code = self.get_code() #Find pattern - for match in re.finditer(constant.SUBWORKFLOW_HEADER, code): + for match in re.finditer(pattern_function, code): start = match.span(0)[0] end = extract_curly(code, match.span(0)[1])#This function is defined in the functions file - sub = Subworkflow(code=code[start:end], origin=self, name=match.group(1)) - self.subworkflows.append(sub) - - def get_list_name_subworkflows(self): - names = [] - for sub in self.subworkflows: - names.append(sub.get_name()) - return names - - def get_subworkflows(self): - return self.subworkflows + #f = Code(code=code[start:end], origin=self) + f = Function(code = code[start:end], name = match.group(2), origin =self) + self.functions.append(f) - def get_subworkflow_from_name(self, name): - for sub in self.subworkflows: - if(sub.get_name()==name): - return sub - if(self.get_duplicate_status()): - for include in self.includes: - defines = include.get_defines() - for d in defines: - if(d.get_alias()==name and d.get_type()=="Subworkflow"): - return d - else: - for include in self.includes: - aliases = include.get_aliases() - for a in aliases: - if(a==name and aliases[a].get_type()=="Subworkflow"): - return aliases[a] - return None - raise Exception(f"Subworkflow '{name}' couldn't be found in '{self.get_file_address()}'") + def get_functions(self): + return self.functions #---------------------- @@ -395,9 +180,6 @@ from .include import Include code = self.get_code() - - #pattern = r"include +{([^\}]+)} +from +([^\n ]+)" - #pattern = r"include +({([^\}]+)}|(\w+)) +from +([^\n ]+)" pattern = constant.FULL_INLCUDE_2 for match in re.finditer(pattern, code):
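
The extractors above (extract_main, extract_functions, extract_subworkflows) all locate a header with a regex and then call extract_curly, defined in outils.py, to find the end of the brace-delimited body. A minimal sketch of that contract, assuming the real implementation additionally skips curlies inside strings and comments (names ending in _sketch are illustrative, not part of the codebase):

def extract_curly_sketch(code, start):
    #'start' is the index just after the opening '{'; returns the index just after the matching '}'
    depth = 1
    for i in range(start, len(code)):
        if code[i] == '{':
            depth += 1
        elif code[i] == '}':
            depth -= 1
            if depth == 0:
                return i + 1
    raise ValueError("Unbalanced curly braces")

#Example: with code = 'workflow { a { b } }' and start just after the first '{',
#the sketch returns len(code), so code[match_start:end] covers the whole block.
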
@@ -445,550 +227,67 @@ #address = match.group(0).split('from')[1].strip() address = match.group(6).strip() if(address[1:].split('/')[0] not in ['plugin']): - include = Include(code =match.group(0), file = address, importing = includes, origin=self) + include = Include(code =match.group(0), file = address, importing = includes, nextflow_file=self) self.includes.append(include) - self.add_include_to_all_includes(include) - - - def get_list_name_includes(self): - names = [] - for include in self.includes: - names+=include.get_list_name_includes() - return names - - #---------------------- - #FUNCTIONS - #---------------------- - - #Method that extracts the functions from a file -> we don't analyse them - #since they don't structurally change the workflow - def extract_functions(self): - from .function import Function - #pattern_function = r"(def|String|void|Void|byte|short|int|long|float|double|char|Boolean) *(\w+) *\([^,)]*(,[^,)]+)*\)\s*{" - pattern_function = constant.HEADER_FUNCTION - code = self.get_code() - #Find pattern - for match in re.finditer(pattern_function, code): - start = match.span(0)[0] - end = extract_curly(code, match.span(0)[1])#This function is defined in the functions file - #f = Code(code=code[start:end], origin=self) - f = Function(code = code[start:end], name = match.group(2), origin =self) - self.functions.append(f) - - def get_function_from_name(self, name): - for fun in self.functions: - if(fun.get_name()==name): - return fun - - if(self.get_duplicate_status()): - for include in self.includes: - defines = include.get_defines() - for d in defines: - if(d.get_alias()==name and d.get_type()=="Function"): - return d - else: - for include in self.includes: - aliases = include.get_aliases() - for a in aliases: - if(a==name and aliases[a].get_type()=="Function"): - return aliases[a] - return None def get_includes(self): return self.includes def get_modules_included(self): modules = [] - for include in self.get_includes(): - modules+=include.get_defines() + for include in self.includes: + modules+=list(include.defines.values()) return modules - - def get_all_includes(self): - if(self.first_file): - return self.all_includes - else: - return self.origin.get_all_includes() - - def add_include_to_all_includes(self, include): - if(self.first_file): - self.all_includes.append(include) - else: - self.origin.add_include_to_all_includes(include) - + + #---------------------- #INITIALISE #---------------------- #Method that initialises the nextflow file def initialise(self): - + #If the file is not already initialised then we initialise it + if(not self.initialised): + self.initialised = True - if(self.get_DSL()=="DSL2"): - if(self.get_display_info()): - print(self.get_file_address()) - - #Extarct Processes - self.extract_processes() - #print("Extract processes :", self.processes) - - #CODE without processes - code = self.get_code() - for proecess in self.processes: - code = code.replace(proecess.get_code(), "") - #for match in re.finditer(r"\\\s*\n\s*\|", code): - # #TODO add line - # print(code) - # raise BioFlowInsightError(f"The use of backslash '\\' and pipe operator '|' was found in the file '{self.get_file_address()}.' ", origin=self)
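
The new initialise() above is guarded by self.initialised because, when duplicates are disabled, several Include objects can end up sharing a single Nextflow_File; the flag makes repeated calls no-ops and keeps include cycles from recursing forever. A minimal sketch of the pattern, with illustrative names:

class OnceInitialised:
    def __init__(self):
        self.initialised = False

    def initialise(self):
        if not self.initialised:
            #Set the flag before doing the work so re-entrant calls exit immediately
            self.initialised = True
            self.do_expensive_analysis()

    def do_expensive_analysis(self):
        pass  #stand-in for the extract_* / analyse steps

shared = OnceInitialised()
shared.initialise()
shared.initialise()  #second call returns without re-analysing
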
", origin=self) - - - #Analyse Processes - #TODO analyse processes - - #Extarct includes - self.extract_includes() - #print("Extract includes :", self.includes) - - #Analyse Inludes - for include in self.includes: - include.initialise() - - #Extract subworkflows - self.extract_subworkflows() - #print("Extract subworkflows :", self.subworkflows) - - #Extract main - self.extract_main() - #print("Extract main :", self.main) - - #Extract functions - self.extract_functions() - - #Extract Executors - #self.extract_executors() - - #Analyse Executors - for e in self.executors: - e.initialise() + if(self.get_DSL()=="DSL2"): + if(self.workflow.get_display_info_bool()): + print(f"Analysing -> '{self.get_file_address()}'") + + #Extarct Processes + self.extract_processes() + #Analysing Processes + for process in self.processes: + process.initialise() + + #Code without processes + code = self.get_code() + for proecess in self.processes: + code = code.replace(proecess.get_code(), "") - - - #Analyse Main - if(self.main!=None and self.first_file): - self.main.initialise() - - #Analyse subworkflows - indice=1 - for sub in self.subworkflows: - sub.initialise() - indice+=1 - - #if(self.first_file): - # number_process_used = 0 - # with open(self.get_output_dir() / 'debug/processes_used.json') as json_file: - # dict = json.load(json_file) - # for file in dict: - # number_process_used+=len(set(dict[file])) - # - # with open(self.get_output_dir() / "general.json") as json_file: - # dico_wf = json.load(json_file) - # - # #dico_wf["processes"]["number used"] = number_process_used - # - # with open(self.get_output_dir() / "general.json", "w") as outfile: - # json.dump(dico_wf, outfile, indent=4) - - - elif(self.get_DSL()=="DSL1"): - if(self.get_display_info()): - print(self.get_file_address()) - self.extract_processes() - self.extract_functions() - self.DSL1_extract_executors() - for e in self.executors: - e.initialise() - - else: - raise Exception(f"I don't know what to do with this:/ '{self.get_DSL()}'") - - if(self.first_file): - self.initialise_graph() + #Extract includes + self.extract_includes() + #Extract subworkflows + self.extract_subworkflows() - def DSL1_extract_executors(self): - from .operation import Operation - - code = self.get_code() - - things_to_remove = [] - things_to_remove+= self.processes+self.includes+self.subworkflows+self.functions - - for to_remove in things_to_remove: - code = code.replace(to_remove.get_code(get_OG = True), "", 1) - - #We add this to simplify the search of the executors - code = "start\n"+code+"\nend" - - #This function takes an executor (already found and expandes it to the pipe operators) - def expand_to_pipe_operators(text, executor): - #If the executor ends with the pipe operator -> we remove it so that it can be detected by the pattern - if(executor[-1]=="|"): - executor = executor[:-1].strip() - start = text.find(executor)+len(executor) - for match in re.finditer(constant.END_PIPE_OPERATOR, text[start:]): - begining, end = match.span(0) - if(begining==0): - return expand_pipe_operator(text, executor+match.group(0)) - break - return executor - - - - #--------------------------------------------------------------- - #STEP1 - Extract equal operations eg. 
- # *Case "channel = something" - # *Case "(channel1, channel2) = something" - #--------------------------------------------------------------- - pattern_equal = constant.LIST_EQUALS - - searching = True - while(searching): - searching= False - text = code - for e in self.executors: - text = text.replace(e.get_code(), "", 1) - - for pattern in pattern_equal: - for match in re.finditer(pattern, text): - - start, end = match.span(2) - ope = extract_end_operation(text, start, end) - ope = expand_to_pipe_operators(text, ope) - - #If the thing which is extracted is not in the conditon of an if - if(not checks_in_condition_if(text, ope) and not checks_in_string(text, ope)): - operation = Operation(ope, self) - self.executors.append(operation) - searching= True - break + #Analyse Inludes + for include in self.includes: + include.initialise() - - #------------------------------------------------- - #STEP2 - Extract the terms which use the operators - #------------------------------------------------- - pattern_dot = constant.DOT_OPERATOR - searching = True - searched = [] - - - while(searching): - searching= False - text = code - for e in self.executors: - text = text.replace(e.get_code(), "", 1) - - for match in re.finditer(pattern_dot, text): - start, end = match.span(1) - - if(match.group(1) not in constant.ERROR_WORDS): - if(match.group(1) in constant.LIST_OPERATORS): - #TODO -> the function below might not work perfectly but i don't have any other ideas - - - #Use if there is an operator called right before opening the curlies/parenthse - #curly_left, curly_right = get_curly_count(text[:start]), get_curly_count(text[end:]) - parenthese_left, parenthese_right = get_parenthese_count(text[:start]), get_parenthese_count(text[end:]) - - #if(curly_left==0 and curly_right==0 and parenthese_left==0 and parenthese_right==0 and (start, end) not in searched): - #if(parenthese_left==0 and parenthese_right==0 and (start, end, temp) not in searched): - if(parenthese_left==0 and parenthese_right==0): - - - try: - pot = extract_executor_from_middle(text, start, end) - except: - try: - temp = text[start-10:end+10] - except: - temp = text[start:end] - raise BioFlowInsightError(f"Failed to extract the operation or call{self.get_string_line(temp)}. 
Try rewriting it in a simplified version.", num = 11, origin=self) - - pot = expand_to_pipe_operators(text, pot) - #IF the exact potential hasn't already been searched, then we don't do it - if((start, end, pot) not in searched): - searched.append((start, end, pot)) - #If the thing which is extracted is not in the conditon of an if - if(not checks_in_condition_if(text, pot) and not checks_in_string(text, pot)): - - ope = Operation(pot, self) - self.executors.append(ope) - searching = True - break - - - #--------------------------------------------------------------- - #STEP4 - Extract the Executors which only use the pipe operators (which start with a channel) - #--------------------------------------------------------------- - to_call = self.get_list_name_processes()+self.get_list_name_subworkflows()+self.get_list_name_includes() - - searching = True - while(searching): - searching= False - text = code - for e in self.executors: - text = text.replace(e.get_code(get_OG=True), "", 1) - pattern = constant.BEGINNING_PIPE_OPERATOR - - for match in re.finditer(pattern, text): - txt_call = expand_pipe_operator(text, match.group(0)) - full_executor = txt_call - - #start, end = match.span(0) - ## Check to see if a parameter is given such as in the example 'splitLetters | flatten | convertToUpper | view { it.trim() }' - #params, full_executor = check_if_parameter_is_given_pipe(text, start, end) - #if(params!=''): - # tab_to_call = txt_call.split('|') - # start = f"{tab_to_call[0]}({params})" - # txt_call = start + '|' + '|'.join(tab_to_call[1:]) - # print(start) - #print(params, full_executor) + #Extract main + self.extract_main() - #If the thing which is extracted is not in the conditon of an if - if(not checks_in_condition_if(text, full_executor) and not checks_in_string(text, full_executor)): - tab_to_call = txt_call.split('|') - if(tab_to_call[0].strip() in to_call): - start = f"{tab_to_call[0]}()" - txt_call = start + '|' + '|'.join(tab_to_call[1:]) - first_thing_called = txt_call.split('|')[-1].strip() - - if(first_thing_called in constant.LIST_OPERATORS): - ope = Operation(code =txt_call, origin =self, OG_code= full_executor) - self.executors.append(ope) - searching = True - break - else: - added = False - #This is in the case "channel | map {dfvfdvd}" - for ope in constant.LIST_OPERATORS: - if(first_thing_called[:len(ope)]==ope and not added): - ope = Operation(code =txt_call, origin =self, OG_code= full_executor) - self.executors.append(ope) - added = True - searching = True - if(added): - break - elif(not added): - raise BioFlowInsightError(f"In the executor '{txt_call}', '{first_thing_called}' is neither a process, subworkflow or an operator (in the file '{self.get_file_address()}')", num = 14,origin=self) - - #--------------------------------------------------------------------- - #STEP5 - We remove the things which were falsy extracted as executors - #--------------------------------------------------------------------- - to_remove = [] - starting_by_to_remove = ["System.out"] - for e in self.executors: - for r in starting_by_to_remove: - if(e.get_code()[:len(r)]==r): - to_remove.append(e) - for e in to_remove: - self.executors.remove(e) - - #The start parameter is for when we call 'get_structure_DSL2' for the first time - def get_structure_DSL2(self, dico, start = False): - if(not self.already_added_structure): - self.already_added_structure = True - #Add the operations found in the file (outside of main or subworkflow) to the structure - for o in self.executors: - 
if(o.get_type()=="Operation"): - o.get_structure(dico) - else: - if(o.get_first_element_called().get_type()!="Function"): - raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL2 workflow (outside of a subworkflow or main)! This shoudn't happen! The code is '{o.get_code()}' -> it was called in file '{o.get_file_address()}'") - - #for c in self.get_channels(): - # for source in c.get_source(): - # for sink in c.get_sink(): - # dico["edges"].append({'A':str(source), 'B':str(sink), "label":c.get_name()}) - - if(start): - if(self.main!=None): - self.main.get_structure(dico) - if(not start and self.main!=None): - warnings.warn(f"Another main was detected in the file '{self.get_file_address()}' (it is not represented in the graph)") - #raise Exception(f'There was a second main which was detected in the workflow in the file {self.get_file_address()}') - return dico - + #Extract functions + self.extract_functions() - def get_structure_DSL1(self, dico): - for p in self.get_processes(): - p.get_structure(dico) - - for o in self.get_executors(): - if(o.get_type()=="Operation"): - o.get_structure(dico) - else: - raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'") - - for c in self.get_channels(): - for source in c.get_source(): - for sink in c.get_sink(): - #If the sink an operation then the edge has already been added in the get_structure method for the operation - if(sink.get_type()=="Process"): - dico["edges"].append({'A':str(source), 'B':str(sink), "label":c.get_name()}) - - return dico - - - def get_structure(self): - dico = {} - dico['nodes'] = [] - dico['edges'] = [] - dico['subworkflows'] = {} - - if(self.DSL == "DSL1"): - return self.get_structure_DSL1(dico=dico) - elif(self.DSL == "DSL2"): - return self.get_structure_DSL2(dico=dico, start = True) - else: - raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!") - - - def initialise_graph(self): - from .graph import Graph - if(self.graph==None): - self.graph = Graph(self) - - def generate_all_graphs(self, render_graphs = True, processes_2_remove = []): - #Initialisation (obligatory) - self.graph.initialise(processes_2_remove = processes_2_remove) - - #Generate the different graphs - self.graph.get_specification_graph(render_graphs = render_graphs) - self.graph.get_specification_graph_wo_labels(render_graphs = render_graphs) - self.graph.render_graph_wo_operations(render_graphs = render_graphs) - self.graph.get_specification_graph_wo_orphan_operations(render_graphs = render_graphs) - self.graph.get_specification_graph_wo_orphan_operations_wo_labels(render_graphs = render_graphs) - self.graph.render_dependency_graph(render_graphs = render_graphs) - self.graph.get_dependency_graph_wo_labels(render_graphs = render_graphs) - self.graph.get_dependency_graph_wo_orphan_operations(render_graphs = render_graphs) - self.graph.get_dependency_graph_wo_orphan_operations_wo_labels(render_graphs = render_graphs) - - #Generate the different metadata associated with the graphs - self.graph.get_metadata_specification_graph() - self.graph.get_metadata_dependency_graph() - self.graph.get_metadata_process_dependency_graph() - - def generate_specification_graph(self, render_graphs = True, processes_2_remove = []): - self.graph.initialise(processes_2_remove = processes_2_remove) - self.graph.get_specification_graph(render_graphs = render_graphs) - - def generate_process_dependency_graph(self, render_graphs = True, 
processes_2_remove = []): - self.graph.initialise(processes_2_remove = processes_2_remove) - self.graph.render_graph_wo_operations(render_graphs = render_graphs) - - def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []): - self.graph.initialise(processes_2_remove = processes_2_remove) - self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs) - - def generate_level_graphs(self, render_graphs = True, processes_2_remove = [], label_edge=True, label_node=True): - self.graph.initialise(processes_2_remove = processes_2_remove) - self.graph.generate_level_graphs(render_graphs = render_graphs, label_edge=label_edge, label_node=label_node) - - def generate_user_and_process_metadata(self): - #TODO -> this first line is added in reality it needs to be commented - self.graph.get_metadata_specification_graph() - self.graph.get_metadata_process_dependency_graph() - self.graph.get_metadata_user_view() - - - def get_graph(self): - return self.graph - #def get_metadata_graph_wo_operations(self): - # self.graph.get_metadata_graph_wo_operations() - - def get_number_subworkflows_process_dependency_graph(self): - return self.graph.get_number_subworkflows_process_dependency_graph() - - def get_number_subworkflows_user_view(self): - return self.graph.get_number_subworkflows_user_view() - - def node_2_subworkflows_process_dependency_graph(self): - return self.graph.node_2_subworkflows_process_dependency_graph() - - def node_2_subworkflows_user_view(self): - return self.graph.node_2_subworkflows_user_view() - - def check_fake_dependency_user_view(self): - return self.graph.check_fake_dependency_user_view() - - - - def add_main_DSL1_2_rocrate(self, dico, file_dico, file_name): - main_key = f"{file_name}#main" - file_dico["hasPart"].append(main_key) - dico_main = {} - dico_main["@id"] = main_key - dico_main["name"] = "Main Workflow" - dico_main["@type"] = ["SoftwareSourceCode", "ComputationalWorkflow"] - #TODO -> check if this remains true - #dico_main["conformsTo"] = {"@id": "https://bioschemas.org/profiles/ComputationalWorkflow/0.5-DRAFT-2020_07_21"} - #dico_main["dct:conformsTo"]= "https://bioschemas.org/profiles/ComputationalWorkflow/1.0-RELEASE/" - dico_main["input"] = [] - dico_main["output"] = [] - dico_main["isPartOf"] = [{"@id": file_name}] - dico_main["hasPart"] = [] - self.add_processes_2_rocrate(dico, dico_main, main_key.split("#")[0]) - dico["@graph"].append(dico_main) - - def add_processes_2_rocrate(self, dico, file_dico, file_name): - for p in self.processes: - process_key = f"{file_name}#{p.get_name()}" - file_dico["hasPart"].append(process_key) - p.add_2_rocrate(dico, file_name) - - def add_includes_2_rocrate(self, dico, file_dico, file_name): - for include in self.includes: - included_key = include.get_file().get_file_address()[len(dico["temp_directory"])+1:] - file_dico["hasPart"].append({"@id":included_key}) - included_dico = get_dico_from_tab_from_id(dico, included_key) - included_dico["isPartOf"].append({"@id":file_name}) - include.get_file().add_2_rocrate(dico) - - def add_subworkflows_2_rocrate(self, dico, file_dico, file_name): - for sub in self.subworkflows: - sub_key = sub.get_rocrate_key(dico) - file_dico["hasPart"].append({"@id":sub_key}) - sub.add_2_rocrate(dico, file_name) - - def add_2_rocrate(self, dico): - if(not self.added_2_rocrate): - self.added_2_rocrate = True - file_name = self.get_file_address()[len(dico["temp_directory"])+1:] - file_dico = get_dico_from_tab_from_id(dico, file_name) - 
if(self.first_file): - - #Case DSL1 - if(self.get_DSL()=="DSL1"): - #file_dico["@type"].append("ComputationalWorkflow") - self.add_main_DSL1_2_rocrate(dico, file_dico, file_name) - self.add_processes_2_rocrate(dico, file_dico, file_name) - - #Case DSL2 - elif(self.get_DSL()=="DSL2"): - self.add_processes_2_rocrate(dico, file_dico, file_name) - self.add_includes_2_rocrate(dico, file_dico, file_name) - self.main.add_2_rocrate(dico, file_name) - self.add_subworkflows_2_rocrate(dico, file_dico, file_name) - - else: - raise Exception("This shoudn't happen!") - else: - if(self.get_DSL()=="DSL2"): - self.add_processes_2_rocrate(dico, file_dico, file_name) - self.add_includes_2_rocrate(dico, file_dico, file_name) - self.add_subworkflows_2_rocrate(dico, file_dico, file_name) - - #TODO - else: - raise Exception("This shoudn't happen!") - - + #Analyse Main + if(self.first_file and self.main!=None): + self.main.initialise() + # + ##Analyse subworkflows + #indice=1 + #for sub in self.subworkflows: + # sub.initialise() + # indice+=1 diff --git a/src/operation.py b/src/operation.py index 5425fbd70e0bdbe543a6bee3084375241fd95ffd..70bf34154c7c4c375d66b019176a920dcd217b83 100644 --- a/src/operation.py +++ b/src/operation.py @@ -29,7 +29,7 @@ class Operation(Executor): #Artificial means that it is created by the analysis -> it is not native in the code self.artificial = False #It's important this is last - self.condition = Condition(self) + #self.condition = Condition(self) def change_code(self, code): self.code = Code(code, origin = self) @@ -67,11 +67,11 @@ class Operation(Executor): def get_type(self): return "Operation" - def check_in_channels(self, channel): - return self.origin.check_in_channels(channel) + #def check_in_channels(self, channel): + # return self.origin.check_in_channels(channel) - def add_channel(self, channel): - self.origin.add_channel(channel) + #def add_channel(self, channel): + # self.origin.add_channel(channel) def get_elements_called(self, tab = []): for o in self.origins: @@ -84,13 +84,16 @@ class Operation(Executor): from .channel import Channel #Check that the name is not the list of illegal words #and Check that the thing extarcted is not WorkflowNameFile like 'WorkflowHgtseq' in nf-core/hgtseq - if(name not in constant.ERROR_WORDS_ORIGINS and name.lower()!=f"workflow{self.get_name_file().lower()}"): + if(name not in constant.ERROR_WORDS_ORIGINS):# and name.lower()!=f"workflow{self.get_name_file().lower()}"): channel = Channel(name=name, origin=self.origin) #TODO -> this needs to be checked - if(not self.origin.check_in_channels(channel)): - self.origin.add_channel(channel) + if(self.origin.get_type()!="Subworkflow"): + if(not self.origin.check_in_channels(channel)): + self.origin.add_channel(channel) + else: + channel = self.origin.get_channel_from_name(name) else: - channel = self.origin.get_channel_from_name(name) + self.origin.takes_channels.append(channel) self.origins.append(channel) #channel.initialise() channel.add_sink(self) diff --git a/src/outils.py b/src/outils.py index 0570b0b89cb2f46ce712bd7e51c364e2e2b87d48..240cadd622196b19f31093e0ee8a2547de1badbb 100644 --- a/src/outils.py +++ b/src/outils.py @@ -919,6 +919,7 @@ def is_git_directory(path = '.'): #Function that extracts the conditions defined in some code #TODO -> need to update this -> if the same condition appears multiple times in the code -> in the dico it is only counted once +#Right now the function is not recursif -> since i call blocks recursively and that this function is only used by blocks -> it is 
indirectly called recursively
def extract_conditions(code):
 conditions_dico = {}
@@ -996,9 +997,9 @@ def extract_conditions(code):
 condition = match.group(1)
 conditions.append(condition)
 end = extract_curly(code, match.span(0)[1]+start)#Here we need to add the start index since we're only working on a subpart of code
- conditions_dico[condition] = (start,end)
 start_inside, end_inside = match.span(0)[1]+start, end-1
- conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
+ conditions_dico[condition] = (start_inside, end_inside)
+ #conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
 break
 searching_for_else = True
 while(searching_for_else):
@@ -1015,9 +1016,9 @@ def extract_conditions(code):
 start_else, end_else = match.span(0)
 start_else+=end
 end_else = extract_curly(code, end_else+end)
- conditions_dico[condition] = (start_else,end_else)
 start_inside, end_inside = match.span(0)[1]+end, end_else-1
- conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
+ conditions_dico[condition] = (start_inside, end_inside)
+ #conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
 break
 #CASE of "else"
 if(not found_else_if):
@@ -1025,9 +1026,9 @@ def extract_conditions(code):
 start_else, end_else = match.span(0)
 start_else+=end
 end_else = extract_curly(code, end_else+end)
- conditions_dico[' && '.join(["!({})".format(v) for v in conditions])] = (start_else,end_else)
 start_inside, end_inside = match.span(0)[1]+end, end_else-1
- conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
+ conditions_dico[' && '.join(["!({})".format(v) for v in conditions])] = (start_inside, end_inside)
+ #conditions_dico = adding_inside(conditions_dico, code, start_inside, end_inside)
 #print(code[start_else:end_else])
 break
 end = end_else
diff --git a/src/process.py b/src/process.py
index 29c10c0f939958a0fcc01fce9983c4ee75c3f281..b5bf580589d65ddcf8e7c10812fbc8eb38179401 100644
--- a/src/process.py
+++ b/src/process.py
@@ -10,8 +10,8 @@
 from .bioflowinsighterror import BioFlowInsightError
 from . 
import constant class Process(Nextflow_Building_Blocks): - def __init__(self, code, origin): - self.origin = origin + def __init__(self, code, nextflow_file): + self.nextflow_file = nextflow_file self.code = Code(code, origin = self) self.name = "" self.alias = "" @@ -19,24 +19,16 @@ class Process(Nextflow_Building_Blocks): self.inputs = [] self.raw_input_names = []#This is used to convert DSL1 workflows to DSL2 self.outputs = [] - #This attribut is a list of lists - self.outputs_per_line = [] + self.input_code = "" self.output_code = "" self.when_code = "" self.pusblishDir_code = "" self.script_code = "" - self.tools = [] - self.modules = [] - self.commands = [] - self.external_scripts = [-1] - self.initialise() - self.initialised = True - self.call = [] - self.number_times_called = 0 - self.later_emits = [] - ##It's important this is last - #self.condition = Condition(self) + + self.called_by = []#List of calls + + def add_to_emits(self, emit): self.later_emits.append(emit) @@ -68,140 +60,7 @@ class Process(Nextflow_Building_Blocks): def get_name(self): return self.name - def get_tools(self, remove_script_calls = True): - def remove_script_calls(tab_temp): - tab = tab_temp.copy() - if("python" in tab): - tab.remove("python") - if("R" in tab): - tab.remove("R") - if("perl" in tab): - tab.remove("perl") - return tab - if(remove_script_calls): - return remove_script_calls(self.tools) - else: - return self.tools - - - def get_external_scripts_call(self, code): - tab = [] - for match in re.finditer(r"((\s|\/|\'|\")([\w\_\-\&]+\/)*([\w\_\-\&]+)\.(sh|py|R|r|pl|rg|bash))[^\w]", code): - word = match.group(1).strip() - if(word[0]=="'" or word[0]=='"'): - word = word[1:] - tab.append(word) - return list(set(tab)) - - def initialise_external_scripts_code(self, code, extension = "", previously_called = {}): - calls = self.get_external_scripts_call(code+'\n') - - #workflow_directory = self.origin.get_address() - #print(workflow_directory) - #import os - #print(os.getcwd(), self.origin.get_address(), self.get_workflow_address()) - scripts = [] - extensions = [] - bash_scripts = [] - - tab = [] - for call in calls: - #Check first if the file is in the bin - file = glob.glob(f'{self.get_workflow_address()}/bin/**/{call}', recursive=True) - if(len(file)>1): - raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code bin, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self) - #If not we search again - elif(len(file)==0): - file = glob.glob(f'{self.get_workflow_address()}/**/{call}', recursive=True) - if(len(file)>1): - raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self) - - - for f in file: - if(f not in previously_called): - val = "" - with open(f, 'r', encoding='latin-1') as s: - #with open(f, 'r') as s: - val = s.read() - scripts.append(val) - previously_called[f] = "" - #Recursive Call to get the ones which are being called if the current file is a bash script - if(extension not in ["py", "R", "pl", "rg", "r"]): - tab += self.initialise_external_scripts_code(val, extension=f.split(".")[-1], previously_called= previously_called) - scripts+=tab - - return list(set(scripts)) - - - def get_external_scripts_code(self): - if(self.external_scripts!=[-1]): - None - else: - self.external_scripts = self.initialise_external_scripts_code(self.get_script_code()) - return self.external_scripts - - - def 
get_python_packages_imported_internal_script(self): - packages = [] - packages+= get_python_packages(self.get_script_code()) - return packages - - def get_python_packages_imported_external_scripts(self): - packages = [] - external_scripts = self.get_external_scripts_code() - for s in external_scripts: - packages+= get_python_packages(s) - return packages - - #This methods checks the script and the external script calls for python packages imports - def get_python_packages_imported(self): - packages = [] - packages+= self.get_python_packages_imported_internal_script() - packages+= self.get_python_packages_imported_external_scripts() - return list(set(packages)) - - - def get_R_libraries_loaded_internal_script(self): - libraries = [] - libraries+= get_R_libraries(self.get_script_code()) - return libraries - - def get_R_libraries_loaded_external_scripts(self): - libraries = [] - for s in self.get_external_scripts_code(): - libraries+= get_R_libraries(s) - return libraries - - #This methods checks the script and the external script calls for R libraries loaded - def get_R_libraries_loaded(self): - libraries = [] - libraries+= self.get_R_libraries_loaded_internal_script() - libraries+= self.get_R_libraries_loaded_external_scripts() - return list(set(libraries)) - - - def get_perl_modules_imported_internal_script(self): - libraries = [] - libraries+= get_perl_modules(self.get_script_code()) - return libraries - - def get_perl_modules_imported_external_scripts(self): - libraries = [] - for s in self.get_external_scripts_code(): - libraries+= get_perl_modules(s) - return libraries - - def get_perl_modules_imported(self): - libraries = [] - libraries+= self.get_perl_modules_imported_internal_script() - libraries+= self.get_perl_modules_imported_external_scripts() - return list(set(libraries)) - - - #def get_source(self): - # return [self] - - #MEthod which returns the DSL type of a process, i use the presence + #Method which returns the DSL type of a process, i use the presence #of from and into as a proxy. 
By default it's DSL2 def which_DSL(self): DSL = "DSL2" @@ -320,7 +179,7 @@ class Process(Nextflow_Building_Blocks): self.when_code = temp_code elif(variables_index[i]=='script'): self.script_code = temp_code - self.extract_tools() + #self.extract_tools() else: raise Exception("This shoudn't happen!") @@ -499,158 +358,20 @@ class Process(Nextflow_Building_Blocks): dico['nodes'].append({'id':str(self), 'name':self.get_name_to_print(), "shape":"ellipse", 'xlabel':"", 'fillcolor':''}) def initialise_inputs_outputs(self): - DSL = self.origin.get_DSL() + DSL = self.nextflow_file.get_DSL() if(DSL=="DSL1"): self.initialise_inputs_DSL1() self.initialise_outputs_DSL1() elif(DSL=="DSL2"): self.initialise_inputs_DSL2() self.initialise_outputs_DSL2() - #else: - # raise Exception("Workflow is neither written in DSL1 nor DSL2!") + else: + raise Exception("Workflow is neither written in DSL1 nor DSL2!") def initialise(self): self.initialise_name() self.initialise_parts() self.initialise_inputs_outputs() - #annotations = self.get_processes_annotation() - annotations = None - if(annotations!=None): - self.tools = annotations[self.get_code()]["tools"] - self.commands = annotations[self.get_code()]["commands"] - self.modules = annotations[self.get_code()]["modules"] - - def add_2_rocrate(self, dico, parent_key): - process_key = self.get_rocrate_key(dico) - dico_process = get_dico_from_tab_from_id(dico, process_key) - if(dico_process==None): - dico_process = {} - dico_process["@id"] = process_key - dico_process["name"] = "Process" - dico_process["@type"] = ["SoftwareSourceCode"] - #ADD INPUTS - dico_process["input"] = [] - for input in self.get_inputs(): - if(type(input)==str): - name_input = input - else: - name_input = input.get_code() - dico_input = get_dico_from_tab_from_id(dico, name_input) - if(dico_input==None): - dico_input = {"@id":f"#{name_input}", "@name": name_input, "@type": "FormalParameter"} - dico["@graph"].append(dico_input) - dico_process["input"].append({"@id":dico_input["@id"]}) - #ADD OUTPUTS - dico_process["output"] = [] - for output in self.get_outputs(): - if(type(output)==str): - name_output = output - else: - name_output = output.get_code() - dico_output = get_dico_from_tab_from_id(dico, name_output) - if(dico_output==None): - dico_output = {"@id":f"#{name_output}", "@name": name_output, "@type": "FormalParameter"} - dico["@graph"].append(dico_output) - dico_process["output"].append({"@id":dico_output["@id"]}) - #ADD isPartOf - dico_process["isPartOf"] = [] - dico_process["isPartOf"].append({"@id":parent_key}) - #ADD hasPart - dico_process["hasPart"] = [] - for tool in self.get_tools(): - dico_tool = get_dico_from_tab_from_id(dico, tool) - if(dico_tool==None): - dico_tool = {"@id":tool, - "name": "Tool" - #TODO in later versions - #, "url": "https://some.link.com" - #, "identifier": "tool_identifier" - } - dico["@graph"].append(dico_tool) - dico_process["hasPart"].append({"@id":dico_tool["@id"]}) - - dico["@graph"].append(dico_process) - else: - if(not check_if_element_in_tab_rocrate(dico_process["isPartOf"], parent_key)): - dico_process["isPartOf"].append({"@id":parent_key}) - def convert_input_code_to_DSL2(self): - code = self.input_code - #code = process_2_DSL2(code) - lines = [] - for line in code.split("\n"): - temp = process_2_DSL2(line.split(" from ")[0]) - lines.append(temp) - code = "\n".join(lines) - return code - - def convert_output_code_to_DSL2(self): - code = self.output_code - lines = [] - for line in code.split("\n"): - line = line.replace(" into ", ", emit: ") - line 
= line.replace(" mode flatten", "") - #Remove optionnal true #TODO check if this breaks soemthing - line = line.replace("optional true", "") - line = process_2_DSL2(line) - lines.append(line) - code = "\n".join(lines) - #Removing the extra emits - #For it to only have one, - for line in self.outputs_per_line: - def replacer(match): - return match.group(1) - for o in line[1:]: - code = re.sub(fr"\,\s*{re.escape(o.get_code())}(\s|\,|\))", replacer, code+"\n") - return code - - #This method is to detect which are the channels which need to be flattened - #See https://github.com/nextflow-io/nextflow/blob/be1694bfebeb2df509ec4b42ea5b878ebfbb6627/docs/dsl1.md - def get_channels_to_flatten(self): - code = self.output_code - channels = [] - for match in re.finditer(r"(\w+) mode flatten", code): - channels.append(match.group(1)) - return channels - - #This method cleans the raw_input_names to use when rewriting DSL1 workflows - def clean_raw_input_names(self, raw_input_names): - for i in range(len(raw_input_names)): - if(bool(re.fullmatch(r"\w+\.val", raw_input_names[i]))): - raw_input_names[i] = raw_input_names[i].split('.')[0] - return raw_input_names - - def get_parameters_call(self): - return ', '.join(self.clean_raw_input_names(self.raw_input_names)) - - def convert_to_DSL2(self): - if(self.get_DSL()=="DSL2"): - print("Workflow is already written in DSL2") - else: - code = self.get_code() - call = [f"{self.get_name()}({self.get_parameters_call()})"] - code = code.replace(self.input_code, self.convert_input_code_to_DSL2()) - code = code.replace(self.output_code, self.convert_output_code_to_DSL2()) - channels_to_flatten = self.get_channels_to_flatten() - - - #Rewriting the attributions of the channels for it to match the new values emitted (single values) - index = 0 - for line in self.outputs_per_line: - for emitted in line: - o = self.outputs[index] - if(o.get_code() in channels_to_flatten): - call.append(f"{o.get_code()} = {self.get_name()}.out.{line[0].get_code()}.flatten()") - else: - call.append(f"{o.get_code()} = {self.get_name()}.out.{line[0].get_code()}") - index+=1 - - #for o in self.outputs: - # if(o.get_code() in channels_to_flatten): - # call.append(f"{o.get_code()} = {self.get_name()}.out.{o.get_code()}.flatten()") - # else: - # call.append(f"{o.get_code()} = {self.get_name()}.out.{o.get_code()}") - call = "\n".join(call) - return code, call diff --git a/src/ro_crate.py b/src/ro_crate.py index cc8e3947d9e60f9b3e77c600104c31c055b55a11..186c53f2f0ef9ca9e229d129074a05d45e128229 100644 --- a/src/ro_crate.py +++ b/src/ro_crate.py @@ -127,7 +127,7 @@ class RO_Crate: #TODO -> update this -> it's incomplet def get_url(self, file): if(self.workflow.dico!={}): - return f"https://github.com/{self.workflow.get_address()}/blob/main/{file}" + return f"https://github.com/{self.workflow.get_file_address()}/blob/main/{file}" return None diff --git a/src/main_DSL2.py b/src/root.py similarity index 57% rename from src/main_DSL2.py rename to src/root.py index ae921c877f4da99eb255752a549e17a12b540bcc..d275afd4d84a6e17d52b81b4c34198e78f31a0c5 100644 --- a/src/main_DSL2.py +++ b/src/root.py @@ -1,330 +1,94 @@ + from .nextflow_building_blocks import Nextflow_Building_Blocks +from .code_ import Code from .bioflowinsighterror import BioFlowInsightError -import re -from .outils import * +from .outils import * from . 
import constant +import re - -class Main_DSL2(Nextflow_Building_Blocks): - def __init__(self, code, origin): +class Root(Nextflow_Building_Blocks): + def __init__(self, code, origin, modules_defined, + subworkflow_inputs = []):#These channels are the inputs of the subworkflow Nextflow_Building_Blocks.__init__(self, code) self.origin = origin - self.calls = [] - self.initialised = False - self.conditions=None - self.modules_defined = [] - self.modules_called = [] - - def get_modules_defined(self): - return self.modules_defined - - def get_all_executors(self, dico): - for e in self.get_executors(): - dico[e] = 1 - - for exe in self.get_executors(): - if(exe.get_type()=="Call"): - first = exe.get_first_element_called() - if(first.get_type()=="Subworkflow"): - first.get_all_executors(dico) - elif(exe.get_type()=="Operation"): - for o in exe.get_origins(): - if(o.get_type()=="Call"): - first = o.get_first_element_called() - if(first.get_type()=="Subworkflow"): - first.get_all_executors(dico) - else: - raise Exception("This shouldn't happen") - - - - def get_channels(self): - return self.channels + self.executors = [] + self.blocks = [] + self.modules_defined = modules_defined + self.elements_being_called = [] + self.channels = subworkflow_inputs - def get_workflow_code(self): - return self.get_code() - - def get_file_conditions(self): - if(self.conditions==None): - self.conditions = extract_conditions(self.get_code()) - return self.conditions - def get_type(self): - return "Main DSL2" - - def get_calls(self): - return self.calls + return "Root" - def is_initialised(self): - return self.initialised - - def get_all_called(self): - called = [] - for exe in self.get_executors(): - if(exe.get_type()=="Call"): - called+=exe.get_elements_called() - else: - for o in exe.get_origins(): - if(o.get_type()=="Call"): - called+=o.get_elements_called() - return called - - #def get_processes(self): - # return self.origin.get_processes()+super().get_processes() + def add_element_to_elements_being_called(self, element): + self.elements_being_called.append(element) - #def get_process_from_name(self, name): - # print("here") - # return self.origin.get_process_from_name(name) - - def get_process_from_name(self, name): - for m in self.modules_called: - if(m.get_type()=="Process" and m.get_alias()==name): - return m - - for m in self.modules_defined: - if(m.get_type()=="Process" and m.get_alias()==name): - #If we're duplicating we need to check that the processes hasn't already been called - #In that case we duplicate it - if(self.get_duplicate_status()): - if(m.get_number_times_called()>0): - import copy - process = copy.deepcopy(m) - self.modules_called.append(process) - return process - else: - m.incremente_number_times_called() - self.modules_called.append(m) - return m - else: - return m - return None + def check_in_channels(self, channel): + for c in self.get_channels(): + if(c.equal(channel)): + return True + return False - def get_processes_called(self, defined = {}): - - for c in self.get_all_called(): - if(c.get_type()=="Process"): - defined[c] = [] - elif(c.get_type()=="Subworkflow"): - _ = c.get_processes_called(defined = defined) + def add_channel(self, channel): + if(not self.check_in_channels(channel)): + self.channels.append(channel) + else: + raise Exception("This shoudn't happen!") - return list(defined.keys()) - - def get_subworkflows_called(self, defined = {}): - for c in self.get_all_called(): - if(c.get_type()=="Subworkflow"): - defined[c] = [] - _ = c.get_subworkflows_called(defined = defined) 
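# (Editor's note, not part of the diff: the removed get_*_called helpers use a
# dict as an insertion-ordered set, threading `defined` through the recursion
# and finally returning list(defined.keys()).  Beware the mutable default
# `defined = {}`: in Python the same dict object is shared by every call that
# omits the argument, so results can leak between invocations.  Illustrative
# sketch with hypothetical names:
#
#     def collect(node, seen = {}):        # one dict reused across calls!
#         seen[node] = []
#         for child in node.children:
#             collect(child, seen = seen)
#         return list(seen.keys())
#
# Passing a fresh dict explicitly at each top-level call avoids the leak.)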
- return list(defined.keys()) - - def get_functions_called(self, defined = {}): - for c in self.get_all_called(): - if(c.get_type()=="Function"): - defined[c] = [] - elif(c.get_type()=="Subworkflow"): - _ = c.get_functions_called(defined = defined) - return list(defined.keys()) - - - def get_function_from_name(self, name): - return self.origin.get_function_from_name(name) - - def get_list_name_subworkflows(self): - return self.origin.get_list_name_subworkflows() - - def get_list_name_includes(self): - return self.origin.get_list_name_includes() - - def get_channel_from_name(self, name): - channel_file = self.origin.get_channel_from_name(name) - if(channel_file!=None): - return channel_file - return super().get_channel_from_name(name) - - - """def get_added_operations_structure(self): - return self.origin.get_added_operations_structure()""" - - def check_in_channels(self, channel): - found = super().check_in_channels(channel) - if(not found): - if(self.origin.get_type()=="Nextflow File"): - return self.origin.check_in_channels(channel) - else: - raise Exception(f"The origin is a '{self.origin.get_type()}' it should be a 'Nextflow File'") - return found - - - def get_subworkflow_from_name(self, name): - return self.origin.get_subworkflow_from_name(name) + for c in self.get_channels(): + if(name == c.get_name()): + return c + #raise Exception(f"{name} is not in the list of channels") + return None - def check_includes(self): - code = self.get_code() - - pattern = constant.FULL_INCLUDE - for match in re.finditer(pattern, code): - if(self.get_type()=="Main DSL2"): - raise BioFlowInsightError(f"An include ('{match.group(0)}') was found in the main in the file '{self.get_file_address()}'. FlowInsight does not support this -> see specification list.", num = 12,origin=self) - elif(self.get_type()=="Subworkflow"): - raise BioFlowInsightError(f"An include ('{match.group(0)}') was found in the subworkflow '{self.get_name()}' in the file '{self.get_file_address()}'. 
FlowInsight does not support this -> see specification list.", num = 12, origin=self) - else: - raise Exception("This shouldn't happen!") - - def initialise(self): - if(not self.initialised): - #print(self, self.get_all_processes()) - - self.initialised=True - - #Get the modules (Processes defined for the main/subworkflow) - self.modules_defined = self.origin.get_processes()+self.origin.get_subworkflows()+self.origin.get_modules_included() - - #Check that includes are not defined in the main or subworkflows - self.check_includes() - - #Extract Executors - self.extract_executors() - - #Analyse Executors - for e in self.executors: - e.initialise() - - - - - """def add_channels_structure(self, dot): - return self.add_channels_structure_temp(dot, self.origin.get_added_operations_structure()) - """ - def get_origin(self): - return self.origin + #Define the blocks + code = self.get_code() + conditions = extract_conditions(code) + #TODO -> normally it is not a problem -> cause i've removed the recursive option + #But just check that the bodies don't appear twice in the dico - def check_same_origin(self, sub): - return self.get_origin()== sub.get_origin() + #For each condition -> create a block + for c in conditions: + from .block import Block + body = code[conditions[c][0]:conditions[c][1]] + block = Block(code=body, origin=self, condition=c, modules_defined=self.modules_defined) + block.initialise() + self.blocks.append(block) - #Add "global" channels and operation to the structure defined in the file - def get_structure_DSL2(self, dico): - self.origin.get_structure_DSL2(dico) - - - def get_structure(self, dico): - #Add "global" channels and operation to the structure defined in the file - self.get_structure_DSL2(dico) + self.extract_executors() + #Analyse Executors for e in self.executors: - if(e.get_type()=="Operation"): - e.get_structure(dico) - elif(e.get_type()=="Call"): - e.get_structure(dico) - else: - raise Exception(f"Executor of type '{e.get_type()}' was extracted in a DSL2 workflow! I don't know what this is! 
The code is '{e.get_code()}'") + e.initialise() + #Initialise each subworkflow being called + for sub in self.elements_being_called: + if(sub.get_type()=="Subworkflow"): + sub.initialise() + - # - #nodes_added = [] - # - ##Add operation - #for o in self.get_operations(): - # dico['nodes'].append({'id':str(o), 'name':"", "shape":"point", 'xlabel':o.get_code()}) - # nodes_added.append(str(o)) - # - # #Need to check for cases where the origin is a process or a subworkflow - # for origin in o.get_origins(): - # - # if(origin.get_type()=="Process"): - # #Here i'm not adding the node but an edge -> the node is add when the call happens - # dico["edges"].append({'A':str(origin), 'B':str(o), "label":""}) - # - # elif(origin.get_type()=="Subworkflow"): - # emits = origin.get_emit() - # #TODO -> i'm only doing one parameter for now - # if(len(emits)==1): - # for source in emits[0].get_source(): - # dico["edges"].append({'A':str(source), 'B':str(o), "label":""}) - # else: - # raise Exception(f'TO much to unpack for "{o.get_code()}"') - # - # elif(origin.get_type()=="Emitted"): - # if(origin.get_emitted_by().get_type()=="Process"): - # dico["edges"].append({'A':str(origin.get_emitted_by()), 'B':str(o), "label":origin.get_name()}) - # - # elif(origin.get_emitted_by().get_type()=="Subworkflow"): - # for source in origin.get_emits().get_source(): - # dico["edges"].append({'A':str(source), 'B':str(o), "label":origin.get_name()}) - # - # else: - # raise Exception(f"I don't know how to handle {origin.get_emitted_by()}") - # - # - # elif(origin.get_type()=="Channel"): - # None - # #Here we do nothing since the channels are gonna be added below - # - # else: - # raise Exception(f"George I don't know if this should be an error or not -> i don't think it should be") - # #TODO check this -> it should be added by the channel here below - # - # - ##Adding channels - #for c in self.get_channels(): - # for source in c.get_source(): - # for sink in c.get_sink(): - # #Here we check that the operation exists (already added to the structure) -> it's to avoid showing the operation for the emited channel - # if(str(sink) in nodes_added): - # dico["edges"].append({'A':str(source), 'B':str(sink), "label":c.get_name()}) - # - # - #for c in self.get_calls(): - # c.get_structure(dico) - # - ##return dico - - def add_2_rocrate(self, dico, parent_key): - main_key = f"{parent_key}/MAIN" - dico_main = get_dico_from_tab_from_id(dico, main_key) - if(dico_main==None): - dico_main = {} - dico_main["@id"] = main_key - dico_main["name"] = "Main Workflow" - dico_main["@type"] = ["SoftwareSourceCode", "ComputationalWorkflow"] - #TODO -> check if this remains true - #dico_main["conformsTo"] = {"@id": "https://bioschemas.org/profiles/ComputationalWorkflow/0.5-DRAFT-2020_07_21"} - #dico_main["dct:conformsTo"]= "https://bioschemas.org/profiles/ComputationalWorkflow/1.0-RELEASE/" - dico_main["input"] = [] - dico_main["output"] = [] - dico_main["isPartOf"] = [{"@id": parent_key}] - dico_main["hasPart"] = [] - called = self.get_all_called() - for c in called: - c.add_2_rocrate(dico, main_key) - dico_main["hasPart"].append({"@id":c.get_rocrate_key(dico)}) - - dico["@graph"].append(dico_main) - - def check_if_there_is_a_thing_called_multiple_times(self): - from collections import Counter - called = [] - for e in self.get_executors(): - if(e.get_type()=="Call"): - for thing in e.get_called(): - called.append(thing.get_name()) - dico = Counter(called) - errors = [] - for thing in dico: - if(dico[thing]>1): - errors.append(thing) - if(len(errors)>1): 
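# (Editor's note: `errors` collects every name called more than once, but the
# guard below only fires when len(errors) > 1, i.e. when at least two distinct
# names are duplicated — a single duplicated name passes silently.  If that is
# unintended, the check would be `if(len(errors) > 0)`.)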
- if(self.get_type()=="Main DSL2"): - text = "the workflow main" - else: - text = f"the subworkflow {self.get_name()}" - raise BioFlowInsightError(f"The elements {errors} were called multiple times in {text}", num="-64") - return called - + def get_channels(self): + return self.channels + def get_process_from_name(self, name): + for m in self.modules_defined: + if(m.get_type()=="Process" and m.get_alias()==name): + return m + + def get_subworkflow_from_name(self, name): + for m in self.modules_defined: + if(m.get_type()=="Subworkflow" and m.get_alias()==name): + return m + + def get_function_from_name(self, name): + for m in self.modules_defined: + if(m.get_type()=="Function" and m.get_alias()==name): + return m def extract_executors(self): from .operation import Operation @@ -333,25 +97,19 @@ class Main_DSL2(Nextflow_Building_Blocks): #https://github.com/nextflow-io/nextflow/blob/45ceadbdba90b0b7a42a542a9fc241fb04e3719d/docs/operator.rst #TODO This list needs to be checked if it's exhaustive - if(self.get_type()=="Subworkflow"): - code = self.get_work() - elif(self.get_type()=="Main DSL2"): - code = self.get_code() - code = re.sub(constant.WORKFLOW_HEADER, "", code) - if(code[-1]!='}'): - raise Exception("This shoudn't happen") - code = code[:-1] + code = self.get_code() - else: - code = self.get_code() + #For each block -> remove its code + for b in self.blocks: + code = code.replace(b.get_code(), "") things_to_remove = [] - things_to_remove+= self.processes+self.includes+self.subworkflows+self.functions - if(self.main!=None): - things_to_remove+=[self.main] - - for to_remove in things_to_remove: - code = code.replace(to_remove.get_code(get_OG = True), "", 1) + #things_to_remove+= self.processes+self.includes+self.subworkflows+self.functions + #if(self.main!=None): + # things_to_remove+=[self.main] + # + #for to_remove in things_to_remove: + # code = code.replace(to_remove.get_code(get_OG = True), "", 1) #We add this to simplify the search of the executors code = "start\n"+code+"\nend" @@ -607,4 +365,17 @@ class Main_DSL2(Nextflow_Building_Blocks): if(e.get_code()[:len(r)]==r): to_remove.append(e) for e in to_remove: - self.executors.remove(e) \ No newline at end of file + self.executors.remove(e) + + def get_structure(self, dico): + + for block in self.blocks: + block.get_structure(dico) + + for e in self.executors: + if(e.get_type()=="Operation"): + e.get_structure(dico) + elif(e.get_type()=="Call"): + e.get_structure(dico) + else: + raise Exception(f"Executor of type '{e.get_type()}' was extracted in a DSL2 workflow! I don't know what this is! The code is '{e.get_code()}'") diff --git a/src/subworkflow.py b/src/subworkflow.py index 920dd29e4a9b7264ab74be4c332c561cd89117cc..7db6cea153e5ffc8292a47baa9ec656ae5afece7 100644 --- a/src/subworkflow.py +++ b/src/subworkflow.py @@ -1,28 +1,33 @@ import re from . 
import constant from .code_ import Code -from .main_DSL2 import Main_DSL2 +from .root import Root +from .main import Main from .bioflowinsighterror import BioFlowInsightError from .outils import remove_jumps_inbetween_parentheses, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate -class Subworkflow(Main_DSL2): - def __init__(self, code, origin, name): - Main_DSL2.__init__(self, code, origin) +class Subworkflow(Main): + def __init__(self, code, nextflow_file, name): + Main.__init__(self, code, nextflow_file) self.name = name.replace("'", "").replace('"', '') self.alias = self.name #These are the different parts of of a subworkflow -> work corresponds to the main self.take = [] + self.takes_channels = [] self.work = None self.emit = [] self.call = [] + #These are probably to remove self.initialised = False self.later_emits = [] self.number_times_called = 0 + self.called_by = []#List of calls + def add_to_emits(self, emit): self.later_emits.append(emit) @@ -30,9 +35,6 @@ class Subworkflow(Main_DSL2): def get_later_emits(self): return self.later_emits - def set_call(self, call): - self.call.append(call) - def get_code_with_alias(self): code = self.get_code() def replacer(match): @@ -132,7 +134,7 @@ class Subworkflow(Main_DSL2): raise Exception("Not possible!") def get_channel_from_name_takes(self, name): - for c in self.channels: + for c in self.takes_channels: if(name == c.get_name()): return c return None @@ -145,6 +147,7 @@ class Subworkflow(Main_DSL2): code[i] = code[i].strip() if(code[i]!=''): channel = self.get_channel_from_name_takes(code[i]) + #channel = self.root.get_channel_from_name(code[i]) #In the case the channel doesn't exist if(channel==None): from .operation import Operation @@ -159,8 +162,7 @@ class Subworkflow(Main_DSL2): raise BioFlowInsightError(f"The channel '{code[i]}' is already defined somewhere else in the subworkflow ('{self.get_name()}') or in the file.", num=4, origin=self) tab.append(ope) for channel in ope.get_gives(): - self.channels.append(channel) - + self.takes_channels.append(channel) self.take = tab #def initialise_emit(self): @@ -194,7 +196,7 @@ class Subworkflow(Main_DSL2): for i in range(len(code)): code[i] = code[i].strip() if(code[i]!=""): - channel = self.get_channel_from_name(code[i]) + channel = self.root.get_channel_from_name(code[i]) if(channel!=None): ope = Operation(code=f"emit: {code[i]}", origin=self) ope.set_as_artificial() @@ -243,7 +245,11 @@ class Subworkflow(Main_DSL2): if(not self.initialised): self.initialise_parts() self.initialise_takes() - super().initialise() + self.modules_defined = self.nextflow_file.get_modules_defined() + #Check that includes are not defined in the main or subworkflows + self.check_includes() + self.root = Root(code=self.get_work(), origin=self, modules_defined=self.modules_defined, subworkflow_inputs = self.takes_channels) + self.root.initialise() self.initialise_emit() self.initialised = True diff --git a/src/workflow.py b/src/workflow.py index 4e6c72843701cc078a0f951a1d9c0f9ec387690d..cc4ef304fd33c58f500c18c6a2f6816367cb3449 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -7,6 +7,7 @@ from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, r from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles from .outils_annotate import get_tools_commands_from_user_for_process from .bioflowinsighterror import BioFlowInsightError +from .graph import Graph #Outside packages import os @@ -31,14 +32,13 @@ class Workflow: output_dir: A string indicating where the 
results will be saved name: A string indicating the name of the workflow processes_2_remove: A string indicating the processes to remove from the workflow - processes_annotation: A dictionnary containing processes 2 annotations (tools, commands and modules) - personnal_acces_token: The Github personnal access token (this is to use the Github API with more requests per hour) """ def __init__(self, file, duplicate=False, display_info=True, output_dir = './results', - name = None, processes_2_remove = None, - processes_annotation = None, - personnal_acces_token = None): + name = None, processes_2_remove = None): + + + #Getting the main nextflow file if(not os.path.isfile(file)): nextflow_files = glob.glob(f'{file}/*.nf') if(len(nextflow_files)==0): @@ -49,496 +49,79 @@ class Workflow: with open(file, 'r') as f: txt= f.read() except: - file =nextflow_files[0] + raise BioFlowInsightError("No 'main.nf' file found at the root of the prohject") self.duplicate = duplicate - self.DSL = "" self.display_info = display_info + self.processes_2_remove = processes_2_remove self.output_dir = Path(output_dir) - - self.nextflow_file = Nextflow_File( - file, - origin = self - ) - - + self.nextflow_files = [] self.workflow_directory = '/'.join(file.split('/')[:-1]) + self.name = name + self.graph = None - self.processes_annotation = processes_annotation - self.rocrate = None + OG_file = Nextflow_File(file, workflow = self, first_file = True) + self.DSL = OG_file.find_DSL() + self.create_empty_results() + if(self.display_info): + print(f"Workflow is written in {self.DSL}") - self.name = name - self.tab_processes_2_remove = None - self.personnal_acces_token = personnal_acces_token - if(processes_2_remove==""): - processes_2_remove = None - self.processes_2_remove = processes_2_remove - self.log = "" - self.fill_log() - self.address = "" - self.set_address() - self.dico = {} - self.get_dico() + def create_empty_results(self): + os.makedirs(self.output_dir, exist_ok=True) + os.makedirs(self.output_dir / 'debug', exist_ok=True) + os.makedirs(self.output_dir / 'graphs', exist_ok=True) + with open(self.output_dir / "debug" / "operations.nf",'w') as file: + pass + with open(self.output_dir / "debug" / "calls.nf",'w') as file: + pass + with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file: + pass def get_duplicate_status(self): return self.duplicate - - def get_display_info(self): - return self.display_info - + def get_output_dir(self): return Path(self.output_dir) def get_DSL(self): return self.DSL + def get_display_info_bool(self): + return self.display_info + def set_DSL(self, DSL): self.DSL = DSL - def get_all_executors(self): - dico = {} - self.nextflow_file.get_all_executors(dico) - return list(dico.keys()) - - def get_is_a_git_repo(self): - return is_git_directory(path = self.get_repo_adress()) - - def get_repo_adress(self): - """Method that returns the adress of the workflow repository - - Keyword arguments: - - """ - current_directory = os.getcwd() - repo = "/".join(self.nextflow_file.get_file_address().split("/")[:-1]) - if(repo==''): - repo = current_directory - return repo - - def get_processes_annotation(self): - """Method the dictionnary of the process annotations - - Keyword arguments: - - """ - return self.processes_annotation - - def fill_log(self): - """Method that reads the git log and saves it - - Keyword arguments: - - """ - if(self.get_is_a_git_repo()): - current_directory = os.getcwd() - os.chdir(self.get_repo_adress()) - - try: - os.system(f"git log --reverse > 
temp_{id(self)}.txt") - with open(f'temp_{id(self)}.txt') as f: - self.log = f.read() - os.system(f"rm temp_{id(self)}.txt") - except: - None - os.chdir(current_directory) - - def get_address(self): - """Method that returns the adress of the workflow main - - Keyword arguments: - - """ - return self.address - - def get_root_directory(self): - return '/'.join(self.get_address().split('/')[:-1]) - - def get_workflow_directory(self): - """Method that returns the workflow directory - - Keyword arguments: - - """ - return self.workflow_directory - - - def set_address(self): - """Method that sets the adress of the workflow main - - Keyword arguments: - - """ - current_directory = os.getcwd() - os.chdir(self.get_repo_adress()) - try: - os.system(f"git ls-remote --get-url origin > temp_address_{id(self)}.txt") - with open(f'temp_address_{id(self)}.txt') as f: - self.address = f.read() - os.system(f"rm temp_address_{id(self)}.txt") - except: - None - os.chdir(current_directory) - for match in re.finditer(r"https:\/\/github\.com\/([^\.]+)\.git", self.address): - self.address = match.group(1) - - def get_dico(self): - """Method that returns a dictionnary containg information regarding the github repository - - Keyword arguments: - - """ - if(self.get_is_a_git_repo()): - current_directory = os.getcwd() - os.chdir(self.get_repo_adress()) - try: - if(self.personnal_acces_token!=None): - command = f'curl --silent --request GET --url "https://api.github.com/repos/{self.address}" --header "Authorization: Bearer {self.personnal_acces_token}" --header "X-GitHub-Api-Version: 2022-11-28" > temp_dico_{id(self)}.json' - else: - command = f'curl --silent --request GET --url "https://api.github.com/repos/{self.address}" > temp_dico_{id(self)}.json' - _ = os.system(command) - with open(f'temp_dico_{id(self)}.json') as json_file: - self.dico = json.load(json_file) - os.system(f"rm temp_dico_{id(self)}.json") - - except: - _ = os.system(f"rm temp_dico_{id(self)}.json") - os.chdir(current_directory) - - - def set_name(self, name): - self.name = name - - def get_name(self): - """Method that returns the name of the workflow - - Keyword arguments: - - """ - if(self.name==None): - self.set_name(self.nextflow_file.get_file_address().split("/")[-2]) - return self.name - else: - return self.name - - #Format yyyy-mm-dd - #Here i return the first commit date - def get_datePublished(self): - """Method that returns the date of publication - - Keyword arguments: - - """ - if(self.datePublished==None): - for match in re.finditer(r"Date: +\w+ +(\w+) +(\d+) +\d+:\d+:\d+ +(\d+)",self.log): - month = constant.month_mapping[match.group(1)] - day = match.group(2) - year = match.group(3) - return f"{year}-{month}-{day}" - else: - return self.datePublished - - - def get_description(self): - """Method that returns the description - - Keyword arguments: - - """ - if(self.description==None): - try: - res = self.dico["description"] - except: - res = None - return res - else: - return self.description - - - - def get_main_file(self): - """Method that returns the name of the main file - - Keyword arguments: - - """ - return self.nextflow_file.get_file_address().split("/")[-1] - - - def get_license(self): - """Method that returns the license - - Keyword arguments: - - """ - if(self.license==None): - try: - res = self.dico["license"]["key"] - except: - res = None - return res - else: - return self.license - - - #TODO - def get_creativeWorkStatus(self): - return "TODO" - - #TODO - def get_version(self): - return "TODO" - - - def get_authors(self): 
- """Method that returns a list of the authors - - Keyword arguments: - - """ - if(self.authors==None): - authors = {} - for match in re.finditer(r"Author: ([^>]+)<([^>]+)>",self.log): - authors[match.group(2)] = match.group(1).strip() - tab = [] - for author in authors: - #tab.append({"@id":author, "name":authors[author]}) - tab.append({"@id":authors[author], "email":author}) - return tab - else: - authors = self.authors.split(',') - tab = [] - for a in authors: - tab.append({"@id":a.strip()}) - return tab - - - #Need to follow this format : "rna-seq, nextflow, bioinformatics, reproducibility, workflow, reproducible-research, bioinformatics-pipeline" - def get_keywords(self): - """Method that returns the keywords + def get_first_file(self): + for file in self.nextflow_files: + if(file.first_file): + return file + + def get_workflow_main(self): + return self.get_first_file().main - Keyword arguments: - - """ - if(self.keywords==None): - try: - res = ", ".join(self.dico["topics"]) - except: - res = None - return res - else: - return self.keywords + def add_nextflow_file_2_workflow(self, nextflow_file): + self.nextflow_files.append(nextflow_file) + self.nextflow_files = list(set(self.nextflow_files)) - - - def get_producer(self): - """Method that returns the producer + def initialise(self): + """Method that initialises the analysis of the worflow Keyword arguments: """ - if(self.producer==None): - try: - res = {"@id": self.dico["owner"]["login"]} - except: - res = None - return res - else: - return self.producer - - def get_publisher(self): - """Method that returns the publisher + #Right now i'm just gonna do everything in DSL2 - Keyword arguments: - - """ - if(self.dico!={}): - return "https://github.com/" + #At this point there should only be one nextflow file + if(len(self.nextflow_files)==1): + self.nextflow_files[0].initialise() else: - return None - - - def get_file_address(self): - """Method that returns the adress of the workflow main - - Keyword arguments: + raise BioFlowInsightError("This souldn't happen. 
There are multiple Nextflow files composing the workflow before the analysis has even started.") - """ - return self.nextflow_file.get_file_address() - - def add_2_rocrate(self, dico): - """TODO - """ - self.nextflow_file.add_2_rocrate(dico) - - def get_processes_defined(self): - """Method that returns a list of the processes defined - - Keyword arguments: - - """ - processes = self.nextflow_file.get_processes_defined(dict={}).keys() - return list(processes) - - def get_processes_called(self): - """Method that returns a list of the processes called/used during the workflow execution - - Keyword arguments: - - """ - return self.nextflow_file.get_processes_called() - - def get_subworkflows_called(self): - """Method that returns a list of the subworkflows called/used during the workflow execution - - Keyword arguments: - - """ - return self.nextflow_file.get_subworkflows_called() - - def get_functions_called(self): - """Method that returns a list of the functions called/used during the workflow execution - - Keyword arguments: - - """ - return self.nextflow_file.get_functions_called() - - - - def get_tools(self): - """Method that returns a list of the tools used by the workflow - - Keyword arguments: - - """ - processes = self.get_processes_called() - tab = [] - for p in processes: - tab+=p.get_tools() - return list(set(tab)) - - def get_commands(self): - """Method that returns a list of the commands used by the workflow - - Keyword arguments: - - """ - processes = self.get_processes_called() - tab = [] - for p in processes: - tab+=p.get_commands() - return list(set(tab)) - - def get_modules(self): - """Method that returns a list of the modules used by the workflow - - Keyword arguments: - - """ - processes = self.get_processes_called() - tab = [] - for p in processes: - tab+=p.get_modules() - return list(set(tab)) - - def initialise_rocrate(self): - """Method that initialises the RO-Crate file - - Keyword arguments: - - """ - self.rocrate = RO_Crate(self) - self.rocrate.initialise() - - def get_layers(self): - """TODO - """ - graph = self.nextflow_file.get_graph() - if(not graph.is_initialised()): - graph.initialise() - process_dependency_graph = graph.get_process_dependency_graph_dico() - dico_flattened = {"nodes": [], "edges": [], "subworkflows":[]} - - def get_node(dico, id): - for n in dico['nodes']: - if(n['id']==id): - return n - return None - - def remove_node(dico, id): - node = None - for n in dico['nodes']: - if(n['id']==id): - node = n.copy() - break - try: - dico['nodes'].remove(node) - except: - print("prob1") - - def remove_edge_if_A(dico, id_A): - edges = [] - for edge in dico['edges']: - if(edge['A']==id_A): - edges.append(edge) - for edge in edges: - try: - dico['edges'].remove(edge) - except: - print("prob2") - - flatten_dico(process_dependency_graph, dico_flattened) - links = initia_link_dico_rec(dico_flattened) - _, edges_create_cycles = get_number_cycles(links) - #If the graph isn't a dag -> we remoce the edges which make it cyclic - for A, B in edges_create_cycles: - #print({"A":A, "B":B}) - #print(dico_flattened["edges"]) - dico_flattened["edges"].remove({"A":A, "B":B, "label":''}) - - layers = [] - while(dico_flattened["nodes"]!=[]): - - layer = dico_flattened["nodes"].copy() - - for edge in dico_flattened["edges"]: - removed = False - node = get_node(dico_flattened, edge['B']) - while(not removed): - try: - layer.remove(node) - except: - removed = True - - - for node in layer: - dico_flattened['nodes'].remove(node) - remove_edge_if_A(dico_flattened, node['id']) - 
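# (Editor's note, not part of the diff: the surrounding while-loop is a
# Kahn-style topological layering — each pass, `layer` holds the nodes with no
# remaining incoming edge; those nodes and their outgoing edges are removed and
# the pass repeats.  Cycle-creating edges were stripped beforehand, so the loop
# terminates.  The same idea on a plain edge list, names hypothetical:
#
#     def layers_of(nodes, edges):              # edges: pairs (a, b)
#         layers = []
#         nodes, edges = set(nodes), set(edges)
#         while nodes:
#             layer = {n for n in nodes if all(b != n for (_, b) in edges)}
#             layers.append(layer)
#             nodes -= layer
#             edges = {(a, b) for (a, b) in edges if a not in layer}
#         return layers
# )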
layers.append(layer) - - layers_object = [] - for layer in layers: - tab = [] - for element in layer: - address = int(re.findall(r"\dx\w+", element['id'])[0], base=16) - tab.append(ctypes.cast(address, ctypes.py_object).value) - layers_object.append(tab) - return layers_object - - - def initialise(self, create_rocrate = True): - """Method that initialises the analysis of the worflow - - Keyword arguments: - - """ - self.nextflow_file.initialise() - if(create_rocrate): - self.initialise_rocrate() if(self.display_info): citation = """To cite BioFlow-Insight, please use the following publication: @@ -546,697 +129,39 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen print() print(citation) + if(self.graph==None): + self.graph = Graph(self) + + + def iniatilise_tab_processes_2_remove(self): - if(self.tab_processes_2_remove==None): + if(self.processes_2_remove==None): tab_processes_2_remove = [] if(self.processes_2_remove!=None): temp = self.processes_2_remove.split(",") for t in temp: tab_processes_2_remove.append(t.strip()) - self.tab_processes_2_remove = tab_processes_2_remove - - def generate_all_graphs(self, render_graphs = True): - """Method that generates all graphs representing the workflow - - Keyword arguments: - - """ - self.iniatilise_tab_processes_2_remove() - self.nextflow_file.generate_all_graphs(render_graphs = render_graphs, processes_2_remove = self.tab_processes_2_remove) - - def generate_specification_graph(self, render_graphs = True): - self.iniatilise_tab_processes_2_remove() - self.nextflow_file.generate_specification_graph(render_graphs = render_graphs, processes_2_remove = self.tab_processes_2_remove) - - def generate_process_dependency_graph(self, render_graphs = True): - self.iniatilise_tab_processes_2_remove() - self.nextflow_file.generate_process_dependency_graph(render_graphs = render_graphs, processes_2_remove = self.tab_processes_2_remove) - - def check_relevant_processes_in_workflow(self, relevant_processes): - #Check all relevat processes are in wf - workflow_processes = [] - for p in self.get_processes_called(): - workflow_processes.append(p.get_alias()) - for p in relevant_processes: - if(p not in workflow_processes): - raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24) - - def generate_user_view(self, relevant_processes = [], render_graphs = True): - self.check_relevant_processes_in_workflow(relevant_processes) - self.iniatilise_tab_processes_2_remove() - self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs, processes_2_remove = self.tab_processes_2_remove) + self.processes_2_remove = tab_processes_2_remove - def generate_level_graphs(self, render_graphs = True, label_edge=True, label_node=True): - self.iniatilise_tab_processes_2_remove() - self.nextflow_file.generate_level_graphs(render_graphs = render_graphs, processes_2_remove = self.tab_processes_2_remove, label_edge=label_edge, label_node=label_node) - def build_processes_2_tools(self): - - print() - print("Let's extarct the tools from the processes") - print("------------------------------------------") - print() - exiting_tools, existing_commands = [], [] - processes = self.get_processes_called() + def get_structure(self): dico = {} - index=0 - for p in processes: - print(f"* {index/len(processes)*100:.2f}% ({index}) processes annotated") - tools_found, commands_found, exiting_tools, existing_commands = 
get_tools_commands_from_user_for_process(p, exiting_tools, existing_commands) - dico[p.get_code()] = {} - dico[p.get_code()]["tools"] = tools_found - dico[p.get_code()]["commands"] = commands_found - index+=1 - - with open(f"{self.get_output_dir()}/processes_2_tools.json", 'w') as output_file : - json.dump(dico, output_file, indent=2) - return dico - - - - def get_number_subworkflows_process_dependency_graph(self): - return self.nextflow_file.get_number_subworkflows_process_dependency_graph() - - def get_number_subworkflows_user_view(self): - return self.nextflow_file.get_number_subworkflows_user_view() - - def node_2_subworkflows_process_dependency_graph(self): - return self.nextflow_file.node_2_subworkflows_process_dependency_graph() - - def node_2_subworkflows_user_view(self): - return self.nextflow_file.node_2_subworkflows_user_view() - - def check_fake_dependency_user_view(self): - return self.nextflow_file.check_fake_dependency_user_view() - - def generate_user_and_process_metadata(self): - self.nextflow_file.generate_user_and_process_metadata() - - #This Function returns the channels on which the subworkflow (things_added_in_cluster) depend on - def get_takes(self, things_added_in_cluster): - #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list - #This means that the channel is totally definied in the subworkflow -> so we are searching for - #Channels which aren't totatly defined in the subworkflow - channels_2_sources = {} - - for ele in things_added_in_cluster: - if(ele.get_type() == "Operation"): - for o in ele.get_origins(): - channels_2_sources[o] = replace_thing_by_call(o.get_source()) - elif(ele.get_type() == "Call"): - for param in ele.get_parameters(): - if(param.get_type()!="Channel"): - raise Exception("This shouldn't happen -> with the rewrite all the params should be channels") - else: - channels_2_sources[param] = replace_thing_by_call(param.get_source()) - else: - raise Exception("This shouldn't happen") - - takes = [] - for channel in channels_2_sources: - if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])): - takes.append(channel) - #print(things_added_in_cluster) - #print(channels_2_operations_needed) - return takes - - #This Function returns the channels the subworkflow (things_added_in_cluster) emits (other things depend on) - def get_emits(self, things_added_in_cluster): - emits = [] - channel_2_sink = {} - #Basiccaly this is a deco of channels to opeartions -> when the value is an empty list - #This means that the channel is totally definied in the subworkflow -> so we are searching for - #Channels which aren't totatly defined in the subworkflow - #This means that things outside the subworkflow depend on this channel - channel_2_sink = {} - - for ele in things_added_in_cluster: - if(ele.get_type() == "Operation"): - for o in ele.get_gives(): - channel_2_sink[o] = replace_thing_by_call(o.get_sink()) - elif(ele.get_type() == "Call"): - thing = ele.get_first_element_called() - for e in thing.get_later_emits(): - channel_2_sink[e] = replace_thing_by_call(e.get_sink()) - else: - print(ele) - raise Exception("This shouldn't happen") - - for channel in channel_2_sink: - if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])): - emits.append(channel) - return emits - - #When there are multiple emits turn them into one and the end of the call eg, instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch - def 
convert_to_DSL2(self): - if(self.get_DSL()=="DSL2"): - print("Workflow is already written in DSL2") - else: - #This tag is used as an identification to safely manipulate the string - tag = str(time.time()) - - code = self.nextflow_file.get_code() - start_code = r"#!/usr/bin/env nextflow" - start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow" - end_code = "workflow.onComplete" - - pos_start, pos_end= 0, len(code) - if(code.find(end_code)!=-1): - pos_end = code.find(end_code) - code_to_replace = code[pos_start:pos_end] - for match in re.finditer(start_code_pattern, code): - pos_start = match.span(0)[1]+1 - #if(code.find(start_code)!=-1): - # pos_start = code.find(start_code)+len(start_code) - body = code[pos_start:pos_end]#.replace('\n', '\n\t') - - include_section = f"//INCLUDE_SECTION_{tag}" - params_section = f"//PARAMS_SECTION_{tag}" - function_section = f"//FUNCTION_SECTION_{tag}" - process_section = f"//PROCESS_SECTION_{tag}" - - - code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""") - - ##I've out this in a comment cause since it's a DSL1 - #params_list = [] - #for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code): - # params_list.append(match.group(0)) - #for params in params_list: - # code = code.replace(params, "") - #params_code = "\n".join(params_list) - #code = code.replace(params_section, params_code) - - #Moving Functions - functions = [] - for f in self.nextflow_file.functions: - function = f.get_code() - functions.append(function) - - for r in functions: - code = code.replace(r, "") - code = code.replace(function_section, "\n\n".join(functions)) - - #Moving Processes - processes = [] - to_replace = [] - for p in self.nextflow_file.get_processes(): - new_process, call = p.convert_to_DSL2() - processes.append(new_process) - to_replace.append((p.get_code(), call)) - - for r in to_replace: - code = code.replace(r[0], r[1]) - code = code.replace(process_section, "\n\n".join(processes)) - - #TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated - - for o in self.nextflow_file.get_executors(): - if(o.get_type()=="Operation"): - code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2()) - else: - raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! 
The code is '{o.get_code()}'") - - #Putting || back - code = code.replace("$OR$", "||") - - return code - - def write_workflow_into_one_file(self): - #This tag is used as an identification to safely manipulate the string - tag = str(time.time()) - - code = self.nextflow_file.get_code() - - #params_section = f"//PARAMS_SECTION_{tag}" - function_section = f"//FUNCTION_SECTION" - process_section = f"//PROCESS_SECTION" - subworkflow_section = f"//SUBWORKFLOW_SECTION" - - ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section - - to_replace = [] - for match in re.finditer(constant.FULL_INLCUDE_2, code): - to_replace.append(match.group(0)) - for r in to_replace: - code = code.replace(r, ankers) - ankers = "" - - #Adding processes into code - for p in self.get_processes_called(): - if(p.get_code() not in code): - code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section) - - #Adding subworkflows into code - for sub in self.get_subworkflows_called(): - if(sub.get_code() not in code): - code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n') - - #Adding functions into code - for fun in self.get_functions_called(): - if(fun.get_code() not in code): - code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n') - - #Remove the ankers - #code = code.replace(function_section, "") - #code = code.replace(process_section, "") - #code = code.replace(subworkflow_section, "") - ankers = {"function_section":function_section, - "process_section":process_section, - "subworkflow_section":subworkflow_section} - - return code, ankers - - def simplify_workflow_code(self): - code, ankers = self.write_workflow_into_one_file() - for exe in self.get_all_executors(): - if(exe.get_type()=="Call" or exe.get_type()=="Operation"): - code = code.replace(exe.get_code(get_OG = True), exe.simplify_code()) - else: - print(exe.get_code(), exe.get_type()) - raise Exception("This shouldn't happen") - return code - - #I do not recommand that the dev uses the same name for the channels inside and outside the channels - #Since the local channels become local at the upper level - def rewrite_subworkflow_call(self, code, subworklfow): - - #Remove the defintion from the code - code = code.replace(subworklfow.get_code(), "") - OG_call = subworklfow.get_call() - if(len(OG_call)>1): - raise Exception("This shouldn't happen") - OG_call = OG_call[0] - OG_body = subworklfow.get_work() - - #REPLACE HEADER TAKES - subworkflow_takes = subworklfow.get_takes() - parameters = OG_call.get_parameters() - if(len(subworkflow_takes)!=len(parameters)): - raise Exception("This shouldn't happen -> the same number of parameters should be kept") - #This is to replace the paramters and the takes - new_header = "" - for i in range(len(parameters)): - param = parameters[i] - takes = subworkflow_takes[i].get_gives()[0] - if(takes.get_code()!=param.get_code(get_OG = True)): - new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}" - - code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}") - - #REPLACE THE EMITS - #TODO admittedly this code below is very moche -> but it's functionnal -> update it - emits = subworklfow.get_emit() - to_replace = [] - all_executors = self.get_all_executors() - for exe in all_executors: - #We don't need to check the case call since the workflow has already been rewriteen -> emits only appear in operations - if(exe.get_type()=="Operation"): - emited = exe.get_origins() - 
-
-    def rewrite_and_initialise(self, code):
-        #Write the new code into a temporary file
-        temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf"
-        with open(temp_file, "w") as file:
-            file.write(code)
-
-        #Replace the old analysis with the new analysis (of the simplified code)
-        self.__init__(str(temp_file), display_info = False, duplicate=True)
-        self.initialise(create_rocrate=False)
-
-    def generate_random_relevant_processes(self, alpha = -1):
-        import random
-
-        #Random value between 0 and 1, centred at 0.5
-        def get_value():
-            check = True
-            val = -1
-            while(check):
-                check = False
-                val = random.gauss(0.5, 0.1)
-                if(val<0 or val>1):
-                    check = True
-            return val
-
-        if(self.duplicate):
-            if(alpha == -1):
-                alpha = get_value()
-            else:
-                if(not (0<=alpha and alpha<=1)):
-                    raise BioFlowInsightError("alpha is not in the interval [0; 1]")
-
-            processes_called = self.get_processes_called()
-            nb_2_select = int(alpha*len(processes_called))
-            sampled = random.sample(set(processes_called), nb_2_select)
-            name_select = []
-            for p in sampled:
-                name_select.append(p.get_alias())
-            return name_select
+        dico['nodes'] = []
+        dico['edges'] = []
+        dico['subworkflows'] = {}
+
+        if(self.get_DSL() == "DSL1"):
+            return self.get_structure_DSL1(dico=dico)
+        elif(self.get_DSL() == "DSL2"):
+            return self.get_workflow_main().get_structure(dico)
+            #return self.get_structure_DSL2(dico=dico, start = True)
         else:
-            raise BioFlowInsightError("Trying to generate random relevant processes, however the option 'duplicate' is not activated.")
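
The sampler above draws the fraction of processes to keep from a Gaussian centred on 0.5 and truncated to [0, 1] by rejection. The same behaviour in isolation (process names are made up for the example):

    import random

    def draw_alpha(mu=0.5, sigma=0.1):
        #Rejection-sample a Gaussian until the draw falls inside [0, 1]
        while True:
            val = random.gauss(mu, sigma)
            if 0 <= val <= 1:
                return val

    processes = ["fastqc", "fastp", "multiqc", "star", "salmon"]
    alpha = draw_alpha()
    print(random.sample(processes, int(alpha * len(processes))))
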
-
-    #Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
-    def convert_workflow_2_user_view(self, relevant_processes = []):
-        if(self.duplicate):
-
-            code = self.simplify_workflow_code()
-            self.rewrite_and_initialise(code)
-
-            #Get the clusters and the code
-            self.check_relevant_processes_in_workflow(relevant_processes)
-            self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
-            clusters = self.nextflow_file.graph.get_clusters_from_user_view()
-
-            #DETERMINE WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
-            #Creating the clusters with calls instead of processes or subworkflows
-            set_clusters_with_calls = []
-            for c in clusters:
-                tab = []
-                for ele in c:
-                    if(ele.get_type()=="Operation"):
-                        if(ele.get_artificial_status()==False):
-                            tab.append(ele)
-                    else:
-                        call = ele.get_call()
-                        if(len(call)!=1):
-                            raise Exception("This shouldn't happen")
-                        tab.append(call[0])
-                set_clusters_with_calls.append(set(tab))
-
-            #Getting the executors of each subworkflow
-            subworkflow_2_executors = {}
-            for sub in self.get_subworkflows_called():
-                dico = {}
-                sub.get_all_executors(dico)
-                temp = set(list(dico.keys()))
-                subworkflow_2_executors[sub] = []
-                for ele in temp:
-                    #Cannot add calls to subworkflows -> they are already present by definition
-                    if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
-                        pass #We don't add it
-                    else:
-                        subworkflow_2_executors[sub].append(ele)
-
-            def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
-                broken_subworkflows = []
-                for sub in subworkflow_2_executors:
-                    #You want this list (as a set) to be equal to subworkflow_2_executors[sub]
-                    elements_in_sub_with_clusters = []
-                    for cluster in set_clusters_with_calls:
-                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-                            break
-                        for c in cluster:
-                            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
-                                break
-                            if(c in subworkflow_2_executors[sub]):
-                                elements_in_sub_with_clusters+=list(cluster)
-                                break
-                    if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
-                        pass #This means that the subworkflow is intact
-                    else:
-                        #This means that the subworkflow is broken
-                        broken_subworkflows.append(sub)
-                return broken_subworkflows
+            raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!")
-
-            broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
-            #Rewrite the broken subworkflows
-            for sub in broken_subworkflows:
-                code = self.rewrite_subworkflow_call(code, sub)
-
-            #TODO -> this needs to be optimised
-            self.rewrite_and_initialise(code)
-            #Get the clusters and the code
-            self.nextflow_file.generate_user_view(relevant_processes = relevant_processes, processes_2_remove = [])
-            clusters = self.nextflow_file.graph.get_clusters_from_user_view()
-
-            #Get the topological order
-            clusters = self.nextflow_file.graph.get_topogical_order(clusters)
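
The "broken subworkflow" check asks whether a subworkflow's executors can be written as an exact union of whole clusters; if the union of every cluster that touches the subworkflow brings in anything extra (or misses anything), the user-view clustering cuts through it and the call must be inlined. A compact set-based sketch of the same predicate (illustrative names, simplified from the nested-loop version above):

    def is_broken(sub_executors, clusters):
        #Union every cluster that touches the subworkflow; the subworkflow
        #is intact only if that union is exactly its own executor set
        touched = set()
        for cluster in clusters:
            if cluster & sub_executors:
                touched |= cluster
        return touched != sub_executors

    clusters = [{"a", "b"}, {"c", "d"}]
    print(is_broken({"a", "b"}, clusters))  #False -> intact
    print(is_broken({"b", "c"}, clusters))  #True  -> cut by the clustering
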
elif(ele.get_type()=="Operation"): - - #If first element -> add marker for the subworkflow call - if(first_element): - code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}") - first_element = False - else: - code = code.replace(ele.get_code(get_OG = True), "") - - #Ignore these cases - if(ele.get_code()[:4] not in ["emit", "take"]): - origins = ele.get_origins() - for o in origins: - if(o.get_type()=="Call"): - calls_in_operations.append(o) - printed_condition = " && ".join(ele.get_condition().get_conditions()) - if(printed_condition!=""): - body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n" - else: - body+=f"\n{ele.get_code()}\n" - things_added_in_cluster.append(ele) - else: - raise Exception("This shoudn't happen") - - #TODO check this part of the code is never seen - #Here we removing the Call_12313 thing - for call in calls_in_operations: - raise Exception("This shouldn't happen since the workflows has been rewritten") - body = body.replace(call.get_code(), "") - body = body.replace(str(call), call.get_code()) - - #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes - if(name==""): - #If there are processes called we are going to use them - if(len(processes_added)>0): - #TODO find a better naming system - name = f"non_relevant_cluster_{processes_added[0].get_alias()}" - else: - #TODO find a better naming system - name = f"non_relevant_cluster_{non_relevant_name}" - non_relevant_name+=1 - - #Check that there is a single condtion in the body - body = group_together_ifs(body) - conditions_in_subworkflow = [] - end = -1 - for match in re.finditer(r"if\s*(\([^\{]+)\{", body): - conditions_in_subworkflow.append(match.group(1).strip()) - _, start = match.span(0) - - if(len(conditions_in_subworkflow)==1): - end = extract_curly(body, start) - body = body[start: end-1].strip() - - - #TAKE - #Adding take parameters on the inside of the subworkflow - takes_param = self.get_takes(things_added_in_cluster) - new_param_names, index, old_param_names = [], 1, [] - for param in takes_param: - param_name = f"param_{name}_{index}" - new_param_names.append(param_name) - old_param_names.append(param.get_code()) - index += 1 - if(len(new_param_names)>0): - temp = '\n'.join(new_param_names) - take = f"\ntake:\n{temp}\n" - - #EMIT - #Adding the emitted outputs - emitted_outputs = self.get_emits(things_added_in_cluster) - new_output_names, index, old_output_names = [], 0, [] - for output in emitted_outputs: - output_name = f"{name}.out[{index}]" - new_output_names.append(output_name) - old_output_names.append(output.get_code()) - index += 1 - - if(len(old_output_names)>0): - temp = '\n'.join(old_output_names) - emit = f"\nemit:\n{temp}\n" - - #Adding empty channels if it doesn't exist in the case of a negative condition - body = get_channels_to_add_in_false_conditions(body, old_output_names) - - - - #Replace names inside subworkflow - subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}" - for i in range(len(new_param_names)): - pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]" - subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i]) - #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i]) - - #TODO -> added verification of conditions - params = ", ".join(old_param_names) - subworkfow_call_case_true = f"{name}({params})" - subworkfow_call_case_false = "" - for i in 
-
-                    #TODO -> add verification of the conditions
-                    params = ", ".join(old_param_names)
-                    subworkflow_call_case_true = f"{name}({params})"
-                    subworkflow_call_case_false = ""
-                    for i in range(len(new_output_names)):
-                        #In the case of channels, we just add channel = subworkflow.out[i]
-                        if(not bool(re.findall(r"\.\s*out", old_output_names[i]))):
-                            subworkflow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
-                            subworkflow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
-                        #In the case of emitted values we need to replace the code on the outside
-                        else:
-                            param_out_name = f"{name}_out_{i+1}"
-                            subworkflow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
-                            subworkflow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
-                            channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
-                    #If there was only a single condition in the subworkflow cluster -> add it when the call is made
-                    if(len(conditions_in_subworkflow)==1):
-                        subworkflow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkflow_call_case_true}\n}} else {{\n{subworkflow_call_case_false}\n}}"
-                    else:
-                        subworkflow_call = subworkflow_call_case_true
-
-                    subworkflow_clusters_to_add.append(subworkflow_code)
-                    subworkflow_cluster_calls_to_add.append(subworkflow_call)
-                    index_cluster+=1
-
-            #TODO -> removing the conditions which are problematic
-            #This might not be the problem -> rerunning the analysis isn't totally robust
-            still_simplifying_conditions = True
-            while(still_simplifying_conditions):
-                still_simplifying_conditions = False
-                to_replace, anker1, anker2 = "", "", ""
-                #Replace an empty if/else
-                for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
-                    to_replace = match.group(0)
-                    anker1, anker2 = match.group(1), match.group(2)
-                    still_simplifying_conditions = True
-                    break
-                #Replace an empty if on its own
-                if(not still_simplifying_conditions):
-                    for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
-                        to_replace = match.group(1)
-                        anker1 = match.group(2)
-                        still_simplifying_conditions = True
-                        break
-                if(still_simplifying_conditions):
-                    code = code.replace(to_replace, f"{anker1}\n{anker2}")
-
-            #Replace the markers by the calls of the subworkflows
-            for i in range(len(subworkflow_clusters_to_add)):
-                code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
-
-            for old, new in channels_to_replace_outside_of_cluster:
-                pattern = fr"[ \(,]({re.escape(old)})"
-                code = replace_group1(code, pattern, new)
-
-            #Add the subworkflow definitions
-            #-------------------------------------
-            #Add the marker
-            subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
-            to_replace = ""
-            for match in re.finditer(r"workflow\s+\w*\s*\{", code):
-                to_replace = match.group(0)
-                break
-            if(to_replace==""):
-                raise Exception("No call to a workflow")
-
-            code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
-
-            for sub in subworkflow_clusters_to_add:
-                code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
-
-            #Putting || back
-            code = code.replace("$OR$", "||")
-
-            return remove_extra_jumps(format_with_tabs(code))
-
-            #So basically when retrieving a thing (process or subworkflow),
-            #there is necessarily one call associated with the thing -> since we have the option duplicate activated.
-            #You can check that len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
-        else:
-            raise BioFlowInsightError("Trying to convert the workflow with the user view, however the option 'duplicate' is not activated.")
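
After the cluster calls are hoisted out, conditionals that now guard nothing but an anchor comment (or whitespace) are peeled away iteratively; the fixpoint loop is needed because removing one empty conditional can expose another. The core of that idea, on a simplified pattern with no anchors (a sketch, not the exact regexes used above):

    import re

    def strip_empty_ifs(code):
        #Repeatedly unwrap "if (...) { }" blocks whose body is empty,
        #until the code stops changing (a fixpoint loop, as above)
        pattern = r"if\s*\([^{]*\)\s*\{\s*\}"
        while True:
            new_code = re.sub(pattern, "", code)
            if new_code == code:
                return code
            code = new_code

    print(strip_empty_ifs("a = 1\nif (x) { if (y) { } }\nb = 2"))
    #two passes: the inner empty if disappears first, then the outer one
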
+
+    def generate_specification_graph(self, render_graphs = True):
+        self.iniatilise_tab_processes_2_remove()
+        self.graph.initialise(processes_2_remove = self.processes_2_remove)
+        self.graph.get_specification_graph(render_graphs = render_graphs)
diff --git a/tests/test_main_DSL2.py b/tests/test_main_DSL2.py
index 6585066fa30ebe9217cadf9badf5d96172149b6e..463a752aa43e30e2e5cccb4de65ce0d7c278b5b2 100644
--- a/tests/test_main_DSL2.py
+++ b/tests/test_main_DSL2.py
@@ -1,7 +1,7 @@
 import unittest
-from src.main_DSL2 import *
+from src.main import *

-class TestMain_DSL2(unittest.TestCase):
+class TestMain(unittest.TestCase):

     def test_check_everything_works(self):
         self.assertTrue(True)
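
For orientation, the rewritten entry points would be exercised roughly as below. The module follows the test's `from src.main import *`, the constructor arguments mirror those seen in rewrite_and_initialise, and the class name is assumed; treat this as a sketch rather than documented usage:

    from src.main import Workflow  #class name assumed from context

    w = Workflow("wf.nf", display_info=False, duplicate=True)  #duplicate mode is required below
    w.initialise()
    w.generate_specification_graph(render_graphs=True)
    user_view_code = w.convert_workflow_2_user_view(relevant_processes=["fastp"])
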