4 from __future__ import division
7 from collections import namedtuple
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6, inet_ntop
15 from struct import pack, unpack
18 from scapy.layers.inet import UDP, IP
19 from scapy.layers.inet6 import IPv6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
23 from config import config
33 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
34 from framework import is_distro_ubuntu2204, is_distro_debian11
35 from framework import VppTestCase, VppTestRunner
36 from framework import tag_run_solo
38 from vpp_ip import DpoProto
39 from vpp_ip_route import VppIpRoute, VppRoutePath
40 from vpp_lo_interface import VppLoInterface
41 from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError
42 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
43 from vpp_gre_interface import VppGreInterface
44 from vpp_papi import VppEnum
49 class AuthKeyFactory(object):
50 """Factory class for creating auth keys with unique conf key ID"""
53 self._conf_key_ids = {}
55 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
56 """create a random key with unique conf key id"""
57 conf_key_id = randint(0, 0xFFFFFFFF)
58 while conf_key_id in self._conf_key_ids:
59 conf_key_id = randint(0, 0xFFFFFFFF)
60 self._conf_key_ids[conf_key_id] = 1
61 key = scapy.compat.raw(
62 bytearray([randint(0, 255) for _ in range(randint(1, 20))])
65 test=test, auth_type=auth_type, conf_key_id=conf_key_id, key=key
69 class BFDAPITestCase(VppTestCase):
70 """Bidirectional Forwarding Detection (BFD) - API"""
77 super(BFDAPITestCase, cls).setUpClass()
78 cls.vapi.cli("set log class bfd level debug")
80 cls.create_pg_interfaces(range(2))
81 for i in cls.pg_interfaces:
87 super(BFDAPITestCase, cls).tearDownClass()
@classmethod
def tearDownClass(cls):
    """Tear down class-level fixtures by delegating to the parent class."""
    # unittest calls this hook on the class itself, so it must be a
    # classmethod; as a plain function the call would lack the ``cls``
    # argument.
    super(BFDAPITestCase, cls).tearDownClass()
95 super(BFDAPITestCase, self).setUp()
96 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """create a BFD session"""
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # run the add/log/remove cycle twice to show that the very same session
    # can be configured again after it has been removed
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """create the same BFD session twice (negative case)"""
    duplicated = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicated.add_vpp_config()

    # the second, identical add is expected to come back with a negative
    # API return value
    expect_negative = self.vapi.assert_negative_api_retval()
    with expect_negative:
        duplicated.add_vpp_config()

    duplicated.remove_vpp_config()
def test_add_bfd6(self):
    """create IPv6 BFD session"""
    peer_ip6 = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_ip6, af=AF_INET6)
    # run the add/log/remove cycle twice to show that the very same session
    # can be configured again after it has been removed
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
128 def test_mod_bfd(self):
129 """modify BFD session parameters"""
130 session = VppBFDUDPSession(
134 desired_min_tx=50000,
135 required_min_rx=10000,
138 session.add_vpp_config()
139 s = session.get_bfd_udp_session_dump_entry()
141 session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
144 session.required_min_rx, s.required_min_rx, "required min receive interval"
146 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
147 session.modify_parameters(
148 desired_min_tx=session.desired_min_tx * 2,
149 required_min_rx=session.required_min_rx * 2,
150 detect_mult=session.detect_mult * 2,
152 s = session.get_bfd_udp_session_dump_entry()
154 session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
157 session.required_min_rx, s.required_min_rx, "required min receive interval"
159 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
161 def test_upd_bfd(self):
162 """Create/Modify w/ Update BFD session parameters"""
163 session = VppBFDUDPSession(
167 desired_min_tx=50000,
168 required_min_rx=10000,
171 session.upd_vpp_config()
172 s = session.get_bfd_udp_session_dump_entry()
174 session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
177 session.required_min_rx, s.required_min_rx, "required min receive interval"
180 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
181 session.upd_vpp_config(
182 desired_min_tx=session.desired_min_tx * 2,
183 required_min_rx=session.required_min_rx * 2,
184 detect_mult=session.detect_mult * 2,
186 s = session.get_bfd_udp_session_dump_entry()
188 session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
191 session.required_min_rx, s.required_min_rx, "required min receive interval"
193 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
195 def test_add_sha1_keys(self):
198 keys = [self.factory.create_random_key(self) for i in range(0, key_count)]
200 self.assertFalse(key.query_vpp_config())
204 self.assertTrue(key.query_vpp_config())
206 indexes = list(range(key_count))
211 key.remove_vpp_config()
213 for j in range(key_count):
216 self.assertFalse(key.query_vpp_config())
218 self.assertTrue(key.query_vpp_config())
219 # should be removed now
221 self.assertFalse(key.query_vpp_config())
222 # add back and remove again
226 self.assertTrue(key.query_vpp_config())
228 key.remove_vpp_config()
230 self.assertFalse(key.query_vpp_config())
def test_add_bfd_sha1(self):
    """create a BFD session (SHA1)"""
    key = self.factory.create_random_key(self)
    # The key has to be configured in VPP before a session may refer to it;
    # a session that uses an unconfigured key is expected to be rejected
    # (that is exactly what test_add_auth_nonexistent_key checks).
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # add and remove the authenticated session twice to show it can be
    # re-created after removal
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
def test_double_add_sha1(self):
    """create the same BFD session twice (negative case) (SHA1)"""
    key = self.factory.create_random_key(self)
    # Configure the key first: otherwise the *first* session add below would
    # already fail (outside the assertRaises block) because the key is
    # unknown to VPP, and the duplicate-add path would never be reached.
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    session.add_vpp_config()
    # adding the identical session a second time must raise
    with self.assertRaises(Exception):
        session.add_vpp_config()
253 def test_add_auth_nonexistent_key(self):
254 """create BFD session using non-existent SHA1 (negative case)"""
255 session = VppBFDUDPSession(
259 sha1_key=self.factory.create_random_key(self),
261 with self.assertRaises(Exception):
262 session.add_vpp_config()
264 def test_shared_sha1_key(self):
265 """single SHA1 key shared by multiple BFD sessions"""
266 key = self.factory.create_random_key(self)
269 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key),
271 self, self.pg0, self.pg0.remote_ip6, sha1_key=key, af=AF_INET6
273 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4, sha1_key=key),
275 self, self.pg1, self.pg1.remote_ip6, sha1_key=key, af=AF_INET6
282 e = key.get_bfd_auth_keys_dump_entry()
284 e.use_count, len(sessions) - removed, "Use count for shared key"
286 s.remove_vpp_config()
288 e = key.get_bfd_auth_keys_dump_entry()
290 e.use_count, len(sessions) - removed, "Use count for shared key"
def test_activate_auth(self):
    """activate SHA1 authentication"""
    key = self.factory.create_random_key(self)
    # the key must exist in VPP before it can be activated on a session
    key.add_vpp_config()
    # start with an unauthenticated session, then switch authentication on
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    session.activate_auth(key)
def test_deactivate_auth(self):
    """deactivate SHA1 authentication"""
    key = self.factory.create_random_key(self)
    # the key must exist in VPP before it can be activated on a session
    key.add_vpp_config()
    # start with an unauthenticated session, switch authentication on and
    # then off again
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    session.activate_auth(key)
    session.deactivate_auth()
def test_change_key(self):
    """change SHA1 key"""
    create_key = self.factory.create_random_key
    initial_key = create_key(self)
    replacement_key = create_key(self)
    # keep drawing until the replacement has a different conf key id than
    # the initial key
    while initial_key.conf_key_id == replacement_key.conf_key_id:
        replacement_key = create_key(self)
    # both keys are configured in VPP, initial key first
    for auth_key in (initial_key, replacement_key):
        auth_key.add_vpp_config()
    # the session starts out with the initial key and is then switched over
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=initial_key
    )
    bfd_session.add_vpp_config()
    bfd_session.activate_auth(replacement_key)
322 def test_set_del_udp_echo_source(self):
323 """set/del udp echo source"""
324 self.create_loopback_interfaces(1)
325 self.loopback0 = self.lo_interfaces[0]
326 self.loopback0.admin_up()
327 echo_source = self.vapi.bfd_udp_get_echo_source()
328 self.assertFalse(echo_source.is_set)
329 self.assertFalse(echo_source.have_usable_ip4)
330 self.assertFalse(echo_source.have_usable_ip6)
332 self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
333 echo_source = self.vapi.bfd_udp_get_echo_source()
334 self.assertTrue(echo_source.is_set)
335 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
336 self.assertFalse(echo_source.have_usable_ip4)
337 self.assertFalse(echo_source.have_usable_ip6)
339 self.loopback0.config_ip4()
340 echo_ip4 = ipaddress.IPv4Address(
341 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
343 echo_source = self.vapi.bfd_udp_get_echo_source()
344 self.assertTrue(echo_source.is_set)
345 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
346 self.assertTrue(echo_source.have_usable_ip4)
347 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
348 self.assertFalse(echo_source.have_usable_ip6)
350 self.loopback0.config_ip6()
351 echo_ip6 = ipaddress.IPv6Address(
352 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
355 echo_source = self.vapi.bfd_udp_get_echo_source()
356 self.assertTrue(echo_source.is_set)
357 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
358 self.assertTrue(echo_source.have_usable_ip4)
359 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
360 self.assertTrue(echo_source.have_usable_ip6)
361 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
363 self.vapi.bfd_udp_del_echo_source()
364 echo_source = self.vapi.bfd_udp_get_echo_source()
365 self.assertFalse(echo_source.is_set)
366 self.assertFalse(echo_source.have_usable_ip4)
367 self.assertFalse(echo_source.have_usable_ip6)
370 class BFDTestSession(object):
371 """BFD session as seen from test framework side"""
387 self.sha1_key = sha1_key
388 self.bfd_key_id = bfd_key_id
389 self.interface = interface
391 self.phy_interface = phy_interface
393 self.phy_interface = self.interface
394 self.udp_sport = randint(49152, 65535)
395 if our_seq_number is None:
396 self.our_seq_number = randint(0, 40000000)
398 self.our_seq_number = our_seq_number
399 self.vpp_seq_number = None
400 self.my_discriminator = 0
401 self.desired_min_tx = 300000
402 self.required_min_rx = 300000
403 self.required_min_echo_rx = None
404 self.detect_mult = detect_mult
405 self.diag = BFDDiagCode.no_diagnostic
406 self.your_discriminator = None
407 self.state = BFDState.down
408 self.auth_type = BFDAuthType.no_auth
409 self.tunnel_header = tunnel_header
412 self.tx_packets_echo = 0
413 self.rx_packets_echo = 0
def inc_seq_num(self):
    """increment sequence number, wrapping if needed

    The sequence number is treated as an unsigned 32-bit counter: after
    0xFFFFFFFF the next value is 0.
    """
    # Modular increment: performs the +1 and the wrap in one step, so the
    # wrapped value is exactly 0 and is not bumped a second time.
    self.our_seq_number = (self.our_seq_number + 1) % (0xFFFFFFFF + 1)
424 my_discriminator=None,
425 your_discriminator=None,
427 required_min_rx=None,
428 required_min_echo_rx=None,
434 """update BFD parameters associated with session"""
435 if my_discriminator is not None:
436 self.my_discriminator = my_discriminator
437 if your_discriminator is not None:
438 self.your_discriminator = your_discriminator
439 if required_min_rx is not None:
440 self.required_min_rx = required_min_rx
441 if required_min_echo_rx is not None:
442 self.required_min_echo_rx = required_min_echo_rx
443 if desired_min_tx is not None:
444 self.desired_min_tx = desired_min_tx
445 if detect_mult is not None:
446 self.detect_mult = detect_mult
449 if state is not None:
451 if auth_type is not None:
452 self.auth_type = auth_type
454 def fill_packet_fields(self, packet):
455 """set packet fields with known values in packet"""
457 if self.my_discriminator:
458 self.test.logger.debug(
459 "BFD: setting packet.my_discriminator=%s", self.my_discriminator
461 bfd.my_discriminator = self.my_discriminator
462 if self.your_discriminator:
463 self.test.logger.debug(
464 "BFD: setting packet.your_discriminator=%s", self.your_discriminator
466 bfd.your_discriminator = self.your_discriminator
467 if self.required_min_rx:
468 self.test.logger.debug(
469 "BFD: setting packet.required_min_rx_interval=%s", self.required_min_rx
471 bfd.required_min_rx_interval = self.required_min_rx
472 if self.required_min_echo_rx:
473 self.test.logger.debug(
474 "BFD: setting packet.required_min_echo_rx=%s", self.required_min_echo_rx
476 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
477 if self.desired_min_tx:
478 self.test.logger.debug(
479 "BFD: setting packet.desired_min_tx_interval=%s", self.desired_min_tx
481 bfd.desired_min_tx_interval = self.desired_min_tx
483 self.test.logger.debug(
484 "BFD: setting packet.detect_mult=%s", self.detect_mult
486 bfd.detect_mult = self.detect_mult
488 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
491 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
492 bfd.state = self.state
494 # this is used by a negative test-case
495 self.test.logger.debug("BFD: setting packet.auth_type=%s", self.auth_type)
496 bfd.auth_type = self.auth_type
498 def create_packet(self):
499 """create a BFD packet, reflecting the current state of session"""
502 bfd.auth_type = self.sha1_key.auth_type
503 bfd.auth_len = BFD.sha1_auth_len
504 bfd.auth_key_id = self.bfd_key_id
505 bfd.auth_seq_num = self.our_seq_number
506 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
510 src=self.phy_interface.remote_mac, dst=self.phy_interface.local_mac
512 if self.tunnel_header:
513 packet = packet / self.tunnel_header
514 if self.af == AF_INET6:
518 src=self.interface.remote_ip6,
519 dst=self.interface.local_ip6,
522 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
529 src=self.interface.remote_ip4, dst=self.interface.local_ip4, ttl=255
531 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
534 self.test.logger.debug("BFD: Creating packet")
535 self.fill_packet_fields(packet)
538 scapy.compat.raw(packet[BFD])[:32]
540 + b"\0" * (20 - len(self.sha1_key.key))
542 self.test.logger.debug(
543 "BFD: Calculated SHA1 hash: %s"
544 % hashlib.sha1(hash_material).hexdigest()
546 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
549 def send_packet(self, packet=None, interface=None):
550 """send packet on interface, creating the packet if needed"""
552 packet = self.create_packet()
553 if interface is None:
554 interface = self.phy_interface
555 self.test.logger.debug(ppp("Sending packet:", packet))
556 interface.add_stream(packet)
560 def verify_sha1_auth(self, packet):
561 """Verify correctness of authentication in BFD layer."""
563 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
564 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
565 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
566 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
567 if self.vpp_seq_number is None:
568 self.vpp_seq_number = bfd.auth_seq_num
569 self.test.logger.debug(
570 "Received initial sequence number: %s" % self.vpp_seq_number
573 recvd_seq_num = bfd.auth_seq_num
574 self.test.logger.debug(
575 "Received followup sequence number: %s" % recvd_seq_num
577 if self.vpp_seq_number < 0xFFFFFFFF:
578 if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
579 self.test.assert_equal(
580 recvd_seq_num, self.vpp_seq_number + 1, "BFD sequence number"
583 self.test.assert_in_range(
586 self.vpp_seq_number + 1,
587 "BFD sequence number",
590 if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
591 self.test.assert_equal(recvd_seq_num, 0, "BFD sequence number")
595 (self.vpp_seq_number, 0),
596 "BFD sequence number not one of "
597 "(%s, 0)" % self.vpp_seq_number,
599 self.vpp_seq_number = recvd_seq_num
600 # last 20 bytes represent the hash - so replace them with the key,
601 # pad the result with zeros and hash the result
605 + b"\0" * (20 - len(self.sha1_key.key))
607 expected_hash = hashlib.sha1(hash_material).hexdigest()
608 self.test.assert_equal(
609 binascii.hexlify(bfd.auth_key_hash), expected_hash.encode(), "Auth key hash"
612 def verify_bfd(self, packet):
613 """Verify correctness of BFD layer."""
615 self.test.assert_equal(bfd.version, 1, "BFD version")
616 self.test.assert_equal(
617 bfd.your_discriminator, self.my_discriminator, "BFD - your discriminator"
620 self.verify_sha1_auth(packet)
623 def bfd_session_up(test):
624 """Bring BFD session up"""
625 test.logger.info("BFD: Waiting for slow hello")
626 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
628 if hasattr(test, "vpp_clock_offset"):
629 old_offset = test.vpp_clock_offset
630 test.vpp_clock_offset = time.time() - float(p.time)
631 test.logger.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
633 test.assertAlmostEqual(
635 test.vpp_clock_offset,
637 msg="vpp clock offset not stable (new: %s, old: %s)"
638 % (test.vpp_clock_offset, old_offset),
640 test.logger.info("BFD: Sending Init")
641 test.test_session.update(
642 my_discriminator=randint(0, 40000000),
643 your_discriminator=p[BFD].my_discriminator,
647 test.test_session.sha1_key
648 and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
650 test.test_session.inc_seq_num()
651 test.test_session.send_packet()
652 test.logger.info("BFD: Waiting for event")
653 e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
654 verify_event(test, e, expected_state=BFDState.up)
655 test.logger.info("BFD: Session is Up")
656 test.test_session.update(state=BFDState.up)
658 test.test_session.sha1_key
659 and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
661 test.test_session.inc_seq_num()
662 test.test_session.send_packet()
663 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
666 def bfd_session_down(test):
667 """Bring BFD session down"""
668 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
669 test.test_session.update(state=BFDState.down)
671 test.test_session.sha1_key
672 and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
674 test.test_session.inc_seq_num()
675 test.test_session.send_packet()
676 test.logger.info("BFD: Waiting for event")
677 e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
678 verify_event(test, e, expected_state=BFDState.down)
679 test.logger.info("BFD: Session is Down")
680 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
683 def verify_bfd_session_config(test, session, state=None):
684 dump = session.get_bfd_udp_session_dump_entry()
685 test.assertIsNotNone(dump)
686 # since dump is not none, we have verified that sw_if_index and addresses
687 # are valid (in get_bfd_udp_session_dump_entry)
689 test.assert_equal(dump.state, state, "session state")
691 dump.required_min_rx, session.required_min_rx, "required min rx interval"
694 dump.desired_min_tx, session.desired_min_tx, "desired min tx interval"
696 test.assert_equal(dump.detect_mult, session.detect_mult, "detect multiplier")
697 if session.sha1_key is None:
698 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
700 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
701 test.assert_equal(dump.bfd_key_id, session.bfd_key_id, "bfd key id")
703 dump.conf_key_id, session.sha1_key.conf_key_id, "config key id"
707 def verify_ip(test, packet):
708 """Verify correctness of IP layer."""
709 if test.vpp_session.af == AF_INET6:
711 local_ip = test.vpp_session.interface.local_ip6
712 remote_ip = test.vpp_session.interface.remote_ip6
713 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
716 local_ip = test.vpp_session.interface.local_ip4
717 remote_ip = test.vpp_session.interface.remote_ip4
718 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
719 test.assert_equal(ip.src, local_ip, "IP source address")
720 test.assert_equal(ip.dst, remote_ip, "IP destination address")
723 def verify_udp(test, packet):
724 """Verify correctness of UDP layer."""
726 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
727 test.assert_in_range(
728 udp.sport, BFD.udp_sport_min, BFD.udp_sport_max, "UDP source port"
732 def verify_event(test, event, expected_state):
733 """Verify correctness of event values."""
735 test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
737 e.sw_if_index, test.vpp_session.interface.sw_if_index, "BFD interface index"
741 str(e.local_addr), test.vpp_session.local_addr, "Local IPv6 address"
743 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr, "Peer IPv6 address")
744 test.assert_equal(e.state, expected_state, BFDState)
747 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
748 """wait for BFD packet and verify its correctness
750 :param timeout: how long to wait
751 :param pcap_time_min: ignore packets with pcap timestamp lower than this
753 :returns: tuple (packet, time spent waiting for packet)
755 test.logger.info("BFD: Waiting for BFD packet")
756 deadline = time.time() + timeout
761 test.assert_in_range(counter, 0, 100, "number of packets ignored")
762 time_left = deadline - time.time()
764 raise CaptureTimeoutError("Packet did not arrive within timeout")
765 p = test.pg0.wait_for_packet(timeout=time_left)
766 test.test_session.rx_packets += 1
767 test.logger.debug(ppp("BFD: Got packet:", p))
768 if pcap_time_min is not None and p.time < pcap_time_min:
771 "BFD: ignoring packet (pcap time %s < "
772 "pcap time min %s):" % (p.time, pcap_time_min),
778 test.logger.debug(test.vapi.ppcli("show trace"))
780 # strip an IP layer and move to the next
785 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
787 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
790 test.test_session.verify_bfd(p)
# Snapshot of the four BFD packet counters; field order is
# (rx, rx_echo, tx, tx_echo).
BFDStats = namedtuple("BFDStats", ["rx", "rx_echo", "tx", "tx_echo"])
797 def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
801 rx = s["/bfd/rx-session-counters"][:, bs_idx].sum_packets()
802 rx_echo = s["/bfd/rx-session-echo-counters"][:, bs_idx].sum_packets()
803 tx = s["/bfd/tx-session-counters"][:, bs_idx].sum_packets()
804 tx_echo = s["/bfd/tx-session-echo-counters"][:, bs_idx].sum_packets()
806 rx = s["/bfd/rx-session-counters"][ti, bs_idx].sum_packets()
807 rx_echo = s["/bfd/rx-session-echo-counters"][ti, bs_idx].sum_packets()
808 tx = s["/bfd/tx-session-counters"][ti, bs_idx].sum_packets()
809 tx_echo = s["/bfd/tx-session-echo-counters"][ti, bs_idx].sum_packets()
810 return BFDStats(rx, rx_echo, tx, tx_echo)
def bfd_stats_diff(stats_before, stats_after):
    """Return the per-counter difference between two stats snapshots.

    :param stats_before: snapshot taken first (namedtuple of counters)
    :param stats_after: snapshot taken later, same namedtuple type
    :returns: namedtuple of the same type as ``stats_after`` where every
        field holds ``after - before``
    """
    # Subtract field by field instead of spelling out each counter name, so
    # the diff stays complete if counters are added to the snapshot tuple.
    return stats_after._make(
        after - before for after, before in zip(stats_after, stats_before)
    )
822 @tag_fixme_ubuntu2204
824 class BFD4TestCase(VppTestCase):
825 """Bidirectional Forwarding Detection (BFD)"""
828 vpp_clock_offset = None
834 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
838 super(BFD4TestCase, cls).setUpClass()
839 cls.vapi.cli("set log class bfd level debug")
841 cls.create_pg_interfaces([0])
842 cls.create_loopback_interfaces(1)
843 cls.loopback0 = cls.lo_interfaces[0]
844 cls.loopback0.config_ip4()
845 cls.loopback0.admin_up()
847 cls.pg0.configure_ipv4_neighbors()
849 cls.pg0.resolve_arp()
852 super(BFD4TestCase, cls).tearDownClass()
@classmethod
def tearDownClass(cls):
    """Tear down class-level fixtures by delegating to the parent class."""
    # unittest calls this hook on the class itself, so it must be a
    # classmethod; as a plain function the call would lack the ``cls``
    # argument.
    super(BFD4TestCase, cls).tearDownClass()
860 super(BFD4TestCase, self).setUp()
861 self.factory = AuthKeyFactory()
862 self.vapi.want_bfd_events()
863 self.pg0.enable_capture()
865 self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
866 self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
867 self.vapi.cli("trace add bfd-process 500")
868 self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
869 self.vpp_session.add_vpp_config()
870 self.vpp_session.admin_up()
871 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
872 except BaseException:
873 self.vapi.want_bfd_events(enable_disable=0)
877 if not self.vpp_dead:
878 self.vapi.want_bfd_events(enable_disable=0)
879 self.vapi.collect_events() # clear the event queue
880 super(BFD4TestCase, self).tearDown()
def test_session_up(self):
    """bring BFD session up"""
    # actually drive the session to Up; without this call the test would
    # only look at the session counters
    bfd_session_up(self)
    bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
    bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
    # setUp records both counters before it adds the single IPv4 session, so
    # the IPv4 counter must have grown by one and the IPv6 counter not at all
    self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
    self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
890 def test_session_up_by_ip(self):
891 """bring BFD session up - first frame looked up by address pair"""
892 self.logger.info("BFD: Sending Slow control frame")
893 self.test_session.update(my_discriminator=randint(0, 40000000))
894 self.test_session.send_packet()
895 self.pg0.enable_capture()
896 p = self.pg0.wait_for_packet(1)
898 p[BFD].your_discriminator,
899 self.test_session.my_discriminator,
900 "BFD - your discriminator",
902 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
903 self.test_session.update(
904 your_discriminator=p[BFD].my_discriminator, state=BFDState.up
906 self.logger.info("BFD: Waiting for event")
907 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
908 verify_event(self, e, expected_state=BFDState.init)
909 self.logger.info("BFD: Sending Up")
910 self.test_session.send_packet()
911 self.logger.info("BFD: Waiting for event")
912 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
913 verify_event(self, e, expected_state=BFDState.up)
914 self.logger.info("BFD: Session is Up")
915 self.test_session.update(state=BFDState.up)
916 self.test_session.send_packet()
917 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_session_down(self):
    """bring BFD session down"""
    # bfd_session_down() starts by asserting that the VPP session is Up, so
    # the session has to be brought up first
    bfd_session_up(self)
    bfd_session_down(self)
def test_hold_up(self):
    """hold BFD session up"""
    # the session must be Up before it can be held up
    bfd_session_up(self)
    # answer each packet received from VPP with one of our own, for twice
    # the detect multiplier's worth of packets
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    # no session event may have been queued in the meantime
    self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
def test_slow_timer(self):
    """verify slow periodic control frames while session down"""
    # number of consecutive inter-packet gaps to measure; must be bound
    # before the log call and the loop below use it
    packet_count = 3
    self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
    prev_packet = wait_for_bfd_packet(self, 2)
    for _ in range(packet_count):
        next_packet = wait_for_bfd_packet(self, 2)
        time_diff = next_packet.time - prev_packet.time
        # spec says the range should be <0.75, 1>, allow extra 0.05 margin
        # to work around timing issues
        self.assert_in_range(time_diff, 0.70, 1.05, "time between slow packets")
        prev_packet = next_packet
945 def test_zero_remote_min_rx(self):
946 """no packets when zero remote required min rx interval"""
948 self.test_session.update(required_min_rx=0)
949 self.test_session.send_packet()
950 for dummy in range(self.test_session.detect_mult):
952 self.vpp_session.required_min_rx / USEC_IN_SEC,
953 "sleep before transmitting bfd packet",
955 self.test_session.send_packet()
957 p = wait_for_bfd_packet(self, timeout=0)
958 self.logger.error(ppp("Received unexpected packet:", p))
959 except CaptureTimeoutError:
961 self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
962 self.test_session.update(required_min_rx=300000)
963 for dummy in range(3):
964 self.test_session.send_packet()
966 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC
968 self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
970 def test_conn_down(self):
971 """verify session goes down after inactivity"""
974 self.test_session.detect_mult
975 * self.vpp_session.required_min_rx
978 self.sleep(detection_time, "waiting for BFD session time-out")
979 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
980 verify_event(self, e, expected_state=BFDState.down)
982 def test_peer_discr_reset_sess_down(self):
983 """peer discriminator reset after session goes down"""
986 self.test_session.detect_mult
987 * self.vpp_session.required_min_rx
990 self.sleep(detection_time, "waiting for BFD session time-out")
991 self.test_session.my_discriminator = 0
992 wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
994 def test_large_required_min_rx(self):
995 """large remote required min rx interval"""
997 p = wait_for_bfd_packet(self)
999 self.test_session.update(required_min_rx=interval)
1000 self.test_session.send_packet()
1001 time_mark = time.time()
1003 # busy wait here, trying to collect a packet or event, vpp is not
1004 # allowed to send packets and the session will timeout first - so the
1005 # Up->Down event must arrive before any packets do
1006 while time.time() < time_mark + interval / USEC_IN_SEC:
1008 p = wait_for_bfd_packet(self, timeout=0)
1009 # if vpp managed to send a packet before we did the session
1010 # session update, then that's fine, ignore it
1011 if p.time < time_mark - self.vpp_clock_offset:
1013 self.logger.error(ppp("Received unexpected packet:", p))
1015 except CaptureTimeoutError:
1017 events = self.vapi.collect_events()
1019 verify_event(self, events[0], BFDState.down)
1021 self.assert_equal(count, 0, "number of packets received")
1023 def test_immediate_remote_min_rx_reduction(self):
1024 """immediately honor remote required min rx reduction"""
1025 self.vpp_session.remove_vpp_config()
1026 self.vpp_session = VppBFDUDPSession(
1027 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000
1029 self.pg0.enable_capture()
1030 self.vpp_session.add_vpp_config()
1031 self.test_session.update(desired_min_tx=1000000, required_min_rx=1000000)
1032 bfd_session_up(self)
1033 reference_packet = wait_for_bfd_packet(self)
1034 time_mark = time.time()
1036 self.test_session.update(required_min_rx=interval)
1037 self.test_session.send_packet()
1038 extra_time = time.time() - time_mark
1039 p = wait_for_bfd_packet(self)
1040 # first packet is allowed to be late by time we spent doing the update
1041 # calculated in extra_time
1042 self.assert_in_range(
1043 p.time - reference_packet.time,
1044 0.95 * 0.75 * interval / USEC_IN_SEC,
1045 1.05 * interval / USEC_IN_SEC + extra_time,
1046 "time between BFD packets",
1048 reference_packet = p
1049 for dummy in range(3):
1050 p = wait_for_bfd_packet(self)
1051 diff = p.time - reference_packet.time
1052 self.assert_in_range(
1054 0.95 * 0.75 * interval / USEC_IN_SEC,
1055 1.05 * interval / USEC_IN_SEC,
1056 "time between BFD packets",
1058 reference_packet = p
1060 def test_modify_req_min_rx_double(self):
1061 """modify session - double required min rx"""
1062 bfd_session_up(self)
1063 p = wait_for_bfd_packet(self)
1064 self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
1065 self.test_session.send_packet()
1066 # double required min rx
1067 self.vpp_session.modify_parameters(
1068 required_min_rx=2 * self.vpp_session.required_min_rx
1070 p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1071 # poll bit needs to be set
1072 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1073 # finish poll sequence with final packet
1074 final = self.test_session.create_packet()
1075 final[BFD].flags = "F"
1077 self.test_session.detect_mult
1078 * max(self.test_session.desired_min_tx, self.vpp_session.required_min_rx)
1081 self.test_session.send_packet(final)
1082 time_mark = time.time()
1083 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
1084 verify_event(self, e, expected_state=BFDState.down)
1085 time_to_event = time.time() - time_mark
1086 self.assert_in_range(
1087 time_to_event, 0.9 * timeout, 1.1 * timeout, "session timeout"
def test_modify_req_min_rx_halve(self):
    """modify session - halve required min rx"""
    # start from a doubled required min rx so that halving it is meaningful
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx
    )
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
    self.test_session.send_packet()
    # skip everything captured before this point
    wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # halve required min rx
    old_required_min_rx = self.vpp_session.required_min_rx
    self.vpp_session.modify_parameters(
        required_min_rx=self.vpp_session.required_min_rx // 2
    )
    # now we wait 0.8*3*old-req-min-rx and the session should still be up
    self.sleep(
        0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
        "wait before finishing poll sequence",
    )
    self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
    p = wait_for_bfd_packet(self)
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    # now the session should time out under new conditions
    detection_time = (
        self.test_session.detect_mult
        * self.vpp_session.required_min_rx
        / USEC_IN_SEC
    )
    before = time.time()
    e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
    after = time.time()
    # the down event must arrive within +-10% of the new detection time
    self.assert_in_range(
        after - before,
        0.9 * detection_time,
        1.1 * detection_time,
        "time before bfd session goes down",
    )
    verify_event(self, e, expected_state=BFDState.down)
def test_modify_detect_mult(self):
    """modify detect multiplier"""
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    # exercise both a lower and a higher multiplier; each time the next
    # packet from VPP must advertise the new value without starting a poll
    for new_detect_mult in (1, 10):
        self.vpp_session.modify_parameters(detect_mult=new_detect_mult)
        # only look at packets captured after the modification
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset
        )
        self.assert_equal(
            self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
        )
        # poll bit must not be set
        # (failure message fixed: the assertion fails when the bit IS set)
        self.assertNotIn(
            "P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet"
        )
def test_queued_poll(self):
    """test poll sequence queueing"""
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    # first parameter change - starts the 1st poll sequence
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx
    )
    p = wait_for_bfd_packet(self)
    poll_sequence_start = time.time()
    poll_sequence_length_min = 0.5
    send_final_after = time.time() + poll_sequence_length_min
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
    self.assert_equal(
        p[BFD].required_min_rx_interval,
        self.vpp_session.required_min_rx,
        "BFD required min rx interval",
    )
    # second parameter change while the 1st poll sequence is still open
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx
    )
    # 2nd poll sequence should be queued now
    # don't send the reply back yet, wait for some time to emulate
    # longer round-trip time
    packet_count = 0
    while time.time() < send_final_after:
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self)
        self.assert_equal(
            len(self.vapi.collect_events()), 0, "number of bfd events"
        )
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        packet_count += 1
        # poll bit must be set
        self.assertIn(
            "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
        )
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    # finish 1st with final
    poll_sequence_length = time.time() - poll_sequence_start
    # vpp must wait for some time before starting new poll sequence
    poll_no_2_started = False
    # give VPP twice as many packets as the 1st sequence took to start the 2nd
    for _ in range(2 * packet_count):
        p = wait_for_bfd_packet(self)
        self.assert_equal(
            len(self.vapi.collect_events()), 0, "number of bfd events"
        )
        if "P" in p.sprintf("%BFD.flags%"):
            poll_no_2_started = True
            # NOTE(review): poll_sequence_start + poll_sequence_length is the
            # moment the 1st Final was sent, which is already in the past, so
            # this check looks like it can never fire - confirm intended bound
            if time.time() < poll_sequence_start + poll_sequence_length:
                raise Exception("VPP started 2nd poll sequence too soon")
            final = self.test_session.create_packet()
            final[BFD].flags = "F"
            self.test_session.send_packet(final)
            break
        else:
            self.test_session.send_packet()
    self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
    # finish 2nd with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    p = wait_for_bfd_packet(self)
    # poll bit must not be set
    self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
# returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(config.extended, "part of extended tests")
def test_poll_response(self):
    """test correct response to control frame with poll bit set"""
    bfd_session_up(self)
    # send a control frame with the Poll flag set ...
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    # ... and expect the next packet captured after now to carry Final
    final = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset
    )
    self.assertIn(
        "F", final.sprintf("%BFD.flags%"), "Final bit not set in BFD packet"
    )
def test_no_periodic_if_remote_demand(self):
    """no periodic frames outside poll sequence if remote demand set"""
    bfd_session_up(self)
    # tell VPP that the peer is in demand mode
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # keep-alive period for our side: 90% of the larger of the two intervals
    transmit_time = (
        0.9
        * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
        / USEC_IN_SEC
    )
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            # timeout=0: only look at what has already been captured
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # no packet captured - this is the expected outcome
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """echo packets looped back"""
    bfd_session_up(self)
    stats_before = bfd_grab_stats_snapshot(self)
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    # source and destination IP are both the remote address, so the packet
    # is expected to come straight back out of pg0
    echo_packet = (
        Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
        / UDP(dport=BFD.udp_dport_echo)
        / Raw("this should be looped back")
    )
    for _ in range(echo_packet_count):
        self.sleep(0.01, "delay between echo packets")
        # the source port doubles as a per-packet sequence number
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    self.logger.debug(self.vapi.ppcli("show trace"))
    counter = 0
    bfd_control_packets_rx = 0
    while counter < echo_packet_count:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        udp = p[UDP]
        if udp.dport == BFD.udp_dport:
            # regular BFD control packet - count it for the TX stats check
            bfd_control_packets_rx += 1
            continue
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the hex payload here, otherwise BFD_vpp_echo
        # gets in way
        self.assertEqual(
            scapy.compat.raw(p[UDP].payload),
            scapy.compat.raw(echo_packet[UDP].payload),
            "Received packet is not the echo packet sent",
        )
        counter += 1
    # every packet sent must have been seen coming back, in order
    self.assert_equal(
        udp_sport_tx,
        udp_sport_rx,
        "UDP source port (== ECHO packet identifier for test purposes)",
    )
    stats_after = bfd_grab_stats_snapshot(self)
    diff = bfd_stats_diff(stats_before, stats_after)
    self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
    self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
    self.assertEqual(
        0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
    )
    self.assertEqual(
        0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
    )
def test_echo(self):
    """echo function"""
    stats_before = bfd_grab_stats_snapshot(self)
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = (
        self.test_session.detect_mult
        * self.vpp_session.required_min_rx
        / USEC_IN_SEC
    )
    # echo shouldn't work without echo source set
    for _ in range(10):
        delay = self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(delay, "delay before sending bfd packet")
        self.test_session.send_packet()
    p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(
        p[BFD].required_min_rx_interval,
        self.vpp_session.required_min_rx,
        "BFD required min rx interval",
    )
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    echo_seen = False
    # should be turned on - loopback echo packets
    for _ in range(3):
        # stay below the detection time between our own control packets
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                self.assertNotEqual(
                    p[IP].src,
                    self.loopback0.local_ip4,
                    "BFD ECHO src IP equal to loopback IP",
                )
                self.logger.debug(ppp("Looping back packet:", p))
                self.assert_equal(
                    p[Ether].dst,
                    self.pg0.remote_mac,
                    "ECHO packet destination MAC address",
                )
                # send the very same packet back to VPP
                p[Ether].dst = self.pg0.local_mac
                self.pg0.add_stream(p)
                self.test_session.rx_packets_echo += 1
                self.test_session.tx_packets_echo += 1
                self.pg_start()
                echo_seen = True
            elif p.haslayer(BFD):
                self.test_session.rx_packets += 1
                if echo_seen:
                    # once echo is running, control packets must advertise
                    # a required min rx of at least 1000000
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval, 1000000
                    )
                if "P" in p.sprintf("%BFD.flags%"):
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
            else:
                raise Exception(ppp("Received unknown packet:", p))

            self.assert_equal(
                len(self.vapi.collect_events()), 0, "number of bfd events"
            )
        self.test_session.send_packet()
    self.assertTrue(echo_seen, "No echo packets received")

    stats_after = bfd_grab_stats_snapshot(self)
    diff = bfd_stats_diff(stats_before, stats_after)
    # our rx is vpp tx and vice versa, also tolerate one packet off
    self.assert_in_range(
        self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
    )
    self.assert_in_range(
        self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
    )
    self.assert_in_range(
        self.test_session.tx_packets_echo,
        diff.rx_echo - 1,
        diff.rx_echo + 1,
        "RX echo counter",
    )
    self.assert_in_range(
        self.test_session.rx_packets_echo,
        diff.tx_echo - 1,
        diff.tx_echo + 1,
        "TX echo counter",
    )
def test_echo_fail(self):
    """session goes down if echo function fails"""
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = (
        self.test_session.detect_mult
        * self.vpp_session.required_min_rx
        / USEC_IN_SEC
    )
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    # echo function should be used now, but we will drop the echo packets
    verified_diag = False
    for _ in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                # dropped
                pass
            elif p.haslayer(BFD):
                if "P" in p.sprintf("%BFD.flags%"):
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval, 1000000
                    )
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    # the down packet must name the echo failure as the cause
                    self.assert_equal(
                        p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
                    )
                    verified_diag = True
            else:
                raise Exception(ppp("Received unknown packet:", p))
        # keep sending control packets so only the echo path is failing
        self.test_session.send_packet()
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    self.assert_equal(events[0].state, BFDState.down, BFDState)
    self.assertTrue(verified_diag, "Incorrect diagnostics code received")
def test_echo_stop(self):
    """echo function stops if peer sets required min echo rx zero"""
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # peer withdraws its willingness to receive echo packets
    self.test_session.update(required_min_echo_rx=0)
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    # NOTE(review): wait_for_bfd_packet is relied on to reject non-BFD
    # packets here - confirm against its definition
    for _ in range(5):
        wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
def test_echo_source_removed(self):
    """echo function stops if echo source is removed"""
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # remove the echo source on the VPP side
    self.vapi.bfd_udp_del_echo_source()
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    # NOTE(review): wait_for_bfd_packet is relied on to reject non-BFD
    # packets here - confirm against its definition
    for _ in range(5):
        wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
def test_stale_echo(self):
    """stale echo packets don't keep a session up"""
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    echo_packet = None  # first echo packet seen; re-sent for every later one
    timeout_at = None  # earliest wall-clock time the session may go down
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            if echo_packet is None:
                self.logger.debug(ppp("Got first echo packet:", p))
                echo_packet = p
                timeout_at = (
                    time.time()
                    + self.vpp_session.detect_mult
                    * self.test_session.required_min_echo_rx
                    / USEC_IN_SEC
                )
            else:
                self.logger.debug(ppp("Got followup echo packet:", p))
            # always answer with the FIRST (by now stale) echo packet;
            # log the packet that is actually sent, not the one just received
            self.logger.debug(ppp("Looping back first echo packet:", echo_packet))
            echo_packet[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received",
                )
                now = time.time()
                self.assertGreaterEqual(
                    now,
                    timeout_at,
                    "Session timeout at %s, but is expected at %s"
                    % (now, timeout_at),
                )
                self.assert_equal(
                    p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
                )
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_invalid_echo_checksum(self):
    """echo packets with invalid checksum don't keep a session up"""
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    timeout_at = None  # earliest wall-clock time the session may go down
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Got echo packet:", p))
            if timeout_at is None:
                timeout_at = (
                    time.time()
                    + self.vpp_session.detect_mult
                    * self.test_session.required_min_echo_rx
                    / USEC_IN_SEC
                )
            # overwrite the checksum with random bits before looping back
            p[BFD_vpp_echo].checksum = getrandbits(64)
            p[Ether].dst = self.pg0.local_mac
            self.logger.debug(ppp("Looping back modified echo packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received",
                )
                now = time.time()
                self.assertGreaterEqual(
                    now,
                    timeout_at,
                    "Session timeout at %s, but is expected at %s"
                    % (now, timeout_at),
                )
                self.assert_equal(
                    p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
                )
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """put session admin-up and admin-down"""
    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.admin_down)
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    # back to admin-up: VPP is expected to report and advertise down first
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.down)
    p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.down, BFDState)
    # our down packet moves VPP to init
    self.test_session.send_packet()
    p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.init, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.init)
    # our up packet moves VPP to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.up, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.up)
def test_config_change_remote_demand(self):
    """configuration change while peer in demand mode"""
    bfd_session_up(self)
    # tell VPP that the peer is in demand mode
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx
    )
    p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit must be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
    # terminate poll sequence
    final = self.test_session.create_packet()
    final[BFD].flags = "D+F"
    self.test_session.send_packet(final)
    # vpp should be quiet now again
    transmit_time = (
        0.9
        * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
        / USEC_IN_SEC
    )
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            # timeout=0: only look at what has already been captured
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # no packet captured - this is the expected outcome
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_intf_deleted(self):
    """interface with bfd session deleted"""
    # dedicated loopback so that deleting it does not affect other tests
    intf = VppLoInterface(self)
    intf.config_ip4()
    intf.admin_up()
    # remember the index now; it is compared after the interface is removed
    sw_if_index = intf.sw_if_index
    vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
    vpp_session.add_vpp_config()
    vpp_session.admin_up()
    intf.remove_vpp_config()
    # removing the interface must raise a session event for that interface
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
    # and the session must no longer be present in VPP
    self.assertFalse(vpp_session.query_vpp_config())
@tag_run_solo
@tag_fixme_vpp_workers
@tag_fixme_ubuntu2204
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6)"""

    # class-level defaults; vpp_session/test_session are replaced in setUp()
    pcap_file = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # nothing to set up when the class is not run on ubuntu 22.04
        # (no "vpp" attribute present in that case)
        if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
            return
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # loopback is used as the BFD echo source in test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the parent set-up before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        if is_distro_ubuntu2204 == True and not hasattr(self, "vpp"):
            return
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            # baseline session counters, compared against in test_session_up
            self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
            self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
            )
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # do not leave event subscription enabled if set-up fails
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
            self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """bring BFD session up"""
        bfd_session_up(self)
        bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
        bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
        # only the IPv6 session counter may have moved, by exactly one
        self.assert_equal(bfd_udp4_sessions, self.bfd_udp4_sessions)
        self.assert_equal(bfd_udp6_sessions - self.bfd_udp6_sessions, 1)

    def test_session_up_by_ip(self):
        """bring BFD session up - first frame looked up by address pair"""
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # VPP must echo our discriminator back and move to init
        self.assert_equal(
            p[BFD].your_discriminator,
            self.test_session.my_discriminator,
            "BFD - your discriminator",
        )
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(
            your_discriminator=p[BFD].my_discriminator, state=BFDState.up
        )
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """hold BFD session up"""
        bfd_session_up(self)
        # answer every packet from VPP for two detection multiples
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """echo packets looped back"""
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address, so the
        # packet is expected to come straight back out of pg0
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        for _ in range(echo_packet_count):
            self.sleep(0.01, "delay between echo packets")
            # the source port doubles as a per-packet sequence number
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        counter = 0
        bfd_control_packets_rx = 0
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            udp = p[UDP]
            if udp.dport == BFD.udp_dport:
                # regular BFD control packet - count it for the TX stats check
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(
                scapy.compat.raw(p[UDP].payload),
                scapy.compat.raw(echo_packet[UDP].payload),
                "Received packet is not the echo packet sent",
            )
            counter += 1
        # every packet sent must have been seen coming back, in order
        self.assert_equal(
            udp_sport_tx,
            udp_sport_rx,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )

    def test_echo(self):
        """echo function"""
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for _ in range(10):
            delay = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            # stay below the detection time between our own control packets
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP"
                    )
                    self.assertNotEqual(
                        p[IPv6].src,
                        self.loopback0.local_ip6,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(
                        p[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    # send the very same packet back to VPP
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        # once echo is running, control packets must advertise
                        # a required min rx of at least 1000000
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(
                    len(self.vapi.collect_events()), 0, "number of bfd events"
                )
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(
            self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
        )
        self.assert_in_range(
            self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
        )
        self.assert_in_range(
            self.test_session.tx_packets_echo,
            diff.rx_echo - 1,
            diff.rx_echo + 1,
            "RX echo counter",
        )
        self.assert_in_range(
            self.test_session.rx_packets_echo,
            diff.tx_echo - 1,
            diff.tx_echo + 1,
            "TX echo counter",
        )

    def test_intf_deleted(self):
        """interface with bfd session deleted"""
        # dedicated loopback so that deleting it does not affect other tests
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index now; it is compared after the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        # removing the interface must raise a session event for that interface
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        # and the session must no longer be present in VPP
        self.assertFalse(vpp_session.query_vpp_config())
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """BFD-FIB interactions (IPv6)"""

    # replaced with real sessions inside test_session_with_fib()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        # used as a capture filter: True means "skip this packet"
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """BFD-FIB interactions"""

        # packets to match against both of the routes
        p = [
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IPv6(src="3001::1", dst="2001::1")
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            ),
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IPv6(src="3001::1", dst="2002::1")
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            ),
        ]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(
            self,
            "2001::",
            64,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        ip_2002_s_64 = VppIpRoute(
            self, "2002::", 64, [VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)]
        )
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1, filter_out_fn=self.pkt_is_not_data_traffic
            )
            self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        # no data packet may be captured while the session is down
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1, filter_out_fn=self.pkt_is_not_data_traffic
            )
            self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)
@unittest.skipUnless(config.extended, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """BFD over GRE tunnel"""

    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0, subscribe to BFD events and configure IPv4 on it."""
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Unsubscribe from BFD events (only if VPP is still alive)."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """BFD-o-GRE"""

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # bring the session up on the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # the test side wraps its BFD packets in IP/GRE and sends them via pg0
        self.test_session = BFDTestSession(
            self,
            gre_if,
            AF_INET,
            tunnel_header=(IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()),
            phy_interface=self.pg0,
        )

        # data packet addressed to the GRE interface's remote_ip4
        p = [
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4)
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
        ]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, p, self.pg0)

        # bring session down
        bfd_session_down(self)
@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth)"""

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring up pg0 with IPv4."""
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the base-class setup before propagating the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDSHA1TestCase, cls).tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events and drain the event queue."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def _detection_time(self):
        """Return detect_mult * required_min_rx converted via USEC_IN_SEC."""
        return (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )

    def test_session_up(self):
        """bring BFD session up"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)

    def test_hold_up(self):
        """hold BFD session up"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """hold BFD session up - meticulous auth"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4,
        )
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """session is not kept alive by msgs with bad sequence numbers"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        detection_time = self._detection_time()
        # keep sending for two detection times without ever calling
        # inc_seq_num()
        send_until = time.time() + 2 * detection_time
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(
                0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                "time between bfd packets",
            )
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(
        self,
        vpp_bfd_udp_session,
        legitimate_test_session,
        rogue_test_session,
        rogue_bfd_values=None,
    ):
        """execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected

        :param vpp_bfd_udp_session: VPP-side session; stored as
            self.vpp_session and added to the VPP config here
        :param legitimate_test_session: test-side session stored as
            self.test_session and used to bring the session up
        :param rogue_test_session: test-side session that sends the
            "down" packet
        :param rogue_bfd_values: optional dict passed as keyword arguments
            to rogue_test_session.update() after the legitimate values
            have been copied
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type,
        )
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """session is not brought down by unauthenticated msg"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        # the rogue session is created without any key
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatch_bfd_key_id(self):
        """session is not brought down by msg with non-existent key-id"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x
        )
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatched_auth_type(self):
        """session is not brought down by msg with wrong auth type"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        self.execute_rogue_session_scenario(
            vpp_session,
            legitimate_test_session,
            rogue_test_session,
            {"auth_type": BFDAuthType.keyed_md5},
        )

    def test_restart(self):
        """simulate remote peer restart and resynchronization"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0,
        )
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = self._detection_time()
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth)"""

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring up pg0 with IPv4."""
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the base-class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events and drain the event queue."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets_while_up(self, inc_seq_num=False):
        """Exchange detect_mult * 2 BFD packets, checking VPP reports state up.

        For every round: wait for a BFD packet from VPP, assert its state
        field is up, optionally bump the test session's sequence number,
        then send a packet back.
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_still_up_without_events(self):
        """Assert the VPP session is up and no BFD events were generated."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")

    def test_auth_on_immediate(self):
        """turn auth on without disturbing session state (immediate)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets_while_up()
        self._assert_still_up_without_events()

    def test_auth_off_immediate(self):
        """turn auth off without disturbing session state (immediate)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets_while_up(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets_while_up(inc_seq_num=True)
        self._assert_still_up_without_events()

    def test_auth_change_key_immediate(self):
        """change auth key without disturbing session state (immediate)"""
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets_while_up()
        self._assert_still_up_without_events()

    def test_auth_on_delayed(self):
        """turn auth on without disturbing session state (delayed)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange does not check the state of VPP's packets
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.vpp_session.activate_auth(key, delayed=True)
        # test side keeps sending unauthenticated packets for a while
        self._exchange_packets_while_up()
        # now the test side switches to the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_still_up_without_events()

    def test_auth_off_delayed(self):
        """turn auth off without disturbing session state (delayed)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.deactivate_auth(delayed=True)
        # test side keeps sending authenticated packets for a while
        self._exchange_packets_while_up()
        # now the test side drops the key as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_still_up_without_events()

    def test_auth_change_key_delayed(self):
        """change auth key without disturbing session state (delayed)"""
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.activate_auth(key2, delayed=True)
        # test side keeps using key1 for a while
        self._exchange_packets_while_up()
        # now the test side switches to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_still_up_without_events()
2704 class BFDCLITestCase(VppTestCase):
2705 """Bidirectional Forwarding Detection (BFD) (CLI)"""
@classmethod
def setUpClass(cls):
    """Enable BFD debug logging and configure IPv4 and IPv6 on pg0."""
    super(BFDCLITestCase, cls).setUpClass()
    cls.vapi.cli("set log class bfd level debug")
    try:
        cls.create_pg_interfaces((0,))
        cls.pg0.config_ip4()
        cls.pg0.config_ip6()
        cls.pg0.resolve_arp()
        cls.pg0.resolve_ndp()

    except Exception:
        # undo the base-class setup before propagating the failure
        super(BFDCLITestCase, cls).tearDownClass()
        raise
@classmethod
def tearDownClass(cls):
    """Delegate class-level cleanup to the base test case."""
    super(BFDCLITestCase, cls).tearDownClass()
def setUp(self):
    """Create a fresh key factory and start capturing on pg0."""
    super(BFDCLITestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.pg0.enable_capture()
def tearDown(self):
    """Best-effort unsubscribe from BFD events, then drain the event queue."""
    try:
        self.vapi.want_bfd_events(enable_disable=False)
    except UnexpectedApiReturnValueError:
        # some tests aren't subscribed, so this is not an issue
        pass
    self.vapi.collect_events()  # clear the event queue
    super(BFDCLITestCase, self).tearDown()
def cli_verify_no_response(self, cli):
    """execute a CLI, asserting that the response is empty

    :param cli: command string passed to self.vapi.cli()
    """
    self.assert_equal(self.vapi.cli(cli), "", "CLI command response")
def cli_verify_response(self, cli, expected):
    """execute a CLI, asserting that the response matches expectation

    :param cli: command string passed to self.vapi.cli()
    :param expected: text the stripped reply must equal; when the command
        raises CliFailedCommandError, the stringified error is compared
        instead of a reply
    """
    try:
        reply = self.vapi.cli(cli)
    except CliFailedCommandError as cli_error:
        reply = str(cli_error)
    self.assert_equal(reply.strip(), expected, "CLI command response")
def test_show(self):
    """show commands"""
    k1 = self.factory.create_random_key(self)
    k1.add_vpp_config()
    k2 = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1
    )
    k2.add_vpp_config()
    s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    s1.add_vpp_config()
    s2 = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k2
    )
    s2.add_vpp_config()
    # the output is only logged, nothing is asserted about its content
    self.logger.info(self.vapi.ppcli("show bfd keys"))
    self.logger.info(self.vapi.ppcli("show bfd sessions"))
    self.logger.info(self.vapi.ppcli("show bfd"))
def test_set_del_sha1_key(self):
    """set/delete SHA1 auth key"""
    k = self.factory.create_random_key(self)
    self.registry.register(k, self.logger)
    # the key is created through the CLI, with the secret hex-encoded
    self.cli_verify_no_response(
        "bfd key set conf-key-id %s type keyed-sha1 secret %s"
        % (
            k.conf_key_id,
            "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
        )
    )
    self.assertTrue(k.query_vpp_config())
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=k
    )
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
    )
    self.vapi.want_bfd_events()
    bfd_session_up(self)
    bfd_session_down(self)
    # try to replace the secret for the key - should fail because the key
    # is in-use
    k2 = self.factory.create_random_key(self)
    self.cli_verify_response(
        "bfd key set conf-key-id %s type keyed-sha1 secret %s"
        % (
            k.conf_key_id,
            "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
        ),
        "bfd key set: `bfd_auth_set_key' API call failed, "
        "rv=-103:BFD object in use",
    )
    # manipulating the session using old secret should still work
    bfd_session_up(self)
    bfd_session_down(self)
    self.vpp_session.remove_vpp_config()
    self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
    self.assertFalse(k.query_vpp_config())
def test_set_del_meticulous_sha1_key(self):
    """set/delete meticulous SHA1 auth key"""
    k = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1
    )
    self.registry.register(k, self.logger)
    # the key is created through the CLI, with the secret hex-encoded
    self.cli_verify_no_response(
        "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
        % (
            k.conf_key_id,
            "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
        )
    )
    self.assertTrue(k.query_vpp_config())
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k
    )
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET6, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
    )
    self.vapi.want_bfd_events()
    bfd_session_up(self)
    bfd_session_down(self)
    # try to replace the secret for the key - should fail because the key
    # is in-use
    k2 = self.factory.create_random_key(self)
    self.cli_verify_response(
        "bfd key set conf-key-id %s type keyed-sha1 secret %s"
        % (
            k.conf_key_id,
            "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
        ),
        "bfd key set: `bfd_auth_set_key' API call failed, "
        "rv=-103:BFD object in use",
    )
    # manipulating the session using old secret should still work
    bfd_session_up(self)
    bfd_session_down(self)
    self.vpp_session.remove_vpp_config()
    self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
    self.assertFalse(k.query_vpp_config())
def test_add_mod_del_bfd_udp(self):
    """create/modify/delete IPv4 BFD UDP session"""
    # vpp_session is never added through the API; it only supplies the
    # expected values and is registered with self.registry
    vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    self.registry.register(vpp_session, self.logger)
    cli_add_cmd = (
        "bfd udp session add interface %s local-addr %s "
        "peer-addr %s desired-min-tx %s required-min-rx %s "
        "detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            vpp_session.desired_min_tx,
            vpp_session.required_min_rx,
            vpp_session.detect_mult,
        )
    )
    self.cli_verify_no_response(cli_add_cmd)
    # 2nd add should fail
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object",
    )
    verify_bfd_session_config(self, vpp_session)
    # mod_session likewise only carries the expected post-modify values
    mod_session = VppBFDUDPSession(
        self,
        self.pg0,
        self.pg0.remote_ip4,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult,
    )
    self.cli_verify_no_response(
        "bfd udp session mod interface %s local-addr %s peer-addr %s "
        "desired-min-tx %s required-min-rx %s detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            mod_session.desired_min_tx,
            mod_session.required_min_rx,
            mod_session.detect_mult,
        )
    )
    verify_bfd_session_config(self, mod_session)
    cli_del_cmd = (
        "bfd udp session del interface %s local-addr %s "
        "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )
    self.cli_verify_no_response(cli_del_cmd)
    # 2nd del is expected to fail
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object",
    )
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp6(self):
    """create/modify/delete IPv6 BFD UDP session"""
    # vpp_session is never added through the API; it only supplies the
    # expected values and is registered with self.registry
    vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    self.registry.register(vpp_session, self.logger)
    cli_add_cmd = (
        "bfd udp session add interface %s local-addr %s "
        "peer-addr %s desired-min-tx %s required-min-rx %s "
        "detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
            vpp_session.desired_min_tx,
            vpp_session.required_min_rx,
            vpp_session.detect_mult,
        )
    )
    self.cli_verify_no_response(cli_add_cmd)
    # 2nd add should fail
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object",
    )
    verify_bfd_session_config(self, vpp_session)
    # mod_session likewise only carries the expected post-modify values
    mod_session = VppBFDUDPSession(
        self,
        self.pg0,
        self.pg0.remote_ip6,
        af=AF_INET6,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult,
    )
    self.cli_verify_no_response(
        "bfd udp session mod interface %s local-addr %s peer-addr %s "
        "desired-min-tx %s required-min-rx %s detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
            mod_session.desired_min_tx,
            mod_session.required_min_rx,
            mod_session.detect_mult,
        )
    )
    verify_bfd_session_config(self, mod_session)
    cli_del_cmd = (
        "bfd udp session del interface %s local-addr %s "
        "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
    )
    self.cli_verify_no_response(cli_del_cmd)
    # 2nd del is expected to fail
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object",
    )
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp_auth(self):
    """create/modify/delete IPv4 BFD UDP session (authenticated)"""
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    # vpp_session is never added through the API; it only supplies the
    # expected values and is registered with self.registry
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key
    )
    self.registry.register(vpp_session, self.logger)
    cli_add_cmd = (
        "bfd udp session add interface %s local-addr %s "
        "peer-addr %s desired-min-tx %s required-min-rx %s "
        "detect-mult %s conf-key-id %s bfd-key-id %s"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            vpp_session.desired_min_tx,
            vpp_session.required_min_rx,
            vpp_session.detect_mult,
            key.conf_key_id,
            vpp_session.bfd_key_id,
        )
    )
    self.cli_verify_no_response(cli_add_cmd)
    # 2nd add should fail
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object",
    )
    verify_bfd_session_config(self, vpp_session)
    # mod_session likewise only carries the expected post-modify values
    mod_session = VppBFDUDPSession(
        self,
        self.pg0,
        self.pg0.remote_ip4,
        sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult,
    )
    self.cli_verify_no_response(
        "bfd udp session mod interface %s local-addr %s peer-addr %s "
        "desired-min-tx %s required-min-rx %s detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            mod_session.desired_min_tx,
            mod_session.required_min_rx,
            mod_session.detect_mult,
        )
    )
    verify_bfd_session_config(self, mod_session)
    cli_del_cmd = (
        "bfd udp session del interface %s local-addr %s "
        "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )
    self.cli_verify_no_response(cli_del_cmd)
    # 2nd del is expected to fail
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object",
    )
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp6_auth(self):
    """create/modify/delete IPv6 BFD UDP session (authenticated)"""
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1
    )
    key.add_vpp_config()
    # vpp_session is never added through the API; it only supplies the
    # expected values and is registered with self.registry
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key
    )
    self.registry.register(vpp_session, self.logger)
    cli_add_cmd = (
        "bfd udp session add interface %s local-addr %s "
        "peer-addr %s desired-min-tx %s required-min-rx %s "
        "detect-mult %s conf-key-id %s bfd-key-id %s"
        % (
            self.pg0.name,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
            vpp_session.desired_min_tx,
            vpp_session.required_min_rx,
            vpp_session.detect_mult,
            key.conf_key_id,
            vpp_session.bfd_key_id,
        )
    )
    self.cli_verify_no_response(cli_add_cmd)
    # 2nd add should fail
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object",
    )
    verify_bfd_session_config(self, vpp_session)
    # mod_session likewise only carries the expected post-modify values
    mod_session = VppBFDUDPSession(
        self,
        self.pg0,
        self.pg0.remote_ip6,
        af=AF_INET6,
        sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult,
    )
    self.cli_verify_no_response(
        "bfd udp session mod interface %s local-addr %s peer-addr %s "
        "desired-min-tx %s required-min-rx %s detect-mult %s"
        % (
            self.pg0.name,
            self.pg0.local_ip6,
            self.pg0.remote_ip6,
            mod_session.desired_min_tx,
            mod_session.required_min_rx,
            mod_session.detect_mult,
        )
    )
    verify_bfd_session_config(self, mod_session)
    cli_del_cmd = (
        "bfd udp session del interface %s local-addr %s "
        "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
    )
    self.cli_verify_no_response(cli_del_cmd)
    # 2nd del is expected to fail
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object",
    )
    self.assertFalse(vpp_session.query_vpp_config())
def test_auth_on_off(self):
    """turn authentication on and off"""
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1
    )
    key.add_vpp_config()
    # only `session` is added to VPP; `auth_session` carries the expected
    # configuration once authentication has been activated via the CLI
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    auth_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key
    )
    session.add_vpp_config()
    cli_activate = (
        "bfd udp session auth activate interface %s local-addr %s "
        "peer-addr %s conf-key-id %s bfd-key-id %s"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            key.conf_key_id,
            auth_session.bfd_key_id,
        )
    )
    # each command is issued twice; the repeat must also succeed silently
    self.cli_verify_no_response(cli_activate)
    verify_bfd_session_config(self, auth_session)
    self.cli_verify_no_response(cli_activate)
    verify_bfd_session_config(self, auth_session)
    cli_deactivate = (
        "bfd udp session auth deactivate interface %s local-addr %s "
        "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )
    self.cli_verify_no_response(cli_deactivate)
    verify_bfd_session_config(self, session)
    self.cli_verify_no_response(cli_deactivate)
    verify_bfd_session_config(self, session)
def test_auth_on_off_delayed(self):
    """turn authentication on and off (delayed)"""
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1
    )
    key.add_vpp_config()
    # only `session` is added to VPP; `auth_session` carries the expected
    # configuration once authentication has been activated via the CLI
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    auth_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key
    )
    session.add_vpp_config()
    cli_activate = (
        "bfd udp session auth activate interface %s local-addr %s "
        "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
        % (
            self.pg0.name,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            key.conf_key_id,
            auth_session.bfd_key_id,
        )
    )
    # each command is issued twice; the repeat must also succeed silently
    self.cli_verify_no_response(cli_activate)
    verify_bfd_session_config(self, auth_session)
    self.cli_verify_no_response(cli_activate)
    verify_bfd_session_config(self, auth_session)
    cli_deactivate = (
        "bfd udp session auth deactivate interface %s local-addr %s "
        "peer-addr %s delayed yes"
        % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )
    self.cli_verify_no_response(cli_deactivate)
    verify_bfd_session_config(self, session)
    self.cli_verify_no_response(cli_deactivate)
    verify_bfd_session_config(self, session)
def test_admin_up_down(self):
    """put session admin-up and admin-down

    Adds a BFD UDP session on pg0, sets it admin-down through the CLI and
    checks the session state is ``admin_down``, then sets it admin-up and
    checks the state is ``down``.
    """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()

    # Both commands address the same session; only the flag value differs.
    cli_down = (
        "bfd udp session set-flags admin down interface %s local-addr %s "
        "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )
    cli_up = (
        "bfd udp session set-flags admin up interface %s local-addr %s "
        "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    )

    self.cli_verify_no_response(cli_down)
    verify_bfd_session_config(self, session, state=BFDState.admin_down)
    self.cli_verify_no_response(cli_up)
    verify_bfd_session_config(self, session, state=BFDState.down)
3201 def test_set_del_udp_echo_source(self):
3202 """set/del udp echo source"""
3203 self.create_loopback_interfaces(1)
3204 self.loopback0 = self.lo_interfaces[0]
3205 self.loopback0.admin_up()
3206 self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3207 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
3208 self.cli_verify_no_response(cli_set)
3209 self.cli_verify_response(
3210 "show bfd echo-source",
3211 "UDP echo source is: %s\n"
3212 "IPv4 address usable as echo source: none\n"
3213 "IPv6 address usable as echo source: none" % self.loopback0.name,
3215 self.loopback0.config_ip4()
3217 ipaddress.IPv4Address(
3218 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
3221 self.cli_verify_response(
3222 "show bfd echo-source",
3223 "UDP echo source is: %s\n"
3224 "IPv4 address usable as echo source: %s\n"
3225 "IPv6 address usable as echo source: none"
3226 % (self.loopback0.name, echo_ip4),
3229 ipaddress.IPv6Address(
3230 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
3233 self.loopback0.config_ip6()
3234 self.cli_verify_response(
3235 "show bfd echo-source",
3236 "UDP echo source is: %s\n"
3237 "IPv4 address usable as echo source: %s\n"
3238 "IPv6 address usable as echo source: %s"
3239 % (self.loopback0.name, echo_ip4, echo_ip6),
3241 cli_del = "bfd udp echo-source del"
3242 self.cli_verify_no_response(cli_del)
3243 self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
# Run this module's tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)