Skip to content
Snippets Groups Projects
Commit 3152bdb3 authored by George Marchment's avatar George Marchment
Browse files

Good version which works on DSL1

parent 6df3ee7e
No related branches found
No related tags found
No related merge requests found
Pipeline #14357 failed with stage
in 2 minutes and 29 seconds
......@@ -12,10 +12,10 @@ class BioFlowInsightError(Exception):
def __init__(self, error, num=None, origin = None):
self.origin = origin
if(num!=None):
if(origin!=None):
super().__init__(f"[{num}] Error in the file '{self.origin.get_file_address()}': "+error)
else:
super().__init__(f"[{num}] {error}")
#if(origin!=None):
# super().__init__(f"[{num}] Error in the file '{self.origin.get_file_address()}': "+error)
#else:
super().__init__(f"[{num}] {error}")
else:
super().__init__(f"{error}")
......
......@@ -38,9 +38,6 @@ class Include(Nextflow_Building_Blocks):
def get_duplicate_status(self):
return self.nextflow_file_origin.get_duplicate_status()
def get_root_directory(self):
return self.origin.get_root_directory()
#def get_list_name_includes(self):
# if(self.get_duplicate_status()):
......@@ -67,7 +64,8 @@ class Include(Nextflow_Building_Blocks):
if(address.split('/')[0] in ["$projectDir", "${projectDir}", "${baseDir}", "$baseDir"]):
address = '/'.join(address.split('/')[1:])
root = self.get_root_directory()
#root = self.get_root_directory()
root = self.nextflow_file_origin.get_root_directory()
address = root+'/'+address
if(os.path.isfile(address)):
found_file = True
......@@ -79,14 +77,14 @@ class Include(Nextflow_Building_Blocks):
found_file = True
if(not found_file and os.path.isfile(address[:-3]+"/main.nf")):
self.nextflow_file = Nextflow_File(address[:-3]+"/main.nf", workflow = self.nextflow_file_origin.get_workflow())
self.nextflow_file = Nextflow_File(address[:-3]+"/main.nf", workflow = self.nextflow_file_origin.get_workflow(), first_file=False)
#TODO -> check if the nextflow_file is defined somewhere else?
#In the cas the nextflow file is imported multiple times
else:
if(os.path.isfile(address)):
self.nextflow_file = Nextflow_File(address, workflow = self.nextflow_file_origin.get_workflow())
self.nextflow_file = Nextflow_File(address, workflow = self.nextflow_file_origin.get_workflow(), first_file=False)
else:
address = os.path.normpath(address)
raise BioFlowInsightError(f"Something went wrong in an include{self.get_string_line(self.get_code())}. No such file: '{address}'.", num = 10,origin=self)
......
......@@ -85,38 +85,52 @@ class Main(Nextflow_Building_Blocks):
def get_most_influential_conditions(self):
most_influential_conditions = {}
all_calls = self.get_all_calls_in_subworkflow()
for c in all_calls:
if(c.get_first_element_called().get_type()=="Process"):
for condition in c.get_block().get_all_conditions(conditions = {}):
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
most_influential_conditions[condition]+=1
if(c.get_first_element_called().get_type()=="Subworkflow"):
#Adding the number of calls from a process at the root of the subworkflow
num = 0
calls_at_root = c.get_first_element_called().root.get_calls_same_level()
for call_at_root in calls_at_root:
for c_at_root in call_at_root.get_all_calls():
if(c_at_root.get_first_element_called().get_type()=="Process"):
num +=1
most_influential_conditions_in_sub = c.get_first_element_called().get_most_influential_conditions()
#Adding the conditions from inside the subworkflow
for condition in most_influential_conditions_in_sub:
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
num+=most_influential_conditions_in_sub[condition]
most_influential_conditions[condition]+=most_influential_conditions_in_sub[condition]
#Adding calls from the subworkflow to the conditions
for condition in c.get_block().get_all_conditions(conditions = {}):
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
most_influential_conditions[condition]+=num
return most_influential_conditions
\ No newline at end of file
if(self.nextflow_file.get_DSL()=="DSL2"):
all_calls = self.get_all_calls_in_subworkflow()
for c in all_calls:
if(c.get_first_element_called().get_type()=="Process"):
for condition in c.get_block().get_all_conditions(conditions = {}):
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
most_influential_conditions[condition]+=1
if(c.get_first_element_called().get_type()=="Subworkflow"):
#Adding the number of calls from a process at the root of the subworkflow
num = 0
calls_at_root = c.get_first_element_called().root.get_calls_same_level()
for call_at_root in calls_at_root:
for c_at_root in call_at_root.get_all_calls():
if(c_at_root.get_first_element_called().get_type()=="Process"):
num +=1
most_influential_conditions_in_sub = c.get_first_element_called().get_most_influential_conditions()
#Adding the conditions from inside the subworkflow
for condition in most_influential_conditions_in_sub:
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
num+=most_influential_conditions_in_sub[condition]
most_influential_conditions[condition]+=most_influential_conditions_in_sub[condition]
#Adding calls from the subworkflow to the conditions
for condition in c.get_block().get_all_conditions(conditions = {}):
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
most_influential_conditions[condition]+=num
return most_influential_conditions
elif(self.nextflow_file.get_DSL()=="DSL1"):
processes = self.get_modules_defined()
for p in processes:
if(p.is_initialised()):
for condition in p.origin.get_all_conditions(conditions = {}):
try:
temp = most_influential_conditions[condition]
except:
most_influential_conditions[condition] = 0
most_influential_conditions[condition]+=1
return most_influential_conditions
else:
raise Exception("This shouldn't happen")
\ No newline at end of file
......@@ -83,8 +83,6 @@ class Nextflow_Building_Blocks:
def get_name_file(self):
return self.origin.get_name_file()
def get_root_directory(self):
return self.origin.get_root_directory()
def get_rocrate_key(self, dico):
return f"{self.get_file_address()[len(dico['temp_directory'])+1:]}#{self.get_name()}"
......
......@@ -40,6 +40,9 @@ class Nextflow_File(Nextflow_Building_Blocks):
#GENERAL
#----------------------
def get_root_directory(self):
return self.workflow.get_root_directory()
def get_string_line(self, bit_of_code):
return self.code.get_string_line(bit_of_code)
......
......@@ -56,7 +56,7 @@ class Workflow:
with open(file, 'r') as f:
txt= f.read()
else:
raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which want to select")
raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")
......@@ -90,6 +90,10 @@ class Workflow:
with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file:
pass
def get_root_directory(self):
first_file = self.get_first_file()
return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/"
def get_conditions_2_ignore(self):
return self.conditions_2_ignore
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment