VPP-1508: Tests: Fix vpp_api struct.error under py3.
[vpp.git] / test / test_nat.py
index e368fe0..f4978cf 100644 (file)
@@ -6,6 +6,8 @@ import struct
 import random
 
 from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
@@ -320,7 +322,8 @@ class MethodHolder(VppTestCase):
             pref_n[13] = ip4_n[1]
             pref_n[14] = ip4_n[2]
             pref_n[15] = ip4_n[3]
-        return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+        packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+        return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
 
     def extract_ip4(self, ip6, plen):
         """
@@ -689,8 +692,8 @@ class MethodHolder(VppTestCase):
             p = (IP(src=src_if.remote_ip4, dst=dst) /
                  TCP(sport=sport, dport=dport) /
                  Raw(data))
-            p = p.__class__(str(p))
-            chksum = p['TCP'].chksum
+            p = p.__class__(scapy.compat.raw(p))
+            chksum = p[TCP].chksum
             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
         elif proto == IP_PROTOS.udp:
             proto_header = UDP(sport=sport, dport=dport)
@@ -869,8 +872,8 @@ class MethodHolder(VppTestCase):
         self.assertEqual(6, len(data))
         for record in data:
             # natEvent
-            self.assertIn(ord(record[230]), [4, 5])
-            if ord(record[230]) == 4:
+            self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+            if scapy.compat.orb(record[230]) == 4:
                 nat44_ses_create_num += 1
             else:
                 nat44_ses_delete_num += 1
@@ -882,16 +885,16 @@ class MethodHolder(VppTestCase):
             # ingressVRFID
             self.assertEqual(struct.pack("!I", 0), record[234])
             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
-            if IP_PROTOS.icmp == ord(record[4]):
+            if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
                                  record[227])
-            elif IP_PROTOS.tcp == ord(record[4]):
+            elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
                                  record[7])
                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
                                  record[227])
-            elif IP_PROTOS.udp == ord(record[4]):
+            elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.udp_port_in),
                                  record[7])
                 self.assertEqual(struct.pack("!H", self.udp_port_out),
@@ -910,7 +913,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 3)
+        self.assertEqual(scapy.compat.orb(record[230]), 3)
         # natPoolID
         self.assertEqual(struct.pack("!I", 0), record[283])
 
@@ -924,7 +927,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 1), record[466])
         # maxSessionEntries
@@ -940,7 +943,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 2), record[466])
         # maxBIBEntries
@@ -957,7 +960,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 5), record[466])
         # maxFragmentsPendingReassembly
@@ -976,7 +979,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 5), record[466])
         # maxFragmentsPendingReassembly
@@ -996,15 +999,15 @@ class MethodHolder(VppTestCase):
         record = data[0]
         # natEvent
         if is_create:
-            self.assertEqual(ord(record[230]), 10)
+            self.assertEqual(scapy.compat.orb(record[230]), 10)
         else:
-            self.assertEqual(ord(record[230]), 11)
+            self.assertEqual(scapy.compat.orb(record[230]), 11)
         # sourceIPv6Address
         self.assertEqual(src_addr, record[27])
         # postNATSourceIPv4Address
         self.assertEqual(self.nat_addr_n, record[225])
         # protocolIdentifier
-        self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
         # ingressVRFID
         self.assertEqual(struct.pack("!I", 0), record[234])
         # sourceTransportPort
@@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase):
         record = data[0]
         # natEvent
         if is_create:
-            self.assertEqual(ord(record[230]), 6)
+            self.assertEqual(scapy.compat.orb(record[230]), 6)
         else:
-            self.assertEqual(ord(record[230]), 7)
+            self.assertEqual(scapy.compat.orb(record[230]), 7)
         # sourceIPv6Address
         self.assertEqual(src_addr, record[27])
         # destinationIPv6Address
@@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                          record[226])
         # protocolIdentifier
-        self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
         # ingressVRFID
         self.assertEqual(struct.pack("!I", 0), record[234])
         # sourceTransportPort
@@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 3), record[466])
         # maxEntriesPerUser
@@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         reass = self.vapi.nat_reass_dump()
@@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
 
         # send packet from host to server
         pkts = self.create_stream_frag(self.pg0,
@@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -1863,7 +1866,7 @@ class TestNAT44(MethodHolder):
                                                   is_inside=0)
         sm = self.vapi.nat44_static_mapping_dump()
         self.assertEqual(len(sm), 1)
-        self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
+        self.assertEqual((sm[0].tag).split(b'\0', 1)[0], b'')
         self.assertEqual(sm[0].protocol, 0)
         self.assertEqual(sm[0].local_port, 0)
         self.assertEqual(sm[0].external_port, 0)
@@ -1891,7 +1894,7 @@ class TestNAT44(MethodHolder):
         self.tcp_port_out = 6303
         self.udp_port_out = 6304
         self.icmp_id_out = 6305
-        tag = "testTAG"
+        tag = b"testTAG"
 
         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
@@ -1899,7 +1902,7 @@ class TestNAT44(MethodHolder):
                                                   is_inside=0)
         sm = self.vapi.nat44_static_mapping_dump()
         self.assertEqual(len(sm), 1)
-        self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
+        self.assertEqual((sm[0].tag).split(b'\0', 1)[0], tag)
 
         # out2in
         pkts = self.create_stream_out(self.pg1, nat_ip)
@@ -2597,7 +2600,7 @@ class TestNAT44(MethodHolder):
 
     def test_interface_addr_static_mapping(self):
         """ Static mapping with addresses from interface """
-        tag = "testTAG"
+        tag = b"testTAG"
 
         self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index)
         self.nat44_add_static_mapping(
@@ -2610,7 +2613,7 @@ class TestNAT44(MethodHolder):
         self.assertEqual(1, len(static_mappings))
         self.assertEqual(self.pg7.sw_if_index,
                          static_mappings[0].external_sw_if_index)
-        self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+        self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
 
         # configure interface address and check static mappings
         self.pg7.config_ip4()
@@ -2621,7 +2624,7 @@ class TestNAT44(MethodHolder):
             if sm.external_sw_if_index == 0xFFFFFFFF:
                 self.assertEqual(sm.external_ip_address[0:4],
                                  self.pg7.local_ip4n)
-                self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+                self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
                 resolved = True
         self.assertTrue(resolved)
 
@@ -2631,7 +2634,7 @@ class TestNAT44(MethodHolder):
         self.assertEqual(1, len(static_mappings))
         self.assertEqual(self.pg7.sw_if_index,
                          static_mappings[0].external_sw_if_index)
-        self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+        self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
 
         # configure interface address again and check static mappings
         self.pg7.config_ip4()
@@ -2642,7 +2645,7 @@ class TestNAT44(MethodHolder):
             if sm.external_sw_if_index == 0xFFFFFFFF:
                 self.assertEqual(sm.external_ip_address[0:4],
                                  self.pg7.local_ip4n)
-                self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+                self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
                 resolved = True
         self.assertTrue(resolved)
 
@@ -3539,7 +3542,7 @@ class TestNAT44(MethodHolder):
                                                   is_inside=0)
         self.vapi.nat44_forwarding_enable_disable(1)
 
-        data = "A" * 16 + "B" * 16 + "C" * 3
+        data = b"A" * 16 + b"B" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.pg0.remote_ip4,
                                        4789,
@@ -3667,7 +3670,7 @@ class TestNAT44(MethodHolder):
         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                            src_port=self.ipfix_src_port)
 
-        data = "A" * 4 + "B" * 16 + "C" * 3
+        data = b"A" * 4 + b"B" * 16 + b"C" * 3
         self.tcp_port_in = random.randint(1025, 65535)
         pkts = self.create_stream_frag(self.pg0,
                                        self.pg1.remote_ip4,
@@ -6455,7 +6458,7 @@ class TestNAT44Out2InDPO(MethodHolder):
             cls.pg1.config_ip6()
             cls.pg1.resolve_ndp()
 
-            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00' * 16,
+            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address=b'\x00' * 16,
                                       dst_address_length=0,
                                       next_hop_address=cls.pg1.remote_ip6n,
                                       next_hop_sw_if_index=cls.pg1.sw_if_index)
@@ -7285,9 +7288,9 @@ class TestNAT64(MethodHolder):
         self.assertTrue(pg1_found)
 
         features = self.vapi.cli("show interface features pg0")
-        self.assertNotEqual(features.find('nat64-in2out'), -1)
+        self.assertIn('nat64-in2out', features)
         features = self.vapi.cli("show interface features pg1")
-        self.assertNotEqual(features.find('nat64-out2in'), -1)
+        self.assertIn('nat64-out2in', features)
 
         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
@@ -8043,7 +8046,7 @@ class TestNAT64(MethodHolder):
         reass_n_start = len(reass)
 
         # in2out
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         self.pg0.add_stream(pkts)
@@ -8059,7 +8062,7 @@ class TestNAT64(MethodHolder):
         self.assertEqual(data, p[Raw].load)
 
         # out2in
-        data = "A" * 4 + "b" * 16 + "C" * 3
+        data = b"A" * 4 + b"b" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
@@ -8127,7 +8130,7 @@ class TestNAT64(MethodHolder):
         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
 
         # in2out
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         pkts.reverse()
@@ -8144,7 +8147,7 @@ class TestNAT64(MethodHolder):
         self.assertEqual(data, p[Raw].load)
 
         # out2in
-        data = "A" * 4 + "B" * 16 + "C" * 3
+        data = b"A" * 4 + b"B" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
@@ -8283,7 +8286,7 @@ class TestNAT64(MethodHolder):
         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                            src_port=self.ipfix_src_port)
 
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         pkts.reverse()
@@ -8358,9 +8361,9 @@ class TestNAT64(MethodHolder):
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                if ord(data[0][230]) == 10:
+                if scapy.compat.orb(data[0][230]) == 10:
                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
-                elif ord(data[0][230]) == 6:
+                elif scapy.compat.orb(data[0][230]) == 6:
                     self.verify_ipfix_nat64_ses(data,
                                                 1,
                                                 self.pg0.remote_ip6n,
@@ -8387,9 +8390,9 @@ class TestNAT64(MethodHolder):
                              self.ipfix_domain_id)
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                if ord(data[0][230]) == 11:
+                if scapy.compat.orb(data[0][230]) == 11:
                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
-                elif ord(data[0][230]) == 7:
+                elif scapy.compat.orb(data[0][230]) == 7:
                     self.verify_ipfix_nat64_ses(data,
                                                 0,
                                                 self.pg0.remote_ip6n,