Improve the sample plugin node dispatch function
[vpp.git] / test / test_ipsec_nat.py
index cebbfc8..9e0b9ea 100644 (file)
@@ -6,18 +6,11 @@ from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from util import ppp, ppc
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from util import ppp, ppc
-from framework import VppTestCase
+from template_ipsec import TemplateIpsec
 
 
 
 
-class IPSecNATTestCase(VppTestCase):
+class IPSecNATTestCase(TemplateIpsec):
     """ IPSec/NAT
     """ IPSec/NAT
-
-    TRANSPORT MODE:
-
-     ---   encrypt   ---
-    |pg2| <-------> |VPP|
-     ---   decrypt   ---
-
     TUNNEL MODE:
 
 
     TUNNEL MODE:
 
 
@@ -31,26 +24,21 @@ class IPSecNATTestCase(VppTestCase):
      ---            ---           ---
     """
 
      ---            ---           ---
     """
 
-    remote_pg0_client_addr = '1.1.1.1'
+    tcp_port_in = 6303
+    tcp_port_out = 6303
+    udp_port_in = 6304
+    udp_port_out = 6304
+    icmp_id_in = 6305
+    icmp_id_out = 6305
 
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
 
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
-        cls.create_pg_interfaces(range(2))
-        for i in cls.pg_interfaces:
-            i.configure_ipv4_neighbors()
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-
-        cls.tcp_port_in = 6303
-        cls.tcp_port_out = 6303
-        cls.udp_port_in = 6304
-        cls.udp_port_out = 6304
-        cls.icmp_id_in = 6305
-        cls.icmp_id_out = 6305
+        cls.tun_if = cls.pg0
         cls.config_esp_tun()
         cls.logger.info(cls.vapi.ppcli("show ipsec"))
         cls.config_esp_tun()
         cls.logger.info(cls.vapi.ppcli("show ipsec"))
+        client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
+        cls.vapi.ip_add_del_route(client, 32, cls.tun_if.remote_ip4n)
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
@@ -88,7 +76,7 @@ class IPSecNATTestCase(VppTestCase):
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
-                self.assert_equal(packet[IP].src, self.pg0.remote_ip4,
+                self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
                                   "decrypted packet source address")
                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                   "decrypted packet destination address")
                                   "decrypted packet source address")
                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                   "decrypted packet destination address")
@@ -119,18 +107,19 @@ class IPSecNATTestCase(VppTestCase):
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
+                copy = packet.__class__(str(packet))
+                del copy[UDP].len
+                copy = packet.__class__(str(copy))
+                self.assert_equal(packet[UDP].len, copy[UDP].len,
+                                  "UDP header length")
+                self.assert_packet_checksums_valid(packet)
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
+                self.assert_packet_checksums_valid(decrypt_pkt)
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
-                self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4,
+                self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                   "encrypted packet destination address")
                                   "encrypted packet destination address")
-                # if decrypt_pkt.haslayer(TCP):
-                #     self.tcp_port_out = decrypt_pkt[TCP].sport
-                # elif decrypt_pkt.haslayer(UDP):
-                #     self.udp_port_out = decrypt_pkt[UDP].sport
-                # else:
-                #     self.icmp_id_out = decrypt_pkt[ICMP].id
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
@@ -138,101 +127,100 @@ class IPSecNATTestCase(VppTestCase):
 
     @classmethod
     def config_esp_tun(cls):
 
     @classmethod
     def config_esp_tun(cls):
-        spd_id = 1
-        remote_sa_id = 10
-        local_sa_id = 20
-        remote_tun_spi = 1001
-        local_tun_spi = 1000
-        client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr)
-        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
-        cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tun_spi,
+        cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
+                                         cls.scapy_tun_spi,
+                                         cls.auth_algo_vpp_id, cls.auth_key,
+                                         cls.crypt_algo_vpp_id,
+                                         cls.crypt_key, cls.vpp_esp_protocol,
                                          cls.pg1.remote_ip4n,
                                          cls.pg1.remote_ip4n,
-                                         cls.pg0.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tun_spi,
-                                         cls.pg0.remote_ip4n,
+                                         cls.tun_if.remote_ip4n,
+                                         udp_encap=1)
+        cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
+                                         cls.vpp_tun_spi,
+                                         cls.auth_algo_vpp_id, cls.auth_key,
+                                         cls.crypt_algo_vpp_id,
+                                         cls.crypt_key, cls.vpp_esp_protocol,
+                                         cls.tun_if.remote_ip4n,
                                          cls.pg1.remote_ip4n,
                                          cls.pg1.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_spd_add_del(spd_id)
-        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
+                                         udp_encap=1)
+        cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
+        cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
+                                             cls.tun_if.sw_if_index)
         l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
                                                      "0.0.0.0")
         l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
                                                    "255.255.255.255")
         l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
                                                      "0.0.0.0")
         l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
                                                    "255.255.255.255")
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+                                         l_startaddr, l_stopaddr, r_startaddr,
+                                         r_stopaddr,
                                          protocol=socket.IPPROTO_ESP)
                                          protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP,
-                                         is_outbound=0)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+                                         l_startaddr, l_stopaddr, r_startaddr,
+                                         r_stopaddr, is_outbound=0,
+                                         protocol=socket.IPPROTO_ESP)
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+                                         l_startaddr, l_stopaddr, r_startaddr,
+                                         r_stopaddr, remote_port_start=4500,
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP)
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+                                         l_startaddr, l_stopaddr, r_startaddr,
+                                         r_stopaddr, remote_port_start=4500,
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP,
                                          is_outbound=0)
                                          remote_port_stop=4500,
                                          protocol=socket.IPPROTO_UDP,
                                          is_outbound=0)
-        l_startaddr = l_stopaddr = cls.pg0.remote_ip4n
+        l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
         r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
         r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         priority=10, policy=3,
-                                         is_outbound=0, sa_id=local_sa_id)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
-                                         l_startaddr, l_stopaddr,
-                                         priority=10, policy=3,
-                                         sa_id=remote_sa_id)
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
+                                         l_startaddr, l_stopaddr, r_startaddr,
+                                         r_stopaddr, priority=10, policy=3,
+                                         is_outbound=0)
+        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+                                         r_startaddr, r_stopaddr, l_startaddr,
+                                         l_stopaddr, priority=10, policy=3)
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
-        local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9,
-                                           crypt_algo='AES-CBC',
-                                           crypt_key='JPjyOWBeVEQiMe7h',
-                                           auth_algo='HMAC-SHA1-96',
-                                           auth_key='C91KUR9GYMm5GfkEvNjX',
+        scapy_tun_sa = SecurityAssociation(ESP, spi=self.scapy_tun_spi,
+                                           crypt_algo=self.crypt_algo,
+                                           crypt_key=self.crypt_key,
+                                           auth_algo=self.auth_algo,
+                                           auth_key=self.auth_key,
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
-                                               dst=self.pg0.remote_ip4),
+                                               dst=self.tun_if.remote_ip4),
                                            nat_t_header=UDP(
                                                sport=4500,
                                                dport=4500))
         # in2out - from private network to public
         pkts = self.create_stream_plain(
             self.pg1.remote_mac, self.pg1.local_mac,
                                            nat_t_header=UDP(
                                                sport=4500,
                                                dport=4500))
         # in2out - from private network to public
         pkts = self.create_stream_plain(
             self.pg1.remote_mac, self.pg1.local_mac,
-            self.pg1.remote_ip4, self.pg0.remote_ip4)
+            self.pg1.remote_ip4, self.tun_if.remote_ip4)
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture(len(pkts))
-        self.verify_capture_encrypted(capture, local_tun_sa)
-
-        remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8,
-                                            crypt_algo='AES-CBC',
-                                            crypt_key='JPjyOWBeVEQiMe7h',
-                                            auth_algo='HMAC-SHA1-96',
-                                            auth_key='C91KUR9GYMm5GfkEvNjX',
-                                            tunnel_header=IP(
-                                                src=self.pg0.remote_ip4,
-                                                dst=self.pg1.remote_ip4),
-                                            nat_t_header=UDP(
-                                                sport=4500,
-                                                dport=4500))
+        capture = self.tun_if.get_capture(len(pkts))
+        self.verify_capture_encrypted(capture, scapy_tun_sa)
+
+        vpp_tun_sa = SecurityAssociation(ESP,
+                                         spi=self.vpp_tun_spi,
+                                         crypt_algo=self.crypt_algo,
+                                         crypt_key=self.crypt_key,
+                                         auth_algo=self.auth_algo,
+                                         auth_key=self.auth_key,
+                                         tunnel_header=IP(
+                                             src=self.tun_if.remote_ip4,
+                                             dst=self.pg1.remote_ip4),
+                                         nat_t_header=UDP(
+                                             sport=4500,
+                                             dport=4500))
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
-            self.pg0.remote_mac, self.pg0.local_mac,
-            self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa)
+            self.tun_if.remote_mac, self.tun_if.local_mac,
+            self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
         self.logger.info(ppc("Sending packets:", pkts))
         self.logger.info(ppc("Sending packets:", pkts))
-        self.pg0.add_stream(pkts)
+        self.tun_if.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))