From f9dba7a95e54f8380bd5ea9aee977fa0c18c197e Mon Sep 17 00:00:00 2001 From: George Marchment <georgemarchment@yahoo.fr> Date: Wed, 12 Feb 2025 13:05:37 +0100 Subject: [PATCH] Weird memory bug when repeating the analysis of a DSL1 workflow without starting the kernel --- src/block.py | 5 ++++ src/call.py | 6 ++++- src/code_.py | 4 +++ src/graph.py | 13 ++++++--- src/main.py | 2 -- src/nextflow_building_blocks.py | 4 +++ src/nextflow_file.py | 18 +++++++++++++ src/operation.py | 4 +-- src/process.py | 44 +++++++++++++++++++++--------- src/root.py | 48 ++++++++++++++++++++++++++++----- src/workflow.py | 6 +++-- 11 files changed, 124 insertions(+), 30 deletions(-) diff --git a/src/block.py b/src/block.py index 8d03cfc..375341b 100644 --- a/src/block.py +++ b/src/block.py @@ -7,6 +7,11 @@ class Block(Root): Root.__init__(self = self, code = code, origin = origin, modules_defined = modules_defined, subworkflow_inputs = existing_channels) self.condition = Condition(origin=self, condition = condition) + def delete(self): + super().delete() + self.condition.delete() + del self.condition + def initialise(self): if(self.condition.value not in self.origin.get_conditions_2_ignore()): return super().initialise() diff --git a/src/call.py b/src/call.py index 7febe83..1ad4f15 100644 --- a/src/call.py +++ b/src/call.py @@ -25,7 +25,11 @@ class Call(Executor): self.analyse_first_element_called(self.get_code(clean_pipe = True)) #It's important this is last #self.condition = Condition(self) - + + def delete(self): + self.condition.delete() + del self.condition + #This method returns all the calls inside a call eg p1(p2(), p3()) returns [p1(p2(), p3()), p2(), p3()] def get_all_calls(self): tab = [] diff --git a/src/code_.py b/src/code_.py index 6e45913..c03ff90 100644 --- a/src/code_.py +++ b/src/code_.py @@ -11,6 +11,10 @@ class Code: self.initialise() #self.check_its_nextflow() + def delete(self): + del self.code + del self.code_wo_comments + def initialise(self): #I do this just to avoid out of file problems later on diff --git a/src/graph.py b/src/graph.py index ef0caf1..bc0afd1 100644 --- a/src/graph.py +++ b/src/graph.py @@ -15,9 +15,10 @@ def get_object(address): class Graph(): def __init__(self, workflow): self.workflow = workflow - self.full_dico = workflow.get_structure() - with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file : - json.dump(self.full_dico, output_file, indent=4) + self.full_dico = {} + #self.full_dico = workflow.get_structure() + #with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file : + # json.dump(self.full_dico, output_file, indent=4) #This dico give for the nodes its sister nodes self.link_dico = None #Dico to graph without operations @@ -36,6 +37,10 @@ class Graph(): def initialise(self, processes_2_remove = []): if(not self.is_initialised()): + self.initialised = True + self.full_dico = self.workflow.get_structure() + with open(f"{self.get_output_dir()}/graphs/specification_graph.json", 'w') as output_file : + json.dump(self.full_dico, output_file, indent=4) def get_node_id(dico, process): for node in dico["nodes"]: if(node['name']==process): @@ -83,7 +88,7 @@ class Graph(): self.dico_flattened["edges"] = [] #This will stay empty -> it's just so we can use the same function self.dico_flattened["subworkflows"] = [] - self.initialised = True + def is_initialised(self): return self.initialised diff --git a/src/main.py b/src/main.py index 9b2d0a9..2e2551b 100644 --- a/src/main.py +++ b/src/main.py @@ -73,9 +73,7 @@ class Main(Nextflow_Building_Blocks): #Check that includes are not defined in the main or subworkflows self.check_includes() - self.root = Root(code=self.get_code(), origin=self, modules_defined=self.modules_defined) - self.root.initialise() diff --git a/src/nextflow_building_blocks.py b/src/nextflow_building_blocks.py index cf683ff..993e585 100644 --- a/src/nextflow_building_blocks.py +++ b/src/nextflow_building_blocks.py @@ -14,6 +14,10 @@ class Nextflow_Building_Blocks: def __init__(self, code): self.code = Code(code = code, origin = self) + def delete(self): + self.code.delete() + del self.code + diff --git a/src/nextflow_file.py b/src/nextflow_file.py index d62b04d..719cb96 100644 --- a/src/nextflow_file.py +++ b/src/nextflow_file.py @@ -73,7 +73,9 @@ class Nextflow_File(Nextflow_Building_Blocks): for p in self.processes: DSL = p.which_DSL() if(DSL=="DSL1"): + self.processes = [] return DSL + self.processes = [] return DSL def get_workflow(self): @@ -299,3 +301,19 @@ class Nextflow_File(Nextflow_Building_Blocks): #for sub in self.subworkflows: # sub.initialise() # indice+=1 + elif(self.get_DSL()=="DSL1"): + if(self.workflow.get_display_info_bool()): + print(f"Analysing -> '{self.get_file_address()}'") + from .main import Main + #Extarct Processes + self.extract_processes() + print(len(self.processes)) + code = self.get_code() + #Replacing the processes defined with their identifiers -> this is to simplifly the analysis with the conditions + for process in self.processes: + code = code.replace(process.get_code(), f"process: {str(process)}") + self.main = Main(code= code, nextflow_file=self) + self.main.initialise() + + else: + raise Exception("This shouldn't happen") diff --git a/src/operation.py b/src/operation.py index 29d383f..7735694 100644 --- a/src/operation.py +++ b/src/operation.py @@ -191,7 +191,7 @@ class Operation(Executor): from .channel import Channel #Case it's a call and it's been replaced if(re.fullmatch(constant.CALL_ID, name)): - self.gives.append(self.calls[name]) + self.add_element_gives(self.calls[name]) raise Exception("This shoudn't happen! -> a call is taking a value") else: @@ -216,7 +216,7 @@ class Operation(Executor): channels = [channel] for channel in channels: - self.gives.append(channel) + self.add_element_gives(channel) #channel.initialise() channel.add_source(self) diff --git a/src/process.py b/src/process.py index 5c0cdf4..e717fe8 100644 --- a/src/process.py +++ b/src/process.py @@ -14,12 +14,15 @@ class Process(Nextflow_Building_Blocks): def __init__(self, code, nextflow_file): self.nextflow_file = nextflow_file self.code = Code(code, origin = self) + #Origin is only used in the case DSL1 + self.origin = None self.name = "" self.alias = "" self.printed_name = "" self.inputs = [] self.raw_input_names = []#This is used to convert DSL1 workflows to DSL2 self.outputs = [] + self.outputs_per_line = [] self.input_code = "" self.output_code = "" @@ -51,6 +54,12 @@ class Process(Nextflow_Building_Blocks): self.number_times_copied += 1 return process, num + def set_origin(self, thing): + if(self.nextflow_file.get_DSL()=="DSL1"): + self.origin = thing + else: + raise Exception("This shouldn't happen") + def add_to_emits(self, emit): self.later_emits.append(emit) @@ -226,16 +235,21 @@ class Process(Nextflow_Building_Blocks): code = re.sub(constant.JUMP_DOT, '.', code) def add_channel(name): - from .channel import Channel - input = Channel(name=name, origin=self.origin) - if(not self.origin.check_in_channels(input)): + channels = self.origin.get_channels_from_name_same_level(name) + channels += self.origin.get_channels_from_name_above_level(name) + channels += self.origin.get_channels_from_name_inside_level(name) + channels += self.origin.get_channels_from_name_other_blocks_on_same_level(name) + if(len(channels)==0): + from .channel import Channel + input = Channel(name=name, origin=self.origin) self.origin.add_channel(input) input.add_sink(self) self.inputs.append(input) else: - input = self.origin.get_channel_from_name(name) - self.inputs.append(input) - input.add_sink(self) + for ch in channels: + self.inputs.append(ch) + ch.add_sink(self) + for line in code.split("\n"): @@ -333,18 +347,22 @@ class Process(Nextflow_Building_Blocks): code = self.get_output_code() code = remove_jumps_inbetween_parentheses(code) code = remove_jumps_inbetween_curlies(code) + def add_channel(name): - from .channel import Channel - output = Channel(name=name, origin=self.origin) - if(not self.origin.check_in_channels(output)): + channels = self.origin.get_channels_from_name_same_level(name) + channels += self.origin.get_channels_from_name_above_level(name) + channels += self.origin.get_channels_from_name_inside_level(name) + channels += self.origin.get_channels_from_name_other_blocks_on_same_level(name) + if(len(channels)==0): + from .channel import Channel + output = Channel(name=name, origin=self.origin) self.origin.add_channel(output) output.add_source(self) self.outputs.append(output) else: - output = self.origin.get_channel_from_name(outputs[i].strip()) - self.outputs.append(output) - output.add_source(self) - + for ch in channels: + self.outputs.append(ch) + ch.add_source(self) pattern =constant.INTO_2 for match in re.finditer(pattern, code): diff --git a/src/root.py b/src/root.py index bd202ce..fd4c755 100644 --- a/src/root.py +++ b/src/root.py @@ -18,6 +18,7 @@ class Root(Nextflow_Building_Blocks): self.modules_defined = modules_defined self.elements_being_called = [] self.channels = subworkflow_inputs + self.defined_processes = [] ############# @@ -219,12 +220,21 @@ class Root(Nextflow_Building_Blocks): # c.get_first_element_called().root.get_all_calls(calls = calls) - - - - - + ############# + # PROCESSES + ############# + def extract_defined_processes(self): + code = self.get_code() + #For each block -> remove its code + for b in self.blocks: + code = code.replace(b.get_code(), "") + for match in re.finditer(r"\<src\.process\.Process object at \w+\>", code): + for process in self.modules_defined: + if(str(process)==match.group(0)): + process.set_origin(self) + self.defined_processes.append(process) + def initialise(self): #Define the blocks @@ -244,6 +254,9 @@ class Root(Nextflow_Building_Blocks): self.extract_executors() + + #Case DSL1 -> need to extract the processes which have been defined but rplaced in the code + self.extract_defined_processes() #This is to get the order of execution code = self.get_code() @@ -261,6 +274,17 @@ class Root(Nextflow_Building_Blocks): block_code = block_code[:-1] if(not found): raise Exception("This shouldn't happen") + + for process in self.defined_processes: + found = False + pos = code.find(str(process)) + if(pos!=-1): + position_2_thing_2_analyse[pos] = process + found = True + if(not found): + raise Exception("This shouldn't happen") + + for e in self.executors: e_code = e.get_code() found = False @@ -273,7 +297,8 @@ class Root(Nextflow_Building_Blocks): else: e_code = e_code[:-1] if(not found): - raise Exception("This shouldn't happen") + raise Exception("This shouldn't happen") + sorted_position_2_thing_2_analyse = dict(sorted(position_2_thing_2_analyse.items())) for key in sorted_position_2_thing_2_analyse: @@ -586,6 +611,17 @@ class Root(Nextflow_Building_Blocks): self.executors.remove(e) def get_structure(self, dico): + #This only for DSL1 workflows + if(self.origin.get_DSL()=="DSL1"): + for process in self.defined_processes: + process.get_structure(dico) + for channel in self.channels: + for sink in channel.get_sink(): + #If the sink an operation then the edge has already been added in the get_structure method for the operation + if(sink.get_type()=="Process"): + channel.get_structure(dico, sink) + + for block in self.blocks: block.get_structure(dico) for e in self.executors: diff --git a/src/workflow.py b/src/workflow.py index 280392c..21ddda5 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -163,11 +163,13 @@ George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen dico['subworkflows'] = {} if(self.get_DSL() == "DSL1"): - return self.get_structure_DSL1(dico=dico) + main = self.get_workflow_main() + if(main!=None): + return main.get_structure(dico) elif(self.get_DSL() == "DSL2"): main = self.get_workflow_main() if(main!=None): - return self.get_workflow_main().get_structure(dico) + return main.get_structure(dico) else: return dico #return self.get_structure_DSL2(dico=dico, start = True) -- GitLab