# coding = utf-8
from collections.abc import Iterable, Mapping
from itertools import combinations, combinations_with_replacement
import random

import networkx as nx
import numpy as np
import pandas as pd


def _draw_degree_sequence(nb_nodes, exponent, min_deg):
    """Draw one rounded power-law sequence and drop entries below ``min_deg``."""
    drawn = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
    return drawn[drawn >= min_deg]


def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
    """
    Return a degree sequence that follows a power law and sums to ``2 * nb_edges``.

    Up to ``tries`` additional sequences are drawn and the one whose degree
    sum is the closest to the number of stubs (``2 * nb_edges``) is kept. The
    remaining gap, if any, is closed by randomly incrementing or decrementing
    individual degrees.

    Parameters
    ----------
    nb_nodes : int
        number of values requested from ``nx.utils.powerlaw_sequence``
    nb_edges : int
        target number of edges; the returned degrees sum to twice this value
    exponent : int
        exponent forwarded to ``nx.utils.powerlaw_sequence``
    tries : int
        maximum number of extra draws
    min_deg : int
        drawn degrees lower than this value are discarded, so the returned
        sequence may hold fewer than ``nb_nodes`` entries

    Returns
    -------
    np.ndarray
        integer degree sequence

    Raises
    ------
    ValueError
        if the degree sum cannot be lowered to ``2 * nb_edges`` without
        producing negative degrees (i.e. ``nb_edges`` is negative)
    """
    nb_stubs = nb_edges * 2

    # First draw, then keep the best candidate among `tries` further draws
    degs = _draw_degree_sequence(nb_nodes, exponent, min_deg)
    sum_deg = degs.sum()
    for _ in range(tries):
        # A draw that already matches the number of stubs cannot be improved
        if sum_deg == nb_stubs:
            break
        candidate = _draw_degree_sequence(nb_nodes, exponent, min_deg)
        candidate_sum = candidate.sum()
        # Keep the new draw only if it is strictly closer to the objective
        if abs(nb_stubs - candidate_sum) < abs(nb_stubs - sum_deg):
            degs, sum_deg = candidate, candidate_sum

    # Always hand back integers, whichever path produced the sequence
    degs = degs.astype(int)
    missing = int(nb_stubs - degs.sum())

    if missing > 0:
        # Too few stubs: add one stub to randomly chosen entries (repeats are
        # allowed, np.add.at accumulates them)
        picked = np.random.choice(np.arange(len(degs)), missing)
        np.add.at(degs, picked, 1)
    elif missing < 0:
        surplus = -missing
        # Too many stubs: remove them without ever pushing a degree below
        # `min_deg`; if that is infeasible, fall back to a floor of 0 so that
        # the sequence stays a valid (non-negative) degree sequence.
        for floor in (max(int(min_deg), 0), 0):
            excess = np.clip(degs - floor, 0, None)
            if excess.sum() >= surplus:
                # One entry per removable stub, so sampling without
                # replacement can never remove more than a node can give
                removable = np.repeat(np.arange(len(degs)), excess)
                picked = np.random.choice(removable, surplus, replace=False)
                np.subtract.at(degs, picked, 1)
                break
        else:
            raise ValueError(
                "cannot build a non-negative degree sequence for "
                "nb_edges = {0}".format(nb_edges))

    return degs


def get_countries_coords():
    """
    Return the centroid coordinates of each country in the world.

    The countries are read from the "naturalearth_lowres" dataset through
    GeoPandas, which is imported lazily because it is an optional dependency.

    Returns
    -------
    np.ndarray
        coordinates, one ``[x, y]`` row per country

    Raises
    ------
    ImportError
        if GeoPandas is not installed
    """
    try:
        import geopandas as gpd
    except ImportError as err:
        raise ImportError("Geopandas is not installed !") from err
    # NOTE(review): `geopandas.datasets` is deprecated in recent GeoPandas
    # releases (and, I believe, removed in 1.0) — confirm the pinned version
    # still ships it.
    gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
    return np.asarray([[point.x, point.y] for point in gdf.centroid])


def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
    """
    Generate a graph with a defined number of vertices and edges, and a degree
    distribution that fits the power law.

    The degree sequence comes from :func:`powerlaw` and is wired with
    ``nx.configuration_model``.

    Parameters
    ----------
    nb_nodes : int
    nb_edges : int
    exponent : int
    tries : int
    min_deg : int

    Returns
    -------
    nx.MultiGraph
        generated graph; the configuration model may create self-loops and
        parallel edges
    """
    seq = powerlaw(nb_nodes, nb_edges, exponent, tries, min_deg)
    return nx.configuration_model(seq.astype(int))


def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b), self_link=False):
    """
    Generate a spatial graph with a specific number of vertices and edges.

    Every candidate pair of nodes is weighted by ``dist_func`` applied to the
    coordinates of its endpoints; ``nb_edges`` pairs are then sampled without
    replacement with a probability proportional to that weight.

    Parameters
    ----------
    nb_nodes : int
    nb_edges : int
    coords : array of shape (n,2) or str
        if str, possible choices are "random" or "country"; any other
        non-collection value falls back to "country"
    dist_func : callable
        function of two coordinates returning the weight of the pair
    self_link : bool
        if True, pairs ``(i, i)`` are part of the candidates

    Returns
    -------
    nx.Graph
        generated graph with ``nb_nodes`` nodes, a "pos" attribute on each
        node and a "weight" attribute on each edge

    Raises
    ------
    ValueError
        if custom coordinates do not match ``nb_nodes``, if there are fewer
        countries than ``nb_nodes``, or if ``nb_edges`` exceeds the number of
        candidate pairs
    """
    # Explicit checks instead of `if coords`: the truth value of a NumPy array
    # is ambiguous and would raise.
    custom_coords = (
        not isinstance(coords, str)
        and isinstance(coords, Iterable)
        and len(coords) > 0
    )
    if custom_coords:
        if len(coords) != nb_nodes:
            raise ValueError("number of nodes must match the size of the coords dict")
        if not isinstance(coords, Mapping):
            # Lists/tuples of points become arrays so that the default
            # `dist_func` (a - b) works on them
            coords = np.asarray(coords)
    elif isinstance(coords, str) and coords == "random":
        coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
        coords[:, 0] = (coords[:, 0] * 360) - 180  # x in [-180, 180)
        coords[:, 1] = (coords[:, 1] * 180) - 90  # y in [-90, 90)
    else:
        coords = get_countries_coords()
        if nb_nodes > len(coords):
            raise ValueError(
                "Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
        # Without replacement, so that two nodes never share a centroid
        coords_index = np.random.choice(np.arange(len(coords)), nb_nodes, replace=False)
        coords = coords[coords_index]

    # Enumerate each unordered pair exactly once (i <= j)
    if self_link:
        pairs = combinations_with_replacement(range(nb_nodes), 2)
    else:
        pairs = combinations(range(nb_nodes), 2)
    data = [[i, j, dist_func(coords[i], coords[j])] for i, j in pairs]
    df = pd.DataFrame(data, columns="src tar weight".split())

    if nb_edges > len(df):
        raise ValueError(
            "nb_edges must be inferior or equal to {0}".format(len(df)))
    df = df.sample(nb_edges, weights="weight")

    G = nx.from_pandas_edgelist(df, source="src", target="tar", edge_attr="weight")
    # Keep the nodes that did not receive any edge
    G.add_nodes_from(range(nb_nodes))
    for n in list(G.nodes()):
        G.nodes[n]["pos"] = coords[n]
    return G


def ER_graph(nb_nodes, nb_edges):
    """
    Generate a random graph with a specific number of nodes and edges.

    Thin wrapper around ``nx.dense_gnm_random_graph``.

    Parameters
    ----------
    nb_nodes : int
    nb_edges : int

    Returns
    -------
    nx.Graph
        generated graph
    """
    return nx.dense_gnm_random_graph(nb_nodes, nb_edges)


def _sample_pairs(pairs, count):
    """Return ``count`` distinct elements of ``pairs`` picked at random."""
    if count == 0:
        return []
    picked = np.random.choice(np.arange(len(pairs)), count, replace=False)
    return [pairs[i] for i in picked]


def stochastic_block_model_graph(nb_nodes, nb_edges, nb_com, percentage_edge_betw, verbose=False):
    """
    Generate a stochastic block model graph with a defined number of vertices
    and edges.

    Nodes ``0 .. nb_nodes - 1`` are split into ``nb_com`` consecutive blocks of
    equal size. Edges are sampled without replacement, separately among the
    pairs that link two blocks and among the pairs that stay within a block.

    Parameters
    ----------
    nb_nodes : int
        must be a multiple of ``nb_com``
    nb_edges : int
    nb_com : int
        number of communities (blocks)
    percentage_edge_betw : float
        share (between 0 and 1) of the edges that link two different blocks
    verbose : bool
        if True, print intermediate counts

    Returns
    -------
    nx.Graph
        generated graph with ``nb_nodes`` nodes, each carrying a "block"
        attribute

    Raises
    ------
    ValueError
        if the parameters are inconsistent (see messages)
    """
    if nb_com <= 0:
        raise ValueError("nb_com must be a positive integer")
    if nb_nodes % nb_com != 0:
        raise ValueError("Modulo between the number of nodes and community must be equal to 0")

    edge_max = (1 / nb_com) * ((nb_nodes * (nb_nodes - 1)) / 2)
    if nb_edges > edge_max:
        raise ValueError("nb_edges must be inferior to {0}".format(edge_max))
    if not 0 <= percentage_edge_betw <= 1:
        raise ValueError("percentage_edge_betw must be between 0 and 1")

    # Consecutive, equally sized blocks
    com_size = nb_nodes // nb_com
    block_assign = {node: node // com_size for node in range(nb_nodes)}
    if verbose:
        # Number of candidate (unordered) pairs
        print(nb_nodes * (nb_nodes - 1) // 2)

    # Split every unordered pair of distinct nodes by block membership
    inter_edges, intra_edges = [], []
    for n1, n2 in combinations(range(nb_nodes), 2):
        if block_assign[n1] != block_assign[n2]:
            inter_edges.append((n1, n2))
        else:
            intra_edges.append((n1, n2))
    inter_N, intra_N = len(inter_edges), len(intra_edges)

    # Derive the second count from the first so that both always add up to
    # exactly nb_edges (rounding each share up independently could overshoot)
    nb_inter = int(round(nb_edges * percentage_edge_betw))
    nb_intra = nb_edges - nb_inter
    if verbose:
        print(inter_N, intra_N)
        print(nb_inter, nb_intra)

    if nb_inter > inter_N:
        raise ValueError(
            "not enough pairs between communities: {0} requested, {1} available".format(nb_inter, inter_N))
    if nb_intra > intra_N:
        raise ValueError(
            "not enough pairs within communities: {0} requested, {1} available".format(nb_intra, intra_N))

    final_edges = _sample_pairs(inter_edges, nb_inter)
    final_edges.extend(_sample_pairs(intra_edges, nb_intra))
    if verbose:
        print(len(final_edges))

    G2 = nx.Graph()
    # Add every node first so that isolated nodes are kept
    G2.add_nodes_from(range(nb_nodes))
    G2.add_edges_from(final_edges)
    for n in list(G2.nodes()):
        G2.nodes[n]["block"] = block_assign[n]
    return G2