Newer
Older
Fize Jacques
committed
# coding = utf-8
from collections import Iterable
import numpy as np
import networkx as nx
import pandas as pd
import random
def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
"""
Return a degree distribution that fit the power law and specified number of edges and vertices.
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
np.ndarray
degree sequence
"""
Fize Jacques
committed
nb_stubs = nb_edges * 2
# Draw a first time a powerlaw degree sequence
degs = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
degs = degs[degs >= min_deg]
# Compute de degree sum
sum_deg = degs.sum()
for _ in range(tries):
# If the sum of the degree sequence is equal to the number of stubs, then it's good
if sum_deg == nb_stubs:
return degs
# Draw a a new powerlaw degree sequence
new_draw = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
new_draw = new_draw[new_draw >= min_deg]
new_sum_deg = new_draw.sum()
# If the new degree sequence is closer to the objective than the previously draw sequence
if abs(nb_stubs - new_sum_deg) < abs(nb_stubs - sum_deg):
degs = new_draw
sum_deg = new_sum_deg
# Once the final draw is executed and the sequence degree sum is not equal to number of stubs expected
if not sum_deg == nb_stubs:
# We randomly pick sequence degrees and increment (or decrement) their values
diff = abs(sum_deg - nb_stubs)
signe = -1 if (nb_stubs - sum_deg) < 0 else 1
indexes = np.random.choice(np.arange(len(degs)), int(diff))
for ind in indexes:
degs[ind] = degs[ind] + signe
Fize Jacques
committed
def get_countries_coords():
"""
Return the coordinates of each country in the world.
Returns
-------
np.ndarray
coordinates
"""
try:
import geopandas as gpd
except:
raise ImportError("Geopandas is not installed !")
gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
Fize Jacques
committed
return np.asarray(gdf.centroid.apply(lambda x: [x.x, x.y]).values.tolist())
def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
"""
Generate a graph with a definied number of vertices, edges, and a degree distribution that fit the power law.
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
nx.Graph
generated graph
"""
seq = powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg)
return nx.configuration_model(seq.astype(int))
Fize Jacques
committed
Fize Jacques
committed
Fize Jacques
committed
def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b), self_link=False):
"""
Generate a spatial graph with a specific number of vertices and edges
Parameters
----------
nb_nodes : int
nb_edges : int
coords : array of shape (n,2) or str
if str, possible choice are "random" or "country"
dist_func : callable
self_link : bool
Returns
-------
nx.Graph
generated graph
"""
Fize Jacques
committed
if coords and isinstance(coords, Iterable) and not isinstance(coords, str):
if len(coords) != nb_nodes:
raise ValueError("number of nodes must match the size of the coords dict")
elif coords == "random":
coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
coords[:, 0] = (coords[:, 0] * 360) - 180
coords[:, 1] = (coords[:, 1] * 180) - 90
else:
coords = get_countries_coords()
if nb_nodes > len(coords):
raise ValueError(
"Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
coords_index = np.random.choice(np.arange(len(coords)), nb_nodes)
coords = coords[coords_index]
data = []
for i in range(nb_nodes):
for j in range(nb_nodes):
if i == j and not self_link:
continue
data.append([i, j, dist_func(coords[i], coords[j])])
df = pd.DataFrame(data, columns="src tar weight".split())
Fize Jacques
committed
df["hash"] = df.apply(lambda x: "_".join(sorted([str(x.src), str(x.tar)])), axis=1)
Fize Jacques
committed
df = df.sample(nb_edges, weights="weight")
G = nx.from_pandas_edgelist(df, source="src", target="tar", edge_attr="weight")
for n in list(G.nodes()): G.nodes[n]["pos"] = coords[n]
return G
Fize Jacques
committed
def ER_graph(nb_nodes, nb_edges):
"""
Generate a random graph with a specific nb of nodes and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
Returns
-------
nx.Graph
generated graph
"""
Fize Jacques
committed
return nx.dense_gnm_random_graph(nb_nodes, nb_edges)
Fize Jacques
committed
Fize Jacques
committed
def stochastic_block_model_graph(nb_nodes, nb_edges, nb_com, percentage_edge_betw, verbose=False):
"""
Generate a stochastic block model graph with defined number of vertices and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
nb_com : int
percentage_edge_betw : float
verbose : bool
Returns
-------
nx.Graph
generated graph
"""
Fize Jacques
committed
if nb_nodes % nb_com != 0:
raise ValueError("Modulo between the number of nodes and community must be equal to 0")
edge_max = (1 / nb_com) * ((nb_nodes * (nb_nodes - 1)) / 2)
if nb_edges > edge_max:
raise ValueError("nb_edges must be inferior to {0}".format(edge_max))
Fize Jacques
committed
percentage_edge_within = 1 - percentage_edge_betw
G = nx.planted_partition_graph(nb_com, int(np.round(nb_nodes / nb_com)), 1, 1)
if verbose:
print(G.size())
block_assign = nx.get_node_attributes(G, "block")
Fize Jacques
committed
inter_edges, intra_edges = [], []
Fize Jacques
committed
register = set([])
for n1 in list(G.nodes()):
for n2 in list(G.nodes()):
hash_ = "_".join(sorted([str(n1), str(n2)]))
if (n1 == n2) or (hash_ in register):
continue
b1, b2 = block_assign[n1], block_assign[n2]
Fize Jacques
committed
if b1 != b2:
Fize Jacques
committed
inter_edges.append([n1, n2])
else:
intra_edges.append([n1, n2])
register.add(hash_)
inter_edges = np.asarray(inter_edges)
intra_edges = np.asarray(intra_edges)
inter_N, intra_N = len(inter_edges), len(intra_edges)
if verbose:
print(inter_N, intra_N)
print(int(np.ceil(nb_edges * percentage_edge_betw)), int(np.ceil(nb_edges * percentage_edge_within)))
final_edges = []
Fize Jacques
committed
index_inter = np.random.choice(np.arange(inter_N), int(np.round(nb_edges * percentage_edge_betw)), replace=False)
index_intra = np.random.choice(np.arange(intra_N), int(np.round(nb_edges * percentage_edge_within)), replace=False)
Fize Jacques
committed
final_edges.extend(inter_edges[index_inter])
final_edges.extend(intra_edges[index_intra])
Fize Jacques
committed
Fize Jacques
committed
if verbose:
print(len(final_edges))
G2 = nx.from_edgelist(final_edges)
Fize Jacques
committed
if len(G2) != nb_nodes:
equilibrate(G2, nb_nodes, percentage_edge_betw, percentage_edge_within, inter_edges, intra_edges, block_assign)
Fize Jacques
committed
for n in list(G2.nodes()):
G2.nodes[n]["block"] = block_assign[n]
return G2
Fize Jacques
committed
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def equilibrate(G, nb_nodes, percentage_edge_betw, percentage_edge_within, inter_edges, intra_edges, block_assign):
"""
Sometimes the generated graph from the stochastic block model have some missing nodes due to the sampling method.
This function fix this issue.
Parameters
----------
G
nb_nodes
percentage_edge_betw
percentage_edge_within
inter_edges
intra_edges
block_assign
Returns
-------
"""
diff = nb_nodes - len(G)
nodes_missing = list(set(list(range(nb_nodes))) - set(list(G.nodes())))
nb_edge_inter = int(np.round(len(nodes_missing) * percentage_edge_betw))
nodes_inter = np.random.choice(nodes_missing, nb_edge_inter, replace=False)
nodes_intra = list(set(nodes_missing) - set(nodes_inter))
def draw_(array, register, hash_func=lambda x, y: "_".join(sorted([str(x), str(y)]))):
tries = 0
while tries <1000:
index_array = np.random.choice(np.arange(len(array)), 1)
res = array[index_array]
res = res[0]
hash_ = hash_func(res[0], res[1])
if not hash_ in register:
register.add(hash_)
return index_array
tries +=1
raise Exception("Error ! (TODO)")
# Draw new edges
new_edges = []
register = set([])
for node in nodes_inter:
mask_inter = np.isin(inter_edges, node).sum(axis=1).astype(bool)
index_inter = draw_(inter_edges[mask_inter], register)
new_edges.extend(inter_edges[mask_inter][index_inter])
for node in nodes_intra:
mask_intra = np.isin(intra_edges, node).sum(axis=1).astype(bool)
index_intra = draw_(intra_edges[mask_intra], register)
new_edges.extend(intra_edges[mask_intra][index_intra])
# Draw existing edge to delete
edge_to_delete = []
inter_edges_c, intra_edges_c = [], []
for ed in list(G.edges()):
if G.degree(ed[0]) > 1 and G.degree(ed[1]) > 1: # We don't want the nodes edge to be removed with the
# disappearance of the edge
if block_assign[ed[0]] != block_assign[ed[1]]:
inter_edges_c.append([ed[0], ed[1]])
else:
intra_edges_c.append([ed[0], ed[1]])
index_inter = np.random.choice(np.arange(len(inter_edges_c)), int(np.round(diff * percentage_edge_betw)),
replace=False)
index_intra = np.random.choice(np.arange(len(intra_edges_c)), int(np.round(diff * percentage_edge_within)),
replace=False)
edge_to_delete.extend(np.asarray(inter_edges_c)[index_inter])
edge_to_delete.extend(np.asarray(intra_edges_c)[index_intra])
for ed in edge_to_delete:
G.remove_edge(*ed)
for ed in new_edges:
G.add_edge(*ed)
return G