4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
15 from scapy.layers.inet import UDP, IP
16 from scapy.layers.inet6 import IPv6
17 from scapy.layers.l2 import Ether
18 from scapy.packet import Raw
20 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
21 BFDDiagCode, BFDState, BFD_vpp_echo
22 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from vpp_ip import DpoProto
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_lo_interface import VppLoInterface
27 from vpp_papi_provider import UnexpectedApiReturnValueError
28 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
33 class AuthKeyFactory(object):
34 """Factory class for creating auth keys with unique conf key ID"""
37 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key with unique conf key id

    :param test: test case object passed through to VppBFDAuthKey
    :param auth_type: authentication type passed through to VppBFDAuthKey
    :returns: VppBFDAuthKey whose conf key ID has not been handed out
              by this factory before
    """
    # draw 32-bit IDs until one shows up that this factory has not issued
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1
    # key material: between 1 and 20 random octets
    octet_count = randint(1, 20)
    octets = [randint(0, 255) for _ in range(octet_count)]
    secret = str(bytearray(octets))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id, key=secret)
50 @unittest.skipUnless(running_extended_tests, "part of extended tests")
51 class BFDAPITestCase(VppTestCase):
52 """Bidirectional Forwarding Detection (BFD) - API"""
59 super(BFDAPITestCase, cls).setUpClass()
60 cls.vapi.cli("set log class bfd level debug")
62 cls.create_pg_interfaces(range(2))
63 for i in cls.pg_interfaces:
69 super(BFDAPITestCase, cls).tearDownClass()
73 super(BFDAPITestCase, self).setUp()
74 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    The same IPv4 session object is added to and removed from vpp
    twice in a row; its state is logged after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two identical add/log/remove rounds on the same session object
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
86 def test_double_add(self):
87 """ create the same BFD session twice (negative case) """
88 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
89 session.add_vpp_config()
91 with self.vapi.assert_negative_api_retval():
92 session.add_vpp_config()
94 session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    The same IPv6 session object is added to and removed from vpp
    twice in a row; its state is logged after each add.
    """
    peer_address = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_address,
                                   af=AF_INET6)
    # two identical add/log/remove rounds on the same session object
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
107 def test_mod_bfd(self):
108 """ modify BFD session parameters """
109 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
110 desired_min_tx=50000,
111 required_min_rx=10000,
113 session.add_vpp_config()
114 s = session.get_bfd_udp_session_dump_entry()
115 self.assert_equal(session.desired_min_tx,
117 "desired min transmit interval")
118 self.assert_equal(session.required_min_rx,
120 "required min receive interval")
121 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
122 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
123 required_min_rx=session.required_min_rx * 2,
124 detect_mult=session.detect_mult * 2)
125 s = session.get_bfd_udp_session_dump_entry()
126 self.assert_equal(session.desired_min_tx,
128 "desired min transmit interval")
129 self.assert_equal(session.required_min_rx,
131 "required min receive interval")
132 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
134 def test_add_sha1_keys(self):
135 """ add SHA1 keys """
137 keys = [self.factory.create_random_key(
138 self) for i in range(0, key_count)]
140 self.assertFalse(key.query_vpp_config())
144 self.assertTrue(key.query_vpp_config())
146 indexes = range(key_count)
151 key.remove_vpp_config()
153 for j in range(key_count):
156 self.assertFalse(key.query_vpp_config())
158 self.assertTrue(key.query_vpp_config())
159 # should be removed now
161 self.assertFalse(key.query_vpp_config())
162 # add back and remove again
166 self.assertTrue(key.query_vpp_config())
168 key.remove_vpp_config()
170 self.assertFalse(key.query_vpp_config())
172 def test_add_bfd_sha1(self):
173 """ create a BFD session (SHA1) """
174 key = self.factory.create_random_key(self)
176 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
178 session.add_vpp_config()
179 self.logger.debug("Session state is %s", session.state)
180 session.remove_vpp_config()
181 session.add_vpp_config()
182 self.logger.debug("Session state is %s", session.state)
183 session.remove_vpp_config()
185 def test_double_add_sha1(self):
186 """ create the same BFD session twice (negative case) (SHA1) """
187 key = self.factory.create_random_key(self)
189 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
191 session.add_vpp_config()
192 with self.assertRaises(Exception):
193 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key object comes from the factory, but this test never calls
    add_vpp_config() on it, so adding the session that refers to it
    is expected to raise.
    """
    pg = self.pg0
    peer_address = pg.remote_ip4
    unconfigured_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, pg, peer_address,
                                   sha1_key=unconfigured_key)
    # any exception raised while adding the session satisfies the check
    self.assertRaises(Exception, bfd_session.add_vpp_config)
203 def test_shared_sha1_key(self):
204 """ share single SHA1 key between multiple BFD sessions """
205 key = self.factory.create_random_key(self)
208 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
210 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
211 sha1_key=key, af=AF_INET6),
212 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
214 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
215 sha1_key=key, af=AF_INET6)]
220 e = key.get_bfd_auth_keys_dump_entry()
221 self.assert_equal(e.use_count, len(sessions) - removed,
222 "Use count for shared key")
223 s.remove_vpp_config()
225 e = key.get_bfd_auth_keys_dump_entry()
226 self.assert_equal(e.use_count, len(sessions) - removed,
227 "Use count for shared key")
229 def test_activate_auth(self):
230 """ activate SHA1 authentication """
231 key = self.factory.create_random_key(self)
233 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
234 session.add_vpp_config()
235 session.activate_auth(key)
237 def test_deactivate_auth(self):
238 """ deactivate SHA1 authentication """
239 key = self.factory.create_random_key(self)
241 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
242 session.add_vpp_config()
243 session.activate_auth(key)
244 session.deactivate_auth()
246 def test_change_key(self):
247 """ change SHA1 key """
248 key1 = self.factory.create_random_key(self)
249 key2 = self.factory.create_random_key(self)
250 while key2.conf_key_id == key1.conf_key_id:
251 key2 = self.factory.create_random_key(self)
252 key1.add_vpp_config()
253 key2.add_vpp_config()
254 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
256 session.add_vpp_config()
257 session.activate_auth(key2)
259 def test_set_del_udp_echo_source(self):
260 """ set/del udp echo source """
261 self.create_loopback_interfaces(1)
262 self.loopback0 = self.lo_interfaces[0]
263 self.loopback0.admin_up()
264 echo_source = self.vapi.bfd_udp_get_echo_source()
265 self.assertFalse(echo_source.is_set)
266 self.assertFalse(echo_source.have_usable_ip4)
267 self.assertFalse(echo_source.have_usable_ip6)
269 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
270 echo_source = self.vapi.bfd_udp_get_echo_source()
271 self.assertTrue(echo_source.is_set)
272 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
273 self.assertFalse(echo_source.have_usable_ip4)
274 self.assertFalse(echo_source.have_usable_ip6)
276 self.loopback0.config_ip4()
277 unpacked = unpack("!L", self.loopback0.local_ip4n)
278 echo_ip4 = pack("!L", unpacked[0] ^ 1)
279 echo_source = self.vapi.bfd_udp_get_echo_source()
280 self.assertTrue(echo_source.is_set)
281 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
282 self.assertTrue(echo_source.have_usable_ip4)
283 self.assertEqual(echo_source.ip4_addr, echo_ip4)
284 self.assertFalse(echo_source.have_usable_ip6)
286 self.loopback0.config_ip6()
287 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
288 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
290 echo_source = self.vapi.bfd_udp_get_echo_source()
291 self.assertTrue(echo_source.is_set)
292 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
293 self.assertTrue(echo_source.have_usable_ip4)
294 self.assertEqual(echo_source.ip4_addr, echo_ip4)
295 self.assertTrue(echo_source.have_usable_ip6)
296 self.assertEqual(echo_source.ip6_addr, echo_ip6)
298 self.vapi.bfd_udp_del_echo_source()
299 echo_source = self.vapi.bfd_udp_get_echo_source()
300 self.assertFalse(echo_source.is_set)
301 self.assertFalse(echo_source.have_usable_ip4)
302 self.assertFalse(echo_source.have_usable_ip6)
305 @unittest.skipUnless(running_extended_tests, "part of extended tests")
306 class BFDTestSession(object):
307 """ BFD session as seen from test framework side """
309 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
310 bfd_key_id=None, our_seq_number=None):
313 self.sha1_key = sha1_key
314 self.bfd_key_id = bfd_key_id
315 self.interface = interface
316 self.udp_sport = randint(49152, 65535)
317 if our_seq_number is None:
318 self.our_seq_number = randint(0, 40000000)
320 self.our_seq_number = our_seq_number
321 self.vpp_seq_number = None
322 self.my_discriminator = 0
323 self.desired_min_tx = 300000
324 self.required_min_rx = 300000
325 self.required_min_echo_rx = None
326 self.detect_mult = detect_mult
327 self.diag = BFDDiagCode.no_diagnostic
328 self.your_discriminator = None
329 self.state = BFDState.down
330 self.auth_type = BFDAuthType.no_auth
332 def inc_seq_num(self):
333 """ increment sequence number, wrapping if needed """
334 if self.our_seq_number == 0xFFFFFFFF:
335 self.our_seq_number = 0
337 self.our_seq_number += 1
339 def update(self, my_discriminator=None, your_discriminator=None,
340 desired_min_tx=None, required_min_rx=None,
341 required_min_echo_rx=None, detect_mult=None,
342 diag=None, state=None, auth_type=None):
343 """ update BFD parameters associated with session """
344 if my_discriminator is not None:
345 self.my_discriminator = my_discriminator
346 if your_discriminator is not None:
347 self.your_discriminator = your_discriminator
348 if required_min_rx is not None:
349 self.required_min_rx = required_min_rx
350 if required_min_echo_rx is not None:
351 self.required_min_echo_rx = required_min_echo_rx
352 if desired_min_tx is not None:
353 self.desired_min_tx = desired_min_tx
354 if detect_mult is not None:
355 self.detect_mult = detect_mult
358 if state is not None:
360 if auth_type is not None:
361 self.auth_type = auth_type
363 def fill_packet_fields(self, packet):
364 """ set packet fields with known values in packet """
366 if self.my_discriminator:
367 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
368 self.my_discriminator)
369 bfd.my_discriminator = self.my_discriminator
370 if self.your_discriminator:
371 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
372 self.your_discriminator)
373 bfd.your_discriminator = self.your_discriminator
374 if self.required_min_rx:
375 self.test.logger.debug(
376 "BFD: setting packet.required_min_rx_interval=%s",
377 self.required_min_rx)
378 bfd.required_min_rx_interval = self.required_min_rx
379 if self.required_min_echo_rx:
380 self.test.logger.debug(
381 "BFD: setting packet.required_min_echo_rx=%s",
382 self.required_min_echo_rx)
383 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
384 if self.desired_min_tx:
385 self.test.logger.debug(
386 "BFD: setting packet.desired_min_tx_interval=%s",
388 bfd.desired_min_tx_interval = self.desired_min_tx
390 self.test.logger.debug(
391 "BFD: setting packet.detect_mult=%s", self.detect_mult)
392 bfd.detect_mult = self.detect_mult
394 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
397 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
398 bfd.state = self.state
400 # this is used by a negative test-case
401 self.test.logger.debug("BFD: setting packet.auth_type=%s",
403 bfd.auth_type = self.auth_type
405 def create_packet(self):
406 """ create a BFD packet, reflecting the current state of session """
409 bfd.auth_type = self.sha1_key.auth_type
410 bfd.auth_len = BFD.sha1_auth_len
411 bfd.auth_key_id = self.bfd_key_id
412 bfd.auth_seq_num = self.our_seq_number
413 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
416 if self.af == AF_INET6:
417 packet = (Ether(src=self.interface.remote_mac,
418 dst=self.interface.local_mac) /
419 IPv6(src=self.interface.remote_ip6,
420 dst=self.interface.local_ip6,
422 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
425 packet = (Ether(src=self.interface.remote_mac,
426 dst=self.interface.local_mac) /
427 IP(src=self.interface.remote_ip4,
428 dst=self.interface.local_ip4,
430 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
432 self.test.logger.debug("BFD: Creating packet")
433 self.fill_packet_fields(packet)
435 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
436 "\0" * (20 - len(self.sha1_key.key))
437 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
438 hashlib.sha1(hash_material).hexdigest())
439 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
442 def send_packet(self, packet=None, interface=None):
443 """ send packet on interface, creating the packet if needed """
445 packet = self.create_packet()
446 if interface is None:
447 interface = self.test.pg0
448 self.test.logger.debug(ppp("Sending packet:", packet))
449 interface.add_stream(packet)
452 def verify_sha1_auth(self, packet):
453 """ Verify correctness of authentication in BFD layer. """
455 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
456 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
458 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
459 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
460 if self.vpp_seq_number is None:
461 self.vpp_seq_number = bfd.auth_seq_num
462 self.test.logger.debug("Received initial sequence number: %s" %
465 recvd_seq_num = bfd.auth_seq_num
466 self.test.logger.debug("Received followup sequence number: %s" %
468 if self.vpp_seq_number < 0xffffffff:
469 if self.sha1_key.auth_type == \
470 BFDAuthType.meticulous_keyed_sha1:
471 self.test.assert_equal(recvd_seq_num,
472 self.vpp_seq_number + 1,
473 "BFD sequence number")
475 self.test.assert_in_range(recvd_seq_num,
477 self.vpp_seq_number + 1,
478 "BFD sequence number")
480 if self.sha1_key.auth_type == \
481 BFDAuthType.meticulous_keyed_sha1:
482 self.test.assert_equal(recvd_seq_num, 0,
483 "BFD sequence number")
485 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
486 "BFD sequence number not one of "
487 "(%s, 0)" % self.vpp_seq_number)
488 self.vpp_seq_number = recvd_seq_num
489 # last 20 bytes represent the hash - so replace them with the key,
490 # pad the result with zeros and hash the result
491 hash_material = bfd.original[:-20] + self.sha1_key.key + \
492 "\0" * (20 - len(self.sha1_key.key))
493 expected_hash = hashlib.sha1(hash_material).hexdigest()
494 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
495 expected_hash, "Auth key hash")
497 def verify_bfd(self, packet):
498 """ Verify correctness of BFD layer. """
500 self.test.assert_equal(bfd.version, 1, "BFD version")
501 self.test.assert_equal(bfd.your_discriminator,
502 self.my_discriminator,
503 "BFD - your discriminator")
505 self.verify_sha1_auth(packet)
508 def bfd_session_up(test):
509 """ Bring BFD session up """
510 test.logger.info("BFD: Waiting for slow hello")
511 p = wait_for_bfd_packet(test, 2)
513 if hasattr(test, 'vpp_clock_offset'):
514 old_offset = test.vpp_clock_offset
515 test.vpp_clock_offset = time.time() - p.time
516 test.logger.debug("BFD: Calculated vpp clock offset: %s",
517 test.vpp_clock_offset)
519 test.assertAlmostEqual(
520 old_offset, test.vpp_clock_offset, delta=0.5,
521 msg="vpp clock offset not stable (new: %s, old: %s)" %
522 (test.vpp_clock_offset, old_offset))
523 test.logger.info("BFD: Sending Init")
524 test.test_session.update(my_discriminator=randint(0, 40000000),
525 your_discriminator=p[BFD].my_discriminator,
527 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
528 BFDAuthType.meticulous_keyed_sha1:
529 test.test_session.inc_seq_num()
530 test.test_session.send_packet()
531 test.logger.info("BFD: Waiting for event")
532 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
533 verify_event(test, e, expected_state=BFDState.up)
534 test.logger.info("BFD: Session is Up")
535 test.test_session.update(state=BFDState.up)
536 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
537 BFDAuthType.meticulous_keyed_sha1:
538 test.test_session.inc_seq_num()
539 test.test_session.send_packet()
540 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Checks that the vpp session is currently up, sends one packet
    carrying state down from the test-side session, waits for the
    resulting "bfd_udp_session_details" event and checks that vpp
    then reports the session as down.

    :param test: test case object providing vpp_session, test_session,
                 vapi, logger and assert_equal
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    auth_key = peer.sha1_key
    # with meticulous keyed SHA1 the sequence number is bumped before
    # the packet is sent
    if auth_key and \
            auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
558 def verify_bfd_session_config(test, session, state=None):
559 dump = session.get_bfd_udp_session_dump_entry()
560 test.assertIsNotNone(dump)
561 # since dump is not none, we have verified that sw_if_index and addresses
562 # are valid (in get_bfd_udp_session_dump_entry)
564 test.assert_equal(dump.state, state, "session state")
565 test.assert_equal(dump.required_min_rx, session.required_min_rx,
566 "required min rx interval")
567 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
568 "desired min tx interval")
569 test.assert_equal(dump.detect_mult, session.detect_mult,
571 if session.sha1_key is None:
572 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
574 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
575 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
577 test.assert_equal(dump.conf_key_id,
578 session.sha1_key.conf_key_id,
582 def verify_ip(test, packet):
583 """ Verify correctness of IP layer. """
584 if test.vpp_session.af == AF_INET6:
586 local_ip = test.pg0.local_ip6
587 remote_ip = test.pg0.remote_ip6
588 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
591 local_ip = test.pg0.local_ip4
592 remote_ip = test.pg0.remote_ip4
593 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
594 test.assert_equal(ip.src, local_ip, "IP source address")
595 test.assert_equal(ip.dst, remote_ip, "IP destination address")
598 def verify_udp(test, packet):
599 """ Verify correctness of UDP layer. """
601 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
602 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
606 def verify_event(test, event, expected_state):
607 """ Verify correctness of event values. """
609 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
610 test.assert_equal(e.sw_if_index,
611 test.vpp_session.interface.sw_if_index,
612 "BFD interface index")
614 if test.vpp_session.af == AF_INET6:
616 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
617 if test.vpp_session.af == AF_INET:
618 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
619 "Local IPv4 address")
620 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
623 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
624 "Local IPv6 address")
625 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
627 test.assert_equal(e.state, expected_state, BFDState)
630 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
631 """ wait for BFD packet and verify its correctness
633 :param timeout: how long to wait
634 :param pcap_time_min: ignore packets with pcap timestamp lower than this
636 :returns: tuple (packet, time spent waiting for packet)
638 test.logger.info("BFD: Waiting for BFD packet")
639 deadline = time.time() + timeout
644 test.assert_in_range(counter, 0, 100, "number of packets ignored")
645 time_left = deadline - time.time()
647 raise CaptureTimeoutError("Packet did not arrive within timeout")
648 p = test.pg0.wait_for_packet(timeout=time_left)
649 test.logger.debug(ppp("BFD: Got packet:", p))
650 if pcap_time_min is not None and p.time < pcap_time_min:
651 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
652 "pcap time min %s):" %
653 (p.time, pcap_time_min), p))
658 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
660 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
663 test.test_session.verify_bfd(p)
667 @unittest.skipUnless(running_extended_tests, "part of extended tests")
668 class BFD4TestCase(VppTestCase):
669 """Bidirectional Forwarding Detection (BFD)"""
672 vpp_clock_offset = None
678 super(BFD4TestCase, cls).setUpClass()
679 cls.vapi.cli("set log class bfd level debug")
681 cls.create_pg_interfaces([0])
682 cls.create_loopback_interfaces(1)
683 cls.loopback0 = cls.lo_interfaces[0]
684 cls.loopback0.config_ip4()
685 cls.loopback0.admin_up()
687 cls.pg0.configure_ipv4_neighbors()
689 cls.pg0.resolve_arp()
692 super(BFD4TestCase, cls).tearDownClass()
696 super(BFD4TestCase, self).setUp()
697 self.factory = AuthKeyFactory()
698 self.vapi.want_bfd_events()
699 self.pg0.enable_capture()
701 self.vpp_session = VppBFDUDPSession(self, self.pg0,
703 self.vpp_session.add_vpp_config()
704 self.vpp_session.admin_up()
705 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
707 self.vapi.want_bfd_events(enable_disable=0)
711 if not self.vpp_dead:
712 self.vapi.want_bfd_events(enable_disable=0)
713 self.vapi.collect_events() # clear the event queue
714 super(BFD4TestCase, self).tearDown()
716 def test_session_up(self):
717 """ bring BFD session up """
720 def test_session_up_by_ip(self):
721 """ bring BFD session up - first frame looked up by address pair """
722 self.logger.info("BFD: Sending Slow control frame")
723 self.test_session.update(my_discriminator=randint(0, 40000000))
724 self.test_session.send_packet()
725 self.pg0.enable_capture()
726 p = self.pg0.wait_for_packet(1)
727 self.assert_equal(p[BFD].your_discriminator,
728 self.test_session.my_discriminator,
729 "BFD - your discriminator")
730 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
731 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
733 self.logger.info("BFD: Waiting for event")
734 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
735 verify_event(self, e, expected_state=BFDState.init)
736 self.logger.info("BFD: Sending Up")
737 self.test_session.send_packet()
738 self.logger.info("BFD: Waiting for event")
739 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
740 verify_event(self, e, expected_state=BFDState.up)
741 self.logger.info("BFD: Session is Up")
742 self.test_session.update(state=BFDState.up)
743 self.test_session.send_packet()
744 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
746 def test_session_down(self):
747 """ bring BFD session down """
749 bfd_session_down(self)
751 @unittest.skipUnless(running_extended_tests, "part of extended tests")
752 def test_hold_up(self):
753 """ hold BFD session up """
755 for dummy in range(self.test_session.detect_mult * 2):
756 wait_for_bfd_packet(self)
757 self.test_session.send_packet()
758 self.assert_equal(len(self.vapi.collect_events()), 0,
759 "number of bfd events")
761 @unittest.skipUnless(running_extended_tests, "part of extended tests")
762 def test_slow_timer(self):
763 """ verify slow periodic control frames while session down """
765 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
766 prev_packet = wait_for_bfd_packet(self, 2)
767 for dummy in range(packet_count):
768 next_packet = wait_for_bfd_packet(self, 2)
769 time_diff = next_packet.time - prev_packet.time
770 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
771 # to work around timing issues
772 self.assert_in_range(
773 time_diff, 0.70, 1.05, "time between slow packets")
774 prev_packet = next_packet
776 @unittest.skipUnless(running_extended_tests, "part of extended tests")
777 def test_zero_remote_min_rx(self):
778 """ no packets when zero remote required min rx interval """
780 self.test_session.update(required_min_rx=0)
781 self.test_session.send_packet()
782 for dummy in range(self.test_session.detect_mult):
783 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
784 "sleep before transmitting bfd packet")
785 self.test_session.send_packet()
787 p = wait_for_bfd_packet(self, timeout=0)
788 self.logger.error(ppp("Received unexpected packet:", p))
789 except CaptureTimeoutError:
792 len(self.vapi.collect_events()), 0, "number of bfd events")
793 self.test_session.update(required_min_rx=300000)
794 for dummy in range(3):
795 self.test_session.send_packet()
797 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
799 len(self.vapi.collect_events()), 0, "number of bfd events")
801 @unittest.skipUnless(running_extended_tests, "part of extended tests")
802 def test_conn_down(self):
803 """ verify session goes down after inactivity """
805 detection_time = self.test_session.detect_mult *\
806 self.vpp_session.required_min_rx / USEC_IN_SEC
807 self.sleep(detection_time, "waiting for BFD session time-out")
808 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
809 verify_event(self, e, expected_state=BFDState.down)
811 @unittest.skipUnless(running_extended_tests, "part of extended tests")
812 def test_large_required_min_rx(self):
813 """ large remote required min rx interval """
815 p = wait_for_bfd_packet(self)
817 self.test_session.update(required_min_rx=interval)
818 self.test_session.send_packet()
819 time_mark = time.time()
821 # busy wait here, trying to collect a packet or event, vpp is not
822 # allowed to send packets and the session will timeout first - so the
823 # Up->Down event must arrive before any packets do
824 while time.time() < time_mark + interval / USEC_IN_SEC:
826 p = wait_for_bfd_packet(self, timeout=0)
827 # if vpp managed to send a packet before we did the session
828 # session update, then that's fine, ignore it
829 if p.time < time_mark - self.vpp_clock_offset:
831 self.logger.error(ppp("Received unexpected packet:", p))
833 except CaptureTimeoutError:
835 events = self.vapi.collect_events()
837 verify_event(self, events[0], BFDState.down)
839 self.assert_equal(count, 0, "number of packets received")
841 @unittest.skipUnless(running_extended_tests, "part of extended tests")
842 def test_immediate_remote_min_rx_reduction(self):
843 """ immediately honor remote required min rx reduction """
844 self.vpp_session.remove_vpp_config()
845 self.vpp_session = VppBFDUDPSession(
846 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
847 self.pg0.enable_capture()
848 self.vpp_session.add_vpp_config()
849 self.test_session.update(desired_min_tx=1000000,
850 required_min_rx=1000000)
852 reference_packet = wait_for_bfd_packet(self)
853 time_mark = time.time()
855 self.test_session.update(required_min_rx=interval)
856 self.test_session.send_packet()
857 extra_time = time.time() - time_mark
858 p = wait_for_bfd_packet(self)
859 # first packet is allowed to be late by time we spent doing the update
860 # calculated in extra_time
861 self.assert_in_range(p.time - reference_packet.time,
862 .95 * 0.75 * interval / USEC_IN_SEC,
863 1.05 * interval / USEC_IN_SEC + extra_time,
864 "time between BFD packets")
866 for dummy in range(3):
867 p = wait_for_bfd_packet(self)
868 diff = p.time - reference_packet.time
869 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
870 1.05 * interval / USEC_IN_SEC,
871 "time between BFD packets")
874 @unittest.skipUnless(running_extended_tests, "part of extended tests")
875 def test_modify_req_min_rx_double(self):
876 """ modify session - double required min rx """
878 p = wait_for_bfd_packet(self)
879 self.test_session.update(desired_min_tx=10000,
880 required_min_rx=10000)
881 self.test_session.send_packet()
882 # double required min rx
883 self.vpp_session.modify_parameters(
884 required_min_rx=2 * self.vpp_session.required_min_rx)
885 p = wait_for_bfd_packet(
886 self, pcap_time_min=time.time() - self.vpp_clock_offset)
887 # poll bit needs to be set
888 self.assertIn("P", p.sprintf("%BFD.flags%"),
889 "Poll bit not set in BFD packet")
890 # finish poll sequence with final packet
891 final = self.test_session.create_packet()
892 final[BFD].flags = "F"
893 timeout = self.test_session.detect_mult * \
894 max(self.test_session.desired_min_tx,
895 self.vpp_session.required_min_rx) / USEC_IN_SEC
896 self.test_session.send_packet(final)
897 time_mark = time.time()
898 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
899 verify_event(self, e, expected_state=BFDState.down)
900 time_to_event = time.time() - time_mark
901 self.assert_in_range(time_to_event, .9 * timeout,
902 1.1 * timeout, "session timeout")
904 @unittest.skipUnless(running_extended_tests, "part of extended tests")
905 def test_modify_req_min_rx_halve(self):
906 """ modify session - halve required min rx """
907 self.vpp_session.modify_parameters(
908 required_min_rx=2 * self.vpp_session.required_min_rx)
910 p = wait_for_bfd_packet(self)
911 self.test_session.update(desired_min_tx=10000,
912 required_min_rx=10000)
913 self.test_session.send_packet()
914 p = wait_for_bfd_packet(
915 self, pcap_time_min=time.time() - self.vpp_clock_offset)
916 # halve required min rx
917 old_required_min_rx = self.vpp_session.required_min_rx
918 self.vpp_session.modify_parameters(
919 required_min_rx=0.5 * self.vpp_session.required_min_rx)
920 # now we wait 0.8*3*old-req-min-rx and the session should still be up
921 self.sleep(0.8 * self.vpp_session.detect_mult *
922 old_required_min_rx / USEC_IN_SEC,
923 "wait before finishing poll sequence")
924 self.assert_equal(len(self.vapi.collect_events()), 0,
925 "number of bfd events")
926 p = wait_for_bfd_packet(self)
927 # poll bit needs to be set
928 self.assertIn("P", p.sprintf("%BFD.flags%"),
929 "Poll bit not set in BFD packet")
930 # finish poll sequence with final packet
931 final = self.test_session.create_packet()
932 final[BFD].flags = "F"
933 self.test_session.send_packet(final)
934 # now the session should time out under new conditions
935 detection_time = self.test_session.detect_mult *\
936 self.vpp_session.required_min_rx / USEC_IN_SEC
938 e = self.vapi.wait_for_event(
939 2 * detection_time, "bfd_udp_session_details")
941 self.assert_in_range(after - before,
942 0.9 * detection_time,
943 1.1 * detection_time,
944 "time before bfd session goes down")
945 verify_event(self, e, expected_state=BFDState.down)
947 @unittest.skipUnless(running_extended_tests, "part of extended tests")
948 def test_modify_detect_mult(self):
949 """ modify detect multiplier """
951 p = wait_for_bfd_packet(self)
952 self.vpp_session.modify_parameters(detect_mult=1)
953 p = wait_for_bfd_packet(
954 self, pcap_time_min=time.time() - self.vpp_clock_offset)
955 self.assert_equal(self.vpp_session.detect_mult,
958 # poll bit must not be set
959 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
960 "Poll bit not set in BFD packet")
961 self.vpp_session.modify_parameters(detect_mult=10)
962 p = wait_for_bfd_packet(
963 self, pcap_time_min=time.time() - self.vpp_clock_offset)
964 self.assert_equal(self.vpp_session.detect_mult,
967 # poll bit must not be set
968 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
969 "Poll bit not set in BFD packet")
971 @unittest.skipUnless(running_extended_tests, "part of extended tests")
972 def test_queued_poll(self):
973 """ test poll sequence queueing """
975 p = wait_for_bfd_packet(self)
976 self.vpp_session.modify_parameters(
977 required_min_rx=2 * self.vpp_session.required_min_rx)
978 p = wait_for_bfd_packet(self)
979 poll_sequence_start = time.time()
980 poll_sequence_length_min = 0.5
981 send_final_after = time.time() + poll_sequence_length_min
982 # poll bit needs to be set
983 self.assertIn("P", p.sprintf("%BFD.flags%"),
984 "Poll bit not set in BFD packet")
985 self.assert_equal(p[BFD].required_min_rx_interval,
986 self.vpp_session.required_min_rx,
987 "BFD required min rx interval")
988 self.vpp_session.modify_parameters(
989 required_min_rx=2 * self.vpp_session.required_min_rx)
990 # 2nd poll sequence should be queued now
991 # don't send the reply back yet, wait for some time to emulate
992 # longer round-trip time
994 while time.time() < send_final_after:
995 self.test_session.send_packet()
996 p = wait_for_bfd_packet(self)
997 self.assert_equal(len(self.vapi.collect_events()), 0,
998 "number of bfd events")
999 self.assert_equal(p[BFD].required_min_rx_interval,
1000 self.vpp_session.required_min_rx,
1001 "BFD required min rx interval")
1003 # poll bit must be set
1004 self.assertIn("P", p.sprintf("%BFD.flags%"),
1005 "Poll bit not set in BFD packet")
1006 final = self.test_session.create_packet()
1007 final[BFD].flags = "F"
1008 self.test_session.send_packet(final)
1009 # finish 1st with final
1010 poll_sequence_length = time.time() - poll_sequence_start
1011 # vpp must wait for some time before starting new poll sequence
1012 poll_no_2_started = False
1013 for dummy in range(2 * packet_count):
1014 p = wait_for_bfd_packet(self)
1015 self.assert_equal(len(self.vapi.collect_events()), 0,
1016 "number of bfd events")
1017 if "P" in p.sprintf("%BFD.flags%"):
1018 poll_no_2_started = True
1019 if time.time() < poll_sequence_start + poll_sequence_length:
1020 raise Exception("VPP started 2nd poll sequence too soon")
1021 final = self.test_session.create_packet()
1022 final[BFD].flags = "F"
1023 self.test_session.send_packet(final)
1026 self.test_session.send_packet()
1027 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1028 # finish 2nd with final
1029 final = self.test_session.create_packet()
1030 final[BFD].flags = "F"
1031 self.test_session.send_packet(final)
1032 p = wait_for_bfd_packet(self)
1033 # poll bit must not be set
1034 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1035 "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set

    After the session is brought up via bfd_session_up(), a control
    frame whose flags field is set to Poll ("P") is sent from the test
    session.  The next BFD packet obtained from wait_for_bfd_packet()
    must then have the Final ("F") flag set.
    """
    bfd_session_up(self)
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    # pcap_time_min is taken after the poll frame was sent; presumably this
    # makes wait_for_bfd_packet() disregard frames captured earlier, so a
    # periodic frame is not mistaken for the reply - confirm in the helper
    final = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    flags = final.sprintf("%BFD.flags%")
    # carry the observed flags in the message: with longMessage disabled a
    # custom msg replaces unittest's default "'F' not found in ..." text
    self.assertIn("F", flags,
                  "Final bit not set in BFD packet (flags: %s)" % flags)
1048 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1049 def test_no_periodic_if_remote_demand(self):
1050 """ no periodic frames outside poll sequence if remote demand set """
1051 bfd_session_up(self)
1052 demand = self.test_session.create_packet()
1053 demand[BFD].flags = "D"
1054 self.test_session.send_packet(demand)
1055 transmit_time = 0.9 \
1056 * max(self.vpp_session.required_min_rx,
1057 self.test_session.desired_min_tx) \
1060 for dummy in range(self.test_session.detect_mult * 2):
1061 time.sleep(transmit_time)
1062 self.test_session.send_packet(demand)
1064 p = wait_for_bfd_packet(self, timeout=0)
1065 self.logger.error(ppp("Received unexpected packet:", p))
1067 except CaptureTimeoutError:
1069 events = self.vapi.collect_events()
1071 self.logger.error("Received unexpected event: %s", e)
1072 self.assert_equal(count, 0, "number of packets received")
1073 self.assert_equal(len(events), 0, "number of events received")
1075 def test_echo_looped_back(self):
1076 """ echo packets looped back """
1077 # don't need a session in this case..
1078 self.vpp_session.remove_vpp_config()
1079 self.pg0.enable_capture()
1080 echo_packet_count = 10
1081 # random source port low enough to increment a few times..
1082 udp_sport_tx = randint(1, 50000)
1083 udp_sport_rx = udp_sport_tx
1084 echo_packet = (Ether(src=self.pg0.remote_mac,
1085 dst=self.pg0.local_mac) /
1086 IP(src=self.pg0.remote_ip4,
1087 dst=self.pg0.remote_ip4) /
1088 UDP(dport=BFD.udp_dport_echo) /
1089 Raw("this should be looped back"))
1090 for dummy in range(echo_packet_count):
1091 self.sleep(.01, "delay between echo packets")
1092 echo_packet[UDP].sport = udp_sport_tx
1094 self.logger.debug(ppp("Sending packet:", echo_packet))
1095 self.pg0.add_stream(echo_packet)
1097 for dummy in range(echo_packet_count):
1098 p = self.pg0.wait_for_packet(1)
1099 self.logger.debug(ppp("Got packet:", p))
1101 self.assert_equal(self.pg0.remote_mac,
1102 ether.dst, "Destination MAC")
1103 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1105 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1106 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1108 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1109 "UDP destination port")
1110 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1112 # need to compare the hex payload here, otherwise BFD_vpp_echo
1114 self.assertEqual(str(p[UDP].payload),
1115 str(echo_packet[UDP].payload),
1116 "Received packet is not the echo packet sent")
1117 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1118 "ECHO packet identifier for test purposes)")
1120 def test_echo(self):
1121 """ echo function """
1122 bfd_session_up(self)
1123 self.test_session.update(required_min_echo_rx=150000)
1124 self.test_session.send_packet()
1125 detection_time = self.test_session.detect_mult *\
1126 self.vpp_session.required_min_rx / USEC_IN_SEC
1127 # echo shouldn't work without echo source set
1128 for dummy in range(10):
1129 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1130 self.sleep(sleep, "delay before sending bfd packet")
1131 self.test_session.send_packet()
1132 p = wait_for_bfd_packet(
1133 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1134 self.assert_equal(p[BFD].required_min_rx_interval,
1135 self.vpp_session.required_min_rx,
1136 "BFD required min rx interval")
1137 self.test_session.send_packet()
1138 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1140 # should be turned on - loopback echo packets
1141 for dummy in range(3):
1142 loop_until = time.time() + 0.75 * detection_time
1143 while time.time() < loop_until:
1144 p = self.pg0.wait_for_packet(1)
1145 self.logger.debug(ppp("Got packet:", p))
1146 if p[UDP].dport == BFD.udp_dport_echo:
1148 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1149 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1150 "BFD ECHO src IP equal to loopback IP")
1151 self.logger.debug(ppp("Looping back packet:", p))
1152 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1153 "ECHO packet destination MAC address")
1154 p[Ether].dst = self.pg0.local_mac
1155 self.pg0.add_stream(p)
1158 elif p.haslayer(BFD):
1160 self.assertGreaterEqual(
1161 p[BFD].required_min_rx_interval,
1163 if "P" in p.sprintf("%BFD.flags%"):
1164 final = self.test_session.create_packet()
1165 final[BFD].flags = "F"
1166 self.test_session.send_packet(final)
1168 raise Exception(ppp("Received unknown packet:", p))
1170 self.assert_equal(len(self.vapi.collect_events()), 0,
1171 "number of bfd events")
1172 self.test_session.send_packet()
1173 self.assertTrue(echo_seen, "No echo packets received")
1175 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1176 def test_echo_fail(self):
1177 """ session goes down if echo function fails """
1178 bfd_session_up(self)
1179 self.test_session.update(required_min_echo_rx=150000)
1180 self.test_session.send_packet()
1181 detection_time = self.test_session.detect_mult *\
1182 self.vpp_session.required_min_rx / USEC_IN_SEC
1183 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1184 # echo function should be used now, but we will drop the echo packets
1185 verified_diag = False
1186 for dummy in range(3):
1187 loop_until = time.time() + 0.75 * detection_time
1188 while time.time() < loop_until:
1189 p = self.pg0.wait_for_packet(1)
1190 self.logger.debug(ppp("Got packet:", p))
1191 if p[UDP].dport == BFD.udp_dport_echo:
1194 elif p.haslayer(BFD):
1195 if "P" in p.sprintf("%BFD.flags%"):
1196 self.assertGreaterEqual(
1197 p[BFD].required_min_rx_interval,
1199 final = self.test_session.create_packet()
1200 final[BFD].flags = "F"
1201 self.test_session.send_packet(final)
1202 if p[BFD].state == BFDState.down:
1203 self.assert_equal(p[BFD].diag,
1204 BFDDiagCode.echo_function_failed,
1206 verified_diag = True
1208 raise Exception(ppp("Received unknown packet:", p))
1209 self.test_session.send_packet()
1210 events = self.vapi.collect_events()
1211 self.assert_equal(len(events), 1, "number of bfd events")
1212 self.assert_equal(events[0].state, BFDState.down, BFDState)
1213 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1215 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1216 def test_echo_stop(self):
1217 """ echo function stops if peer sets required min echo rx zero """
1218 bfd_session_up(self)
1219 self.test_session.update(required_min_echo_rx=150000)
1220 self.test_session.send_packet()
1221 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1222 # wait for first echo packet
1224 p = self.pg0.wait_for_packet(1)
1225 self.logger.debug(ppp("Got packet:", p))
1226 if p[UDP].dport == BFD.udp_dport_echo:
1227 self.logger.debug(ppp("Looping back packet:", p))
1228 p[Ether].dst = self.pg0.local_mac
1229 self.pg0.add_stream(p)
1232 elif p.haslayer(BFD):
1236 raise Exception(ppp("Received unknown packet:", p))
1237 self.test_session.update(required_min_echo_rx=0)
1238 self.test_session.send_packet()
1239 # echo packets shouldn't arrive anymore
1240 for dummy in range(5):
1241 wait_for_bfd_packet(
1242 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1243 self.test_session.send_packet()
1244 events = self.vapi.collect_events()
1245 self.assert_equal(len(events), 0, "number of bfd events")
1247 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1248 def test_echo_source_removed(self):
1249 """ echo function stops if echo source is removed """
1250 bfd_session_up(self)
1251 self.test_session.update(required_min_echo_rx=150000)
1252 self.test_session.send_packet()
1253 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1254 # wait for first echo packet
1256 p = self.pg0.wait_for_packet(1)
1257 self.logger.debug(ppp("Got packet:", p))
1258 if p[UDP].dport == BFD.udp_dport_echo:
1259 self.logger.debug(ppp("Looping back packet:", p))
1260 p[Ether].dst = self.pg0.local_mac
1261 self.pg0.add_stream(p)
1264 elif p.haslayer(BFD):
1268 raise Exception(ppp("Received unknown packet:", p))
1269 self.vapi.bfd_udp_del_echo_source()
1270 self.test_session.send_packet()
1271 # echo packets shouldn't arrive anymore
1272 for dummy in range(5):
1273 wait_for_bfd_packet(
1274 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1275 self.test_session.send_packet()
1276 events = self.vapi.collect_events()
1277 self.assert_equal(len(events), 0, "number of bfd events")
1279 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1280 def test_stale_echo(self):
1281 """ stale echo packets don't keep a session up """
1282 bfd_session_up(self)
1283 self.test_session.update(required_min_echo_rx=150000)
1284 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1285 self.test_session.send_packet()
1286 # should be turned on - loopback echo packets
1290 for dummy in range(10 * self.vpp_session.detect_mult):
1291 p = self.pg0.wait_for_packet(1)
1292 if p[UDP].dport == BFD.udp_dport_echo:
1293 if echo_packet is None:
1294 self.logger.debug(ppp("Got first echo packet:", p))
1296 timeout_at = time.time() + self.vpp_session.detect_mult * \
1297 self.test_session.required_min_echo_rx / USEC_IN_SEC
1299 self.logger.debug(ppp("Got followup echo packet:", p))
1300 self.logger.debug(ppp("Looping back first echo packet:", p))
1301 echo_packet[Ether].dst = self.pg0.local_mac
1302 self.pg0.add_stream(echo_packet)
1304 elif p.haslayer(BFD):
1305 self.logger.debug(ppp("Got packet:", p))
1306 if "P" in p.sprintf("%BFD.flags%"):
1307 final = self.test_session.create_packet()
1308 final[BFD].flags = "F"
1309 self.test_session.send_packet(final)
1310 if p[BFD].state == BFDState.down:
1311 self.assertIsNotNone(
1313 "Session went down before first echo packet received")
1315 self.assertGreaterEqual(
1317 "Session timeout at %s, but is expected at %s" %
1319 self.assert_equal(p[BFD].diag,
1320 BFDDiagCode.echo_function_failed,
1322 events = self.vapi.collect_events()
1323 self.assert_equal(len(events), 1, "number of bfd events")
1324 self.assert_equal(events[0].state, BFDState.down, BFDState)
1328 raise Exception(ppp("Received unknown packet:", p))
1329 self.test_session.send_packet()
1330 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1332 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1333 def test_invalid_echo_checksum(self):
1334 """ echo packets with invalid checksum don't keep a session up """
1335 bfd_session_up(self)
1336 self.test_session.update(required_min_echo_rx=150000)
1337 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1338 self.test_session.send_packet()
1339 # should be turned on - loopback echo packets
1342 for dummy in range(10 * self.vpp_session.detect_mult):
1343 p = self.pg0.wait_for_packet(1)
1344 if p[UDP].dport == BFD.udp_dport_echo:
1345 self.logger.debug(ppp("Got echo packet:", p))
1346 if timeout_at is None:
1347 timeout_at = time.time() + self.vpp_session.detect_mult * \
1348 self.test_session.required_min_echo_rx / USEC_IN_SEC
1349 p[BFD_vpp_echo].checksum = getrandbits(64)
1350 p[Ether].dst = self.pg0.local_mac
1351 self.logger.debug(ppp("Looping back modified echo packet:", p))
1352 self.pg0.add_stream(p)
1354 elif p.haslayer(BFD):
1355 self.logger.debug(ppp("Got packet:", p))
1356 if "P" in p.sprintf("%BFD.flags%"):
1357 final = self.test_session.create_packet()
1358 final[BFD].flags = "F"
1359 self.test_session.send_packet(final)
1360 if p[BFD].state == BFDState.down:
1361 self.assertIsNotNone(
1363 "Session went down before first echo packet received")
1365 self.assertGreaterEqual(
1367 "Session timeout at %s, but is expected at %s" %
1369 self.assert_equal(p[BFD].diag,
1370 BFDDiagCode.echo_function_failed,
1372 events = self.vapi.collect_events()
1373 self.assert_equal(len(events), 1, "number of bfd events")
1374 self.assert_equal(events[0].state, BFDState.down, BFDState)
1378 raise Exception(ppp("Received unknown packet:", p))
1379 self.test_session.send_packet()
1380 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Sequence exercised, as checked by the assertions below:

    1. session is brought up, then put admin-down: an admin_down event
       is reported and two consecutive BFD packets carry admin_down
    2. the peer sends a frame with state init: two further BFD packets
       still carry admin_down
    3. session is put admin-up: a down event is reported and the next
       BFD packet carries down
    4. the peer sends state down, then state up: BFD packets and events
       show init and then up
    """
    event_name = "bfd_udp_session_details"

    def next_fresh_packet():
        # BFD packet wait bounded by a capture time taken at call time
        return wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)

    bfd_session_up(self)

    # 1. admin-down
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    event = self.vapi.wait_for_event(1, event_name)
    verify_event(self, event, expected_state=BFDState.admin_down)
    for _ in range(2):
        pkt = wait_for_bfd_packet(self)
        self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    # 2. try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        pkt = wait_for_bfd_packet(self)
        self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    # 3. admin-up: session is expected to report down first
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    event = self.vapi.wait_for_event(1, event_name)
    verify_event(self, event, expected_state=BFDState.down)
    pkt = next_fresh_packet()
    self.assert_equal(pkt[BFD].state, BFDState.down, BFDState)

    # 4a. peer sends down -> packet and event show init
    self.test_session.send_packet()
    pkt = next_fresh_packet()
    self.assert_equal(pkt[BFD].state, BFDState.init, BFDState)
    event = self.vapi.wait_for_event(1, event_name)
    verify_event(self, event, expected_state=BFDState.init)

    # 4b. peer sends up -> packet and event show up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    pkt = next_fresh_packet()
    self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
    event = self.vapi.wait_for_event(1, event_name)
    verify_event(self, event, expected_state=BFDState.up)
1420 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1421 def test_config_change_remote_demand(self):
1422 """ configuration change while peer in demand mode """
1423 bfd_session_up(self)
1424 demand = self.test_session.create_packet()
1425 demand[BFD].flags = "D"
1426 self.test_session.send_packet(demand)
1427 self.vpp_session.modify_parameters(
1428 required_min_rx=2 * self.vpp_session.required_min_rx)
1429 p = wait_for_bfd_packet(
1430 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1431 # poll bit must be set
1432 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1433 # terminate poll sequence
1434 final = self.test_session.create_packet()
1435 final[BFD].flags = "D+F"
1436 self.test_session.send_packet(final)
1437 # vpp should be quiet now again
1438 transmit_time = 0.9 \
1439 * max(self.vpp_session.required_min_rx,
1440 self.test_session.desired_min_tx) \
1443 for dummy in range(self.test_session.detect_mult * 2):
1444 time.sleep(transmit_time)
1445 self.test_session.send_packet(demand)
1447 p = wait_for_bfd_packet(self, timeout=0)
1448 self.logger.error(ppp("Received unexpected packet:", p))
1450 except CaptureTimeoutError:
1452 events = self.vapi.collect_events()
1454 self.logger.error("Received unexpected event: %s", e)
1455 self.assert_equal(count, 0, "number of packets received")
1456 self.assert_equal(len(events), 0, "number of events received")
1458 def test_intf_deleted(self):
1459 """ interface with bfd session deleted """
1460 intf = VppLoInterface(self)
1463 sw_if_index = intf.sw_if_index
1464 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1465 vpp_session.add_vpp_config()
1466 vpp_session.admin_up()
1467 intf.remove_vpp_config()
1468 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1469 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1470 self.assertFalse(vpp_session.query_vpp_config())
1473 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1474 class BFD6TestCase(VppTestCase):
1475 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1478 vpp_clock_offset = None
1483 def setUpClass(cls):
1484 super(BFD6TestCase, cls).setUpClass()
1485 cls.vapi.cli("set log class bfd level debug")
1487 cls.create_pg_interfaces([0])
1488 cls.pg0.config_ip6()
1489 cls.pg0.configure_ipv6_neighbors()
1491 cls.pg0.resolve_ndp()
1492 cls.create_loopback_interfaces(1)
1493 cls.loopback0 = cls.lo_interfaces[0]
1494 cls.loopback0.config_ip6()
1495 cls.loopback0.admin_up()
1498 super(BFD6TestCase, cls).tearDownClass()
1502 super(BFD6TestCase, self).setUp()
1503 self.factory = AuthKeyFactory()
1504 self.vapi.want_bfd_events()
1505 self.pg0.enable_capture()
1507 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1508 self.pg0.remote_ip6,
1510 self.vpp_session.add_vpp_config()
1511 self.vpp_session.admin_up()
1512 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1513 self.logger.debug(self.vapi.cli("show adj nbr"))
1515 self.vapi.want_bfd_events(enable_disable=0)
1519 if not self.vpp_dead:
1520 self.vapi.want_bfd_events(enable_disable=0)
1521 self.vapi.collect_events() # clear the event queue
1522 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole scenario lives in the shared bfd_session_up() helper; this
    test only invokes it with the sessions prepared by setUp().
    """
    bfd_session_up(self)
1528 def test_session_up_by_ip(self):
1529 """ bring BFD session up - first frame looked up by address pair """
1530 self.logger.info("BFD: Sending Slow control frame")
1531 self.test_session.update(my_discriminator=randint(0, 40000000))
1532 self.test_session.send_packet()
1533 self.pg0.enable_capture()
1534 p = self.pg0.wait_for_packet(1)
1535 self.assert_equal(p[BFD].your_discriminator,
1536 self.test_session.my_discriminator,
1537 "BFD - your discriminator")
1538 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1539 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1541 self.logger.info("BFD: Waiting for event")
1542 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1543 verify_event(self, e, expected_state=BFDState.init)
1544 self.logger.info("BFD: Sending Up")
1545 self.test_session.send_packet()
1546 self.logger.info("BFD: Waiting for event")
1547 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1548 verify_event(self, e, expected_state=BFDState.up)
1549 self.logger.info("BFD: Session is Up")
1550 self.test_session.update(state=BFDState.up)
1551 self.test_session.send_packet()
1552 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    Brings the session up, then for twice the test session's detect
    multiplier waits for a BFD packet and sends one back.  Afterwards no
    BFD event may be pending and the VPP session must report state up.
    """
    bfd_session_up(self)
    rounds = self.test_session.detect_mult * 2
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1565 def test_echo_looped_back(self):
1566 """ echo packets looped back """
1567 # don't need a session in this case..
1568 self.vpp_session.remove_vpp_config()
1569 self.pg0.enable_capture()
1570 echo_packet_count = 10
1571 # random source port low enough to increment a few times..
1572 udp_sport_tx = randint(1, 50000)
1573 udp_sport_rx = udp_sport_tx
1574 echo_packet = (Ether(src=self.pg0.remote_mac,
1575 dst=self.pg0.local_mac) /
1576 IPv6(src=self.pg0.remote_ip6,
1577 dst=self.pg0.remote_ip6) /
1578 UDP(dport=BFD.udp_dport_echo) /
1579 Raw("this should be looped back"))
1580 for dummy in range(echo_packet_count):
1581 self.sleep(.01, "delay between echo packets")
1582 echo_packet[UDP].sport = udp_sport_tx
1584 self.logger.debug(ppp("Sending packet:", echo_packet))
1585 self.pg0.add_stream(echo_packet)
1587 for dummy in range(echo_packet_count):
1588 p = self.pg0.wait_for_packet(1)
1589 self.logger.debug(ppp("Got packet:", p))
1591 self.assert_equal(self.pg0.remote_mac,
1592 ether.dst, "Destination MAC")
1593 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1595 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1596 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1598 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1599 "UDP destination port")
1600 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1602 # need to compare the hex payload here, otherwise BFD_vpp_echo
1604 self.assertEqual(str(p[UDP].payload),
1605 str(echo_packet[UDP].payload),
1606 "Received packet is not the echo packet sent")
1607 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1608 "ECHO packet identifier for test purposes)")
1609 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1610 "ECHO packet identifier for test purposes)")
1612 def test_echo(self):
1613 """ echo function """
1614 bfd_session_up(self)
1615 self.test_session.update(required_min_echo_rx=150000)
1616 self.test_session.send_packet()
1617 detection_time = self.test_session.detect_mult *\
1618 self.vpp_session.required_min_rx / USEC_IN_SEC
1619 # echo shouldn't work without echo source set
1620 for dummy in range(10):
1621 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1622 self.sleep(sleep, "delay before sending bfd packet")
1623 self.test_session.send_packet()
1624 p = wait_for_bfd_packet(
1625 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1626 self.assert_equal(p[BFD].required_min_rx_interval,
1627 self.vpp_session.required_min_rx,
1628 "BFD required min rx interval")
1629 self.test_session.send_packet()
1630 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1632 # should be turned on - loopback echo packets
1633 for dummy in range(3):
1634 loop_until = time.time() + 0.75 * detection_time
1635 while time.time() < loop_until:
1636 p = self.pg0.wait_for_packet(1)
1637 self.logger.debug(ppp("Got packet:", p))
1638 if p[UDP].dport == BFD.udp_dport_echo:
1640 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1641 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1642 "BFD ECHO src IP equal to loopback IP")
1643 self.logger.debug(ppp("Looping back packet:", p))
1644 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1645 "ECHO packet destination MAC address")
1646 p[Ether].dst = self.pg0.local_mac
1647 self.pg0.add_stream(p)
1650 elif p.haslayer(BFD):
1652 self.assertGreaterEqual(
1653 p[BFD].required_min_rx_interval,
1655 if "P" in p.sprintf("%BFD.flags%"):
1656 final = self.test_session.create_packet()
1657 final[BFD].flags = "F"
1658 self.test_session.send_packet(final)
1660 raise Exception(ppp("Received unknown packet:", p))
1662 self.assert_equal(len(self.vapi.collect_events()), 0,
1663 "number of bfd events")
1664 self.test_session.send_packet()
1665 self.assertTrue(echo_seen, "No echo packets received")
1667 def test_intf_deleted(self):
1668 """ interface with bfd session deleted """
1669 intf = VppLoInterface(self)
1672 sw_if_index = intf.sw_if_index
1673 vpp_session = VppBFDUDPSession(
1674 self, intf, intf.remote_ip6, af=AF_INET6)
1675 vpp_session.add_vpp_config()
1676 vpp_session.admin_up()
1677 intf.remove_vpp_config()
1678 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1679 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1680 self.assertFalse(vpp_session.query_vpp_config())
1683 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1684 class BFDFIBTestCase(VppTestCase):
1685 """ BFD-FIB interactions (IPv6) """
1691 super(BFDFIBTestCase, self).setUp()
1692 self.create_pg_interfaces(range(1))
1694 self.vapi.want_bfd_events()
1695 self.pg0.enable_capture()
1697 for i in self.pg_interfaces:
1700 i.configure_ipv6_neighbors()
1703 if not self.vpp_dead:
1704 self.vapi.want_bfd_events(enable_disable=0)
1706 super(BFDFIBTestCase, self).tearDown()
1709 def pkt_is_not_data_traffic(p):
1710 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1711 if p.haslayer(BFD) or is_ipv6_misc(p):
1715 def test_session_with_fib(self):
1716 """ BFD-FIB interactions """
1718 # packets to match against both of the routes
1719 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1720 IPv6(src="3001::1", dst="2001::1") /
1721 UDP(sport=1234, dport=1234) /
1723 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1724 IPv6(src="3001::1", dst="2002::1") /
1725 UDP(sport=1234, dport=1234) /
1728 # A recursive and a non-recursive route via a next-hop that
1729 # will have a BFD session
1730 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1731 [VppRoutePath(self.pg0.remote_ip6,
1732 self.pg0.sw_if_index,
1733 proto=DpoProto.DPO_PROTO_IP6)],
1735 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1736 [VppRoutePath(self.pg0.remote_ip6,
1738 proto=DpoProto.DPO_PROTO_IP6)],
1740 ip_2001_s_64.add_vpp_config()
1741 ip_2002_s_64.add_vpp_config()
1743 # bring the session up now the routes are present
1744 self.vpp_session = VppBFDUDPSession(self,
1746 self.pg0.remote_ip6,
1748 self.vpp_session.add_vpp_config()
1749 self.vpp_session.admin_up()
1750 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1752 # session is up - traffic passes
1753 bfd_session_up(self)
1755 self.pg0.add_stream(p)
1758 captured = self.pg0.wait_for_packet(
1760 filter_out_fn=self.pkt_is_not_data_traffic)
1761 self.assertEqual(captured[IPv6].dst,
1764 # session is down - traffic is dropped
1765 bfd_session_down(self)
1767 self.pg0.add_stream(p)
1769 with self.assertRaises(CaptureTimeoutError):
1770 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1772 # session is up - traffic passes
1773 bfd_session_up(self)
1775 self.pg0.add_stream(p)
1778 captured = self.pg0.wait_for_packet(
1780 filter_out_fn=self.pkt_is_not_data_traffic)
1781 self.assertEqual(captured[IPv6].dst,
1785 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1786 class BFDSHA1TestCase(VppTestCase):
1787 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1790 vpp_clock_offset = None
1795 def setUpClass(cls):
1796 super(BFDSHA1TestCase, cls).setUpClass()
1797 cls.vapi.cli("set log class bfd level debug")
1799 cls.create_pg_interfaces([0])
1800 cls.pg0.config_ip4()
1802 cls.pg0.resolve_arp()
1805 super(BFDSHA1TestCase, cls).tearDownClass()
1809 super(BFDSHA1TestCase, self).setUp()
1810 self.factory = AuthKeyFactory()
1811 self.vapi.want_bfd_events()
1812 self.pg0.enable_capture()
1815 if not self.vpp_dead:
1816 self.vapi.want_bfd_events(enable_disable=0)
1817 self.vapi.collect_events() # clear the event queue
1818 super(BFDSHA1TestCase, self).tearDown()
1820 def test_session_up(self):
1821 """ bring BFD session up """
1822 key = self.factory.create_random_key(self)
1823 key.add_vpp_config()
1824 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1825 self.pg0.remote_ip4,
1827 self.vpp_session.add_vpp_config()
1828 self.vpp_session.admin_up()
1829 self.test_session = BFDTestSession(
1830 self, self.pg0, AF_INET, sha1_key=key,
1831 bfd_key_id=self.vpp_session.bfd_key_id)
1832 bfd_session_up(self)
1834 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1835 def test_hold_up(self):
1836 """ hold BFD session up """
1837 key = self.factory.create_random_key(self)
1838 key.add_vpp_config()
1839 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1840 self.pg0.remote_ip4,
1842 self.vpp_session.add_vpp_config()
1843 self.vpp_session.admin_up()
1844 self.test_session = BFDTestSession(
1845 self, self.pg0, AF_INET, sha1_key=key,
1846 bfd_key_id=self.vpp_session.bfd_key_id)
1847 bfd_session_up(self)
1848 for dummy in range(self.test_session.detect_mult * 2):
1849 wait_for_bfd_packet(self)
1850 self.test_session.send_packet()
1851 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Configures a meticulous keyed SHA1 key on both the VPP session and
    the test session, brings the session up and then exchanges 30
    packets, bumping the test session's sequence number before each
    send.  The VPP session must still report state up afterwards.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    # start just below 0xFFFFFFFF so that the sequence number is meant to
    # wrap during the exchange below
    initial_seq = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=initial_seq)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers

    With meticulous keyed SHA1 authentication configured, the test
    session keeps sending packets for two detection times without ever
    calling inc_seq_num().  Exactly one BFD event, reporting state down,
    is expected to have been collected by then.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        # pace the packets at 0.7 of VPP's required min rx interval
        self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                   "time between bfd packets")
    events = self.vapi.collect_events()
    # session should be down now, because the sequence numbers were never
    # incremented between the packets sent above
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: VPP-side session, stored as
                                self.vpp_session and added to VPP
    :param legitimate_test_session: test-side session, stored as
                                    self.test_session and brought up
    :param rogue_test_session: test-side session used to send the
                               rogue packet
    :param rogue_bfd_values: optional dict of keyword arguments passed to
                             rogue_test_session.update() after the
                             legitimate values were copied
    """
    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # send packet from rogue session
    rogue_test_session.update(
        my_discriminator=self.test_session.my_discriminator,
        your_discriminator=self.test_session.your_discriminator,
        desired_min_tx=self.test_session.desired_min_tx,
        required_min_rx=self.test_session.required_min_rx,
        detect_mult=self.test_session.detect_mult,
        diag=self.test_session.diag,
        state=self.test_session.state,
        auth_type=self.test_session.auth_type)
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue session is created without any key
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_bfd_key_id(self):
    """ session is not brought down by msg with non-existent key-id """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # pick a different random bfd key id
    x = randint(0, 255)
    while x == vpp_session.bfd_key_id:
        x = randint(0, 255)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # same secret as the legitimate session, but a key id VPP was not
    # configured with
    rogue_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # both test sessions are built identically (legitimate one first);
    # only the auth type override passed below makes the second one rogue
    legitimate, rogue = [
        BFDTestSession(self, self.pg0, AF_INET, sha1_key=key,
                       bfd_key_id=vpp_session.bfd_key_id)
        for _ in range(2)]
    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue,
        rogue_bfd_values={'auth_type': BFDAuthType.keyed_md5})
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)
    # stay silent for twice the detection time
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")
    # exactly one event is expected and it must report the down state
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
    self.test_session.update(state=BFDState.down)
    # the peer starts over: sequence numbers are reset on the test side
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None
    # now throw away any pending packets
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # populated by setUpClass / the individual tests
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and set up pg0 with an IPv4 address """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # run the class-level teardown before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create a key factory, subscribe to BFD events, start capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _answer_vpp_packets(self, verify_up=True, inc_seq_num=False):
        """ reply to detect_mult * 2 consecutive packets received from VPP

        :param verify_up: assert that every received packet carries
                          state up
        :param inc_seq_num: increment the test session's sequence number
                            before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event was collected """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._answer_vpp_packets()
        # switch both sides to the key at the same time
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._answer_vpp_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._answer_vpp_packets(inc_seq_num=True)
        # drop the key on both sides at the same time
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._answer_vpp_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._answer_vpp_packets()
        # replace key1 with key2 on both sides at the same time
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._answer_vpp_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the first round only answers packets without inspecting them
        self._answer_vpp_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending unauthenticated packets for a while
        self._answer_vpp_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._answer_vpp_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._answer_vpp_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending authenticated packets for a while
        self._answer_vpp_packets()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._answer_vpp_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._answer_vpp_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps using key1 for a while
        self._answer_vpp_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._answer_vpp_packets()
        self._verify_session_undisturbed()
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # populated by setUpClass
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and set up pg0 with IPv4 and IPv6 """
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # run the class-level teardown before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create a key factory and start capturing on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    @staticmethod
    def _hex_secret(key):
        """ return the secret of `key` as a string of two-digit hex values """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, it is not verified
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _add_mod_del_session(self, af=AF_INET, key=None):
        """ create, modify and delete a BFD UDP session using the CLI

        :param af: AF_INET or AF_INET6, selects which pg0 addresses are used
        :param key: auth key the session is created with, None for a
                    session without authentication
        """
        # only non-default arguments are passed on to VppBFDUDPSession
        if af == AF_INET6:
            local_addr, peer_addr = self.pg0.local_ip6, self.pg0.remote_ip6
            session_kwargs = {'af': AF_INET6}
        else:
            local_addr, peer_addr = self.pg0.local_ip4, self.pg0.remote_ip4
            session_kwargs = {}
        if key is not None:
            session_kwargs['sha1_key'] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
            # the session describing the modified state reuses the key id
            # of the session that was added
            session_kwargs['bfd_key_id'] = vpp_session.bfd_key_id
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        # mod_session is never added itself, it only describes the expected
        # configuration after the "mod" command
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **session_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._add_mod_del_session()

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._add_mod_del_session(af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._add_mod_del_session(key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._add_mod_del_session(af=AF_INET6, key=key)

    def _auth_on_off(self, cli_suffix=""):
        """ activate and deactivate authentication using the CLI

        :param cli_suffix: text appended verbatim to both the activate and
                           the deactivate command
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # auth_session is never added, it only describes the expected
        # configuration while authentication is active
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id, cli_suffix)
        # each command is issued twice; the repetition must succeed too
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               cli_suffix)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._auth_on_off()

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._auth_on_off(cli_suffix=" delayed yes")

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address is configured on the loopback yet
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # the expected echo address is the interface address with its
        # lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        # same for IPv6: only the lowest bit of the last 32-bit word differs
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
# when executed as a script, run the tests of this module using VppTestRunner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)