# process.py (13.67 KiB) -- last authored by George Marchment, commit e71f4188
import re
import glob
import copy
from .code_ import Code
from .condition import Condition
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import remove_jumps_inbetween_parentheses, remove_jumps_inbetween_curlies, sort_and_filter, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate, get_python_packages, get_R_libraries, get_perl_modules, process_2_DSL2
from .bioflowinsighterror import BioFlowInsightError
from . import constant
class Process(Nextflow_Building_Blocks):
    """A Nextflow ``process`` definition parsed from source code.

    The raw process text is wrapped in a ``Code`` object. ``initialise()``
    extracts the process name, splits the body into its sections
    (publishDir / input / output / when / script) and then parses inputs and
    outputs according to the DSL version reported by ``nextflow_file``.
    """

    def __init__(self, code, nextflow_file):
        self.nextflow_file = nextflow_file
        self.code = Code(code, origin=self)
        self.name = ""
        self.alias = ""
        self.printed_name = ""
        self.inputs = []
        self.raw_input_names = []  # This is used to convert DSL1 workflows to DSL2
        self.outputs = []
        self.outputs_per_line = []  # DSL1 only: output channels grouped per declaration match
        self.input_code = ""
        self.output_code = ""
        self.when_code = ""
        self.pusblishDir_code = ""  # (sic) attribute name kept unchanged: other code may read it
        self.script_code = ""
        self.called_by = []  # List of calls
        # Accumulators extended by add_to_emits / set_call /
        # incremente_number_times_called / extract_tools; initialised here so
        # the first append or increment does not raise AttributeError.
        self.later_emits = []
        self.call = []
        self.number_times_called = 0
        self.tools = []
        # NOTE(review): 'initialised', 'modules', 'commands' and 'origin' are read
        # by methods of this class but never assigned in it -- presumably set by
        # callers or the parent class; confirm.

    def copy(self):
        """Return a shallow copy of this process with its parsed state cleared.

        The copy shares ``code`` and ``nextflow_file`` with the original (shallow
        copy); the name, sections, inputs/outputs and ``called_by`` are reset so
        the copy can be initialised independently.
        """
        process = copy.copy(self)
        process.name = ""
        process.alias = ""
        process.printed_name = ""
        process.inputs = []
        process.raw_input_names = []  # This is used to convert DSL1 workflows to DSL2
        process.outputs = []
        process.outputs_per_line = []
        process.input_code = ""
        process.output_code = ""
        process.when_code = ""
        process.pusblishDir_code = ""
        process.script_code = ""
        process.tools = []
        process.called_by = []  # List of calls
        # NOTE(review): later_emits, call and number_times_called are carried over
        # from the original (the lists are shared objects) -- confirm intended.
        return process

    def add_to_emits(self, emit):
        """Record an emit that refers to this process."""
        self.later_emits.append(emit)

    def get_later_emits(self):
        """Return the emits recorded with add_to_emits."""
        return self.later_emits

    def set_alias(self, alias):
        """Set the alias under which this process is used."""
        self.alias = alias

    def get_number_times_called(self):
        """Return the call counter."""
        return self.number_times_called

    def incremente_number_times_called(self):
        """Increase the call counter by one."""
        self.number_times_called += 1

    def set_call(self, call):
        """Append a call to the list of calls of this process."""
        self.call.append(call)

    def get_call(self):
        """Return the list of calls recorded with set_call."""
        return self.call

    def get_alias(self):
        """Return the alias of the process (its name unless set_alias was used)."""
        return self.alias

    def get_script_code(self):
        """Return the script section extracted by initialise_parts."""
        return self.script_code

    def get_name(self):
        """Return the process name extracted by initialise_name."""
        return self.name

    def which_DSL(self):
        """Guess the DSL version of the process from its code.

        The presence of a ``constant.FROM`` or ``constant.INTO`` match (the DSL1
        'from'/'into' syntax) is used as a proxy; otherwise "DSL2" is assumed.
        """
        code = self.code.get_code()
        if re.search(constant.FROM, code) or re.search(constant.INTO, code):
            return "DSL1"
        return "DSL2"

    def is_initialised(self):
        """Return the ``initialised`` flag (not set by this class -- see __init__ note)."""
        return self.initialised

    def get_type(self):
        """Return the building-block type name, "Process"."""
        return "Process"

    def get_input_code_lines(self):
        """Return the input section split on newlines, each line stripped."""
        return [line.strip() for line in self.input_code.split('\n')]

    def get_inputs(self):
        """Return the parsed inputs (channels in DSL1, raw declaration strings in DSL2)."""
        return self.inputs

    def get_nb_inputs(self):
        """Return the number of parsed inputs."""
        return len(self.inputs)

    def get_outputs(self):
        """Return the parsed outputs (channels in DSL1, raw declaration strings in DSL2)."""
        return self.outputs

    def get_output_code_lines(self):
        """Return the output section split on newlines, each line stripped."""
        return [line.strip() for line in self.output_code.split('\n')]

    def get_nb_outputs(self):
        """Return the number of parsed outputs."""
        return len(self.outputs)

    # TODO -> Have a much better way of doing this
    def extract_tools(self):
        """Append to ``self.tools`` each entry of ``constant.TOOLS`` found as a
        substring of the lower-cased script section."""
        script = self.script_code.lower()
        self.tools.extend(tool for tool in constant.TOOLS if tool in script)

    def _find_unique_section(self, code, pattern, section):
        """Return the span of the single match of *pattern* in *code*, or (0, 0).

        Raises:
            BioFlowInsightError: if *pattern* matches more than once.
        """
        position, found = (0, 0), False
        for match in re.finditer(pattern, code):
            if found:
                raise BioFlowInsightError(f"Multiple '{section}:' were found in the process '{self.get_name()}'.", num=22, origin=self)
            position, found = match.span(0), True
        return position

    def initialise_parts(self):
        """Split the process body into its publishDir/input/output/when/script sections.

        Each present section runs from the end of its keyword match to the start
        of the next keyword match (the last one runs to the final '}' of the
        code); the stripped text is stored in the matching ``*_code`` attribute.

        Raises:
            BioFlowInsightError: if the process body is empty, or if 'input:',
                'output:' or 'when:' appears more than once.
        """
        code = self.get_code()
        # Check to see if the process is empty: remove the header and the last
        # character (presumably the closing '}') and look at what is left.
        body = re.sub(constant.PROCESS_HEADER, "", code)[:-1].strip()
        if len(body) == 0:
            raise BioFlowInsightError(f"The process '{self.get_name()}' defined in the file '{self.get_file_address()}' is an empty process!", num=22, origin=self)

        # Several publishDir directives are tolerated; the last match is used.
        publishDir_pos = (0, 0)
        for match in re.finditer(r"publishDir", code):
            publishDir_pos = match.span(0)
        input_pos = self._find_unique_section(code, constant.INPUT, "input")
        output_pos = self._find_unique_section(code, constant.OUTPUT, "output")
        when_pos = self._find_unique_section(code, constant.WHEN, "when")
        # Only the first script keyword delimits the script section.
        script_match = re.search(constant.SCRIPT, code)
        script_pos = script_match.span(0) if script_match is not None else (0, 0)

        positions = [publishDir_pos, input_pos, output_pos, when_pos, script_pos]
        variables_index = ['pusblishDir', 'input', 'output', 'when', 'script']
        # Presumably orders the sections by position and drops absent ones ((0, 0)).
        positions, variables_index = sort_and_filter(positions, variables_index)

        section_attributes = {
            'pusblishDir': 'pusblishDir_code',
            'input': 'input_code',
            'output': 'output_code',
            'when': 'when_code',
            'script': 'script_code',
        }
        last = len(positions) - 1
        for i, (position, label) in enumerate(zip(positions, variables_index)):
            end = code.rfind('}') if i == last else positions[i + 1][0]
            section_code = code[position[1]:end].strip()
            attribute = section_attributes.get(label)
            if attribute is None:
                raise Exception("This shoudn't happen!")
            setattr(self, attribute, section_code)

    def get_input_code(self):
        """Return the input section of the process code."""
        return self.input_code

    def _get_or_create_channel(self, name):
        """Return the channel called *name* from ``self.origin``, registering a
        new Channel there first if ``check_in_channels`` reports it absent."""
        from .channel import Channel
        channel = Channel(name=name, origin=self.origin)
        if not self.origin.check_in_channels(channel):
            self.origin.add_channel(channel)
            return channel
        return self.origin.get_channel_from_name(name)

    def initialise_inputs_DSL1(self):
        """Extract the input channels of a DSL1 process.

        For every line of the input section: matches of ``constant.FILE`` and
        single-word ``constant.FROM`` sources become channels (via
        ``self.origin``) with this process added as a sink; other 'from'
        expressions are parsed as an Operation whose origins are appended to
        ``self.inputs``. Every extracted name is recorded in ``raw_input_names``.
        """
        code = "\n" + self.get_input_code() + "\n"
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        # Simplifying the inputs -> a line break followed by '.' is turned into '.'
        code = re.sub(constant.JUMP_DOT, '.', code)

        def add_channel(name):
            channel = self._get_or_create_channel(name)
            channel.add_sink(self)
            self.inputs.append(channel)

        for line in code.split("\n"):
            # Case there is a single channel as an input -> doesn't use 'from' to import
            # the channel -> uses file (see https://github.com/nextflow-io/nextflow/blob/45ceadbdba90b0b7a42a542a9fc241fb04e3719d/docs/process.rst)
            for match in re.finditer(constant.FILE, line + "\n"):
                # In the first case it's "file ch" (group 1), in the second "file (ch)" (group 2)
                extracted = match.group(1)
                if extracted is None:
                    extracted = match.group(2)
                extracted = extracted.strip()
                add_channel(extracted)
                self.raw_input_names.append(extracted)
            # Case there are multiple channels as input (e.g. channel1.mix(channel2))
            for match in re.finditer(constant.FROM, line + "\n"):
                extracted = match.group(1).strip()
                self.raw_input_names.append(extracted)
                if re.fullmatch(constant.WORD, extracted):
                    add_channel(extracted)
                else:
                    from .operation import Operation
                    operation = Operation(code=extracted, origin=self.origin)
                    operation.initialise()
                    operation.is_defined_in_process(self)
                    self.inputs += operation.get_origins()
        # TODO: check whether self.inputs should be deduplicated

    def initialise_inputs_DSL2(self):
        """Store each non-empty, stripped line of the input section in ``self.inputs`` (DSL2)."""
        code = self.get_input_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        stripped = (line.strip() for line in code.split("\n"))
        self.inputs.extend(line for line in stripped if line != "")

    def get_output_code(self):
        """Return the output section of the process code."""
        return self.output_code

    def get_file_extensions_outputs(self):
        """Return every dotted suffix (e.g. '.txt', '.fastq.gz') found in the output section."""
        return [match.group(0) for match in re.finditer(r"(\.\w+)+|\.\w+", self.get_output_code())]

    def get_input_parameters(self):
        """Return the unique identifiers used in the input declarations.

        The DSL1 " from ..." part and anything from 'stageAs' onwards are removed
        from each line, then qualifier keywords (path, val, tuple, ...) are
        dropped. The order of the returned list is not meaningful.
        """
        kept = [line.split(" from ")[0].split("stageAs")[0] for line in self.get_input_code().split('\n')]
        # Re-join with newlines so identifiers on adjacent lines cannot merge.
        code = "\n".join(kept)
        # Unique: a parameter can only be given once in any case
        parameters = list(set(match.group(0) for match in re.finditer(r"\w+(\.\w+)*", code)))
        words_2_remove = ["path", "val", "tuple", "into", "stageAs", "emit", "file", "set"]
        return [parameter for parameter in parameters if parameter not in words_2_remove]

    def get_modules(self):
        """Return ``self.modules`` (not set by this class -- see __init__ note)."""
        return self.modules

    def get_commands(self):
        """Return ``self.commands`` (not set by this class -- see __init__ note)."""
        return self.commands

    def get_code_with_alias(self):
        """Return the process code with the name in the ``process NAME {`` header replaced by the alias."""
        alias = self.get_alias()

        def replacer(match):
            # Splice by position so only the name token changes; str.replace would
            # also rewrite the name inside the 'process' keyword (e.g. name 'proc').
            header = match.group(0)
            start = match.start(1) - match.start(0)
            end = match.end(1) - match.start(0)
            return header[:start] + alias + header[end:]

        return re.sub(r"process\s*(\w+)\s*\{", replacer, self.get_code())

    def initialise_outputs_DSL1(self):
        """Extract the output channels of a DSL1 process.

        Each ``constant.INTO_2`` match contributes the comma-separated names in
        its group 1, and each ``constant.FILE`` match the name in its group 1.
        Every name becomes a channel (via ``self.origin``) with this process
        added as a source; the channels of each match are also grouped in
        ``self.outputs_per_line``.
        """
        code = self.get_output_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)

        def add_channel(name):
            channel = self._get_or_create_channel(name)
            channel.add_source(self)
            self.outputs.append(channel)
            return channel

        for match in re.finditer(constant.INTO_2, code):
            self.outputs_per_line.append([add_channel(name.strip()) for name in match.group(1).split(',')])
        for match in re.finditer(constant.FILE, code):
            self.outputs_per_line.append([add_channel(match.group(1))])

    def initialise_outputs_DSL2(self):
        """Store each non-empty, stripped line of the output section in ``self.outputs`` (DSL2)."""
        code = self.get_output_code()
        code = remove_jumps_inbetween_parentheses(code)
        code = remove_jumps_inbetween_curlies(code)
        stripped = (line.strip() for line in code.split("\n"))
        self.outputs.extend(line for line in stripped if line != "")

    def initialise_name(self):
        """Set name, alias and printed_name from the last ``constant.PROCESS_HEADER``
        match, with single and double quotes removed."""
        matches = list(re.finditer(constant.PROCESS_HEADER, self.code.get_code()))
        if matches:
            self.name = matches[-1].group(1)
        self.name = self.name.replace("'", "").replace('"', '')
        self.alias = self.name
        self.printed_name = self.name

    def get_name_to_print(self):
        """Return the name used when rendering the process."""
        return self.printed_name

    def get_structure(self, dico):
        """Append this process as an ellipse node to ``dico['nodes']``."""
        dico['nodes'].append({'id': str(self), 'name': self.get_name_to_print(), "shape": "ellipse", 'xlabel': "", 'fillcolor': ''})

    def initialise_inputs_outputs(self):
        """Parse inputs and outputs with the DSL1 or DSL2 parser chosen by ``nextflow_file.get_DSL()``.

        Raises:
            Exception: if the reported DSL is neither "DSL1" nor "DSL2".
        """
        DSL = self.nextflow_file.get_DSL()
        if DSL == "DSL1":
            self.initialise_inputs_DSL1()
            self.initialise_outputs_DSL1()
        elif DSL == "DSL2":
            self.initialise_inputs_DSL2()
            self.initialise_outputs_DSL2()
        else:
            raise Exception("Workflow is neither written in DSL1 nor DSL2!")

    def initialise(self):
        """Parse the process: name, then body sections, then inputs/outputs."""
        self.initialise_name()
        self.initialise_parts()
        self.initialise_inputs_outputs()