VPP-1508: Use scapy.compat to manage packet level library differences.
[vpp.git] / test / test_srv6.py
index 3e5f856..cc53c6b 100644 (file)
@@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
 
 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
 
+import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
@@ -1224,14 +1225,14 @@ class TestSRv6(VppTestCase):
 
         # add classify table
         # mask on dst ip address prefix a7::/8
 
         # add classify table
         # mask on dst ip address prefix a7::/8
-        mask = '{:0<16}'.format('ff')
+        mask = '{!s:0<16}'.format('ff')
         r = self.vapi.classify_add_del_table(
             1,
             binascii.unhexlify(mask),
             match_n_vectors=(len(mask) - 1) // 32 + 1,
             current_data_flag=1,
             skip_n_vectors=2)  # data offset
         r = self.vapi.classify_add_del_table(
             1,
             binascii.unhexlify(mask),
             match_n_vectors=(len(mask) - 1) // 32 + 1,
             current_data_flag=1,
             skip_n_vectors=2)  # data offset
-        self.assertIsNotNone(r, msg='No response msg for add_del_table')
+        self.assertIsNotNone(r, 'No response msg for add_del_table')
         table_index = r.new_table_index
 
         # add the source routign node as a ip6 inacl netxt node
         table_index = r.new_table_index
 
         # add the source routign node as a ip6 inacl netxt node
@@ -1239,7 +1240,7 @@ class TestSRv6(VppTestCase):
                                     'sr-pl-rewrite-insert')
         inacl_next_node_index = r.node_index
 
                                     'sr-pl-rewrite-insert')
         inacl_next_node_index = r.node_index
 
-        match = '{:0<16}'.format('a7')
+        match = '{!s:0<16}'.format('a7')
         r = self.vapi.classify_add_del_session(
             1,
             table_index,
         r = self.vapi.classify_add_del_session(
             1,
             table_index,
@@ -1247,7 +1248,7 @@ class TestSRv6(VppTestCase):
             hit_next_index=inacl_next_node_index,
             action=3,
             metadata=0)  # sr policy index
             hit_next_index=inacl_next_node_index,
             action=3,
             metadata=0)  # sr policy index
-        self.assertIsNotNone(r, msg='No response msg for add_del_session')
+        self.assertIsNotNone(r, 'No response msg for add_del_session')
 
         # log the classify table used in the steering policy
         self.logger.info(self.vapi.cli("show classify table"))
 
         # log the classify table used in the steering policy
         self.logger.info(self.vapi.cli("show classify table"))
@@ -1257,7 +1258,7 @@ class TestSRv6(VppTestCase):
             sw_if_index=self.pg3.sw_if_index,
             ip6_table_index=table_index)
         self.assertIsNotNone(r,
             sw_if_index=self.pg3.sw_if_index,
             ip6_table_index=table_index)
         self.assertIsNotNone(r,
-                             msg='No response msg for input_acl_set_interface')
+                             'No response msg for input_acl_set_interface')
 
         # log the ip6 inacl
         self.logger.info(self.vapi.cli("show inacl type ip6"))
 
         # log the ip6 inacl
         self.logger.info(self.vapi.cli("show inacl type ip6"))
@@ -1292,7 +1293,7 @@ class TestSRv6(VppTestCase):
             sw_if_index=self.pg3.sw_if_index,
             ip6_table_index=table_index)
         self.assertIsNotNone(r,
             sw_if_index=self.pg3.sw_if_index,
             ip6_table_index=table_index)
         self.assertIsNotNone(r,
-                             msg='No response msg for input_acl_set_interface')
+                             'No response msg for input_acl_set_interface')
 
         # log the ip6 inacl after cleaning
         self.logger.info(self.vapi.cli("show inacl type ip6"))
 
         # log the ip6 inacl after cleaning
         self.logger.info(self.vapi.cli("show inacl type ip6"))
@@ -1313,13 +1314,13 @@ class TestSRv6(VppTestCase):
             0,
             table_index,
             binascii.unhexlify(match))
             0,
             table_index,
             binascii.unhexlify(match))
-        self.assertIsNotNone(r, msg='No response msg for add_del_session')
+        self.assertIsNotNone(r, 'No response msg for add_del_session')
 
         r = self.vapi.classify_add_del_table(
             0,
             binascii.unhexlify(mask),
             table_index=table_index)
 
         r = self.vapi.classify_add_del_table(
             0,
             binascii.unhexlify(mask),
             table_index=table_index)
-        self.assertIsNotNone(r, msg='No response msg for add_del_table')
+        self.assertIsNotNone(r, 'No response msg for add_del_table')
 
         self.logger.info(self.vapi.cli("show classify table"))
 
 
         self.logger.info(self.vapi.cli("show classify table"))
 
@@ -1437,7 +1438,7 @@ class TestSRv6(VppTestCase):
         tx_ip.chksum = None
         # read back the pkt (with str()) to force computing these fields
         # probably other ways to accomplish this are possible
         tx_ip.chksum = None
         # read back the pkt (with str()) to force computing these fields
         # probably other ways to accomplish this are possible
-        tx_ip = IP(str(tx_ip))
+        tx_ip = IP(scapy.compat.raw(tx_ip))
 
         self.assertEqual(rx_srh.payload, tx_ip)
 
 
         self.assertEqual(rx_srh.payload, tx_ip)
 
@@ -1489,7 +1490,7 @@ class TestSRv6(VppTestCase):
         self.assertEqual(rx_srh.nh, 59)
 
         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
         self.assertEqual(rx_srh.nh, 59)
 
         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
-        self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+        self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
 
         self.logger.debug("packet verification: SUCCESS")
 
 
         self.logger.debug("packet verification: SUCCESS")
 
@@ -1765,7 +1766,7 @@ class TestSRv6(VppTestCase):
         tx_ip2.chksum = None
         # read back the pkt (with str()) to force computing these fields
         # probably other ways to accomplish this are possible
         tx_ip2.chksum = None
         # read back the pkt (with str()) to force computing these fields
         # probably other ways to accomplish this are possible
-        tx_ip2 = IP(str(tx_ip2))
+        tx_ip2 = IP(scapy.compat.raw(tx_ip2))
 
         self.assertEqual(rx_ip, tx_ip2)
 
 
         self.assertEqual(rx_ip, tx_ip2)
 
@@ -1791,7 +1792,7 @@ class TestSRv6(VppTestCase):
         tx_ip = tx_pkt.getlayer(IPv6)
         # we can't just get the 2nd Ether layer
         # get the Raw content and dissect it as Ether
         tx_ip = tx_pkt.getlayer(IPv6)
         # we can't just get the 2nd Ether layer
         # get the Raw content and dissect it as Ether
-        tx_eth1 = Ether(str(tx_pkt[Raw]))
+        tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
 
         # verify if rx'ed packet has no SRH
         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
 
         # verify if rx'ed packet has no SRH
         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
@@ -1837,7 +1838,7 @@ class TestSRv6(VppTestCase):
             # read back the dumped packet (with str())
             # to force computing these fields
             # probably other ways are possible
             # read back the dumped packet (with str())
             # to force computing these fields
             # probably other ways are possible
-            p = Ether(str(p))
+            p = Ether(scapy.compat.raw(p))
             payload_info.data = p.copy()
             self.logger.debug(ppp("Created packet:", p))
             pkts.append(p)
             payload_info.data = p.copy()
             self.logger.debug(ppp("Created packet:", p))
             pkts.append(p)
@@ -2080,14 +2081,14 @@ class TestSRv6(VppTestCase):
         # but packet[Raw] gives the complete payload
         # (incl L2 header) for the T.Encaps L2 case
         try:
         # but packet[Raw] gives the complete payload
         # (incl L2 header) for the T.Encaps L2 case
         try:
-            payload_info = self.payload_to_info(str(packet[Raw]))
+            payload_info = self.payload_to_info(packet[Raw])
 
         except:
             # remote L2 header from packet[Raw]:
             # take packet[Raw], convert it to an Ether layer
             # and then extract Raw from it
             payload_info = self.payload_to_info(
 
         except:
             # remote L2 header from packet[Raw]:
             # take packet[Raw], convert it to an Ether layer
             # and then extract Raw from it
             payload_info = self.payload_to_info(
-                str(Ether(str(packet[Raw]))[Raw]))
+                Ether(scapy.compat.r(packet[Raw]))[Raw])
 
         return payload_info
 
 
         return payload_info
 
@@ -2101,7 +2102,7 @@ class TestSRv6(VppTestCase):
         :param compare_func: function to compare in and out packet
         """
         self.logger.info("Verifying capture on interface %s using function %s"
         :param compare_func: function to compare in and out packet
         """
         self.logger.info("Verifying capture on interface %s using function %s"
-                         % (dst_if.name, compare_func.func_name))
+                         % (dst_if.name, compare_func.__name__))
 
         last_info = dict()
         for i in self.pg_interfaces:
 
         last_info = dict()
         for i in self.pg_interfaces:
@@ -2144,7 +2145,6 @@ class TestSRv6(VppTestCase):
                 compare_func(txed_packet, packet)
 
             except:
                 compare_func(txed_packet, packet)
 
             except:
-                print packet.command()
                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
                 raise
 
                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
                 raise