NAT44 virtual fragmentation reassembly for endpoint-dependent mode (VPP-1325)
[vpp.git] / test / test_nat.py
index a705bd9..f3b5205 100644 (file)
@@ -622,38 +622,64 @@ class MethodHolder(VppTestCase):
                                       "(inside network):", packet))
                 raise
 
-    def create_stream_frag(self, src_if, dst, sport, dport, data):
+    def create_stream_frag(self, src_if, dst, sport, dport, data,
+                           proto=IP_PROTOS.tcp, echo_reply=False):
         """
         Create fragmented packet stream
 
         :param src_if: Source interface
         :param dst: Destination IPv4 address
-        :param sport: Source TCP port
-        :param dport: Destination TCP port
+        :param sport: Source port
+        :param dport: Destination port
         :param data: Payload data
+        :param proto: protocol (TCP, UDP, ICMP)
+        :param echo_reply: use echo_reply if protocol is ICMP
         :returns: Fragmets
         """
+        if proto == IP_PROTOS.tcp:
+            p = (IP(src=src_if.remote_ip4, dst=dst) /
+                 TCP(sport=sport, dport=dport) /
+                 Raw(data))
+            p = p.__class__(str(p))
+            chksum = p['TCP'].chksum
+            proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
+        elif proto == IP_PROTOS.udp:
+            proto_header = UDP(sport=sport, dport=dport)
+        elif proto == IP_PROTOS.icmp:
+            if not echo_reply:
+                proto_header = ICMP(id=sport, type='echo-request')
+            else:
+                proto_header = ICMP(id=sport, type='echo-reply')
+        else:
+            raise Exception("Unsupported protocol")
         id = random.randint(0, 65535)
-        p = (IP(src=src_if.remote_ip4, dst=dst) /
-             TCP(sport=sport, dport=dport) /
-             Raw(data))
-        p = p.__class__(str(p))
-        chksum = p['TCP'].chksum
         pkts = []
+        if proto == IP_PROTOS.tcp:
+            raw = Raw(data[0:4])
+        else:
+            raw = Raw(data[0:16])
         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
-             TCP(sport=sport, dport=dport, chksum=chksum) /
-             Raw(data[0:4]))
+             proto_header /
+             raw)
         pkts.append(p)
+        if proto == IP_PROTOS.tcp:
+            raw = Raw(data[4:20])
+        else:
+            raw = Raw(data[16:32])
         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
-                proto=IP_PROTOS.tcp) /
-             Raw(data[4:20]))
+                proto=proto) /
+             raw)
         pkts.append(p)
+        if proto == IP_PROTOS.tcp:
+            raw = Raw(data[20:])
+        else:
+            raw = Raw(data[32:])
         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
-             IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
+             IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
                 id=id) /
-             Raw(data[20:]))
+             raw)
         pkts.append(p)
         return pkts
 
@@ -702,14 +728,16 @@ class MethodHolder(VppTestCase):
             self.assert_ip_checksum_valid(p)
             buffer.seek(p[IP].frag * 8)
             buffer.write(p[IP].payload)
-        ip = frags[0].getlayer(IP)
         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
                 proto=frags[0][IP].proto)
         if ip.proto == IP_PROTOS.tcp:
             p = (ip / TCP(buffer.getvalue()))
             self.assert_tcp_checksum_valid(p)
         elif ip.proto == IP_PROTOS.udp:
-            p = (ip / UDP(buffer.getvalue()))
+            p = (ip / UDP(buffer.getvalue()[:8]) /
+                 Raw(buffer.getvalue()[8:]))
+        elif ip.proto == IP_PROTOS.icmp:
+            p = (ip / ICMP(buffer.getvalue()))
         return p
 
     def reass_frags_and_verify_ip6(self, frags, src, dst):
@@ -1018,6 +1046,336 @@ class MethodHolder(VppTestCase):
                 self.assertEqual(option[1], mss)
                 self.assert_tcp_checksum_valid(pkt)
 
+    @staticmethod
+    def proto2layer(proto):
+        if proto == IP_PROTOS.tcp:
+            return TCP
+        elif proto == IP_PROTOS.udp:
+            return UDP
+        elif proto == IP_PROTOS.icmp:
+            return ICMP
+        else:
+            raise Exception("Unsupported protocol")
+
+    def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
+        layer = self.proto2layer(proto)
+
+        if proto == IP_PROTOS.tcp:
+            data = "A" * 4 + "B" * 16 + "C" * 3
+        else:
+            data = "A" * 16 + "B" * 16 + "C" * 3
+        self.port_in = random.randint(1025, 65535)
+
+        reass = self.vapi.nat_reass_dump()
+        reass_n_start = len(reass)
+
+        # in2out
+        pkts = self.create_stream_frag(self.pg0,
+                                       self.pg1.remote_ip4,
+                                       self.port_in,
+                                       20,
+                                       data,
+                                       proto)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        frags = self.pg1.get_capture(len(pkts))
+        if not dont_translate:
+            p = self.reass_frags_and_verify(frags,
+                                            self.nat_addr,
+                                            self.pg1.remote_ip4)
+        else:
+            p = self.reass_frags_and_verify(frags,
+                                            self.pg0.remote_ip4,
+                                            self.pg1.remote_ip4)
+        if proto != IP_PROTOS.icmp:
+            if not dont_translate:
+                self.assertEqual(p[layer].dport, 20)
+                self.assertNotEqual(p[layer].sport, self.port_in)
+            else:
+                self.assertEqual(p[layer].sport, self.port_in)
+        else:
+            if not dont_translate:
+                self.assertNotEqual(p[layer].id, self.port_in)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+        self.assertEqual(data, p[Raw].load)
+
+        # out2in
+        if not dont_translate:
+            dst_addr = self.nat_addr
+        else:
+            dst_addr = self.pg0.remote_ip4
+        if proto != IP_PROTOS.icmp:
+            sport = 20
+            dport = p[layer].sport
+        else:
+            sport = p[layer].id
+            dport = 0
+        pkts = self.create_stream_frag(self.pg1,
+                                       dst_addr,
+                                       sport,
+                                       dport,
+                                       data,
+                                       proto,
+                                       echo_reply=True)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        frags = self.pg0.get_capture(len(pkts))
+        p = self.reass_frags_and_verify(frags,
+                                        self.pg1.remote_ip4,
+                                        self.pg0.remote_ip4)
+        if proto != IP_PROTOS.icmp:
+            self.assertEqual(p[layer].sport, 20)
+            self.assertEqual(p[layer].dport, self.port_in)
+        else:
+            self.assertEqual(p[layer].id, self.port_in)
+        self.assertEqual(data, p[Raw].load)
+
+        reass = self.vapi.nat_reass_dump()
+        reass_n_end = len(reass)
+
+        self.assertEqual(reass_n_end - reass_n_start, 2)
+
+    def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
+        layer = self.proto2layer(proto)
+
+        if proto == IP_PROTOS.tcp:
+            data = "A" * 4 + "B" * 16 + "C" * 3
+        else:
+            data = "A" * 16 + "B" * 16 + "C" * 3
+        self.port_in = random.randint(1025, 65535)
+
+        for i in range(2):
+            reass = self.vapi.nat_reass_dump()
+            reass_n_start = len(reass)
+
+            # out2in
+            pkts = self.create_stream_frag(self.pg0,
+                                           self.server_out_addr,
+                                           self.port_in,
+                                           self.server_out_port,
+                                           data,
+                                           proto)
+            self.pg0.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg1.get_capture(len(pkts))
+            p = self.reass_frags_and_verify(frags,
+                                            self.pg0.remote_ip4,
+                                            self.server_in_addr)
+            if proto != IP_PROTOS.icmp:
+                self.assertEqual(p[layer].sport, self.port_in)
+                self.assertEqual(p[layer].dport, self.server_in_port)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
+            # in2out
+            if proto != IP_PROTOS.icmp:
+                pkts = self.create_stream_frag(self.pg1,
+                                               self.pg0.remote_ip4,
+                                               self.server_in_port,
+                                               p[layer].sport,
+                                               data,
+                                               proto)
+            else:
+                pkts = self.create_stream_frag(self.pg1,
+                                               self.pg0.remote_ip4,
+                                               p[layer].id,
+                                               0,
+                                               data,
+                                               proto,
+                                               echo_reply=True)
+            self.pg1.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg0.get_capture(len(pkts))
+            p = self.reass_frags_and_verify(frags,
+                                            self.server_out_addr,
+                                            self.pg0.remote_ip4)
+            if proto != IP_PROTOS.icmp:
+                self.assertEqual(p[layer].sport, self.server_out_port)
+                self.assertEqual(p[layer].dport, self.port_in)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
+            reass = self.vapi.nat_reass_dump()
+            reass_n_end = len(reass)
+
+            self.assertEqual(reass_n_end - reass_n_start, 2)
+
+    def reass_hairpinning(self, proto=IP_PROTOS.tcp):
+        layer = self.proto2layer(proto)
+
+        if proto == IP_PROTOS.tcp:
+            data = "A" * 4 + "B" * 16 + "C" * 3
+        else:
+            data = "A" * 16 + "B" * 16 + "C" * 3
+
+        # send packet from host to server
+        pkts = self.create_stream_frag(self.pg0,
+                                       self.nat_addr,
+                                       self.host_in_port,
+                                       self.server_out_port,
+                                       data,
+                                       proto)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        frags = self.pg0.get_capture(len(pkts))
+        p = self.reass_frags_and_verify(frags,
+                                        self.nat_addr,
+                                        self.server.ip4)
+        if proto != IP_PROTOS.icmp:
+            self.assertNotEqual(p[layer].sport, self.host_in_port)
+            self.assertEqual(p[layer].dport, self.server_in_port)
+        else:
+            self.assertNotEqual(p[layer].id, self.host_in_port)
+        self.assertEqual(data, p[Raw].load)
+
+    def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
+        layer = self.proto2layer(proto)
+
+        if proto == IP_PROTOS.tcp:
+            data = "A" * 4 + "B" * 16 + "C" * 3
+        else:
+            data = "A" * 16 + "B" * 16 + "C" * 3
+        self.port_in = random.randint(1025, 65535)
+
+        for i in range(2):
+            # in2out
+            pkts = self.create_stream_frag(self.pg0,
+                                           self.pg1.remote_ip4,
+                                           self.port_in,
+                                           20,
+                                           data,
+                                           proto)
+            pkts.reverse()
+            self.pg0.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg1.get_capture(len(pkts))
+            if not dont_translate:
+                p = self.reass_frags_and_verify(frags,
+                                                self.nat_addr,
+                                                self.pg1.remote_ip4)
+            else:
+                p = self.reass_frags_and_verify(frags,
+                                                self.pg0.remote_ip4,
+                                                self.pg1.remote_ip4)
+            if proto != IP_PROTOS.icmp:
+                if not dont_translate:
+                    self.assertEqual(p[layer].dport, 20)
+                    self.assertNotEqual(p[layer].sport, self.port_in)
+                else:
+                    self.assertEqual(p[layer].sport, self.port_in)
+            else:
+                if not dont_translate:
+                    self.assertNotEqual(p[layer].id, self.port_in)
+                else:
+                    self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
+            # out2in
+            if not dont_translate:
+                dst_addr = self.nat_addr
+            else:
+                dst_addr = self.pg0.remote_ip4
+            if proto != IP_PROTOS.icmp:
+                sport = 20
+                dport = p[layer].sport
+            else:
+                sport = p[layer].id
+                dport = 0
+            pkts = self.create_stream_frag(self.pg1,
+                                           dst_addr,
+                                           sport,
+                                           dport,
+                                           data,
+                                           proto,
+                                           echo_reply=True)
+            pkts.reverse()
+            self.pg1.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg0.get_capture(len(pkts))
+            p = self.reass_frags_and_verify(frags,
+                                            self.pg1.remote_ip4,
+                                            self.pg0.remote_ip4)
+            if proto != IP_PROTOS.icmp:
+                self.assertEqual(p[layer].sport, 20)
+                self.assertEqual(p[layer].dport, self.port_in)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
+    def frag_out_of_order_in_plus_out(self, proto=IP_PROTOS.tcp):
+        layer = self.proto2layer(proto)
+
+        if proto == IP_PROTOS.tcp:
+            data = "A" * 4 + "B" * 16 + "C" * 3
+        else:
+            data = "A" * 16 + "B" * 16 + "C" * 3
+        self.port_in = random.randint(1025, 65535)
+
+        for i in range(2):
+            # out2in
+            pkts = self.create_stream_frag(self.pg0,
+                                           self.server_out_addr,
+                                           self.port_in,
+                                           self.server_out_port,
+                                           data,
+                                           proto)
+            pkts.reverse()
+            self.pg0.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg1.get_capture(len(pkts))
+            p = self.reass_frags_and_verify(frags,
+                                            self.pg0.remote_ip4,
+                                            self.server_in_addr)
+            if proto != IP_PROTOS.icmp:
+                self.assertEqual(p[layer].dport, self.server_in_port)
+                self.assertEqual(p[layer].sport, self.port_in)
+                self.assertEqual(p[layer].dport, self.server_in_port)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
+            # in2out
+            if proto != IP_PROTOS.icmp:
+                pkts = self.create_stream_frag(self.pg1,
+                                               self.pg0.remote_ip4,
+                                               self.server_in_port,
+                                               p[layer].sport,
+                                               data,
+                                               proto)
+            else:
+                pkts = self.create_stream_frag(self.pg1,
+                                               self.pg0.remote_ip4,
+                                               p[layer].id,
+                                               0,
+                                               data,
+                                               proto,
+                                               echo_reply=True)
+            pkts.reverse()
+            self.pg1.add_stream(pkts)
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            frags = self.pg0.get_capture(len(pkts))
+            p = self.reass_frags_and_verify(frags,
+                                            self.server_out_addr,
+                                            self.pg0.remote_ip4)
+            if proto != IP_PROTOS.icmp:
+                self.assertEqual(p[layer].sport, self.server_out_port)
+                self.assertEqual(p[layer].dport, self.port_in)
+            else:
+                self.assertEqual(p[layer].id, self.port_in)
+            self.assertEqual(data, p[Raw].load)
+
 
 class TestNAT44(MethodHolder):
     """ NAT44 Test Cases """
@@ -2964,6 +3322,7 @@ class TestNAT44(MethodHolder):
 
     def test_frag_in_order(self):
         """ NAT44 translate fragments arriving in order """
+
         self.nat44_add_address(self.nat_addr)
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
@@ -3017,6 +3376,7 @@ class TestNAT44(MethodHolder):
 
     def test_reass_hairpinning(self):
         """ NAT44 fragments hairpinning """
+
         server = self.pg0.remote_hosts[1]
         host_in_port = random.randint(1025, 65535)
         server_in_port = random.randint(1025, 65535)
@@ -3051,6 +3411,7 @@ class TestNAT44(MethodHolder):
 
     def test_frag_out_of_order(self):
         """ NAT44 translate fragments arriving out of order """
+
         self.nat44_add_address(self.nat_addr)
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
@@ -3469,6 +3830,147 @@ class TestNAT44EndpointDependent(MethodHolder):
             super(TestNAT44EndpointDependent, cls).tearDownClass()
             raise
 
+    def test_frag_in_order(self):
+        """ NAT44 translate fragments arriving in order """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.frag_in_order(proto=IP_PROTOS.tcp)
+        self.frag_in_order(proto=IP_PROTOS.udp)
+        self.frag_in_order(proto=IP_PROTOS.icmp)
+
+    def test_frag_in_order_dont_translate(self):
+        """ NAT44 don't translate fragments arriving in order """
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat44_forwarding_enable_disable(enable=True)
+        self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
+
+    def test_frag_out_of_order(self):
+        """ NAT44 translate fragments arriving out of order """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.frag_out_of_order(proto=IP_PROTOS.tcp)
+        self.frag_out_of_order(proto=IP_PROTOS.udp)
+        self.frag_out_of_order(proto=IP_PROTOS.icmp)
+
+    def test_frag_out_of_order_dont_translate(self):
+        """ NAT44 don't translate fragments arriving out of order """
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat44_forwarding_enable_disable(enable=True)
+        self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
+
+    def test_frag_in_order_in_plus_out(self):
+        """ in+out interface fragments in order """
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+
+        self.server = self.pg1.remote_hosts[0]
+
+        self.server_in_addr = self.server.ip4
+        self.server_out_addr = '11.11.11.11'
+        self.server_in_port = random.randint(1025, 65535)
+        self.server_out_port = random.randint(1025, 65535)
+
+        self.nat44_add_address(self.server_out_addr)
+
+        # add static mappings for server
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.tcp)
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.udp)
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      proto=IP_PROTOS.icmp)
+
+        self.vapi.nat_set_reass(timeout=10)
+
+        self.frag_in_order_in_plus_out(proto=IP_PROTOS.tcp)
+        self.frag_in_order_in_plus_out(proto=IP_PROTOS.udp)
+        self.frag_in_order_in_plus_out(proto=IP_PROTOS.icmp)
+
+    def test_frag_out_of_order_in_plus_out(self):
+        """ in+out interface fragments out of order """
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+
+        self.server = self.pg1.remote_hosts[0]
+
+        self.server_in_addr = self.server.ip4
+        self.server_out_addr = '11.11.11.11'
+        self.server_in_port = random.randint(1025, 65535)
+        self.server_out_port = random.randint(1025, 65535)
+
+        self.nat44_add_address(self.server_out_addr)
+
+        # add static mappings for server
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.tcp)
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.udp)
+        self.nat44_add_static_mapping(self.server_in_addr,
+                                      self.server_out_addr,
+                                      proto=IP_PROTOS.icmp)
+
+        self.vapi.nat_set_reass(timeout=10)
+
+        self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.tcp)
+        self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.udp)
+        self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.icmp)
+
+    def test_reass_hairpinning(self):
+        """ NAT44 fragments hairpinning """
+        self.server = self.pg0.remote_hosts[1]
+        self.host_in_port = random.randint(1025, 65535)
+        self.server_in_port = random.randint(1025, 65535)
+        self.server_out_port = random.randint(1025, 65535)
+
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        # add static mapping for server
+        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.tcp)
+        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
+                                      self.server_in_port,
+                                      self.server_out_port,
+                                      proto=IP_PROTOS.udp)
+        self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
+                                      proto=IP_PROTOS.icmp)
+
+        self.reass_hairpinning(proto=IP_PROTOS.tcp)
+        self.reass_hairpinning(proto=IP_PROTOS.udp)
+        self.reass_hairpinning(proto=IP_PROTOS.icmp)
+
     def test_dynamic(self):
         """ NAT44 dynamic translation test """