-
Fize Jacques authored64dc3924
random.py 13.52 KiB
# coding = utf-8
from collections import Iterable
import numpy as np
import networkx as nx
import pandas as pd
from networkx.generators.degree_seq import _to_stublist
import random
def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
"""
Return a degree distribution that fit the power law and specified number of edges and vertices.
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
np.ndarray
degree sequence
"""
nb_stubs = nb_edges * 2
# Draw a first time a powerlaw degree sequence
degs = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
degs = degs[degs >= min_deg]
# Compute de degree sum
sum_deg = degs.sum()
for _ in range(tries):
# If the sum of the degree sequence is equal to the number of stubs, then it's good
if sum_deg == nb_stubs:
return degs
# Draw a a new powerlaw degree sequence
new_draw = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
new_draw = new_draw[new_draw >= min_deg]
new_sum_deg = new_draw.sum()
# If the new degree sequence is closer to the objective than the previously draw sequence
if abs(nb_stubs - new_sum_deg) < abs(nb_stubs - sum_deg):
degs = new_draw
sum_deg = new_sum_deg
# Once the final draw is executed and the sequence degree sum is not equal to number of stubs expected
if not sum_deg == nb_stubs:
# We randomly pick sequence degrees and increment (or decrement) their values
diff = abs(sum_deg - nb_stubs)
signe = -1 if (nb_stubs - sum_deg) < 0 else 1
indexes = np.random.choice(np.arange(len(degs)), int(diff))
for ind in indexes:
degs[ind] = degs[ind] + signe
return degs.astype(int)
def get_countries_coords():
"""
Return the coordinates of each country in the world.
Returns
-------
np.ndarray
coordinates
"""
try:
import geopandas as gpd
except:
raise ImportError("Geopandas is not installed !")
gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
return np.asarray(gdf.centroid.apply(lambda x: [x.x, x.y]).values.tolist())
def _conf_model(degree_seq):
stubs_list = _to_stublist(degree_seq)
random.shuffle(stubs_list)
register = set()
edges = []
hash_func = lambda x, y: "_".join(sorted([str(x), str(y)]))
tries = 0
while len(stubs_list) > 0 and tries < 100:
to_del = set([])
for i in range(0, len(stubs_list) - 2, 2):
u, v = stubs_list[i], stubs_list[i + 1]
hash_ = hash_func(u, v)
if hash_ in register:
continue
else:
register.add(hash_)
edges.append([u, v])
to_del.add(i)
to_del.add(i + 1)
stubs_list = [stubs_list[i] for i in range(len(stubs_list)) if not i in to_del]
random.shuffle(stubs_list)
tries += 1
G = nx.from_edgelist(edges)
return G
def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=1000, min_deg=0):
"""
Generate a graph with a defined number of vertices, edges, and a degree distribution that fit the power law.
Using the Molloy-Reed algorithm to
Parameters
----------
nb_nodes : int
nb_edges : int
exponent : int
tries : int
min_deg : int
Returns
-------
nx.Graph
generated graph
"""
G = _conf_model(powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg).astype(int))
tries_ = 0
while len(G) != nb_nodes and tries_ <tries:
G = _conf_model(powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg).astype(int))
tries_ += 1
if len(G) != nb_nodes:
print(nb_nodes,nb_edges,exponent)
raise Exception("Cant compute configuration model based on parameters")
if G.size() != nb_edges:
diff = abs(G.size() - nb_edges)
signe = 1 if G.size() - nb_edges < 0 else -1
if signe:
for n in list(G.nodes()):
if G.size() == nb_edges:
break
for n2 in list(G.nodes()):
if not G.has_edge(n, n2): G.add_edge(n, n2)
if G.size() == nb_edges:
break
else:
edges_ = list(G.edges())
random.shuffle(edges_)
i = diff
for ed in edges_:
u, v = ed[0], ed[1]
if G.degree(u) > 1 and G.degree(v) > 1:
G.remove_edge(u, v)
i -= 1
return G
def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b), self_link=False,weighted = False):
"""
Generate a spatial graph with a specific number of vertices and edges
Parameters
----------
nb_nodes : int
nb_edges : int
coords : array of shape (n,2) or str
if str, possible choice are "random" or "country"
dist_func : callable
self_link : bool
Returns
-------
nx.Graph
generated graph
"""
if coords and isinstance(coords, Iterable) and not isinstance(coords, str):
if len(coords) != nb_nodes:
raise ValueError("number of nodes must match the size of the coords dict")
elif coords == "random":
coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
coords[:, 0] = (coords[:, 0] * 360) - 180
coords[:, 1] = (coords[:, 1] * 180) - 90
else:
coords = get_countries_coords()
if nb_nodes > len(coords):
raise ValueError(
"Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
coords_index = np.random.choice(np.arange(len(coords)), nb_nodes)
coords = coords[coords_index]
data = []
for i in range(nb_nodes):
for j in range(nb_nodes):
if i == j and not self_link:
continue
data.append([i, j, 1/((1+dist_func(coords[i], coords[j])**4))])
df = pd.DataFrame(data, columns="src tar weight".split()).astype({"src": int, "tar": int})
df["hash"] = df.apply(lambda x: "_".join(sorted([str(int(x.src)), str(int(x.tar))])), axis=1)
df = df.drop_duplicates(subset="hash")
df["weight"] = df.weight/df.weight.sum()
register = set([])
def add_register(hashes):
for hash_ in hashes:
register.add(hash_)
def in_register(hashes):
return np.array([True if hash_ in register else False for hash_ in hashes])
nodes = np.arange(nb_nodes).astype(int)
sizes = [len(x) for x in np.array_split(np.arange(nb_edges), nb_nodes)]
new_df = df[(df.src == nodes[0]) | (df.tar == nodes[0])].sample(n=sizes[0], weights="weight").copy()
add_register(new_df.hash.values)
df = df[~in_register(df.hash.values)]
for ix, node in enumerate(nodes[1:]):
sample = df[(df.src == node) | (df.tar == node)].sample(n=sizes[ix + 1], weights="weight").copy()
new_df = pd.concat((new_df, sample))
add_register(new_df.hash.values)
df = df[~in_register(df.hash.values)]
if weighted:
G = nx.from_pandas_edgelist(new_df, source="src", target="tar", edge_attr="weight")
else:
G = nx.from_pandas_edgelist(new_df, source="src", target="tar")
for n in list(G.nodes()): G.nodes[n]["pos"] = coords[n]
return G
def ER_graph(nb_nodes, nb_edges):
"""
Generate a random graph with a specific nb of nodes and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
Returns
-------
nx.Graph
generated graph
"""
return nx.dense_gnm_random_graph(nb_nodes, nb_edges)
def stochastic_block_model_graph(nb_nodes, nb_edges, nb_com, percentage_edge_betw, verbose=False):
"""
Generate a stochastic block model graph with defined number of vertices and edges.
Parameters
----------
nb_nodes : int
nb_edges : int
nb_com : int
percentage_edge_betw : float
verbose : bool
Returns
-------
nx.Graph
generated graph
"""
if nb_nodes % nb_com != 0:
raise ValueError("Modulo between the number of nodes and community must be equal to 0")
edge_max = (1 / nb_com) * ((nb_nodes * (nb_nodes - 1)) / 2)
if nb_edges > edge_max:
raise ValueError("nb_edges must be inferior to {0}".format(edge_max))
def nb_of_pair(N):
return (N*(N-1))/2
G = nx.planted_partition_graph(nb_com, nb_nodes // nb_com, 1, 1)
block_assign = nx.get_node_attributes(G, "block")
b_assign_array = np.asarray(list(nx.get_node_attributes(G,"block").values()))
if verbose:
print(G.size())
u_in = sum([nb_of_pair((b_assign_array==b).sum()) for b in range(nb_com)])
u_out = nb_of_pair(len(G)) - u_in
l_out = u_out*percentage_edge_betw
l_in = nb_edges - l_out
percentage_edge_within = l_in / u_in
if verbose:
print("u_out",u_out)
print("u_in",u_in)
print("l_out",l_out)
print("l_in", l_in)
print("p_in",percentage_edge_within)
# percentage_edge_within = 1 - percentage_edge_betw
inter_edges, intra_edges = [], []
register = set([])
for n1 in list(G.nodes()):
for n2 in list(G.nodes()):
hash_ = "_".join(sorted([str(n1), str(n2)]))
if (n1 == n2) or (hash_ in register):
continue
b1, b2 = block_assign[n1], block_assign[n2]
if b1 != b2:
inter_edges.append([n1, n2])
else:
intra_edges.append([n1, n2])
register.add(hash_)
inter_edges = np.asarray(inter_edges)
intra_edges = np.asarray(intra_edges)
inter_N, intra_N = len(inter_edges), len(intra_edges)
probs_inter = np.ones(inter_N) * percentage_edge_betw
probs_intra = np.ones(intra_N) * percentage_edge_within
all_edges = np.concatenate((inter_edges, intra_edges))
del inter_edges
del intra_edges
all_probs = np.concatenate((probs_inter, probs_intra))
del probs_inter
del probs_intra
all_probs /= all_probs.sum()
if verbose:
print(inter_N, intra_N)
print(int(np.ceil(nb_edges * percentage_edge_betw)), int(np.ceil(nb_edges * percentage_edge_within)))
final_edges = []
index_selected_pairs = np.random.choice(np.arange(len(all_edges)), nb_edges, p=all_probs, replace=False)
final_edges.extend(all_edges[index_selected_pairs])
if verbose:
print(len(final_edges))
G2 = nx.from_edgelist(final_edges)
if len(G2) != nb_nodes:
equilibrate(G2, nb_nodes, percentage_edge_betw, percentage_edge_within, inter_edges, intra_edges, block_assign)
for n in list(G2.nodes()):
G2.nodes[n]["block"] = block_assign[n]
return G2
def equilibrate(G, nb_nodes, percentage_edge_betw, percentage_edge_within, inter_edges, intra_edges, block_assign):
"""
Sometimes the generated graph from the stochastic block model have some missing nodes due to the sampling method.
This function fix this issue.
Parameters
----------
G
nb_nodes
percentage_edge_betw
percentage_edge_within
inter_edges
intra_edges
block_assign
Returns
-------
"""
diff = nb_nodes - len(G)
nodes_missing = list(set(list(range(nb_nodes))) - set(list(G.nodes())))
nb_edge_inter = int(np.round(len(nodes_missing) * percentage_edge_betw))
nodes_inter = np.random.choice(nodes_missing, nb_edge_inter, replace=False)
nodes_intra = list(set(nodes_missing) - set(nodes_inter))
def draw_(array, register, hash_func=lambda x, y: "_".join(sorted([str(x), str(y)]))):
tries = 0
while tries < 1000:
index_array = np.random.choice(np.arange(len(array)), 1)
res = array[index_array]
res = res[0]
hash_ = hash_func(res[0], res[1])
if not hash_ in register:
register.add(hash_)
return index_array
tries += 1
raise Exception("Error ! (TODO)")
# Draw new edges
new_edges = []
register = set([])
for node in nodes_inter:
mask_inter = np.isin(inter_edges, node).sum(axis=1).astype(bool)
index_inter = draw_(inter_edges[mask_inter], register)
new_edges.extend(inter_edges[mask_inter][index_inter])
for node in nodes_intra:
mask_intra = np.isin(intra_edges, node).sum(axis=1).astype(bool)
index_intra = draw_(intra_edges[mask_intra], register)
new_edges.extend(intra_edges[mask_intra][index_intra])
# Draw existing edge to delete
edge_to_delete = []
inter_edges_c, intra_edges_c = [], []
for ed in list(G.edges()):
if G.degree(ed[0]) > 1 and G.degree(ed[1]) > 1: # We don't want the nodes edge to be removed with the
# disappearance of the edge
if block_assign[ed[0]] != block_assign[ed[1]]:
inter_edges_c.append([ed[0], ed[1]])
else:
intra_edges_c.append([ed[0], ed[1]])
index_inter = np.random.choice(np.arange(len(inter_edges_c)), int(np.round(diff * percentage_edge_betw)),
replace=False)
index_intra = np.random.choice(np.arange(len(intra_edges_c)), int(np.round(diff * percentage_edge_within)),
replace=False)
edge_to_delete.extend(np.asarray(inter_edges_c)[index_inter])
edge_to_delete.extend(np.asarray(intra_edges_c)[index_intra])
for ed in edge_to_delete:
G.remove_edge(*ed)
for ed in new_edges:
G.add_edge(*ed)
return G