nat: in2out-output nodes work with acl reflect
[vpp.git] / src / plugins / nat / test / test_nat.py
index 46b97c0..f870655 100644 (file)
@@ -32,6 +32,7 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
     PacketListField
 from ipaddress import IPv6Network
 from util import ppc, ppp
+from socket import inet_pton, AF_INET
 
 
 # NAT HA protocol event data
@@ -6386,6 +6387,108 @@ class TestNAT44EndpointDependent(MethodHolder):
         capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
+    def test_output_feature_stateful_acl(self):
+        """ NAT44 endpoint-dependent output feature works with stateful ACL """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_output_feature(
+            sw_if_index=self.pg0.sw_if_index,
+            flags=self.config_flags.NAT_IS_INSIDE,
+            is_add=1)
+        self.vapi.nat44_interface_add_del_output_feature(
+            sw_if_index=self.pg1.sw_if_index,
+            flags=self.config_flags.NAT_IS_OUTSIDE,
+            is_add=1)
+
+        # First ensure that the NAT is working sans ACL
+
+        # send packets out2in, no sessions yet so packets should drop
+        pkts_out2in = self.create_stream_out(self.pg1)
+        self.send_and_assert_no_replies(self.pg1, pkts_out2in)
+
+        # send packets into inside intf, ensure received via outside intf
+        pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
+        capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
+                                       len(pkts_in2out))
+        self.verify_capture_out(capture)
+
+        # send out2in again, with sessions created it should work now
+        pkts_out2in = self.create_stream_out(self.pg1)
+        capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0,
+                                       len(pkts_out2in))
+        self.verify_capture_in(capture, self.pg0)
+
+        # Create an ACL blocking everything
+        out2in_deny_rule = {
+            'is_permit': 0,
+            'is_ipv6': 0,
+            'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+            'src_ip_prefix_len': 0,
+            'dst_ip_addr':  inet_pton(AF_INET, "0.0.0.0"),
+            'dst_ip_prefix_len': 0,
+            'srcport_or_icmptype_first': 0,
+            'srcport_or_icmptype_last': 65535,
+            'dstport_or_icmpcode_first': 0,
+            'dstport_or_icmpcode_last': 65535,
+            'proto': 0,
+        }
+        out2in_rules = [out2in_deny_rule]
+        res = self.vapi.acl_add_replace(0xffffffff, out2in_rules)
+        self.assertEqual(res.retval, 0, "error adding out2in ACL")
+        out2in_acl = res.acl_index
+
+        # apply as input acl on interface and confirm it blocks everything
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+                                             n_input=1,
+                                             acls=[out2in_acl])
+        self.send_and_assert_no_replies(self.pg1, pkts_out2in)
+
+        # create an ACL to permit/reflect everything
+        in2out_reflect_rule = {
+            'is_permit': 2,
+            'is_ipv6': 0,
+            'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+            'src_ip_prefix_len': 0,
+            'dst_ip_addr':  inet_pton(AF_INET, "0.0.0.0"),
+            'dst_ip_prefix_len': 0,
+            'srcport_or_icmptype_first': 0,
+            'srcport_or_icmptype_last': 65535,
+            'dstport_or_icmpcode_first': 0,
+            'dstport_or_icmpcode_last': 65535,
+            'proto': 0,
+        }
+        in2out_rules = [in2out_reflect_rule]
+        res = self.vapi.acl_add_replace(0xffffffff, in2out_rules)
+        self.assertEqual(res.retval, 0, "error adding in2out ACL")
+        in2out_acl = res.acl_index
+
+        # apply output acl
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+                                             n_input=1,
+                                             acls=[out2in_acl, in2out_acl])
+        # send in2out to generate ACL state (NAT state was created earlier)
+        capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
+                                       len(pkts_in2out))
+        self.verify_capture_out(capture)
+
+        # send out2in again. ACL state exists so it should work now.
+        # TCP packets with the syn flag set also need the ack flag
+        for p in pkts_out2in:
+            if p.haslayer(TCP) and p[TCP].flags & 0x02:
+                p[TCP].flags |= 0x10
+        capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0,
+                                       len(pkts_out2in))
+        self.verify_capture_in(capture, self.pg0)
+        self.logger.info(self.vapi.cli("show trace"))
+
+        # Clean up
+        # Remove ACLs from interface
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+                                             n_input=0,
+                                             acls=[])
+        # delete ACLs
+        self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0)
+        self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0)
+
     def test_multiple_vrf(self):
         """ Multiple VRF setup """
         external_addr = '1.2.3.4'