# root.py

from .nextflow_building_blocks import Nextflow_Building_Blocks
from .code_ import Code
from .bioflowinsighterror import BioFlowInsightError

from .outils import *
from . import constant

import re

class Root(Nextflow_Building_Blocks):
    def __init__(self, code, origin, modules_defined,
                 subworkflow_inputs = []):#These channels are the inputs of the subworkflow
        Nextflow_Building_Blocks.__init__(self, code)
        self.origin = origin
        self.executors = []
        self.blocks = []
        self.modules_defined = modules_defined
        self.elements_being_called = []
        self.channels = subworkflow_inputs
        self.defined_processes = []
    

    #############
    #  GENERAL
    #############
    def get_type(self):
        """Return the type label of this element, which is always "Root"."""
        type_label = "Root"
        return type_label
    
    def get_position_in_main(self, executor):
        return self.origin.get_position_in_main(executor)
    
    def get_subworkflow_calls_to_get_here(self, executor):
        if(self.origin.get_type()=="Main"):
            return []
        elif(self.origin.get_type() in ["Block", "Root"]):
            return self.origin.get_subworkflow_calls_to_get_here(executor)
        #Case subworkflow
        else:
            call = self.origin.get_call()
            return [call]+call.get_subworkflow_calls_to_get_here(executor)

    def get_blocks(self):
        return self.blocks
    
    def get_conditions_2_ignore(self):
        return self.origin.get_conditions_2_ignore()

    def add_element_to_elements_being_called(self, element):
        self.elements_being_called.append(element)

    #Does this on the same level
    def get_blocks_with_same_conditions(self, searching_block):
        tab = []
        for block in self.blocks:
            if(block != searching_block):
                if(block.same_condition(searching_block)):
                    tab.append(block)
        return tab
    
    #This method returns returns all the conditions above the block
    #Basically everything which needs to be true for the block to exist
    def get_all_conditions(self, conditions = None):
        """Return the conditions accumulated so far, unchanged.

        Parameters:
            conditions: optional dictionary of conditions already collected.
                When given, that same object is returned. When omitted, a new
                empty dictionary is returned.

        Returns:
            The `conditions` dictionary (this level adds nothing to it).
        """
        #A `{}` default would be one shared object: a caller modifying the
        #returned dictionary would pollute every later call without argument
        if(conditions is None):
            conditions = {}
        return conditions


    #############
    # CHANNELS
    #############

    def get_channels_same_level(self):
        return self.channels

    def get_channels_from_name_same_level(self, name):
        tab = []
        for c in self.channels:
            if(c.get_name()==name):
                tab.append(c)
        return tab
    
    def get_channels_above_level(self):
        """Return a new empty list: this level reports no channels above it."""
        nothing_above = []
        return nothing_above
    
    def get_channels_above_level_rec(self, dico = {}):
        for c in self.channels:
            dico[c] = ''

    def get_channels_from_name_above_level(self, name):
        tab = []
        for c in self.get_channels_above_level():
            if(c.get_name()==name):
                tab.append(c)
        return tab
        
    def get_channels_inside_level_rec(self, dico = {}):
        for c in self.channels:
            dico[c] = ''
        for b in self.blocks:
            b.get_channels_inside_level_rec(dico)
    
    def get_channels_inside_level(self):
        dico = {}
        for b in self.blocks:
            b.get_channels_inside_level_rec(dico)
        return list(dico.keys())
    
    def get_channels_from_name_inside_level(self, name):
        tab = []
        for c in self.get_channels_inside_level():
            if(c.get_name()==name):
                tab.append(c)
        return tab

    def get_channels_from_other_blocks_on_same_level(self):
        """Return a new empty list: this level reports no channels from other blocks."""
        no_channels = []
        return no_channels
    
    def get_channels_from_name_other_blocks_on_same_level(self, name):
        tab = []
        for c in self.get_channels_from_other_blocks_on_same_level():
            if(c.get_name()==name):
                tab.append(c)
        return tab
    
    def get_channels_from_name_all_channels(self, name):
        channels = self.get_channels_same_level()+self.get_channels_inside_level()
        tab = []
        for c in channels:
            if(c.get_name()==name):
                tab.append(c)
        return tab




    #def check_in_channels(self, channel):
    #    for c in self.get_channels():
    #        if(c.equal(channel)):
    #            return True
    #    for b in self.blocks:
    #        if(b.check_in_channels(channel)):
    #            return True
    #    return False
    

    def add_channel(self, channel):        
        self.channels.append(channel)

        
    #def get_channel_from_name(self, name):
    #    for c in self.get_channels():
    #        if(name == c.get_name()):
    #            return c
    #    return None
    #    #tab = []
    #    #for b in self.blocks:
    #    #    channels = b.get_channel_from_name(name)
    #    #    tab+=channels
    #    #raise Exception(f"{name} is not in the list of channels")
        
    

    #############
    # EXECUTORS
    #############
    def get_executors_same_level(self):
        return self.executors
    
    def get_above_executors(self):
        """Return a new empty list: this level reports no executors above it."""
        nothing_above = []
        return nothing_above

    def get_above_executors_rec(self, dico = {}):
        for e in self.executors:
            dico[e] = ''
    
    #This method returns all the executors inside a block
    def get_inside_executors_rec(self, dico = {}):
        for e in self.executors:
            dico[e] = ''
        for b in self.blocks:
            b.get_inside_executors_rec(dico)
    
    def get_inside_executors(self):
        dico = {}
        for b in self.blocks:
            b.get_inside_executors_rec(dico)
        return list(dico.keys())
    
    #def get_all_executors_from_workflow(self):
    #    return self.get_executors_same_level()+self.get_inside_executors()

    #def get_calls(self):
    #    tab = []
    #    for c in self.get_executors():
    #        if(c.get_type()=="Call"):
    #            tab.append(c)
    #        elif(c.get_type()=="Operation"):
    #            for o in c.get_origins():
    #                if(o.get_type()=="Call"):
    #                    tab.append(o)
    #    return tab
    
    #############
    #   CALLS
    #############

    def get_calls_same_level(self):
        tab = []
        for c in self.executors:
            if(c.get_type()=="Call"):
                tab.append(c)
            elif(c.get_type()=="Operation"):
                for o in c.get_origins():
                    if(o.get_type()=="Call"):
                        tab.append(o)
        return tab
    
    def get_calls_above_level(self):
        """Return a new empty list: this level reports no calls above it."""
        nothing_above = []
        return nothing_above
    
    #This method returns all the calls inside a block
    def get_calls_inside_level(self):
        tab = []
        executors = self.get_inside_executors()
        for e in executors:
            if(e.get_type()=="Call"):
                tab.append(e)
            elif(e.get_type()=="Operation"):
                for o in e.get_origins():
                    if(o.get_type()=="Call"):
                        tab.append(o)
        return tab
    

    def get_calls_from_other_blocks_on_same_level(self):
        """Return a new empty list: this level reports no calls from other blocks."""
        no_calls = []
        return no_calls

    def get_all_calls_in_subworkflow(self, calls = {}):
        all_calls = self.get_calls_same_level()+self.get_calls_inside_level()
        for call in all_calls:
            for c in call.get_all_calls():
                calls[c] = ''
                #if(c.get_first_element_called().get_type()=="Subworkflow"):
                #    c.get_first_element_called().root.get_all_calls(calls = calls)
    

    def get_all_executors_in_subworkflow(self, calls = {}):
        all_executors = self.get_executors_same_level()+self.get_inside_executors()
        for e in all_executors:
            calls[e] = ''
    

    #############
    # PROCESSES
    #############
    def extract_defined_processes(self):
        code = self.get_code()
        #For each block -> remove its code
        for b in self.blocks:
            code = code.replace(b.get_code(), "", 1)

        for match in re.finditer(r"\<src\.process\.Process object at \w+\>", code):
            for process in self.modules_defined:
                if(str(process)==match.group(0)):
                    process.set_origin(self)
                    self.defined_processes.append(process)
    
    
    def initialise(self):
        """Build the content of this root and initialise it in order of appearance.

        Steps:
          1. one Block is created for each condition found by extract_conditions
          2. the executors, then the defined processes (case DSL1), are extracted
          3. the position of each block, defined process and executor in the
             code is searched, and the elements are initialised by increasing
             position

        Raises:
            Exception: when the code of a (non empty) block, a defined process
                or an executor cannot be located in the code of this root.
        """
        #Define the blocks
        source = self.get_code()
        conditions = extract_conditions(source)

        #TODO -> normally it is not a problem -> cause i've removed the recursive option
        #But just check that the bodies don't appear twice in the dico

        #For each condition -> create a block
        for key in conditions:
            from .block import Block
            import copy
            span = conditions[key]
            body = source[span[0]:span[1]].strip()
            condition = key.split("$$__$$")[0]
            self.blocks.append(Block(code=body,
                                     origin=self,
                                     condition=condition,
                                     modules_defined=self.modules_defined,
                                     existing_channels = copy.copy(self.channels)))

        self.extract_executors()

        #Case DSL1 -> need to extract the processes which have been defined but replaced in the code
        self.extract_defined_processes()

        #This is to get the order of execution
        #Each located piece of code is overwritten by "a"s so that an identical
        #piece of code found later gets a different position
        masked = self.get_code()
        position_2_element = {}

        for block in self.blocks:
            snippet = block.get_code().strip()
            if(snippet==""):
                continue
            #The searched text is shortened from the end until it is found
            while(snippet):
                position = masked.find(snippet)
                if(position!=-1):
                    position_2_element[position] = block
                    masked = masked.replace(snippet, "a"*len(snippet), 1)
                    break
                snippet = snippet[:-1]
            else:
                raise Exception("This shouldn't happen")

        for process in self.defined_processes:
            position = masked.find(str(process))
            if(position==-1):
                raise Exception("This shouldn't happen")
            position_2_element[position] = process

        for executor in self.executors:
            snippet = executor.get_code()
            #Same search as for the blocks (an empty code is an error here)
            while(snippet):
                position = masked.find(snippet)
                if(position!=-1):
                    position_2_element[position] = executor
                    masked = masked.replace(snippet, "a"*len(snippet), 1)
                    break
                snippet = snippet[:-1]
            else:
                raise Exception("This shouldn't happen")

        for position in sorted(position_2_element):
            position_2_element[position].initialise()


    def get_process_from_name(self, name):
        for m in self.modules_defined:
            if(m.get_type()=="Process" and m.get_alias()==name):
                return m
            
    def get_subworkflow_from_name(self, name):
        for m in self.modules_defined:
            if(m.get_type()=="Subworkflow" and m.get_alias()==name):
                return m
            
    def get_function_from_name(self, name):
        for m in self.modules_defined:
            if(m.get_type()=="Function" and m.get_alias()==name):
                return m

    def extract_executors(self):
        """Extract the executors (Operation and Call objects) of this root.

        The code of the blocks is removed from the code of the root, and the
        remaining text is scanned in successive passes. Each pass restarts its
        search every time an executor is found, after removing from the text
        the code of the executors already extracted:

          STEP1: operations matching one of the patterns of constant.LIST_EQUALS
          STEP3: (only when get_DSL() is "DSL2") calls matching
                 constant.BEGINNING_CALL whose name is the alias of a defined module
          STEP2: terms matching constant.DOT_OPERATOR whose operator is in
                 constant.LIST_OPERATORS
          STEP4: executors matching constant.BEGINNING_PIPE_OPERATOR
          STEP5: executors whose code starts with "System.out" are removed

        The executors found are appended to `self.executors`.

        Raises:
            BioFlowInsightError: num=11 when extract_executor_from_middle fails,
                num=14 when the last element of a pipe is neither the alias of
                a defined module nor an operator.
        """
        from .operation import Operation
        from .call import Call

        #https://github.com/nextflow-io/nextflow/blob/45ceadbdba90b0b7a42a542a9fc241fb04e3719d/docs/operator.rst
        #TODO This list needs to be checked if it's exhaustive

        code = self.get_code()

        #For each block -> remove its code
        for b in self.blocks:
            code = code.replace(b.get_code(), "", 1)

        #NOTE(review): things_to_remove is never read in this method (the code using it is commented out below)
        things_to_remove = []
        #things_to_remove+= self.processes+self.includes+self.subworkflows+self.functions
        #if(self.main!=None):
        #    things_to_remove+=[self.main]
        #
        #for to_remove in things_to_remove:
        #    code = code.replace(to_remove.get_code(get_OG = True), "", 1)

        #We add this to simplify the search of the executors 
        code = "start\n"+code+"\nend"

        #This function takes an executor (already found) and expands it to the pipe operators
        #NOTE(review): executor[-1] raises an IndexError on an empty string, and a failed
        #text.find (-1) is not handled -> presumably callers never hit these cases, TODO confirm
        def expand_to_pipe_operators(text, executor):
            #If the executor ends with the pipe operator -> we remove it so that it can be detected by the pattern
            if(executor[-1]=="|"):
                executor = executor[:-1].strip()
            start = text.find(executor)+len(executor)
            #Only the first match is looked at (see the break): the executor is
            #expanded only if that match starts right after the executor
            for match in re.finditer(constant.END_PIPE_OPERATOR, text[start:]):
                begining, end = match.span(0)
                if(begining==0):
                    return expand_pipe_operator(text, executor+match.group(0))
                break
            return executor

        

        #---------------------------------------------------------------
        #STEP1 - Extract equal operations eg. 
        # *Case "channel = something"
        # *Case "(channel1, channel2) = something"
        #--------------------------------------------------------------- 
        pattern_equal = constant.LIST_EQUALS
    
        searching = True
        while(searching):
            searching= False
            #The search is done on the code without the executors already extracted
            text = code
            for e in self.executors:
                text = text.replace(e.get_code(), "", 1)
            
            for pattern in pattern_equal:
                for match in re.finditer(pattern, text):
                    
                    start, end = match.span(2)
                    ope = extract_end_operation(text, start, end)
                    ope = expand_to_pipe_operators(text, ope)
                 
                    #If the thing which is extracted is not in the condition of an if 
                    if(not checks_in_condition_if(text, ope) and not checks_in_string(text, ope)):
                        operation = Operation(ope, self)
                        self.executors.append(operation)
                        searching= True
                        #This break only leaves the loop on the matches: the remaining
                        #patterns are still tried on the same text before the search restarts
                        break

        #I switched step 2 and step 3 -> cause there were cases where there was operations in the parameters of a call -> they were extracted and removed
        #-----------------------------------
        #STEP3 - Extract the remaining calls
        #-----------------------------------
        #These are the processes and subworkflows we need to check are called
        if(self.get_DSL()=="DSL2"):
            to_call = []
            for m in self.modules_defined:
                to_call.append(m.get_alias())
            pattern_call = constant.BEGINNING_CALL
            searching = True
            while(searching):
                searching= False
                text = " "+code
                for e in self.executors:
                    text = text.replace(e.get_code(), "", 1)
        
                for match in re.finditer(pattern_call, text):
                    if(match.group(1) in to_call):
                        
                        start, end = match.span(0)
                        #We do this cause the first character is a " "
                        start+=1
                        txt_call = get_end_call(text, start, end)
                        txt_call = expand_to_pipe_operators(text, txt_call)
                        #If the thing which is extracted is not in the condition of an if 
                        if(not checks_in_condition_if(text, txt_call) and not checks_in_string(text, txt_call)):
                            #Case where the first "|" of the call is a single pipe (not part of "||")
                            if(txt_call.find("|")!=-1 and txt_call[txt_call.find("|")-1]!="|" and txt_call[txt_call.find("|")+1]!="|"):
                                #Despite its name, this is the LAST element of the pipe
                                first_thing_called = txt_call.split('|')[-1].strip()
                                if(first_thing_called in to_call):
                                    call = Call(code =txt_call, origin =self)
                                    self.executors.append(call)
                                else:
                                    #NOTE(review): added starts at True, so the two `if(not added)` branches
                                    #below (including the raise) can never run. The equivalent code of
                                    #STEP4 starts with `added = False` -> TODO confirm which one is intended
                                    added = True
                                    if(first_thing_called in constant.LIST_OPERATORS):
                                        added = True
                                    if(not added):
                                        for operator in constant.LIST_OPERATORS:
                                            for match in re.finditer(operator+constant.END_OPERATOR, txt_call.split('|')[-1].strip()):
                                                start, end = match.span(0)
                                                if(start==0):
                                                    added = True
                                    if(not added):
                                        raise BioFlowInsightError(f"In the executor '{txt_call}', '{first_thing_called}' is neither a process, subworkflow or an operator{self.get_string_line(txt_call)}", num = 14, origin=self)
                                    else:
                                        ope = Operation(code =txt_call, origin =self)
                                        self.executors.append(ope)
                            else:
                                #We need to see if we can expand the call to a operation perhaps process().set{ch}
                                expanded = expand_call_to_operation(text, txt_call)#TODO update this
                                if(txt_call==expanded):
                                    call = Call(code =txt_call, origin =self)
                                    self.executors.append(call)
                                else:
                                    ope = Operation(code =expanded, origin =self)
                                    self.executors.append(ope)
                            
                            searching = True
                            break


        #-------------------------------------------------
        #STEP2 - Extract the terms which use the operators
        #-------------------------------------------------
        pattern_dot = constant.DOT_OPERATOR
        searching = True
        #Triplets (start, end, potential executor) already examined
        searched = []


        while(searching):
            searching= False
            text = code
            for e in self.executors:
                text = text.replace(e.get_code(), "", 1)
            
            for match in re.finditer(pattern_dot, text):
                start, end = match.span(1)
                
                if(match.group(1) not in constant.ERROR_WORDS):
                    if(match.group(1) in constant.LIST_OPERATORS):
                        #TODO -> the function below might not work perfectly but i don't have any other ideas
                        
                        
                        #Use if there is an operator called right before opening the curlies/parenthese
                        #curly_left, curly_right = get_curly_count(text[:start]), get_curly_count(text[end:])
                        parenthese_left, parenthese_right = get_parenthese_count(text[:start]), get_parenthese_count(text[end:])
                        
                        #if(curly_left==0 and curly_right==0 and parenthese_left==0 and parenthese_right==0 and (start, end) not in searched):
                        #if(parenthese_left==0 and parenthese_right==0 and (start, end, temp) not in searched):
                        if(parenthese_left==0 and parenthese_right==0):
                            
                        
                            #NOTE(review): the bare `except:` clauses below also catch
                            #KeyboardInterrupt/SystemExit -> consider `except Exception:`
                            try:
                                pot = extract_executor_from_middle(text, start, end) 
                            except:
                                try:
                                    temp = text[start-10:end+10]
                                except:
                                    temp = text[start:end]
                                raise BioFlowInsightError(f"Failed to extract the operation or call{self.get_string_line(temp)}. Try rewriting it in a simplified version.", num = 11, origin=self)
                            
                            pot = expand_to_pipe_operators(text, pot)
                            #The potential executor is only examined if this exact (start, end, pot) hasn't already been searched
                            if((start, end, pot) not in searched):
                                searched.append((start, end, pot))
                                #If the thing which is extracted is not in the condition of an if 
                                if(not checks_in_condition_if(text, pot) and not checks_in_string(text, pot)):
                                    if(self.get_DSL()=="DSL2"):
                                        to_call = []
                                        for m in self.modules_defined:
                                            to_call.append(m.get_alias())
                                        if(pot.find("|")!=-1):
                                            if(not checks_in_condition_if(pot, '|') and not checks_in_string(pot, '|')):#TODO checks_in_string is the first occurance
                                                #Despite its name, this is the LAST element of the pipe
                                                first_thing_called = pot.split('|')[-1].strip()
                                                if(first_thing_called in to_call):
                                                    call = Call(code =pot, origin =self)
                                                    self.executors.append(call)
                                                elif(first_thing_called in constant.LIST_OPERATORS):
                                                    ope = Operation(code =pot, origin =self)
                                                    self.executors.append(ope)
                                                else:
                                                    raise BioFlowInsightError(f"'{first_thing_called}' is neither a process, subworkflow or an operator. In the executor '{pot}'{self.get_string_line(pot)}.", num=14,origin=self)#TODO -> try rewriting the operation using the standard syntaxe
                                            
                                            else:
                                                from .executor import Executor
                                                executor = Executor(pot, self)
                                                self.executors.append(executor.return_type())
                                        
                                        else:
                                            from .executor import Executor
                                            executor = Executor(pot, self)
                                            self.executors.append(executor.return_type())
                                    else:
                                        ope = Operation(pot, self)
                                        self.executors.append(ope)
                                    searching = True
                                    break
                        

        #---------------------------------------------------------------
        #STEP4 - Extract the Executors which only use the pipe operators (which start with a channel)
        #---------------------------------------------------------------
        to_call = []
        for m in self.modules_defined:
            to_call.append(m.get_alias())

        searching = True
        while(searching):
            searching= False
            text = code
            #Here the code of the executors is removed with get_OG=True
            #(the earlier steps call get_code() without argument)
            for e in self.executors:
                text = text.replace(e.get_code(get_OG=True), "", 1)
            pattern = constant.BEGINNING_PIPE_OPERATOR
            
            for match in re.finditer(pattern, text):
                txt_call = expand_pipe_operator(text, match.group(0))
                full_executor =  txt_call
                
                #start, end = match.span(0)
                ## Check to see if a parameter is given such as in the example 'splitLetters | flatten | convertToUpper | view { it.trim() }'
                #params, full_executor = check_if_parameter_is_given_pipe(text, start, end)
                #if(params!=''):
                #    tab_to_call = txt_call.split('|')
                #    start = f"{tab_to_call[0]}({params})"
                #    txt_call = start + '|' + '|'.join(tab_to_call[1:])

                
                #If the thing which is extracted is not in the condition of an if 
                if(not checks_in_condition_if(text, full_executor) and not checks_in_string(text, full_executor)):
                    tab_to_call = txt_call.split('|')
                    #When the pipe starts with the alias of a defined module, "()" is added after it
                    if(tab_to_call[0].strip() in to_call):
                        start = f"{tab_to_call[0]}()"
                        txt_call = start + '|' + '|'.join(tab_to_call[1:])
                    #Despite its name, this is the LAST element of the pipe
                    first_thing_called = txt_call.split('|')[-1].strip()

                    if(first_thing_called in to_call):
                        call = Call(code =txt_call, origin =self, OG_code= full_executor)
                        self.executors.append(call)
                        searching = True
                        break
                    elif(first_thing_called in constant.LIST_OPERATORS):
                        ope = Operation(code =txt_call, origin =self, OG_code= full_executor)
                        self.executors.append(ope)
                        searching = True
                        break
                    else:
                        added = False
                        #This is in the case "channel | map {dfvfdvd}"
                        for ope in constant.LIST_OPERATORS:
                            if(first_thing_called[:len(ope)]==ope and not added):
                                ope = Operation(code =txt_call, origin =self, OG_code= full_executor)
                                self.executors.append(ope)
                                added = True
                                searching = True
                        if(added):
                            break
                        elif(not added):
                            raise BioFlowInsightError(f"In the executor '{txt_call}', '{first_thing_called}' is neither a process, subworkflow or an operator (in the file '{self.get_file_address()}')", num = 14,origin=self)
        
        #---------------------------------------------------------------------
        #STEP5 - We remove the things which were falsely extracted as executors
        #---------------------------------------------------------------------
        to_remove = []
        starting_by_to_remove = ["System.out"]
        for e in self.executors:
            for r in starting_by_to_remove:
                if(e.get_code()[:len(r)]==r):
                    to_remove.append(e)
        for e in to_remove:
            self.executors.remove(e)

    def get_structure(self, dico):
        #This only for DSL1 workflows
        if(self.origin.get_DSL()=="DSL1"):
            for process in self.defined_processes:
                process.get_structure(dico)
            for channel in self.channels:
                for sink in channel.get_sink():
                    #If the sink an operation then the edge has already been added in the get_structure method for the operation
                    if(sink.get_type()=="Process"):
                        channel.get_structure(dico, sink)

        
        for block in self.blocks:
            block.get_structure(dico)
        for e in self.executors:
            if(e.get_type()=="Operation"):
                e.get_structure(dico)
            elif(e.get_type()=="Call"):
                e.get_structure(dico)
            else:
                raise Exception(f"Executor of type '{e.get_type()}' was extracted in a DSL2 workflow! I don't know what this is! The code is '{e.get_code()}'")