+ self.assertTrue(echo_seen, "No echo packets received")
+
+ stats_after = bfd_grab_stats_snapshot(self)
+ diff = bfd_stats_diff(stats_before, stats_after)
+ # our rx is vpp tx and vice versa, also tolerate one packet off
+ self.assert_in_range(
+ self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
+ )
+ self.assert_in_range(
+ self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
+ )
+ self.assert_in_range(
+ self.test_session.tx_packets_echo,
+ diff.rx_echo - 1,
+ diff.rx_echo + 1,
+ "RX echo counter",
+ )
+ self.assert_in_range(
+ self.test_session.rx_packets_echo,
+ diff.tx_echo - 1,
+ diff.tx_echo + 1,
+ "TX echo counter",
+ )
+
+ def test_intf_deleted(self):
+ """interface with bfd session deleted"""
+ intf = VppLoInterface(self)
+ intf.config_ip6()
+ intf.admin_up()
+ sw_if_index = intf.sw_if_index
+ vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip6, af=AF_INET6)
+ vpp_session.add_vpp_config()
+ vpp_session.admin_up()
+ intf.remove_vpp_config()
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
+ self.assertFalse(vpp_session.query_vpp_config())
+
+
+@tag_run_solo
+class BFDFIBTestCase(VppTestCase):
+ """BFD-FIB interactions (IPv6)"""
+
+ vpp_session = None
+ test_session = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDFIBTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDFIBTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(BFDFIBTestCase, self).setUp()
+ self.create_pg_interfaces(range(1))
+
+ self.vapi.want_bfd_events()
+ self.pg0.enable_capture()
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.configure_ipv6_neighbors()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.want_bfd_events(enable_disable=False)
+
+ super(BFDFIBTestCase, self).tearDown()
+
+ @staticmethod
+ def pkt_is_not_data_traffic(p):
+ """not data traffic implies BFD or the usual IPv6 ND/RA"""
+ if p.haslayer(BFD) or is_ipv6_misc(p):
+ return True
+ return False
+
+ def test_session_with_fib(self):
+ """BFD-FIB interactions"""
+
+ # packets to match against both of the routes
+ p = [
+ (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src="3001::1", dst="2001::1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ),
+ (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src="3001::1", dst="2002::1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ),
+ ]
+
+ # A recursive and a non-recursive route via a next-hop that
+ # will have a BFD session
+ ip_2001_s_64 = VppIpRoute(
+ self,
+ "2001::",
+ 64,
+ [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
+ )
+ ip_2002_s_64 = VppIpRoute(
+ self, "2002::", 64, [VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)]
+ )
+ ip_2001_s_64.add_vpp_config()
+ ip_2002_s_64.add_vpp_config()
+
+ # bring the session up now the routes are present
+ self.vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
+ )
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ for packet in p:
+ captured = self.pg0.wait_for_packet(
+ 1, filter_out_fn=self.pkt_is_not_data_traffic
+ )
+ self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)
+
+ # session is up - traffic is dropped
+ bfd_session_down(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ with self.assertRaises(CaptureTimeoutError):
+ self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ for packet in p:
+ captured = self.pg0.wait_for_packet(
+ 1, filter_out_fn=self.pkt_is_not_data_traffic
+ )
+ self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)
+
+
+@unittest.skipUnless(config.extended, "part of extended tests")
+class BFDTunTestCase(VppTestCase):
+ """BFD over GRE tunnel"""
+
+ vpp_session = None
+ test_session = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDTunTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDTunTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(BFDTunTestCase, self).setUp()
+ self.create_pg_interfaces(range(1))
+
+ self.vapi.want_bfd_events()
+ self.pg0.enable_capture()
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.want_bfd_events(enable_disable=0)
+
+ super(BFDTunTestCase, self).tearDown()
+
+ @staticmethod
+ def pkt_is_not_data_traffic(p):
+ """not data traffic implies BFD or the usual IPv6 ND/RA"""
+ if p.haslayer(BFD) or is_ipv6_misc(p):
+ return True
+ return False
+
+ def test_bfd_o_gre(self):
+ """BFD-o-GRE"""
+
+ # A GRE interface over which to run a BFD session
+ gre_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
+ gre_if.add_vpp_config()
+ gre_if.admin_up()
+ gre_if.config_ip4()
+
+ # bring the session up now the routes are present
+ self.vpp_session = VppBFDUDPSession(
+ self, gre_if, gre_if.remote_ip4, is_tunnel=True
+ )
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+
+ self.test_session = BFDTestSession(
+ self,
+ gre_if,
+ AF_INET,
+ tunnel_header=(IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()),
+ phy_interface=self.pg0,
+ )
+
+ # packets to match against both of the routes
+ p = [
+ (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ ]
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.send_and_expect(self.pg0, p, self.pg0)
+
+ # bring session down
+ bfd_session_down(self)