Fix M(), M2() macros in VAT
[vpp.git] / test / test_snat.py
index 09fdb10..967d4b3 100644 (file)
@@ -6,7 +6,7 @@ import struct
 
 from framework import VppTestCase, VppTestRunner
 from scapy.layers.inet import IP, TCP, UDP, ICMP
-from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
@@ -269,6 +269,9 @@ class TestSNAT(VppTestCase):
         """
         Clear SNAT configuration.
         """
+        if self.pg7.has_ip4_config:
+            self.pg7.unconfig_ip4()
+
         interfaces = self.vapi.snat_interface_addr_dump()
         for intf in interfaces:
             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
@@ -289,6 +292,7 @@ class TestSNAT(VppTestCase):
                                               external_port=sm.external_port,
                                               addr_only=sm.addr_only,
                                               vrf_id=sm.vrf_id,
+                                              protocol=sm.protocol,
                                               is_add=0)
 
         adresses = self.vapi.snat_address_dump()
@@ -297,8 +301,10 @@ class TestSNAT(VppTestCase):
                                              addr.ip_address,
                                              is_add=0)
 
-    def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
-                                external_port=0, vrf_id=0, is_add=1):
+    def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
+                                local_port=0, external_port=0, vrf_id=0,
+                                is_add=1, external_sw_if_index=0xFFFFFFFF,
+                                proto=0):
         """
         Add/delete S-NAT static mapping
 
@@ -308,6 +314,8 @@ class TestSNAT(VppTestCase):
         :param external_port: External port number (Optional)
         :param vrf_id: VRF ID (Default 0)
         :param is_add: 1 if add, 0 if delete (Default add)
+        :param external_sw_if_index: External interface instead of IP address
+        :param proto: IP protocol (Mandatory if port specified)
         """
         addr_only = 1
         if local_port and external_port:
@@ -317,10 +325,12 @@ class TestSNAT(VppTestCase):
         self.vapi.snat_add_static_mapping(
             l_ip,
             e_ip,
+            external_sw_if_index,
             local_port,
             external_port,
             addr_only,
             vrf_id,
+            proto,
             is_add)
 
     def snat_add_address(self, ip, is_add=1):
@@ -424,11 +434,14 @@ class TestSNAT(VppTestCase):
 
         self.snat_add_address(self.snat_addr)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.tcp_port_in, self.tcp_port_out)
+                                     self.tcp_port_in, self.tcp_port_out,
+                                     proto=IP_PROTOS.tcp)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.udp_port_in, self.udp_port_out)
+                                     self.udp_port_in, self.udp_port_out,
+                                     proto=IP_PROTOS.udp)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.icmp_id_in, self.icmp_id_out)
+                                     self.icmp_id_in, self.icmp_id_out,
+                                     proto=IP_PROTOS.icmp)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
@@ -458,11 +471,14 @@ class TestSNAT(VppTestCase):
 
         self.snat_add_address(self.snat_addr)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.tcp_port_in, self.tcp_port_out)
+                                     self.tcp_port_in, self.tcp_port_out,
+                                     proto=IP_PROTOS.tcp)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.udp_port_in, self.udp_port_out)
+                                     self.udp_port_in, self.udp_port_out,
+                                     proto=IP_PROTOS.udp)
         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
-                                     self.icmp_id_in, self.icmp_id_out)
+                                     self.icmp_id_in, self.icmp_id_out,
+                                     proto=IP_PROTOS.icmp)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
@@ -518,9 +534,7 @@ class TestSNAT(VppTestCase):
         self.pg3.assert_nothing_captured()
 
     def test_multiple_inside_interfaces(self):
-        """
-        SNAT multiple inside interfaces with non-overlapping address space
-        """
+        """ SNAT multiple inside interfaces (non-overlapping address space) """
 
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
@@ -681,7 +695,8 @@ class TestSNAT(VppTestCase):
                                                  is_inside=0)
         # add static mapping for server
         self.snat_add_static_mapping(server.ip4, self.snat_addr,
-                                     server_in_port, server_out_port)
+                                     server_in_port, server_out_port,
+                                     proto=IP_PROTOS.tcp)
 
         # send packet from host to server
         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
@@ -762,12 +777,35 @@ class TestSNAT(VppTestCase):
         self.pg7.config_ip4()
         adresses = self.vapi.snat_address_dump()
         self.assertEqual(1, len(adresses))
+        self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
 
         # remove interface address and check NAT address pool
         self.pg7.unconfig_ip4()
         adresses = self.vapi.snat_address_dump()
         self.assertEqual(0, len(adresses))
 
+    def test_interface_addr_static_mapping(self):
+        """ Static mapping with addresses from interface """
+        self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+        self.snat_add_static_mapping('1.2.3.4',
+                                     external_sw_if_index=self.pg7.sw_if_index)
+
+        # no static mappings
+        static_mappings = self.vapi.snat_static_mapping_dump()
+        self.assertEqual(0, len(static_mappings))
+
+        # configure interface address and check static mappings
+        self.pg7.config_ip4()
+        static_mappings = self.vapi.snat_static_mapping_dump()
+        self.assertEqual(1, len(static_mappings))
+        self.assertEqual(static_mappings[0].external_ip_address[0:4],
+                         self.pg7.local_ip4n)
+
+        # remove interface address and check static mappings
+        self.pg7.unconfig_ip4()
+        static_mappings = self.vapi.snat_static_mapping_dump()
+        self.assertEqual(0, len(static_mappings))
+
     def test_ipfix_nat44_sess(self):
         """ S-NAT IPFIX logging NAT44 session created/delted """
         self.snat_add_address(self.snat_addr)
@@ -833,6 +871,67 @@ class TestSNAT(VppTestCase):
                 data = ipfix.decode_data_set(p.getlayer(Set))
                 self.verify_ipfix_addr_exhausted(data)
 
+    def test_pool_addr_fib(self):
+        """ S-NAT add pool addresses to FIB """
+        static_addr = '10.0.0.10'
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+        self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
+
+        # SNAT address
+        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
+             ARP(op=ARP.who_has, pdst=self.snat_addr,
+                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        self.assertTrue(capture[0].haslayer(ARP))
+        self.assertTrue(capture[0][ARP].op, ARP.is_at)
+
+        # 1:1 NAT address
+        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
+             ARP(op=ARP.who_has, pdst=static_addr,
+                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        self.assertTrue(capture[0].haslayer(ARP))
+        self.assertTrue(capture[0][ARP].op, ARP.is_at)
+
+        # send ARP to non-SNAT interface
+        p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
+             ARP(op=ARP.who_has, pdst=self.snat_addr,
+                 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
+        self.pg2.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(0)
+
+        # remove addresses and verify
+        self.snat_add_address(self.snat_addr, is_add=0)
+        self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
+                                     is_add=0)
+
+        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
+             ARP(op=ARP.who_has, pdst=self.snat_addr,
+                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(0)
+
+        p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
+             ARP(op=ARP.who_has, pdst=static_addr,
+                 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(0)
+
     def tearDown(self):
         super(TestSNAT, self).tearDown()
         if not self.vpp_dead: