Skip to content
Snippets Groups Projects
outils_graph.py 10.47 KiB
import graphviz
import copy
import numpy as np

# Prefix that is_process() compares the start of a node id against.
# NOTE(review): looks like the beginning of the default repr of a
# src.process.Process object -- confirm against the code that builds node ids.
process_id = "<src.process.Process"
# Prefix that is_operation() compares the start of a node id against
# (presumably the same repr convention, for src.operation.Operation objects).
operation_id = "<src.operation.Operation"

def is_process(node_id):
    """Tell whether *node_id* begins with the module-level ``process_id`` prefix."""
    prefix_length = len(process_id)
    return node_id[:prefix_length] == process_id

def is_operation(node_id):
    """Tell whether *node_id* begins with the module-level ``operation_id`` prefix."""
    prefix_length = len(operation_id)
    return node_id[:prefix_length] == operation_id

def fill_dot(dot, dico, label_node = True, label_edge = True):
    """Copy the nodes, edges and nested subworkflows of *dico* into *dot*.

    Parameters:
        dot: object exposing ``node``, ``edge`` and ``subgraph`` (the caller in
            this file passes a ``graphviz.Digraph``).
        dico: dict with the keys ``"nodes"``, ``"edges"`` and ``"subworkflows"``.
        label_node: when true, each node is given its ``xlabel`` attribute.
        label_edge: when true, each edge is given its ``label`` attribute.
    """
    for node in dico["nodes"]:
        identifier, caption = node["id"], node["name"]
        attributes = {"shape": node["shape"]}
        if label_node:
            attributes["xlabel"] = node["xlabel"]
        attributes["fillcolor"] = node["fillcolor"]
        dot.node(identifier, caption, **attributes)

    for edge in dico["edges"]:
        tail, head = edge['A'], edge['B']
        extra = {"label": edge['label']} if label_edge else {}
        dot.edge(tail, head, **extra)

    # Every subworkflow is drawn as its own subgraph named "cluster<name>".
    for sub_name, sub_dico in dico["subworkflows"].items():
        with dot.subgraph(name="cluster" + sub_name) as cluster:
            fill_dot(cluster, sub_dico, label_node, label_edge)
            cluster.attr(label=sub_name)

def generate_graph_dot(filename, dico, label_node = True, label_edge = True, render_graphs = True):
    """Save *dico* as ``<filename>.dot`` and, if requested, render ``<filename>.png``.

    Parameters:
        filename: output path without extension.
        dico: graph dict passed on to ``fill_dot``.
        label_node, label_edge: forwarded to ``fill_dot``.
        render_graphs: when false, only the ``.dot`` file is saved.
    """
    dot_path = f'{filename}.dot'
    diagram = graphviz.Digraph(filename=filename, format='png', comment="temp")
    fill_dot(diagram, dico, label_node, label_edge)
    diagram.save(filename=dot_path)
    if not render_graphs:
        return
    diagram.render(filename=dot_path, outfile=f'{filename}.png')

def generate_graph_mermaid(filename, dico, label_node = True, label_edge = True, render_graphs = True):
    """Write *dico* as a Mermaid ``graph TB`` description into ``<filename>.md``.

    Parameters:
        filename: output path without extension.
        dico: dict with the keys ``"nodes"``, ``"edges"`` and ``"subworkflows"``.
        label_node: when true, unnamed nodes display their ``xlabel``;
            otherwise they display a single space.
        label_edge: when true, edges carry their ``label``.
        render_graphs: accepted for signature parity with the dot generator;
            it is not used here.
    """
    import re

    address_pattern = re.compile(r"object at (\w+)>")

    def short_id(raw_id):
        # First "object at <word>>" occurrence of the id, or None if absent.
        found = address_pattern.search(raw_id)
        return found.group(1) if found else None

    def render_level(level, depth, pieces):
        indent = "\t" * (depth + 1)

        for node in level["nodes"]:
            if node['name'] != '':
                pieces.append(f"{indent}{short_id(node['id'])}({node['name']});\n")
            elif label_node:
                pieces.append(f"{indent}{short_id(node['id'])}(({node['xlabel']}));\n")
            else:
                pieces.append(f"{indent}{short_id(node['id'])}(({' '}));\n")

        for edge in level["edges"]:
            if label_edge:
                pieces.append(f"{indent}{short_id(edge['A'])}--{edge['label']}-->{short_id(edge['B'])};\n")
            else:
                pieces.append(f"{indent}{short_id(edge['A'])}-->{short_id(edge['B'])};\n")

        for sub_name, sub_level in level["subworkflows"].items():
            pieces.append(f"{indent}subgraph {sub_name}\n{indent}\tdirection TB;\n")
            # The content of a subgraph sits two tab stops deeper than its header.
            render_level(sub_level, depth + 2, pieces)
            pieces.append(f"{indent}end\n")

    pieces = ["graph TB;\n"]
    render_level(dico, 0, pieces)

    with open(f"{filename}.md", "w") as text_file:
        text_file.write("".join(pieces))



def generate_graph(filename, dico, label_node = True, label_edge = True, render_graphs = True, dot = True, mermaid = True):
    """Produce the dot and/or Mermaid output for *dico*.

    Parameters:
        filename, dico, label_node, label_edge, render_graphs: forwarded
            unchanged to the selected generators.
        dot: when true, ``generate_graph_dot`` is called.
        mermaid: when true, ``generate_graph_mermaid`` is called.
    """
    shared_args = (filename, dico, label_node, label_edge, render_graphs)
    if dot:
        generate_graph_dot(*shared_args)
    if mermaid:
        generate_graph_mermaid(*shared_args)


#Function that merges two dictionaries
def merge(x, y):
    """Merge two ``{key: list}`` dictionaries into a new dictionary.

    For every key present in *x* or *y* the two lists are concatenated
    (*x* first) and duplicates are dropped, keeping the first occurrence.
    Keys appear in the order they are met in *x*, then in *y*.

    The order is kept on purpose: going through ``set`` made the order of the
    values depend on string hash randomisation, so code that walks the merged
    lists behaved differently from one run to the next.

    Parameters:
        x, y: dictionaries whose values are lists of hashable items.

    Returns:
        A new dictionary; neither input is modified.
    """
    merged = {}
    for key in (*x, *y):
        if key not in merged:
            # dict.fromkeys de-duplicates while preserving first-seen order.
            merged[key] = list(dict.fromkeys(x.get(key, []) + y.get(key, [])))
    return merged

#This function returns a list of the orphan operations in the graph
def get_id_orphan_operation(graph):
    """Return the ids of the operation nodes that no edge refers to.

    The graph and all of its nested subworkflows are inspected. A node counts
    as an operation when ``is_operation`` accepts its id, and as linked when
    its id is the ``"A"`` or ``"B"`` end of any edge at any level.

    Returns:
        A list of ids, in the order the operations are first met (nodes of a
        level first, then its subworkflows, depth first).
    """
    def iter_levels(level):
        # The level itself, followed by every nested subworkflow (pre-order).
        yield level
        for name in level["subworkflows"]:
            yield from iter_levels(level["subworkflows"][name])

    levels = list(iter_levels(graph))

    # A dict is used as an ordered set of the operation ids.
    operation_ids = dict.fromkeys(
        node['id']
        for level in levels
        for node in level['nodes']
        if is_operation(node['id'])
    )

    linked = set()
    for level in levels:
        for edge in level["edges"]:
            linked.update((edge["A"], edge["B"]))

    return [candidate for candidate in operation_ids if candidate not in linked]

def graph_dico_wo_orphan_operations(graph_tmp):
    """Return a deep copy of *graph_tmp* without its orphan operation nodes.

    The orphan ids come from ``get_id_orphan_operation``. The matching nodes
    are removed from the ``"nodes"`` list of the graph and of every nested
    subworkflow. Edges are left untouched and the input is not modified.

    Parameters:
        graph_tmp: dict with the keys ``"nodes"``, ``"edges"`` and
            ``"subworkflows"``.

    Returns:
        The filtered deep copy.
    """
    graph = copy.deepcopy(graph_tmp)
    # A set gives constant-time membership tests while filtering.
    orphans = set(get_id_orphan_operation(graph))

    def remove_orphans(level):
        # Slice assignment keeps the same list object, as the original
        # in-place removal did, without a try/except around list.remove.
        level["nodes"][:] = [node for node in level["nodes"] if node["id"] not in orphans]
        for sub_level in level["subworkflows"].values():
            remove_orphans(sub_level)

    remove_orphans(graph)
    return graph

#Function that returns the type of a given node
def get_type_node(node):
    """Classify *node* as ``"Process"``, ``"Branch Operation"`` or ``"Create Operation"``.

    A node whose id is accepted by ``is_process`` is a process. Any other node
    is a branch operation when its ``fillcolor`` is ``"white"`` and a create
    operation otherwise.
    """
    if is_process(node['id']):
        return "Process"
    return "Branch Operation" if node["fillcolor"] == "white" else "Create Operation"

#Function that creates the link dico from a given graph dico      
def initia_link_dico_rec(dico):
    """Build the ``{node id: [successor ids]}`` dictionary of a graph dico.

    Every node of ``dico['nodes']`` gets an entry (an empty list when it has
    no outgoing edge). Every edge appends its ``'B'`` end to the list of its
    ``'A'`` end. The links of each nested subworkflow are then folded in with
    ``merge``.

    Parameters:
        dico: dict with the keys ``'nodes'``, ``'edges'`` and ``'subworkflows'``.

    Returns:
        The links dictionary.
    """
    links = {}
    for node in dico['nodes']:
        links.setdefault(node['id'], [])
    for edge in dico['edges']:
        A = edge['A']
        B = edge['B']
        # setdefault replaces the former bare try/except used as a default.
        links.setdefault(A, []).append(B)

    for sub in dico['subworkflows']:
        links = merge(links, initia_link_dico_rec(dico['subworkflows'][sub]))
    return links





#Returns the number of cycles in a graph (rooted with "Source" and "Sink")
#The input parameter is a links dico
#https://en.wikipedia.org/wiki/Cycle_(graph_theory)#Algorithm
def get_number_cycles(links):
    """Count the cycle-closing edges met by a depth-first walk of *links*.

    Parameters:
        links: dict mapping each node to the list of its successors; every
            successor must itself be a key of *links*.

    Returns:
        A tuple ``(count, edges)`` where *edges* lists the
        ``(mother, daughter)`` pairs whose daughter was still being explored
        when the edge was followed, and *count* is how many times that
        happened.
    """
    NEW, ACTIVE, CLOSED = range(3)
    status = {node: NEW for node in links}
    back_edges = []
    cycles_found = 0

    def explore(current):
        # Returns True only when `current` is still on the exploration stack.
        nonlocal cycles_found
        state = status[current]
        if state == CLOSED:
            return False
        if state == ACTIVE:
            cycles_found += 1
            return True
        status[current] = ACTIVE
        for successor in links[current]:
            if explore(successor):
                back_edges.append((current, successor))
        status[current] = CLOSED
        return False

    for start in links:
        explore(start)
    return cycles_found, back_edges


#https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
def topological_sort(graph):
    """Return the nodes of *graph* in a depth-first topological order.

    Start nodes are taken in the insertion order of *graph*, so the result is
    the same on every run. Nodes are appended as they finish and the list is
    reversed once at the end, instead of inserting at the head each time.

    Parameters:
        graph: dict mapping each node to the list of its successors. A
            successor that is not a key is treated as having no successor.

    Returns:
        A list in which every node comes before its successors. An edge that
        closes a cycle is ignored rather than reported.
    """
    finished_order = []
    in_progress = set()
    done = set()

    def visit(node):
        # `in_progress` means the edge just followed closes a cycle; it is
        # deliberately skipped instead of raising.
        if node in done or node in in_progress:
            return
        in_progress.add(node)
        for neighbor in graph.get(node, []):
            visit(neighbor)
        in_progress.remove(node)
        done.add(node)
        finished_order.append(node)

    for node in graph:
        visit(node)

    finished_order.reverse()
    return finished_order

#A variant of this answer https://stackoverflow.com/a/5164820
def get_number_paths_source_2_sink(graph):
    """Count the paths that lead from ``"source"`` to ``"sink"`` in *graph*.

    Nodes are processed in reverse topological order. ``"sink"`` counts one
    path, and every other node counts the sum of the paths of its successors,
    so parallel edges are counted once each. A successor that has not been
    processed yet (an edge closing a cycle, or a self-loop) contributes 0.

    Only ``"sink"`` is seeded with 1. Seeding whichever node came last in the
    order made a dead-end node count as a path end while the real sink got 0.

    Parameters:
        graph: dict mapping each node to the list of its successors.

    Returns:
        The number of paths from ``"source"`` to ``"sink"``.

    Raises:
        KeyError: if *graph* has no ``"source"`` node.
    """
    topo_sort = topological_sort(graph)

    paths_to_sink = {}
    for node in reversed(topo_sort):
        if node == "sink":
            paths_to_sink[node] = 1
        else:
            paths_to_sink[node] = sum(
                paths_to_sink.get(successor, 0) for successor in graph.get(node, [])
            )

    return paths_to_sink["source"]


#For the shortest path
#https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm#Pseudocode
def dijkstra(graph):
    """Return the length, in edges, of the shortest ``"source"`` to ``"sink"`` path.

    Every edge has weight 1, so a breadth-first search gives the same
    distances as Dijkstra's algorithm without scanning a list for the minimum
    at every step.

    ``float("inf")`` is used as the "not reached" value because the
    ``np.Infinity`` alias no longer exists in NumPy 2.0.

    Parameters:
        graph: dict mapping each node to the list of its successors.
            Successors that are not keys of *graph* are ignored.

    Returns:
        The number of edges on the shortest path, or ``float("inf")`` when
        ``"sink"`` cannot be reached from ``"source"``.

    Raises:
        KeyError: if *graph* has no ``"sink"`` node.
    """
    from collections import deque

    unreached = float("inf")
    dist = {node: unreached for node in graph}
    dist['source'] = 0

    # The search only starts when "source" really is a node of the graph.
    frontier = deque(['source'] if 'source' in graph else [])
    while frontier:
        u = frontier.popleft()
        for v in graph[u]:
            if v in graph and dist[v] == unreached:
                dist[v] = dist[u] + 1
                frontier.append(v)
    return dist["sink"]

#https://www.geeksforgeeks.org/find-longest-path-directed-acyclic-graph/
def get_longest_distance(graph):
    """Return the length, in edges, of the longest ``"source"`` to ``"sink"`` path.

    Nodes are relaxed in the order given by ``topological_sort``. Each edge
    ``u -> v`` raises ``dist[v]`` to ``dist[u] + 1`` when that is larger.

    ``float("-inf")`` is used as the "not reached" value because the
    ``np.Infinity`` alias no longer exists in NumPy 2.0.

    Parameters:
        graph: dict mapping each node to the list of its successors.

    Returns:
        The number of edges on the longest path, or ``float("-inf")`` when
        ``"sink"`` cannot be reached from ``"source"``.

    Raises:
        KeyError: if *graph* has no ``"sink"`` node or an edge points to a
            node that is not a key of *graph*.
    """
    dist = {node: float("-inf") for node in graph}
    dist["source"] = 0
    for u in topological_sort(graph):
        for v in graph[u]:
            candidate = dist[u] + 1
            if dist[v] < candidate:
                dist[v] = candidate
    return dist["sink"]

##Returns the number of paths, the longest and the shortest (not counting the source and sink)
#def get_paths(links):
#    PATHS = []
#    shortest_path = {"nb":0}
#    longest_path = {"nb":0}
#    nb_paths = {"nb":0}
#    
#    def get_paths_temp(links, mother, path_temp):
#        path = path_temp.copy()
#        path.append(mother)
#        if(mother=="Sink"):
#            nb_paths["nb"]+=1
#            if(shortest_path["nb"]==0):
#                shortest_path["nb"] = len(path)
#            if(longest_path["nb"]==0):
#                longest_path["nb"] = len(path)
#            if(longest_path["nb"]<len(path)):
#                longest_path["nb"]=len(path)
#            if(shortest_path["nb"]>len(path)):
#                shortest_path["nb"]=len(path)
#            return
#        for daughter in links[mother]:
#            if(daughter!=mother):
#                if(daughter not in path):
#                    get_paths_temp(links, daughter, path)
#
#
#    get_paths_temp(links, "Source", [])
#    number_paths_source_2_sink = nb_paths["nb"]
#    longest_path = longest_path["nb"]
#    smallest_path = shortest_path["nb"]
#
#    return number_paths_source_2_sink, longest_path, smallest_path