# Source-hosting page header, kept for provenance (not part of the module):
# George Marchment authored
# Added warning when channels are defined/used multiple times in different blocks -> sometimes breaks the rewrite
# 32357914
# nextflow_file.py 14.41 KiB
import re
import os
import json
import glob
from datetime import date
from pathlib import Path
from . import constant
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import *
from .bioflowinsighterror import BioFlowInsightError
class Nextflow_File(Nextflow_Building_Blocks):
    """A single Nextflow source file attached to a workflow.

    The file's code is scanned with the regular expressions held in
    ``constant`` to extract its processes, subworkflows, functions, includes
    and (for the first file only) its 'main' workflow. Most general getters
    simply delegate to the owning workflow object.
    """

    def __init__(self, address, workflow, first_file = False):
        """Register the file with its workflow, load its code and sanity-check it.

        Parameters:
            address: location of the Nextflow file.
            workflow: owning workflow object; the file registers itself on it.
            first_file: True when this file is the one in which the 'main'
                workflow is searched for.

        Raises:
            BioFlowInsightError: raised by check_file_correctness when curlies,
                parentheses or triple double-quotes are unbalanced.
        """
        self.address = address
        self.workflow = workflow
        self.first_file = first_file
        self.main = None
        self.workflow.add_nextflow_file_2_workflow(self)
        self.includes = []
        self.processes = []
        self.subworkflows = []
        self.functions = []
        self.initialised = False
        contents = check_file_exists(self.get_file_address(), self)
        Nextflow_Building_Blocks.__init__(self, contents, initialise_code=True)
        self.check_file_correctness()

    #----------------------
    #GENERAL
    #----------------------

    def add_to_ternary_operation_dico(self, old, new):
        """Forward the (old, new) pair to the workflow's ternary-operation dictionary."""
        self.workflow.add_to_ternary_operation_dico(old, new)

    def add_map_element(self, old, new):
        """Forward the (old, new) pair to the workflow's add_map_element."""
        self.workflow.add_map_element(old, new)

    def get_root_directory(self):
        """Return the root directory reported by the workflow."""
        return self.workflow.get_root_directory()

    def get_string_line(self, bit_of_code):
        """Return what the underlying code object's get_string_line gives for bit_of_code."""
        return self.code.get_string_line(bit_of_code)

    def get_conditions_2_ignore(self):
        """Return the conditions to ignore, as reported by the workflow."""
        return self.workflow.get_conditions_2_ignore()

    #Method that returns the address of the file
    def get_file_address(self):
        """Return the file address as a normalised pathlib.Path."""
        return Path(os.path.normpath(self.address))

    def get_DSL(self):
        """Return the DSL version reported by the workflow."""
        return self.workflow.get_DSL()

    def check_file_correctness(self):
        """Check that curlies, parentheses and triple double-quotes are balanced.

        The cheap str.count comparison is done first; only when it disagrees
        are get_curly_count / get_parenthese_count consulted for the verdict.

        Raises:
            BioFlowInsightError: (type 16) when one of the checks fails.
        """
        code = self.get_code()
        if code.count("{") != code.count("}"):
            if get_curly_count(code) != 0:
                raise BioFlowInsightError("Not the same number of opening and closing curlies '{}' in the file.", type = 16, origin=self)
        if code.count("(") != code.count(")"):
            if get_parenthese_count(code) != 0:
                raise BioFlowInsightError("Not the same number of opening and closing parentheses '()' in the file.", type = 16, origin=self)
        if code.count('"""') % 2 != 0:
            raise BioFlowInsightError("An odd number of '\"\"\"' was found in the code.", type = 16, origin=self)

    #Method which returns the DSL of the workflow -> by default it's DSL2
    #I use the presence of include, subworkflows and into/from in processes as a proxy
    def find_DSL(self):
        """Guess the DSL version used by this file.

        "DSL2" is returned as soon as an include, a subworkflow header or a
        main workflow header is found. Otherwise the processes are extracted
        temporarily and each one is asked which_DSL(); the first "DSL1" answer
        is returned, else the answer of the last process (or "DSL2" when the
        file has no process). self.processes is reset to [] before returning
        from the process analysis.
        """
        DSL = "DSL2"
        code = self.get_code()
        #If there are include
        if re.search(constant.FULL_INLCUDE_2, code):
            return DSL
        #If there are subworkflows
        if re.search(constant.SUBWORKFLOW_HEADER, code):
            return DSL
        #If there is the main
        if re.search(constant.WORKFLOW_HEADER_2, '\n' + code + '\n'):
            return DSL
        #Analyse the processes
        self.extract_processes()
        for p in self.processes:
            DSL = p.which_DSL()
            if DSL == "DSL1":
                break
        #The processes were only extracted to inspect them -> forget them again
        self.processes = []
        return DSL

    def get_workflow(self):
        """Return the owning workflow object."""
        return self.workflow

    def get_duplicate_status(self):
        """Return the duplicate status reported by the workflow."""
        return self.workflow.get_duplicate_status()

    #Returns either a subworkflow or process from the name
    def get_element_from_name(self, name):
        """Return the process, subworkflow or function whose alias equals name.

        Processes are searched first, then subworkflows, then functions.

        Raises:
            BioFlowInsightError: (type 18) when no element has that alias.
        """
        for element in [*self.processes, *self.subworkflows, *self.functions]:
            if name == element.get_alias():
                return element
        raise BioFlowInsightError(f"'{name}' is expected to be defined in the file, but it could not be found.", type = 18, origin=self)

    def get_modules_defined(self):
        """Return processes + subworkflows + functions + included modules as one list."""
        return self.get_processes()+self.get_subworkflows()+self.get_functions()+self.get_modules_included()

    def get_output_dir(self):
        """Return the output directory reported by the workflow."""
        return self.workflow.get_output_dir()

    #----------------------
    #PROCESSES
    #----------------------

    def extract_processes(self):
        """Create a Process for every constant.PROCESS_HEADER match and append it to self.processes.

        Each process spans from the start of the header match to the index
        returned by extract_curly (presumably the end of the matching curly
        block -- extract_curly is defined outside this file).
        """
        from .process import Process
        code = self.get_code()
        #Find pattern
        for header in re.finditer(constant.PROCESS_HEADER, code):
            start, header_end = header.span(0)
            end = extract_curly(code, header_end)#This function is defined in the functions file
            self.processes.append(Process(code=code[start:end], nextflow_file=self))

    def get_processes(self):
        """Return the list of processes extracted so far."""
        return self.processes

    #----------------------
    #SUBWORKFLOW (ones found in the file)
    #----------------------

    def extract_subworkflows(self):
        """Create a Subworkflow for every constant.SUBWORKFLOW_HEADER match.

        The name is taken from group 1 of the match; the result is appended
        to self.subworkflows.
        """
        from .subworkflow import Subworkflow
        #Get code without comments
        code = self.get_code()
        #Find pattern
        for header in re.finditer(constant.SUBWORKFLOW_HEADER, code):
            start, header_end = header.span(0)
            end = extract_curly(code, header_end)#This function is defined in the functions file
            sub = Subworkflow(code=code[start:end], nextflow_file=self, name=header.group(1))
            self.subworkflows.append(sub)

    def get_subworkflows(self):
        """Return the list of subworkflows extracted so far."""
        return self.subworkflows

    #----------------------
    #MAIN WORKFLOW
    #----------------------

    #This method extracts the "main" workflow from the file
    def extract_main(self):
        """Extract the 'main' workflow; does nothing unless this is the first file.

        Raises:
            Exception: when more than one main workflow header is matched.
            BioFlowInsightError: when no main workflow is found.
        """
        if not self.first_file:
            return
        from .main import Main
        #This returns the code without the comments
        code = "\n"+self.get_code()+"\n"
        #Find pattern
        already_found = False
        for header in re.finditer(constant.WORKFLOW_HEADER_2, code):
            start, header_end = header.span(1)
            end = extract_curly(code, header_end)#This function is defined in the functions file
            self.main = Main(code= code[start:end], nextflow_file=self)
            if already_found:
                #TODO turn into biofow insight error
                raise Exception(f"Found multiple 'main workflows' in {self.get_file_address()}")
            already_found = True
        if self.main is None:
            raise BioFlowInsightError("A 'main' workflow was not found in the Nextflow file")

    #----------------------
    #FUNCTIONS
    #----------------------

    #Method that extracts the functions from a file -> we don't analyse them
    #since they don't structurally change the workflow
    def extract_functions(self):
        """Create a Function for every constant.HEADER_FUNCTION match and append it to self.functions.

        Matches whose name (group 2) is 'if' are skipped: the header pattern
        can apparently also match an `if(...) {` statement.
        """
        from .function import Function
        code = self.get_code()
        #Find pattern
        for header in re.finditer(constant.HEADER_FUNCTION, code):
            start, header_end = header.span(0)
            end = extract_curly(code, header_end)#This function is defined in the functions file
            name = header.group(2)
            #Forbidden names of functions
            if name != 'if':
                self.functions.append(Function(code = code[start:end], name = name, origin =self))

    def get_functions(self):
        """Return the list of functions extracted so far."""
        return self.functions

    #----------------------
    #INCLUDES
    #----------------------

    @staticmethod
    def _attach_aliases(fragments):
        """Glue every fragment starting with an 'as' token onto the previous entry.

        A fragment is an alias continuation when its first three characters
        are in constant.LIST_AS. If such a fragment comes first there is
        nothing to attach it to, so it is kept as its own entry instead of
        raising an IndexError.
        """
        merged = []
        for fragment in fragments:
            if merged and fragment[:3] in constant.LIST_AS:
                merged[-1] = merged[-1]+" "+fragment
            else:
                merged.append(fragment)
        return merged

    def extract_includes(self):
        """Create an Include for every constant.FULL_INLCUDE_2 match and append it to self.includes.

        Group 1 of the match holds the imported names (curlies are removed),
        group 6 the address of the included file. Includes whose address
        starts with 'plugin' (after its first character, presumably the
        opening quote) are skipped.
        """
        from .include import Include
        code = self.get_code()
        for match in re.finditer(constant.FULL_INLCUDE_2, code):
            names = match.group(1).replace('{', '').replace('}', '').strip()
            #We do this if there are multiple includes
            #TODO -> this in a nicer way
            #To take into account
            #include {
            #PAIRTOOLS_SELECT
            #    as PAIRTOOLS_SELECT_VP;
            #PAIRTOOLS_SELECT
            #    as PAIRTOOLS_SELECT_LONG
            has_semicolon = ";" in names
            has_newline = "\n" in names
            if has_semicolon and has_newline:
                fragments = [part.replace("\n", ' ').strip() for part in names.split(";")]
                importing = self._attach_aliases(fragments)
            elif has_semicolon:
                importing = names.split(";")
            elif has_newline:
                fragments = [part.strip() for part in names.split("\n")]
                importing = self._attach_aliases(fragments)
            else:
                importing = [names]

            #TODO -> check this
            #https://www.nextflow.io/docs/latest/plugins.html#plugins
            #https://github.com/nextflow-io/nf-validation
            address = match.group(6).strip()
            if address[1:].split('/')[0] != 'plugin':
                include = Include(code =match.group(0), file = address, importing = importing, nextflow_file=self)
                self.includes.append(include)

    def get_includes(self):
        """Return the list of includes extracted so far."""
        return self.includes

    def get_modules_included(self):
        """Return, as one list, the values of the `defines` mapping of every include."""
        modules = []
        for include in self.includes:
            modules.extend(include.defines.values())
        return modules

    @staticmethod
    def _strip_element_code(code, element_code):
        """Return code with every occurrence of element_code removed.

        Raises:
            Exception: when nothing was removed, i.e. element_code does not
                occur in code.
        """
        stripped = code.replace(element_code, "")
        if stripped == code:
            raise Exception("This souldn't happen")
        return stripped

    def get_calls_made_outside_of_main(self):
        """Return the calls found in the code lying outside every extracted element.

        The code of the processes, subworkflows, functions, the main workflow
        (first file only) and the includes is removed, in that order; what is
        left is wrapped in a Root (stored in self.root), initialised, and the
        keys of the calls it collects are returned as a list.

        Raises:
            Exception: when the code of an element cannot be found in the file.
        """
        code = self.get_code()
        elements = [*self.processes, *self.subworkflows, *self.functions]
        if self.first_file and self.main is not None:
            elements.append(self.main)
        elements.extend(self.includes)
        for element in elements:
            code = self._strip_element_code(code, element.get_code())

        from .root import Root
        self.root = Root(code=code, origin= self, modules_defined=self.get_modules_defined(), subworkflow_inputs = [])
        self.root.initialise()
        calls = {}
        self.root.get_all_calls_in_subworkflow(calls=calls)
        return list(calls.keys())

    #----------------------
    #INITIALISE
    #----------------------

    #Method that initialises the nextflow file
    def initialise(self):
        """Extract and initialise the content of the file; only the first call does anything.

        DSL2: processes are extracted and initialised, then includes and
        subworkflows are extracted, includes initialised, main and functions
        extracted, and the main workflow (first file only) initialised.
        DSL1: processes and functions are extracted and their original code
        is replaced by identifiers; the resulting code becomes the main.

        Raises:
            Exception: when the code of a process or function cannot be found
                in the file, or when the DSL is neither "DSL2" nor "DSL1".
        """
        #If the file is already initialised there is nothing left to do
        if self.initialised:
            return
        self.initialised = True
        if self.workflow.get_display_info_bool():
            print(f"Analysing -> '{self.get_file_address()}'")

        dsl = self.get_DSL()
        if dsl == "DSL2":
            #Extract Processes
            self.extract_processes()
            #Analysing Processes
            for process in self.processes:
                process.initialise()

            #Code without processes -> the result is not used afterwards,
            #this only checks that every process can be found in the file
            code = self.get_code()
            for process in self.processes:
                before = code
                code = code.replace(process.get_code(), "")
                if before == code:
                    print(code)
                    print(process.get_code())
                    raise Exception("This souldn't happen")

            #Extract includes
            self.extract_includes()
            #Extract subworkflows
            self.extract_subworkflows()
            #Analyse Includes
            for include in self.includes:
                include.initialise()
            #Extract main
            self.extract_main()
            #Extract functions
            self.extract_functions()
            #Analyse Main
            if self.first_file and self.main is not None:
                self.main.initialise()
            #NOTE: subworkflows are extracted above but not initialised in this method

        elif dsl == "DSL1":
            from .main import Main
            #Extract Processes
            self.extract_processes()
            code = self.get_code()
            #Extract functions
            self.extract_functions()
            #Replacing the processes and functions defined with their identifiers -> this is to simplify the analysis with the conditions
            for process in self.processes:
                before = code
                code = code.replace(process.get_code(get_OG = True), f"process: {str(process)}")
                if before == code:
                    print(process.get_code())
                    raise Exception("Something went wrong the code hasn't changed")
            for function in self.functions:
                before = code
                code = code.replace(function.get_code(get_OG = True), f"function: {str(function)}")
                if before == code:
                    raise Exception("Something went wrong the code hasn't changed")
            self.main = Main(code= code, nextflow_file=self)
            self.main.initialise()

        else:
            raise Exception("This shouldn't happen")