5 from random import randint
7 from framework import *
11 class BFDCLITestCase(VppTestCase):
12 """Bidirectional Forwarding Detection (BFD) - CLI"""
16 super(BFDCLITestCase, cls).setUpClass()
19 cls.create_pg_interfaces([0])
24 super(BFDCLITestCase, cls).tearDownClass()
27 def test_add_bfd(self):
28 """ create a BFD session """
29 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
30 session.add_vpp_config()
31 self.logger.debug("Session state is %s" % str(session.state))
32 session.remove_vpp_config()
33 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
34 session.add_vpp_config()
35 self.logger.debug("Session state is %s" % str(session.state))
36 session.remove_vpp_config()
38 def test_double_add(self):
39 """ create the same BFD session twice (negative case) """
40 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
41 session.add_vpp_config()
43 session.add_vpp_config()
45 session.remove_vpp_config()
47 session.remove_vpp_config()
48 raise Exception("Expected failure while adding duplicate "
52 def create_packet(interface, ttl=255, src_port=50000, **kwargs):
53 p = (Ether(src=interface.remote_mac, dst=interface.local_mac) /
54 IP(src=interface.remote_ip4, dst=interface.local_ip4, ttl=ttl) /
55 UDP(sport=src_port, dport=BFD.udp_dport) /
60 def verify_ip(test, packet, local_ip, remote_ip):
61 """ Verify correctness of IP layer. """
63 test.assert_equal(ip.src, local_ip, "IP source address")
64 test.assert_equal(ip.dst, remote_ip, "IP destination address")
65 test.assert_equal(ip.ttl, 255, "IP TTL")
68 def verify_udp(test, packet):
69 """ Verify correctness of UDP layer. """
71 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
72 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
76 class BFDTestSession(object):
78 def __init__(self, test, interface, detect_mult=3):
80 self.interface = interface
82 'my_discriminator': 0,
83 'desired_min_tx_interval': 500000,
84 'detect_mult': detect_mult,
85 'diag': BFDDiagCode.no_diagnostic,
88 def update(self, **kwargs):
89 self.bfd_values.update(kwargs)
91 def create_packet(self):
92 packet = create_packet(self.interface)
93 for name, value in self.bfd_values.iteritems():
94 packet[BFD].setfieldval(name, value)
97 def send_packet(self):
98 p = self.create_packet()
99 self.test.logger.debug(ppp("Sending packet:", p))
100 self.test.pg0.add_stream([p])
103 def verify_packet(self, packet):
104 """ Verify correctness of BFD layer. """
106 self.test.assert_equal(bfd.version, 1, "BFD version")
107 self.test.assert_equal(bfd.your_discriminator,
108 self.bfd_values['my_discriminator'],
109 "BFD - your discriminator")
112 class BFDTestCase(VppTestCase):
113 """Bidirectional Forwarding Detection (BFD)"""
117 super(BFDTestCase, cls).setUpClass()
119 cls.create_pg_interfaces([0, 1])
121 cls.pg0.generate_remote_hosts()
122 cls.pg0.configure_ipv4_neighbors()
124 cls.pg0.resolve_arp()
127 super(BFDTestCase, cls).tearDownClass()
131 super(BFDTestCase, self).setUp()
132 self.vapi.want_bfd_events()
133 self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
134 self.vpp_session.add_vpp_config()
135 self.vpp_session.admin_up()
136 self.test_session = BFDTestSession(self, self.pg0)
139 self.vapi.want_bfd_events(enable_disable=0)
140 if not self.vpp_dead:
141 self.vpp_session.remove_vpp_config()
142 super(BFDTestCase, self).tearDown()
144 def verify_event(self, event, expected_state):
145 """ Verify correctness of event values. """
147 self.logger.debug("Event: %s" % repr(e))
148 self.assert_equal(e.bs_index, self.vpp_session.bs_index,
150 self.assert_equal(e.sw_if_index, self.vpp_session.interface.sw_if_index,
151 "BFD interface index")
153 if self.vpp_session.af == AF_INET6:
155 self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
156 if self.vpp_session.af == AF_INET:
157 self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
158 "Local IPv4 address")
159 self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
162 self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
163 "Local IPv6 address")
164 self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
166 self.assert_equal(e.state, expected_state, BFDState)
168 def wait_for_bfd_packet(self, timeout=1):
169 p = self.pg0.wait_for_packet(timeout=timeout)
172 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
174 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
175 verify_ip(self, p, self.pg0.local_ip4, self.pg0.remote_ip4)
177 self.test_session.verify_packet(p)
180 def test_slow_timer(self):
183 self.pg_enable_capture([self.pg0])
184 expected_packets = 10
185 self.logger.info("Waiting for %d BFD packets" % expected_packets)
186 self.wait_for_bfd_packet()
187 for i in range(expected_packets):
189 self.wait_for_bfd_packet()
191 self.assert_in_range(
192 after - before, 0.75, 1, "time between slow packets")
195 def test_zero_remote_min_rx(self):
196 """ Zero RemoteMinRxInterval """
197 self.pg_enable_capture([self.pg0])
198 p = self.wait_for_bfd_packet()
199 self.test_session.update(my_discriminator=randint(0, 40000000),
200 your_discriminator=p[BFD].my_discriminator,
202 required_min_rx_interval=0)
203 self.test_session.send_packet()
204 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
205 self.verify_event(e, expected_state=BFDState.up)
208 p = self.pg0.wait_for_packet(timeout=1)
211 raise Exception(ppp("Received unexpected BFD packet:", p))
213 def bfd_conn_up(self):
214 self.pg_enable_capture([self.pg0])
215 self.logger.info("Waiting for slow hello")
216 p = self.wait_for_bfd_packet()
217 self.logger.info("Sending Init")
218 self.test_session.update(my_discriminator=randint(0, 40000000),
219 your_discriminator=p[BFD].my_discriminator,
221 required_min_rx_interval=500000)
222 self.test_session.send_packet()
223 self.logger.info("Waiting for event")
224 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
225 self.verify_event(e, expected_state=BFDState.up)
226 self.logger.info("Session is Up")
227 self.test_session.update(state=BFDState.up)
229 def test_conn_up(self):
230 """ Basic connection up """
233 def test_hold_up(self):
237 self.wait_for_bfd_packet()
238 self.test_session.send_packet()
240 def test_conn_down(self):
241 """ Session down after inactivity """
243 self.wait_for_bfd_packet()
245 0, len(self.vapi.collect_events()),
246 "number of bfd events")
247 self.wait_for_bfd_packet()
249 0, len(self.vapi.collect_events()),
250 "number of bfd events")
251 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
252 self.verify_event(e, expected_state=BFDState.down)
254 @unittest.skip("this test is not working yet")
255 def test_large_required_min_rx(self):
258 self.test_session.update(required_min_rx_interval=interval)
259 self.test_session.send_packet()
262 while time.time() < now + interval / 1000000:
264 p = self.wait_for_bfd_packet()
266 self.logger.error(ppp("Received unexpected packet:", p))
270 self.assert_equal(count, 1, "number of packets received")
273 if __name__ == '__main__':
274 unittest.main(testRunner=VppTestRunner)