import socket
from socket import AF_INET, AF_INET6

from scapy.all import *
from scapy.packet import *
from scapy.fields import *
from framework import *
from vpp_object import *
from util import NumericConstant


class BFDDiagCode(NumericConstant):
    """ BFD Diagnostic Code """
    no_diagnostic = 0
    control_detection_time_expired = 1
    echo_function_failed = 2
    neighbor_signaled_session_down = 3
    forwarding_plane_reset = 4
    path_down = 5
    concatenated_path_down = 6
    administratively_down = 7
    reverse_concatenated_path_down = 8

    # numeric code -> human readable description (also used by the BFD
    # packet class below to render the "diag" field)
    desc_dict = {
        no_diagnostic: "No diagnostic",
        control_detection_time_expired: "Control Detection Time Expired",
        echo_function_failed: "Echo Function Failed",
        neighbor_signaled_session_down: "Neighbor Signaled Session Down",
        forwarding_plane_reset: "Forwarding Plane Reset",
        path_down: "Path Down",
        concatenated_path_down: "Concatenated Path Down",
        administratively_down: "Administratively Down",
        reverse_concatenated_path_down: "Reverse Concatenated Path Down",
    }

    def __init__(self, value):
        NumericConstant.__init__(self, value)


class BFDState(NumericConstant):
    """ BFD State """
    admin_down = 0
    down = 1
    init = 2
    up = 3

    # numeric state -> human readable description (also used by the BFD
    # packet class below to render the "state" field)
    desc_dict = {
        admin_down: "AdminDown",
        down: "Down",
        init: "Init",
        up: "Up",
    }

    def __init__(self, value):
        NumericConstant.__init__(self, value)


class BFD(Packet):
    """ Scapy packet class describing the BFD packet header """

    udp_dport = 3784  #: BFD destination port per RFC 5881
    udp_sport_min = 49152  #: BFD source port min value per RFC 5881
    udp_sport_max = 65535  #: BFD source port max value per RFC 5881

    name = "BFD"

    fields_desc = [
        BitField("version", 1, 3),
        BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
        BitEnumField("state", 0, 2, BFDState.desc_dict),
        FlagsField("flags", 0, 6, ['P', 'F', 'C', 'A', 'D', 'M']),
        XByteField("detect_mult", 0),
        XByteField("length", 24),
        BitField("my_discriminator", 0, 32),
        BitField("your_discriminator", 0, 32),
        BitField("desired_min_tx_interval", 0, 32),
        BitField("required_min_rx_interval", 0, 32),
        BitField("required_min_echo_rx_interval", 0, 32)]

    def mysummary(self):
        """ One-line summary showing both discriminators """
        return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
                            "your_disc=%BFD.your_discriminator%)")


# glue the BFD packet class to scapy parser
bind_layers(UDP, BFD, dport=BFD.udp_dport)


class VppBFDUDPSession(VppObject):
    """ Represents BFD UDP session in VPP """

    @property
    def test(self):
        """ Test which created this session """
        return self._test

    @property
    def interface(self):
        """ Interface on which this session lives """
        return self._interface

    @property
    def af(self):
        """ Address family - AF_INET or AF_INET6 """
        return self._af

    @property
    def bs_index(self):
        """ BFD session index from VPP

        :raises NotConfiguredException: if the session has not been added
            to VPP yet (no index has been stored)
        """
        if self._bs_index is not None:
            return self._bs_index
        raise NotConfiguredException("not configured")

    @property
    def local_addr(self):
        """ BFD session local address (VPP address)

        Falls back to the interface's IPv4 address when no explicit local
        address was passed to the constructor.
        """
        # NOTE(review): the fallback is IPv4-only even when af is AF_INET6;
        # callers using AF_INET6 presumably need to pass local_addr - confirm
        if self._local_addr is None:
            return self._interface.local_ip4
        return self._local_addr

    @property
    def local_addr_n(self):
        """ BFD session local address (VPP address) - raw, suitable for API

        Falls back to the interface's raw IPv4 address when no explicit
        local address was passed to the constructor.
        """
        if self._local_addr is None:
            return self._interface.local_ip4n
        return self._local_addr_n

    @property
    def peer_addr(self):
        """ BFD session peer address """
        return self._peer_addr

    @property
    def peer_addr_n(self):
        """ BFD session peer address - raw, suitable for API """
        return self._peer_addr_n

    def _find_session(self, sessions):
        """ Find the entry describing this session in a session dump

        :param sessions: result of the bfd_udp_session_dump API call
        :returns: first matching entry or None if there is no match
        """
        # NOTE(review): only IPv4 sessions are matched and the comparison
        # uses the interface addresses rather than this session's own
        # local/peer addresses - kept as-is, confirm whether intended
        for s in sessions:
            if s.sw_if_index != self.interface.sw_if_index:
                continue
            if self.af == AF_INET \
                    and s.is_ipv6 == 0 \
                    and self.interface.local_ip4n == s.local_addr[:4] \
                    and self.interface.remote_ip4n == s.peer_addr[:4]:
                return s
        return None

    @property
    def state(self):
        """ BFD session state

        Queried from VPP on every access.

        :raises Exception: if the session is not found in the VPP dump
        """
        result = self.test.vapi.bfd_udp_session_dump()
        session = self._find_session(result)
        if session is None:
            raise Exception(
                "Could not find BFD session in VPP response: %s" %
                repr(result))
        return session.state

    @property
    def desired_min_tx(self):
        """ Desired min tx interval passed to VPP when adding the session """
        return self._desired_min_tx

    @property
    def required_min_rx(self):
        """ Required min rx interval passed to VPP when adding the session """
        return self._required_min_rx

    @property
    def detect_mult(self):
        """ Detect multiplier passed to VPP when adding the session """
        return self._detect_mult

    def __init__(self, test, interface, peer_addr, local_addr=None,
                 af=AF_INET, desired_min_tx=100000, required_min_rx=100000,
                 detect_mult=3):
        """ Create the session object (nothing is configured in VPP yet)

        :param test: test which creates this session
        :param interface: interface on which this session lives
        :param peer_addr: peer address in presentation (string) format
        :param local_addr: local address in presentation (string) format,
            None means use the interface's address
        :param af: address family - AF_INET or AF_INET6
        :param desired_min_tx: desired min tx interval (presumably
            microseconds - confirm against the VPP API)
        :param required_min_rx: required min rx interval (presumably
            microseconds - confirm against the VPP API)
        :param detect_mult: detect multiplier
        """
        self._test = test
        self._interface = interface
        self._af = af
        self._local_addr = local_addr
        # raw form of the explicit local address; previously this attribute
        # was never set, so local_addr_n raised AttributeError whenever
        # an explicit local_addr was supplied
        if local_addr is not None:
            self._local_addr_n = socket.inet_pton(af, local_addr)
        else:
            self._local_addr_n = None
        self._peer_addr = peer_addr
        self._peer_addr_n = socket.inet_pton(af, peer_addr)
        self._bs_index = None
        self._desired_min_tx = desired_min_tx
        self._required_min_rx = required_min_rx
        self._detect_mult = detect_mult

    def add_vpp_config(self):
        """ Add the session to VPP, remember its index and register
        this object in the test's registry """
        is_ipv6 = 1 if AF_INET6 == self.af else 0
        result = self.test.vapi.bfd_udp_add(
            self._interface.sw_if_index,
            self.desired_min_tx,
            self.required_min_rx,
            self.detect_mult,
            self.local_addr_n,
            self.peer_addr_n,
            is_ipv6=is_ipv6)
        self._bs_index = result.bs_index
        self._test.registry.register(self, self.test.logger)

    def query_vpp_config(self):
        """ Return True if the session is present in the VPP session dump """
        result = self.test.vapi.bfd_udp_session_dump()
        return self._find_session(result) is not None

    def remove_vpp_config(self):
        """ Delete the session from VPP (no-op if it was never added) """
        if self._bs_index is not None:
            is_ipv6 = 1 if AF_INET6 == self._af else 0
            self.test.vapi.bfd_udp_del(
                self._interface.sw_if_index,
                self.local_addr_n,
                self.peer_addr_n,
                is_ipv6=is_ipv6)

    def object_id(self):
        """ Identifier of this object, built from the VPP session index """
        return "bfd-udp-%d" % self.bs_index

    def __str__(self):
        return self.object_id()

    def admin_up(self):
        """ Set the session flags in VPP to 1 via bfd_session_set_flags """
        self.test.vapi.bfd_session_set_flags(self.bs_index, 1)