import re
import glob

from .code_ import Code
from .condition import Condition
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import remove_jumps_inbetween_parentheses, remove_jumps_inbetween_curlies, sort_and_filter, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate, get_python_packages, get_R_libraries, get_perl_modules, process_2_DSL2
from .bioflowinsighterror import BioFlowInsightError

from . import constant


class Process(Nextflow_Building_Blocks):
    def __init__(self, code, origin):
        self.origin = origin
        self.code = Code(code, origin = self)
        self.name = ""
        self.alias = ""
        self.inputs = []
        self.raw_input_names = [] #This is used to convert DSL1 workflows to DSL2
        self.outputs = []
        self.input_code = ""
        self.output_code = ""
        self.when_code = ""
        self.pusblishDir_code = ""
        self.script_code = ""
        self.tools = []
        self.modules = []
        self.commands = []
        self.external_scripts = [-1]
        self.initialise()
        self.initialised = True ##It's important this is last
        #self.condition = Condition(self)

    def set_alias(self, alias):
        self.alias = alias

    def get_alias(self):
        return self.alias

    def get_script_code(self):
        return self.script_code

    def get_name(self):
        return self.name

    def get_tools(self, remove_script_calls = True):
        #The helper is named differently from the boolean parameter so the flag isn't shadowed
        def strip_script_calls(tab_temp):
            tab = tab_temp.copy()
            if("python" in tab):
                tab.remove("python")
            if("R" in tab):
                tab.remove("R")
            if("perl" in tab):
                tab.remove("perl")
            return tab

        if(remove_script_calls):
            return strip_script_calls(self.tools)
        else:
            return self.tools

    def get_external_scripts_call(self, code):
        tab = []
        for match in re.finditer(r"((\s|\/|\'|\")([\w\_\-\&]+\/)*([\w\_\-\&]+)\.(sh|py|R|r|pl|rg|bash))[^\w]", code):
            word = match.group(1).strip()
            if(word[0]=="'" or word[0]=='"'):
                word = word[1:]
            tab.append(word)
        return list(set(tab))

    def initialise_external_scripts_code(self, code, extension = "", previously_called = None):
        #Use a fresh dictionary per top-level call (a shared mutable default would skip scripts already seen by another process)
        if(previously_called is None):
            previously_called = {}
        calls = self.get_external_scripts_call(code+'\n')

        scripts = []
        tab = []

        for call in calls:
            #Check first if the file is in the bin
            file = glob.glob(f'{self.get_workflow_address()}/bin/**/{call}', recursive=True)
            if(len(file)>1):
                raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code bin, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self)
            #If not we search again
            elif(len(file)==0):
                file = glob.glob(f'{self.get_workflow_address()}/**/{call}', recursive=True)
                if(len(file)>1):
                    raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self)
            for f in file:
                if(f not in previously_called):
                    val = ""
                    with open(f, 'r', encoding='latin-1') as s:
                        val = s.read()
                    scripts.append(val)
                    previously_called[f] = ""
                    #Recursive call to collect the scripts called from this file, when the current file is a bash script
                    if(extension not in ["py", "R", "pl", "rg", "r"]):
                        tab += self.initialise_external_scripts_code(val, extension=f.split(".")[-1], previously_called= previously_called)
        scripts+=tab
        return list(set(scripts))

    def get_external_scripts_code(self):
        #The scripts are only extracted once; the [-1] sentinel marks "not initialised yet"
        if(self.external_scripts==[-1]):
            self.external_scripts = self.initialise_external_scripts_code(self.get_script_code())
        return self.external_scripts
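    #Illustrative example (hypothetical script content, not from a real workflow) of what
    #get_external_scripts_call() extracts: for a script body such as
    #    bash bin/trim_reads.sh ${reads}
    #    python scripts/summarise.py --input ${table}
    #it returns ['bin/trim_reads.sh', 'scripts/summarise.py'] (in no particular order, since the
    #matches are deduplicated through a set). initialise_external_scripts_code() then resolves each
    #of these names against the workflow's bin/ directory first, and against the whole source tree
    #otherwise.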
    def get_python_packages_imported_internal_script(self):
        packages = []
        packages+= get_python_packages(self.get_script_code())
        return packages

    def get_python_packages_imported_external_scripts(self):
        packages = []
        external_scripts = self.get_external_scripts_code()
        for s in external_scripts:
            packages+= get_python_packages(s)
        return packages

    #This method checks the script and the external script calls for Python package imports
    def get_python_packages_imported(self):
        packages = []
        packages+= self.get_python_packages_imported_internal_script()
        packages+= self.get_python_packages_imported_external_scripts()
        return list(set(packages))

    def get_R_libraries_loaded_internal_script(self):
        libraries = []
        libraries+= get_R_libraries(self.get_script_code())
        return libraries

    def get_R_libraries_loaded_external_scripts(self):
        libraries = []
        for s in self.get_external_scripts_code():
            libraries+= get_R_libraries(s)
        return libraries

    #This method checks the script and the external script calls for loaded R libraries
    def get_R_libraries_loaded(self):
        libraries = []
        libraries+= self.get_R_libraries_loaded_internal_script()
        libraries+= self.get_R_libraries_loaded_external_scripts()
        return list(set(libraries))

    def get_perl_modules_imported_internal_script(self):
        libraries = []
        libraries+= get_perl_modules(self.get_script_code())
        return libraries

    def get_perl_modules_imported_external_scripts(self):
        libraries = []
        for s in self.get_external_scripts_code():
            libraries+= get_perl_modules(s)
        return libraries

    def get_perl_modules_imported(self):
        libraries = []
        libraries+= self.get_perl_modules_imported_internal_script()
        libraries+= self.get_perl_modules_imported_external_scripts()
        return list(set(libraries))
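    #Note: each of the *_imported/*_loaded aggregators above returns the deduplicated union of what
    #is found in the process's own script block and in the external scripts it calls,
    #e.g. (hypothetically) ['pandas', 'numpy'] for get_python_packages_imported().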
    #def get_source(self):
    #    return [self]

    #Method which returns the DSL type of a process, using the presence
    #of 'from' and 'into' as a proxy. By default it's DSL2
    def which_DSL(self):
        DSL = "DSL2"
        pattern = constant.FROM
        for match in re.finditer(pattern, self.code.get_code()):
            DSL = "DSL1"
        pattern = constant.INTO
        for match in re.finditer(pattern, self.code.get_code()):
            DSL = "DSL1"
        return DSL

    def is_initialised(self):
        return self.initialised

    #def get_sink(self):
    #    return [self]

    def get_type(self):
        return "Process"

    def get_input_code_lines(self):
        tab = []
        for l in self.input_code.split('\n'):
            tab.append(l.strip())
        return tab

    def get_inputs(self):
        return self.inputs

    def get_nb_inputs(self):
        return len(self.inputs)

    def get_outputs(self):
        return self.outputs

    def get_output_code_lines(self):
        tab = []
        for l in self.output_code.split('\n'):
            tab.append(l.strip())
        return tab

    def get_nb_outputs(self):
        return len(self.outputs)

    #TODO -> Have a much better way of doing this
    def extract_tools(self):
        script = self.script_code.lower()
        for tool in constant.TOOLS:
            if tool in script:
                self.tools.append(tool)

    def initialise_parts(self):
        code = self.get_code()

        #Check to see if the process is empty
        temp_code = re.sub(constant.PROCESS_HEADER, "", code)
        temp_code = temp_code[:-1].strip()
        if(len(temp_code)==0):
            raise BioFlowInsightError(f"The process '{self.get_name()}' defined in the file '{self.get_file_address()}' is an empty process!", num = 22, origin=self)

        publishDir_multiple, publishDir_pos= False, (0, 0)
        for match in re.finditer(r"publishDir", code):
            #if(publishDir_multiple):
            #    raise BioFlowInsightError(f"Multiple 'publishDir' were found in the process '{self.get_name()}'.", num = 22, origin=self)
            publishDir_pos = match.span(0)
            publishDir_multiple = True

        input_multiple, input_pos= False, (0, 0)
        for match in re.finditer(constant.INPUT, code):
            if(input_multiple):
                raise BioFlowInsightError(f"Multiple 'input:' were found in the process '{self.get_name()}'.", num = 22, origin=self)
            input_pos = match.span(0)
            input_multiple = True

        output_multiple, output_pos= False, (0, 0)
        for match in re.finditer(constant.OUTPUT, code):
            if(output_multiple):
                raise BioFlowInsightError(f"Multiple 'output:' were found in the process '{self.get_name()}'.", num = 22, origin=self)
            output_pos = match.span(0)
            output_multiple = True

        when_multiple, when_pos= False, (0, 0)
        for match in re.finditer(constant.WHEN, code):
            if(when_multiple):
                raise BioFlowInsightError(f"Multiple 'when:' were found in the process '{self.get_name()}'.", num = 22, origin=self)
            when_pos = match.span(0)
            when_multiple = True

        script_pos= (0, 0)
        for match in re.finditer(constant.SCRIPT, code):
            script_pos = match.span(0)
            break

        positions = [publishDir_pos, input_pos, output_pos, when_pos, script_pos]
        variables_index = ['pusblishDir', 'input', 'output', 'when', 'script']
        positions, variables_index = sort_and_filter(positions, variables_index)

        for i in range(len(positions)):
            temp_code = ""
            if(i==len(positions)-1):
                temp_code = code[positions[i][1]:code.rfind('}')].strip()
            else:
                temp_code = code[positions[i][1]:positions[i+1][0]].strip()

            if(variables_index[i]=='input'):
                self.input_code = temp_code
            elif(variables_index[i]=='output'):
                self.output_code = temp_code
            elif(variables_index[i]=='pusblishDir'):
                self.pusblishDir_code = temp_code
            elif(variables_index[i]=='when'):
                self.when_code = temp_code
            elif(variables_index[i]=='script'):
                self.script_code = temp_code
                self.extract_tools()
            else:
                raise Exception("This shouldn't happen!")
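    #Illustration of what initialise_parts() produces, for a hypothetical process body such as
    #    process ALIGN {
    #        publishDir "results/"
    #        input: ...
    #        output: ...
    #        script: """ ... """
    #    }
    #the text between two consecutive section keywords (and from the last keyword to the closing
    #'}') is stored in pusblishDir_code, input_code, output_code, when_code and script_code
    #respectively, and extract_tools() is run on the script part.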
    #Method that returns the input part of the process code
    def get_input_code(self):
        return self.input_code

    #Function that extracts the inputs from a process (DSL1)
    def initialise_inputs_DSL1(self):
        code = "\n"+self.get_input_code()+"\n"
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        #Simplifying the inputs -> when a '.' follows a line jump, pull it back onto the previous line
        code = re.sub(constant.JUMP_DOT, '.', code)

        def add_channel(name):
            from .channel import Channel
            input = Channel(name=name, origin=self.origin)
            if(not self.origin.check_in_channels(input)):
                self.origin.add_channel(input)
                input.add_sink(self)
                self.inputs.append(input)
            else:
                input = self.origin.get_channel_from_name(name)
                self.inputs.append(input)
                input.add_sink(self)

        for line in code.split("\n"):
            #Case there is a single channel as an input -> doesn't use 'from' to import the channel -> uses 'file' (see https://github.com/nextflow-io/nextflow/blob/45ceadbdba90b0b7a42a542a9fc241fb04e3719d/docs/process.rst)
            pattern = constant.FILE
            for match in re.finditer(pattern, line+"\n"):
                extracted = match.group(1).strip()
                add_channel(extracted)
                self.raw_input_names.append(extracted)

            #Case there are multiple channels as input (e.g. channel1.mix(channel2))
            pattern = constant.FROM
            for match in re.finditer(pattern, line+"\n"):
                extracted = match.group(1).strip()
                self.raw_input_names.append(extracted)
                if(bool(re.fullmatch(constant.WORD, extracted))):
                    add_channel(extracted)
                else:
                    from .operation import Operation
                    operation = Operation(code=extracted, origin=self.origin)
                    operation.initialise()
                    operation.is_defined_in_process(self)
                    self.inputs+=operation.get_origins()

        #self.inputs = list(set(self.inputs))#TODO Check this

    #Function that extracts the inputs from a process (for DSL2 workflows)
    def initialise_inputs_DSL2(self):
        code = self.get_input_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        for input in code.split("\n"):
            input = input.strip()
            if(input!=""):
                self.inputs.append(input)

    #Method that returns the output part of the process code
    def get_output_code(self):
        return self.output_code

    def get_file_extensions_outputs(self):
        code = self.get_output_code()
        extensions = []
        for match in re.finditer(r"(\.\w+)+|\.\w+", code):
            extensions.append(match.group(0))
        return extensions

    def get_input_parameters(self):
        code = self.get_input_code()
        #This is to remove the 'from' for the DSL1 processes
        #But also remove the 'stageAs'
        lines = code.split('\n')
        code = ""
        for l in lines:
            code += l.split(" from ")[0].split("stageAs")[0] + '\n'

        parameters = []
        for match in re.finditer(r"\w+(\.\w+)*", code):
            parameters.append(match.group(0))
        parameters = list(set(parameters)) #Deduplicate, since a parameter can only be given once in any case
        words_2_remove = ["path", "val", "tuple", "into", "stageAs", "emit", "file", "set"]
        for word in words_2_remove:
            try:
                parameters.remove(word)
            except:
                None
        return parameters

    def get_modules(self):
        return self.modules

    def get_commands(self):
        return self.commands

    #Function that extracts the outputs from a process (DSL1)
    def initialise_outputs_DSL1(self):
        code = self.get_output_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)

        def add_channel(name):
            from .channel import Channel
            output = Channel(name=name, origin=self.origin)
            if(not self.origin.check_in_channels(output)):
                self.origin.add_channel(output)
                output.add_source(self)
                self.outputs.append(output)
            else:
                output = self.origin.get_channel_from_name(name)
                self.outputs.append(output)
                output.add_source(self)

        pattern = constant.INTO_2
        for match in re.finditer(pattern, code):
            outputs = match.group(1).split(',')
            for i in range(len(outputs)):
                add_channel(outputs[i].strip())

        pattern = constant.FILE
        for match in re.finditer(pattern, code):
            add_channel(match.group(1))
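    #Rough illustration (hypothetical DSL1 output block, and assuming constant.INTO_2 captures the
    #comma-separated channel names following 'into'):
    #    output:
    #    file "counts.txt" into counts_ch, log_ch
    #would register 'counts_ch' and 'log_ch' as output channels of this process.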
    #Function that extracts the outputs from a process (for DSL2 workflows)
    def initialise_outputs_DSL2(self):
        code = self.get_output_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        for output in code.split("\n"):
            output = output.strip()
            if(output!=""):
                self.outputs.append(output)

    def initialise_name(self):
        for match in re.finditer(constant.PROCESS_HEADER, self.code.get_code()):
            self.name = match.group(1)
            self.name = self.name.replace("'", "")
            self.name = self.name.replace('"', '')
            self.alias = self.name

    def get_structure(self, dico):
        dico['nodes'].append({'id':str(self), 'name':self.get_name(), "shape":"ellipse", 'xlabel':"", 'fillcolor':''})

    def initialise_inputs_outputs(self):
        DSL = self.origin.get_DSL()
        if(DSL=="DSL1"):
            self.initialise_inputs_DSL1()
            self.initialise_outputs_DSL1()
        elif(DSL=="DSL2"):
            self.initialise_inputs_DSL2()
            self.initialise_outputs_DSL2()
        #else:
        #    raise Exception("Workflow is neither written in DSL1 nor DSL2!")

    def initialise(self):
        self.initialise_name()
        self.initialise_parts()
        self.initialise_inputs_outputs()
        #annotations = self.get_processes_annotation()
        annotations = None
        if(annotations!=None):
            self.tools = annotations[self.get_code()]["tools"]
            self.commands = annotations[self.get_code()]["commands"]
            self.modules = annotations[self.get_code()]["modules"]

    def add_2_rocrate(self, dico, parent_key):
        process_key = self.get_rocrate_key(dico)
        dico_process = get_dico_from_tab_from_id(dico, process_key)
        if(dico_process==None):
            dico_process = {}
            dico_process["@id"] = process_key
            dico_process["name"] = "Process"
            dico_process["@type"] = ["SoftwareSourceCode"]
            #ADD INPUTS
            dico_process["input"] = []
            for input in self.get_inputs():
                if(type(input)==str):
                    name_input = input
                else:
                    name_input = input.get_code()
                dico_input = get_dico_from_tab_from_id(dico, name_input)
                if(dico_input==None):
                    dico_input = {"@id":f"#{name_input}", "@name": name_input, "@type": "FormalParameter"}
                    dico["@graph"].append(dico_input)
                dico_process["input"].append({"@id":dico_input["@id"]})
            #ADD OUTPUTS
            dico_process["output"] = []
            for output in self.get_outputs():
                if(type(output)==str):
                    name_output = output
                else:
                    name_output = output.get_code()
                dico_output = get_dico_from_tab_from_id(dico, name_output)
                if(dico_output==None):
                    dico_output = {"@id":f"#{name_output}", "@name": name_output, "@type": "FormalParameter"}
                    dico["@graph"].append(dico_output)
                dico_process["output"].append({"@id":dico_output["@id"]})
            #ADD isPartOf
            dico_process["isPartOf"] = []
            dico_process["isPartOf"].append({"@id":parent_key})
            #ADD hasPart
            dico_process["hasPart"] = []
            for tool in self.get_tools():
                dico_tool = get_dico_from_tab_from_id(dico, tool)
                if(dico_tool==None):
                    dico_tool = {"@id":tool,
                                 "name": "Tool"
                                 #TODO in later versions
                                 #, "url": "https://some.link.com"
                                 #, "identifier": "tool_identifier"
                                 }
                    dico["@graph"].append(dico_tool)
                dico_process["hasPart"].append({"@id":dico_tool["@id"]})
            dico["@graph"].append(dico_process)
        else:
            if(not check_if_element_in_tab_rocrate(dico_process["isPartOf"], parent_key)):
                dico_process["isPartOf"].append({"@id":parent_key})

    def convert_input_code_to_DSL2(self):
        code = self.input_code
        code = process_2_DSL2(code)
        lines = []
        for line in code.split("\n"):
            lines.append(line.split(" from ")[0])
        code = "\n".join(lines)
        return code
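    #Rough illustration of the DSL1 -> DSL2 conversion of the declarations (hypothetical lines,
    #and leaving aside whatever rewriting process_2_DSL2() itself performs):
    #  input:  'file reads from reads_ch'            -> 'file reads' (the ' from ...' part is dropped)
    #  output: 'file "out.txt" into results_ch'      -> 'file "out.txt", emits: results_ch'
    #          'file "x.txt" into x_ch mode flatten' -> 'file "x.txt", emits: x_ch' (flattening is
    #          applied on the call side instead, see get_channels_to_flatten / convert_to_DSL2)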
    def convert_output_code_to_DSL2(self):
        code = self.output_code
        code = process_2_DSL2(code)
        code = code.replace(" into ", ", emits: ")
        code = code.replace(" mode flatten", "")
        return code

    #This method detects which channels need to be flattened
    #See https://github.com/nextflow-io/nextflow/blob/be1694bfebeb2df509ec4b42ea5b878ebfbb6627/docs/dsl1.md
    def get_channels_to_flatten(self):
        code = self.output_code
        channels = []
        for match in re.finditer(r"(\w+) mode flatten", code):
            channels.append(match.group(1))
        return channels

    def convert_to_DSL2(self):
        if(self.get_DSL()=="DSL2"):
            print("Workflow is already written in DSL2")
        else:
            code = self.get_code()
            call = [f"{self.get_name()}({', '.join(self.raw_input_names)})"]
            code = code.replace(self.input_code, self.convert_input_code_to_DSL2())
            code = code.replace(self.output_code, self.convert_output_code_to_DSL2())
            channels_to_flatten = self.get_channels_to_flatten()
            for o in self.outputs:
                if(o.get_code() in channels_to_flatten):
                    call.append(f"{o.get_code()} = {self.get_name()}.out.{o.get_code()}.flatten()")
                else:
                    call.append(f"{o.get_code()} = {self.get_name()}.out.{o.get_code()}")
            call = "\n".join(call)
            return code, call
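#Minimal usage sketch (comments only; assumes a parsed workflow/file object `nextflow_file`
#implementing the interface used above: get_DSL(), check_in_channels(), add_channel(),
#get_channel_from_name(), ... ; the variable names and the process content are illustrative only):
#
#    process_block = "process FASTQC { ... }"   #raw process definition extracted from the .nf file
#    p = Process(code=process_block, origin=nextflow_file)
#    p.get_name()                      #-> "FASTQC"
#    p.get_tools()                     #-> tools from constant.TOOLS found in the script block
#    p.get_python_packages_imported()  #-> packages imported in the script and its external scripts
#    if nextflow_file.get_DSL() == "DSL1":
#        new_code, call = p.convert_to_DSL2()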