1 from socket import AF_INET, AF_INET6
2 from scapy.all import *
3 from scapy.packet import *
4 from scapy.fields import *
5 from framework import *
6 from vpp_object import *
7 from util import NumericConstant
10 class BFDDiagCode(NumericConstant):
11 """ BFD Diagnostic Code """
13 control_detection_time_expired = 1
14 echo_function_failed = 2
15 neighbor_signaled_session_down = 3
16 forwarding_plane_reset = 4
18 concatenated_path_down = 6
19 administratively_down = 7
20 reverse_concatenated_path_down = 8
23 no_diagnostic: "No diagnostic",
24 control_detection_time_expired: "Control Detection Time Expired",
25 echo_function_failed: "Echo Function Failed",
26 neighbor_signaled_session_down: "Neighbor Signaled Session Down",
27 forwarding_plane_reset: "Forwarding Plane Reset",
28 path_down: "Path Down",
29 concatenated_path_down: "Concatenated Path Down",
30 administratively_down: "Administratively Down",
31 reverse_concatenated_path_down: "Reverse Concatenated Path Down",
34 def __init__(self, value):
35 NumericConstant.__init__(self, value)
38 class BFDState(NumericConstant):
46 admin_down: "AdminDown",
52 def __init__(self, value):
53 NumericConstant.__init__(self, value)
58 udp_dport = 3784 #: BFD destination port per RFC 5881
59 udp_sport_min = 49152 #: BFD source port min value per RFC 5881
60 udp_sport_max = 65535 #: BFD source port max value per RFC 5881
65 BitField("version", 1, 3),
66 BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
67 BitEnumField("state", 0, 2, BFDState.desc_dict),
68 FlagsField("flags", 0, 6, ['P', 'F', 'C', 'A', 'D', 'M']),
69 XByteField("detect_mult", 0),
70 XByteField("length", 24),
71 BitField("my_discriminator", 0, 32),
72 BitField("your_discriminator", 0, 32),
73 BitField("desired_min_tx_interval", 0, 32),
74 BitField("required_min_rx_interval", 0, 32),
75 BitField("required_min_echo_rx_interval", 0, 32)]
78 return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
79 "your_disc=%BFD.your_discriminator%)")
81 # glue the BFD packet class to scapy parser
82 bind_layers(UDP, BFD, dport=BFD.udp_dport)
85 class VppBFDUDPSession(VppObject):
86 """ Represents BFD UDP session in VPP """
90 """ Test which created this session """
95 """ Interface on which this session lives """
96 return self._interface
100 """ Address family - AF_INET or AF_INET6 """
105 """ BFD session index from VPP """
106 if self._bs_index is not None:
107 return self._bs_index
108 raise NotConfiguredException("not configured")
111 def local_addr(self):
112 """ BFD session local address (VPP address) """
113 if self._local_addr is None:
114 return self._interface.local_ip4
115 return self._local_addr
118 def local_addr_n(self):
119 """ BFD session local address (VPP address) - raw, suitable for API """
120 if self._local_addr is None:
121 return self._interface.local_ip4n
122 return self._local_addr_n
126 """ BFD session peer address """
127 return self._peer_addr
130 def peer_addr_n(self):
131 """ BFD session peer address - raw, suitable for API """
132 return self._peer_addr_n
136 """ BFD session state """
137 result = self.test.vapi.bfd_udp_session_dump()
140 if s.sw_if_index == self.interface.sw_if_index:
141 if self.af == AF_INET \
143 and self.interface.local_ip4n == s.local_addr[:4] \
144 and self.interface.remote_ip4n == s.peer_addr[:4]:
149 "Could not find BFD session in VPP response: %s" % repr(result))
153 def desired_min_tx(self):
154 return self._desired_min_tx
157 def required_min_rx(self):
158 return self._required_min_rx
161 def detect_mult(self):
162 return self._detect_mult
164 def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
165 desired_min_tx=100000, required_min_rx=100000, detect_mult=3):
167 self._interface = interface
169 self._local_addr = local_addr
170 self._peer_addr = peer_addr
171 self._peer_addr_n = socket.inet_pton(af, peer_addr)
172 self._bs_index = None
173 self._desired_min_tx = desired_min_tx
174 self._required_min_rx = required_min_rx
175 self._detect_mult = detect_mult
177 def add_vpp_config(self):
178 is_ipv6 = 1 if AF_INET6 == self.af else 0
179 result = self.test.vapi.bfd_udp_add(
180 self._interface.sw_if_index,
182 self.required_min_rx,
187 self._bs_index = result.bs_index
189 def query_vpp_config(self):
190 result = self.test.vapi.bfd_udp_session_dump()
193 if s.sw_if_index == self.interface.sw_if_index:
194 if self.af == AF_INET \
196 and self.interface.local_ip4n == s.local_addr[:4] \
197 and self.interface.remote_ip4n == s.peer_addr[:4]:
204 def remove_vpp_config(self):
205 if hasattr(self, '_bs_index'):
206 is_ipv6 = 1 if AF_INET6 == self._af else 0
207 self.test.vapi.bfd_udp_del(
208 self._interface.sw_if_index,
214 return "bfd-udp-%d" % self.bs_index
217 self.test.vapi.bfd_session_set_flags(self.bs_index, 1)