Skip to content
Snippets Groups Projects
Commit 1f9738f4 authored by Jarrod Pas's avatar Jarrod Pas
Browse files

Refactore core

parent 0f6bb16b
No related branches found
No related tags found
2 merge requests!3Version 0.2,!1Full rewrite
from .network import *
"""pydtn is a module for simulating delay tolerant networks."""
__all__ = [
'Contact',
'Traffic',
'Network',
'Buffer',
'Node',
'EpidemicNode',
]
__version__ = '0.1'
__author__ = 'Jarrod Pas <j.pas@usask.ca>'
from collections import ChainMap, defaultdict, namedtuple, OrderedDict
import csv
from itertools import count
from random import Random
from time import time
import simpy
class Network:
"""
Network simulation.
TODO: elaborate.
"""
def __init__(self, nodes=None, trace=None, traffic=None):
"""Create a network."""
self.env = simpy.Environment()
self._stats = defaultdict(int)
self._stats['wall-time'] = 0
if nodes is None:
nodes = {}
self.nodes = nodes
if not trace:
trace = []
self.trace = trace
self.trace_done = simpy.Event(self.env)
if not traffic:
traffic = []
self.traffic = traffic
self.packets = []
# start components in order of priority
self.env.process(self.traffic_loop())
self.env.process(self.trace_loop())
for node in self.nodes.values():
node.start(self)
def traffic_loop(self):
"""Trigger traffic for nodes."""
for traffic in self.traffic:
if traffic.created > self.now:
yield self.env.timeout(traffic.created - self.now)
packet = Packet(self, traffic)
self.packets.append(packet)
packet.source.recv(packet)
def trace_loop(self):
"""Trigger contacts for nodes."""
for contact in self.trace:
if contact.time > self.now:
yield self.env.timeout(contact.time - self.now)
node_a = self.nodes[contact.a]
node_b = self.nodes[contact.b]
if contact.join:
node_a.join(node_b)
node_b.join(node_a)
else:
node_a.leave(node_b)
node_b.leave(node_a)
self.trace_done.succeed()
@property
def now(self):
"""Return current simulation time."""
return self.env.now
def run(self, until=None):
"""
Run the simulation.
Keyword Arguments:
until -- tick to run the simulation to and stop
"""
if until is None:
until = self.trace_done
start = time()
self.env.run(until=until)
end = time()
self._stats['wall-time'] += end - start
@property
def stats(self):
"""Return statistics for the simulation."""
def gather(group):
"""Gather statistics from a group and return a dict of lists."""
stats = defaultdict(list)
for member in group:
for stat, value in member.stats.items():
stats[stat].append(value)
return dict(stats)
packet_stats = gather(self.packets)
node_stats = gather(self.nodes.values())
stats = {}
stats['sim-time'] = self.env.now
stats['broadcasts'] = sum(node_stats['broadcasts'])
stats['packets'] = len(self.packets)
delivered = list(filter(lambda r: r > 0, packet_stats['recieved']))
stats['delivered'] = len(delivered)
stats['delivery-ratio'] = stats['delivered'] / stats['packets']
stats['delivery-cost'] = stats['broadcasts'] / stats['delivered']
stats['delay'] = sum(packet_stats['delay']) / stats['delivered']
stats.update(self._stats)
return stats
class Packet:
"""An item to route through the network."""
def __init__(self, network, traffic):
self.network = network
self._traffic = traffic
self.recieved = None
self._stats = defaultdict(int)
@property
def source(self):
"""Return source node of the packet."""
return self.network.nodes[self._traffic.source]
@property
def destination(self):
"""Return destination node of the packet."""
return self.network.nodes[self._traffic.destination]
@property
def created(self):
"""Return created time of packet."""
return self._traffic.created
@property
def time_to_live(self):
"""Return time to live of packet."""
return self._traffic.time_to_live
@property
def deadline(self):
"""Return deadline of packet."""
return self.created + self.time_to_live
@property
def expired(self):
"""Is the packet expired."""
return self.network.now > self.deadline
def sent(self, target, reason=None):
"""Send the packet from source to taget."""
self._stats['hops'] += 1
if reason:
self._stats['hops-%s' % reason] += 1
if target is self.destination:
if self.recieved is None:
self.recieved = 0
self._stats['delay'] = self.network.now - self.created
self.recieved += 1
@property
def stats(self):
"""Return statistcs for a packet."""
stats = {}
if self.recieved:
stats['recieved'] = self.recieved
stats.update(self._stats)
return stats
def __len__(self):
"""Return size of packet."""
return self._traffic.payload
class Buffer:
"""A place for a node to hold packets."""
def __init__(self, **options):
"""
Create a buffer.
Stores the order that packet were added to the buffer.
Can be removed from while being iterated over.
Keyword arguments:
capacity -- the maximum number of packets to hold (default infinity).
"""
self.capacity = options.get('capacity', simpy.core.Infinity)
self.store = OrderedDict()
@property
def full(self):
"""Is the buffer full."""
return len(self.store) >= self.capacity
def add(self, packet):
"""Add a packet to the buffer."""
if self.full:
return False
self.store[packet] = None
return True
def remove(self, packet):
"""Remove packet from the buffer."""
if packet in self.store:
del self.store[packet]
def __contains__(self, packet):
"""Is the packet in the buffer."""
return packet in self.store
def __iter__(self):
"""
Return an iterator for packets in the buffer.
Allows for removing items during iteration.
"""
return iter(list(self.store.keys()))
class Node:
"""Basic implementation of a node implements direct routing."""
class SendFailed(Exception):
"""Raised when a send fails."""
def __init__(self, **options):
"""Create a node."""
self.network = None
self.buffer = Buffer()
self._neighbours = set()
self.options = ChainMap(options, {
'bandwidth': simpy.core.Infinity,
'tick_rate': 1,
})
self.stats = defaultdict(int)
self.stats['broadcasts'] = 0
def start(self, network):
"""
Start event loop.
If it has already been started do nothing.
"""
if self.network is not None:
return
self.network = network
self.network.env.process(self.tick())
def tick(self):
"""Thread of execution for a node."""
tick = self.options['tick_rate']
while True:
start = self.network.now
yield from self.forward_all()
delay = tick - (self.network.now - start) % tick
yield self.network.env.timeout(delay)
def join(self, node):
"""
Approach the neighbouhood of this node.
This is an idempotent operation.
"""
if node not in self._neighbours:
self._neighbours.add(node)
def leave(self, node):
"""
Leave the neighbouhood of this node.
This is an idempotent operation.
"""
if node in self._neighbours:
self._neighbours.remove(node)
@property
def neighbours(self):
"""Return nodes within transmission range."""
return frozenset(self._neighbours)
def forward(self, packet):
"""Forward packet directly to their destination if possible."""
if packet.destination in self.neighbours:
return {packet.destination: 'direct'}
# forward to nobody
return {}
def forward_all(self):
"""Try to forward all packets."""
env = self.network.env
for packet in self.buffer:
if packet.expired:
self.packet_expiry(packet)
continue
forwards = self.forward(packet)
if forwards:
delay = len(packet) / self.options['bandwidth']
yield env.timeout(delay)
self.stats['broadcasts'] += 1
for target, reason in forwards.items():
try:
self.send(packet, target, reason)
self.send_success(packet, target)
except Node.SendFailed:
self.send_failure(packet, target)
def send(self, packet, target, reason=None):
"""
Send a packet to another node.
Keyword Arguments:
reason -- why the packet was sent (optional)
"""
if target in self.neighbours:
target.recv(packet, source=target)
packet.sent(target, reason=reason)
else:
raise Node.SendFailed('nodes are not neighbours')
def send_success(self, packet, target):
"""
Call when a send succeeds.
Removes the sent packet from the buffer.
"""
self.buffer.remove(packet)
def send_failure(self, packet, target):
"""Call when a send fails."""
pass
def packet_expiry(self, packet):
"""Call when a packet expires."""
self.buffer.remove(packet)
def recv(self, packet, source=None):
"""
Recieve a packet.
If the packet has reached it's destination notes the time then
"""
if packet.destination is self:
return
if not self.buffer.add(packet):
raise Node.SendFailed('buffer full')
def __repr__(self):
"""Return representation."""
return '%s(**%r)' % (
self.__class__.__name__,
self.options
)
class EpidemicNode(Node):
"""Node which forwards epidemically."""
def __init__(self, **options):
"""Create an epidemic node."""
super().__init__(**options)
self.__sent = defaultdict(set)
def forward(self, packet):
"""
Forward based on the epidemic heuristic.
Epidemic Heuristic:
- Forward the packet to all neighbours that I have not forwarded
to so far.
"""
forward = {
neighbour: 'epidemic'
for neighbour in self.neighbours
if neighbour not in self.__sent[packet]
}
return forward
def send_success(self, packet, target):
"""
Call when a send succeeds.
Adds target to packet tracking.
"""
super().send_success(packet, target)
self.__sent[packet].add(target)
def packet_expiry(self, packet):
"""
Call when a packet expires.
Removes packet from tracking.
"""
super().packet_expiry(packet)
if packet in self.__sent:
del self.__sent[packet]
Contact = namedtuple('Contact', ['time', 'a', 'b', 'join'])
def random_trace(nodes, seed=None, step=1):
"""Generate a random contact trace."""
random = Random(seed)
for now in count(step=step):
join = bool(random.getrandbits(1))
node_a, node_b = random.sample(nodes, 2)
yield Contact(now, node_a, node_b, join)
def csv_trace(path):
"""Generate contact trace from csv file."""
with open(path) as trace_file:
reader = csv.reader(trace_file)
# skip header
next(reader)
# skip first line, it is additional stats about the trace:
# duration, placeholder, placehold, node_count
next(reader)
for row in reader:
now, source, destination, join = map(int, row)
yield Contact(now, source, destination, join)
Traffic = namedtuple('Traffic', [
'source', 'destination', 'created', 'time_to_live', 'payload'
])
def random_traffic(nodes, step=1, seed=None, time_to_live=None, payload=None):
"""Generate traffic from random source to random destination every step."""
random = Random(seed)
if time_to_live is None:
time_to_live = simpy.core.Infinity
if payload is None:
payload = 0
for created in count(step=step):
source, destination = random.sample(nodes, 2)
yield Traffic(source, destination, created, time_to_live, payload)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment