"""Representation of a call to a process, subworkflow or function."""

import re
import json
import copy
import time

from .code_ import Code
from .condition import Condition
from .outils import get_next_param, replace_group1
from .executor import Executor
from .bioflowinsighterror import BioFlowInsightError
from . import constant


class Call(Executor):
    """A call expression such as ``p1(p2(), ch)``.

    The element being called (process, subworkflow or function) is resolved
    as soon as the object is built; the parameters are only analysed when
    ``initialise`` is invoked.
    """

    def __init__(self, code, origin, OG_code = ''):
        """Store the call's code and resolve the element it calls.

        Args:
            code: source text of the call.
            origin: the object in which the call was found.
            OG_code: original (unmodified) text of the call; when empty,
                ``get_code(get_OG=True)`` falls back to ``code``.
        """
        self.code = Code(code = code, origin = self)
        self.origin = origin
        self.called = []
        self.first_element_called = None
        # Parameters are stored in the order they are given in the call.
        self.parameters = []
        self.OG_code = OG_code
        self.initialised = False
        self.emits = []
        # Needs every attribute above to be set, so it has to come last.
        self.analyse_first_element_called(self.get_code(clean_pipe = True))
        #self.condition = Condition(self)

    def get_all_calls(self):
        """Return this call followed by the calls nested in its parameters.

        e.g. ``p1(p2(), p3())`` returns ``[p1(p2(), p3()), p2(), p3()]``.
        Only parameters of type "Operation" are searched, through their
        origins.
        """
        calls = [self]
        for param in self.parameters:
            if param.get_type() == "Operation":
                for origin in param.get_origins():
                    if origin.get_type() == "Call":
                        calls += origin.get_all_calls()
        return calls

    def add_to_emits(self, emitted):
        """Register ``emitted`` in this call's list of emits."""
        self.emits.append(emitted)

    def get_later_emits(self):
        """Return the list filled by ``add_to_emits``."""
        return self.emits

    def __str__(self):
        return f"Call_{id(self)}"

    def get_parameters(self):
        """Return the analysed parameters, in call order."""
        return self.parameters

    def get_code(self, clean_pipe = False, get_OG=False, remove_emit_and_take = False, replace_calls = False):
        """Return the code of the call.

        Args:
            clean_pipe: when True, return the code after it went through
                ``self.clean_pipe_operator``.
            get_OG: when True, return ``OG_code`` (or the stored code if
                ``OG_code`` is empty); takes precedence over ``clean_pipe``.
            remove_emit_and_take: accepted but not used by this method.
            replace_calls: accepted but not used by this method.
        """
        if get_OG:
            if self.OG_code == '':
                return self.code.get_code()
            return self.OG_code
        if clean_pipe:
            code, _ = self.clean_pipe_operator(self.code.get_code())
            return code
        return self.code.get_code()

    @staticmethod
    def _swap_param_for_name(code, param_code, new_name):
        """Replace ``param_code`` in ``code`` by ``new_name``; raise if nothing changed."""
        updated = replace_group1(code, fr"[^\w]({re.escape(param_code)})", new_name)
        if updated == code:
            raise Exception("This souldn't happen")
        return updated

    @staticmethod
    def _insert_after_tag(code, tag, new_bit):
        """Insert ``new_bit`` on the line following ``tag``; raise if nothing changed."""
        updated = code.replace(tag, f"{tag}\n{new_bit}")
        if updated == code:
            raise Exception("This souldn't happen")
        return updated

    def simplify_code(self, new_name = ""):
        """Rewrite the call so each non-trivial parameter is a named variable.

        The parameters are replaced in the call by generated names
        (``<alias_and_id>_param_<index>``) and their definitions are placed on
        the lines preceding the call. Calls to functions are returned
        unchanged.

        Args:
            new_name: when not empty, the call is assigned to this name.

        Returns:
            The rewritten code as a string.

        Raises:
            Exception: if a parameter has an unexpected type, or if an
                expected textual replacement did not modify the code.
        """
        first = self.get_first_element_called()
        if first.get_type() == "Function":
            return self.get_code()

        new_call_name = first.get_alias_and_id()
        code = self.get_code(clean_pipe = True)
        code = re.sub(fr'{re.escape(first.get_alias())} *\(', f'{new_call_name}(', code)
        if new_name != "":
            code = f"{new_name} = {code}"
        # Temporary marker showing where the parameter definitions are added.
        tag_to_add = "//AREA TO ADD PARAMS"
        code = f"{tag_to_add}\n{code}"

        for index, param in enumerate(self.parameters, start=1):
            param_new_name = f"{new_call_name}_param_{index}"
            param_type = param.get_type()

            # Case the param is a call
            if param_type == "Call":
                # If it's not a function -> then we rewrite it
                if param.get_first_element_called().get_type() != "Function":
                    code = self._swap_param_for_name(code, param.get_code(get_OG=True), param_new_name)
                    new_bit = param.simplify_code(new_name = param_new_name)
                    code = self._insert_after_tag(code, tag_to_add, new_bit)

            # Case the param is an operation
            elif param_type == "Operation":
                # If it's an artificial operation -> we don't need to do anything
                if not param.get_artificial_status():
                    code = self._swap_param_for_name(
                        code, param.get_code(get_OG=True, replace_calls = False), param_new_name
                    )
                    lines = param.simplify_code().split('\n')
                    if len(lines) == 1:
                        new_bit = f"{param_new_name} = {lines[0]}"
                    else:
                        # Only the last line is the value; the lines before it
                        # are kept as they are.
                        head = '\n'.join(lines[:-1])
                        new_bit = f"{head}\n{param_new_name} = {lines[-1]}"
                    code = self._insert_after_tag(code, tag_to_add, new_bit)

            # Case Channel
            elif param_type == "Channel":
                raise Exception("This shouldn't happen")

            elif param_type == "Emitted":
                code = self._swap_param_for_name(code, param.get_code(get_OG=True), param_new_name)
                new_bit = f"{param_new_name} = {param.simplify_code()}"
                code = self._insert_after_tag(code, tag_to_add, new_bit)

            else:
                raise Exception("This shouldn't happen")

        # Remove the temporary marker.
        cleaned = code.replace(tag_to_add, "").strip()
        if cleaned == code:
            raise Exception("This souldn't happen")
        return cleaned

    def get_type(self):
        return "Call"

    def get_first_element_called(self):
        """Return the element resolved by ``analyse_first_element_called``."""
        return self.first_element_called

    def get_elements_called(self, tab_input = None, first_call = True):
        """Return the elements called by this call and by its parameters.

        Args:
            tab_input: elements already collected; it is copied, never
                modified. ``None`` (the default) means an empty list.
            first_call: accepted but not used by this method.

        Returns:
            A list without duplicates (order is not guaranteed).
        """
        # `None` default instead of a shared mutable `[]`.
        tab = [] if tab_input is None else list(tab_input)
        tab.append(self.first_element_called)
        for para in self.parameters:
            if para.get_type() == "Call":
                tab = para.get_elements_called(tab_input = tab.copy(), first_call = False)
            elif para.get_type() == "Operation":
                tab = para.get_elements_called(tab = tab.copy())
        return list(set(tab))

    def get_code_split_space(self, code):
        """Split ``code`` on whitespace after isolating ``(``, ``)``, ``{`` and ``}``.

        Each of those characters is surrounded by spaces first, so that it
        ends up as its own token.
        """
        # A character being absent is perfectly normal (most calls contain no
        # braces), so a replacement that changes nothing is not an error.
        for character in ('(', ')', '}', '{'):
            code = code.replace(character, f' {character} ')
        return code.split()

    def analye_parameters(self, param):
        """Split the parameter string and analyse each parameter.

        A parameter matching ``constant.WORD`` is treated as a channel name
        and wrapped in an artificial Operation; anything else is typed through
        ``Executor.return_type()``. Results are appended to
        ``self.parameters``.

        Args:
            param: the text found between the parentheses of the call.

        Raises:
            Exception: if a parameter is neither a Call nor an Operation.
        """
        # Step 1 -> get parameters
        tab_params, start = [], 0
        remaining = param
        while start != -1:
            remaining = remaining[start:]
            next_param, start = get_next_param(remaining)
            tab_params.append(next_param.strip())

        # Step 2 -> analyse parameters
        for single_param in tab_params:
            if single_param == '':
                continue

            # Case it's a channel
            #TODO this needs to be updated to proper formalise how you search for channels
            if re.fullmatch(constant.WORD, single_param):
                channels = self.origin.get_channels_from_name_same_level(single_param)
                if not channels:
                    channels = self.origin.get_channels_from_name_all_channels(single_param)
                if not channels:
                    # Imported here rather than at module level, as in the
                    # original code (presumably to avoid a circular import).
                    from .channel import Channel
                    channel = Channel(name=single_param, origin=self.origin)
                    self.origin.add_channel(channel)
                    channels = [channel]
                from .operation import Operation
                ope = Operation(f"{single_param}", self)
                ope.set_as_artificial()
                for channel in channels:
                    channel.add_sink(self)
                    ope.add_element_origins(channel)
                    ope.set_as_artificial()
                self.parameters.append(ope)
                continue

            executor = Executor(single_param, self).return_type()
            if executor.get_type() == "Call":
                executor.initialise()
                self.parameters.append(executor)
            elif executor.get_type() == "Operation":
                ope = executor
                ope.initialise_from_call()
                gives = ope.get_gives()
                # Case is an Emitted -> there's only one value given and it's an emitted
                if (ope.check_if_operation_is_an_full_emitted()
                        and len(gives) == 1
                        and gives[0].get_type() == "Emitted"):
                    self.parameters.append(gives[0])
                else:
                    self.parameters.append(ope)
            else:
                raise Exception(f"I don't know what type '{single_param}' is!")

    def get_nb_outputs(self):
        """Return the number of outputs (process) or emits (subworkflow) of the called element."""
        first = self.get_first_element_called()
        if first.get_type() == "Process":
            return first.get_nb_outputs()
        elif first.get_type() == "Subworkflow":
            return first.get_nb_emit()
        raise Exception("This soudn't happen!")

    def _add_parameter_to_structure(self, p, dico, target):
        """Add to ``dico`` the structure linking parameter ``p`` to ``target``."""
        p_type = p.get_type()
        # Case parameter is a channel or an Emitted
        if p_type in ("Channel", "Emitted"):
            p.get_structure(dico, B=target)
        # Case parameter is a Operation
        elif p_type == "Operation":
            # NOTE(review): indentation was lost in the source; the edge is
            # assumed to be added only when the operation is shown — confirm.
            if p.show_in_structure:
                p.get_structure(dico)
                dico["edges"].append({'A': str(p), 'B': str(target), "label": ""})
        # Case parameter is a Call
        elif p_type == "Call":
            p.get_structure(dico)
            called = p.get_first_element_called()
            # Case the first call is a process: one edge per output
            if called.get_type() == "Process":
                for _ in called.get_outputs():
                    dico["edges"].append({'A': str(called), 'B': str(target), "label": ""})#TODO check name of channel
            # Case the first call is a subworkflow: one edge per emit
            elif called.get_type() == "Subworkflow":
                for emit in called.get_emit():
                    dico["edges"].append({'A': str(emit), 'B': str(target), "label": ""})#TODO check name of channel
        else:
            raise Exception(f"Type '{p.get_type()}' was given as a parameter -> I don't know how to handle this!")

    def get_structure(self, dico):
        """Add this call (called element and parameters) to the structure ``dico``.

        Args:
            dico: dictionary with at least the "edges" and "subworkflows"
                keys; it is modified in place.

        Raises:
            BioFlowInsightError: if the number of parameters differs from the
                number of inputs (process) or takes (subworkflow).
            Exception: if a parameter or the called element has an
                unexpected type.
        """
        first = self.get_first_element_called()
        first_type = first.get_type()

        if first_type == "Process":
            process = first
            # Add process here
            process.get_structure(dico)

            #TODO handle the case where the right number of inputs is implied
            # (e.g. a call or an emitted giving several channels at once)
            if len(self.parameters) != process.get_nb_inputs():
                raise BioFlowInsightError(f"Not the same number of parameters given as input for the process '{process.get_alias()}'{self.get_string_line(self.get_code(get_OG=True))}.", num=2, origin=self)
            for p in self.parameters:
                self._add_parameter_to_structure(p, dico, process)

        elif first_type == "Subworkflow":
            sub = first
            # The subworkflow gets its own nested structure.
            temp_dico = {'nodes': [], 'edges': [], 'subworkflows': {}}
            sub.get_structure(temp_dico)
            dico['subworkflows'][sub.get_printed_name()] = temp_dico

            #TODO check this -> the right number of inputs could be implied
            if len(self.parameters) != sub.get_nb_takes():
                raise BioFlowInsightError(f"Not the same number of parameters given as input for the subworklfow '{sub.get_alias()}' in the call{self.get_string_line(self.get_code())}.", num = 2, origin=self)
            # The i-th parameter is linked to the i-th take.
            for param_index, p in enumerate(self.parameters):
                sub_input = sub.get_takes()[param_index]
                self._add_parameter_to_structure(p, dico, sub_input)

        elif first_type == "Function":
            pass

        else:
            raise Exception("This shoudn't happen! is type")

    def _extract_name_and_params(self, call):
        """Split ``call`` into the called name and its parameter string.

        Returns:
            ``(name, params)``, or ``None`` when ``call`` does not start with
            a word immediately followed by ``(``.

        Raises:
            Exception: if the text following the name does not end with ``)``.
        """
        tab_call = self.get_code_split_space(call)
        # Length check first: indexing a one-token (or empty) call would crash.
        if (len(tab_call) < 2
                or not re.fullmatch(constant.WORD, tab_call[0])
                or tab_call[1] != '('):
            return None
        start = re.findall(tab_call[0]+constant.END_CALL, call)[0]
        params = call.replace(start, "")
        if not params.endswith(')'):
            raise Exception("This shouldn't happens")
        return tab_call[0], params[:-1]

    def analyse_call(self, call):
        """Analyse the parameters of ``call``.

        Nothing is done when ``call`` does not have the ``name(...)`` shape;
        the called element itself is resolved by
        ``analyse_first_element_called``.
        """
        extracted = self._extract_name_and_params(call)
        if extracted is not None:
            _, params = extracted
            self.analye_parameters(params)

    def analyse_first_element_called(self, call):
        """Resolve the process, subworkflow or function called by ``call``.

        The resolved element is stored in ``self.first_element_called`` and
        appended to ``self.called``.

        Raises:
            BioFlowInsightError: if ``call`` does not have the ``name(...)``
                shape.
            Exception: if no element with that name is found.
        """
        extracted = self._extract_name_and_params(call)
        if extracted is None:
            raise BioFlowInsightError(f"Failed to extract the call{self.get_string_line(self.get_code())}. Try rewriting it in a simplified version.", num = 15, origin=self)
        name, _ = extracted

        process = self.get_process_from_name(name)
        subworkflow = self.get_subworkflow_from_name(name)
        fun = self.get_function_from_name(name)

        # NOTE(review): indentation was lost in the source; `initialise()` is
        # assumed to run whether or not the element is duplicated — confirm.
        if process is not None and subworkflow is None and fun is None:
            # If the elements need to duplicated -> then we need to duplicate it
            if self.get_duplicate_status():
                temp = process
                process, num = process.copy()
                process.set_alias(temp.get_alias())
                process.set_printed_name(f"{temp.get_alias()}_{num}")
            process.initialise()
            self.first_element_called = process
            process.add_to_calls(self)
            self.origin.add_element_to_elements_being_called(process)
        if process is None and subworkflow is not None and fun is None:
            if self.get_duplicate_status():
                temp = subworkflow
                subworkflow, num = subworkflow.copy()
                subworkflow.set_alias(temp.get_alias())
                subworkflow.set_printed_name(f"{temp.get_alias()}_{num}")
            subworkflow.initialise()
            self.first_element_called = subworkflow
            subworkflow.add_to_calls(self)
            self.origin.add_element_to_elements_being_called(subworkflow)
        if process is None and subworkflow is None and fun is not None:
            self.first_element_called = fun
        if process is None and subworkflow is None and fun is None:
            raise Exception("No first call found!!")
        self.called.append(self.first_element_called)

    def get_called(self):
        """Return the elements called by this call and by its Call parameters.

        The result has no duplicates (order is not guaranteed).
        """
        # Work on a copy: extending `self.called` itself would make it grow
        # every time this method is invoked.
        tab = list(self.called)
        for param in self.parameters:
            if isinstance(param, Call):
                tab += param.get_called()
        #TODO -> check this
        return list(set(tab))

    def write_summary(self, tab=0):
        """Append a description of this call to ``<output_dir>/debug/calls.nf``.

        Args:
            tab: indentation level of the written lines.
        """
        # `with` makes sure the file is closed (and flushed) when done.
        with open(f"{self.get_output_dir()}/debug/calls.nf", "a") as file:
            file.write(" "*tab+f"{self}"+"\n")
            file.write(" "*(tab+1)+"* Called "+str(self.get_called())+"\n")
            file.write(" "*(tab+1)+"* Code : "+ str(self.get_code())+"\n")
            file.write(" "*(tab+1)+"* Parameters"+"\n")
            for p in self.parameters:
                file.write(" "*(tab+3)+p.get_code()+f" '{p.get_type()}'"+"\n")
            file.write("\n")

    def add_call_count(self):
        """Record the called process in ``<output_dir>/debug/processes_used.json``.

        The JSON file maps a file address to the list of process codes; the
        code of the called process is appended to the entry of its file.
        Nothing is done yet for subworkflows and functions.

        Raises:
            Exception: if the called element has an unexpected type.
        """
        first = self.get_first_element_called()
        first_type = first.get_type()
        if first_type == "Process":
            path = f"{self.get_output_dir()}/debug/processes_used.json"
            with open(path) as json_file:
                processes_used = json.load(json_file)
            # Create the entry on first use instead of catching every error.
            processes_used.setdefault(first.get_file_address(), []).append(first.get_code())
            with open(path, "w") as outfile:
                json.dump(processes_used, outfile, indent=4)
        elif first_type == "Subworkflow":
            pass  #TODO
        elif first_type == "Function":
            pass  #TODO
        else:
            raise Exception(f"I don't know what to do with '{self.get_first_element_called().get_type()}' in the call '{self.get_code()}' (in file ''{self.get_file_address()}'')")

    def initialise(self):
        """Analyse the parameters and write the debug summary (done only once)."""
        # NOTE(review): indentation was lost in the source; the summary is
        # assumed to be written only on the first initialisation — confirm.
        if not self.initialised:
            self.initialised = True
            self.analyse_call(self.get_code(clean_pipe = True))
            self.write_summary()