Commit fcc3728d authored by Jarrod Pas's avatar Jarrod Pas
Browse files

Merge branch 'full-rewrite' into 'develop'

Full rewrite

See merge request !1
parents 0f6bb16b a9f59010
from random import Random
from pydtn import Network, Node, random_trace, random_traffic
class RandomNode(Node):
def __init__(self, seed=None, **options):
super().__init__(**options)
self.random = Random(seed)
def forward(self, packet):
neighbours = list(self.neighbours)
if neighbours:
target = self.random.choice(neighbours)
return {target: 'random'}
return {}
def main():
nodes = 50
seed = 42
traffic_speed = 1
node_options = {
'seed': seed,
'tick_rate': 1,
}
nodes = {
node_id: RandomNode(**node_options)
for node_id in range(nodes)
}
trace = random_trace(nodes, seed=seed)
traffic = random_traffic(nodes, speed=traffic_speed, seed=seed)
network = Network(nodes, traffic=traffic, trace=trace)
# run simulation for 500 ticks
network.run(until=500)
print(network.stats_summary)
if __name__ == '__main__':
exit(main())
import csv
import sys
from argparse import ArgumentParser
from collections import defaultdict, namedtuple
from itertools import groupby, count
from multiprocessing import Pool
from pydtn import Network, random_traffic, Node, EpidemicNode, Contact
from pydtn.community import BubbleNode, HCBFNode, LouvainCommunity
class ShedTrace:
def __init__(self, path, slot_size=300):
self.path = path
self.slot_size = slot_size
pairs = defaultdict(set)
with open(path) as slots:
reader = csv.reader(slots)
next(reader)
for row in reader:
_, source, _, target, _, slot = row
pair = min(source, target), max(source, target)
slot = int(slot)
pairs[pair].add(slot)
node = count()
nodes = {}
self.contacts = []
for (source, target), slots in pairs.items():
if source not in nodes:
nodes[source] = next(node)
source = nodes[source]
if target not in nodes:
nodes[target] = next(node)
target = nodes[target]
slots = sorted(slots)
# groups consecutive slots
# if the lambda is mapped it will return:
# [1, 2, 3, 6, 7, 9] -> [-1, -1, -1, -3, -3, -4]
for _, group in groupby(enumerate(slots), lambda p: p[0]-p[1]):
times = list(map(lambda g: g[1], group))
start = times[0] * self.slot_size
end = (times[-1] + 1) * self.slot_size
self.contacts.append(Contact(start, source, target, True))
self.contacts.append(Contact(end, source, target, False))
self.contacts.sort()
self.nodes = len(nodes)
def __iter__(self):
return iter(self.contacts)
Task = namedtuple('Task', ['trace', 'node_type', 'seed'])
def run_task(task):
seed = task.seed
trace = task.trace
epoch = 7*24*60*60 # 7 days
node_type = task.node_type
node_options = {
'tick_rate': 5 * 60, # 5 mins
'community': LouvainCommunity(epoch),
}
traffic_speed = 30 * 60 # 1 packet every 30 mins
nodes = {
node_id: task.node_type(**node_options)
for node_id in range(trace.nodes)
}
traffic = random_traffic(nodes,
start=epoch,
speed=traffic_speed,
seed=seed)
network = Network(nodes, traffic=traffic, trace=trace)
network.run()
stats = {
'trace': trace.path,
'node_type': node_type.__name__,
'seed': seed,
}
stats.update(network.stats_summary)
return stats
def main(args):
trace = ShedTrace(args['shed'])
pool = Pool()
tasks = []
for seed in args['seeds']:
for node_type in [Node, EpidemicNode, BubbleNode, HCBFNode]:
tasks.append(Task(trace=trace, node_type=node_type, seed=seed))
for stats in pool.imap_unordered(run_task, tasks):
print(stats)
def parse_args(args):
parser = ArgumentParser()
parser.add_argument('shed')
parser.add_argument('--seeds', '-s',
metavar='SEED', type=int, nargs='+', default=[None])
args = parser.parse_args(args)
return vars(args)
if __name__ == '__main__':
exit(main(parse_args(sys.argv[1:])))
from argparse import ArgumentParser
from datetime import datetime
import random
import sys
import simpy
import yaml
from pydtn import Network, NodeFactory
from pydtn.traces import types as traces
from pydtn.routers import types as routers
from pydtn.communities import types as communities
def parse_args(args):
parser = ArgumentParser()
parser.add_argument('--seed', '-s', metavar='seed', type=int,
default=42)
parser.add_argument('--trace', '-t', metavar='type', default='random',
choices=[t for t in traces])
parser.add_argument('--trace-args', '-ta', metavar='arg', nargs='+')
parser.add_argument('--router', '-r', metavar='type', default='direct',
choices=[t for t in routers])
parser.add_argument('--router-args', '-ra', metavar='arg', nargs='+')
parser.add_argument('--community', '-c', metavar='type', default='none',
choices=[t for t in communities])
parser.add_argument('--community-args', '-ca', metavar='arg', nargs='+')
parser.add_argument('--node-args', '-na', metavar='arg', nargs='+')
parser.add_argument('--packet-args', '-pa', metavar='arg', nargs='+')
def list_to_args(args):
if args is None:
return {}
else:
args = '\n'.join(args).replace('=',': ')
return yaml.safe_load(args)
args = parser.parse_args(args)
args.trace_args = list_to_args(args.trace_args)
args.router_args = list_to_args(args.router_args)
args.community_args = list_to_args(args.community_args)
args.node_args = list_to_args(args.node_args)
args.packet_args = list_to_args(args.packet_args)
return args
class Progress:
def __init__(self, length, ticks, alpha):
self.length = length
self.alpha = alpha
self.tick = 0
self.ticks = ticks
self.average = 0
self.__start = None
self.__last_print_len = 0
@property
def bar(self):
progress = round(self.tick / self.ticks * self.length)
return '[' + '='*progress + ' '*(self.length - progress) + ']'
@property
def time(self):
remains = (self.ticks - self.tick) * self.average
ranges = [
('day', 24*60*60),
('hour', 60*60),
('minute', 60),
('second', 1),
]
for unit, weight in ranges:
if remains >= weight:
remains = round(remains / weight)
plural = 's' if remains != 1 else ''
return f'{remains} {unit}{plural}'
return '0 seconds'
def __next__(self):
now = datetime.now().timestamp()
if self.__start is not None:
dt = now - self.__start
self.average = self.average + self.alpha * (dt - self.average)
self.__start = now
self.tick += 1
if self.tick > self.ticks:
raise StopIteration
print('\r' + ' '*self.__last_print_len, end='\r')
line = f'{self.bar} {self.time} to go...'
print(line, end=' ', flush=True)
self.__last_print_len = len(line)
return self.tick
def __iter__(self):
return self
def main(args):
args = parse_args(args)
print(args)
out = {
'seed': args.seed,
'router': args.router,
'community': args.community,
}
if args.trace == 'csv':
out['trace'] = args.trace_args['path']
for k, v in out.items():
print(f'{k}: {v}')
random.seed(args.seed)
env = simpy.Environment()
trace = traces[args.trace](**args.trace_args)
router = routers[args.router]
community = communities[args.community](**args.community_args)
node_factory = NodeFactory(router, **args.node_args)
network = Network(env,
packets=args.packet_args,
node_factory=node_factory,
community=community,
trace=trace)
progress = Progress(50, 500, 0.75)
while True:
try:
for tick in progress:
until = tick / progress.ticks * trace.duration
env.run(until=until)
print(' Done!')
print(str(network.packets))
return 0
except KeyboardInterrupt:
print('\nBye!')
return 0
if __name__ == '__main__':
exit(main(sys.argv[1:]))
from .network import *
"""pydtn is a module for simulating delay tolerant networks."""
__all__ = [
'Network',
'Buffer',
'Node',
'EpidemicNode',
'FloodingNode',
'Contact',
'csv_trace',
'random_trace',
'Traffic',
'random_traffic',
]
__version__ = '0.2'
__author__ = 'Jarrod Pas <j.pas@usask.ca>'
from collections import ChainMap, defaultdict, namedtuple, OrderedDict
import csv
from itertools import count
from random import Random
from time import time
import simpy
class Network:
"""
Network simulation.
TODO: elaborate.
"""
def __init__(self, nodes=None, trace=None, traffic=None):
"""Create a network."""
self.env = simpy.Environment()
self._stats = defaultdict(int)
self._stats['wall-time'] = 0
if nodes is None:
nodes = {}
self.nodes = nodes
if not trace:
trace = []
self.trace = trace
self.trace_done = simpy.Event(self.env)
if not traffic:
traffic = []
self.traffic = traffic
self.packets = []
# start components in order of priority
self.env.process(self.traffic_loop())
self.env.process(self.trace_loop())
for node in self.nodes.values():
node.start(self)
def traffic_loop(self):
"""Trigger traffic for nodes."""
for traffic in self.traffic:
if traffic.created > self.now:
yield self.env.timeout(traffic.created - self.now)
packet = Packet(self, traffic)
self.packets.append(packet)
packet.source.recv(packet)
def trace_loop(self):
"""Trigger contacts for nodes."""
for contact in self.trace:
if contact.time > self.now:
yield self.env.timeout(contact.time - self.now)
node_a = self.nodes[contact.a]
node_b = self.nodes[contact.b]
if contact.join:
node_a.join(node_b)
node_b.join(node_a)
else:
node_a.leave(node_b)
node_b.leave(node_a)
self.trace_done.succeed()
@property
def now(self):
"""Return current simulation time."""
return self.env.now
def run(self, until=None):
"""
Run the simulation.
Keyword Arguments:
until -- tick to run the simulation to and stop
"""
if until is None:
until = self.trace_done
start = time()
self.env.run(until=until)
end = time()
self._stats['wall-time'] += end - start
@property
def stats(self):
"""Return detailed statistics for the simulation."""
packets = [packet.stats for packet in self.packets]
nodes = [node.stats for node in self.nodes.values()]
stats = {
'sim-time': self.env.now,
'packets': packets,
'nodes': nodes,
}
stats.update(self._stats)
return stats
@property
def stats_summary(self):
"""Return summarized stats for the simulation."""
def gather(group):
"""Gather statistics from a group and return a dict of lists."""
stats = defaultdict(list)
for member in group:
for stat, value in member.stats.items():
stats[stat].append(value)
return dict(stats)
packets = gather(self.packets)
nodes = gather(self.nodes.values())
stats = {}
stats['sim-time'] = self.env.now
stats['packets'] = len(self.packets)
stats['recieved'] = len(packets['recieved'])
stats['duplicates'] = sum(packets['recieved']) - stats['recieved']
stats['delivery-ratio'] = stats['recieved'] / stats['packets']
stats['broadcasts'] = sum(nodes['broadcasts'])
stats['delivery-cost'] = stats['broadcasts'] / stats['recieved']
stats.update(self._stats)
return stats
class Packet:
"""An item to route through the network."""
def __init__(self, network, traffic):
self.network = network
self._traffic = traffic
self.recieved = None
self._stats = defaultdict(int)
@property
def source(self):
"""Return source node of the packet."""
return self.network.nodes[self._traffic.source]
@property
def destination(self):
"""Return destination node of the packet."""
return self.network.nodes[self._traffic.destination]
@property
def created(self):
"""Return created time of packet."""
return self._traffic.created
@property
def time_to_live(self):
"""Return time to live of packet."""
return self._traffic.time_to_live
@property
def deadline(self):
"""Return deadline of packet."""
return self.created + self.time_to_live
@property
def expired(self):
"""Is the packet expired."""
return self.network.now > self.deadline
def sent(self, target, reason=None):
"""Send the packet from source to taget."""
self._stats['hops'] += 1
if reason:
self._stats['hops-%s' % reason] += 1
if target is self.destination:
if self.recieved is None:
self.recieved = 0
self._stats['delay'] = self.network.now - self.created
self.recieved += 1
@property
def stats(self):
"""Return statistcs for a packet."""
stats = {}
if self.recieved:
stats['recieved'] = self.recieved
stats.update(self._stats)
return stats
def __len__(self):
"""Return size of packet."""
return self._traffic.payload
class Buffer:
"""A place for a node to hold packets."""
def __init__(self, capacity=None, **options):
"""
Create a buffer.
Stores the order that packet were added to the buffer.
Can be removed from while being iterated over.
Keyword arguments:
capacity -- the maximum number of packets to hold (default infinity).
"""
if capacity is None:
self.capacity = float('inf')
else:
self.capacity = capacity
self.store = OrderedDict()
@property
def full(self):
"""Is the buffer full."""
return len(self.store) >= self.capacity
def add(self, packet):
"""Add a packet to the buffer."""
if self.full:
return False
self.store[packet] = None
return True
def remove(self, packet):
"""Remove packet from the buffer."""
if packet in self.store:
del self.store[packet]
def __contains__(self, packet):
"""Is the packet in the buffer."""
return packet in self.store
def __iter__(self):
"""
Return an iterator for packets in the buffer.
Allows for removing items during iteration.
"""
return iter(list(self.store.keys()))
class Node:
"""Basic implementation of a node implements direct routing."""
class SendFailed(Exception):
"""Raised when a send fails."""
def __init__(self, **options):
"""Create a node."""
self.network = None
self.buffer = Buffer(capacity=options.get('buffer_capacity', None))
self._neighbours = set()
self.options = ChainMap(options, {
'bandwidth': float('inf'),