-
Jarrod Pas authoredJarrod Pas authored
community.py 7.65 KiB
"""
pydtn community module.
Implements the following community detection schemes on epochs:
- Louvain community detection
- K-Clique community detection
"""
__all__ = [
"Community",
"KCliqueCommunity",
"LouvainCommunity",
"CommunityNode",
"BubbleNode",
"HCBFNode",
]
__version__ = '0.1'
__author__ = 'Jarrod Pas <j.pas@usask.ca>'
from collections import defaultdict
# TODO: Consider using igraph instead of networkx - jpas (2017-08-11)
import networkx as nx
from community import best_partition as louvain_partition
from pydtn import Node
class Community:
"""
Base class for community detection algorithms.
Implements partitioning on epoch turn over.
To implement your own community detection algorithm inherit from this class
and implement `partition`. You may need to extend other methods.
"""
def __init__(self, epoch):
"""Create a community detector."""
self.network = None
self.epoch = epoch
self.graph = nx.Graph()
self._prev_graph = None
self._community = {}
def start(self, network):
"""
Start community detection loop.
If it has already been started do nothing.
"""
if self.network is not None:
return
self.network = network
self.network.env.process(self.epoch_loop())
def epoch_loop(self):
"""Transition current epoch to next epoch."""
env = self.network.env
while True:
yield env.timeout(self.epoch)
new_graph = nx.Graph()
for node_a, node_b, start in self.graph.edges(data='start'):
if start is not None:
self.leave(node_a, node_b)
data = {
'start': env.now,
'duration': 0,
}
new_graph.add_edge(node_a, node_b, data)
self.graph, self._prev_graph = new_graph, self.graph
self.partition()
def partition(self):
"""Partition nodes into communities."""
self._community = {}
for node in self._prev_graph.nodes():
self._community[node] = frozenset(list(node))
def join(self, node_a, node_b):
"""
Node a and b join each other's neighbourhoods.
This is an idempotent operation.
"""
if not self.graph.has_edge(node_a, node_b):
data = {
'start': None,
'duration': 0,
}
self.graph.add_edge(node_a, node_b, data)
edge = self.graph[node_a][node_b]
if edge['start'] is None:
edge['start'] = self.network.env.now
def leave(self, node_a, node_b):
"""
Node a and b leave each other's neighbourhoods.
This is an idempotent operation.
"""
if self.graph.has_edge(node_a, node_b):
edge = self.graph[node_a][node_b]
if edge['start'] is not None:
edge['duration'] += self.network.env.now - edge['start']
edge['start'] = None
def __getitem__(self, node):
"""
Return the community of a node.
If the node has no community it is it's own all alone.
"""
if node not in self._community:
self._community[node] = frozenset(list(node))
return self._community[node]
def local_popularity(self, node):
"""Return local popularity of a node."""
graph = self._prev_graph
if node not in graph:
return 0
return sum(
edge['duration']
for other, edge in graph[node].items()
if other in node.community
)
def global_popularity(self, node):
"""Return global popularity of a node."""
graph = self._prev_graph
if node not in graph:
return 0
return sum(
edge['duration']
for other, edge in graph[node].items()
if other not in node.community
)
def unique_interactions(self, node):
graph = self._prev_graph
if node not in graph:
return 0
return len(graph[node])
def community_betweenness(self, node_a, node_b):
"""Return community betweenness for 2 nodes."""
community_a = self[node_a]
community_b = self[node_b]
if community_a == community_b:
return float('inf')
graph = self._prev_graph
return sum(
graph[node_a][node_b]['duration']
for node_a, node_b in product(community_a, community_b)
if node_a in graph and node_b in graph[node_a]
)
class KCliqueCommunity(Community):
"""K-clique community detection."""
def __init__(self, epoch, k, threshold):
"""Create a k-clique community detector."""
super().__init__(epoch)
self.k = k
self.threshold = threshold
def partition(self):
"""Partition graph by k-clique."""
graph = nx.Graph()
for edge in self._prev_graph.edges(data='duration'):
duration = edge[2]
if duration > self.threshold:
graph.add_edge(*edge)
communities = nx.k_clique_communities(graph, self.k)
for community in communities:
for node in community:
self._community[node] = community
class LouvainCommunity(Community):
"""Louviain community detection."""
def partition(self):
"""Partition graph with Louvain algorithm."""
partitions = louvain_partition(self._prev_graph, weight='duration')
communities = defaultdict(set)
for node, community in partitions.items():
communities[community].add(node)
for community in communities.values():
community = frozenset(community)
for node in community:
self._community[node] = community
class CommunityNode(Node):
"""Base node for community based forwarding heuristics."""
def __init__(self, community=None, **options):
"""Create a community based node."""
super().__init__(**options)
if community is None:
raise ValueError('No community set')
self._community = community
def start(self, network):
"""Start event loop for node and community detector."""
super().start(network)
self._community.start(network)
@property
def community(self):
"""Return my community."""
return self._community[self]
@property
def in_community_neighbours(self):
"""Return nodes in my community."""
return [
neighbour
for neighbour in self.neighbours
if neighbour in self.community
]
@property
def out_community_neighbours(self):
"""Return nodes not in my community."""
return [
neighbour
for neighbour in self.neighbours
if neighbour not in self.community
]
class BubbleNode(CommunityNode):
"""Node with BubbleRap forwarding."""
def forward(self, packet):
"""Forward packet with the BubbleRap heuristic."""
# direct forwarding
forward = super().forward(packet)
if forward:
return forward
if self.in_community_neighbours:
return {}
elif self.out_community_neighbour:
return {}
return {}
class HCBFNode(CommunityNode):
"""Node with Hybrid Community Based forwarding."""
def forward(self, packet):
"""Forward packet with Hybrid Community Based heuristic."""
# direct forwarding
forward = super().forward(packet)
if forward:
return forward
return {}