# outils_graph.py (10.47 KiB)
# George Marchment authored 3b1a906d
import graphviz
import copy
import numpy as np
# Prefixes that is_process() / is_operation() compare against the start of a
# node id to decide what kind of node it is.
# NOTE(review): these look like the beginning of the default repr() of the
# project's Process / Operation objects -- confirm against the code that
# builds the node ids.
process_id = "<src.process.Process"
operation_id = "<src.operation.Operation"
def is_process(node_id):
    """Return True when `node_id` begins with the module-level `process_id` prefix."""
    prefix_length = len(process_id)
    return node_id[:prefix_length] == process_id
def is_operation(node_id):
    """Return True when `node_id` begins with the module-level `operation_id` prefix."""
    prefix_length = len(operation_id)
    return node_id[:prefix_length] == operation_id
def fill_dot(dot, dico, label_node = True, label_edge = True):
    """Copy the nodes, edges and nested subworkflows of `dico` into `dot`.

    `dot` must offer `node`, `edge`, `subgraph` (usable in a `with`
    statement) and `attr`; `dico` is a dict with the keys "nodes", "edges"
    and "subworkflows". Each subworkflow is drawn in a subgraph named
    "cluster" + its name and labelled with that name.

    label_node -- when true, each node's "xlabel" is forwarded as well.
    label_edge -- when true, each edge's "label" is forwarded as well.
    """
    for node in dico["nodes"]:
        identifier, text = node["id"], node["name"]
        attributes = {"shape": node["shape"]}
        # "xlabel" is only read when node labels are requested.
        if label_node:
            attributes["xlabel"] = node["xlabel"]
        attributes["fillcolor"] = node["fillcolor"]
        dot.node(identifier, text, **attributes)

    for edge in dico["edges"]:
        tail, head = edge['A'], edge['B']
        extra = {"label": edge['label']} if label_edge else {}
        dot.edge(tail, head, **extra)

    for sub_name, sub_dico in dico["subworkflows"].items():
        with dot.subgraph(name="cluster" + sub_name) as cluster:
            fill_dot(cluster, sub_dico, label_node, label_edge)
            cluster.attr(label=sub_name)
def generate_graph_dot(filename, dico, label_node = True, label_edge = True, render_graphs = True):
    """Build a graphviz Digraph from `dico` and save it as `<filename>.dot`.

    When `render_graphs` is true the graph is additionally rendered to
    `<filename>.png`. `label_node` / `label_edge` are passed on to fill_dot.
    """
    dot_path = f'{filename}.dot'
    digraph = graphviz.Digraph(filename=filename, format='png', comment="temp")
    fill_dot(digraph, dico, label_node, label_edge)
    digraph.save(filename=dot_path)
    if not render_graphs:
        return
    digraph.render(filename=dot_path, outfile=f'{filename}.png')
def generate_graph_mermaid(filename, dico, label_node = True, label_edge = True, render_graphs = True):
    """Write a Mermaid "graph TB" description of `dico` to `<filename>.md`.

    Node and edge endpoints are identified by the text captured from the
    first "object at <word>>" fragment of their id. Nodes with an empty
    "name" are written with double parentheses, containing their "xlabel"
    when `label_node` is true and a single space otherwise. Edges carry
    their "label" only when `label_edge` is true. `render_graphs` is
    accepted but not used by this function.
    """
    import re

    def short_id(raw_id):
        # First capture of the pattern, or None when the id has no such fragment.
        found = re.search(r"object at (\w+)>", raw_id)
        return found.group(1) if found else None

    def render(level, depth):
        depth += 1
        indent = "\t" * depth
        pieces = []

        for node in level["nodes"]:
            ident = short_id(node['id'])
            if node['name'] != '':
                pieces.append(f"{indent}{ident}({node['name']});\n")
            elif label_node:
                pieces.append(f"{indent}{ident}(({node['xlabel']}));\n")
            else:
                pieces.append(f"{indent}{ident}(({' '}));\n")

        for edge in level["edges"]:
            if label_edge:
                pieces.append(f"{indent}{short_id(edge['A'])}--{edge['label']}-->{short_id(edge['B'])};\n")
            else:
                pieces.append(f"{indent}{short_id(edge['A'])}-->{short_id(edge['B'])};\n")

        for sub_name, sub_level in level["subworkflows"].items():
            pieces.append(f"{indent}subgraph {sub_name}\n{indent}\tdirection TB;\n")
            # The nested level is entered one step deeper, and render() adds
            # its own step on entry, so its content sits two tabs further in.
            pieces.append(render(sub_level, depth + 1))
            pieces.append(f"{indent}end\n")

        return "".join(pieces)

    content = "graph TB;\n" + render(dico, 0)
    with open(f"{filename}.md", "w") as text_file:
        text_file.write(content)
def generate_graph(filename, dico, label_node = True, label_edge = True, render_graphs = True, dot = True, mermaid = True):
    """Produce the requested representations of `dico`.

    Calls generate_graph_dot when `dot` is true and generate_graph_mermaid
    when `mermaid` is true, forwarding the other arguments unchanged.
    """
    shared_args = (filename, dico, label_node, label_edge, render_graphs)
    if dot:
        generate_graph_dot(*shared_args)
    if mermaid:
        generate_graph_mermaid(*shared_args)
#Function that merges two dictionaries
def merge(x, y):
    """Merge two dicts of lists into a new dict.

    Every key of `x` or `y` appears in the result. Its value is the
    concatenation of the two lists (a missing key counts as an empty list)
    with duplicates removed. List elements must be hashable.

    Keys and list elements keep their first-seen order (`x` before `y`).
    The previous `set`-based version returned the same elements but in an
    order that depended on hash randomisation.
    """
    merged = {}
    for key in list(x) + list(y):
        if key in merged:
            continue
        combined = x.get(key, []) + y.get(key, [])
        # dict.fromkeys de-duplicates while preserving insertion order.
        merged[key] = list(dict.fromkeys(combined))
    return merged
#This function returns a list of the orphan operations in the graph
def get_id_orphan_operation(graph):
    """Return the ids of operation nodes that no edge refers to.

    The graph and all nested subworkflows are traversed. A node counts as
    an operation when is_operation() accepts its id, and as linked when its
    id is the "A" or "B" end of any edge at any nesting level. The result
    lists each orphan id once, in the order the operations are first met.

    The traversal uses explicit local collections rather than a mutable
    default argument with an `== {}` "first call" test.
    """
    operation_ids = {}  # dict used as an insertion-ordered set
    linked_ids = set()

    def scan(subgraph):
        for node in subgraph['nodes']:
            if is_operation(node['id']):
                operation_ids[node['id']] = None
        for edge in subgraph["edges"]:
            linked_ids.add(edge["A"])
            linked_ids.add(edge["B"])
        for child in subgraph["subworkflows"].values():
            scan(child)

    scan(graph)
    return [op_id for op_id in operation_ids if op_id not in linked_ids]
def graph_dico_wo_orphan_operations(graph_tmp):
    """Return a deep copy of `graph_tmp` without its orphan operation nodes.

    The orphan ids come from get_id_orphan_operation(). Nodes with such an
    id are dropped from the "nodes" list of the graph and of every nested
    subworkflow. The input graph is left untouched.
    """
    graph = copy.deepcopy(graph_tmp)
    # A set gives constant-time membership tests while filtering.
    orphans = set(get_id_orphan_operation(graph))

    def remove_orphans(subgraph):
        # Filter in place; this replaces the per-node list.remove() that was
        # wrapped in a bare `except:`.
        subgraph["nodes"][:] = [
            node for node in subgraph["nodes"] if node["id"] not in orphans
        ]
        for child in subgraph["subworkflows"].values():
            remove_orphans(child)

    remove_orphans(graph)
    return graph
#Function that returns the type of a given node
def get_type_node(node):
    """Classify `node` as "Process", "Branch Operation" or "Create Operation".

    A node whose id is accepted by is_process() is a "Process". Any other
    node is a "Branch Operation" when its "fillcolor" is "white" and a
    "Create Operation" otherwise.
    """
    if is_process(node['id']):
        return "Process"
    return "Branch Operation" if node["fillcolor"] == "white" else "Create Operation"
#Function that creates the link dico from a given graph dico
def initia_link_dico_rec(dico):
    """Build the links dict (node id -> list of successor ids) of `dico`.

    Every node id gets an entry, empty if it has no outgoing edge. Each
    edge appends its "B" end to the list of its "A" end, creating the entry
    if needed. The links of every nested subworkflow are computed
    recursively and combined with merge().
    """
    links = {}
    for node in dico['nodes']:
        links.setdefault(node['id'], [])
    for edge in dico['edges']:
        start, end = edge['A'], edge['B']
        # setdefault replaces the former try / bare-except existence probe.
        links.setdefault(start, []).append(end)
    for sub_dico in dico['subworkflows'].values():
        links = merge(links, initia_link_dico_rec(sub_dico))
    return links
#Returns the number of cycles in a graph (rooted with "Source" and "Sink")
#The input parameter is a links dico
#https://en.wikipedia.org/wiki/Cycle_(graph_theory)#Algorithm
def get_number_cycles(links):
    """Count the cycles found by a depth-first search over `links`.

    `links` maps each node to the list of its successors. Every successor
    must itself be a key of `links`. Returns a tuple
    `(number_of_cycles, edges)` where `edges` holds the `(mother, daughter)`
    pairs whose traversal reached a node that was still being explored.
    """
    NEW, OPEN, DONE = 0, 1, 2
    state = {node: NEW for node in links}
    closing_edges = []
    cycle_count = 0

    def explore(current):
        # Returns True only when `current` is still being explored, i.e.
        # the edge that led here closes a cycle.
        nonlocal cycle_count
        status = state[current]
        if status == DONE:
            return False
        if status == OPEN:
            cycle_count += 1
            return True
        state[current] = OPEN
        for child in links[current]:
            if explore(child):
                closing_edges.append((current, child))
        state[current] = DONE
        return False

    for start in links:
        explore(start)
    return cycle_count, closing_edges
#https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
def topological_sort(graph):
    """Return the nodes of `graph` in depth-first topological order.

    `graph` maps each node to an iterable of its successors. Successors
    that are not keys of `graph` are treated as having no successors and
    still appear in the result. An edge that leads back to a node still
    being visited is ignored rather than reported, so a cyclic graph yields
    an ordering instead of an error.

    Start nodes are taken in the iteration order of `graph`, so the result
    is deterministic. It was previously chosen with `set.pop()`.
    """
    post_order = []
    in_progress = set()
    done = set()

    def visit(node):
        # Finished nodes need no work; nodes still in progress indicate a
        # back edge, which is deliberately skipped.
        if node in done or node in in_progress:
            return
        in_progress.add(node)
        for neighbor in graph.get(node, []):
            visit(neighbor)
        in_progress.remove(node)
        done.add(node)
        post_order.append(node)

    for node in graph:
        visit(node)

    # Reversed post-order equals inserting each finished node at the head.
    post_order.reverse()
    return post_order
#A variant of this answer https://stackoverflow.com/a/5164820
def get_number_paths_source_2_sink(graph):
    """Return the path count computed for the "source" node of `graph`.

    `graph` maps each node to the list of its successors, with repeated
    successors counted once per occurrence. Nodes are processed from the end
    of topological_sort(graph) towards its start. The last node of that
    ordering counts as one path. Every other node gets the sum of the counts
    of its successors that come later in the ordering.

    NOTE(review): the last node of the ordering is presumably "sink" --
    confirm that callers always pass a graph in which this holds.
    """
    topo_sort = topological_sort(graph)
    position = {node: index for index, node in enumerate(topo_sort)}
    paths_to_sink = {node: 1 for node in topo_sort}

    for i in range(len(topo_sort) - 2, -1, -1):
        node = topo_sort[i]
        # Only successors placed later in the ordering contribute, so self
        # loops and back edges are ignored.
        paths_to_sink[node] = sum(
            paths_to_sink[successor]
            for successor in graph[node]
            if position.get(successor, -1) > i
        )
    return paths_to_sink["source"]
#For the shortest path
#https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm#Pseudocode
def dijkstra(graph):
    """Return the length, in edges, of the shortest "source" -> "sink" path.

    `graph` maps each node to the list of its successors. Every edge has
    weight 1. The result is `np.inf` when "sink" is a key of `graph` but
    cannot be reached from "source".

    Raises KeyError when "sink" is not a key of `graph`.
    """
    # np.inf instead of np.Infinity: the latter alias was removed in NumPy 2.0.
    dist = {node: np.inf for node in graph}
    dist['source'] = 0

    # Dict used as an insertion-ordered set: constant-time membership and
    # removal, with the same tie-breaking order as a list of the keys.
    unvisited = dict.fromkeys(graph)
    while unvisited:
        u = min(unvisited, key=dist.__getitem__)
        del unvisited[u]
        for v in graph[u]:
            if v in unvisited:
                alt = dist[u] + 1
                if alt < dist[v]:
                    dist[v] = alt
    return dist["sink"]
#https://www.geeksforgeeks.org/find-longest-path-directed-acyclic-graph/
def get_longest_distance(graph):
    """Return the length, in edges, of the longest "source" -> "sink" path.

    `graph` maps each node to the list of its successors. Nodes are relaxed
    in the order given by topological_sort(graph). A node that is not
    reached from "source" keeps the distance `-np.inf`.

    Raises KeyError when "sink" is not a key of `graph`.
    """
    # np.inf instead of np.Infinity: the latter alias was removed in NumPy 2.0.
    dist = {node: -np.inf for node in graph}
    dist["source"] = 0
    for u in topological_sort(graph):
        for v in graph[u]:
            candidate = dist[u] + 1
            if dist[v] < candidate:
                dist[v] = candidate
    return dist["sink"]
##Returns the number of paths, the longest and the shortest (not counting the source and sink)
#def get_paths(links):
# PATHS = []
# shortest_path = {"nb":0}
# longest_path = {"nb":0}
# nb_paths = {"nb":0}
#
# def get_paths_temp(links, mother, path_temp):
# path = path_temp.copy()
# path.append(mother)
# if(mother=="Sink"):
# nb_paths["nb"]+=1
# if(shortest_path["nb"]==0):
# shortest_path["nb"] = len(path)
# if(longest_path["nb"]==0):
# longest_path["nb"] = len(path)
# if(longest_path["nb"]<len(path)):
# longest_path["nb"]=len(path)
# if(shortest_path["nb"]>len(path)):
# shortest_path["nb"]=len(path)
# return
# for daughter in links[mother]:
# if(daughter!=mother):
# if(daughter not in path):
# get_paths_temp(links, daughter, path)
#
#
# get_paths_temp(links, "Source", [])
# number_paths_source_2_sink = nb_paths["nb"]
# longest_path = longest_path["nb"]
# smallest_path = shortest_path["nb"]
#
# return number_paths_source_2_sink, longest_path, smallest_path