-
George Marchment authored3fb7c24c
nextflow_file.py 10.34 KiB
import re
import os
import json
import glob
from datetime import date
#TODO -> check this or either change the warnings to nothing
import warnings
from pathlib import Path
from . import constant
warnings.filterwarnings("ignore")
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import *
from .bioflowinsighterror import BioFlowInsightError
class Nextflow_File(Nextflow_Building_Blocks):
def __init__(self, address, workflow, first_file=False):
    """Create the in-memory representation of one Nextflow file.

    Parameters:
        address: location of the file; stored as given and normalised on
            demand by get_file_address().
        workflow: the owning workflow object. The new instance registers
            itself with it through add_nextflow_file_2_workflow().
        first_file: flag stored on the instance. extract_main() and
            initialise() only build/analyse a main workflow when it is set.
    """
    self.address, self.workflow, self.first_file = address, workflow, first_file
    #The main workflow stays empty until extract_main() fills it in
    self.main = None
    #Registration deliberately happens before the element lists are created
    self.workflow.add_nextflow_file_2_workflow(self)
    #Elements found in the file, filled in by the extract_* methods
    self.includes, self.processes, self.subworkflows, self.functions = [], [], [], []
    #Set to True by initialise() so the analysis only runs once
    self.initialised = False
    #check_file_exists() supplies the value passed on to the parent constructor
    file_contents = check_file_exists(self.get_file_address(), self)
    Nextflow_Building_Blocks.__init__(self, file_contents)
#----------------------
#GENERAL
#----------------------
def get_string_line(self, bit_of_code):
    """Forward bit_of_code to self.code.get_string_line() and return its result."""
    code_object = self.code
    return code_object.get_string_line(bit_of_code)
def get_conditions_2_ignore(self):
    """Return whatever the owning workflow reports as the conditions to ignore."""
    owner = self.workflow
    return owner.get_conditions_2_ignore()
def get_file_address(self):
    """Return the address of the file as a pathlib.Path.

    The stored address is first normalised with os.path.normpath, so
    redundant separators and '..' segments are collapsed.
    """
    normalised = os.path.normpath(self.address)
    return Path(normalised)
def get_DSL(self):
    """Return the DSL version as reported by the owning workflow."""
    owner = self.workflow
    return owner.get_DSL()
def find_DSL(self):
    """Guess the DSL version used by this file; the default answer is "DSL2".

    The presence of an include statement, a subworkflow header or a main
    workflow header is used as a proxy for DSL2. When none of them is
    found, the processes are extracted and asked individually.

    Returns:
        "DSL2" as soon as one of the three patterns matches, or when the
        file has no processes. "DSL1" as soon as a process reports it.
        Otherwise the value returned by which_DSL() of the last process.

    Side effect: calls self.extract_processes() when no pattern matched.
    """
    dsl = "DSL2"
    #An include statement is enough to decide
    if re.search(constant.FULL_INLCUDE_2, self.get_code()) is not None:
        return dsl
    #So is a subworkflow declaration
    if re.search(constant.SUBWORKFLOW_HEADER, self.get_code()) is not None:
        return dsl
    #So is a main workflow; the code is wrapped in newlines before matching
    if re.search(constant.WORKFLOW_HEADER_2, '\n'+self.get_code()+'\n') is not None:
        return dsl
    #Nothing structural was found: fall back on the processes themselves
    self.extract_processes()
    for process in self.processes:
        dsl = process.which_DSL()
        if dsl == "DSL1":
            return dsl
    return dsl
def get_workflow(self):
    """Return the workflow object this file was created with."""
    owner = self.workflow
    return owner
def get_duplicate_status(self):
    """Return the duplicate status as reported by the owning workflow."""
    owner = self.workflow
    return owner.get_duplicate_status()
def get_element_from_name(self, name):
    """Return the process, subworkflow or function whose alias equals name.

    The three collections are searched in that order and the first
    element whose get_alias() matches is returned.

    Raises:
        BioFlowInsightError: (num 18) when no element carries that alias.
    """
    #The search order is significant: processes, subworkflows, then functions
    for group in (self.processes, self.subworkflows, self.functions):
        for element in group:
            if name == element.get_alias():
                return element
    raise BioFlowInsightError(f"'{name}' is expected to be defined in the file, but it could not be found.", num = 18, origin=self)
def get_modules_defined(self):
    """Return processes, subworkflows, functions and included modules, concatenated in that order.

    A new object is built by concatenation; the stored collections are
    not modified.
    """
    defined = self.get_processes() + self.get_subworkflows()
    defined = defined + self.get_functions()
    defined = defined + self.get_modules_included()
    return defined
def get_output_dir(self):
    """Return the output directory as reported by the owning workflow."""
    owner = self.workflow
    return owner.get_output_dir()
#----------------------
#PROCESSES
#----------------------
def extract_processes(self):
    """Build a Process object for every process declaration found in the file.

    The result replaces the contents of self.processes, so calling this
    method more than once does not accumulate duplicates. This matters
    because it is called from find_DSL() as well as from initialise();
    appending on every call left each process listed twice for files
    that reached the process check in find_DSL().

    The list object itself is kept (its contents are swapped in place),
    so references previously obtained through get_processes() stay valid.
    """
    #Imported locally, as in the rest of this class
    from .process import Process
    code = self.get_code()
    found = []
    for match in re.finditer(constant.PROCESS_HEADER, code):
        start = match.start()
        #extract_curly is given the position where the header match ends and
        #returns the end index of the process body (helper not visible here)
        end = extract_curly(code, match.end())
        found.append(Process(code=code[start:end], nextflow_file=self))
    #Swap the contents only once every process has been built
    self.processes[:] = found
def get_processes(self):
    """Return the list of processes collected for this file (the stored list itself, not a copy)."""
    collected = self.processes
    return collected
#----------------------
#SUBWORKFLOW (ones found in the file)
#----------------------
def extract_subworkflows(self):
    """Append a Subworkflow object to self.subworkflows for every subworkflow header in the file.

    The name given to each Subworkflow is group 1 of the header match.
    """
    from .subworkflow import Subworkflow
    source = self.get_code()
    for header in re.finditer(constant.SUBWORKFLOW_HEADER, source):
        begin, header_end = header.span(0)
        #extract_curly returns the end index of the body that starts after the header
        stop = extract_curly(source, header_end)
        self.subworkflows.append(
            Subworkflow(code=source[begin:stop], nextflow_file=self, name=header.group(1))
        )
def get_subworkflows(self):
    """Return the list of subworkflows collected for this file (the stored list itself, not a copy)."""
    collected = self.subworkflows
    return collected
#----------------------
#MAIN WORKFLOW
#----------------------
def extract_main(self):
    """Extract the "main" workflow of the file into self.main.

    Only the first file of a workflow gets a Main object. For any other
    file a matching header leaves self.main untouched.

    Raises:
        Exception: when the first file contains more than one main
            workflow. The check is made before the second Main is built,
            so self.main still refers to the first one when this is raised.
    """
    from .main import Main
    #The code is wrapped in newlines before the header pattern is applied
    code = "\n"+self.get_code()+"\n"
    already_found = False
    for match in re.finditer(constant.WORKFLOW_HEADER_2, code):
        if(self.first_file):
            if(already_found):
                #TODO turn into biofow insight error
                raise Exception(f"Found multiple 'main workflows' in {self.get_file_address()}")
            #Group 1 of the header marks where the workflow starts
            start = match.span(1)[0]
            end = extract_curly(code, match.span(1)[1])
            self.main = Main(code=code[start:end], nextflow_file=self)
            already_found = True
        else:
            #TODO add num
            #NOTE(review): the error object is built but never raised, so this
            #branch currently has no effect. Confirm whether a main workflow in
            #a non-first file should really abort the analysis before adding
            #a `raise` here.
            BioFlowInsightError("A 'main' workflow was found in the Nextflow file")
#----------------------
#FUNCTIONS
#----------------------
def extract_functions(self):
    """Append a Function object to self.functions for every function header in the file.

    The functions are only collected, not analysed, since they don't
    structurally change the workflow. The name given to each Function is
    group 2 of the header match.
    """
    from .function import Function
    source = self.get_code()
    for header in re.finditer(constant.HEADER_FUNCTION, source):
        begin, header_end = header.span(0)
        #extract_curly returns the end index of the body that starts after the header
        stop = extract_curly(source, header_end)
        self.functions.append(
            Function(code=source[begin:stop], name=header.group(2), origin=self)
        )
def get_functions(self):
    """Return the list of functions collected for this file (the stored list itself, not a copy)."""
    collected = self.functions
    return collected
#----------------------
#INCLUDES
#----------------------
def extract_includes(self):
    """Append an Include object to self.includes for every include statement in the file.

    Group 1 of each match holds the imported names and group 6 the
    source address. The names are split into a list so that a statement
    importing several elements, possibly with aliases spread over
    several lines, is handled. For example:

        include {
        PAIRTOOLS_SELECT
            as PAIRTOOLS_SELECT_VP;
        PAIRTOOLS_SELECT
            as PAIRTOOLS_SELECT_LONG

    Statements whose address, once its first character is dropped,
    starts with the path segment 'plugin' are skipped, see
    https://www.nextflow.io/docs/latest/plugins.html#plugins and
    https://github.com/nextflow-io/nf-validation
    """
    from .include import Include

    def attach_aliases(fragments):
        #A fragment whose first three characters are in constant.LIST_AS is
        #an alias clause and is glued back onto the name that precedes it
        names = []
        for fragment in fragments:
            if fragment[:3] in constant.LIST_AS:
                names[-1] = names[-1] + " " + fragment
            else:
                names.append(fragment)
        return names

    source = self.get_code()
    for statement in re.finditer(constant.FULL_INLCUDE_2, source):
        imported = statement.group(1).replace('{', '').replace('}', '').strip()
        has_semicolon = ";" in imported
        has_newline = "\n" in imported
        if has_semicolon and has_newline:
            #Semicolon separated names, each possibly spanning several lines
            names = attach_aliases(
                piece.replace("\n", ' ').strip() for piece in imported.split(";")
            )
        elif has_semicolon:
            #Single line, semicolon separated: the pieces are kept as they are
            names = imported.split(";")
        elif has_newline:
            #One name (or alias clause) per line
            names = attach_aliases(line.strip() for line in imported.split("\n"))
        else:
            names = [imported]
        #TODO -> check this
        address = statement.group(6).strip()
        if address[1:].split('/')[0] != 'plugin':
            self.includes.append(
                Include(code=statement.group(0), file=address, importing=names, nextflow_file=self)
            )
def get_includes(self):
    """Return the list of includes collected for this file (the stored list itself, not a copy)."""
    collected = self.includes
    return collected
def get_modules_included(self):
    """Return a new list with the values of every include's `defines` mapping, in include order."""
    return [
        module
        for include in self.includes
        for module in include.defines.values()
    ]
#----------------------
#INITIALISE
#----------------------
def initialise(self):
    """Analyse the file once: extract its elements and initialise them.

    The method is a no-op when the file was already initialised. The
    flag is set before anything else, so a nested call made while the
    analysis is running returns immediately.

    Nothing beyond setting the flag is done unless get_DSL() returns
    "DSL2". For DSL2 the steps are, in order: extract and initialise the
    processes, extract the includes and the subworkflows, initialise the
    includes, extract the main workflow and the functions, and finally
    initialise the main workflow when this is the first file and a main
    workflow was found. Subworkflows are extracted but not initialised
    here.
    """
    #Guard clause: a file is only ever analysed once
    if self.initialised:
        return
    self.initialised = True
    if self.get_DSL() != "DSL2":
        return
    if(self.workflow.get_display_info_bool()):
        print(f"Analysing -> '{self.get_file_address()}'")
    #Extract, then analyse, the processes
    self.extract_processes()
    for process in self.processes:
        process.initialise()
    #A copy of the code with the process bodies removed used to be computed
    #here but was never read; every extract_* method works on get_code()
    self.extract_includes()
    self.extract_subworkflows()
    #Analyse the includes
    for include in self.includes:
        include.initialise()
    self.extract_main()
    self.extract_functions()
    #Analyse the main workflow, which only exists for the first file
    if self.first_file and self.main is not None:
        self.main.initialise()
#
##Analyse subworkflows
#indice=1
#for sub in self.subworkflows:
# sub.initialise()
# indice+=1