VPP-1508: Use scapy.compat to manage packet level library differences.
[vpp.git] / test / test_nat.py
index 0ef3026..160890a 100644 (file)
@@ -6,6 +6,8 @@ import struct
 import random
 
 from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
@@ -320,7 +322,8 @@ class MethodHolder(VppTestCase):
             pref_n[13] = ip4_n[1]
             pref_n[14] = ip4_n[2]
             pref_n[15] = ip4_n[3]
-        return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+        packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+        return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
 
     def extract_ip4(self, ip6, plen):
         """
@@ -689,8 +692,8 @@ class MethodHolder(VppTestCase):
             p = (IP(src=src_if.remote_ip4, dst=dst) /
                  TCP(sport=sport, dport=dport) /
                  Raw(data))
-            p = p.__class__(str(p))
-            chksum = p['TCP'].chksum
+            p = p.__class__(scapy.compat.raw(p))
+            chksum = p[TCP].chksum
             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
         elif proto == IP_PROTOS.udp:
             proto_header = UDP(sport=sport, dport=dport)
@@ -869,8 +872,8 @@ class MethodHolder(VppTestCase):
         self.assertEqual(6, len(data))
         for record in data:
             # natEvent
-            self.assertIn(ord(record[230]), [4, 5])
-            if ord(record[230]) == 4:
+            self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+            if scapy.compat.orb(record[230]) == 4:
                 nat44_ses_create_num += 1
             else:
                 nat44_ses_delete_num += 1
@@ -882,16 +885,16 @@ class MethodHolder(VppTestCase):
             # ingressVRFID
             self.assertEqual(struct.pack("!I", 0), record[234])
             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
-            if IP_PROTOS.icmp == ord(record[4]):
+            if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
                                  record[227])
-            elif IP_PROTOS.tcp == ord(record[4]):
+            elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
                                  record[7])
                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
                                  record[227])
-            elif IP_PROTOS.udp == ord(record[4]):
+            elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
                 self.assertEqual(struct.pack("!H", self.udp_port_in),
                                  record[7])
                 self.assertEqual(struct.pack("!H", self.udp_port_out),
@@ -910,7 +913,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 3)
+        self.assertEqual(scapy.compat.orb(record[230]), 3)
         # natPoolID
         self.assertEqual(struct.pack("!I", 0), record[283])
 
@@ -924,7 +927,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 1), record[466])
         # maxSessionEntries
@@ -940,7 +943,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 2), record[466])
         # maxBIBEntries
@@ -957,7 +960,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 5), record[466])
         # maxFragmentsPendingReassembly
@@ -976,7 +979,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 5), record[466])
         # maxFragmentsPendingReassembly
@@ -996,15 +999,15 @@ class MethodHolder(VppTestCase):
         record = data[0]
         # natEvent
         if is_create:
-            self.assertEqual(ord(record[230]), 10)
+            self.assertEqual(scapy.compat.orb(record[230]), 10)
         else:
-            self.assertEqual(ord(record[230]), 11)
+            self.assertEqual(scapy.compat.orb(record[230]), 11)
         # sourceIPv6Address
         self.assertEqual(src_addr, record[27])
         # postNATSourceIPv4Address
         self.assertEqual(self.nat_addr_n, record[225])
         # protocolIdentifier
-        self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
         # ingressVRFID
         self.assertEqual(struct.pack("!I", 0), record[234])
         # sourceTransportPort
@@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase):
         record = data[0]
         # natEvent
         if is_create:
-            self.assertEqual(ord(record[230]), 6)
+            self.assertEqual(scapy.compat.orb(record[230]), 6)
         else:
-            self.assertEqual(ord(record[230]), 7)
+            self.assertEqual(scapy.compat.orb(record[230]), 7)
         # sourceIPv6Address
         self.assertEqual(src_addr, record[27])
         # destinationIPv6Address
@@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                          record[226])
         # protocolIdentifier
-        self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+        self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
         # ingressVRFID
         self.assertEqual(struct.pack("!I", 0), record[234])
         # sourceTransportPort
@@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase):
         self.assertEqual(1, len(data))
         record = data[0]
         # natEvent
-        self.assertEqual(ord(record[230]), 13)
+        self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
         self.assertEqual(struct.pack("I", 3), record[466])
         # maxEntriesPerUser
@@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         reass = self.vapi.nat_reass_dump()
@@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
 
         # send packet from host to server
         pkts = self.create_stream_frag(self.pg0,
@@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase):
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
-            data = "A" * 4 + "B" * 16 + "C" * 3
+            data = b"A" * 4 + b"B" * 16 + b"C" * 3
         else:
-            data = "A" * 16 + "B" * 16 + "C" * 3
+            data = b"A" * 16 + b"B" * 16 + b"C" * 3
         self.port_in = random.randint(1025, 65535)
 
         for i in range(2):
@@ -3539,7 +3542,7 @@ class TestNAT44(MethodHolder):
                                                   is_inside=0)
         self.vapi.nat44_forwarding_enable_disable(1)
 
-        data = "A" * 16 + "B" * 16 + "C" * 3
+        data = b"A" * 16 + b"B" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.pg0.remote_ip4,
                                        4789,
@@ -3667,7 +3670,7 @@ class TestNAT44(MethodHolder):
         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                            src_port=self.ipfix_src_port)
 
-        data = "A" * 4 + "B" * 16 + "C" * 3
+        data = b"A" * 4 + b"B" * 16 + b"C" * 3
         self.tcp_port_in = random.randint(1025, 65535)
         pkts = self.create_stream_frag(self.pg0,
                                        self.pg1.remote_ip4,
@@ -8043,7 +8046,7 @@ class TestNAT64(MethodHolder):
         reass_n_start = len(reass)
 
         # in2out
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         self.pg0.add_stream(pkts)
@@ -8059,7 +8062,7 @@ class TestNAT64(MethodHolder):
         self.assertEqual(data, p[Raw].load)
 
         # out2in
-        data = "A" * 4 + "b" * 16 + "C" * 3
+        data = b"A" * 4 + b"b" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
@@ -8127,7 +8130,7 @@ class TestNAT64(MethodHolder):
         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
 
         # in2out
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         pkts.reverse()
@@ -8144,7 +8147,7 @@ class TestNAT64(MethodHolder):
         self.assertEqual(data, p[Raw].load)
 
         # out2in
-        data = "A" * 4 + "B" * 16 + "C" * 3
+        data = b"A" * 4 + b"B" * 16 + b"C" * 3
         pkts = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
@@ -8283,7 +8286,7 @@ class TestNAT64(MethodHolder):
         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                            src_port=self.ipfix_src_port)
 
-        data = 'a' * 200
+        data = b'a' * 200
         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                            self.tcp_port_in, 20, data)
         pkts.reverse()
@@ -8358,9 +8361,9 @@ class TestNAT64(MethodHolder):
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                if ord(data[0][230]) == 10:
+                if scapy.compat.orb(data[0][230]) == 10:
                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
-                elif ord(data[0][230]) == 6:
+                elif scapy.compat.orb(data[0][230]) == 6:
                     self.verify_ipfix_nat64_ses(data,
                                                 1,
                                                 self.pg0.remote_ip6n,
@@ -8387,9 +8390,9 @@ class TestNAT64(MethodHolder):
                              self.ipfix_domain_id)
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                if ord(data[0][230]) == 11:
+                if scapy.compat.orb(data[0][230]) == 11:
                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
-                elif ord(data[0][230]) == 7:
+                elif scapy.compat.orb(data[0][230]) == 7:
                     self.verify_ipfix_nat64_ses(data,
                                                 0,
                                                 self.pg0.remote_ip6n,