Skip to content
Snippets Groups Projects
random.py 7.02 KiB
Newer Older
# coding = utf-8
import random

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10
    from collections.abc import Iterable
except ImportError:  # Python < 3.3
    from collections import Iterable

import networkx as nx
import numpy as np
import pandas as pd


def powerlaw(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
    """
    Return a degree sequence that fits a power law and whose sum equals ``2 * nb_edges``.

    A first sequence is drawn, then up to ``tries`` new sequences are drawn and the one whose
    degree sum is the closest to the expected number of stubs is kept. If no draw matches
    exactly, single degrees are randomly incremented (or decremented) until the sum is right.

    Parameters
    ----------
    nb_nodes : int
        number of values drawn for each candidate sequence
    nb_edges : int
        expected number of edges (the returned degrees sum to ``2 * nb_edges``)
    exponent : int
        exponent given to ``nx.utils.powerlaw_sequence``
    tries : int
        maximum number of new draws
    min_deg : int
        drawn degrees lower than this value are discarded, hence the returned sequence may
        contain fewer than ``nb_nodes`` values

    Returns
    -------
    np.ndarray
        degree sequence (always of integer dtype)

    Raises
    ------
    ValueError
        if every drawn degree was discarded, or if the degree sum cannot be lowered to
        ``2 * nb_edges`` without producing negative degrees
    """
    nb_stubs = nb_edges * 2

    def _draw():
        # Rounded power-law draw, restricted to the degrees that respect ``min_deg``
        seq = np.round(nx.utils.powerlaw_sequence(nb_nodes, exponent=exponent))
        return seq[seq >= min_deg]

    degs = _draw()
    sum_deg = degs.sum()

    for _ in range(tries):
        # If the sum of the degree sequence is equal to the number of stubs, then it's good
        if sum_deg == nb_stubs:
            break
        new_draw = _draw()
        new_sum_deg = new_draw.sum()
        # Keep the new sequence only if it is closer to the objective than the current one
        if abs(nb_stubs - new_sum_deg) < abs(nb_stubs - sum_deg):
            degs, sum_deg = new_draw, new_sum_deg

    # Every exit path returns integers (the early exit used to return floats)
    degs = degs.astype(int)
    missing = int(nb_stubs - sum_deg)
    if missing == 0:
        return degs
    if len(degs) == 0:
        raise ValueError("No degree >= min_deg was drawn, the degree sequence cannot be adjusted")

    if missing > 0:
        # Spread the missing stubs over randomly picked entries (an entry can be picked twice)
        picks = np.random.choice(np.arange(len(degs)), missing)
        np.add.at(degs, picks, 1)
        return degs

    # Too many stubs: remove them at random, but never push an entry below ``min_deg``.
    # If that leaves no room, fall back to 0 as lower bound; a degree is never made negative.
    excess = -missing
    floor = max(int(np.ceil(min_deg)), 0)
    while excess > 0:
        capacity = np.clip(degs - floor, 0, None)
        eligible = np.flatnonzero(capacity)
        if eligible.size == 0:
            if floor > 0:
                floor = 0
                continue
            raise ValueError("Cannot reach the expected number of stubs without negative degrees")
        picks = np.random.choice(eligible, excess)
        # Cap what is removed from each entry by what it can actually give
        removed = np.minimum(np.bincount(picks, minlength=len(degs)), capacity)
        degs -= removed
        excess -= int(removed.sum())

    return degs


def get_countries_coords():
    """
    Return the centroid coordinates of each entry of the ``naturalearth_lowres`` dataset.

    Returns
    -------
    np.ndarray
        one ``[x, y]`` centroid per row of the dataset

    Raises
    ------
    ImportError
        if geopandas cannot be imported
    """
    try:
        import geopandas as gpd
    except ImportError as err:
        # Only a failed import is reported as such; other errors must not be masked
        raise ImportError("Geopandas is not installed !") from err
    # NOTE(review): ``geopandas.datasets`` is reported as removed in geopandas 1.0 - confirm
    # the supported geopandas versions for this module.
    gdf = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))

    return np.asarray([[point.x, point.y] for point in gdf.centroid])


def powerlaw_graph(nb_nodes, nb_edges, exponent=2, tries=100, min_deg=1):
    """
    Build a graph from a power-law degree sequence with the requested number of edges.

    The degree sequence comes from ``powerlaw`` and is wired with ``nx.configuration_model``.

    Parameters
    ----------
    nb_nodes : int
        forwarded to ``powerlaw``
    nb_edges : int
        forwarded to ``powerlaw``
    exponent : int
        forwarded to ``powerlaw``
    tries : int
        forwarded to ``powerlaw``
    min_deg : int
        forwarded to ``powerlaw``

    Returns
    -------
    nx.Graph
            generated graph (whatever ``nx.configuration_model`` returns for the sequence)
    """
    degree_sequence = powerlaw(nb_nodes, nb_edges, exponent=exponent, tries=tries, min_deg=min_deg)
    # The sequence is cast to integers before being handed to the configuration model
    degree_sequence = degree_sequence.astype(int)
    graph = nx.configuration_model(degree_sequence)
    return graph

def spatial_graph(nb_nodes, nb_edges, coords="country", dist_func=lambda a, b: np.linalg.norm(a - b), self_link=False):
    """
    Generate a spatial graph with a specific number of vertices and edges.

    Every unordered pair of nodes is weighted with ``dist_func`` and ``nb_edges`` pairs are
    sampled with ``DataFrame.sample(weights="weight")``, i.e. with a probability proportional
    to their weight. Only the nodes that are an endpoint of a sampled edge end up in the graph.

    Parameters
    ----------
    nb_nodes : int
        number of candidate nodes
    nb_edges : int
        number of edges to sample
    coords : array of shape (n,2) or str
        if str, possible choice are "random" or "country". Any value that is neither a
        non-empty collection nor "random" falls back to "country".
    dist_func : callable
        called as ``dist_func(coords[i], coords[j])`` with ``i <= j``; the default needs
        operands that support subtraction (e.g. numpy rows)
    self_link : bool
        if True, the pairs ``(i, i)`` are also candidates

    Returns
    -------
    nx.Graph
        generated graph, each node carrying its coordinates in the "pos" attribute

    Raises
    ------
    ValueError
        if the size of ``coords`` does not match ``nb_nodes``, or if ``nb_nodes`` exceeds the
        number of available country centroids
    """
    # ``bool(ndarray)`` raises for arrays with more than one element, so arrays are tested apart
    if isinstance(coords, np.ndarray):
        has_user_coords = coords.size > 0
    else:
        has_user_coords = bool(coords) and isinstance(coords, Iterable) and not isinstance(coords, str)

    if has_user_coords:
        if len(coords) != nb_nodes:
            raise ValueError("number of nodes must match the size of the coords dict")
    elif isinstance(coords, str) and coords == "random":
        coords = np.random.random(nb_nodes * 2).reshape(nb_nodes, 2)
        # Rescale the unit square to [-180, 180) x [-90, 90)
        coords[:, 0] = (coords[:, 0] * 360) - 180
        coords[:, 1] = (coords[:, 1] * 180) - 90
    else:
        coords = get_countries_coords()
        if nb_nodes > len(coords):
            raise ValueError(
                "Too many nodes for coords = \"country\". Change nb_nodes value or change coords to 'random' or your own list of coords")
        # Distinct countries: duplicated centroids would yield zero-distance pairs
        coords_index = np.random.choice(np.arange(len(coords)), nb_nodes, replace=False)
        coords = coords[coords_index]

    # Each unordered pair is generated once, oriented as (i, j) with i < j (or i == j for self links)
    pairs = [
        (i, j, dist_func(coords[i], coords[j]))
        for i in range(nb_nodes)
        for j in range(i if self_link else i + 1, nb_nodes)
    ]
    df = pd.DataFrame(pairs, columns=["src", "tar", "weight"])
    df = df.sample(nb_edges, weights="weight")

    G = nx.from_pandas_edgelist(df, source="src", target="tar", edge_attr="weight")
    for n in list(G.nodes()):
        G.nodes[n]["pos"] = coords[n]
    return G

def ER_graph(nb_nodes,nb_edges):
    """
    Build a random graph holding the requested number of nodes and edges.

    Thin wrapper around ``nx.dense_gnm_random_graph``.

    Parameters
    ----------
    nb_nodes : int
        number of nodes, forwarded as first argument
    nb_edges : int
        number of edges, forwarded as second argument

    Returns
    -------
    nx.Graph
        generated graph
    """
    graph = nx.dense_gnm_random_graph(nb_nodes, nb_edges)
    return graph


def stochastic_block_model_graph(nb_nodes,nb_edges,nb_com,percentage_edge_betw,verbose=False):
    """
    Generate a stochastic block model graph with defined number of vertices and edges.

    All the possible pairs of nodes are split between pairs linking two communities and pairs
    inside a community, then the requested number of edges is sampled without replacement from
    each group. Only the nodes that are an endpoint of a sampled edge end up in the graph.

    Parameters
    ----------
    nb_nodes : int
        number of nodes, must be a multiple of ``nb_com``
    nb_edges : int
        total number of edges of the generated graph
    nb_com : int
        number of communities (all of the same size)
    percentage_edge_betw : float
        share (between 0 and 1) of the edges that link two different communities
    verbose : bool
        if True, print intermediate counts

    Returns
    -------
    nx.Graph
        generated graph, each node carrying its community in the "block" attribute

    Raises
    ------
    ValueError
        if the parameters are inconsistent (see the messages)
    """
    if nb_com <= 0:
        raise ValueError("nb_com must be strictly positive")
    if nb_nodes % nb_com != 0:
        raise ValueError("Modulo between the number of nodes and community must be equal to 0")

    edge_max = (1 / nb_com) * ((nb_nodes * (nb_nodes - 1)) / 2)
    if nb_edges > edge_max:
        raise ValueError("nb_edges must be inferior to {0}".format(edge_max))

    if not 0 <= percentage_edge_betw <= 1:
        raise ValueError("percentage_edge_betw must be between 0 and 1")

    # With both probabilities set to 1 every pair is linked; the graph is only used to get
    # the node list and the "block" assignment.
    G = nx.planted_partition_graph(nb_com, int(nb_nodes // nb_com), 1, 1)
    if verbose:
        print(G.size())

    block_assign = nx.get_node_attributes(G, "block")

    # Walk each unordered pair exactly once (upper triangle of the node list)
    nodes = list(G.nodes())
    inter_edges, intra_edges = [], []
    for pos, n1 in enumerate(nodes):
        for n2 in nodes[pos + 1:]:
            if block_assign[n1] != block_assign[n2]:
                inter_edges.append((n1, n2))
            else:
                intra_edges.append((n1, n2))
    inter_N, intra_N = len(inter_edges), len(intra_edges)

    # The second count is derived from the first one so that both always sum to nb_edges
    # (rounding both shares up independently could produce nb_edges + 1 edges).
    nb_inter = int(round(nb_edges * percentage_edge_betw))
    nb_intra = int(nb_edges) - nb_inter

    if verbose:
        print(inter_N, intra_N)
        print(nb_inter, nb_intra)

    if nb_inter > inter_N or nb_intra > intra_N:
        raise ValueError(
            "Not enough candidate pairs: {0} inter-community edges requested for {1} pairs, "
            "{2} intra-community edges requested for {3} pairs".format(nb_inter, inter_N, nb_intra, intra_N))

    index_inter = np.random.choice(np.arange(inter_N), nb_inter, replace=False)
    index_intra = np.random.choice(np.arange(intra_N), nb_intra, replace=False)
    final_edges = [inter_edges[k] for k in index_inter]
    final_edges.extend(intra_edges[k] for k in index_intra)
    if verbose:
        print(len(final_edges))

    G2 = nx.from_edgelist(final_edges)
    for n in list(G2.nodes()):
        G2.nodes[n]["block"] = block_assign[n]
    return G2