import re
import os
import json
import glob
from datetime import date
from pathlib import Path

from . import constant

from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import *
from .bioflowinsighterror import BioFlowInsightError


class Nextflow_File(Nextflow_Building_Blocks):
    """One Nextflow source file belonging to a workflow.

    The object keeps the processes, subworkflows, functions and includes
    extracted from the file's code, plus the "main" workflow when the file is
    the first (entry) file of the workflow.
    """

    def __init__(self, address, workflow, first_file = False):
        """Register the file in ``workflow``, load its contents and check them.

        Args:
            address: location of the Nextflow file.
            workflow: owning workflow object; most getters delegate to it.
            first_file: True when this file is the entry file of the workflow
                (only then is a "main" workflow extracted).

        Raises:
            BioFlowInsightError: if the basic syntax checks of
                ``check_file_correctness`` fail.
        """
        self.address = address
        self.workflow = workflow
        self.first_file = first_file
        self.main = None
        self.workflow.add_nextflow_file_2_workflow(self)
        self.includes = []
        self.processes = []
        self.subworkflows = []
        self.functions = []
        self.initialised = False
        # Whatever check_file_exists returns is handed to the parent
        # initialiser as the code of this building block.
        contents = check_file_exists(self.get_file_address(), self)
        Nextflow_Building_Blocks.__init__(self, contents, initialise_code=True)
        self.check_file_correctness()

    #----------------------
    #GENERAL
    #----------------------

    def add_to_ternary_operation_dico(self, old, new):
        """Forward the (old, new) pair to the workflow."""
        self.workflow.add_to_ternary_operation_dico(old, new)

    def add_map_element(self, old, new):
        """Forward the (old, new) pair to the workflow."""
        self.workflow.add_map_element(old, new)

    def get_root_directory(self):
        """Return the root directory as reported by the workflow."""
        return self.workflow.get_root_directory()

    def get_string_line(self, bit_of_code):
        """Delegate to ``self.code.get_string_line`` for ``bit_of_code``."""
        return self.code.get_string_line(bit_of_code)

    def get_conditions_2_ignore(self):
        """Return the conditions to ignore as reported by the workflow."""
        return self.workflow.get_conditions_2_ignore()

    #Method that returns the address of the file
    def get_file_address(self):
        """Return the normalised address of the file as a ``Path``."""
        return Path(os.path.normpath(self.address))

    def get_DSL(self):
        """Return the DSL version as reported by the workflow."""
        return self.workflow.get_DSL()

    def check_file_correctness(self):
        """Run basic balance checks on curlies, parentheses and triple quotes.

        The cheap ``str.count`` comparison is done first; only when the raw
        counts differ are ``get_curly_count`` / ``get_parenthese_count``
        consulted (presumably they ignore strings and comments - TODO confirm).

        Raises:
            BioFlowInsightError: (type 16) when one of the checks fails.
        """
        code = self.get_code()
        if code.count("{") != code.count("}"):
            curly_count = get_curly_count(code)
            if curly_count != 0:
                raise BioFlowInsightError("Not the same number of opening and closing curlies '{}' in the file.", type = 16, origin=self)
        if code.count("(") != code.count(")"):
            parenthese_count = get_parenthese_count(code)
            if parenthese_count != 0:
                raise BioFlowInsightError("Not the same number of opening and closing parentheses '()' in the file.", type = 16, origin=self)
        if code.count('"""') % 2 != 0:
            raise BioFlowInsightError("An odd number of '\"\"\"' was found in the code.", type = 16, origin=self)

    #Method which returns the DSL of the workflow -> by default it's DSL2
    #I use the presence of include, subworkflows and into/from in processes as a proxy
    def find_DSL(self):
        """Guess the DSL version used by this file.

        An include, a subworkflow header or a main workflow header is taken as
        proof of "DSL2". Otherwise the processes are extracted temporarily and
        asked via ``which_DSL``: the first "DSL1" answer wins, else the answer
        of the last process is returned ("DSL2" when there is no process).
        ``self.processes`` is always reset to an empty list afterwards.
        """
        code = self.get_code()
        #If there are include
        if re.search(constant.FULL_INLCUDE_2, code):
            return "DSL2"
        #If there are subworkflows
        if re.search(constant.SUBWORKFLOW_HEADER, code):
            return "DSL2"
        #If there is the main
        if re.search(constant.WORKFLOW_HEADER_2, '\n'+code+'\n'):
            return "DSL2"
        #Analyse the processes
        DSL = "DSL2"
        self.extract_processes()
        for p in self.processes:
            DSL = p.which_DSL()
            if DSL == "DSL1":
                break
        # The processes were only extracted to probe the DSL version.
        self.processes = []
        return DSL

    def get_workflow(self):
        """Return the workflow this file belongs to."""
        return self.workflow

    def get_duplicate_status(self):
        """Return the duplicate status as reported by the workflow."""
        return self.workflow.get_duplicate_status()

    #Returns either a subworkflow or process from the name
    def get_element_from_name(self, name):
        """Return the process, subworkflow or function whose alias is ``name``.

        Processes are searched first, then subworkflows, then functions.

        Raises:
            BioFlowInsightError: (type 18) when no element has that alias.
        """
        for element in self.processes + self.subworkflows + self.functions:
            if name == element.get_alias():
                return element
        raise BioFlowInsightError(f"'{name}' is expected to be defined in the file, but it could not be found.", type = 18, origin=self)

    def get_modules_defined(self):
        """Return processes, subworkflows, functions and included modules."""
        return self.get_processes()+self.get_subworkflows()+self.get_functions()+self.get_modules_included()

    def get_output_dir(self):
        """Return the output directory as reported by the workflow."""
        return self.workflow.get_output_dir()

    #----------------------
    #PROCESSES
    #----------------------

    def extract_processes(self):
        """Append a ``Process`` to ``self.processes`` for every process header.

        Each process spans from the start of the header match to the index
        returned by ``extract_curly`` (presumably the end of the curly block
        opened by the header - TODO confirm).
        """
        from .process import Process
        code = self.get_code()
        #Find pattern
        for match in re.finditer(constant.PROCESS_HEADER, code):
            start, header_end = match.span(0)
            end = extract_curly(code, header_end)
            self.processes.append(Process(code=code[start:end], nextflow_file=self))

    def get_processes(self):
        """Return the list of processes extracted so far."""
        return self.processes

    #----------------------
    #SUBWORKFLOW (ones found in the file)
    #----------------------

    def extract_subworkflows(self):
        """Append a ``Subworkflow`` for every subworkflow header in the code.

        The name of the subworkflow is group 1 of the header match.
        """
        from .subworkflow import Subworkflow
        #Get code without comments (per the original author's note on get_code)
        code = self.get_code()
        #Find pattern
        for match in re.finditer(constant.SUBWORKFLOW_HEADER, code):
            start, header_end = match.span(0)
            end = extract_curly(code, header_end)
            sub = Subworkflow(code=code[start:end], nextflow_file=self, name=match.group(1))
            self.subworkflows.append(sub)

    def get_subworkflows(self):
        """Return the list of subworkflows extracted so far."""
        return self.subworkflows

    #----------------------
    #MAIN WORKFLOW
    #----------------------

    #This method extracts the "main" workflow from the file
    def extract_main(self):
        """Extract the "main" workflow into ``self.main``.

        Nothing is done unless this file is the first file of the workflow.

        Raises:
            BioFlowInsightError: when more than one main workflow is found, or
                when none is found.
        """
        if not self.first_file:
            return
        from .main import Main
        # The code is padded with newlines before matching, as the original
        # implementation did (presumably the pattern anchors on them).
        code = "\n"+self.get_code()+"\n"
        #Find pattern
        already_found = False
        for match in re.finditer(constant.WORKFLOW_HEADER_2, code):
            start, header_end = match.span(1)
            end = extract_curly(code, header_end)
            self.main = Main(code= code[start:end], nextflow_file=self)
            if already_found:
                raise BioFlowInsightError(f"Found multiple 'main workflows' in {self.get_file_address()}")
            already_found = True
        if self.main is None:
            raise BioFlowInsightError("A 'main' workflow was not found in the Nextflow file")

    #----------------------
    #FUNCTIONS
    #----------------------

    #Method that extracts the functions from a file -> we don't analyse them
    #since they don't structurally change the workflow
    def extract_functions(self):
        """Append a ``Function`` for every function header found in the code.

        The function name is group 2 of the header match; a match named
        ``if`` is skipped because it is a control statement, not a function.
        """
        from .function import Function
        #pattern_function = r"(def|String|void|Void|byte|short|int|long|float|double|char|Boolean) *(\w+) *\([^,)]*(,[^,)]+)*\)\s*{"
        pattern_function = constant.HEADER_FUNCTION
        code = self.get_code()
        #Find pattern
        for match in re.finditer(pattern_function, code):
            start, header_end = match.span(0)
            end = extract_curly(code, header_end)
            #Forbidden names of functions
            if match.group(2) != 'if':
                f = Function(code = code[start:end], name = match.group(2), origin =self)
                self.functions.append(f)

    def get_functions(self):
        """Return the list of functions extracted so far."""
        return self.functions

    #----------------------
    #INCLUDES
    #----------------------

    @staticmethod
    def _merge_alias_fragments(fragments):
        """Glue fragments starting with an ``as`` token onto the previous one.

        A fragment whose first three characters are in ``constant.LIST_AS`` is
        the alias part of the preceding include; it is joined to it with a
        single space. If there is no preceding fragment it is kept as is
        rather than failing with an IndexError.
        """
        merged = []
        for fragment in fragments:
            if merged and fragment[:3] in constant.LIST_AS:
                merged[-1] = merged[-1]+" "+fragment
            else:
                merged.append(fragment)
        return merged

    def extract_includes(self):
        """Append an ``Include`` to ``self.includes`` for every include statement.

        Group 1 of the match holds the imported names, group 6 the address.
        Includes whose address (without its first character) starts with
        ``plugin/`` are skipped.
        """
        from .include import Include

        code = self.get_code()
        pattern = constant.FULL_INLCUDE_2

        for match in re.finditer(pattern, code):
            includes = match.group(1).replace('{', '').replace('}', '').strip()

            #We do this if there are multiple includes
            #TODO -> this in a nicer way
            #To take into account
            #include {
            #PAIRTOOLS_SELECT
            #    as PAIRTOOLS_SELECT_VP;
            #PAIRTOOLS_SELECT
            #    as PAIRTOOLS_SELECT_LONG
            found_semi = ";" in includes
            found_n = "\n" in includes
            if found_semi and found_n:
                fragments = [part.replace("\n", ' ').strip() for part in includes.split(";")]
                includes = self._merge_alias_fragments(fragments)
            elif found_semi:
                includes = includes.split(";")
            elif found_n:
                fragments = [part.strip() for part in includes.split("\n")]
                includes = self._merge_alias_fragments(fragments)
            else:
                includes = [includes]

            #TODO -> check this
            #https://www.nextflow.io/docs/latest/plugins.html#plugins
            #https://github.com/nextflow-io/nf-validation
            #address = match.group(0).split('from')[1].strip()
            address = match.group(6).strip()
            # address[1:] drops the first character (presumably the opening
            # quote of the path - TODO confirm against the pattern).
            if address[1:].split('/')[0] != 'plugin':
                include = Include(code =match.group(0), file = address, importing = includes, nextflow_file=self)
                self.includes.append(include)

    def get_includes(self):
        """Return the list of includes extracted so far."""
        return self.includes

    def get_modules_included(self):
        """Return every value of ``include.defines`` over all includes."""
        modules = []
        for include in self.includes:
            modules.extend(include.defines.values())
        return modules

    @staticmethod
    def _remove_snippet(code, snippet):
        """Return ``code`` with every occurrence of ``snippet`` removed.

        Raises:
            Exception: when the removal left ``code`` unchanged, i.e. the
                snippet was not part of the code.
        """
        stripped = code.replace(snippet, "")
        if stripped == code:
            raise Exception("This souldn't happen")
        return stripped

    def get_calls_made_outside_of_main(self):
        """Return the calls found in the code outside every known element.

        The code of all processes, subworkflows, functions, the main workflow
        (first file only) and includes is removed, in that order; what is left
        is wrapped in a ``Root`` (stored in ``self.root``) whose calls are
        collected.

        Raises:
            Exception: when the code of an element cannot be found in the
                remaining code.
        """
        elements = self.processes + self.subworkflows + self.functions
        if self.first_file and self.main is not None:
            elements.append(self.main)
        elements.extend(self.includes)

        #Code without processes, subworkflows, functions, main and includes
        code = self.get_code()
        for element in elements:
            code = self._remove_snippet(code, element.get_code())

        from .root import Root
        self.root = Root(code=code, origin= self, modules_defined=self.get_modules_defined(), subworkflow_inputs = [])
        self.root.initialise()
        calls = {}
        self.root.get_all_calls_in_subworkflow(calls=calls)
        return list(calls.keys())

    #----------------------
    #INITIALISE
    #----------------------

    def _initialise_DSL2(self):
        """Extract and initialise the elements of a DSL2 file."""
        #Extract Processes
        self.extract_processes()
        #Analysing Processes
        for process in self.processes:
            process.initialise()

        #Code without processes
        # The stripped code is not used afterwards; the loop acts as a sanity
        # check that every process code is still present in the file code.
        code = self.get_code()
        for process in self.processes:
            temp = code
            code = code.replace(process.get_code(), "")
            if temp == code:
                print(code)
                print(process.get_code())
                raise Exception("This souldn't happen")

        #Extract includes
        self.extract_includes()

        #Extract subworkflows
        self.extract_subworkflows()

        #Analyse Includes
        for include in self.includes:
            include.initialise()

        #Extract main
        self.extract_main()

        #Extract functions
        self.extract_functions()

        #Analyse Main
        if self.first_file and self.main is not None:
            self.main.initialise()

        # Subworkflows are only extracted here; their initialisation is
        # currently disabled (kept for reference):
        ##Analyse subworkflows
        #indice=1
        #for sub in self.subworkflows:
        #    sub.initialise()
        #    indice+=1

    def _initialise_DSL1(self):
        """Build and initialise the main of a DSL1 file from the whole code."""
        from .main import Main
        #Extract Processes
        self.extract_processes()
        code = self.get_code()
        #Extract functions
        self.extract_functions()

        #Replacing the processes and functions defined with their identifiers -> this is to simplify the analysis with the conditions
        for process in self.processes:
            temp = code
            code = code.replace(process.get_code(get_OG = True), f"process: {str(process)}")
            if temp == code:
                print(process.get_code())
                raise Exception("Something went wrong the code hasn't changed")
        for function in self.functions:
            temp = code
            code = code.replace(function.get_code(get_OG = True), f"function: {str(function)}")
            if temp == code:
                raise Exception("Something went wrong the code hasn't changed")
        self.main = Main(code= code, nextflow_file=self)
        self.main.initialise()

    #Method that initialises the nextflow file
    def initialise(self):
        """Initialise the file once; later calls are no-ops.

        Raises:
            Exception: when the DSL reported by the workflow is neither
                "DSL2" nor "DSL1", or when an internal consistency check of
                the DSL-specific initialisation fails.
        """
        #If the file is already initialised there is nothing left to do
        if self.initialised:
            return
        self.initialised = True
        if self.workflow.get_display_info_bool():
            print(f"Analysing -> '{self.get_file_address()}'")

        DSL = self.get_DSL()
        if DSL == "DSL2":
            self._initialise_DSL2()
        elif DSL == "DSL1":
            self._initialise_DSL1()
        else:
            raise Exception("This shouldn't happen")