Newer
Older
Fize Jacques
committed
# coding = utf-8
Fize Jacques
committed
from collections import Iterable
import numpy as np
import networkx as nx
import pandas as pd
from networkx.generators.degree_seq import _to_stublist
Fize Jacques
committed
import random
float_epsilon = np.finfo(float).eps
Fize Jacques
committed
def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
"""
Return a degree distribution that fit the power law and specified number of edges and vertices.
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
np.ndarray
degree sequence
"""
Fize Jacques
committed
nb_stubs = nb_edges * 2
# Draw a first time a powerlaw degree sequence
degs = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
degs = degs[degs >= min_deg]
# Compute de degree sum
sum_deg = degs.sum()
for _ in range(tries):
# If the sum of the degree sequence is equal to the number of stubs, then it's good
if sum_deg == nb_stubs:
return degs
# Draw a a new powerlaw degree sequence
new_draw = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
new_draw = new_draw[new_draw >= min_deg]
new_sum_deg = new_draw.sum()
# If the new degree sequence is closer to the objective than the previously draw sequence
if abs(nb_stubs - new_sum_deg) < abs(nb_stubs - sum_deg):
degs = new_draw
sum_deg = new_sum_deg
# Once the final draw is executed and the sequence degree sum is not equal to number of stubs expected
if not sum_deg == nb_stubs:
# We randomly pick sequence degrees and increment (or decrement) their values
diff = abs(sum_deg - nb_stubs)
signe = -1 if (nb_stubs - sum_deg) < 0 else 1
indexes = np.random.choice(np.arange(len(degs)), int(diff))
for ind in indexes:
degs[ind] = degs[ind] + signe
Fize Jacques
committed
def get_countries_coords():
"""
Return the coordinates of each country in the world.
Returns
-------
np.ndarray
coordinates
"""
try:
import geopandas as gpd
except:
raise ImportError("Geopandas is not installed !")
gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
Fize Jacques
committed
return np.asarray(gdf.centroid.apply(lambda x: [x.x, x.y]).values.tolist())
def _conf_model(degree_seq):
stubs_list = _to_stublist(degree_seq)
random.shuffle(stubs_list)
register = set()
edges = []
hash_func = lambda x, y: "_".join(sorted([str(x), str(y)]))
tries = 0
while len(stubs_list) > 0 and tries < 100:
to_del = set([])
for i in range(0, len(stubs_list) - 2, 2):
u, v = stubs_list[i], stubs_list[i + 1]
hash_ = hash_func(u, v)
if hash_ in register:
continue
else:
register.add(hash_)
edges.append([u, v])
to_del.add(i)
to_del.add(i + 1)
stubs_list = [stubs_list[i] for i in range(len(stubs_list)) if not i in to_del]
random.shuffle(stubs_list)
tries += 1
G = nx.from_edgelist(edges)
return G
def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=1000, min_deg=0):
Generate a graph with a defined number of vertices, edges, and a degree distribution that fit the power law.
Using the Molloy-Reed algorithm to
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
nx.Graph
generated graph
"""
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
G = _conf_model(powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg).astype(int))
tries_ = 0
while len(G) != nb_nodes and tries_ <tries:
G = _conf_model(powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg).astype(int))
tries_ += 1
if len(G) != nb_nodes:
print(nb_nodes,nb_edges,exponent)
raise Exception("Cant compute configuration model based on parameters")
if G.size() != nb_edges:
diff = abs(G.size() - nb_edges)
signe = 1 if G.size() - nb_edges < 0 else -1
if signe:
for n in list(G.nodes()):
if G.size() == nb_edges:
break
for n2 in list(G.nodes()):
if not G.has_edge(n, n2): G.add_edge(n, n2)
if G.size() == nb_edges:
break
else:
edges_ = list(G.edges())
random.shuffle(edges_)
i = diff
for ed in edges_:
u, v = ed[0], ed[1]
if G.degree(u) > 1 and G.degree(v) > 1:
G.remove_edge(u, v)
i -= 1
return G
Fize Jacques
committed
Fize Jacques
committed
def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b) ** 2,
self_link=False, weighted=False):
"""
Generate a spatial graph with a specific number of vertices and edges
Parameters
----------
nb_nodes : int
nb_edges : int
coords : array of shape (n,2) or str
if str, possible choice are "random" or "country"
dist_func : callable
self_link : bool
Returns
-------
nx.Graph
generated graph
"""
if nb_nodes > nb_edges:
raise ValueError(
"To obtain a specific nb of nodes, the number of edges must be equal or superior to the number of nodes !")
Fize Jacques
committed
if coords and isinstance(coords, Iterable) and not isinstance(coords, str):
if len(coords) != nb_nodes:
raise ValueError("number of nodes must match the size of the coords dict")
elif coords == "random":
coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
coords[:, 0] = (coords[:, 0] * 360) - 180
coords[:, 1] = (coords[:, 1] * 180) - 90
else:
coords = get_countries_coords()
if nb_nodes > len(coords):
raise ValueError(
"Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
coords_index = np.random.choice(np.arange(len(coords)), nb_nodes)
coords = coords[coords_index]
data = []
Fize Jacques
committed
for i in range(nb_nodes):
for j in range(nb_nodes):
if i == j and not self_link:
continue
data.append([i, j, 1 / (float_epsilon+(dist_func(coords[i], coords[j])))])
df = pd.DataFrame(data, columns="src tar weight".split()).astype({"src": int, "tar": int})
df["hash"] = df.apply(lambda x: "_".join(sorted([str(int(x.src)), str(int(x.tar))])), axis=1)
df = df.drop_duplicates(subset="hash")
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
df["weight"] = df.weight / df.weight.sum()
iter_ = 0
f = False
best_G = None
new_df = None
while iter_ < 50 and f == False:
new_df = df.sample(n=nb_edges, weights="weight")
if weighted:
G = nx.from_pandas_edgelist(new_df, source="src", target="tar", edge_attr="weight")
else:
G = nx.from_pandas_edgelist(new_df, source="src", target="tar")
f = (len(G) == nb_nodes and G.size() == nb_edges)
if not best_G == None:
if abs(len(best_G) - nb_nodes) > abs(len(G) - nb_nodes):
best_G = G.copy()
else:
best_G = G.copy()
iter_ += 1
G = best_G.copy()
if not len(G) == nb_nodes:
diff = abs(len(G) - nb_nodes)
df_deg = pd.DataFrame(nx.degree(G), columns="node degree".split())
df_deg = df_deg[df_deg.degree > 2].sort_values(by="degree", ascending=False)
i = 1
while 1:
new_df = df_deg.head(i)
if ((new_df.degree) - 1).sum() > diff:
break
i += 1
df_deg = df_deg.head(i)
edges_ = []
for node in df_deg.node.values:
node_edges = list(G.edges(node))
random.shuffle(node_edges)
edges_.extend([[ed[0], ed[1]] for ed in node_edges[1:]])
idx = np.random.choice(np.arange(len(edges_)), size=diff, replace=False)
edges_ = np.array(edges_)[idx]
G.remove_edges_from(edges_)
missing_nodes = list(set(range(nb_nodes)) - set(list(G.nodes())))
for node in missing_nodes:
new_df = df[(df.src == node) | (df.tar == node)].sample(n=1, weights="weight")
edges = new_df["src tar".split()].values
G.add_edges_from(edges)
for n in list(G.nodes()):
G.nodes[n]["pos"] = coords[n]
Fize Jacques
committed
return G
Fize Jacques
committed
def ER_graph(nb_nodes, nb_edges):
"""
Generate a random graph with a specific nb of nodes and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
Returns
-------
nx.Graph
generated graph
"""
Fize Jacques
committed
return nx.dense_gnm_random_graph(nb_nodes, nb_edges)
Fize Jacques
committed
Fize Jacques
committed
def stochastic_block_model_graph(nb_nodes, nb_edges, nb_com, percentage_edge_betw, verbose=False):
"""
Generate a stochastic block model graph with defined number of vertices and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
nb_com : int
percentage_edge_betw : float
verbose : bool
Returns
-------
nx.Graph
generated graph
"""
Fize Jacques
committed
if nb_nodes % nb_com != 0:
raise ValueError("Modulo between the number of nodes and community must be equal to 0")
edge_max = (1 / nb_com) * ((nb_nodes * (nb_nodes - 1)) / 2)
if nb_edges > edge_max:
raise ValueError("nb_edges must be inferior to {0}".format(edge_max))
Fize Jacques
committed
def nb_of_pair(N):
return (N*(N-1))/2
Fize Jacques
committed
G = nx.planted_partition_graph(nb_com, nb_nodes // nb_com, 1, 1)
block_assign = nx.get_node_attributes(G, "block")
b_assign_array = np.asarray(list(nx.get_node_attributes(G,"block").values()))
Fize Jacques
committed
if verbose:
print(G.size())
u_in = sum([nb_of_pair((b_assign_array==b).sum()) for b in range(nb_com)])
u_out = nb_of_pair(len(G)) - u_in
l_in = nb_edges - l_out
if verbose:
print("u_in",u_in)
print("l_out",l_out)
print("l_in", l_in)
inter_edges, intra_edges = get_inter_intra_edges(G,G.is_directed())
Fize Jacques
committed
inter_edges = np.asarray(inter_edges)
intra_edges = np.asarray(intra_edges)
inter_N, intra_N = len(inter_edges), len(intra_edges)
probs_inter = np.ones(inter_N) * p_out
probs_intra = np.ones(intra_N) * p_in
all_edges = np.concatenate((inter_edges, intra_edges))
all_probs = np.concatenate((probs_inter, probs_intra))
del probs_inter
del probs_intra
all_probs /= all_probs.sum()
Fize Jacques
committed
if verbose:
print(inter_N, intra_N)
final_edges = []
index_selected_pairs = np.random.choice(np.arange(len(all_edges)), nb_edges, p=all_probs, replace=False)
final_edges.extend(all_edges[index_selected_pairs])
Fize Jacques
committed
Fize Jacques
committed
if verbose:
print(len(final_edges))
G2 = nx.from_edgelist(final_edges)
Fize Jacques
committed
if len(G2) != nb_nodes:
equilibrate(G2, nb_nodes, percentage_edge_betw, 1-percentage_edge_betw, inter_edges, intra_edges, block_assign)
Fize Jacques
committed
Fize Jacques
committed
for n in list(G2.nodes()):
G2.nodes[n]["block"] = block_assign[n]
return G2
Fize Jacques
committed
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def equilibrate(G, nb_nodes, percentage_edge_betw, percentage_edge_within, inter_edges, intra_edges, block_assign):
"""
Sometimes the generated graph from the stochastic block model have some missing nodes due to the sampling method.
This function fix this issue.
Parameters
----------
G
nb_nodes
percentage_edge_betw
percentage_edge_within
inter_edges
intra_edges
block_assign
Returns
-------
"""
diff = nb_nodes - len(G)
nodes_missing = list(set(list(range(nb_nodes))) - set(list(G.nodes())))
nb_edge_inter = int(np.round(len(nodes_missing) * percentage_edge_betw))
nodes_inter = np.random.choice(nodes_missing, nb_edge_inter, replace=False)
nodes_intra = list(set(nodes_missing) - set(nodes_inter))
def draw_(array, register, hash_func=lambda x, y: "_".join(sorted([str(x), str(y)]))):
tries = 0
while tries < 1000:
Fize Jacques
committed
index_array = np.random.choice(np.arange(len(array)), 1)
res = array[index_array]
res = res[0]
hash_ = hash_func(res[0], res[1])
if not hash_ in register:
register.add(hash_)
return index_array
tries += 1
Fize Jacques
committed
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
raise Exception("Error ! (TODO)")
# Draw new edges
new_edges = []
register = set([])
for node in nodes_inter:
mask_inter = np.isin(inter_edges, node).sum(axis=1).astype(bool)
index_inter = draw_(inter_edges[mask_inter], register)
new_edges.extend(inter_edges[mask_inter][index_inter])
for node in nodes_intra:
mask_intra = np.isin(intra_edges, node).sum(axis=1).astype(bool)
index_intra = draw_(intra_edges[mask_intra], register)
new_edges.extend(intra_edges[mask_intra][index_intra])
# Draw existing edge to delete
edge_to_delete = []
inter_edges_c, intra_edges_c = [], []
for ed in list(G.edges()):
if G.degree(ed[0]) > 1 and G.degree(ed[1]) > 1: # We don't want the nodes edge to be removed with the
# disappearance of the edge
if block_assign[ed[0]] != block_assign[ed[1]]:
inter_edges_c.append([ed[0], ed[1]])
else:
intra_edges_c.append([ed[0], ed[1]])
index_inter = np.random.choice(np.arange(len(inter_edges_c)), int(np.round(diff * percentage_edge_betw)),
replace=False)
index_intra = np.random.choice(np.arange(len(intra_edges_c)), int(np.round(diff * percentage_edge_within)),
replace=False)
edge_to_delete.extend(np.asarray(inter_edges_c)[index_inter])
edge_to_delete.extend(np.asarray(intra_edges_c)[index_intra])
for ed in edge_to_delete:
G.remove_edge(*ed)
for ed in new_edges:
G.add_edge(*ed)
return G
def add_partitions_G(G, nb_com=5):
blocks = np.random.choice(np.arange(nb_com), len(G), p=[1 / nb_com] * nb_com)
for ix, node in enumerate(list(G.nodes())):
try:
G.nodes[node]["block"] = blocks[ix]
except KeyError:
G.nodes[node]["block"] = 0
return G
def get_inter_intra_edges(G, directed=False):
block_assign = nx.get_node_attributes(G, "block")
assert (len(block_assign) ==len(G))
inter_edges, intra_edges = [], []
register = set([])
for n1 in list(G.nodes()):
for n2 in list(G.nodes()):
if directed:
hash_ = "_".join([str(n1), str(n2)])
else:
hash_ = "_".join(sorted([str(n1), str(n2)]))
if (n1 == n2) or (hash_ in register):
continue
b1, b2 = block_assign[n1], block_assign[n2]
if b1 != b2:
inter_edges.append([n1, n2])
else:
intra_edges.append([n1, n2])
register.add(hash_)
return inter_edges,intra_edges
def mixed_model_spat_sbm(nb_nodes, nb_edges, nb_com, alpha, percentage_edge_betw=0.01,
dist_func=lambda p1, p2: np.linalg.norm(p1 - p2) ** 2):
G = spatial_graph(nb_nodes, nb_edges, coords="random", dist_func=dist_func)
G = add_partitions_G(G, nb_com)
nb_edges = G.size()
def nb_of_pair(N):
return (N * (N - 1)) / 2
block_assign = nx.get_node_attributes(G, "block")
assert (len(block_assign) == len(G))
b_assign_array = np.asarray(list(block_assign.values()))
u_in = sum([nb_of_pair((b_assign_array == b).sum()) for b in range(np.max(b_assign_array))])
u_out = nb_of_pair(len(G)) - u_in
l_out = nb_edges * percentage_edge_betw
p_out = l_out / u_out
l_in = nb_edges - l_out
p_in = l_in / u_in
inter_edges, intra_edges = get_inter_intra_edges(G,G.is_directed())
inter_N, intra_N = len(inter_edges), len(intra_edges)
probs_sbm_inter = np.ones(inter_N) * p_out
probs_sbm_intra = np.ones(intra_N) * p_in
all_edges = np.concatenate((inter_edges, intra_edges))
all_probs_sbm = np.concatenate((probs_sbm_inter, probs_sbm_intra))
all_probs_sbm /= all_probs_sbm.sum()
all_probs_spa = np.asarray([1 / (float_epsilon + dist_func(pos[edge[0]], pos[edge[1]])) for edge in all_edges])
#all_probs = alpha * (all_probs_sbm) + (1 - alpha) * all_probs_spa
nb_edges_sbm,nb_edges_spa = round(alpha*nb_edges),round((1-alpha)*nb_edges)
index_selected_pairs_sbm = np.random.choice(np.arange(len(all_edges)), nb_edges_sbm, p=all_probs_sbm, replace=False)
final_edges.extend(all_edges[index_selected_pairs_sbm])
all_probs_spa[index_selected_pairs_sbm] = all_probs_spa.min()
all_probs_spa/= all_probs_spa.sum()
index_selected_pairs_spa = np.random.choice(np.arange(len(all_edges)), nb_edges_spa, p=all_probs_spa, replace=False)
final_edges.extend(all_edges[index_selected_pairs_spa])
G2 = nx.from_edgelist(final_edges)
for n in list(G2.nodes()):
G2.nodes[n]["block"] = block_assign[n]
G2.nodes[n]["pos"] = G.nodes[n]["pos"]
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def get_sbm_probs(G, percentage_edge_betw, verbose=False):
hash_func = lambda x: "_".join(sorted([str(x[0]), str(x[1])]))
def nb_of_pair(N):
return (N*(N-1))/2
block_assign = nx.get_node_attributes(G, "block")
nb_com = len(set(block_assign.values()))
nb_nodes=len(G)
nb_edges = G.size()
b_assign_array = np.asarray(list(nx.get_node_attributes(G,"block").values()))
u_in = sum([nb_of_pair((b_assign_array==b).sum()) for b in range(nb_com)])
u_out = nb_of_pair(len(G)) - u_in
l_out = nb_edges*percentage_edge_betw
p_out = l_out/u_out
l_in = nb_edges - l_out
p_in = l_in / u_in
inter_edges, intra_edges = get_inter_intra_edges(G,G.is_directed())
inter_edges = np.asarray(inter_edges)
intra_edges = np.asarray(intra_edges)
inter_N, intra_N = len(inter_edges), len(intra_edges)
probs_inter = np.ones(inter_N) * p_out
probs_intra = np.ones(intra_N) * p_in
all_edges = np.concatenate((inter_edges, intra_edges))
all_probs = np.concatenate((probs_inter, probs_intra))
del probs_inter
del probs_intra
return all_edges,all_probs
def get_spat_probs(G,dist = lambda a,b : np.linalg.norm(a-b)**2):
hash_func = lambda x: "_".join(sorted([str(x[0]), str(x[1])]))
pos = nx.get_node_attributes(G, "pos")
spat_model = lambda u, v: 1 / (float_epsilon + dist(pos[u], pos[v]))
register = set([])
edges, probs = [], []
for n1 in list(G.nodes()):
for n2 in list(G.nodes()):
if n1 != n2 and hash_func((n1, n2)) not in register:
edges.append([n1, n2])
probs.append(spat_model(n1, n2))
register.add(hash_func((n1, n2)))
return edges, probs