Newer
Older
Fize Jacques
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# coding = utf-8
from collections import Iterable
import numpy as np
import networkx as nx
import pandas as pd
import random
def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
nb_stubs = nb_edges * 2
# Draw a first time a powerlaw degree sequence
degs = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
degs = degs[degs >= min_deg]
# Compute de degree sum
sum_deg = degs.sum()
for _ in range(tries):
# If the sum of the degree sequence is equal to the number of stubs, then it's good
if sum_deg == nb_stubs:
return degs
# Draw a a new powerlaw degree sequence
new_draw = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
new_draw = new_draw[new_draw >= min_deg]
new_sum_deg = new_draw.sum()
# If the new degree sequence is closer to the objective than the previously draw sequence
if abs(nb_stubs - new_sum_deg) < abs(nb_stubs - sum_deg):
degs = new_draw
sum_deg = new_sum_deg
# Once the final draw is executed and the sequence degree sum is not equal to number of stubs expected
if not sum_deg == nb_stubs:
# We randomly pick sequence degrees and increment (or decrement) their values
diff = abs(sum_deg - nb_stubs)
signe = -1 if (nb_stubs - sum_deg) < 0 else 1
indexes = np.random.choice(np.arange(len(degs)), int(diff))
for ind in indexes:
degs[ind] = degs[ind] + signe
return degs
def get_countries_coords():
"""
Return the coordinates of each country in the world.
Returns
-------
np.ndarray
coordinates
"""
try:
import geopandas as gpd
except:
raise ImportError("Geopandas is not installed !")
gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
return np.asarray(gdf.centroid.apply(lambda x: [x.x, x.y]).values.tolist())
def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
return nx.configuration_model(powerlaw(nb_nodes,nb_edges,exponent,tries,min_deg))
def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b), self_link=False):
if coords and isinstance(coords, Iterable) and not isinstance(coords, str):
if len(coords) != nb_nodes:
raise ValueError("number of nodes must match the size of the coords dict")
elif coords == "random":
coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
coords[:, 0] = (coords[:, 0] * 360) - 180
coords[:, 1] = (coords[:, 1] * 180) - 90
else:
coords = get_countries_coords()
if nb_nodes > len(coords):
raise ValueError(
"Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
coords_index = np.random.choice(np.arange(len(coords)), nb_nodes)
coords = coords[coords_index]
data = []
for i in range(nb_nodes):
for j in range(nb_nodes):
if i == j and not self_link:
continue
data.append([i, j, dist_func(coords[i], coords[j])])
df = pd.DataFrame(data, columns="src tar weight".split())
df = df.sample(nb_edges, weights="weight")
G = nx.from_pandas_edgelist(df, source="src", target="tar", edge_attr="weight")
for n in list(G.nodes()): G.nodes[n]["pos"] = coords[n]
return G
def ER_graph(nb_nodes,nb_edges):
return nx.dense_gnm_random_graph(nb_nodes,nb_edges)
def stochastic_block_model_graph(nb_nodes,nb_edges,nb_com,percentage_edge_betw,verbose=False):
percentage_edge_within = 1 - percentage_edge_betw
if nb_edges > (1 / nb_com) * (nb_nodes * (nb_nodes - 1)) / 2:
raise ValueError("nb_edges must be inferior to {0}".format((1 / nb_com) * (nb_nodes * (nb_nodes - 1)) / 2))
G = nx.planted_partition_graph(nb_com, int(np.round(nb_nodes / nb_com)), 1, 1)
if verbose:
print(G.size())
block_assign = nx.get_node_attributes(G, "block")
inter_edges,intra_edges = [], []
register = set([])
for n1 in list(G.nodes()):
for n2 in list(G.nodes()):
hash_ = "_".join(sorted([str(n1), str(n2)]))
if (n1 == n2) or (hash_ in register):
continue
b1, b2 = block_assign[n1], block_assign[n2]
if b1 != b2:
inter_edges.append([n1, n2])
else:
intra_edges.append([n1, n2])
register.add(hash_)
inter_edges = np.asarray(inter_edges)
intra_edges = np.asarray(intra_edges)
inter_N, intra_N = len(inter_edges), len(intra_edges)
if verbose:
print(inter_N, intra_N)
print(int(np.ceil(nb_edges * percentage_edge_betw)), int(np.ceil(nb_edges * percentage_edge_within)))
final_edges = []
index_inter = np.random.choice(np.arange(inter_N), int(np.ceil(nb_edges * percentage_edge_betw)), replace=False)
index_intra = np.random.choice(np.arange(intra_N), int(np.ceil(nb_edges * percentage_edge_within)), replace=False)
final_edges.extend(inter_edges[index_inter])
final_edges.extend(intra_edges[index_intra])
if verbose:
print(len(final_edges))
G2 = nx.from_edgelist(final_edges)
for n in list(G2.nodes()):
G2.nodes[n]["block"] = block_assign[n]
return G2