Commit b5bd0d28 authored by Pedro Oliveira's avatar Pedro Oliveira

assert, unicast routing update... not tested (just for backup)

parent 215be907
from time import time
try:
from threading import _Timer as Timer
except ImportError:
from threading import Timer
class RemainingTimer(Timer):
def __init__(self, interval, function):
super().__init__(interval, function)
self.start_time = time()
def time_remaining(self):
delta_time = time() - self.start_time
return self.interval - delta_time
'''
def test():
print("ola")
x = RemainingTimer(10, test)
x.start()
from time import sleep
for i in range(0, 10):
print(x.time_remaining())
sleep(1)
'''
......@@ -15,6 +15,7 @@ class Interface(object):
def __init__(self, interface_name: str):
self.interface_name = interface_name
ip_interface = netifaces.ifaddresses(interface_name)[netifaces.AF_INET][0]['addr']
self.ip_mask_interface = netifaces.ifaddresses(interface_name)[netifaces.AF_INET][0]['netmask']
self.ip_interface = ip_interface
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_PIM)
......
......@@ -125,7 +125,10 @@ class InterfacePim(Interface):
def get_neighbor(self, ip):
with self.neighbors_lock.genRlock():
return self.neighbors[ip]
if ip in self.neighbors:
return self.neighbors[ip]
else:
return None
def remove_neighbor(self, ip):
with self.neighbors_lock.genWlock():
......
......@@ -3,6 +3,9 @@ import struct
import netifaces
import threading
import traceback
import ipaddress
from RWLock.RWLock import RWLockWrite
import Main
......@@ -364,8 +367,15 @@ class Kernel:
return None
def neighbor_removed(self, interface_name, neighbor_ip):
def notify_unicast_changes(self, subnet):
# todo
with self.rwlock.genWlock():
for (source_ip, group) in self.routing.keys():
source_ip_obj = ipaddress.ip_address(source_ip)
if source_ip_obj in subnet:
self.routing[(source_ip, group)].network_update()
print(source_ip)
pass
......
......@@ -173,7 +173,7 @@ def list_routing_state():
routing_entries = kernel.routing.values()
vif_indexes = kernel.vif_index_to_name_dic.keys()
t = PrettyTable(['SourceIP', 'GroupIP', 'Interface', 'PruneState', 'AssertState', "Is Forwarding?"])
t = PrettyTable(['SourceIP', 'GroupIP', 'Interface', 'PruneState', 'AssertState', 'LocalMembership', "Is Forwarding?"])
for entry in routing_entries:
ip = entry.source_ip
group = entry.group_ip
......@@ -182,19 +182,21 @@ def list_routing_state():
for index in vif_indexes:
interface_state = entry.interface_state[index]
interface_name = kernel.vif_index_to_name_dic[index]
is_forwarding = interface_state.is_forwarding()
local_membership = type(interface_state._local_membership_state).__name__
try:
if index != upstream_if_index:
prune_state = type(interface_state._prune_state).__name__
assert_state = type(interface_state._assert_state).__name__
is_forwarding = interface_state.is_forwarding()
else:
prune_state = type(interface_state._graft_prune_state).__name__
assert_state = "-"
is_forwarding = "upstream"
except:
prune_state = "-"
assert_state = "-"
t.add_row([ip, group, interface_name, prune_state, assert_state, is_forwarding])
t.add_row([ip, group, interface_name, prune_state, assert_state, local_membership, is_forwarding])
return str(t)
......@@ -222,3 +224,6 @@ def main():
global igmp
igmp = IGMP()
global u
u = UnicastRouting.UnicastRouting()
......@@ -2,6 +2,7 @@ from threading import Timer
import time
from utils import HELLO_HOLD_TIME_NO_TIMEOUT, HELLO_HOLD_TIME_TIMEOUT, TYPE_CHECKING
from threading import Lock
from RWLock.RWLock import RWLockWrite
import Main
if TYPE_CHECKING:
from InterfacePIM import InterfacePim
......@@ -24,6 +25,10 @@ class Neighbor:
self.time_of_last_update = time.time()
self.neighbor_lock = Lock()
self.tree_interface_nlt_subscribers = []
self.tree_interface_nlt_subscribers_lock = RWLockWrite()
# send hello to new neighbor
#self.contact_interface.send_hello()
# todo RANDOM DELAY??? => DO NOTHING... EVENTUALLY THE HELLO MESSAGE WILL BE SENT
......@@ -71,11 +76,14 @@ class Neighbor:
del self.contact_interface.neighbors[self.ip]
# notify interfaces which have this neighbor as AssertWinner
with self.tree_interface_nlt_subscribers_lock.genRlock():
for tree_if in self.tree_interface_nlt_subscribers:
tree_if.assert_winner_nlt_expires()
def reset(self):
interface_name = self.contact_interface.interface_name
neighbor_ip = self.ip
Main.kernel.neighbor_removed(interface_name, neighbor_ip)
# todo new neighbor
return
def receive_hello(self, generation_id, hello_hold_time):
......@@ -85,3 +93,15 @@ class Neighbor:
self.time_of_last_update = time.time()
self.set_generation_id(generation_id)
self.set_hello_hold_time(hello_hold_time)
def subscribe_nlt_expiration(self, tree_if):
with self.tree_interface_nlt_subscribers_lock.genWlock():
if tree_if not in self.tree_interface_nlt_subscribers:
self.tree_interface_nlt_subscribers.append(tree_if)
def unsubscribe_nlt_expiration(self, tree_if):
with self.tree_interface_nlt_subscribers_lock.genWlock():
if tree_if in self.tree_interface_nlt_subscribers:
self.tree_interface_nlt_subscribers.remove(tree_if)
......@@ -2,6 +2,7 @@ import struct
import socket
from Packet.PacketPimEncodedGroupAddress import PacketPimEncodedGroupAddress
from Packet.PacketPimEncodedUnicastAddress import PacketPimEncodedUnicastAddress
from tree.globals import ASSERT_CANCEL_METRIC
'''
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
......@@ -29,12 +30,15 @@ class PacketPimAssert:
PIM_HDR_ASSERT_v4_LEN = struct.calcsize(PIM_HDR_ASSERT_v4)
PIM_HDR_ASSERT_v6_LEN = struct.calcsize(PIM_HDR_ASSERT_v6)
def __init__(self, multicast_group_address: str or bytes, source_address: str or bytes, metric_preference, metric):
def __init__(self, multicast_group_address: str or bytes, source_address: str or bytes, metric_preference: int, metric: int or float):
if type(multicast_group_address) is bytes:
multicast_group_address = socket.inet_ntoa(multicast_group_address)
if type(source_address) is bytes:
source_address = socket.inet_ntoa(source_address)
if metric_preference > ASSERT_CANCEL_METRIC:
metric_preference = ASSERT_CANCEL_METRIC
if metric > ASSERT_CANCEL_METRIC:
metric = ASSERT_CANCEL_METRIC
self.multicast_group_address = multicast_group_address
self.source_address = source_address
self.metric_preference = metric_preference
......
......@@ -65,9 +65,10 @@ class PacketPimHeader(PacketPayload):
raise Exception
msg_to_checksum = data[0:2] + b'\x00\x00' + data[4:]
print("checksum calculated: " + str(checksum(msg_to_checksum)))
if checksum(msg_to_checksum) != rcv_checksum:
print("wrong checksum")
print("checksum calculated: " + str(checksum(msg_to_checksum)))
print("checksum recv: " + str(rcv_checksum))
raise Exception
pim_payload = data[PacketPimHeader.PIM_HDR_LEN:]
......
This diff is collapsed.
......@@ -6,6 +6,7 @@ from tree.tree_if_upstream import TreeInterfaceUpstream
from tree.tree_if_downstream import TreeInterfaceDownstream
from .tree_interface import TreeInterface
from threading import Timer, Lock, RLock
from tree.metric import AssertMetric
import UnicastRouting
class KernelEntry:
......@@ -17,7 +18,25 @@ class KernelEntry:
self.group_ip = group_ip
# ip of neighbor of the rpf
self._rpf_node = None
#next_hop = UnicastRouting.get_route(source_ip)["gateway"]
#self.rpf_node = source_ip if next_hop is None else next_hop
next_hop = UnicastRouting.get_route(source_ip)["gateway"]
multipaths = UnicastRouting.get_route(source_ip)["multipath"]
self.rpf_node = next_hop if next_hop is not None else source_ip
print("MUL", multipaths)
#self.rpf_node = multipaths[0]["gateway"]
for m in multipaths:
if m["gateway"] is None:
self.rpf_node = source_ip
break
else:
self.rpf_node = m["gateway"]
print("RPF_NODE:", UnicastRouting.get_route(source_ip))
print(self.rpf_node == source_ip)
# (S,G) starts IG state
self._was_olist_null = None
......@@ -81,20 +100,29 @@ class KernelEntry:
def recv_assert_msg(self, index, packet):
print("recv assert")
self.interface_state[index].recv_assert_msg()
pkt_assert = packet.payload.payload
metric = pkt_assert.metric
metric_preference = pkt_assert.metric_preference
assert_sender_ip = packet.ip_header.ip_src
received_metric = AssertMetric(metric_preference=metric_preference, route_metric=metric, ip_address=assert_sender_ip)
self.interface_state[index].recv_assert_msg(received_metric)
def recv_prune_msg(self, index, packet):
print("recv prune msg")
self.interface_state[index].recv_prune_msg()
holdtime = packet.payload.payload.hold_time
upstream_neighbor_address = packet.payload.payload.upstream_neighbor_address
self.interface_state[index].recv_prune_msg(upstream_neighbor_address=upstream_neighbor_address, holdtime=holdtime)
def recv_join_msg(self, index, packet):
print("recv join msg")
print("type: ")
self.interface_state[index].recv_join_msg()
upstream_neighbor_address = packet.payload.payload.upstream_neighbor_address
self.interface_state[index].recv_join_msg(upstream_neighbor_address)
def recv_graft_msg(self, index, packet):
print("recv graft msg")
self.interface_state[index].recv_graft_msg()
upstream_neighbor_address = packet.payload.payload.upstream_neighbor_address
self.interface_state[index].recv_graft_msg(upstream_neighbor_address)
def recv_graft_ack_msg(self, index, packet):
print("recv graft ack msg")
......@@ -105,13 +133,47 @@ class KernelEntry:
prune_indicator = 1
self.interface_state[index].recv_state_refresh_msg(prune_indicator)
def network_update(self, change, args):
#todo
return
###############################################################
# Unicast Changes to RPF
###############################################################
def network_update(self):
with self.CHANGE_STATE_LOCK:
#next_hop = UnicastRouting.get_route(self.source_ip)["gateway"]
#rpf_node = self.source_ip if next_hop is None else next_hop
next_hop = UnicastRouting.get_route(self.source_ip)["gateway"]
multipaths = UnicastRouting.get_route(self.source_ip)["multipath"]
rpf_node = next_hop
print("MUL", multipaths)
# self.rpf_node = multipaths[0]["gateway"]
for m in multipaths:
if m["gateway"] is None:
rpf_node = self.source_ip
break
else:
rpf_node = m["gateway"]
print("RPF_NODE:", UnicastRouting.get_route(self.source_ip))
print(self.rpf_node == self.source_ip)
new_inbound_interface_index = Main.kernel.vif_dic[self.check_rpf()]
if new_inbound_interface_index != self.inbound_interface_index:
# todo: criar novo upstream e downstream interface
# todo: stop upstream e downstream
#self.interface_state[self.inbound_interface_index].stop()
#self.interface_state[new_inbound_interface_index].stop()
#Unicast routing or Assert state causes RPF'(S) to change,
self.interface_state[self.inbound_interface_index] = TreeInterfaceDownstream
self.interface_state[new_inbound_interface_index] = TreeInterfaceUpstream
self.inbound_interface_index = new_inbound_interface_index
if self.rpf_node != rpf_node:
self.rpf_node = rpf_node
self.interface_state[self.inbound_interface_index].change_rpf(self._was_olist_null)
def update(self, caller, arg):
#todo
......
This diff is collapsed.
......@@ -133,7 +133,8 @@ class NoInfo(DownstreamStateABS):
@type interface: TreeInterfaceDownstreamDownstream
"""
assert False
#assert False
return
@staticmethod
def PTexpires(interface: "TreeInterfaceDownstream"):
......@@ -142,7 +143,8 @@ class NoInfo(DownstreamStateABS):
@type interface: TreeInterfaceDownstreamDownstream
"""
assert False
#assert False
return
@staticmethod
def is_now_RPF_Interface(interface: "TreeInterfaceDownstream"):
......@@ -230,7 +232,8 @@ class PrunePending(DownstreamStateABS):
#pt = interface.get_pt()
#pt.start(interface.get_lpht() - pim_globals.JT_OVERRIDE_INTERVAL)
interface.set_prune_timer(prune_holdtime - pim_globals.JT_OVERRIDE_INTERVAL)
#interface.set_prune_timer(prune_holdtime - pim_globals.JT_OVERRIDE_INTERVAL)
interface.set_prune_timer(interface.get_received_prune_holdtime() - pim_globals.JT_OVERRIDE_INTERVAL)
interface.send_pruneecho()
......@@ -245,7 +248,8 @@ class PrunePending(DownstreamStateABS):
@type interface: TreeInterfaceDownstreamDownstream
"""
assert False
#assert False
return
@staticmethod
def is_now_RPF_Interface(interface: "TreeInterfaceDownstream"):
......@@ -259,6 +263,8 @@ class PrunePending(DownstreamStateABS):
#interface.get_ppt().stop()
interface.clear_prune_pending_timer()
interface.set_prune_state(DownstreamState.NoInfo)
print('is_now_RPF_Interface, PP -> NI')
@staticmethod
......@@ -293,7 +299,8 @@ class Pruned(DownstreamStateABS):
# ppt.set_timer(interface.get_lpht())
# ppt.reset()
# todo nao percebo... corrigir 0
if holdtime > 0:
#if holdtime > 0:
if interface.get_received_prune_holdtime() > interface.remaining_prune_timer():
interface.set_prune_timer(holdtime)
print('receivedPrune, P -> P')
......@@ -334,7 +341,8 @@ class Pruned(DownstreamStateABS):
@type interface: TreeInterfaceDownstreamDownstream
"""
assert False
#assert False
return
@staticmethod
def PTexpires(interface: "TreeInterfaceDownstream"):
......@@ -358,6 +366,7 @@ class Pruned(DownstreamStateABS):
# todo ver melhor
#interface.get_pt().stop()
interface.clear_prune_timer()
interface.set_prune_state(DownstreamState.NoInfo)
print('is_now_RPF_Interface, P -> NI')
......@@ -372,7 +381,8 @@ class Pruned(DownstreamStateABS):
#pt = interface.get_pt()
#pt.set_timer(interface.get_lpht())
#pt.reset()
interface.set_prune_timer(interface.get_lpht())
#interface.set_prune_timer(interface.get_lpht())
interface.set_prune_timer(interface.get_received_prune_holdtime())
print('send_state_refresh, P -> P')
......
......@@ -13,3 +13,7 @@ OVERRIDE_INTERVAL = 2.5
REFRESH_INTERVAL = 60 # State Refresh Interval
SOURCE_LIFETIME = 210
T_LIMIT = 210
ASSERT_CANCEL_METRIC = 0xFFFFFFFF
\ No newline at end of file
from abc import ABCMeta, abstractmethod
class LocalMembershipStateABC(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def has_members():
raise NotImplementedError
class NoInfo(LocalMembershipStateABC):
@staticmethod
def has_members():
return False
class Include(LocalMembershipStateABC):
@staticmethod
def has_members():
return True
class LocalMembership():
NoInfo = NoInfo()
Include = Include()
\ No newline at end of file
'''
Created on Sep 8, 2014
@author: alex
'''
import ipaddress
class AssertMetric(object):
'''
Note: we consider the node name the ip of the metric.
'''
def __init__(self):
'''
@type tree_if: TreeInterface
'''
self._pref = None
self._metric = None
self._node = None
def is_worse_than(self, other):
if self._pref != other.pref:
return self._pref > other.pref
def __init__(self, metric_preference: int or float = float("Inf"), route_metric: int or float = float("Inf"), ip_address: str = "0.0.0.0"):
if type(ip_address) is str:
ip_address = ipaddress.ip_address(ip_address)
elif self._metric != other.metric:
return self._metric > other.metric
self._metric_preference = metric_preference
self._route_metric = route_metric
self._ip_address = ip_address
def is_better_than(self, other):
if self.metric_preference != other.metric_preference:
return self.metric_preference < other.metric_preference
elif self.route_metric != other.route_metric:
return self.route_metric < other.route_metric
else:
return self._node.__str__() <= other.node.__str__()
return self.ip_address > other.ip_address
@property
def pref(self):
return self._pref
@property
def metric(self):
return self._metric
@property
def node(self):
return self._node
def is_worse(self, other):
return not self.is_better_than(other)
@staticmethod
def infinite_assert_metric():
'''
@type metric: AssertMetric
'''
metric = AssertMetric()
metric._pref = 1
metric._metric = float("Inf")
metric._node = ""
return metric
return AssertMetric(metric_preference=float("Inf"), route_metric=float("Inf"), ip_address="0.0.0.0")
@staticmethod
def spt_assert_metric(tree_if):
......@@ -59,15 +37,40 @@ class AssertMetric(object):
@type metric: AssertMetric
@type tree_if: TreeInterface
'''
metric = AssertMetric()
(source_ip, _) = tree_if.get_tree_id()
import UnicastRouting
metric_preference, metric_cost = UnicastRouting.get_metric(source_ip)
return AssertMetric(metric_preference, metric_cost, tree_if.get_ip())
def i_am_assert_winner(self, tree_if):
interface_ip = ipaddress.ip_address(tree_if.get_ip())
return self._ip_address == interface_ip
@property
def metric_preference(self):
return self._metric_preference
@metric_preference.setter
def metric_preference(self, value):
self._metric_preference = value
metric._pref = 1 # TODO: ver isto melhor no route preference
metric._metric = tree_if.metric
metric._node = tree_if.node
@property
def route_metric(self):
return self._route_metric
@route_metric.setter
def route_metric(self, value):
self._route_metric = value
@property
def ip_address(self):
return self._ip_address
return metric
@ip_address.setter
def ip_address(self, value):
if type(value) is str:
value = ipaddress.ip_address(value)
# overrides
def __str__(self):
return "AssertMetric<{}:{}:{}>".format(self._pref, self._metric,
self._node)
self._ip_address = value
......@@ -7,6 +7,7 @@ Created on Jul 16, 2015
#from convergence import Convergence
#from des.event.timer import Timer
from threading import Timer
from CustomTimer.RemainingTimer import RemainingTimer
from .assert_ import AssertState, AssertStateABC
#from .messages.assert_msg import SFMRAssertMsg
#from .messages.reset import SFMResetMsg
......@@ -22,18 +23,18 @@ class TreeInterfaceDownstream(TreeInterface):
TreeInterface.__init__(self, kernel_entry, interface_id)
# State
self._local_membership_state = None # todo NoInfo or Include
#self._local_membership_state = None # todo NoInfo or Include
# Prune State
self._prune_state = DownstreamState.NoInfo
self._prune_pending_timer = None
self._prune_timer = None
#self._prune_state = DownstreamState.NoInfo
#self._prune_pending_timer = None
#self._prune_timer = None
# Assert Winner State
self._assert_state = AssertState.Winner
self._assert_timer = None
self._assert_winner_ip = None
self._assert_winner_metric = None
#self._assert_state = AssertState.NoInfo
#self._assert_timer = None
#self._assert_winner_ip = None
#self._assert_winner_metric = None
#self.set_dipt_timer()
#self.send_prune()
......@@ -58,6 +59,9 @@ class TreeInterfaceDownstream(TreeInterface):
def is_prune_timer_running(self):
return self._prune_timer is not None and self._prune_timer.is_alive()
def remaining_prune_timer(self):
return 0 if not self._prune_timer else self._prune_timer.time_remaining()
##########################################
# Set timers
##########################################
......@@ -72,7 +76,8 @@ class TreeInterfaceDownstream(TreeInterface):
def set_prune_timer(self, time):
self.clear_prune_timer()
self._prune_timer = Timer(time, self.prune_timeout)
#self._prune_timer = Timer(time, self.prune_timeout)
self._prune_timer = RemainingTimer(time, self.prune_timeout)
self._prune_timer.start()
def clear_prune_timer(self):
......@@ -88,27 +93,32 @@ class TreeInterfaceDownstream(TreeInterface):
def prune_timeout(self):
self._prune_state.PTexpires(self)
###########################################
# Recv packets
###########################################
def recv_data_msg(self):
self._assert_state.receivedDataFromDownstreamIf(self)
# Override
def recv_prune_msg(self):
self._prune_state.receivedPrune(self, 0)
def recv_prune_msg(self, upstream_neighbor_address, holdtime):
super().recv_prune_msg(upstream_neighbor_address, holdtime)
# set here???
self.set_receceived_prune_holdtime(holdtime)
self._prune_state.receivedPrune(self, holdtime)
# Override
def recv_join_msg(self):
def recv_join_msg(self, upstream_neighbor_address):
super().recv_join_msg(upstream_neighbor_address)
self._prune_state.receivedJoin(self)
# Override
def recv_graft_msg(self):
def recv_graft_msg(self, upstream_neighbor_address):
super().recv_graft_msg(upstream_neighbor_address)
self._prune_state.receivedGraft(self)
# Override
def is_forwarding(self):
return ((len(self.get_interface().neighbors) >= 1 and not self.is_pruned()) or self.igmp_has_members()) and not self.lost_assert()
......@@ -133,15 +143,6 @@ class TreeInterfaceDownstream(TreeInterface):
def get_metric(self):
return AssertMetric.spt_assert_metric(self)
def _set_assert_state(self, value: AssertStateABC):
with self.get_state_lock():
if value != self._assert_state:
self._assert_state = value
self.change_tree()
self.evaluate_ingroup()
#Convergence.mark_change()
def _get_winner_metric(self):
'''
@rtype: SFMRAssertMetric
......
......@@ -24,6 +24,9 @@ class TreeInterfaceUpstream(TreeInterface):
self._originator_state = None
if self.is_S_directly_conn():
self._graft_prune_state.sourceIsNowDirectConnect(self)
##########################################
# Set state
##########################################
......@@ -95,7 +98,7 @@ class TreeInterfaceUpstream(TreeInterface):
###########################################
def recv_data_msg(self):
# todo check olist
if self.is_olist_null() and not self.is_prune_limit_timer_running():
if self.is_olist_null() and not self.is_prune_limit_timer_running() and not self.is_S_directly_conn():
self._graft_prune_state.dataArrivesRPFinterface_OListNull_PLTstoped(self)
def recv_state_refresh_msg(self, prune_indicator: int):
......@@ -105,11 +108,13 @@ class TreeInterfaceUpstream(TreeInterface):
elif prune_indicator == 0 and not self.is_prune_limit_timer_running():
self._graft_prune_state.stateRefreshArrivesRPFnbr_pruneIs0_PLTstoped(self)
def recv_join_msg(self):
def recv_join_msg(self, upstream_neighbor_address):
super().recv_join_msg(upstream_neighbor_address)
# todo check rpf nbr
self._graft_prune_state.seeJoinToRPFnbr(self)
def recv_prune_msg(self):
def recv_prune_msg(self, upstream_neighbor_address, holdtime):
super().recv_prune_msg(upstream_neighbor_address, holdtime)
self._graft_prune_state.seePrune(self)
def recv_graft_ack_msg(self):
......@@ -128,8 +133,11 @@ class TreeInterfaceUpstream(TreeInterface):
###########################################
# Changes on Unicast Routing Table
###########################################
# todo
def change_rpf(self, olist_is_null):
if olist_is_null:
self._graft_prune_state.RPFnbrChanges_olistIsNull()
else:
self._graft_prune_state.RPFnbrChanges_olistIsNotNull()
#Override
def is_forwarding(self):
......@@ -149,5 +157,5 @@ class TreeInterfaceUpstream(TreeInterface):
@property
def t_override(self):
oi = self.get_interface()._override_internal
oi = self.get_interface()._override_interval
return random.uniform(0, oi)
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment