tests: fix typo in test_vlib
[vpp.git] / test / test_bfd.py
index 4c3f535..c8bda73 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """ BFD tests """
 
 from __future__ import division
@@ -15,7 +15,7 @@ from six import moves
 import scapy.compat
 from scapy.layers.inet import UDP, IP
 from scapy.layers.inet6 import IPv6
-from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Ether, GRE
 from scapy.packet import Raw
 
 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
@@ -25,8 +25,11 @@ from util import ppp
 from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_lo_interface import VppLoInterface
-from vpp_papi_provider import UnexpectedApiReturnValueError
+from vpp_papi_provider import UnexpectedApiReturnValueError, \
+    CliFailedCommandError
 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
+from vpp_gre_interface import VppGreInterface
+from vpp_papi import VppEnum
 
 USEC_IN_SEC = 1000000
 
@@ -272,7 +275,8 @@ class BFDAPITestCase(VppTestCase):
         self.assertFalse(echo_source.have_usable_ip4)
         self.assertFalse(echo_source.have_usable_ip6)
 
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         echo_source = self.vapi.bfd_udp_get_echo_source()
         self.assertTrue(echo_source.is_set)
         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
@@ -286,7 +290,7 @@ class BFDAPITestCase(VppTestCase):
         self.assertTrue(echo_source.is_set)
         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
         self.assertTrue(echo_source.have_usable_ip4)
-        self.assertEqual(echo_source.ip4_addr, echo_ip4)
+        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
         self.assertFalse(echo_source.have_usable_ip6)
 
         self.loopback0.config_ip6()
@@ -297,9 +301,9 @@ class BFDAPITestCase(VppTestCase):
         self.assertTrue(echo_source.is_set)
         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
         self.assertTrue(echo_source.have_usable_ip4)
-        self.assertEqual(echo_source.ip4_addr, echo_ip4)
+        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
         self.assertTrue(echo_source.have_usable_ip6)
-        self.assertEqual(echo_source.ip6_addr, echo_ip6)
+        self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
 
         self.vapi.bfd_udp_del_echo_source()
         echo_source = self.vapi.bfd_udp_get_echo_source()
@@ -312,12 +316,17 @@ class BFDTestSession(object):
     """ BFD session as seen from test framework side """
 
     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
-                 bfd_key_id=None, our_seq_number=None):
+                 bfd_key_id=None, our_seq_number=None,
+                 tunnel_header=None, phy_interface=None):
         self.test = test
         self.af = af
         self.sha1_key = sha1_key
         self.bfd_key_id = bfd_key_id
         self.interface = interface
+        if phy_interface:
+            self.phy_interface = phy_interface
+        else:
+            self.phy_interface = self.interface
         self.udp_sport = randint(49152, 65535)
         if our_seq_number is None:
             self.our_seq_number = randint(0, 40000000)
@@ -333,6 +342,7 @@ class BFDTestSession(object):
         self.your_discriminator = None
         self.state = BFDState.down
         self.auth_type = BFDAuthType.no_auth
+        self.tunnel_header = tunnel_header
 
     def inc_seq_num(self):
         """ increment sequence number, wrapping if needed """
@@ -418,17 +428,19 @@ class BFDTestSession(object):
             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
         else:
             bfd = BFD()
+        packet = Ether(src=self.phy_interface.remote_mac,
+                       dst=self.phy_interface.local_mac)
+        if self.tunnel_header:
+            packet = packet / self.tunnel_header
         if self.af == AF_INET6:
-            packet = (Ether(src=self.interface.remote_mac,
-                            dst=self.interface.local_mac) /
+            packet = (packet /
                       IPv6(src=self.interface.remote_ip6,
                            dst=self.interface.local_ip6,
                            hlim=255) /
                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                       bfd)
         else:
-            packet = (Ether(src=self.interface.remote_mac,
-                            dst=self.interface.local_mac) /
+            packet = (packet /
                       IP(src=self.interface.remote_ip4,
                          dst=self.interface.local_ip4,
                          ttl=255) /
@@ -450,7 +462,7 @@ class BFDTestSession(object):
         if packet is None:
             packet = self.create_packet()
         if interface is None:
-            interface = self.test.pg0
+            interface = self.phy_interface
         self.test.logger.debug(ppp("Sending packet:", packet))
         interface.add_stream(packet)
         self.test.pg_start()
@@ -514,7 +526,7 @@ class BFDTestSession(object):
 def bfd_session_up(test):
     """ Bring BFD session up """
     test.logger.info("BFD: Waiting for slow hello")
-    p = wait_for_bfd_packet(test, 2)
+    p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
     old_offset = None
     if hasattr(test, 'vpp_clock_offset'):
         old_offset = test.vpp_clock_offset
@@ -589,13 +601,13 @@ def verify_ip(test, packet):
     """ Verify correctness of IP layer. """
     if test.vpp_session.af == AF_INET6:
         ip = packet[IPv6]
-        local_ip = test.pg0.local_ip6
-        remote_ip = test.pg0.remote_ip6
+        local_ip = test.vpp_session.interface.local_ip6
+        remote_ip = test.vpp_session.interface.remote_ip6
         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
     else:
         ip = packet[IP]
-        local_ip = test.pg0.local_ip4
-        remote_ip = test.pg0.remote_ip4
+        local_ip = test.vpp_session.interface.local_ip4
+        remote_ip = test.vpp_session.interface.remote_ip4
         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
     test.assert_equal(ip.src, local_ip, "IP source address")
     test.assert_equal(ip.dst, remote_ip, "IP destination address")
@@ -616,24 +628,15 @@ def verify_event(test, event, expected_state):
     test.assert_equal(e.sw_if_index,
                       test.vpp_session.interface.sw_if_index,
                       "BFD interface index")
-    is_ipv6 = 0
-    if test.vpp_session.af == AF_INET6:
-        is_ipv6 = 1
-    test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
-    if test.vpp_session.af == AF_INET:
-        test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
-                          "Local IPv4 address")
-        test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
-                          "Peer IPv4 address")
-    else:
-        test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
-                          "Local IPv6 address")
-        test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
-                          "Peer IPv6 address")
+
+    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
+                      "Local IPv6 address")
+    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
+                      "Peer IPv6 address")
     test.assert_equal(e.state, expected_state, BFDState)
 
 
-def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
+def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
     """ wait for BFD packet and verify its correctness
 
     :param timeout: how long to wait
@@ -659,6 +662,10 @@ def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
                                   (p.time, pcap_time_min), p))
         else:
             break
+    if is_tunnel:
+        # strip an IP layer and move to the next
+        p = p[IP].payload
+
     bfd = p[BFD]
     if bfd is None:
         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
@@ -713,7 +720,7 @@ class BFD4TestCase(VppTestCase):
             self.vpp_session.add_vpp_config()
             self.vpp_session.admin_up()
             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
-        except:
+        except BaseException:
             self.vapi.want_bfd_events(enable_disable=0)
             raise
 
@@ -1145,7 +1152,8 @@ class BFD4TestCase(VppTestCase):
                           self.vpp_session.required_min_rx,
                           "BFD required min rx interval")
         self.test_session.send_packet()
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         echo_seen = False
         # should be turned on - loopback echo packets
         for dummy in range(3):
@@ -1190,7 +1198,8 @@ class BFD4TestCase(VppTestCase):
         self.test_session.send_packet()
         detection_time = self.test_session.detect_mult *\
             self.vpp_session.required_min_rx / USEC_IN_SEC
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         # echo function should be used now, but we will drop the echo packets
         verified_diag = False
         for dummy in range(3):
@@ -1228,7 +1237,8 @@ class BFD4TestCase(VppTestCase):
         bfd_session_up(self)
         self.test_session.update(required_min_echo_rx=150000)
         self.test_session.send_packet()
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         # wait for first echo packet
         while True:
             p = self.pg0.wait_for_packet(1)
@@ -1260,7 +1270,8 @@ class BFD4TestCase(VppTestCase):
         bfd_session_up(self)
         self.test_session.update(required_min_echo_rx=150000)
         self.test_session.send_packet()
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         # wait for first echo packet
         while True:
             p = self.pg0.wait_for_packet(1)
@@ -1291,7 +1302,8 @@ class BFD4TestCase(VppTestCase):
         """ stale echo packets don't keep a session up """
         bfd_session_up(self)
         self.test_session.update(required_min_echo_rx=150000)
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         self.test_session.send_packet()
         # should be turned on - loopback echo packets
         echo_packet = None
@@ -1344,7 +1356,8 @@ class BFD4TestCase(VppTestCase):
         """ echo packets with invalid checksum don't keep a session up """
         bfd_session_up(self)
         self.test_session.update(required_min_echo_rx=150000)
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         self.test_session.send_packet()
         # should be turned on - loopback echo packets
         timeout_at = None
@@ -1525,7 +1538,7 @@ class BFD6TestCase(VppTestCase):
             self.vpp_session.admin_up()
             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
             self.logger.debug(self.vapi.cli("show adj nbr"))
-        except:
+        except BaseException:
             self.vapi.want_bfd_events(enable_disable=0)
             raise
 
@@ -1641,7 +1654,8 @@ class BFD6TestCase(VppTestCase):
                           self.vpp_session.required_min_rx,
                           "BFD required min rx interval")
         self.test_session.send_packet()
-        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+        self.vapi.bfd_udp_set_echo_source(
+            sw_if_index=self.loopback0.sw_if_index)
         echo_seen = False
         # should be turned on - loopback echo packets
         for dummy in range(3):
@@ -1723,7 +1737,7 @@ class BFDFIBTestCase(VppTestCase):
 
     def tearDown(self):
         if not self.vpp_dead:
-            self.vapi.want_bfd_events(enable_disable=0)
+            self.vapi.want_bfd_events(enable_disable=False)
 
         super(BFDFIBTestCase, self).tearDown()
 
@@ -1741,24 +1755,20 @@ class BFDFIBTestCase(VppTestCase):
         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src="3001::1", dst="2001::1") /
               UDP(sport=1234, dport=1234) /
-              Raw('\xa5' * 100)),
+              Raw(b'\xa5' * 100)),
              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src="3001::1", dst="2002::1") /
               UDP(sport=1234, dport=1234) /
-              Raw('\xa5' * 100))]
+              Raw(b'\xa5' * 100))]
 
         # A recursive and a non-recursive route via a next-hop that
         # will have a BFD session
         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                   [VppRoutePath(self.pg0.remote_ip6,
-                                                self.pg0.sw_if_index,
-                                                proto=DpoProto.DPO_PROTO_IP6)],
-                                  is_ip6=1)
+                                                self.pg0.sw_if_index)])
         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                   [VppRoutePath(self.pg0.remote_ip6,
-                                                0xffffffff,
-                                                proto=DpoProto.DPO_PROTO_IP6)],
-                                  is_ip6=1)
+                                                0xffffffff)])
         ip_2001_s_64.add_vpp_config()
         ip_2002_s_64.add_vpp_config()
 
@@ -1804,6 +1814,87 @@ class BFDFIBTestCase(VppTestCase):
                              packet[IPv6].dst)
 
 
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class BFDTunTestCase(VppTestCase):
+    """ BFD over GRE tunnel """
+
+    vpp_session = None
+    test_session = None
+
+    @classmethod
+    def setUpClass(cls):
+        super(BFDTunTestCase, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(BFDTunTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        super(BFDTunTestCase, self).setUp()
+        self.create_pg_interfaces(range(1))
+
+        self.vapi.want_bfd_events()
+        self.pg0.enable_capture()
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        if not self.vpp_dead:
+            self.vapi.want_bfd_events(enable_disable=0)
+
+        super(BFDTunTestCase, self).tearDown()
+
+    @staticmethod
+    def pkt_is_not_data_traffic(p):
+        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
+        if p.haslayer(BFD) or is_ipv6_misc(p):
+            return True
+        return False
+
+    def test_bfd_o_gre(self):
+        """ BFD-o-GRE  """
+
+        # A GRE interface over which to run a BFD session
+        gre_if = VppGreInterface(self,
+                                 self.pg0.local_ip4,
+                                 self.pg0.remote_ip4)
+        gre_if.add_vpp_config()
+        gre_if.admin_up()
+        gre_if.config_ip4()
+
+        # bring the session up now the routes are present
+        self.vpp_session = VppBFDUDPSession(self,
+                                            gre_if,
+                                            gre_if.remote_ip4,
+                                            is_tunnel=True)
+        self.vpp_session.add_vpp_config()
+        self.vpp_session.admin_up()
+
+        self.test_session = BFDTestSession(
+            self, gre_if, AF_INET,
+            tunnel_header=(IP(src=self.pg0.remote_ip4,
+                              dst=self.pg0.local_ip4) /
+                           GRE()),
+            phy_interface=self.pg0)
+
+        # packets to match against both of the routes
+        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+              IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
+              UDP(sport=1234, dport=1234) /
+              Raw(b'\xa5' * 100))]
+
+        # session is up - traffic passes
+        bfd_session_up(self)
+
+        self.send_and_expect(self.pg0, p, self.pg0)
+
+        # bring session down
+        bfd_session_down(self)
+
+
 @unittest.skipUnless(running_extended_tests, "part of extended tests")
 class BFDSHA1TestCase(VppTestCase):
     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
@@ -1839,7 +1930,7 @@ class BFDSHA1TestCase(VppTestCase):
 
     def tearDown(self):
         if not self.vpp_dead:
-            self.vapi.want_bfd_events(enable_disable=0)
+            self.vapi.want_bfd_events(enable_disable=False)
         self.vapi.collect_events()  # clear the event queue
         super(BFDSHA1TestCase, self).tearDown()
 
@@ -2076,7 +2167,7 @@ class BFDAuthOnOffTestCase(VppTestCase):
 
     def tearDown(self):
         if not self.vpp_dead:
-            self.vapi.want_bfd_events(enable_disable=0)
+            self.vapi.want_bfd_events(enable_disable=False)
         self.vapi.collect_events()  # clear the event queue
         super(BFDAuthOnOffTestCase, self).tearDown()
 
@@ -2286,7 +2377,7 @@ class BFDCLITestCase(VppTestCase):
 
     def tearDown(self):
         try:
-            self.vapi.want_bfd_events(enable_disable=0)
+            self.vapi.want_bfd_events(enable_disable=False)
         except UnexpectedApiReturnValueError:
             # some tests aren't subscribed, so this is not an issue
             pass
@@ -2301,7 +2392,11 @@ class BFDCLITestCase(VppTestCase):
 
     def cli_verify_response(self, cli, expected):
         """ execute a CLI, asserting that the response matches expectation """
-        self.assert_equal(self.vapi.cli(cli).strip(),
+        try:
+            reply = self.vapi.cli(cli)
+        except CliFailedCommandError as cli_error:
+            reply = str(cli_error)
+        self.assert_equal(reply.strip(),
                           expected,
                           "CLI command response")
 
@@ -2675,5 +2770,6 @@ class BFDCLITestCase(VppTestCase):
         self.cli_verify_response("show bfd echo-source",
                                  "UDP echo source is not set.")
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)