#Import dependencies
#Local
from .nextflow_file import Nextflow_File
from .ro_crate import RO_Crate
from . import constant
from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions, extract_conditions, remove_empty_conditions_place_anker
from .outils_graph import get_flatten_dico, initia_link_dico_rec, get_number_cycles, generate_graph
from .outils_annotate import get_tools_commands_from_user_for_process
from .bioflowinsighterror import BioFlowInsightError
from .graph import Graph

#Outside packages
import os
import re
import json
from pathlib import Path
import glob
import ctypes
import time
import random



class Workflow:
    """
    This is the main workflow class, from this class, workflow analysis can be done.
    After analysis, workflow structure reconstruction can be done.

    Attributes:
        file: A string indicating the address to the workflow main or the directory containing the workflow
        duplicate: A boolean indicating if processes are to be duplicated in the structure
        display_info: A boolean indicating if the analysis information should be printed
        output_dir: A string indicating where the results will be saved
        name: A string indicating the name of the workflow
        processes_2_remove: A string indicating the processes to remove from the workflow
    """

    def __init__(self, file, duplicate=True, display_info=True, output_dir = './results',
                 name = None, processes_2_remove = None):
        
        
        #Getting the main nextflow file
        if(not os.path.isfile(file)):
            nextflow_files = glob.glob(f'{file}/*.nf')
            if(len(nextflow_files)==0):
                raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
            txt = ""
            #Try to read the main.nf file -> if it cannot be found then the first Nextflow file is used
            try:
                file = file+"/main.nf"
                with open(file, 'r') as f:
                    txt = f.read()
            except FileNotFoundError:
                pass
                #raise BioFlowInsightError("No 'main.nf' file found at the root of the project")
            if(txt==""):
                #If there is no 'main.nf', we fall back on the first Nextflow file found
                #(when there are multiple files, the choice is arbitrary)
                file = nextflow_files[0]
                with open(file, 'r') as f:
                    txt = f.read()
                #raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")


        
        self.duplicate = duplicate
        self.display_info = display_info
        self.processes_2_remove = processes_2_remove
        self.output_dir = Path(output_dir)
        self.nextflow_files = []
        self.workflow_directory = '/'.join(file.split('/')[:-1])
        self.name = name
        self.graph = None
        self.conditions_2_ignore = []
        self.ternary_operation_dico = {}
        self.map_element_dico = {}


        OG_file = Nextflow_File(file, workflow = self, first_file = True)
        self.DSL = OG_file.find_DSL()
        self.create_empty_results()
        if(self.display_info):
            print(f"Workflow is written in {self.DSL}")
        

    def create_empty_results(self):
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.output_dir / 'debug', exist_ok=True)
        os.makedirs(self.output_dir / 'graphs', exist_ok=True)

        with open(self.output_dir / "debug" / "operations.nf",'w') as file:
            pass
        with open(self.output_dir / "debug" / "calls.nf",'w') as file:
            pass
        with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file:
            pass

    def get_root_directory(self):
        first_file = self.get_first_file()
        return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/"

    def get_conditions_2_ignore(self):
        return self.conditions_2_ignore

    def get_duplicate_status(self):
        return self.duplicate

    def get_output_dir(self):
        return Path(self.output_dir)

    def get_DSL(self):
        return self.DSL
    
    def get_display_info_bool(self):
        return self.display_info
    
    def set_DSL(self, DSL):
        self.DSL = DSL

    def get_first_file(self):
        for file in self.nextflow_files:
            if(file.first_file):
                return file
            
    def get_workflow_main(self):
        return self.get_first_file().main

    def add_nextflow_file_2_workflow(self, nextflow_file):
        self.nextflow_files.append(nextflow_file)
        self.nextflow_files = list(set(self.nextflow_files))

    def initialise(self, conditions_2_ignore = []):
        """Method that initialises the analysis of the worflow

        Keyword arguments:
        
        """
        self.conditions_2_ignore = conditions_2_ignore
        #Right now I'm just going to do everything in DSL2

        #At this point there should only be one nextflow file
        if(len(self.nextflow_files)==1):
            self.nextflow_files[0].initialise()
        else:
            raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.")
        
        
        if(self.display_info):
            citation = """\nTo cite BioFlow-Insight, please use the following publication:
George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092"""
            print(citation)

        if(self.graph==None):
            self.graph = Graph(self)



    def iniatilise_tab_processes_2_remove(self):
        #Transform the comma-separated string of processes into a list of names (defaulting to an empty list)
        if(self.processes_2_remove==None):
            self.processes_2_remove = []
        elif(type(self.processes_2_remove)==str):
            tab_processes_2_remove = []
            temp = self.processes_2_remove.split(",")
            for t in temp:
                tab_processes_2_remove.append(t.strip())
            self.processes_2_remove = tab_processes_2_remove
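
    # For illustration (hypothetical values): with processes_2_remove = "fastqc, multiqc",
    # this method leaves self.processes_2_remove equal to ["fastqc", "multiqc"];
    # with processes_2_remove = None, it becomes an empty list.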


    def get_structure(self):
        dico = {}
        dico['nodes'] = []
        dico['edges'] = []
        dico['subworkflows'] = {}

        if(self.get_DSL() == "DSL1"):
            main = self.get_workflow_main()
            if(main!=None):
                return main.get_structure(dico)
        elif(self.get_DSL() == "DSL2"):
            main = self.get_workflow_main()
            if(main!=None):
                return main.get_structure(dico)
            else:
                return dico
            #return self.get_structure_DSL2(dico=dico, start = True)
        else:
            raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!")

    #################
    #    GRAPHS
    #################

    def generate_specification_graph(self, render_graphs = True):
        self.iniatilise_tab_processes_2_remove()
        self.graph.initialise(processes_2_remove = self.processes_2_remove)
        self.graph.get_specification_graph(render_graphs = render_graphs)

    def generate_process_dependency_graph(self, render_graphs = True):
        self.iniatilise_tab_processes_2_remove()
        self.graph.initialise(processes_2_remove = self.processes_2_remove)
        self.graph.render_process_dependency_graph(render_graphs = render_graphs)

    #TODO -> update this
    def generate_all_graphs(self, render_graphs = True):
        self.generate_specification_graph(render_graphs = render_graphs)
        self.generate_process_dependency_graph(render_graphs = render_graphs)

    #Method that checks if a given graph specification is isomorphic to the workflow's
    def check_if_json_equal_to_full_structure(self, file):
        self.iniatilise_tab_processes_2_remove()
        return self.graph.check_if_json_equal_to_full_structure(file, processes_2_remove = self.processes_2_remove)

    ###########################
    #    Generate test data
    ###########################
    #These are the methods which generate the test data

    def generate_test_specification_graph(self):
        dico = self.graph.get_full_dico()
        with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_executors(self):
        executors = self.get_workflow_main().get_all_executors_in_workflow()
        dico= {}
        for e in executors:
            dico[str(e)] = e.get_code(get_OG = True)
        with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_executors_per_subworkflows(self):
        subs = self.get_subworkflows_called()
        dico= {}
        for s in subs:
            dico[str(s)]= {}
            executors = s.get_all_executors_in_workflow()
            for e in executors:
                dico[str(s)][str(e)] = e.get_code(get_OG = True)
        with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_processes(self):
        processes = self.get_processes_called()
        dico= {}
        for p in processes:
            dico[str(p)] = p.get_code()
        with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_subworkflows(self):
        subs = self.get_subworkflows_called()
        dico= {}
        for s in subs:
            dico[str(s)] = s.get_code()
        with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)
        
    def generate_all_test_data(self):
        self.generate_test_specification_graph()
        self.generate_all_executors()
        self.generate_all_processes()
        self.generate_all_subworkflows()
        self.generate_executors_per_subworkflows()
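
    # Calling generate_all_test_data() writes the following files under <output_dir>/test/
    # (the 'test' directory is assumed to already exist):
    #   specification_graph.json, all_executors.json, all_processes.json,
    #   all_subworkflows.json, executors_per_subworkflows.json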

    


    #Returns a dico of the number of process calls per condition 
    #For example : {condition1: 14, condition2: 10, condition3: 3}
    #14 process calls depend on condition1
    #10 process calls depend on condition2
    #3 process calls depend on condition3
    def get_most_influential_conditions(self, show_values = True):
        if(self.get_duplicate_status()):
            most_influential_conditions = self.get_workflow_main().get_most_influential_conditions()
            #If show_values is True then we replace the condition ids with their values
            if(show_values):
                most_influential_conditions_values = {}
                for condition in most_influential_conditions:
                    value = condition.get_value()
                    if(value not in most_influential_conditions_values):
                        most_influential_conditions_values[value] = 0
                    most_influential_conditions_values[value] += most_influential_conditions[condition]
                most_influential_conditions = most_influential_conditions_values
                most_influential_conditions = most_influential_conditions_values
            
            #Sort the dico
            most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)}
            return most_influential_conditions
        else:
            BioFlowInsightError("Need to activate 'duplicate' mode to use this method.")

    #When there are multiple emits, turn them into one at the end of the call, e.g. instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch
    def convert_to_DSL2(self):
        if(self.get_DSL()=="DSL2"):
            print("Workflow is already written in DSL2")
        else:
            #This tag is used as an identifier to safely manipulate the string 
            tag = str(time.time())
            nextflow_file = self.get_first_file()

            code = nextflow_file.get_code()
            start_code = r"#!/usr/bin/env nextflow"
            start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
            end_code = "workflow.onComplete"
            
            pos_start, pos_end= 0, len(code)
            if(code.find(end_code)!=-1):
                pos_end = code.find(end_code)
            code_to_replace = code[pos_start:pos_end]
            for match in re.finditer(start_code_pattern, code):
                pos_start = match.span(0)[1]+1
            #if(code.find(start_code)!=-1):
            #    pos_start = code.find(start_code)+len(start_code)
            body = code[pos_start:pos_end]#.replace('\n', '\n\t')

            include_section = f"//INCLUDE_SECTION_{tag}"
            params_section = f"//PARAMS_SECTION_{tag}"
            function_section = f"//FUNCTION_SECTION_{tag}"
            process_section = f"//PROCESS_SECTION_{tag}"


            code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""")

            ##This block is commented out: since it's a DSL1 workflow, the params are left where they are
            #params_list = []
            #for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code):
            #    params_list.append(match.group(0))
            #for params in params_list:
            #    code = code.replace(params, "")
            #params_code = "\n".join(params_list)
            #code = code.replace(params_section, params_code)

            #Moving Functions
            functions = []
            for f in nextflow_file.functions:
                function = f.get_code()
                functions.append(function)
            for r in functions:
                code = code.replace(r, "")
            code = code.replace(function_section, "\n\n".join(functions))

            #Moving Processes
            processes = []
            to_replace = []
            for p in nextflow_file.get_processes():
                new_process, call = p.convert_to_DSL2()
                processes.append(new_process)
                to_replace.append((p.get_code(get_OG = True), call))
            
            for r in to_replace:
                code = code.replace(r[0], r[1])
            code = code.replace(process_section, "\n\n".join(processes))

            #TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated

            for o in self.get_workflow_main().get_all_executors_in_workflow():
                if(o.get_type()=="Operation"):
                    code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2())
                else:
                    raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'")

            #Putting || back
            code = code.replace("$OR$", "||")
            #put_modified_operations_back
            #TODO -> add the other things necessary to reformat code
           
            #Sometimes this is incorrect but that's due to the fact that the DSL1 analysis isn't as clean as the DSL2 analysis (concerning the conditions)
            #What I mean is that, when searching for channels, DSL1 doesn't consider the conditions when searching from the processes while DSL2 does
            #The conversion works well, it's just that comparing to the old DSL1 workflow doesn't make sense
            #If you want to put this line back you need to #TODO update the DSL1 parsing to consider the blocks when defining the processes 
            #A good example is KevinMenden/hybrid-assembly
            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=False, def_check_the_same = False)
            
            return code
    
    #This method generates a random set of processes to consider as relevant 
    #The proportion of selected processes is given by alpha, drawn uniformly from [0; 1] when not provided
    def generate_random_relevant_processes(self, alpha = -1):
        edges_create_cycles = self.graph.get_edges_that_create_cycle()

        #Random value drawn uniformly between 0 and 1 (a Gaussian version centered at 0.5 is kept commented out below)
        def get_value():
            #check = True
            #val = -1
            #while(check):
            #    check = False
            #    val = random.gauss(0.5, 0.1)
            #    if(val<0 or val>1):
            #        check = True
            val = random.random()
            return val

        if(self.duplicate):
            
            processes_called = []
            if(self.get_DSL()=="DSL2"):
                for c in self.get_workflow_main().get_all_calls_in_workflow():
                    p = c.get_first_element_called()
                    if(p.get_type()=="Process"):
                        processes_called.append(p)
            else:
                processes_called = self.get_first_file().get_processes()
            
            searching = True
            while(searching):
                searching = False
                if(alpha == -1):
                    alpha = get_value()
                elif(not (0 <= alpha <= 1)):
                    raise BioFlowInsightError("alpha is not in the interval [0; 1]")
                nb_2_select = int(alpha*len(processes_called))
                #random.sample needs a sequence (sets are no longer accepted since Python 3.11)
                sampled = random.sample(list(set(processes_called)), nb_2_select)
                
                sampled_str = []
                for s in sampled:
                    sampled_str.append(str(s))
                for e in edges_create_cycles:
                    if(e[0] in sampled_str and e[1] in sampled_str):
                        #That means the 2 nodes which form the cycle edge are both in the relevant processes
                        #-> we need to regenerate the relevant processes
                        searching = True
                        break
                
                name_select = []
                for p in sampled:
                    name_select.append(p.get_alias())
            return name_select
        else:
            raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
    
    #Method that returns the order of execution for each executor
    def get_order_execution_executors(self):
        dico = {}
        seen = {}
        dico = self.get_workflow_main().get_order_execution_executors(dico, seen)
        tab = []
        def explore_dico(dico):
            if(type(dico)==dict):
                for val in dico:
                    tab.append(val)
                    explore_dico(dico[val])
        explore_dico(dico)

        return tab



    def add_to_ternary_operation_dico(self, old, new):
        self.ternary_operation_dico[new] = old

    def add_map_element(self, old, new):
        self.map_element_dico[new] = old

    def put_back_old_ternary_operations(self, code, ternary_operation_dico):
        for new in ternary_operation_dico:
            old = ternary_operation_dico[new]
            code = code.replace(new.strip(), old)
        return code
    
    def put_modified_operations_back(self, code, dico_operations):
        searching = True
        while(searching):
            searching = False
            for match in re.finditer(r"\.(\w+)_modified\s*\{\s*(¤[^¤]+¤)\s*\}", code):
                operator = match.group(1)
                inside = match.group(2) #The tagged content which needs to be put back
                code = code.replace(match.group(0), f".{operator} {{ {dico_operations[inside]} }}")
                searching = True
                break
        return code
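
    # For example (hypothetical tag and dico_operations entry), this method rewrites
    #   ".map_modified { ¤id_1¤ }"   back into   ".map { it * 2 }"
    # given dico_operations = {"¤id_1¤": "it * 2"}.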


    #TODO -> write tests for this method
    #Function that rewrites the workflow code
    #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
    def simplify_workflow_code(self):
        code = self.get_first_file().get_code()
        #This tag is used as an identifier to safely manipulate the string 
        tag = str(time.time())
        
        
        #params_section = f"//PARAMS_SECTION_{tag}"
        function_section = f"//FUNCTION_SECTION"
        process_section = f"//PROCESS_SECTION"
        subworkflow_section = f"//SUBWORKFLOW_SECTION"

        ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section

        

        #Place the ankers (anchor markers)
        pos_start = 0
        start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
        for match in re.finditer(start_code_pattern, code):
            pos_start = match.span(0)[1]+1
        code = code[:pos_start]+ankers+code[pos_start:]
        
        #Remove the includes
        for match in re.finditer(constant.FULL_INLCUDE_2, code):
            full_include = match.group(0)
            for temp in re.finditer(fr"{re.escape(full_include)} *addParams\(", code):
                raise BioFlowInsightError("There is an 'addParams' in an include. BioFlow-Insight doesn not how to rewrite this.")
            code = re.sub(fr"{re.escape(full_include)}.*", "", code)

        processes, subworkflows, functions = [], [], []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                processes.append(ele)
            elif(ele.get_type()=="Subworkflow"):
                subworkflows.append(ele)
            elif(ele.get_type()=="Function"):
                functions.append(ele)
            else:
                raise Exception("This shouldn't happen")
        
        #Get calls to functions made outside of the main which might have been imported -> so we need to add them
        for c in self.get_first_file().get_calls_made_outside_of_main():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Function"):
                functions.append(ele)
            else:
                raise Exception("This shouldn't happen -> either a call to a process or subworkflow outside of the main or a subworkflow")

        #Simplifying main
        code = code.replace(self.get_workflow_main().get_code(get_OG = True), self.get_workflow_main().simplify_code())


        #Adding processes into code
        for p in processes:
            if(p.get_code_with_alias_and_id() not in code):
                code = code.replace(process_section, '\n'+p.simplify_code()+'\n'+process_section)

        #Adding subworkflows into code
        for sub in subworkflows:
            if(sub.get_code_with_alias_and_id() not in code):
                code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.simplify_code()+'\n')

        #Adding functions into code
        for fun in functions:
            if(fun.get_code() not in code):
                code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')
        
        #Remove the ankers
        #code = code.replace(function_section, "")
        #code = code.replace(process_section, "")
        #code = code.replace(subworkflow_section, "")
        ankers = {"function_section":function_section,
                  "process_section":process_section,
                  "subworkflow_section":subworkflow_section}
        
        return code
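
    # After simplification, the rewritten single-file code is laid out roughly as follows:
    #   #!/usr/bin/env nextflow
    #   //FUNCTION_SECTION      <- the called functions are inserted here
    #   //PROCESS_SECTION       <- the called processes are inserted here
    #   //SUBWORKFLOW_SECTION   <- the called subworkflows are inserted here
    #   ...                     <- the rest of the original file, with the main simplified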

    def get_subworkflows_called(self):
        subs = []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Subworkflow"):
                subs.append(ele)
        return subs
    
    def get_processes_called(self):
        subs = []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                subs.append(ele)
        return subs


    def rewrite_and_initialise(self, code, processes_2_remove, render_graphs, def_check_the_same = True):
        temp_process_dependency_graph = self.graph.get_process_dependency_graph() 
        temp_spec_graph = self.graph.full_dico

        #Remove the "_GG_\d+"
        #code = re.sub(r"_GG_\d+", "", code)

        #Write new code in temporary file
        temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf"
        with open(temp_file, "w") as file:
            file.write(code)
        
        with open(self.get_output_dir()/ "debug" / "rewritten.nf", "w") as f:
            f.write(code)

        #Replace old analysis with new analysis (simplified code)
        self.__init__(str(temp_file), display_info = False, duplicate=True, processes_2_remove=processes_2_remove)
        self.initialise()
        os.remove(temp_file)
        self.graph.initialise(processes_2_remove = self.processes_2_remove)
        if(def_check_the_same and not self.graph.check_if_process_dependendy_is_equivalent_to_other_without_subworkflows(temp_process_dependency_graph)):
            if(render_graphs==True):
                #generate_graph(self.get_output_dir()/ "debug" /"spec_graph_OG", temp_spec_graph, render_graphs = True)
                generate_graph(self.get_output_dir()/ "debug" /"spec_graph", self.graph.full_dico, render_graphs = True)
                #generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph_OG", temp_process_dependency_graph, render_graphs = True)
                generate_graph(self.get_output_dir()/ "debug" /"process_dependency_graph", self.graph.get_process_dependency_graph() , render_graphs = True)
            raise Exception("Something went wrong: The flat dependency graph is not the same!")


    def check_relevant_processes_in_workflow(self, relevant_processes):
        #Check all relevant processes are in the workflow
        workflow_processes = {}
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                short_name = ele.get_alias().split("_GG_")[0]
                if(short_name not in workflow_processes):
                    workflow_processes[short_name] = []
                workflow_processes[short_name].append(ele.get_alias())
        
        temporary_relevant = []
        for p in relevant_processes:
            if(p not in workflow_processes):
                raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24)
            temporary_relevant+=workflow_processes[p]
        relevant_processes = temporary_relevant
        return relevant_processes
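
    # For example, with duplicated processes the internal mapping could look like
    # (hypothetical aliases): {"fastqc": ["fastqc_GG_1", "fastqc_GG_5"]} -> the relevant
    # process "fastqc" is then expanded into both of its "_GG_" aliases.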


    def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []):
        self.graph.initialise(processes_2_remove = processes_2_remove)
        self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs)


    #We do not recommend that the developer use the same name for channels inside and outside a subworkflow,
    #since the local channels become local at the upper level after the rewrite
    def rewrite_subworkflow_call(self, code, subworkflow):

        #Remove the definition from the code
        code = code.replace(subworkflow.get_code(get_OG = True), "")
        OG_call = subworkflow.get_call()
        OG_body = subworkflow.get_work()

        #REPLACE HEADER TAKES
        subworkflow_takes = subworkflow.get_takes()
        parameters = OG_call.get_parameters()
        if(len(subworkflow_takes)!=len(parameters)):
            raise Exception("This shouldn't happen -> the same number of parameters should be kept")
        #This is to replace the parameters and the takes
        new_header = ""
        for i in range(len(parameters)):
            param = parameters[i]
            takes = subworkflow_takes[i].get_gives()[0]
            #Here we're checking that the input inside and outside the subworkflow are the same
            if(takes.get_code()!=param.get_code(get_OG = True)):
                new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}\n"

        temp_code = code
        code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1)
        if(temp_code==code):
            raise Exception("Something went wrong: The code hasn't changed")

        #REPLACE THE EMITS
        #TODO admittedly the code below is quite ugly -> but it's functional -> update it 
        emits = subworkflow.get_emit()
        to_replace = []
        all_executors = self.get_workflow_main().get_all_executors_in_workflow()
        for exe in all_executors:
            #We don't need to check the call case since the workflow has already been rewritten -> emits only appear in operations
            if(exe.get_type()=="Operation"):
                added = False
                new = exe.get_code(get_OG = True)
                for emited in exe.get_origins():
                    if(emited.get_type()=="Emitted"):
                        if(emited.get_emitted_by().get_first_element_called()==subworkflow):
                            if(emited.get_emits() not in emits):
                                raise Exception("This shouldn't happen -> since it is the actual subworkflow")
                            new = new.replace(emited.get_code(), emited.get_emits().get_origins()[0].get_code())
                            added = True
                            #to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}"))
                if(added):
                    to_replace.append((exe.get_code(get_OG = True), new))
        #This dictionary is used to check if the replacement has already been done (in the case of duplicates in new)
        dico_replace = {}
        for r in to_replace:
            old, new = r
            need_to_replace = True
            if(old in dico_replace):
                if(dico_replace[old]==new):
                    need_to_replace = False
                else:
                    #Two different replacements for the same original code
                    raise Exception("This shouldn't happen")
            else:
                dico_replace[old] = new
            
            if(need_to_replace):
                temp_code = code
                #Case of channel = channel
                if(new.find("=")!=-1):
                    if(new.split("=")[0].strip()==new.split("=")[1].strip()):
                        new = ''
                #code = code.replace(old, new)
                code = replace_group1(code, fr"({re.escape(old)})[^\w]", new)
                if(temp_code==code):
                    #print(code)
                    #print("old", f'"{old}"')
                    #print("new", f'"{new}"')
                    raise Exception("Something went wrong: The code hasn't changed")
        
        return code
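
    # A sketch of the rewrite performed above (hypothetical Nextflow code):
    #   Before:  out_ch = MY_SUB(input_ch)          (plus the definition of MY_SUB)
    #   After:   x = input_ch                       (header built from the takes)
    #            <body of MY_SUB inlined here>
    #   and later uses of MY_SUB.out are replaced by the channels emitted inside the body.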


    #This function returns the channels on which the subworkflow (things_added_in_cluster) depends
    def get_takes(self, things_added_in_cluster):
        #Basically this is a dico of channels to operations -> when the value is an empty list 
        #this means that the channel is totally defined in the subworkflow -> so we are searching for 
        #channels which aren't totally defined in the subworkflow 
        channels_2_sources = {}

        for ele in things_added_in_cluster:
            if(ele.get_type() == "Operation"):
                for o in ele.get_origins():
                    if(o.get_type() in ["Channel", "Emitted"]):
                        channels_2_sources[o] = replace_thing_by_call(o.get_source())
                    else:
                        #Calls to functions are fine; anything else is unexpected
                        if(o.get_first_element_called().get_type()!="Function"):
                            raise Exception("This shouldn't happen")
            elif(ele.get_type() == "Call"):
                for param in ele.get_parameters():
                    if(param.get_type()=="Channel"):
                        raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
                    else: 
                        for o in param.get_origins():
                            if(o.get_type()=="Channel"):
                                channels_2_sources[o] = replace_thing_by_call(o.get_source())
                            else:
                                raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
            else:
                raise Exception("This shouldn't happen")
            
        takes = []
        names_added = []
        for channel in channels_2_sources:
            if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])):
                if(channel.get_name() not in names_added):
                    takes.append(channel)
                    names_added.append(channel.get_name())
        return takes
    
    #This function returns the channels which the subworkflow (things_added_in_cluster) emits (i.e. that other things depend on)
    def get_emits(self, things_added_in_cluster):
        #Basically this is a dico of channels to sinks -> when the value is an empty list 
        #this means that the channel is totally defined in the subworkflow -> so we are searching for 
        #channels which aren't totally defined in the subworkflow,
        #meaning that things outside the subworkflow depend on them 
        channel_2_sink = {}

        for ele in things_added_in_cluster:
            if(ele.get_type() == "Operation"):
                for o in ele.get_gives():
                    channel_2_sink[o] = replace_thing_by_call(o.get_sink())
            elif(ele.get_type() == "Call"):
                #thing = ele.get_first_element_called()
                for e in ele.get_later_emits():
                    channel_2_sink[e] = replace_thing_by_call(e.get_sink())
            else:
                raise Exception("This shouldn't happen")

        emits = []  
        names_added = []
        for channel in channel_2_sink:
            if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])):
                if(channel.get_name() not in names_added):
                    emits.append(channel)
                    names_added.append(channel.get_name())
        return emits


    def remove_GG_from_code(self, code):
        def replacer(match):
            return match.group(1)
        #Raw string so that \w and \d are interpreted as regex classes
        return re.sub(r"(\w+)_GG_\d+", replacer, code)
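
    # For example (hypothetical alias): remove_GG_from_code("fastqc_GG_3(reads)")
    # returns "fastqc(reads)".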
    
    #Method which rewrites the workflow following the user view
    #Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
    def convert_workflow_2_user_view(self, relevant_processes = [], render_graphs = True):
        self.iniatilise_tab_processes_2_remove()
        self.graph.initialise(processes_2_remove = self.processes_2_remove)

        def get_object(address):
            address = int(re.findall(r"\dx\w+", address)[0], base=16)
            return ctypes.cast(address, ctypes.py_object).value
        #Check that there are no cycles which will break the creation of the user view:
        edges_create_cycles = self.graph.get_edges_that_create_cycle()
        edges_create_cycles_objects = []
        for e in edges_create_cycles:
            edges_create_cycles_objects.append((get_object(e[0]), get_object(e[1])))
        for e in edges_create_cycles_objects:
            n1 = e[0].get_alias()
            n2 = e[1].get_alias()
            if(n1 in relevant_processes and n2 in relevant_processes):
                raise BioFlowInsightError(f"The processes '{n1}' and '{n2}' cannot both be relevant processes since there is a dependency apparant in the workflow between the 2")

        ternary_operation_dico = self.ternary_operation_dico
        map_element_dico = self.map_element_dico
        
        if(self.duplicate): 
            #First check if there are any duplicate operations
            #That check is done in the "get_order_execution_executors" method -> so we just run it first
            self.get_order_execution_executors()  

            if(self.get_DSL()=="DSL1"):
                code = self.convert_to_DSL2()
                self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs, def_check_the_same = False)

            if(self.get_DSL()=="DSL2"):
                code = self.simplify_workflow_code()
                self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
                    
            
            #DETERMINING WHICH SUBWORKFLOWS ARE BROKEN BY THE CLUSTERS
            def get_clusters_with_calls(clusters):
                #Creating the clusters with calls instead of processes or subworkflows
                set_clusters_with_calls = []
                for c in clusters:
                    tab = []
                    for ele in c:
                        if(ele.get_type()=="Operation"):
                            if(ele.get_artificial_status()==False):
                                tab.append(ele)
                        else:
                            call = ele.get_call()
                            tab.append(call)
                    set_clusters_with_calls.append(set(tab))
                return set_clusters_with_calls

            
            #Getting subworkflows to executors
            def get_subworkflow_2_executors():
                subworkflow_2_executors = {}
                for sub in self.get_subworkflows_called():
                    executors = sub.get_all_executors_in_workflow()
                    subworkflow_2_executors[sub] = []
                    for ele in executors:
                        #Calls to subworkflows are not added -> they are already present by definition
                        if(not (ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow")):
                            subworkflow_2_executors[sub].append(ele)
                return subworkflow_2_executors
                #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))

            #TODO -> write tests to test this function
            def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
                broken_subworkflows = []
                for sub in subworkflow_2_executors:
                    #You want this list (set) to be equal to subworkflow_2_executors[sub]
                    elements_in_sub_with_clusters = []
                    for cluster in set_clusters_with_calls:
                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
                            break
                        for c in cluster:
                            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
                                break
                            if(c in subworkflow_2_executors[sub]):
                                elements_in_sub_with_clusters+=list(cluster)
                                break
                    #If the sets are equal the subworkflow is intact, otherwise it is broken
                    if(set(elements_in_sub_with_clusters)!=set(subworkflow_2_executors[sub])):
                        broken_subworkflows.append(sub)
                return broken_subworkflows
            
            #Get the clusters and the code
            relevant_processes = self.check_relevant_processes_in_workflow(relevant_processes)
            self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove =  [], render_graphs=render_graphs)
            clusters = self.graph.get_clusters_from_user_view()

            broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
            #While there are still broken subworkflows -> we need to redo the analysis
            while(len(broken_subworkflows)>0):
                #Rewrite broken subworkflows
                sub = broken_subworkflows[0]
                code = self.rewrite_subworkflow_call(code, sub)
                self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
                #Get the clusters and the code
                #TODO -> remove the generate all_graphs -> it is not necessary 
                if(render_graphs):
                    self.generate_all_graphs()
                self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove =  [], render_graphs=render_graphs)
                clusters = self.graph.get_clusters_from_user_view()
                broken_subworkflows = get_workflows_broken(get_subworkflow_2_executors(), get_clusters_with_calls(clusters))
            


            #Get the clusters with the corresponding operations inside
            #for i in range(len(clusters)):
            #    c = clusters[i]
            #    if(len(c)>1):
            #        clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
            #Get the topological order
            executors_in_order = self.get_order_execution_executors()
            new_clusters = []


            for cluster in clusters:
                tab = []
                for e in executors_in_order:
                    if(e in cluster):
                        tab.append(e)
                new_clusters.append(tab)
            clusters = new_clusters
            


            

            #This function returns the last executor in the clusters
            #This is used to place the '//Anker_clusters' anchor
            def get_last_executor_in_cluster(executors_in_order, clusters):
                dico = {}
                for cluster in clusters:
                    for ele in cluster:
                        dico[ele] = executors_in_order.index(ele)
                for ele in {k: v for k, v in sorted(dico.items(), key=lambda item: item[1], reverse=True)}:
                    return ele

            #Replace the last executor in the clusters by the cluster anker
            last_executor_in_cluster = get_last_executor_in_cluster(executors_in_order, clusters)
            if(last_executor_in_cluster.get_type()=="Process"):
                call = last_executor_in_cluster.get_call()    
                code = code.replace(call.get_code(get_OG = True), "\n//Anker_clusters\n")
            elif(last_executor_in_cluster.get_type()=="Operation"):
                if(not last_executor_in_cluster.get_artificial_status()):
                    code = code.replace(last_executor_in_cluster.get_code(get_OG = True), "\n//Anker_clusters\n", 1)
                else:
                    raise Exception("This shouldn't happen")
            else:
                raise Exception("This shouldn't happen")


            #Removing elements from clusters from the code
            for cluster in clusters:
                for ele in cluster:
                    if(ele.get_type()=="Process"):
                        call = ele.get_call()    
                        code = code.replace(call.get_code(get_OG = True), "")
                    elif(ele.get_type()=="Operation"):
                        if(not ele.get_artificial_status()):
                            code = code.replace(ele.get_code(get_OG = True), "", 1)
                        else:
                            raise Exception("This shouldn't happen")
                    else:
                        raise Exception("This shouldn't happen")

            #Remove the empty conditions left in the code
            code = remove_empty_conditions_place_anker(code, self)


            #Add the subworkflow definitions
            #-------------------------------------
            #Adding the anker
            subworkflow_section = f"//ANKER 4 SUBWORKFLOW DEF"
            to_replace = ""
            for match in re.finditer(r"workflow\s+\w*\s*\{", code):
                to_replace = match.group(0)
                break
            if(to_replace==""):
                for match in re.finditer(r"workflow\s*\{", code):
                    to_replace = match.group(0)
                    break
            if(to_replace==""):
                raise Exception("No call to a workflow")
            code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")

            #Creating the subworkflows from clusters
            calls_in_operations = []
            non_relevant_name = 1
            
            subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
            index_cluster = len(clusters)
            #We replace the last clusters first -> this is because the outputs of the last clusters aren't used anywhere else in the workflow by definition 
            for elements in list(reversed(clusters)):

                channels_to_replace_outside_of_cluster = []

                #Check that there is at least one process in cluster
                at_least_one_process = False
                for e in elements:
                    if(e.get_type()=="Process"):
                        at_least_one_process = True

                #Only create the subworkflows for clusters with one or more elements (at least one of them being a process)
                processes_added = []
                things_added_in_cluster = []
                if(len(elements)>=1 and at_least_one_process):
                    name, body, take, emit = "", "", "", ""
                    first_element = True

                    for ele in elements:
            
                        if(ele.get_type()=="Process"):
                            
                            #Determine the name of the created subworkflow cluster
                            if(ele.get_alias() in relevant_processes):
                                name = f"cluster_{ele.get_alias()}"
                            #Get the call of the thing (either process or subworkflow)
                            call = ele.get_call()
                            
                            processes_added.append(call.get_first_element_called())
                            values = []
                            for condition in call.get_all_conditions():
                                values.append(condition.get_value())
                            printed_condition = " && ".join(values)
                            if(printed_condition!=""):
                                body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
                            else:
                                body+=f"\n{call.get_code()}\n"
                            things_added_in_cluster.append(call)
    
                        #Below
                        elif(ele.get_type()=="Operation"):
                            #TODO -> check this verification, there might be some side effects ("effet de bord")
                            if(not ele.get_artificial_status()):
            
             
                                #Ignore these cases
                                #TODO -> you should be able to remove this
                                if(ele.get_code()[:4] not in ["emit", "take"]):
                                    origins = ele.get_origins()
                                    for o in origins:
                                        if(o.get_type()=="Call"):
                                            if(o.get_first_element_called().get_type()!="Function"):
                                                calls_in_operations.append(o)
                                    
                                    values = []
                                    for condition in ele.get_all_conditions():
                                        values.append(condition.get_value())
                                    printed_condition = " && ".join(values)

                                    if(printed_condition!=""):
                                        body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n"
                                    else:
                                        body+=f"\n{ele.get_code(get_OG = True)}\n"
                                things_added_in_cluster.append(ele)
                        else:
                            raise Exception("This shouldn't happen")
                    
                     
                    #TODO check that this part of the code is never reached
                    #Here we remove the Call_12313 placeholder
                    for call in calls_in_operations:
                        raise Exception("This shouldn't happen since the workflow has been rewritten")
                        body = body.replace(call.get_code(), "")
                        body = body.replace(str(call), call.get_code())
            
                     
                    #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes
                    if(name==""):
                        #If there are processes called we are going to use them
                        if(len(processes_added)>0):
                            #TODO find a better naming system
                            name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
                        else:
                            #TODO find a better naming system
                            name = f"non_relevant_cluster_{non_relevant_name}"
                            non_relevant_name+=1

                    #Check that there is a single condition in the body
                    body = group_together_ifs(body)
                    conditions_in_subworkflow = []
                    temp_body = body
                    conditions_in_subworkflow_2 = extract_conditions(temp_body, only_get_inside = False)
                    
                    if(len(conditions_in_subworkflow_2)==1):
                        for condition in conditions_in_subworkflow_2:
                            start, end = conditions_in_subworkflow_2[condition]
                            temp_body = temp_body.replace(temp_body[start: end], "")
                        #This means that there is only one condition with all the executors inside it
                        if(temp_body.strip()==""):
                            conditions_in_subworkflow = extract_conditions(body)
                            for condition in conditions_in_subworkflow:
                                start, end = conditions_in_subworkflow[condition]
                                body = body[start: end-1].strip()
                                conditions_in_subworkflow = list(conditions_in_subworkflow.keys())


                    #TAKE
                    #Adding take parameters on the inside of the subworkflow
                    takes_param = self.get_takes(things_added_in_cluster)
                    new_param_names, index, old_param_names = [], 1, []
                    for param in takes_param:
                        param_name = f"param_{name}_{index}"
                        #Here if the input is a channel -> we keep the same name for readability
                        #It also solves a bug described on the 18/02/2025
                        if(param.get_type()!='Channel'):
                            new_param_names.append(param_name)
                            old_param_names.append(param.get_code())
                        else:
                            new_param_names.append(param.get_code())
                            old_param_names.append(param.get_code())
                        index += 1
                    if(len(new_param_names)>0):
                        temp = '\n'.join(new_param_names)
                        take = f"\ntake:\n{temp}\n"
                    
                    #EMIT
                    #Adding the emitted outputs
                    emitted_outputs = self.get_emits(things_added_in_cluster)
                    new_output_names, index, old_output_names = [], 0, []
                    for output in emitted_outputs:
                        output_name = f"{name}.out[{index}]"
                        new_output_names.append(output_name)
                        old_output_names.append(output.get_code())
                        index += 1
                    
                    if(len(old_output_names)>0):
                        temp = '\n'.join(old_output_names)
                        emit = f"\nemit:\n{temp}\n"
                    
                    
                    #Adding empty channels if they don't exist in the case of a negative condition
                    body = get_channels_to_add_in_false_conditions(body, old_output_names)
                    
                    
                    #Replace names inside subworkflow
                    subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"

                    #We do this so that the longest names are rewritten first in the code -> to avoid partial-match problems
                    takes_param_2_length = {}
                    takes_param_2_new_param_names = {}
                    for i in range(len(new_param_names)):
                        takes_param_2_new_param_names[takes_param[i].get_code()] = new_param_names[i]
                        takes_param_2_length[takes_param[i].get_code()] = len(takes_param[i].get_code())
                    sorted_takes_param_2_length = {k: v for k, v in sorted(takes_param_2_length.items(), key=lambda item: item[1], reverse=True)}
                    for take_param in sorted_takes_param_2_length:
                        new_param = takes_param_2_new_param_names[take_param]
                        if(take_param != new_param):
                            #pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
                            pattern = fr"({re.escape(take_param)})[\s\,\)\.]"
                            temp = subworkflow_code
                            subworkflow_code = replace_group1(subworkflow_code, pattern, new_param)
                            if(temp==subworkflow_code):
                                print(take_param, new_param)
                                print(pattern)
                                print(f'"{subworkflow_code}"')
                                raise Exception("Something went wrong -> cause the paramter wasn't updated")
                        #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
                    
                    #TODO -> add verification of conditions 
                    params = ", ".join(old_param_names)
                    subworkflow_call_case_true = f"{name}({params})"
                    subworkflow_call_case_false = ""
                    for i in range(len(new_output_names)):
                        #In the case of channels, we just add channel = subworkflow.out[i]
                        if(not bool(re.findall(r"\.\s*out", old_output_names[i]))):
                            subworkflow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
                            subworkflow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
                        #In the case of emitted values we need to replace the code on the outside
                        else:
                            param_out_name= f"{name}_out_{i+1}"
                            subworkflow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
                            subworkflow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
                            channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
                    #If there was only a single condition in the subworkflow cluster -> then we add it when the call is done
                    if(len(conditions_in_subworkflow)==1):
                        subworkflow_call = f"if({conditions_in_subworkflow[0].split('$$__$$')[0]}) {{\n{subworkflow_call_case_true}\n}} else {{\n{subworkflow_call_case_false}\n}}"
                    else:
                        subworkflow_call = subworkflow_call_case_true

                    
                    
                    #subworkflow_clusters_to_add.append(subworkflow_code)
                    #subworkflow_cluster_calls_to_add.append(subworkfow_call)

                    #Add the subworkflow call
                    new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{subworkflow_call}\n"
                    code = code.replace("//Anker_clusters", new_code)

                    for old, new in channels_to_replace_outside_of_cluster:
                        pattern= fr"[ \(,]({re.escape(old)})[^\w]"
                        code = replace_group1(code, pattern, new)
                        #code = code.replace(old, new)

                    #Add the subworkflow definitions
                    #-------------------------------------                
                    code = code.replace(f'{subworkflow_section}', f"{subworkflow_code}\n\n{subworkflow_section}")
                    
                
                else:
                    body = ""
                    for ele in elements:
                        values = []
                        for condition in ele.get_all_conditions():
                            values.append(condition.get_value())
                        printed_condition = " && ".join(values)

                        if(printed_condition!=""):
                            body+=f"if({printed_condition}) {{\n{ele.get_code(get_OG = True)}\n}}\n"
                        else:
                            body+=f"\n{ele.get_code(get_OG = True)}\n"
                    new_code = f"//Anker_clusters\n\n//Cluster_{index_cluster}\n{body}\n"
                    code = code.replace("//Anker_clusters", new_code)
                index_cluster-=1


    
            
            #Putting || back
            code = self.put_back_old_ternary_operations(code, ternary_operation_dico)
            code = code.replace("$OR$", "||")
            code = self.put_modified_operations_back(code, map_element_dico)
            code = remove_extra_jumps(format_with_tabs(code))
            #code = self.remove_GG_from_code(code)
            with open(self.get_output_dir()/ "debug" / "rewritten.nf", "w") as f:
                f.write(code)
            self.rewrite_and_initialise(code, self.processes_2_remove, render_graphs=render_graphs)
            return code
            ##So basically when retriving a thing (process or subworkflow)
            ##There is necessarily one call associated with the thing -> since we have the option duplicate activated
            ##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
        else:
            raise BioFlowInsightError("Trying to convert the workflow with the user view; however, the option 'duplicate' is not activated.")