from collections import defaultdict from itertools import product import networkx as nx from pydyton.core import TickProcess class EpochCommunity(TickProcess): def __init__(self, epoch, **kwargs): super().__init__(epoch) self.graph = nx.Graph() self.old_graph = None self.community = defaultdict(frozenset) self.__cbc_memo = {} def set_link(self, a, b, state, now): if a not in self.graph: self.graph.add_node(a) if b not in self.graph: self.graph.add_node(b) if b not in self.graph[a]: self.graph.add_edge(a, b, { 'start': -1 }) edge = self.graph[a][b] if state: edge['start'] = now if 'duration' not in edge: edge['duration'] = 0 else: edge['duration'] = now - edge['start'] edge['start'] = -1 def next_epoch(self, now): self.community = defaultdict(frozenset) edges_to_keep = [] self.__cbc_memo = {} for a, b, start in self.graph.edges(data='start'): if start > -1: self.set_link(a, b, False, now) edges_to_keep.append((a, b)) self.old_graph = self.graph self.graph = nx.Graph() for a, b in edges_to_keep: self.set_link(a, b, True, now) return self.old_graph def __getitem__(self, node): return self.community[node] def get_lp(self, node): '''local popularity of a node''' if node not in self.old_graph: return 0 edges = self.old_graph[node] community = self[node] return sum([ edge['duration'] for other, edge in edges.items() if other in community ]) def get_gp(self, node): '''global popularity of a node''' if node not in self.old_graph: return 0 edges = self.old_graph[node] community = self[node] return sum([ edge['duration'] for other, edge in edges.items() if other not in community ]) def get_ui(self, node): '''unique interactions with a node''' if node not in self.old_graph: return 0 edges = self.old_graph[node] community = self[node] return len([ other for other in edges if other in community ]) def get_cbc(self, a, b): '''''' g = self.old_graph c_x = self[a] c_y = self[b] memo = (c_x, c_y) if a not in g or b not in g or c_x == c_y: return 0 if memo in self.__cbc_memo: return self.__cbc_memo[memo] cbc = sum([ g[x][y]['duration'] for x, y in product(c_x, c_y) if y in g[x] ]) self.__cbc_memo[memo] = cbc self.__cbc_memo[(memo[1], memo[0])] = cbc return cbc def get_ncf(self, x, b): '''''' g = self.old_graph c_y = self[b] if x not in g or b not in g: return 0 return sum([ g[x][y]['duration'] for y in c_y if y in g[x] ])