+ vpp_session = None
+ test_session = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDFIBTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDFIBTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(BFDFIBTestCase, self).setUp()
+ self.create_pg_interfaces(range(1))
+
+ self.vapi.want_bfd_events()
+ self.pg0.enable_capture()
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.configure_ipv6_neighbors()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.want_bfd_events(enable_disable=0)
+
+ super(BFDFIBTestCase, self).tearDown()
+
+ @staticmethod
+ def pkt_is_not_data_traffic(p):
+ """ not data traffic implies BFD or the usual IPv6 ND/RA"""
+ if p.haslayer(BFD) or is_ipv6_misc(p):
+ return True
+ return False
+
+ def test_session_with_fib(self):
+ """ BFD-FIB interactions """
+
+ # packets to match against both of the routes
+ p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src="3001::1", dst="2001::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100)),
+ (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src="3001::1", dst="2002::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))]
+
+ # A recursive and a non-recursive route via a next-hop that
+ # will have a BFD session
+ ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
+ [VppRoutePath(self.pg0.remote_ip6,
+ self.pg0.sw_if_index)])
+ ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
+ [VppRoutePath(self.pg0.remote_ip6,
+ 0xffffffff)])
+ ip_2001_s_64.add_vpp_config()
+ ip_2002_s_64.add_vpp_config()
+
+ # bring the session up now the routes are present
+ self.vpp_session = VppBFDUDPSession(self,
+ self.pg0,
+ self.pg0.remote_ip6,
+ af=AF_INET6)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ for packet in p:
+ captured = self.pg0.wait_for_packet(
+ 1,
+ filter_out_fn=self.pkt_is_not_data_traffic)
+ self.assertEqual(captured[IPv6].dst,
+ packet[IPv6].dst)
+
+ # session is up - traffic is dropped
+ bfd_session_down(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ with self.assertRaises(CaptureTimeoutError):
+ self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.pg0.add_stream(p)
+ self.pg_start()
+ for packet in p:
+ captured = self.pg0.wait_for_packet(
+ 1,
+ filter_out_fn=self.pkt_is_not_data_traffic)
+ self.assertEqual(captured[IPv6].dst,
+ packet[IPv6].dst)
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class BFDTunTestCase(VppTestCase):
+ """ BFD over GRE tunnel """
+
+ vpp_session = None
+ test_session = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDTunTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDTunTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(BFDTunTestCase, self).setUp()
+ self.create_pg_interfaces(range(1))
+
+ self.vapi.want_bfd_events()
+ self.pg0.enable_capture()
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.want_bfd_events(enable_disable=0)
+
+ super(BFDTunTestCase, self).tearDown()
+
+ @staticmethod
+ def pkt_is_not_data_traffic(p):
+ """ not data traffic implies BFD or the usual IPv6 ND/RA"""
+ if p.haslayer(BFD) or is_ipv6_misc(p):
+ return True
+ return False
+
+ def test_bfd_o_gre(self):
+ """ BFD-o-GRE """
+
+ # A GRE interface over which to run a BFD session
+ gre_if = VppGreInterface(self,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4)
+ gre_if.add_vpp_config()
+ gre_if.admin_up()
+ gre_if.config_ip4()
+
+ # bring the session up now the routes are present
+ self.vpp_session = VppBFDUDPSession(self,
+ gre_if,
+ gre_if.remote_ip4,
+ is_tunnel=True)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+
+ self.test_session = BFDTestSession(
+ self, gre_if, AF_INET,
+ tunnel_header=(IP(src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4) /
+ GRE()),
+ phy_interface=self.pg0)
+
+ # packets to match against both of the routes
+ p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))]
+
+ # session is up - traffic passes
+ bfd_session_up(self)
+
+ self.send_and_expect(self.pg0, p, self.pg0)
+
+ # bring session down
+ bfd_session_down(self)
+
+
+@unittest.skipUnless(running_extended_tests, "part of extended tests")