#Import dependencies
#Local
from .nextflow_file import Nextflow_File
from .ro_crate import RO_Crate
from . import constant
from .outils import is_git_directory, format_with_tabs, replace_thing_by_call, replace_group1, group_together_ifs, extract_curly, remove_extra_jumps, get_channels_to_add_in_false_conditions
from .outils_graph import flatten_dico, initia_link_dico_rec, get_number_cycles
from .outils_annotate import get_tools_commands_from_user_for_process
from .bioflowinsighterror import BioFlowInsightError
from .graph import Graph

#Outside packages
import os
import re
import json
from pathlib import Path
import glob
import ctypes
import random
import time



class Workflow:
    """
    This is the main workflow class, from this class, workflow analysis can be done.
    After analysis, workflow structure reconstruction can be done.

    Attributes:
        file: A string indicating the address to the workflow main or the directory containing the workflow
        duplicate: A boolean indicating if processes are to be duplicated in the structure
        display_info: A boolean indicating if the analysis information should be printed
        output_dir: A string indicating where the results will be saved
        name: A string indicating the name of the workflow
        processes_2_remove: A string indicating the processes to remove from the workflow
    """

    def __init__(self, file, duplicate=False, display_info=True, output_dir = './results',
                 name = None, processes_2_remove = None):
        
        
        #Getting the main nextflow file
        if(not os.path.isfile(file)):
            nextflow_files = glob.glob(f'{file}/*.nf')
            if(len(nextflow_files)==0):
                raise BioFlowInsightError("No Nextflow files ('.nf') are in the directory!", num = -1)
            txt = ""
            #Try to read the main.nf file -> if this cannot be found then the first nextflow file is used
            try:
                
                file = file+"/main.nf"
                with open(file, 'r') as f:
                    txt= f.read()
            except:
                pass
                #raise BioFlowInsightError("No 'main.nf' file found at the root of the project")
            if(txt==""):
                if(len(nextflow_files)==1):
                    file = nextflow_files[0]
                    with open(file, 'r') as f:
                        txt= f.read()
                else:
                    raise BioFlowInsightError("Multiple Nextflow files found at the root with no 'main.nf' file: I don't know which one to select")


        
        self.duplicate = duplicate
        self.display_info = display_info
        self.processes_2_remove = processes_2_remove
        self.output_dir = Path(output_dir)
        self.nextflow_files = []
        self.workflow_directory = '/'.join(file.split('/')[:-1])
        self.name = name
        self.graph = None
        self.conditions_2_ignore = []


        OG_file = Nextflow_File(file, workflow = self, first_file = True)
        self.DSL = OG_file.find_DSL()
        self.create_empty_results()
        if(self.display_info):
            print(f"Workflow is written in {self.DSL}")
        

    def create_empty_results(self):
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.output_dir / 'debug', exist_ok=True)
        os.makedirs(self.output_dir / 'graphs', exist_ok=True)

        with open(self.output_dir / "debug" / "operations.nf",'w') as file:
            pass
        with open(self.output_dir / "debug" / "calls.nf",'w') as file:
            pass
        with open(self.output_dir / "debug" / "operations_in_call.nf",'w') as file:
            pass

    def get_root_directory(self):
        first_file = self.get_first_file()
        return '/'.join(str(first_file.get_file_address()).split('/')[:-1])+"/"

    def get_conditions_2_ignore(self):
        return self.conditions_2_ignore

    def get_duplicate_status(self):
        return self.duplicate

    def get_output_dir(self):
        return Path(self.output_dir)

    def get_DSL(self):
        return self.DSL
    
    def get_display_info_bool(self):
        return self.display_info
    
    def set_DSL(self, DSL):
        self.DSL = DSL

    def get_first_file(self):
        for file in self.nextflow_files:
            if(file.first_file):
                return file
            
    def get_workflow_main(self):
        return self.get_first_file().main

    def add_nextflow_file_2_workflow(self, nextflow_file):
        self.nextflow_files.append(nextflow_file)
        self.nextflow_files = list(set(self.nextflow_files))

    def initialise(self, conditions_2_ignore = []):
        """Method that initialises the analysis of the workflow

        Keyword arguments:
        conditions_2_ignore: A list of conditions to ignore during the analysis
        """
        self.conditions_2_ignore = conditions_2_ignore
        #Right now i'm just gonna do everything in DSL2

        #At this point there should only be one nextflow file
        if(len(self.nextflow_files)==1):
            self.nextflow_files[0].initialise()
        else:
            raise BioFlowInsightError("This souldn't happen. There are multiple Nextflow files composing the workflow before the analysis has even started.")
        
        
        if(self.display_info):
            citation = """\nTo cite BioFlow-Insight, please use the following publication:
George Marchment, Bryan Brancotte, Marie Schmit, Frédéric Lemoine, Sarah Cohen-Boulakia, BioFlow-Insight: facilitating reuse of Nextflow workflows with structure reconstruction and visualization, NAR Genomics and Bioinformatics, Volume 6, Issue 3, September 2024, lqae092, https://doi.org/10.1093/nargab/lqae092"""
            print(citation)

        if(self.graph==None):
            self.graph = Graph(self)



    def iniatilise_tab_processes_2_remove(self):
        #Normalise processes_2_remove into a list (it is given as a comma-separated string)
        if(self.processes_2_remove==None):
            self.processes_2_remove = []
        elif(isinstance(self.processes_2_remove, str)):
            tab_processes_2_remove = []
            for t in self.processes_2_remove.split(","):
                tab_processes_2_remove.append(t.strip())
            self.processes_2_remove = tab_processes_2_remove


    def get_structure(self):
        dico = {}
        dico['nodes'] = []
        dico['edges'] = []
        dico['subworkflows'] = {}

        if(self.get_DSL() == "DSL1"):
            main = self.get_workflow_main()
            if(main!=None):
                return main.get_structure(dico)
        elif(self.get_DSL() == "DSL2"):
            main = self.get_workflow_main()
            if(main!=None):
                return main.get_structure(dico)
            else:
                return dico
            #return self.get_structure_DSL2(dico=dico, start = True)
        else:
            raise Exception(f"The workflow's DSL is '{self.DSL}' -> I don't know what this is!")

    #################
    #    GRAPHS
    #################

    def generate_specification_graph(self, render_graphs = True):
        self.iniatilise_tab_processes_2_remove()
        self.graph.initialise(processes_2_remove = self.processes_2_remove)
        self.graph.get_specification_graph(render_graphs = render_graphs)

    #TODO -> update this
    def generate_all_graphs(self, render_graphs = True):
        self.generate_specification_graph(render_graphs = render_graphs)

    #Method that checks if a given graph specification is isomorphic to the workflow's graph
    def check_if_equal(self, file):
        self.iniatilise_tab_processes_2_remove()
        return self.graph.check_if_equal(file, processes_2_remove = self.processes_2_remove)
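
    #A minimal sketch of graph generation and comparison (illustrative -> 'w' is an initialised Workflow and the json file name is an assumption):
    #
    #   w.generate_specification_graph(render_graphs = True)
    #   same_structure = w.check_if_equal("previous_specification_graph.json")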

    ###########################
    #    Generate test data
    ###########################
    #These are the methods which generate the test data

    def generate_test_specification_graph(self):
        dico = self.graph.get_full_dico()
        with open(self.get_output_dir()/ 'test' /"specification_graph.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_executors(self):
        executors = self.get_workflow_main().get_all_executors_in_workflow()
        dico= {}
        for e in executors:
            dico[str(e)] = e.get_code(get_OG = True)
        with open(self.get_output_dir()/ 'test' /"all_executors.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_executors_per_subworkflows(self):
        subs = self.get_subworkflows_called()
        dico= {}
        for s in subs:
            dico[str(s)]= {}
            executors = s.get_all_executors_in_workflow()
            for e in executors:
                dico[str(s)][str(e)] = e.get_code(get_OG = True)
        with open(self.get_output_dir()/ 'test' /"executors_per_subworkflows.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_processes(self):
        processes = self.get_processes_called()
        dico= {}
        for p in processes:
            dico[str(p)] = p.get_code()
        with open(self.get_output_dir()/ 'test' /"all_processes.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)

    def generate_all_subworkflows(self):
        subs = self.get_subworkflows_called()
        dico= {}
        for s in subs:
            dico[str(s)] = s.get_code()
        with open(self.get_output_dir()/ 'test' /"all_subworkflows.json", "w") as outfile: 
            json.dump(dico, outfile, indent = 4)
        
    def generate_all_test_data(self):
        self.generate_test_specification_graph()
        self.generate_all_executors()
        self.generate_all_processes()
        self.generate_all_subworkflows()
        self.generate_executors_per_subworkflows()

    


    #Returns a dico of the number of processes called per condition
    #For example : {condition1: 14, condition2: 10, condition3: 3}
    #14 process calls depend on condition1
    #10 process calls depend on condition2
    #3 process calls depend on condition3
    def get_most_influential_conditions(self, show_values = True):
        if(self.get_duplicate_status()):
            most_influential_conditions = self.get_workflow_main().get_most_influential_conditions()
            #If show_values then we replace the condition ids with their values
            if(show_values):
                most_influential_conditions_values = {}
                for condition in most_influential_conditions:
                    try:
                        t = most_influential_conditions_values[condition.get_value()]
                    except:
                        most_influential_conditions_values[condition.get_value()] = 0
                    most_influential_conditions_values[condition.get_value()] += most_influential_conditions[condition]
                most_influential_conditions = most_influential_conditions_values
            
            #Sort the dico
            most_influential_conditions = {k: v for k, v in sorted(most_influential_conditions.items(), key=lambda item: item[1], reverse=True)}
            return most_influential_conditions
        else:
            BioFlowInsightError("Need to activate 'duplicate' mode to use this method.")

    #When there are multiple emits, turn them into a single one at the end of the call, e.g. instead of fastp_ch2 = fastp.out.fastp_ch2 -> have fastp_ch2 = fastp_ch
    def convert_to_DSL2(self):
        if(self.get_DSL()=="DSL2"):
            print("Workflow is already written in DSL2")
        else:
            #This tag is used as an identification to safely manipulate the string 
            tag = str(time.time())
            nextflow_file = self.get_first_file()

            code = nextflow_file.get_code()
            start_code = r"#!/usr/bin/env nextflow"
            start_code_pattern = r"\#\!\s*\/usr\/bin\/env\s+nextflow"
            end_code = "workflow.onComplete"
            
            pos_start, pos_end= 0, len(code)
            if(code.find(end_code)!=-1):
                pos_end = code.find(end_code)
            code_to_replace = code[pos_start:pos_end]
            for match in re.finditer(start_code_pattern, code):
                pos_start = match.span(0)[1]+1
            #if(code.find(start_code)!=-1):
            #    pos_start = code.find(start_code)+len(start_code)
            body = code[pos_start:pos_end]#.replace('\n', '\n\t')

            include_section = f"//INCLUDE_SECTION_{tag}"
            params_section = f"//PARAMS_SECTION_{tag}"
            function_section = f"//FUNCTION_SECTION_{tag}"
            process_section = f"//PROCESS_SECTION_{tag}"


            code = code.replace(code_to_replace, f"""{start_code}\n\n\n{include_section}\n\n\n{params_section}\n\n\n{function_section}\n\n\n{process_section}\n\n\nworkflow{{\n\n{body}\n}}\n\n""")

            ##This has been put in a comment since it's a DSL1 workflow
            #params_list = []
            #for match in re.finditer(r"params.\w+ *\= *[^\n=]([^\n])*", code):
            #    params_list.append(match.group(0))
            #for params in params_list:
            #    code = code.replace(params, "")
            #params_code = "\n".join(params_list)
            #code = code.replace(params_section, params_code)

            #Moving Functions
            functions = []
            for f in nextflow_file.functions:
                function = f.get_code()
                functions.append(function)
            for r in functions:
                code = code.replace(r, "")
            code = code.replace(function_section, "\n\n".join(functions))

            #Moving Processes
            processes = []
            to_replace = []
            for p in nextflow_file.get_processes():
                new_process, call = p.convert_to_DSL2()
                processes.append(new_process)
                to_replace.append((p.get_code(), call))
            
            for r in to_replace:
                code = code.replace(r[0], r[1])
            code = code.replace(process_section, "\n\n".join(processes))

            #TODO -> update the operations -> also consider the operations in the params of the calls which need to be updated

            for o in self.get_workflow_main().get_all_executors_in_workflow():
                if(o.get_type()=="Operation"):
                    code = code.replace(o.get_code(get_OG=True), o.convert_to_DSL2())
                else:
                    raise Exception(f"Executor of type '{o.get_type()}' was extracted in a DSL1 workflow! This shoudn't happen! The code is '{o.get_code()}'")

            #Putting || back
            code = code.replace("$OR$", "||")

            return code
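
    #Usage sketch (illustrative -> the output file name is an assumption): converting a DSL1 workflow and saving the result
    #
    #   dsl2_code = w.convert_to_DSL2()
    #   with open(w.get_output_dir() / "workflow_dsl2.nf", "w") as f:
    #       f.write(dsl2_code)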
    
    #This method generates a random set of processes to consider as relevant
    #It's not a uniform random draw, it's roughly a gaussian, centered at 0.5
    def generate_random_relevant_processes(self, alpha = -1):

        #Random value between 0 and 1, centered at 0.5
        def get_value():
            check = True
            val = -1
            while(check):
                check = False
                val = random.gauss(0.5, 0.1)
                if(val<0 or val>1):
                    check = True
            return val

        if(self.duplicate):
            if(alpha == -1):
                alpha = get_value()
            else:
                if(not (0<=alpha<=1)):
                    raise BioFlowInsightError("alpha is not in the interval [0; 1]")
            
            processes_called = []
            for c in self.get_workflow_main().get_all_calls_in_workflow():
                p = c.get_first_element_called()
                if(p.get_type()=="Process"):
                    processes_called.append(p)
            nb_2_select = int(alpha*len(processes_called))
            sampled = random.sample(list(set(processes_called)), nb_2_select)
            name_select = []
            for p in sampled:
                name_select.append(p.get_alias())
            return name_select
        else:
            raise BioFlowInsightError("Trying to generate random relevant processes however option 'duplicate' is not activated.")
    
    #This method rewrites the entire workflow into one single file
    def write_workflow_into_one_file(self):
        #This tag is used as an identification to safely manipulate the string 
        tag = str(time.time())
        
        code = self.get_first_file().get_code()
        
        #params_section = f"//PARAMS_SECTION_{tag}"
        function_section = f"//FUNCTION_SECTION"
        process_section = f"//PROCESS_SECTION"
        subworkflow_section = f"//SUBWORKFLOW_SECTION"

        ankers = function_section+ "\n"*3 + process_section+ "\n"*3 + subworkflow_section

        to_replace = []
        for match in re.finditer(constant.FULL_INLCUDE_2, code):
            to_replace.append(match.group(0))
        for r in to_replace:
            code = code.replace(r, ankers)
            ankers = ""
        

        processes, subworkflows, functions = [], [], []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                processes.append(ele)
            elif(ele.get_type()=="Subworkflow"):
                subworkflows.append(ele)
            elif(ele.get_type()=="Function"):
                functions.append(ele)
            else:
                raise Exception("This shoudn't happen")


        #Adding processes into code
        for p in processes:
            if(p.get_code() not in code):
                code = code.replace(process_section, '\n'+p.get_code_with_alias()+'\n'+process_section)

        #Adding subworkflows into code
        for sub in subworkflows:
            if(sub.get_code() not in code):
                code = code.replace(subworkflow_section, subworkflow_section+'\n'+sub.get_code_with_alias()+'\n')

        #Adding functions into code
        for fun in functions:
            if(fun.get_code() not in code):
                code = code.replace(function_section, function_section+'\n'+fun.get_code()+'\n')
        
        #Remove the ankers
        #code = code.replace(function_section, "")
        #code = code.replace(process_section, "")
        #code = code.replace(subworkflow_section, "")
        ankers = {"function_section":function_section,
                  "process_section":process_section,
                  "subworkflow_section":subworkflow_section}

        return code, ankers
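
    #Usage sketch (illustrative): flattening the workflow into a single file
    #
    #   single_file_code, ankers = w.write_workflow_into_one_file()
    #   #'ankers' maps the section names to the marker comments inserted into 'single_file_code'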



    #TODO -> write tests for this method
    #Function that rewrites the workflow code
    #Rewriting everything in one file + simplifying the operations and calls to simplify the analysis
    def simplify_workflow_code(self):
        code = self.get_first_file().get_code()
        code, ankers = self.write_workflow_into_one_file()
        all_executors = self.get_workflow_main().get_all_executors_in_workflow()
        
        #We do this so that the longest operations and calls are rewritten first in the code -> to avoid problems
        executor_2_length = {}
        for e in all_executors:
            executor_2_length[e] = len(e.get_code(get_OG = True))
        sorted_executor_2_length = {k: v for k, v in sorted(executor_2_length.items(), key=lambda item: item[1], reverse=True)}
        
        for exe in sorted_executor_2_length:
            if(exe.get_type()=="Call" or exe.get_type()=="Operation"):
                code = code.replace(exe.get_code(get_OG = True), exe.simplify_code(), 1)
            else:
                print(exe.get_code(), exe.get_type())
                raise Exception("This shouldn't happen")
        return code
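
    #Usage sketch (illustrative): the simplified single-file code can then be re-analysed in place
    #
    #   simplified = w.simplify_workflow_code()
    #   w.rewrite_and_initialise(simplified)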

    def get_subworkflows_called(self):
        subs = []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Subworkflow"):
                subs.append(ele)
        return subs
    
    def get_processes_called(self):
        processes = []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                processes.append(ele)
        return processes


    def rewrite_and_initialise(self, code):
        #Write new code in temporary file
        temp_file = self.get_output_dir()/f"temp_{str(self)[-7:-2]}.nf"
        with open(temp_file, "w") as file:
            file.write(code)
        
        #Replace old analysis with new analysis (simplified code)
        self.__init__(str(temp_file), display_info = False, duplicate=True)
        self.initialise()

    def check_relevant_processes_in_workflow(self, relevant_processes):
        #Check that all the relevant processes are in the workflow
        workflow_processes = []
        for c in self.get_workflow_main().get_all_calls_in_workflow():
            ele = c.get_first_element_called()
            if(ele.get_type()=="Process"):
                workflow_processes.append(ele.get_alias())
        
        for p in relevant_processes:
            if(p not in workflow_processes):
                raise BioFlowInsightError(f"The element '{p}' given as a relevant processes is not present in the workflow's processes", 24)


    def generate_user_view(self, relevant_processes = [], render_graphs = True, processes_2_remove = []):
        self.graph.initialise(processes_2_remove = processes_2_remove)
        self.graph.generate_user_view(relevant_processes = relevant_processes, render_graphs = render_graphs)
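
    #Usage sketch (illustrative -> the process names are assumptions):
    #
    #   w.generate_user_view(relevant_processes = ["fastqc", "multiqc"], render_graphs = True)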


    #I do not recommend that the developer uses the same name for the channels inside and outside the subworkflow
    #Since the local channels become local at the upper level
    def rewrite_subworkflow_call(self, code, subworklfow):
        
        #Remove the definition from the code
        code = code.replace(subworklfow.get_code(), "")
        OG_call = subworklfow.get_call()
        OG_body = subworklfow.get_work()
        
        #REPLACE HEADER TAKES
        subworkflow_takes = subworklfow.get_takes()
        parameters = OG_call.get_parameters()
        if(len(subworkflow_takes)!=len(parameters)):
            raise Exception("This shouldn't happen -> the same number of parameters should be kept")
        #This is to replace the parameters and the takes
        new_header = ""
        for i in range(len(parameters)):
            param = parameters[i]
            takes = subworkflow_takes[i].get_gives()[0]
            #Here we're checking that the input inside and outside the subworkflow are the same
            #If they are we're going to remove everything to avoid the case
            
            """
            param_1 = fastq
            sub(param_1)
            
            AND in the subworkflow 

            sub{
                take:
                fastq
            }
            ----------
            This would mean when removing the subworkflow def -> we would get this:

            param_1 = fastq
            fastq = param_1
            """
            #Obviously we want to avoid this case
            input_val = ""
            try:
                input_val = param.origins[0].get_source()[0].get_origins()[0].get_name()
            except:
                input_val = param.get_code(get_OG = True)
            if(takes.get_code()!=input_val):
                new_header+=f"{takes.get_code()} = {param.get_code(get_OG = True)}"
            else:
                #In the case they are the same -> we remove the remaining operation (which doesn't serve a purpose)
                #The "param_1 = fastq" operation 
                operation_code = param.origins[0].get_source()[0].get_code()
                code = code.replace(operation_code, "", 1)
        
    
        code = code.replace(OG_call.get_code(get_OG = True), f"{new_header}\n\n{OG_body}", 1)

        #REPLACE THE EMITS
        #TODO admittedly this code below is very ugly -> but it's functional -> update it
        emits = subworklfow.get_emit()
        to_replace = []
        all_executors = self.get_workflow_main().get_all_executors_in_workflow()
        for exe in all_executors:
            #We don't need to check the case call since the workflow has already been rewritten -> emits only appear in operations
            if(exe.get_type()=="Operation"):
                emited = exe.get_origins()
                if(len(emited)==1):
                    emited = emited[0]
                    if(emited.get_type()=="Emitted"):
                        if(emited.get_emitted_by().get_first_element_called()==subworklfow):
                            if(emited.get_emits() not in emits):
                                raise Exception("This shoudn't happen -> since it is the actual subworkflow")
                            to_replace.append((exe.get_code(get_OG = True), f"{exe.get_gives()[0].get_code()} = {emited.get_emits().get_origins()[0].get_code()}"))
        for r in to_replace:
            old, new = r
            #Case of channel == channel
            if(new.split("=")[0].strip()==new.split("=")[1].strip()):
                new = ''
            code = code.replace(old, new)
        
        return code


    #This function returns the channels on which the subworkflow (things_added_in_cluster) depends
    def get_takes(self, things_added_in_cluster):
        #Basically this is a dico of channels to operations -> when the value is an empty list
        #This means that the channel is totally defined in the subworkflow -> so we are searching for
        #Channels which aren't totally defined in the subworkflow
        channels_2_sources = {}

        for ele in things_added_in_cluster:
            if(ele.get_type() == "Operation"):
                for o in ele.get_origins():
                    channels_2_sources[o] = replace_thing_by_call(o.get_source())
            elif(ele.get_type() == "Call"):
                for param in ele.get_parameters():
                    if(param.get_type()=="Channel"):
                        print(param, param.get_code())
                        raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
                    else: 
                        for o in param.get_origins():
                            if(o.get_type()=="Channel"):
                                channels_2_sources[o] = replace_thing_by_call(o.get_source())
                            else:
                                raise Exception("This shouldn't happen -> with the rewrite all the params should be channels")
            else:
                raise Exception("This shouldn't happen")
            
        takes = []
        for channel in channels_2_sources:
            if(set(channels_2_sources[channel]).intersection(things_added_in_cluster)!=set(channels_2_sources[channel])):
                takes.append(channel)
        #print(things_added_in_cluster)
        #print(channels_2_operations_needed)   
        return takes
    
    #This function returns the channels the subworkflow (things_added_in_cluster) emits (other things depend on)
    def get_emits(self, things_added_in_cluster):
        emits = []
        #Basically this is a dico of channels to operations -> when the value is an empty list
        #This means that the channel is totally defined in the subworkflow -> so we are searching for
        #Channels which aren't totally defined in the subworkflow
        #This means that things outside the subworkflow depend on this channel
        channel_2_sink = {}

        for ele in things_added_in_cluster:
            if(ele.get_type() == "Operation"):
                for o in ele.get_gives():
                    channel_2_sink[o] = replace_thing_by_call(o.get_sink())
            elif(ele.get_type() == "Call"):
                #thing = ele.get_first_element_called()
                for e in ele.get_later_emits():
                    channel_2_sink[e] = replace_thing_by_call(e.get_sink())
            else:
                print(ele)
                raise Exception("This shouldn't happen")

        for channel in channel_2_sink:
            if(set(channel_2_sink[channel]).intersection(things_added_in_cluster)!=set(channel_2_sink[channel])):
                emits.append(channel)
        return emits
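
    #Illustrative example (the channel and process names are assumptions): if the cluster contains
    #   trimmed = TRIM(reads)
    #   counts = COUNT(trimmed)
    #and 'reads' is defined outside of the cluster while 'counts' is used outside of it,
    #then get_takes would return [reads] and get_emits would return [counts].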



    #Method which rewrites the workflow following the user view
    #Converting the workflow to the user view only makes sense when the option duplicate is activated -> otherwise it doesn't make sense + it makes the analysis way more complicated
    def convert_workflow_2_user_view(self, relevant_processes = []):
        if(self.duplicate):
            code = self.simplify_workflow_code()
            self.rewrite_and_initialise(code)
            
            #Get the clusters and the code
            self.check_relevant_processes_in_workflow(relevant_processes)
            self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove =  [])
            clusters = self.graph.get_clusters_from_user_view()
            
            #DETERMINING WHICH SUBWORKFLOWS ARE BROKEN WITH THE CLUSTER
            #Creating the clusters with calls instead of processes or subworkflows
            set_clusters_with_calls = []
            for c in clusters:
                tab = []
                for ele in c:
                    if(ele.get_type()=="Operation"):
                        if(ele.get_artificial_status()==False):
                            tab.append(ele)
                    else:
                        call = ele.get_call()
                        tab.append(call)
                set_clusters_with_calls.append(set(tab))

            
            #Getting subworkflows to executors
            subworkflow_2_executors = {}
            for sub in self.get_subworkflows_called():
                executors = sub.get_all_executors_in_workflow()
                subworkflow_2_executors[sub] = []
                for ele in executors:
                    #Cannot add calls to subworkflows -> they are already present by definition
                    if(ele.get_type()=="Call" and ele.get_first_element_called().get_type()=="Subworkflow"):
                        None
                        #We don't add it
                    else:
                        subworkflow_2_executors[sub].append(ele)
                #subworkflow_2_executors[sub.get_name()] = set(list(dico.keys()))

            #TODO -> write tests to test this function
            def get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls):
                broken_subworkflows = []
                for sub in subworkflow_2_executors:
                    #You want this list (set) to be equal to subworkflow_2_executors[sub]
                    elements_in_sub_with_clusters = []
                    for cluster in set_clusters_with_calls:
                        if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
                            break
                        for c in cluster:
                            if(len(elements_in_sub_with_clusters)>len(subworkflow_2_executors[sub])):
                                break
                            if(c in subworkflow_2_executors[sub]):
                                elements_in_sub_with_clusters+=list(cluster)
                                break
                    if(set(elements_in_sub_with_clusters)==set(subworkflow_2_executors[sub])):
                        None
                        #This means that the subworkflow is Intact
                    else:
                        #This means that the subworkflow is broken
                        broken_subworkflows.append(sub)
                return broken_subworkflows
            
            broken_subworkflows = get_workflows_broken(subworkflow_2_executors, set_clusters_with_calls)
            

            #Rewrite broken subworkflows
            for sub in broken_subworkflows:
                code = self.rewrite_subworkflow_call(code, sub)
            
            #TODO -> this needs to be optimised
            self.rewrite_and_initialise(code)
            #Get the clusters and the code
            self.generate_user_view(relevant_processes = relevant_processes, processes_2_remove =  [])
            clusters = self.graph.get_clusters_from_user_view()
            
            
            #Get the clusters with the corresponding operations inside
            #for i in range(len(clusters)):
            #    c = clusters[i]
            #    if(len(c)>1):
            #        clusters[i] = self.nextflow_file.graph.get_induced_subgraph(c)
            #print(clusters)
            #Get the topological order
            clusters = self.graph.get_topogical_order(clusters)
    


            #Creating the subworkflows from clusters
            calls_in_operations = []
            non_relevant_name = 1
            channels_to_replace_outside_of_cluster = []
            subworkflow_clusters_to_add, subworkflow_cluster_calls_to_add = [], []
            index_cluster = 0
            for elements in clusters:
                #Only create the subworkflows for clusters with more than one element
                processes_added = []
                things_added_in_cluster = []
                if(len(elements)>1):
                    name, body, take, emit = "", "", "", ""
                    first_element = True
                    for ele in elements:
            
                        if(ele.get_type()=="Process"):
                            
                            #Determine the name of the created subworkflow cluster
                            if(ele.get_alias() in relevant_processes):
                                name = f"cluster_{ele.get_alias()}"
                            #Get the call of thing (either process or subworkflow)
                            call = ele.get_call()
            
                            #If first element -> add marker for the subworkflow call
                            if(first_element):
                                code = code.replace(call.get_code(get_OG = True), f"//Anker_cluster{index_cluster}")
                                first_element = False
                            else:
                                code = code.replace(call.get_code(get_OG = True), "")
            
            
                            processes_added.append(call.get_first_element_called())
                            values = []
                            for condition in call.get_all_conditions():
                                values.append(condition.get_value())
                            printed_condition = " && ".join(values)
                            if(printed_condition!=""):
                                body+=f"if({printed_condition}) {{\n{call.get_code()}\n}}\n"
                            else:
                                body+=f"\n{call.get_code()}\n"
                            things_added_in_cluster.append(call)
                        #Below
                        elif(ele.get_type()=="Operation"):
                            #TODO -> check this verification, there might be some side effects
                            if(not ele.get_artificial_status()):
            
                                #If first element -> add marker for the subworkflow call
                                print(ele.get_code(get_OG = True))
                                if(first_element):
                                    code = code.replace(ele.get_code(get_OG = True), f"//Anker_cluster{index_cluster}", 1)
                                    first_element = False
                                else:
                                    code = code.replace(ele.get_code(get_OG = True), "", 1)
                
                                #Ignore these cases
                                if(ele.get_code()[:4] not in ["emit", "take"]):
                                    origins = ele.get_origins()
                                    for o in origins:
                                        if(o.get_type()=="Call"):
                                            calls_in_operations.append(o)
                                    
                                    values = []
                                    for condition in ele.get_all_conditions():
                                        values.append(condition.get_value())
                                    printed_condition = " && ".join(values)

                                    if(printed_condition!=""):
                                        body+=f"if({printed_condition}) {{\n{ele.get_code()}\n}}\n"
                                    else:
                                        body+=f"\n{ele.get_code()}\n"
                                things_added_in_cluster.append(ele)
                        else:
                            raise Exception("This shoudn't happen")
                    
                     
                    #TODO check that this part of the code is never reached
                    #Here we remove the Call_12313 thing
                    for call in calls_in_operations:
                        raise Exception("This shouldn't happen since the workflows has been rewritten")
                        body = body.replace(call.get_code(), "")
                        body = body.replace(str(call), call.get_code())
            
                     
                    #If the name=="" -> it means there isn't any relevant processes in the cluster -> it means it's a cluster of non relevant nodes
                    if(name==""):
                        #If there are processes called we are going to use them
                        if(len(processes_added)>0):
                            #TODO find a better naming system
                            name = f"non_relevant_cluster_{processes_added[0].get_alias()}"
                        else:
                            #TODO find a better naming system
                            name = f"non_relevant_cluster_{non_relevant_name}"
                            non_relevant_name+=1
                    
                    #Check that there is a single condition in the body
                    body = group_together_ifs(body)
                    conditions_in_subworkflow = []
                    end = -1
                    for match in re.finditer(r"if\s*(\([^\{]+)\{", body):
                        conditions_in_subworkflow.append(match.group(1).strip())
                        _, start = match.span(0)
                        
                    if(len(conditions_in_subworkflow)==1):
                        end = extract_curly(body, start)
                        body = body[start: end-1].strip()
                    
                     
                    #TAKE
                    #Adding take parameters on the inside of the subworkflow
                    takes_param = self.get_takes(things_added_in_cluster)
                    new_param_names, index, old_param_names = [], 1, []
                    for param in takes_param:
                        param_name = f"param_{name}_{index}"
                        new_param_names.append(param_name)
                        old_param_names.append(param.get_code())
                        index += 1
                    if(len(new_param_names)>0):
                        temp = '\n'.join(new_param_names)
                        take = f"\ntake:\n{temp}\n"
                    
                    #EMIT
                    #Adding the emitted outputs
                    emitted_outputs = self.get_emits(things_added_in_cluster)
                    new_output_names, index, old_output_names = [], 0, []
                    for output in emitted_outputs:
                        output_name = f"{name}.out[{index}]"
                        new_output_names.append(output_name)
                        old_output_names.append(output.get_code())
                        index += 1
                    
                    if(len(old_output_names)>0):
                        temp = '\n'.join(old_output_names)
                        emit = f"\nemit:\n{temp}\n"
                    
                    #Adding empty channels if they don't exist in the case of a negative condition
                    body = get_channels_to_add_in_false_conditions(body, old_output_names)
                    
                    
                    
                    #Replace names inside subworkflow
                    subworkflow_code = f"workflow {name} {{\n{take}\nmain:\n{body}\n{emit}\n}}"
                    for i in range(len(new_param_names)):
                        pattern = fr"[\=\,\(] *({re.escape(takes_param[i].get_code())})[\s\,\)\.]"
                        subworkflow_code = replace_group1(subworkflow_code, pattern, new_param_names[i])
                        #subworkflow_code = subworkflow_code.replace(takes_param[i].get_code(), new_param_names[i])
                    
                    #TODO -> added verification of conditions 
                    params = ", ".join(old_param_names)
                    subworkfow_call_case_true = f"{name}({params})"
                    subworkfow_call_case_false = ""
                    for i in range(len(new_output_names)):
                        #In the case of channels, we just add channel = subworkflow.out[i]
                        if(not bool(re.findall(r"\.\s*out", old_output_names[i]))):
                            subworkfow_call_case_true+=f"\n{old_output_names[i]} = {new_output_names[i]}"
                            subworkfow_call_case_false+=f"\n{old_output_names[i]} = Channel.empty()"
                        #In the case of emitted values we need to replace the code on the outside
                        else:
                            param_out_name= f"{name}_out_{i+1}"
                            subworkfow_call_case_true+=f"\n{param_out_name} = {new_output_names[i]}"
                            subworkfow_call_case_false+=f"\n{param_out_name} = Channel.empty()"
                            channels_to_replace_outside_of_cluster.append((old_output_names[i], param_out_name))
                    #If there was only one single condition in the subworkflow cluster -> then we add it when the call is done
                    if(len(conditions_in_subworkflow)==1):
                        subworkfow_call = f"if{conditions_in_subworkflow[0]} {{\n{subworkfow_call_case_true}\n}} else {{\n{subworkfow_call_case_false}\n}}"
                        None
                    else:
                        subworkfow_call = subworkfow_call_case_true
                    
                    
                    subworkflow_clusters_to_add.append(subworkflow_code)
                    subworkflow_cluster_calls_to_add.append(subworkfow_call)
                    index_cluster+=1
                    

              
            #TODO -> removing the conditions which are problematic
            #This might not be the problem -> when rerunning, the analysis isn't totally robust
            still_simplifying_conditions = True
            while(still_simplifying_conditions):
                still_simplifying_conditions = False
                to_replace, anker1, anker2 = "", "", ""
                #Replace if/else
                for match in re.finditer(r"if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\}\s*else\s*\{\s*(\/\/Anker_cluster\d|\s)\s*\}", code):
                    to_replace = match.group(0)
                    anker1, anker2 = match.group(1), match.group(2)
                    still_simplifying_conditions = True
                    break 
                #Replace empty if on its own
                if(not still_simplifying_conditions):
                    for match in re.finditer(r"(if\s*\([^\{]+\{\s*(\/\/Anker_cluster\d|\s)\s*\})\s*[^e]", code):
                        to_replace = match.group(1)
                        anker1 = match.group(2)
                        still_simplifying_conditions = True
                        break 
                if(still_simplifying_conditions):
                    code = code.replace(to_replace, f"{anker1}\n{anker2}")
            
            
        
            #Replace the ankers by the calls of the subworkflows  
            for i in range(len(subworkflow_clusters_to_add)):
                #print(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
                code = code.replace(f"//Anker_cluster{i}", subworkflow_cluster_calls_to_add[i])
            
           
            for old, new in channels_to_replace_outside_of_cluster:
                pattern= fr"[ \(,]({re.escape(old)})"
                code = replace_group1(code, pattern, new)
                #code = code.replace(old, new)
            
          
            #Add the subworkflow definitions
            #-------------------------------------
            #Add the anker
            subworkflow_section = "//ANKER 4 SUBWORKFLOW DEF"
            to_replace = ""
            for match in re.finditer(r"workflow\s+\w*\s*\{", code):
                to_replace = match.group(0)
                break
            if(to_replace==""):
                raise Exception("No call to a workflow")
            
            code = code.replace(to_replace, f"{subworkflow_section}\n\n{to_replace}")
            
            for sub in subworkflow_clusters_to_add:
                code = code.replace(f'{subworkflow_section}', f"{sub}\n\n{subworkflow_section}")
            
            
            #Putting || back
            code = code.replace("$OR$", "||")
            
            return remove_extra_jumps(format_with_tabs(code))
            #return code
            #
            ##So basically when retrieving a thing (process or subworkflow)
            ##There is necessarily one call associated with the thing -> since we have the option duplicate activated
            ##You can check if len(thing.call)==1 -> and if not raise an error (but this shouldn't happen)
        else:
            raise BioFlowInsightError("Trying to convert the workflow to the user view, however the option 'duplicate' is not activated.")