Skip to content
Snippets Groups Projects
Commit f9dba7a9 authored by George Marchment's avatar George Marchment
Browse files

Weird memory bug when repeating the analysis of a DSL1 workflow without starting the kernel

parent 3fb7c24c
No related branches found
No related tags found
No related merge requests found
Pipeline #14355 failed with stage
in 2 minutes and 37 seconds
......@@ -7,6 +7,11 @@ class Block(Root):
Root.__init__(self = self, code = code, origin = origin, modules_defined = modules_defined, subworkflow_inputs = existing_channels)
self.condition = Condition(origin=self, condition = condition)
def delete(self):
    """Tear down this block.

    Runs the parent class's ``delete`` first, then asks the block's
    condition object to delete itself and finally removes the
    ``condition`` attribute from this instance.
    """
    super().delete()
    # Look the condition up only after the parent teardown has run, exactly
    # as the attribute access is ordered in the statement sequence.
    block_condition = self.condition
    block_condition.delete()
    del self.condition
def initialise(self):
if(self.condition.value not in self.origin.get_conditions_2_ignore()):
return super().initialise()
......
......@@ -25,7 +25,11 @@ class Call(Executor):
self.analyse_first_element_called(self.get_code(clean_pipe = True))
#It's important this is last
#self.condition = Condition(self)
def delete(self):
    """Delete the condition attached to this call and drop the attribute.

    NOTE(review): the assignment of ``self.condition`` just above this
    method is commented out, so the attribute is presumably set by a
    parent class or a later step -- confirm before relying on this.
    """
    call_condition = self.condition
    call_condition.delete()
    del self.condition
#This method returns all the calls inside a call eg p1(p2(), p3()) returns [p1(p2(), p3()), p2(), p3()]
def get_all_calls(self):
tab = []
......
......@@ -11,6 +11,10 @@ class Code:
self.initialise()
#self.check_its_nextflow()
def delete(self):
    """Remove both stored versions of the source text from this object.

    Deletes the ``code`` attribute and then the ``code_wo_comments``
    attribute; an ``AttributeError`` is raised if either is missing.
    """
    # A single ``del`` with several targets deletes them left to right.
    del self.code, self.code_wo_comments
def initialise(self):
#I do this just to avoid out of file problems later on
......
......@@ -15,9 +15,10 @@ def get_object(address):
class Graph():
def __init__(self, workflow):
self.workflow = workflow
self.full_dico = workflow.get_structure()
with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file :
json.dump(self.full_dico, output_file, indent=4)
self.full_dico = {}
#self.full_dico = workflow.get_structure()
#with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file :
# json.dump(self.full_dico, output_file, indent=4)
#This dico give for the nodes its sister nodes
self.link_dico = None
#Dico to graph without operations
......@@ -36,6 +37,10 @@ class Graph():
def initialise(self, processes_2_remove = []):
if(not self.is_initialised()):
self.initialised = True
self.full_dico = self.workflow.get_structure()
with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file :
json.dump(self.full_dico, output_file, indent=4)
def get_node_id(dico, process):
for node in dico["nodes"]:
if(node['name']==process):
......@@ -83,7 +88,7 @@ class Graph():
self.dico_flattened["edges"] = []
#This will stay empty -> it's just so we can use the same function
self.dico_flattened["subworkflows"] = []
self.initialised = True
def is_initialised(self):
return self.initialised
......
......@@ -73,9 +73,7 @@ class Main(Nextflow_Building_Blocks):
#Check that includes are not defined in the main or subworkflows
self.check_includes()
self.root = Root(code=self.get_code(), origin=self, modules_defined=self.modules_defined)
self.root.initialise()
......
......@@ -14,6 +14,10 @@ class Nextflow_Building_Blocks:
def __init__(self, code):
    """Store the given source as a ``Code`` object whose origin is this element.

    Args:
        code: value forwarded to ``Code`` as its ``code`` argument.
    """
    wrapped_code = Code(code=code, origin=self)
    self.code = wrapped_code
def delete(self):
    """Delete the wrapped code object, then drop the ``code`` attribute."""
    code_wrapper = self.code
    code_wrapper.delete()
    del self.code
......
......@@ -73,7 +73,9 @@ class Nextflow_File(Nextflow_Building_Blocks):
for p in self.processes:
DSL = p.which_DSL()
if(DSL=="DSL1"):
self.processes = []
return DSL
self.processes = []
return DSL
def get_workflow(self):
......@@ -299,3 +301,19 @@ class Nextflow_File(Nextflow_Building_Blocks):
#for sub in self.subworkflows:
# sub.initialise()
# indice+=1
elif(self.get_DSL()=="DSL1"):
if(self.workflow.get_display_info_bool()):
print(f"Analysing -> '{self.get_file_address()}'")
from .main import Main
#Extract Processes
self.extract_processes()
print(len(self.processes))
code = self.get_code()
#Replacing the processes defined with their identifiers -> this is to simplify the analysis with the conditions
for process in self.processes:
code = code.replace(process.get_code(), f"process: {str(process)}")
self.main = Main(code= code, nextflow_file=self)
self.main.initialise()
else:
raise Exception("This shouldn't happen")
......@@ -191,7 +191,7 @@ class Operation(Executor):
from .channel import Channel
#Case it's a call and it's been replaced
if(re.fullmatch(constant.CALL_ID, name)):
self.gives.append(self.calls[name])
self.add_element_gives(self.calls[name])
raise Exception("This shoudn't happen! -> a call is taking a value")
else:
......@@ -216,7 +216,7 @@ class Operation(Executor):
channels = [channel]
for channel in channels:
self.gives.append(channel)
self.add_element_gives(channel)
#channel.initialise()
channel.add_source(self)
......
......@@ -14,12 +14,15 @@ class Process(Nextflow_Building_Blocks):
def __init__(self, code, nextflow_file):
self.nextflow_file = nextflow_file
self.code = Code(code, origin = self)
#Origin is only used in the case DSL1
self.origin = None
self.name = ""
self.alias = ""
self.printed_name = ""
self.inputs = []
self.raw_input_names = []#This is used to convert DSL1 workflows to DSL2
self.outputs = []
self.outputs_per_line = []
self.input_code = ""
self.output_code = ""
......@@ -51,6 +54,12 @@ class Process(Nextflow_Building_Blocks):
self.number_times_copied += 1
return process, num
def set_origin(self, thing):
    """Record ``thing`` as the origin of this process.

    Only allowed when the owning Nextflow file reports ``"DSL1"``.

    Args:
        thing: object to store in ``self.origin``.

    Raises:
        Exception: if the Nextflow file's DSL is anything other than DSL1.
    """
    # Guard first: an origin is only meaningful for DSL1 workflows.
    if self.nextflow_file.get_DSL() != "DSL1":
        raise Exception("This shouldn't happen")
    self.origin = thing
def add_to_emits(self, emit):
self.later_emits.append(emit)
......@@ -226,16 +235,21 @@ class Process(Nextflow_Building_Blocks):
code = re.sub(constant.JUMP_DOT, '.', code)
def add_channel(name):
from .channel import Channel
input = Channel(name=name, origin=self.origin)
if(not self.origin.check_in_channels(input)):
channels = self.origin.get_channels_from_name_same_level(name)
channels += self.origin.get_channels_from_name_above_level(name)
channels += self.origin.get_channels_from_name_inside_level(name)
channels += self.origin.get_channels_from_name_other_blocks_on_same_level(name)
if(len(channels)==0):
from .channel import Channel
input = Channel(name=name, origin=self.origin)
self.origin.add_channel(input)
input.add_sink(self)
self.inputs.append(input)
else:
input = self.origin.get_channel_from_name(name)
self.inputs.append(input)
input.add_sink(self)
for ch in channels:
self.inputs.append(ch)
ch.add_sink(self)
for line in code.split("\n"):
......@@ -333,18 +347,22 @@ class Process(Nextflow_Building_Blocks):
code = self.get_output_code()
code = remove_jumps_inbetween_parentheses(code)
code = remove_jumps_inbetween_curlies(code)
def add_channel(name):
from .channel import Channel
output = Channel(name=name, origin=self.origin)
if(not self.origin.check_in_channels(output)):
channels = self.origin.get_channels_from_name_same_level(name)
channels += self.origin.get_channels_from_name_above_level(name)
channels += self.origin.get_channels_from_name_inside_level(name)
channels += self.origin.get_channels_from_name_other_blocks_on_same_level(name)
if(len(channels)==0):
from .channel import Channel
output = Channel(name=name, origin=self.origin)
self.origin.add_channel(output)
output.add_source(self)
self.outputs.append(output)
else:
output = self.origin.get_channel_from_name(outputs[i].strip())
self.outputs.append(output)
output.add_source(self)
for ch in channels:
self.outputs.append(ch)
ch.add_source(self)
pattern =constant.INTO_2
for match in re.finditer(pattern, code):
......
......@@ -18,6 +18,7 @@ class Root(Nextflow_Building_Blocks):
self.modules_defined = modules_defined
self.elements_being_called = []
self.channels = subworkflow_inputs
self.defined_processes = []
#############
......@@ -219,12 +220,21 @@ class Root(Nextflow_Building_Blocks):
# c.get_first_element_called().root.get_all_calls(calls = calls)
#############
# PROCESSES
#############
def extract_defined_processes(self):
    """Register the processes whose placeholder sits directly in this root's code.

    The code of every block in ``self.blocks`` is first removed from this
    root's code, so only placeholders outside the blocks remain. Each
    remaining placeholder (the default string form of a ``Process``
    object) is compared with ``str(process)`` for every process in
    ``self.modules_defined``; on a match the process gets this root as its
    origin and is appended to ``self.defined_processes``.
    """
    placeholder_pattern = r"\<src\.process\.Process object at \w+\>"

    # Strip the code of every block so that only this level is searched.
    own_level_code = self.get_code()
    for block in self.blocks:
        own_level_code = own_level_code.replace(block.get_code(), "")

    for found in re.finditer(placeholder_pattern, own_level_code):
        placeholder = found.group(0)
        for candidate in self.modules_defined:
            if str(candidate) != placeholder:
                continue
            candidate.set_origin(self)
            self.defined_processes.append(candidate)
def initialise(self):
#Define the blocks
......@@ -244,6 +254,9 @@ class Root(Nextflow_Building_Blocks):
self.extract_executors()
#Case DSL1 -> need to extract the processes which have been defined but replaced in the code
self.extract_defined_processes()
#This is to get the order of execution
code = self.get_code()
......@@ -261,6 +274,17 @@ class Root(Nextflow_Building_Blocks):
block_code = block_code[:-1]
if(not found):
raise Exception("This shouldn't happen")
for process in self.defined_processes:
found = False
pos = code.find(str(process))
if(pos!=-1):
position_2_thing_2_analyse[pos] = process
found = True
if(not found):
raise Exception("This shouldn't happen")
for e in self.executors:
e_code = e.get_code()
found = False
......@@ -273,7 +297,8 @@ class Root(Nextflow_Building_Blocks):
else:
e_code = e_code[:-1]
if(not found):
raise Exception("This shouldn't happen")
raise Exception("This shouldn't happen")
sorted_position_2_thing_2_analyse = dict(sorted(position_2_thing_2_analyse.items()))
for key in sorted_position_2_thing_2_analyse:
......@@ -586,6 +611,17 @@ class Root(Nextflow_Building_Blocks):
self.executors.remove(e)
def get_structure(self, dico):
#This only for DSL1 workflows
if(self.origin.get_DSL()=="DSL1"):
for process in self.defined_processes:
process.get_structure(dico)
for channel in self.channels:
for sink in channel.get_sink():
#If the sink is an operation then the edge has already been added in the get_structure method for the operation
if(sink.get_type()=="Process"):
channel.get_structure(dico, sink)
for block in self.blocks:
block.get_structure(dico)
for e in self.executors:
......
......@@ -163,11 +163,13 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen
dico['subworkflows'] = {}
if(self.get_DSL() == "DSL1"):
return self.get_structure_DSL1(dico=dico)
main = self.get_workflow_main()
if(main!=None):
return main.get_structure(dico)
elif(self.get_DSL() == "DSL2"):
main = self.get_workflow_main()
if(main!=None):
return self.get_workflow_main().get_structure(dico)
return main.get_structure(dico)
else:
return dico
#return self.get_structure_DSL2(dico=dico, start = True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment