# process.py (21.89 KiB)
# George Marchment authored 8fb86f17
import re
import glob
from .code_ import Code
from .condition import Condition
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import remove_jumps_inbetween_parentheses, remove_jumps_inbetween_curlies, sort_and_filter, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate, get_python_packages, get_R_libraries, get_perl_modules, process_2_DSL2
from .bioflowinsighterror import BioFlowInsightError
from . import constant
class Process(Nextflow_Building_Blocks):
def __init__(self, code, origin):
    """Create a process from its source code and parse it straight away.

    Args:
        code: Source text of the process; wrapped in a `Code` object.
        origin: Object the process was defined in; kept as `self.origin`.
    """
    self.origin = origin
    self.code = Code(code, origin = self)

    # Identity of the process (set by initialise_name)
    self.name, self.alias = "", ""

    # Inputs and outputs (set by initialise_inputs_outputs)
    self.inputs, self.outputs = [], []
    # Raw input names, used to convert DSL1 workflows to DSL2
    self.raw_input_names = []

    # Text of each section of the process (set by initialise_parts)
    self.input_code = ""
    self.output_code = ""
    self.when_code = ""
    self.pusblishDir_code = ""
    self.script_code = ""

    # Annotations extracted from the script
    self.tools, self.modules, self.commands = [], [], []

    # [-1] is the "not computed yet" marker checked by get_external_scripts_code
    self.external_scripts = [-1]

    self.initialise()
    self.initialised = True
    ##It's important this is last
    #self.condition = Condition(self)
def set_alias(self, alias):
    """Store `alias` as the alias of this process."""
    self.alias = alias
def get_alias(self):
    """Return the alias currently stored for this process."""
    return self.alias
def get_script_code(self):
    """Return the text of the script section of the process."""
    return self.script_code
def get_name(self):
    """Return the name of the process."""
    return self.name
def get_tools(self, remove_script_calls = True):
    """Return the tools detected in the script of the process.

    Args:
        remove_script_calls: When true (the default), the interpreter names
            "python", "R" and "perl" are left out of the result.

    Returns:
        A new, filtered list when `remove_script_calls` is true; otherwise
        the internal `self.tools` list itself.
    """
    # The flag must stay a plain boolean: binding a nested helper to the same
    # name would shadow it and make it always truthy.
    if not remove_script_calls:
        return self.tools
    interpreters = ("python", "R", "perl")
    return [tool for tool in self.tools if tool not in interpreters]
def get_external_scripts_call(self, code):
    """Return the script file names referenced in `code`.

    A reference is a name ending in .sh, .py, .R, .r, .pl, .rg or .bash that
    is preceded by whitespace, a slash or a quote and followed by a non-word
    character.

    Args:
        code: Text to scan.

    Returns:
        The distinct references in order of first appearance, so the result
        is reproducible from one run to the next.
    """
    pattern = r"((\s|\/|\'|\")([\w\_\-\&]+\/)*([\w\_\-\&]+)\.(sh|py|R|r|pl|rg|bash))[^\w]"
    references = []
    for match in re.finditer(pattern, code):
        word = match.group(1).strip()
        # Drop the opening quote captured together with the name
        if word[0] in ("'", '"'):
            word = word[1:]
        references.append(word)
    # dict.fromkeys removes duplicates while keeping first-seen order
    return list(dict.fromkeys(references))
def initialise_external_scripts_code(self, code, extension = "", previously_called = None):
    """Read the content of the external scripts referenced by `code`.

    Each reference found by `get_external_scripts_call` is searched first in
    the `bin` directory of the workflow, then in the whole workflow
    directory. Files that are found are read, and their content is scanned
    recursively unless `extension` is one of py, R, pl, rg or r.

    Args:
        code: Text in which script references are searched.
        extension: Extension of the file `code` was read from ("" for the
            process script itself).
        previously_called: Dict whose keys are the paths already read;
            updated in place. A new dict is created when omitted.

    Returns:
        The distinct file contents, in no particular order.

    Raises:
        BioFlowInsightError: If several files match the same reference.
    """
    # Create the registry per top-level call: a mutable default would be
    # shared by every call and every process, so a script read once would be
    # skipped for all later processes.
    if previously_called is None:
        previously_called = {}

    scripts = []
    for call in self.get_external_scripts_call(code+'\n'):
        #Check first if the file is in the bin
        files = glob.glob(f'{self.get_workflow_address()}/bin/**/{call}', recursive=True)
        if len(files) > 1:
            raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code bin, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self)
        #If not we search again
        if not files:
            files = glob.glob(f'{self.get_workflow_address()}/**/{call}', recursive=True)
            if len(files) > 1:
                raise BioFlowInsightError(f"More than one script named '{call}' in the workflow source code, don't know which one to use when using the process '{self.get_name()}'", num = 13, origin=self)

        for path in files:
            if path in previously_called:
                continue
            with open(path, 'r', encoding='latin-1') as handle:
                content = handle.read()
            scripts.append(content)
            previously_called[path] = ""
            #Recursive call to get the scripts called by the file just read
            if extension not in ["py", "R", "pl", "rg", "r"]:
                scripts += self.initialise_external_scripts_code(content, extension=path.split(".")[-1], previously_called=previously_called)
    return list(set(scripts))
def get_external_scripts_code(self):
    """Return the content of the external scripts, computing it on first use.

    `self.external_scripts` holds the marker [-1] until the scripts have been
    read; after that the stored list is returned as is.
    """
    still_pending = self.external_scripts == [-1]
    if still_pending:
        self.external_scripts = self.initialise_external_scripts_code(self.get_script_code())
    return self.external_scripts
def get_python_packages_imported_internal_script(self):
    """Return, as a new list, what `get_python_packages` finds in the script code."""
    return list(get_python_packages(self.get_script_code()))
def get_python_packages_imported_external_scripts(self):
    """Return what `get_python_packages` finds in each external script, concatenated."""
    return [
        package
        for script in self.get_external_scripts_code()
        for package in get_python_packages(script)
    ]
#This method checks the script and the external script calls for Python package imports
def get_python_packages_imported(self):
    """Return the distinct packages found in the script and in the external scripts.

    The order of the returned list is not defined.
    """
    from_script = self.get_python_packages_imported_internal_script()
    from_external = self.get_python_packages_imported_external_scripts()
    return list(set([*from_script, *from_external]))
def get_R_libraries_loaded_internal_script(self):
    """Return, as a new list, what `get_R_libraries` finds in the script code."""
    return list(get_R_libraries(self.get_script_code()))
def get_R_libraries_loaded_external_scripts(self):
    """Return what `get_R_libraries` finds in each external script, concatenated."""
    return [
        library
        for script in self.get_external_scripts_code()
        for library in get_R_libraries(script)
    ]
#This method checks the script and the external script calls for loaded R libraries
def get_R_libraries_loaded(self):
    """Return the distinct R libraries found in the script and in the external scripts.

    The order of the returned list is not defined.
    """
    from_script = self.get_R_libraries_loaded_internal_script()
    from_external = self.get_R_libraries_loaded_external_scripts()
    return list(set([*from_script, *from_external]))
def get_perl_modules_imported_internal_script(self):
    """Return, as a new list, what `get_perl_modules` finds in the script code."""
    return list(get_perl_modules(self.get_script_code()))
def get_perl_modules_imported_external_scripts(self):
    """Return what `get_perl_modules` finds in each external script, concatenated."""
    return [
        module
        for script in self.get_external_scripts_code()
        for module in get_perl_modules(script)
    ]
def get_perl_modules_imported(self):
    """Return the distinct Perl modules found in the script and in the external scripts.

    The order of the returned list is not defined.
    """
    from_script = self.get_perl_modules_imported_internal_script()
    from_external = self.get_perl_modules_imported_external_scripts()
    return list(set([*from_script, *from_external]))
#def get_source(self):
# return [self]
#Method which returns the DSL type of a process; the presence
#of 'from' and 'into' is used as a proxy. By default it's DSL2
def which_DSL(self):
    """Return "DSL1" or "DSL2" for this process.

    The process is reported as "DSL1" as soon as its code matches
    `constant.FROM` or `constant.INTO`; otherwise it is "DSL2".
    """
    has_from = re.search(constant.FROM, self.code.get_code()) is not None
    has_into = re.search(constant.INTO, self.code.get_code()) is not None
    if has_from or has_into:
        return "DSL1"
    return "DSL2"
def is_initialised(self):
    """Return the `initialised` flag of the process."""
    return self.initialised
#def get_sink(self):
# return [self]
def get_type(self):
    """Return the kind of building block this object is: always "Process"."""
    return "Process"
def get_input_code_lines(self):
    """Return the lines of the input section, each stripped of surrounding whitespace."""
    return [line.strip() for line in self.input_code.split('\n')]
def get_inputs(self):
    """Return the list of inputs of the process."""
    return self.inputs
def get_nb_inputs(self):
    """Return how many inputs the process has."""
    return len(self.inputs)
def get_outputs(self):
    """Return the list of outputs of the process."""
    return self.outputs
def get_output_code_lines(self):
    """Return the lines of the output section, each stripped of surrounding whitespace."""
    return [line.strip() for line in self.output_code.split('\n')]
def get_nb_outputs(self):
    """Return how many outputs the process has."""
    return len(self.outputs)
#TODO -> Have a much better way of doing this
def extract_tools(self):
    """Append to `self.tools` every entry of `constant.TOOLS` found in the script.

    The search is a plain substring test against the lower-cased script code.
    """
    lowered_script = self.script_code.lower()
    self.tools.extend(tool for tool in constant.TOOLS if tool in lowered_script)
def initialise_parts(self):
    """Split the process code into its publishDir, input, output, when and script sections.

    Each section runs from the end of its keyword match to the start of the
    next section, or to the last '}' of the code for the final one. The text
    is stored in the matching `*_code` attribute, and `extract_tools` is
    called once the script section is known.

    Raises:
        BioFlowInsightError: If the process is empty, or if 'input:',
            'output:' or 'when:' appears more than once.
    """
    code = self.get_code()

    #Check to see if the process is empty
    remaining = re.sub(constant.PROCESS_HEADER, "", code)[:-1].strip()
    if not remaining:
        raise BioFlowInsightError(f"The process '{self.get_name()}' defined in the file '{self.get_file_address()}' is an empty process!", num = 22, origin=self)

    def spans_of(pattern):
        #Spans of every match of the pattern in the process code
        return [found.span(0) for found in re.finditer(pattern, code)]

    #Several 'publishDir' are tolerated: the last one is kept
    publishDir_spans = spans_of(r"publishDir")
    publishDir_pos = publishDir_spans[-1] if publishDir_spans else (0, 0)

    input_spans = spans_of(constant.INPUT)
    if len(input_spans) > 1:
        raise BioFlowInsightError(f"Multiple 'input:' were found in the process '{self.get_name()}'.", num = 22, origin=self)
    input_pos = input_spans[0] if input_spans else (0, 0)

    output_spans = spans_of(constant.OUTPUT)
    if len(output_spans) > 1:
        raise BioFlowInsightError(f"Multiple 'output:' were found in the process '{self.get_name()}'?", num = 22, origin=self)
    output_pos = output_spans[0] if output_spans else (0, 0)

    when_spans = spans_of(constant.WHEN)
    if len(when_spans) > 1:
        raise BioFlowInsightError(f"Multiple 'when:' were found in the process '{self.get_name()}'.", num = 22, origin=self)
    when_pos = when_spans[0] if when_spans else (0, 0)

    #Only the first script keyword counts
    first_script = re.search(constant.SCRIPT, code)
    script_pos = first_script.span(0) if first_script is not None else (0, 0)

    positions = [publishDir_pos, input_pos, output_pos, when_pos, script_pos]
    variables_index = ['pusblishDir', 'input', 'output', 'when', 'script']
    # NOTE(review): sort_and_filter presumably drops the sections that were not
    # found and orders the others by position -- confirm in outils
    positions, variables_index = sort_and_filter(positions, variables_index)

    attribute_of = {
        'input': 'input_code',
        'output': 'output_code',
        'pusblishDir': 'pusblishDir_code',
        'when': 'when_code',
        'script': 'script_code',
    }
    last_index = len(positions) - 1
    for i, span in enumerate(positions):
        stop = code.rfind('}') if i == last_index else positions[i+1][0]
        section = code[span[1]:stop].strip()
        label = variables_index[i]
        if label not in attribute_of:
            raise Exception("This shoudn't happen!")
        setattr(self, attribute_of[label], section)
        if label == 'script':
            self.extract_tools()
#Method that returns the input part of the process code
def get_input_code(self):
    """Return the text of the input section of the process."""
    return self.input_code
#Function that extracts the inputs from a process
def initialise_inputs_DSL1(self):
    """Extract the inputs of a DSL1 process from its input section.

    Every line is searched for `constant.FILE` and `constant.FROM` matches.
    A name that is a single word is linked as a channel, with this process
    as its sink. Anything else is parsed as an `Operation`, whose origins
    are added to the inputs. Each extracted text is also recorded in
    `self.raw_input_names`.
    """
    code = "\n"+self.get_input_code()+"\n"
    code = remove_jumps_inbetween_curlies(remove_jumps_inbetween_parentheses(code))
    #Simplifying the inputs -> a '.' preceded by a line jump is turned into a plain '.'
    code = re.sub(constant.JUMP_DOT, '.', code)

    def link_channel(name):
        #Imported here rather than at module level (as in the original code)
        from .channel import Channel
        candidate = Channel(name=name, origin=self.origin)
        if self.origin.check_in_channels(candidate):
            #The origin already has a channel for this name: reuse it
            known = self.origin.get_channel_from_name(name)
            self.inputs.append(known)
            known.add_sink(self)
        else:
            self.origin.add_channel(candidate)
            candidate.add_sink(self)
            self.inputs.append(candidate)

    for line in code.split("\n"):
        searched = line+"\n"

        #Case there is a single channel as an input -> doesn't use from to import channel -> uses file (see https://github.com/nextflow-io/nextflow/blob/45ceadbdba90b0b7a42a542a9fc241fb04e3719d/docs/process.rst)
        for match in re.finditer(constant.FILE, searched):
            extracted = match.group(1).strip()
            link_channel(extracted)
            self.raw_input_names.append(extracted)

        #Case there are multiple channels as input (e.g. channel1.mix(channel2))
        for match in re.finditer(constant.FROM, searched):
            extracted = match.group(1).strip()
            self.raw_input_names.append(extracted)
            if re.fullmatch(constant.WORD, extracted):
                link_channel(extracted)
                continue
            from .operation import Operation
            operation = Operation(code=extracted, origin=self.origin)
            operation.initialise()
            operation.is_defined_in_process(self)
            self.inputs+=operation.get_origins()

    #self.inputs = list(set(self.inputs))#TODO Check this
#Function that extracts the inputs from a process (for DSL2 workflows)
def initialise_inputs_DSL2(self):
    """Record every non-empty line of the input section as an input (DSL2).

    The section is first passed through the two `remove_jumps_inbetween_*`
    helpers; each remaining line is stripped and appended to `self.inputs`
    as a plain string.
    """
    flattened = remove_jumps_inbetween_curlies(
        remove_jumps_inbetween_parentheses(self.get_input_code())
    )
    stripped_lines = (line.strip() for line in flattened.split("\n"))
    self.inputs.extend(line for line in stripped_lines if line != "")
#Method that returns the output part of the process code
def get_output_code(self):
    """Return the text of the output section of the process."""
    return self.output_code
def get_file_extensions_outputs(self):
    """Return the dotted suffixes (e.g. ".txt", ".tar.gz") found in the output section.

    Suffixes are returned in order of appearance, duplicates included.
    """
    suffix_pattern = r"(\.\w+)+|\.\w+"
    return [found.group(0) for found in re.finditer(suffix_pattern, self.get_output_code())]
def get_input_parameters(self):
    """Return the parameter names declared in the input section.

    On every line, whatever follows " from " (DSL1) or "stageAs" is ignored.
    The remaining words (dotted names allowed) are collected, without the
    qualifiers path, val, tuple, into, stageAs, emit, file and set.

    Returns:
        The distinct names, in order of first appearance.
    """
    qualifiers = {"path", "val", "tuple", "into", "stageAs", "emit", "file", "set"}
    # Keep the lines separated: gluing them together would merge the last
    # word of a line with the first word of the next one.
    declarations = "\n".join(
        line.split(" from ")[0].split("stageAs")[0]
        for line in self.get_input_code().split('\n')
    )
    words = (found.group(0) for found in re.finditer(r"\w+(\.\w+)*", declarations))
    # A parameter can only be given once, so duplicates are dropped
    return [word for word in dict.fromkeys(words) if word not in qualifiers]
def get_modules(self):
    """Return the list of modules stored for the process."""
    return self.modules
def get_commands(self):
    """Return the list of commands stored for the process."""
    return self.commands
#Function that extracts the outputs from a process (DSL1)
def initialise_outputs_DSL1(self):
    """Extract the outputs of a DSL1 process from its output section.

    Channel names come from the comma-separated group of each
    `constant.INTO_2` match and from group 1 of each `constant.FILE` match.
    Every name is linked to a channel of the origin, with this process
    registered as its source.
    """
    code = self.get_output_code()
    code = remove_jumps_inbetween_parentheses(code)
    code = remove_jumps_inbetween_curlies(code)

    def add_channel(name):
        #Imported here rather than at module level (as in the original code)
        from .channel import Channel
        output = Channel(name=name, origin=self.origin)
        if not self.origin.check_in_channels(output):
            self.origin.add_channel(output)
            output.add_source(self)
            self.outputs.append(output)
        else:
            # Look the channel up with the name received as argument. Relying
            # on variables of the enclosing loop breaks as soon as the helper
            # is called from another loop.
            output = self.origin.get_channel_from_name(name)
            self.outputs.append(output)
            output.add_source(self)

    for match in re.finditer(constant.INTO_2, code):
        for channel_name in match.group(1).split(','):
            add_channel(channel_name.strip())

    for match in re.finditer(constant.FILE, code):
        add_channel(match.group(1))
#Function that extracts the outputs from a process (for DSL2 workflows)
def initialise_outputs_DSL2(self):
    """Record every non-empty line of the output section as an output (DSL2).

    The section is first passed through the two `remove_jumps_inbetween_*`
    helpers; each remaining line is stripped and appended to `self.outputs`
    as a plain string.
    """
    flattened = remove_jumps_inbetween_curlies(
        remove_jumps_inbetween_parentheses(self.get_output_code())
    )
    stripped_lines = (line.strip() for line in flattened.split("\n"))
    self.outputs.extend(line for line in stripped_lines if line != "")
def initialise_name(self):
    """Set the name and the alias of the process from its header.

    The name is group 1 of `constant.PROCESS_HEADER`, with single and double
    quotes removed; the alias starts out equal to the name.
    """
    for header in re.finditer(constant.PROCESS_HEADER, self.code.get_code()):
        unquoted = header.group(1).replace("'", "").replace('"', '')
        self.name = unquoted
        self.alias = self.name
def get_structure(self, dico):
    """Append the node describing this process to `dico['nodes']`.

    The node is identified by `str(self)` and drawn as an ellipse labelled
    with the process name.
    """
    node = {'id':str(self), 'name':self.get_name(), "shape":"ellipse", 'xlabel':"", 'fillcolor':''}
    dico['nodes'].append(node)
def initialise_inputs_outputs(self):
    """Extract inputs then outputs with the parser matching the DSL of the origin.

    Nothing is done when the origin reports neither "DSL1" nor "DSL2".
    """
    dsl = self.origin.get_DSL()
    if dsl == "DSL1":
        steps = (self.initialise_inputs_DSL1, self.initialise_outputs_DSL1)
    elif dsl == "DSL2":
        steps = (self.initialise_inputs_DSL2, self.initialise_outputs_DSL2)
    else:
        #raise Exception("Workflow is neither written in DSL1 nor DSL2!")
        steps = ()
    for step in steps:
        step()
def initialise(self):
    """Parse the process: its name, then its sections, then its inputs and outputs."""
    self.initialise_name()
    self.initialise_parts()
    self.initialise_inputs_outputs()

    # Annotation lookup is currently switched off, so the branch below never runs
    #annotations = self.get_processes_annotation()
    annotations = None
    if annotations is not None:
        entry = annotations[self.get_code()]
        self.tools = entry["tools"]
        self.commands = annotations[self.get_code()]["commands"]
        self.modules = annotations[self.get_code()]["modules"]
def add_2_rocrate(self, dico, parent_key):
    """Describe this process in the RO-Crate dictionary `dico`.

    If the process already has an entry, `parent_key` is added to its
    "isPartOf" list when missing. Otherwise a new entry is built with its
    inputs, outputs, parent and tools, and appended to `dico["@graph"]`,
    together with any input, output or tool entry that had to be created.

    Args:
        dico: RO-Crate dictionary; its "@graph" list is modified in place.
        parent_key: "@id" of the element this process is part of.
    """
    process_key = self.get_rocrate_key(dico)
    dico_process = get_dico_from_tab_from_id(dico, process_key)

    #The process is already described: only the parent may need to be added
    if dico_process is not None:
        if not check_if_element_in_tab_rocrate(dico_process["isPartOf"], parent_key):
            dico_process["isPartOf"].append({"@id":parent_key})
        return

    def parameter_references(items):
        #Build the "@id" references of inputs/outputs, creating missing entries
        references = []
        for item in items:
            label = item if type(item) is str else item.get_code()
            entry = get_dico_from_tab_from_id(dico, label)
            if entry is None:
                entry = {"@id":f"#{label}", "@name": label, "@type": "FormalParameter"}
                dico["@graph"].append(entry)
            references.append({"@id":entry["@id"]})
        return references

    dico_process = {
        "@id": process_key,
        "name": "Process",
        "@type": ["SoftwareSourceCode"],
    }
    #ADD INPUTS
    dico_process["input"] = parameter_references(self.get_inputs())
    #ADD OUTPUTS
    dico_process["output"] = parameter_references(self.get_outputs())
    #ADD isPartOf
    dico_process["isPartOf"] = [{"@id":parent_key}]
    #ADD hasPart
    tool_references = []
    dico_process["hasPart"] = tool_references
    for tool in self.get_tools():
        dico_tool = get_dico_from_tab_from_id(dico, tool)
        if dico_tool is None:
            dico_tool = {"@id":tool,
                         "name": "Tool"
                         #TODO in later versions
                         #, "url": "https://some.link.com"
                         #, "identifier": "tool_identifier"
                         }
            dico["@graph"].append(dico_tool)
        tool_references.append({"@id":dico_tool["@id"]})

    dico["@graph"].append(dico_process)
def convert_input_code_to_DSL2(self):
    """Return the input section rewritten for DSL2.

    The section goes through `process_2_DSL2`, then everything from " from "
    onwards is dropped on each line.
    """
    converted = process_2_DSL2(self.input_code)
    return "\n".join(line.split(" from ")[0] for line in converted.split("\n"))
def convert_output_code_to_DSL2(self):
    """Return the output section rewritten for DSL2.

    The section goes through `process_2_DSL2`, then " into " becomes
    ", emits: " and " mode flatten" is removed.
    """
    converted = process_2_DSL2(self.output_code)
    return converted.replace(" into ", ", emits: ").replace(" mode flatten", "")
#This method is to detect which are the channels which need to be flattened
#See https://github.com/nextflow-io/nextflow/blob/be1694bfebeb2df509ec4b42ea5b878ebfbb6627/docs/dsl1.md
def get_channels_to_flatten(self):
    """Return the channel names followed by " mode flatten" in the output section."""
    # With a single group, findall returns the text of that group for each match
    return re.findall(r"(\w+) mode flatten", self.output_code)
def convert_to_DSL2(self):
    """Convert a DSL1 process to DSL2.

    Returns:
        A tuple `(code, call)`. `code` is the process code with its input
        and output sections replaced by their DSL2 versions. `call` is the
        call of the process followed by one assignment line per output
        (with `.flatten()` for the channels declared "mode flatten").
        None when the process is already in DSL2 (a message is printed).
    """
    if self.get_DSL() == "DSL2":
        print("Workflow is already written in DSL2")
        return None

    code = self.get_code()
    call_lines = [f"{self.get_name()}({', '.join(self.raw_input_names)})"]
    code = code.replace(self.input_code, self.convert_input_code_to_DSL2())
    code = code.replace(self.output_code, self.convert_output_code_to_DSL2())

    flattened = self.get_channels_to_flatten()
    for output in self.outputs:
        suffix = ".flatten()" if output.get_code() in flattened else ""
        call_lines.append(f"{output.get_code()} = {self.get_name()}.out.{output.get_code()}{suffix}")
    return code, "\n".join(call_lines)