nat: fix nat44-ed port range with multiple workers
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import scapy.compat
8 from framework import VppTestCase, VppTestRunner
9 from scapy.data import IP_PROTOS
10 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
11 from scapy.layers.inet import IPerror, TCPerror
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from syslog_rfc5424_parser import SyslogMessage, ParseError
15 from syslog_rfc5424_parser.constants import SyslogSeverity
16 from util import ppp, pr, ip4_range
17 from vpp_acl import AclRule, VppAcl, VppAclInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_papi import VppEnum
20 from util import StatsDiff
21
22
23 class TestNAT44ED(VppTestCase):
24     """NAT44ED Test Case"""
25
26     nat_addr = "10.0.10.3"
27
28     tcp_port_in = 6303
29     tcp_port_out = 6303
30
31     udp_port_in = 6304
32     udp_port_out = 6304
33
34     icmp_id_in = 6305
35     icmp_id_out = 6305
36
37     tcp_external_port = 80
38
39     max_sessions = 100
40
41     def setUp(self):
42         super().setUp()
43         self.plugin_enable()
44
45     def tearDown(self):
46         super().tearDown()
47         if not self.vpp_dead:
48             self.plugin_disable()
49
50     def plugin_enable(self):
51         self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
52
53     def plugin_disable(self):
54         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
55
56     @property
57     def config_flags(self):
58         return VppEnum.vl_api_nat_config_flags_t
59
60     @property
61     def nat44_config_flags(self):
62         return VppEnum.vl_api_nat44_config_flags_t
63
64     @property
65     def syslog_severity(self):
66         return VppEnum.vl_api_syslog_severity_t
67
68     @property
69     def server_addr(self):
70         return self.pg1.remote_hosts[0].ip4
71
72     @staticmethod
73     def random_port():
74         return randint(1024, 65535)
75
76     @staticmethod
77     def proto2layer(proto):
78         if proto == IP_PROTOS.tcp:
79             return TCP
80         elif proto == IP_PROTOS.udp:
81             return UDP
82         elif proto == IP_PROTOS.icmp:
83             return ICMP
84         else:
85             raise Exception("Unsupported protocol")
86
87     @classmethod
88     def create_and_add_ip4_table(cls, i, table_id=0):
89         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
90         i.set_table_ip4(table_id)
91
92     @classmethod
93     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
94         if table_id:
95             cls.create_and_add_ip4_table(i, table_id)
96
97         i.admin_up()
98         i.config_ip4()
99         i.resolve_arp()
100
101         if hosts:
102             i.generate_remote_hosts(hosts)
103             i.configure_ipv4_neighbors()
104
105     @classmethod
106     def nat_add_interface_address(cls, i):
107         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
108
109     def nat_add_inside_interface(self, i):
110         self.vapi.nat44_interface_add_del_feature(
111             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
112         )
113
114     def nat_add_outside_interface(self, i):
115         self.vapi.nat44_interface_add_del_feature(
116             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
117         )
118
119     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
120         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
121         self.vapi.nat44_add_del_address_range(
122             first_ip_address=address,
123             last_ip_address=address,
124             vrf_id=vrf_id,
125             is_add=is_add,
126             flags=flags,
127         )
128
129     def nat_add_static_mapping(
130         self,
131         local_ip,
132         external_ip="0.0.0.0",
133         local_port=0,
134         external_port=0,
135         vrf_id=0,
136         is_add=1,
137         external_sw_if_index=0xFFFFFFFF,
138         proto=0,
139         tag="",
140         flags=0,
141     ):
142
143         if not (local_port and external_port):
144             flags |= self.config_flags.NAT_IS_ADDR_ONLY
145
146         self.vapi.nat44_add_del_static_mapping(
147             is_add=is_add,
148             local_ip_address=local_ip,
149             external_ip_address=external_ip,
150             external_sw_if_index=external_sw_if_index,
151             local_port=local_port,
152             external_port=external_port,
153             vrf_id=vrf_id,
154             protocol=proto,
155             flags=flags,
156             tag=tag,
157         )
158
159     @classmethod
160     def setUpClass(cls):
161         super().setUpClass()
162
163         cls.create_pg_interfaces(range(12))
164         cls.interfaces = list(cls.pg_interfaces[:4])
165
166         cls.create_and_add_ip4_table(cls.pg2, 10)
167
168         for i in cls.interfaces:
169             cls.configure_ip4_interface(i, hosts=3)
170
171         # test specific (test-multiple-vrf)
172         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
173
174         # test specific (test-one-armed-nat44-static)
175         cls.pg4.generate_remote_hosts(2)
176         cls.pg4.config_ip4()
177         cls.vapi.sw_interface_add_del_address(
178             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
179         )
180         cls.pg4.admin_up()
181         cls.pg4.resolve_arp()
182         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
183         cls.pg4.resolve_arp()
184
185         # test specific interface (pg5)
186         cls.pg5._local_ip4 = "10.1.1.1"
187         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
188         cls.pg5.set_table_ip4(1)
189         cls.pg5.config_ip4()
190         cls.pg5.admin_up()
191         cls.pg5.resolve_arp()
192
193         # test specific interface (pg6)
194         cls.pg6._local_ip4 = "10.1.2.1"
195         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
196         cls.pg6.set_table_ip4(1)
197         cls.pg6.config_ip4()
198         cls.pg6.admin_up()
199         cls.pg6.resolve_arp()
200
201         rl = list()
202
203         rl.append(
204             VppIpRoute(
205                 cls,
206                 "0.0.0.0",
207                 0,
208                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
209                 register=False,
210                 table_id=1,
211             )
212         )
213         rl.append(
214             VppIpRoute(
215                 cls,
216                 "0.0.0.0",
217                 0,
218                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
219                 register=False,
220             )
221         )
222         rl.append(
223             VppIpRoute(
224                 cls,
225                 cls.pg5.remote_ip4,
226                 32,
227                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
228                 register=False,
229                 table_id=1,
230             )
231         )
232         rl.append(
233             VppIpRoute(
234                 cls,
235                 cls.pg6.remote_ip4,
236                 32,
237                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
238                 register=False,
239                 table_id=1,
240             )
241         )
242         rl.append(
243             VppIpRoute(
244                 cls,
245                 cls.pg6.remote_ip4,
246                 16,
247                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
248                 register=False,
249                 table_id=0,
250             )
251         )
252
253         for r in rl:
254             r.add_vpp_config()
255
256         cls.no_diff = StatsDiff(
257             {
258                 pg.sw_if_index: {
259                     "/nat44-ed/in2out/fastpath/tcp": 0,
260                     "/nat44-ed/in2out/fastpath/udp": 0,
261                     "/nat44-ed/in2out/fastpath/icmp": 0,
262                     "/nat44-ed/in2out/fastpath/drops": 0,
263                     "/nat44-ed/in2out/slowpath/tcp": 0,
264                     "/nat44-ed/in2out/slowpath/udp": 0,
265                     "/nat44-ed/in2out/slowpath/icmp": 0,
266                     "/nat44-ed/in2out/slowpath/drops": 0,
267                     "/nat44-ed/in2out/fastpath/tcp": 0,
268                     "/nat44-ed/in2out/fastpath/udp": 0,
269                     "/nat44-ed/in2out/fastpath/icmp": 0,
270                     "/nat44-ed/in2out/fastpath/drops": 0,
271                     "/nat44-ed/in2out/slowpath/tcp": 0,
272                     "/nat44-ed/in2out/slowpath/udp": 0,
273                     "/nat44-ed/in2out/slowpath/icmp": 0,
274                     "/nat44-ed/in2out/slowpath/drops": 0,
275                 }
276                 for pg in cls.pg_interfaces
277             }
278         )
279
280     def get_err_counter(self, path):
281         return self.statistics.get_err_counter(path)
282
283     def reass_hairpinning(
284         self,
285         server_addr,
286         server_in_port,
287         server_out_port,
288         host_in_port,
289         proto=IP_PROTOS.tcp,
290         ignore_port=False,
291     ):
292         layer = self.proto2layer(proto)
293
294         if proto == IP_PROTOS.tcp:
295             data = b"A" * 4 + b"B" * 16 + b"C" * 3
296         else:
297             data = b"A" * 16 + b"B" * 16 + b"C" * 3
298
299         # send packet from host to server
300         pkts = self.create_stream_frag(
301             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
302         )
303         self.pg0.add_stream(pkts)
304         self.pg_enable_capture(self.pg_interfaces)
305         self.pg_start()
306         frags = self.pg0.get_capture(len(pkts))
307         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
308         if proto != IP_PROTOS.icmp:
309             if not ignore_port:
310                 self.assertNotEqual(p[layer].sport, host_in_port)
311             self.assertEqual(p[layer].dport, server_in_port)
312         else:
313             if not ignore_port:
314                 self.assertNotEqual(p[layer].id, host_in_port)
315         self.assertEqual(data, p[Raw].load)
316
317     def frag_out_of_order(
318         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
319     ):
320         layer = self.proto2layer(proto)
321
322         if proto == IP_PROTOS.tcp:
323             data = b"A" * 4 + b"B" * 16 + b"C" * 3
324         else:
325             data = b"A" * 16 + b"B" * 16 + b"C" * 3
326         self.port_in = self.random_port()
327
328         for i in range(2):
329             # in2out
330             pkts = self.create_stream_frag(
331                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
332             )
333             pkts.reverse()
334             self.pg0.add_stream(pkts)
335             self.pg_enable_capture(self.pg_interfaces)
336             self.pg_start()
337             frags = self.pg1.get_capture(len(pkts))
338             if not dont_translate:
339                 p = self.reass_frags_and_verify(
340                     frags, self.nat_addr, self.pg1.remote_ip4
341                 )
342             else:
343                 p = self.reass_frags_and_verify(
344                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
345                 )
346             if proto != IP_PROTOS.icmp:
347                 if not dont_translate:
348                     self.assertEqual(p[layer].dport, 20)
349                     if not ignore_port:
350                         self.assertNotEqual(p[layer].sport, self.port_in)
351                 else:
352                     self.assertEqual(p[layer].sport, self.port_in)
353             else:
354                 if not ignore_port:
355                     if not dont_translate:
356                         self.assertNotEqual(p[layer].id, self.port_in)
357                     else:
358                         self.assertEqual(p[layer].id, self.port_in)
359             self.assertEqual(data, p[Raw].load)
360
361             # out2in
362             if not dont_translate:
363                 dst_addr = self.nat_addr
364             else:
365                 dst_addr = self.pg0.remote_ip4
366             if proto != IP_PROTOS.icmp:
367                 sport = 20
368                 dport = p[layer].sport
369             else:
370                 sport = p[layer].id
371                 dport = 0
372             pkts = self.create_stream_frag(
373                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
374             )
375             pkts.reverse()
376             self.pg1.add_stream(pkts)
377             self.pg_enable_capture(self.pg_interfaces)
378             self.logger.info(self.vapi.cli("show trace"))
379             self.pg_start()
380             frags = self.pg0.get_capture(len(pkts))
381             p = self.reass_frags_and_verify(
382                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
383             )
384             if proto != IP_PROTOS.icmp:
385                 self.assertEqual(p[layer].sport, 20)
386                 self.assertEqual(p[layer].dport, self.port_in)
387             else:
388                 self.assertEqual(p[layer].id, self.port_in)
389             self.assertEqual(data, p[Raw].load)
390
391     def reass_frags_and_verify(self, frags, src, dst):
392         buffer = BytesIO()
393         for p in frags:
394             self.assertEqual(p[IP].src, src)
395             self.assertEqual(p[IP].dst, dst)
396             self.assert_ip_checksum_valid(p)
397             buffer.seek(p[IP].frag * 8)
398             buffer.write(bytes(p[IP].payload))
399         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
400         if ip.proto == IP_PROTOS.tcp:
401             p = ip / TCP(buffer.getvalue())
402             self.logger.debug(ppp("Reassembled:", p))
403             self.assert_tcp_checksum_valid(p)
404         elif ip.proto == IP_PROTOS.udp:
405             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
406         elif ip.proto == IP_PROTOS.icmp:
407             p = ip / ICMP(buffer.getvalue())
408         return p
409
410     def frag_in_order(
411         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
412     ):
413         layer = self.proto2layer(proto)
414
415         if proto == IP_PROTOS.tcp:
416             data = b"A" * 4 + b"B" * 16 + b"C" * 3
417         else:
418             data = b"A" * 16 + b"B" * 16 + b"C" * 3
419         self.port_in = self.random_port()
420
421         # in2out
422         pkts = self.create_stream_frag(
423             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
424         )
425         self.pg0.add_stream(pkts)
426         self.pg_enable_capture(self.pg_interfaces)
427         self.pg_start()
428         frags = self.pg1.get_capture(len(pkts))
429         if not dont_translate:
430             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
431         else:
432             p = self.reass_frags_and_verify(
433                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
434             )
435         if proto != IP_PROTOS.icmp:
436             if not dont_translate:
437                 self.assertEqual(p[layer].dport, 20)
438                 if not ignore_port:
439                     self.assertNotEqual(p[layer].sport, self.port_in)
440             else:
441                 self.assertEqual(p[layer].sport, self.port_in)
442         else:
443             if not ignore_port:
444                 if not dont_translate:
445                     self.assertNotEqual(p[layer].id, self.port_in)
446                 else:
447                     self.assertEqual(p[layer].id, self.port_in)
448         self.assertEqual(data, p[Raw].load)
449
450         # out2in
451         if not dont_translate:
452             dst_addr = self.nat_addr
453         else:
454             dst_addr = self.pg0.remote_ip4
455         if proto != IP_PROTOS.icmp:
456             sport = 20
457             dport = p[layer].sport
458         else:
459             sport = p[layer].id
460             dport = 0
461         pkts = self.create_stream_frag(
462             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
463         )
464         self.pg1.add_stream(pkts)
465         self.pg_enable_capture(self.pg_interfaces)
466         self.pg_start()
467         frags = self.pg0.get_capture(len(pkts))
468         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
469         if proto != IP_PROTOS.icmp:
470             self.assertEqual(p[layer].sport, 20)
471             self.assertEqual(p[layer].dport, self.port_in)
472         else:
473             self.assertEqual(p[layer].id, self.port_in)
474         self.assertEqual(data, p[Raw].load)
475
476     def verify_capture_out(
477         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
478     ):
479         if nat_ip is None:
480             nat_ip = self.nat_addr
481         for packet in capture:
482             try:
483                 self.assert_packet_checksums_valid(packet)
484                 self.assertEqual(packet[IP].src, nat_ip)
485                 if dst_ip is not None:
486                     self.assertEqual(packet[IP].dst, dst_ip)
487                 if packet.haslayer(TCP):
488                     if not ignore_port:
489                         if same_port:
490                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
491                         else:
492                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
493                     self.tcp_port_out = packet[TCP].sport
494                     self.assert_packet_checksums_valid(packet)
495                 elif packet.haslayer(UDP):
496                     if not ignore_port:
497                         if same_port:
498                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
499                         else:
500                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
501                     self.udp_port_out = packet[UDP].sport
502                 else:
503                     if not ignore_port:
504                         if same_port:
505                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
506                         else:
507                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
508                     self.icmp_id_out = packet[ICMP].id
509                     self.assert_packet_checksums_valid(packet)
510             except:
511                 self.logger.error(
512                     ppp("Unexpected or invalid packet (outside network):", packet)
513                 )
514                 raise
515
516     def verify_capture_in(self, capture, in_if):
517         for packet in capture:
518             try:
519                 self.assert_packet_checksums_valid(packet)
520                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
521                 if packet.haslayer(TCP):
522                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
523                 elif packet.haslayer(UDP):
524                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
525                 else:
526                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
527             except:
528                 self.logger.error(
529                     ppp("Unexpected or invalid packet (inside network):", packet)
530                 )
531                 raise
532
533     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
534         if dst_ip is None:
535             dst_ip = out_if.remote_ip4
536
537         pkts = []
538         # TCP
539         p = (
540             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
541             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
542             / TCP(sport=self.tcp_port_in, dport=20)
543         )
544         pkts.extend([p, p])
545
546         # UDP
547         p = (
548             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
549             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
550             / UDP(sport=self.udp_port_in, dport=20)
551         )
552         pkts.append(p)
553
554         # ICMP
555         p = (
556             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
557             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
558             / ICMP(id=self.icmp_id_in, type="echo-request")
559         )
560         pkts.append(p)
561
562         return pkts
563
564     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
565         if dst_ip is None:
566             dst_ip = self.nat_addr
567         if not use_inside_ports:
568             tcp_port = self.tcp_port_out
569             udp_port = self.udp_port_out
570             icmp_id = self.icmp_id_out
571         else:
572             tcp_port = self.tcp_port_in
573             udp_port = self.udp_port_in
574             icmp_id = self.icmp_id_in
575         pkts = []
576         # TCP
577         p = (
578             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
579             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
580             / TCP(dport=tcp_port, sport=20)
581         )
582         pkts.extend([p, p])
583
584         # UDP
585         p = (
586             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
587             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
588             / UDP(dport=udp_port, sport=20)
589         )
590         pkts.append(p)
591
592         # ICMP
593         p = (
594             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
595             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
596             / ICMP(id=icmp_id, type="echo-reply")
597         )
598         pkts.append(p)
599
600         return pkts
601
602     def create_tcp_stream(self, in_if, out_if, count):
603         pkts = []
604         port = 6303
605
606         for i in range(count):
607             p = (
608                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
609                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
610                 / TCP(sport=port + i, dport=20)
611             )
612             pkts.append(p)
613
614         return pkts
615
616     def create_stream_frag(
617         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
618     ):
619         if proto == IP_PROTOS.tcp:
620             p = (
621                 IP(src=src_if.remote_ip4, dst=dst)
622                 / TCP(sport=sport, dport=dport)
623                 / Raw(data)
624             )
625             p = p.__class__(scapy.compat.raw(p))
626             chksum = p[TCP].chksum
627             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
628         elif proto == IP_PROTOS.udp:
629             proto_header = UDP(sport=sport, dport=dport)
630         elif proto == IP_PROTOS.icmp:
631             if not echo_reply:
632                 proto_header = ICMP(id=sport, type="echo-request")
633             else:
634                 proto_header = ICMP(id=sport, type="echo-reply")
635         else:
636             raise Exception("Unsupported protocol")
637         id = self.random_port()
638         pkts = []
639         if proto == IP_PROTOS.tcp:
640             raw = Raw(data[0:4])
641         else:
642             raw = Raw(data[0:16])
643         p = (
644             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
645             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
646             / proto_header
647             / raw
648         )
649         pkts.append(p)
650         if proto == IP_PROTOS.tcp:
651             raw = Raw(data[4:20])
652         else:
653             raw = Raw(data[16:32])
654         p = (
655             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
656             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
657             / raw
658         )
659         pkts.append(p)
660         if proto == IP_PROTOS.tcp:
661             raw = Raw(data[20:])
662         else:
663             raw = Raw(data[32:])
664         p = (
665             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
666             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
667             / raw
668         )
669         pkts.append(p)
670         return pkts
671
672     def frag_in_order_in_plus_out(
673         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
674     ):
675
676         layer = self.proto2layer(proto)
677
678         if proto == IP_PROTOS.tcp:
679             data = b"A" * 4 + b"B" * 16 + b"C" * 3
680         else:
681             data = b"A" * 16 + b"B" * 16 + b"C" * 3
682         port_in = self.random_port()
683
684         for i in range(2):
685             # out2in
686             pkts = self.create_stream_frag(
687                 self.pg0, out_addr, port_in, out_port, data, proto
688             )
689             self.pg0.add_stream(pkts)
690             self.pg_enable_capture(self.pg_interfaces)
691             self.pg_start()
692             frags = self.pg1.get_capture(len(pkts))
693             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
694             if proto != IP_PROTOS.icmp:
695                 self.assertEqual(p[layer].sport, port_in)
696                 self.assertEqual(p[layer].dport, in_port)
697             else:
698                 self.assertEqual(p[layer].id, port_in)
699             self.assertEqual(data, p[Raw].load)
700
701             # in2out
702             if proto != IP_PROTOS.icmp:
703                 pkts = self.create_stream_frag(
704                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
705                 )
706             else:
707                 pkts = self.create_stream_frag(
708                     self.pg1,
709                     self.pg0.remote_ip4,
710                     p[layer].id,
711                     0,
712                     data,
713                     proto,
714                     echo_reply=True,
715                 )
716             self.pg1.add_stream(pkts)
717             self.pg_enable_capture(self.pg_interfaces)
718             self.pg_start()
719             frags = self.pg0.get_capture(len(pkts))
720             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
721             if proto != IP_PROTOS.icmp:
722                 self.assertEqual(p[layer].sport, out_port)
723                 self.assertEqual(p[layer].dport, port_in)
724             else:
725                 self.assertEqual(p[layer].id, port_in)
726             self.assertEqual(data, p[Raw].load)
727
728     def frag_out_of_order_in_plus_out(
729         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
730     ):
731
732         layer = self.proto2layer(proto)
733
734         if proto == IP_PROTOS.tcp:
735             data = b"A" * 4 + b"B" * 16 + b"C" * 3
736         else:
737             data = b"A" * 16 + b"B" * 16 + b"C" * 3
738         port_in = self.random_port()
739
740         for i in range(2):
741             # out2in
742             pkts = self.create_stream_frag(
743                 self.pg0, out_addr, port_in, out_port, data, proto
744             )
745             pkts.reverse()
746             self.pg0.add_stream(pkts)
747             self.pg_enable_capture(self.pg_interfaces)
748             self.pg_start()
749             frags = self.pg1.get_capture(len(pkts))
750             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
751             if proto != IP_PROTOS.icmp:
752                 self.assertEqual(p[layer].dport, in_port)
753                 self.assertEqual(p[layer].sport, port_in)
754                 self.assertEqual(p[layer].dport, in_port)
755             else:
756                 self.assertEqual(p[layer].id, port_in)
757             self.assertEqual(data, p[Raw].load)
758
759             # in2out
760             if proto != IP_PROTOS.icmp:
761                 pkts = self.create_stream_frag(
762                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
763                 )
764             else:
765                 pkts = self.create_stream_frag(
766                     self.pg1,
767                     self.pg0.remote_ip4,
768                     p[layer].id,
769                     0,
770                     data,
771                     proto,
772                     echo_reply=True,
773                 )
774             pkts.reverse()
775             self.pg1.add_stream(pkts)
776             self.pg_enable_capture(self.pg_interfaces)
777             self.pg_start()
778             frags = self.pg0.get_capture(len(pkts))
779             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
780             if proto != IP_PROTOS.icmp:
781                 self.assertEqual(p[layer].sport, out_port)
782                 self.assertEqual(p[layer].dport, port_in)
783             else:
784                 self.assertEqual(p[layer].id, port_in)
785             self.assertEqual(data, p[Raw].load)
786
787     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
788         # SYN packet in->out
789         p = (
790             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
791             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
792             / TCP(sport=in_port, dport=ext_port, flags="S")
793         )
794         in_if.add_stream(p)
795         self.pg_enable_capture(self.pg_interfaces)
796         self.pg_start()
797         capture = out_if.get_capture(1)
798         p = capture[0]
799         out_port = p[TCP].sport
800
801         # SYN + ACK packet out->in
802         p = (
803             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
804             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
805             / TCP(sport=ext_port, dport=out_port, flags="SA")
806         )
807         out_if.add_stream(p)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         in_if.get_capture(1)
811
812         # ACK packet in->out
813         p = (
814             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
815             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
816             / TCP(sport=in_port, dport=ext_port, flags="A")
817         )
818         in_if.add_stream(p)
819         self.pg_enable_capture(self.pg_interfaces)
820         self.pg_start()
821         out_if.get_capture(1)
822
823         return out_port
824
825     def twice_nat_common(
826         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
827     ):
828         twice_nat_addr = "10.0.1.3"
829
830         port_in = 8080
831         if lb:
832             if not same_pg:
833                 port_in1 = port_in
834                 port_in2 = port_in
835             else:
836                 port_in1 = port_in + 1
837                 port_in2 = port_in + 2
838
839         port_out = 80
840         eh_port_out = 4567
841
842         server1 = self.pg0.remote_hosts[0]
843         server2 = self.pg0.remote_hosts[1]
844         if lb and same_pg:
845             server2 = server1
846         if not lb:
847             server = server1
848
849         pg0 = self.pg0
850         if same_pg:
851             pg1 = self.pg0
852         else:
853             pg1 = self.pg1
854
855         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
856
857         self.nat_add_address(self.nat_addr)
858         self.nat_add_address(twice_nat_addr, twice_nat=1)
859
860         flags = 0
861         if self_twice_nat:
862             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
863         else:
864             flags |= self.config_flags.NAT_IS_TWICE_NAT
865
866         if not lb:
867             self.nat_add_static_mapping(
868                 pg0.remote_ip4,
869                 self.nat_addr,
870                 port_in,
871                 port_out,
872                 proto=IP_PROTOS.tcp,
873                 flags=flags,
874             )
875         else:
876             locals = [
877                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
878                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
879             ]
880             out_addr = self.nat_addr
881
882             self.vapi.nat44_add_del_lb_static_mapping(
883                 is_add=1,
884                 flags=flags,
885                 external_addr=out_addr,
886                 external_port=port_out,
887                 protocol=IP_PROTOS.tcp,
888                 local_num=len(locals),
889                 locals=locals,
890             )
891         self.nat_add_inside_interface(pg0)
892         self.nat_add_outside_interface(pg1)
893
894         if same_pg:
895             if not lb:
896                 client = server
897             else:
898                 assert client_id is not None
899                 if client_id == 1:
900                     client = self.pg0.remote_hosts[0]
901                 elif client_id == 2:
902                     client = self.pg0.remote_hosts[1]
903         else:
904             client = pg1.remote_hosts[0]
905         p = (
906             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
907             / IP(src=client.ip4, dst=self.nat_addr)
908             / TCP(sport=eh_port_out, dport=port_out)
909         )
910         pg1.add_stream(p)
911         self.pg_enable_capture(self.pg_interfaces)
912         self.pg_start()
913         capture = pg0.get_capture(1)
914         p = capture[0]
915         try:
916             ip = p[IP]
917             tcp = p[TCP]
918             if lb:
919                 if ip.dst == server1.ip4:
920                     server = server1
921                     port_in = port_in1
922                 else:
923                     server = server2
924                     port_in = port_in2
925             self.assertEqual(ip.dst, server.ip4)
926             if lb and same_pg:
927                 self.assertIn(tcp.dport, [port_in1, port_in2])
928             else:
929                 self.assertEqual(tcp.dport, port_in)
930             if eh_translate:
931                 self.assertEqual(ip.src, twice_nat_addr)
932                 self.assertNotEqual(tcp.sport, eh_port_out)
933             else:
934                 self.assertEqual(ip.src, client.ip4)
935                 self.assertEqual(tcp.sport, eh_port_out)
936             eh_addr_in = ip.src
937             eh_port_in = tcp.sport
938             saved_port_in = tcp.dport
939             self.assert_packet_checksums_valid(p)
940         except:
941             self.logger.error(ppp("Unexpected or invalid packet:", p))
942             raise
943
944         p = (
945             Ether(src=server.mac, dst=pg0.local_mac)
946             / IP(src=server.ip4, dst=eh_addr_in)
947             / TCP(sport=saved_port_in, dport=eh_port_in)
948         )
949         pg0.add_stream(p)
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952         capture = pg1.get_capture(1)
953         p = capture[0]
954         try:
955             ip = p[IP]
956             tcp = p[TCP]
957             self.assertEqual(ip.dst, client.ip4)
958             self.assertEqual(ip.src, self.nat_addr)
959             self.assertEqual(tcp.dport, eh_port_out)
960             self.assertEqual(tcp.sport, port_out)
961             self.assert_packet_checksums_valid(p)
962         except:
963             self.logger.error(ppp("Unexpected or invalid packet:", p))
964             raise
965
966         if eh_translate:
967             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
968             self.assertEqual(len(sessions), 1)
969             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
970             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
971             self.logger.info(self.vapi.cli("show nat44 sessions"))
972             self.vapi.nat44_del_session(
973                 address=sessions[0].inside_ip_address,
974                 port=sessions[0].inside_port,
975                 protocol=sessions[0].protocol,
976                 flags=(
977                     self.config_flags.NAT_IS_INSIDE
978                     | self.config_flags.NAT_IS_EXT_HOST_VALID
979                 ),
980                 ext_host_address=sessions[0].ext_host_nat_address,
981                 ext_host_port=sessions[0].ext_host_nat_port,
982             )
983             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
984             self.assertEqual(len(sessions), 0)
985
986     def verify_syslog_sess(self, data, msgid, is_ip6=False):
987         message = data.decode("utf-8")
988         try:
989             message = SyslogMessage.parse(message)
990         except ParseError as e:
991             self.logger.error(e)
992             raise
993         else:
994             self.assertEqual(message.severity, SyslogSeverity.info)
995             self.assertEqual(message.appname, "NAT")
996             self.assertEqual(message.msgid, msgid)
997             sd_params = message.sd.get("nsess")
998             self.assertTrue(sd_params is not None)
999             if is_ip6:
1000                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1001                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1002             else:
1003                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1004                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1005                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1006             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1007             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1008             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1009             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1010             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1011             self.assertEqual(sd_params.get("SVLAN"), "0")
1012             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1013             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1014
1015     def test_icmp_error(self):
1016         """NAT44ED test ICMP error message with inner header"""
1017
1018         payload = "H" * 10
1019
1020         self.nat_add_address(self.nat_addr)
1021         self.nat_add_inside_interface(self.pg0)
1022         self.nat_add_outside_interface(self.pg1)
1023
1024         # in2out (initiate connection)
1025         p1 = [
1026             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1027             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1028             / UDP(sport=21, dport=20)
1029             / payload,
1030             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1031             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1032             / TCP(sport=21, dport=20, flags="S")
1033             / payload,
1034             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1035             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1036             / ICMP(type="echo-request", id=7777)
1037             / payload,
1038         ]
1039
1040         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1041
1042         # out2in (send error message)
1043         p2 = [
1044             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1045             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1046             / ICMP(type="dest-unreach", code="port-unreachable")
1047             / c[IP:]
1048             for c in capture
1049         ]
1050
1051         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1052
1053         for c in capture:
1054             try:
1055                 assert c[IP].dst == self.pg0.remote_ip4
1056                 assert c[IPerror].src == self.pg0.remote_ip4
1057             except AssertionError as a:
1058                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1059
1060     def test_icmp_echo_reply_trailer(self):
1061         """ICMP echo reply with ethernet trailer"""
1062
1063         self.nat_add_address(self.nat_addr)
1064         self.nat_add_inside_interface(self.pg0)
1065         self.nat_add_outside_interface(self.pg1)
1066
1067         # in2out
1068         p1 = (
1069             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1070             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1071             / ICMP(type=8, id=0xABCD, seq=0)
1072         )
1073
1074         self.pg0.add_stream(p1)
1075         self.pg_enable_capture(self.pg_interfaces)
1076         self.pg_start()
1077         c = self.pg1.get_capture(1)[0]
1078
1079         self.logger.debug(self.vapi.cli("show trace"))
1080
1081         # out2in
1082         p2 = (
1083             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1084             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1085             / ICMP(type=0, id=c[ICMP].id, seq=0)
1086         )
1087
1088         # force checksum calculation
1089         p2 = p2.__class__(bytes(p2))
1090
1091         self.logger.debug(ppp("Packet before modification:", p2))
1092
1093         # hex representation of vss monitoring ethernet trailer
1094         # this seems to be just added to end of packet without modifying
1095         # IP or ICMP lengths / checksums
1096         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1097         # change it so that IP/ICMP is unaffected
1098         p2[IP].len = 28
1099
1100         self.logger.debug(ppp("Packet with added trailer:", p2))
1101
1102         self.pg1.add_stream(p2)
1103         self.pg_enable_capture(self.pg_interfaces)
1104         self.pg_start()
1105
1106         self.pg0.get_capture(1)
1107
1108     def test_users_dump(self):
1109         """NAT44ED API test - nat44_user_dump"""
1110
1111         self.nat_add_address(self.nat_addr)
1112         self.nat_add_inside_interface(self.pg0)
1113         self.nat_add_outside_interface(self.pg1)
1114
1115         self.vapi.nat44_forwarding_enable_disable(enable=1)
1116
1117         local_ip = self.pg0.remote_ip4
1118         external_ip = self.nat_addr
1119         self.nat_add_static_mapping(local_ip, external_ip)
1120
1121         users = self.vapi.nat44_user_dump()
1122         self.assertEqual(len(users), 0)
1123
1124         # in2out - static mapping match
1125
1126         pkts = self.create_stream_out(self.pg1)
1127         self.pg1.add_stream(pkts)
1128         self.pg_enable_capture(self.pg_interfaces)
1129         self.pg_start()
1130         capture = self.pg0.get_capture(len(pkts))
1131         self.verify_capture_in(capture, self.pg0)
1132
1133         pkts = self.create_stream_in(self.pg0, self.pg1)
1134         self.pg0.add_stream(pkts)
1135         self.pg_enable_capture(self.pg_interfaces)
1136         self.pg_start()
1137         capture = self.pg1.get_capture(len(pkts))
1138         self.verify_capture_out(capture, same_port=True)
1139
1140         users = self.vapi.nat44_user_dump()
1141         self.assertEqual(len(users), 1)
1142         static_user = users[0]
1143         self.assertEqual(static_user.nstaticsessions, 3)
1144         self.assertEqual(static_user.nsessions, 0)
1145
1146         # in2out - no static mapping match (forwarding test)
1147
1148         host0 = self.pg0.remote_hosts[0]
1149         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1150         try:
1151             pkts = self.create_stream_out(
1152                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1153             )
1154             self.pg1.add_stream(pkts)
1155             self.pg_enable_capture(self.pg_interfaces)
1156             self.pg_start()
1157             capture = self.pg0.get_capture(len(pkts))
1158             self.verify_capture_in(capture, self.pg0)
1159
1160             pkts = self.create_stream_in(self.pg0, self.pg1)
1161             self.pg0.add_stream(pkts)
1162             self.pg_enable_capture(self.pg_interfaces)
1163             self.pg_start()
1164             capture = self.pg1.get_capture(len(pkts))
1165             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1166         finally:
1167             self.pg0.remote_hosts[0] = host0
1168
1169         users = self.vapi.nat44_user_dump()
1170         self.assertEqual(len(users), 2)
1171         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1172             non_static_user = users[1]
1173             static_user = users[0]
1174         else:
1175             non_static_user = users[0]
1176             static_user = users[1]
1177         self.assertEqual(static_user.nstaticsessions, 3)
1178         self.assertEqual(static_user.nsessions, 0)
1179         self.assertEqual(non_static_user.nstaticsessions, 0)
1180         self.assertEqual(non_static_user.nsessions, 3)
1181
1182         users = self.vapi.nat44_user_dump()
1183         self.assertEqual(len(users), 2)
1184         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1185             non_static_user = users[1]
1186             static_user = users[0]
1187         else:
1188             non_static_user = users[0]
1189             static_user = users[1]
1190         self.assertEqual(static_user.nstaticsessions, 3)
1191         self.assertEqual(static_user.nsessions, 0)
1192         self.assertEqual(non_static_user.nstaticsessions, 0)
1193         self.assertEqual(non_static_user.nsessions, 3)
1194
1195     def test_frag_out_of_order_do_not_translate(self):
1196         """NAT44ED don't translate fragments arriving out of order"""
1197         self.nat_add_inside_interface(self.pg0)
1198         self.nat_add_outside_interface(self.pg1)
1199         self.vapi.nat44_forwarding_enable_disable(enable=True)
1200         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1201
1202     def test_forwarding(self):
1203         """NAT44ED forwarding test"""
1204
1205         self.nat_add_inside_interface(self.pg0)
1206         self.nat_add_outside_interface(self.pg1)
1207         self.vapi.nat44_forwarding_enable_disable(enable=1)
1208
1209         real_ip = self.pg0.remote_ip4
1210         alias_ip = self.nat_addr
1211         flags = self.config_flags.NAT_IS_ADDR_ONLY
1212         self.vapi.nat44_add_del_static_mapping(
1213             is_add=1,
1214             local_ip_address=real_ip,
1215             external_ip_address=alias_ip,
1216             external_sw_if_index=0xFFFFFFFF,
1217             flags=flags,
1218         )
1219
1220         try:
1221             # in2out - static mapping match
1222
1223             pkts = self.create_stream_out(self.pg1)
1224             self.pg1.add_stream(pkts)
1225             self.pg_enable_capture(self.pg_interfaces)
1226             self.pg_start()
1227             capture = self.pg0.get_capture(len(pkts))
1228             self.verify_capture_in(capture, self.pg0)
1229
1230             pkts = self.create_stream_in(self.pg0, self.pg1)
1231             self.pg0.add_stream(pkts)
1232             self.pg_enable_capture(self.pg_interfaces)
1233             self.pg_start()
1234             capture = self.pg1.get_capture(len(pkts))
1235             self.verify_capture_out(capture, same_port=True)
1236
1237             # in2out - no static mapping match
1238
1239             host0 = self.pg0.remote_hosts[0]
1240             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1241             try:
1242                 pkts = self.create_stream_out(
1243                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1244                 )
1245                 self.pg1.add_stream(pkts)
1246                 self.pg_enable_capture(self.pg_interfaces)
1247                 self.pg_start()
1248                 capture = self.pg0.get_capture(len(pkts))
1249                 self.verify_capture_in(capture, self.pg0)
1250
1251                 pkts = self.create_stream_in(self.pg0, self.pg1)
1252                 self.pg0.add_stream(pkts)
1253                 self.pg_enable_capture(self.pg_interfaces)
1254                 self.pg_start()
1255                 capture = self.pg1.get_capture(len(pkts))
1256                 self.verify_capture_out(
1257                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1258                 )
1259             finally:
1260                 self.pg0.remote_hosts[0] = host0
1261
1262             user = self.pg0.remote_hosts[1]
1263             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1264             self.assertEqual(len(sessions), 3)
1265             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1266             self.vapi.nat44_del_session(
1267                 address=sessions[0].inside_ip_address,
1268                 port=sessions[0].inside_port,
1269                 protocol=sessions[0].protocol,
1270                 flags=(
1271                     self.config_flags.NAT_IS_INSIDE
1272                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1273                 ),
1274                 ext_host_address=sessions[0].ext_host_address,
1275                 ext_host_port=sessions[0].ext_host_port,
1276             )
1277             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1278             self.assertEqual(len(sessions), 2)
1279
1280         finally:
1281             self.vapi.nat44_forwarding_enable_disable(enable=0)
1282             flags = self.config_flags.NAT_IS_ADDR_ONLY
1283             self.vapi.nat44_add_del_static_mapping(
1284                 is_add=0,
1285                 local_ip_address=real_ip,
1286                 external_ip_address=alias_ip,
1287                 external_sw_if_index=0xFFFFFFFF,
1288                 flags=flags,
1289             )
1290
1291     def test_output_feature_and_service2(self):
1292         """NAT44ED interface output feature and service host direct access"""
1293         self.vapi.nat44_forwarding_enable_disable(enable=1)
1294         self.nat_add_address(self.nat_addr)
1295
1296         self.vapi.nat44_ed_add_del_output_interface(
1297             sw_if_index=self.pg1.sw_if_index, is_add=1
1298         )
1299
1300         # session initiated from service host - translate
1301         pkts = self.create_stream_in(self.pg0, self.pg1)
1302         self.pg0.add_stream(pkts)
1303         self.pg_enable_capture(self.pg_interfaces)
1304         self.pg_start()
1305         capture = self.pg1.get_capture(len(pkts))
1306         self.verify_capture_out(capture, ignore_port=True)
1307
1308         pkts = self.create_stream_out(self.pg1)
1309         self.pg1.add_stream(pkts)
1310         self.pg_enable_capture(self.pg_interfaces)
1311         self.pg_start()
1312         capture = self.pg0.get_capture(len(pkts))
1313         self.verify_capture_in(capture, self.pg0)
1314
1315         # session initiated from remote host - do not translate
1316         tcp_port_in = self.tcp_port_in
1317         udp_port_in = self.udp_port_in
1318         icmp_id_in = self.icmp_id_in
1319
1320         self.tcp_port_in = 60303
1321         self.udp_port_in = 60304
1322         self.icmp_id_in = 60305
1323
1324         try:
1325             pkts = self.create_stream_out(
1326                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1327             )
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1340         finally:
1341             self.tcp_port_in = tcp_port_in
1342             self.udp_port_in = udp_port_in
1343             self.icmp_id_in = icmp_id_in
1344
1345     def test_twice_nat(self):
1346         """NAT44ED Twice NAT"""
1347         self.twice_nat_common()
1348
1349     def test_self_twice_nat_positive(self):
1350         """NAT44ED Self Twice NAT (positive test)"""
1351         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1352
1353     def test_self_twice_nat_lb_positive(self):
1354         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1355         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1356
1357     def test_twice_nat_lb(self):
1358         """NAT44ED Twice NAT local service load balancing"""
1359         self.twice_nat_common(lb=True)
1360
1361     def test_output_feature(self):
1362         """NAT44ED interface output feature (in2out postrouting)"""
1363         self.vapi.nat44_forwarding_enable_disable(enable=1)
1364         self.nat_add_address(self.nat_addr)
1365
1366         self.nat_add_outside_interface(self.pg0)
1367         self.vapi.nat44_ed_add_del_output_interface(
1368             sw_if_index=self.pg1.sw_if_index, is_add=1
1369         )
1370
1371         # in2out
1372         pkts = self.create_stream_in(self.pg0, self.pg1)
1373         self.pg0.add_stream(pkts)
1374         self.pg_enable_capture(self.pg_interfaces)
1375         self.pg_start()
1376         capture = self.pg1.get_capture(len(pkts))
1377         self.verify_capture_out(capture, ignore_port=True)
1378         self.logger.debug(self.vapi.cli("show trace"))
1379
1380         # out2in
1381         pkts = self.create_stream_out(self.pg1)
1382         self.pg1.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg0.get_capture(len(pkts))
1386         self.verify_capture_in(capture, self.pg0)
1387         self.logger.debug(self.vapi.cli("show trace"))
1388
1389         # in2out
1390         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1391         self.pg0.add_stream(pkts)
1392         self.pg_enable_capture(self.pg_interfaces)
1393         self.pg_start()
1394         capture = self.pg1.get_capture(len(pkts))
1395         self.verify_capture_out(capture, ignore_port=True)
1396         self.logger.debug(self.vapi.cli("show trace"))
1397
1398         # out2in
1399         pkts = self.create_stream_out(self.pg1, ttl=2)
1400         self.pg1.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg0.get_capture(len(pkts))
1404         self.verify_capture_in(capture, self.pg0)
1405         self.logger.debug(self.vapi.cli("show trace"))
1406
1407         # in2out
1408         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1409         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1410         for p in capture:
1411             self.assertIn(ICMP, p)
1412             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1413
1414     def test_static_with_port_out2(self):
1415         """NAT44ED 1:1 NAPT asymmetrical rule"""
1416
1417         external_port = 80
1418         local_port = 8080
1419
1420         self.vapi.nat44_forwarding_enable_disable(enable=1)
1421         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1422         self.nat_add_static_mapping(
1423             self.pg0.remote_ip4,
1424             self.nat_addr,
1425             local_port,
1426             external_port,
1427             proto=IP_PROTOS.tcp,
1428             flags=flags,
1429         )
1430
1431         self.nat_add_inside_interface(self.pg0)
1432         self.nat_add_outside_interface(self.pg1)
1433
1434         # from client to service
1435         p = (
1436             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1437             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1438             / TCP(sport=12345, dport=external_port)
1439         )
1440         self.pg1.add_stream(p)
1441         self.pg_enable_capture(self.pg_interfaces)
1442         self.pg_start()
1443         capture = self.pg0.get_capture(1)
1444         p = capture[0]
1445         try:
1446             ip = p[IP]
1447             tcp = p[TCP]
1448             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1449             self.assertEqual(tcp.dport, local_port)
1450             self.assert_packet_checksums_valid(p)
1451         except:
1452             self.logger.error(ppp("Unexpected or invalid packet:", p))
1453             raise
1454
1455         # ICMP error
1456         p = (
1457             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1458             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1459             / ICMP(type=11)
1460             / capture[0][IP]
1461         )
1462         self.pg0.add_stream(p)
1463         self.pg_enable_capture(self.pg_interfaces)
1464         self.pg_start()
1465         capture = self.pg1.get_capture(1)
1466         p = capture[0]
1467         try:
1468             self.assertEqual(p[IP].src, self.nat_addr)
1469             inner = p[IPerror]
1470             self.assertEqual(inner.dst, self.nat_addr)
1471             self.assertEqual(inner[TCPerror].dport, external_port)
1472         except:
1473             self.logger.error(ppp("Unexpected or invalid packet:", p))
1474             raise
1475
1476         # from service back to client
1477         p = (
1478             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1479             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1480             / TCP(sport=local_port, dport=12345)
1481         )
1482         self.pg0.add_stream(p)
1483         self.pg_enable_capture(self.pg_interfaces)
1484         self.pg_start()
1485         capture = self.pg1.get_capture(1)
1486         p = capture[0]
1487         try:
1488             ip = p[IP]
1489             tcp = p[TCP]
1490             self.assertEqual(ip.src, self.nat_addr)
1491             self.assertEqual(tcp.sport, external_port)
1492             self.assert_packet_checksums_valid(p)
1493         except:
1494             self.logger.error(ppp("Unexpected or invalid packet:", p))
1495             raise
1496
1497         # ICMP error
1498         p = (
1499             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1500             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1501             / ICMP(type=11)
1502             / capture[0][IP]
1503         )
1504         self.pg1.add_stream(p)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         capture = self.pg0.get_capture(1)
1508         p = capture[0]
1509         try:
1510             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1511             inner = p[IPerror]
1512             self.assertEqual(inner.src, self.pg0.remote_ip4)
1513             self.assertEqual(inner[TCPerror].sport, local_port)
1514         except:
1515             self.logger.error(ppp("Unexpected or invalid packet:", p))
1516             raise
1517
1518         # from client to server (no translation)
1519         p = (
1520             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1521             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1522             / TCP(sport=12346, dport=local_port)
1523         )
1524         self.pg1.add_stream(p)
1525         self.pg_enable_capture(self.pg_interfaces)
1526         self.pg_start()
1527         capture = self.pg0.get_capture(1)
1528         p = capture[0]
1529         try:
1530             ip = p[IP]
1531             tcp = p[TCP]
1532             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1533             self.assertEqual(tcp.dport, local_port)
1534             self.assert_packet_checksums_valid(p)
1535         except:
1536             self.logger.error(ppp("Unexpected or invalid packet:", p))
1537             raise
1538
1539         # from service back to client (no translation)
1540         p = (
1541             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1542             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1543             / TCP(sport=local_port, dport=12346)
1544         )
1545         self.pg0.add_stream(p)
1546         self.pg_enable_capture(self.pg_interfaces)
1547         self.pg_start()
1548         capture = self.pg1.get_capture(1)
1549         p = capture[0]
1550         try:
1551             ip = p[IP]
1552             tcp = p[TCP]
1553             self.assertEqual(ip.src, self.pg0.remote_ip4)
1554             self.assertEqual(tcp.sport, local_port)
1555             self.assert_packet_checksums_valid(p)
1556         except:
1557             self.logger.error(ppp("Unexpected or invalid packet:", p))
1558             raise
1559
1560     def test_static_lb(self):
1561         """NAT44ED local service load balancing"""
1562         external_addr_n = self.nat_addr
1563         external_port = 80
1564         local_port = 8080
1565         server1 = self.pg0.remote_hosts[0]
1566         server2 = self.pg0.remote_hosts[1]
1567
1568         locals = [
1569             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1570             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1571         ]
1572
1573         self.nat_add_address(self.nat_addr)
1574         self.vapi.nat44_add_del_lb_static_mapping(
1575             is_add=1,
1576             external_addr=external_addr_n,
1577             external_port=external_port,
1578             protocol=IP_PROTOS.tcp,
1579             local_num=len(locals),
1580             locals=locals,
1581         )
1582         flags = self.config_flags.NAT_IS_INSIDE
1583         self.vapi.nat44_interface_add_del_feature(
1584             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1585         )
1586         self.vapi.nat44_interface_add_del_feature(
1587             sw_if_index=self.pg1.sw_if_index, is_add=1
1588         )
1589
1590         # from client to service
1591         p = (
1592             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1593             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1594             / TCP(sport=12345, dport=external_port)
1595         )
1596         self.pg1.add_stream(p)
1597         self.pg_enable_capture(self.pg_interfaces)
1598         self.pg_start()
1599         capture = self.pg0.get_capture(1)
1600         p = capture[0]
1601         server = None
1602         try:
1603             ip = p[IP]
1604             tcp = p[TCP]
1605             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1606             if ip.dst == server1.ip4:
1607                 server = server1
1608             else:
1609                 server = server2
1610             self.assertEqual(tcp.dport, local_port)
1611             self.assert_packet_checksums_valid(p)
1612         except:
1613             self.logger.error(ppp("Unexpected or invalid packet:", p))
1614             raise
1615
1616         # from service back to client
1617         p = (
1618             Ether(src=server.mac, dst=self.pg0.local_mac)
1619             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1620             / TCP(sport=local_port, dport=12345)
1621         )
1622         self.pg0.add_stream(p)
1623         self.pg_enable_capture(self.pg_interfaces)
1624         self.pg_start()
1625         capture = self.pg1.get_capture(1)
1626         p = capture[0]
1627         try:
1628             ip = p[IP]
1629             tcp = p[TCP]
1630             self.assertEqual(ip.src, self.nat_addr)
1631             self.assertEqual(tcp.sport, external_port)
1632             self.assert_packet_checksums_valid(p)
1633         except:
1634             self.logger.error(ppp("Unexpected or invalid packet:", p))
1635             raise
1636
1637         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1638         self.assertEqual(len(sessions), 1)
1639         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1640         self.vapi.nat44_del_session(
1641             address=sessions[0].inside_ip_address,
1642             port=sessions[0].inside_port,
1643             protocol=sessions[0].protocol,
1644             flags=(
1645                 self.config_flags.NAT_IS_INSIDE
1646                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1647             ),
1648             ext_host_address=sessions[0].ext_host_address,
1649             ext_host_port=sessions[0].ext_host_port,
1650         )
1651         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1652         self.assertEqual(len(sessions), 0)
1653
1654     def test_static_lb_2(self):
1655         """NAT44ED local service load balancing (asymmetrical rule)"""
1656         external_addr = self.nat_addr
1657         external_port = 80
1658         local_port = 8080
1659         server1 = self.pg0.remote_hosts[0]
1660         server2 = self.pg0.remote_hosts[1]
1661
1662         locals = [
1663             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1664             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1665         ]
1666
1667         self.vapi.nat44_forwarding_enable_disable(enable=1)
1668         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1669         self.vapi.nat44_add_del_lb_static_mapping(
1670             is_add=1,
1671             flags=flags,
1672             external_addr=external_addr,
1673             external_port=external_port,
1674             protocol=IP_PROTOS.tcp,
1675             local_num=len(locals),
1676             locals=locals,
1677         )
1678         flags = self.config_flags.NAT_IS_INSIDE
1679         self.vapi.nat44_interface_add_del_feature(
1680             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1681         )
1682         self.vapi.nat44_interface_add_del_feature(
1683             sw_if_index=self.pg1.sw_if_index, is_add=1
1684         )
1685
1686         # from client to service
1687         p = (
1688             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1689             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1690             / TCP(sport=12345, dport=external_port)
1691         )
1692         self.pg1.add_stream(p)
1693         self.pg_enable_capture(self.pg_interfaces)
1694         self.pg_start()
1695         capture = self.pg0.get_capture(1)
1696         p = capture[0]
1697         server = None
1698         try:
1699             ip = p[IP]
1700             tcp = p[TCP]
1701             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1702             if ip.dst == server1.ip4:
1703                 server = server1
1704             else:
1705                 server = server2
1706             self.assertEqual(tcp.dport, local_port)
1707             self.assert_packet_checksums_valid(p)
1708         except:
1709             self.logger.error(ppp("Unexpected or invalid packet:", p))
1710             raise
1711
1712         # from service back to client
1713         p = (
1714             Ether(src=server.mac, dst=self.pg0.local_mac)
1715             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1716             / TCP(sport=local_port, dport=12345)
1717         )
1718         self.pg0.add_stream(p)
1719         self.pg_enable_capture(self.pg_interfaces)
1720         self.pg_start()
1721         capture = self.pg1.get_capture(1)
1722         p = capture[0]
1723         try:
1724             ip = p[IP]
1725             tcp = p[TCP]
1726             self.assertEqual(ip.src, self.nat_addr)
1727             self.assertEqual(tcp.sport, external_port)
1728             self.assert_packet_checksums_valid(p)
1729         except:
1730             self.logger.error(ppp("Unexpected or invalid packet:", p))
1731             raise
1732
1733         # from client to server (no translation)
1734         p = (
1735             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1736             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1737             / TCP(sport=12346, dport=local_port)
1738         )
1739         self.pg1.add_stream(p)
1740         self.pg_enable_capture(self.pg_interfaces)
1741         self.pg_start()
1742         capture = self.pg0.get_capture(1)
1743         p = capture[0]
1744         server = None
1745         try:
1746             ip = p[IP]
1747             tcp = p[TCP]
1748             self.assertEqual(ip.dst, server1.ip4)
1749             self.assertEqual(tcp.dport, local_port)
1750             self.assert_packet_checksums_valid(p)
1751         except:
1752             self.logger.error(ppp("Unexpected or invalid packet:", p))
1753             raise
1754
1755         # from service back to client (no translation)
1756         p = (
1757             Ether(src=server1.mac, dst=self.pg0.local_mac)
1758             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1759             / TCP(sport=local_port, dport=12346)
1760         )
1761         self.pg0.add_stream(p)
1762         self.pg_enable_capture(self.pg_interfaces)
1763         self.pg_start()
1764         capture = self.pg1.get_capture(1)
1765         p = capture[0]
1766         try:
1767             ip = p[IP]
1768             tcp = p[TCP]
1769             self.assertEqual(ip.src, server1.ip4)
1770             self.assertEqual(tcp.sport, local_port)
1771             self.assert_packet_checksums_valid(p)
1772         except:
1773             self.logger.error(ppp("Unexpected or invalid packet:", p))
1774             raise
1775
1776     def test_lb_affinity(self):
1777         """NAT44ED local service load balancing affinity"""
1778         external_addr = self.nat_addr
1779         external_port = 80
1780         local_port = 8080
1781         server1 = self.pg0.remote_hosts[0]
1782         server2 = self.pg0.remote_hosts[1]
1783
1784         locals = [
1785             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1786             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1787         ]
1788
1789         self.nat_add_address(self.nat_addr)
1790         self.vapi.nat44_add_del_lb_static_mapping(
1791             is_add=1,
1792             external_addr=external_addr,
1793             external_port=external_port,
1794             protocol=IP_PROTOS.tcp,
1795             affinity=10800,
1796             local_num=len(locals),
1797             locals=locals,
1798         )
1799         flags = self.config_flags.NAT_IS_INSIDE
1800         self.vapi.nat44_interface_add_del_feature(
1801             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1802         )
1803         self.vapi.nat44_interface_add_del_feature(
1804             sw_if_index=self.pg1.sw_if_index, is_add=1
1805         )
1806
1807         p = (
1808             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1809             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1810             / TCP(sport=1025, dport=external_port)
1811         )
1812         self.pg1.add_stream(p)
1813         self.pg_enable_capture(self.pg_interfaces)
1814         self.pg_start()
1815         capture = self.pg0.get_capture(1)
1816         backend = capture[0][IP].dst
1817
1818         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1819         self.assertEqual(len(sessions), 1)
1820         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1821         self.vapi.nat44_del_session(
1822             address=sessions[0].inside_ip_address,
1823             port=sessions[0].inside_port,
1824             protocol=sessions[0].protocol,
1825             flags=(
1826                 self.config_flags.NAT_IS_INSIDE
1827                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1828             ),
1829             ext_host_address=sessions[0].ext_host_address,
1830             ext_host_port=sessions[0].ext_host_port,
1831         )
1832
1833         pkts = []
1834         for port in range(1030, 1100):
1835             p = (
1836                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1837                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1838                 / TCP(sport=port, dport=external_port)
1839             )
1840             pkts.append(p)
1841         self.pg1.add_stream(pkts)
1842         self.pg_enable_capture(self.pg_interfaces)
1843         self.pg_start()
1844         capture = self.pg0.get_capture(len(pkts))
1845         for p in capture:
1846             self.assertEqual(p[IP].dst, backend)
1847
1848     def test_multiple_vrf_1(self):
1849         """Multiple VRF - both client & service in VRF1"""
1850
1851         external_addr = "1.2.3.4"
1852         external_port = 80
1853         local_port = 8080
1854         port = 0
1855
1856         flags = self.config_flags.NAT_IS_INSIDE
1857         self.vapi.nat44_interface_add_del_feature(
1858             sw_if_index=self.pg5.sw_if_index, is_add=1
1859         )
1860         self.vapi.nat44_interface_add_del_feature(
1861             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1862         )
1863         self.vapi.nat44_interface_add_del_feature(
1864             sw_if_index=self.pg6.sw_if_index, is_add=1
1865         )
1866         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1867         self.nat_add_static_mapping(
1868             self.pg5.remote_ip4,
1869             external_addr,
1870             local_port,
1871             external_port,
1872             vrf_id=1,
1873             proto=IP_PROTOS.tcp,
1874             flags=flags,
1875         )
1876
1877         p = (
1878             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1879             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1880             / TCP(sport=12345, dport=external_port)
1881         )
1882         self.pg6.add_stream(p)
1883         self.pg_enable_capture(self.pg_interfaces)
1884         self.pg_start()
1885         capture = self.pg5.get_capture(1)
1886         p = capture[0]
1887         try:
1888             ip = p[IP]
1889             tcp = p[TCP]
1890             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1891             self.assertEqual(tcp.dport, local_port)
1892             self.assert_packet_checksums_valid(p)
1893         except:
1894             self.logger.error(ppp("Unexpected or invalid packet:", p))
1895             raise
1896
1897         p = (
1898             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1899             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1900             / TCP(sport=local_port, dport=12345)
1901         )
1902         self.pg5.add_stream(p)
1903         self.pg_enable_capture(self.pg_interfaces)
1904         self.pg_start()
1905         capture = self.pg6.get_capture(1)
1906         p = capture[0]
1907         try:
1908             ip = p[IP]
1909             tcp = p[TCP]
1910             self.assertEqual(ip.src, external_addr)
1911             self.assertEqual(tcp.sport, external_port)
1912             self.assert_packet_checksums_valid(p)
1913         except:
1914             self.logger.error(ppp("Unexpected or invalid packet:", p))
1915             raise
1916
1917     def test_multiple_vrf_2(self):
1918         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1919
1920         external_addr = "1.2.3.4"
1921         external_port = 80
1922         local_port = 8080
1923         port = 0
1924
1925         self.nat_add_address(self.nat_addr)
1926         flags = self.config_flags.NAT_IS_INSIDE
1927         self.vapi.nat44_ed_add_del_output_interface(
1928             sw_if_index=self.pg1.sw_if_index, is_add=1
1929         )
1930         self.vapi.nat44_interface_add_del_feature(
1931             sw_if_index=self.pg5.sw_if_index, is_add=1
1932         )
1933         self.vapi.nat44_interface_add_del_feature(
1934             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1935         )
1936         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1937         self.nat_add_static_mapping(
1938             self.pg5.remote_ip4,
1939             external_addr,
1940             local_port,
1941             external_port,
1942             vrf_id=1,
1943             proto=IP_PROTOS.tcp,
1944             flags=flags,
1945         )
1946
1947         p = (
1948             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1949             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1950             / TCP(sport=2345, dport=22)
1951         )
1952         self.pg5.add_stream(p)
1953         self.pg_enable_capture(self.pg_interfaces)
1954         self.pg_start()
1955         capture = self.pg1.get_capture(1)
1956         p = capture[0]
1957         try:
1958             ip = p[IP]
1959             tcp = p[TCP]
1960             self.assertEqual(ip.src, self.nat_addr)
1961             self.assert_packet_checksums_valid(p)
1962             port = tcp.sport
1963         except:
1964             self.logger.error(ppp("Unexpected or invalid packet:", p))
1965             raise
1966
1967         p = (
1968             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1969             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1970             / TCP(sport=22, dport=port)
1971         )
1972         self.pg1.add_stream(p)
1973         self.pg_enable_capture(self.pg_interfaces)
1974         self.pg_start()
1975         capture = self.pg5.get_capture(1)
1976         p = capture[0]
1977         try:
1978             ip = p[IP]
1979             tcp = p[TCP]
1980             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1981             self.assertEqual(tcp.dport, 2345)
1982             self.assert_packet_checksums_valid(p)
1983         except:
1984             self.logger.error(ppp("Unexpected or invalid packet:", p))
1985             raise
1986
1987     def test_multiple_vrf_3(self):
1988         """Multiple VRF - client in VRF1, service in VRF0"""
1989
1990         external_addr = "1.2.3.4"
1991         external_port = 80
1992         local_port = 8080
1993         port = 0
1994
1995         flags = self.config_flags.NAT_IS_INSIDE
1996         self.vapi.nat44_interface_add_del_feature(
1997             sw_if_index=self.pg0.sw_if_index, is_add=1
1998         )
1999         self.vapi.nat44_interface_add_del_feature(
2000             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2001         )
2002         self.vapi.nat44_interface_add_del_feature(
2003             sw_if_index=self.pg6.sw_if_index, is_add=1
2004         )
2005         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2006         self.nat_add_static_mapping(
2007             self.pg0.remote_ip4,
2008             external_sw_if_index=self.pg0.sw_if_index,
2009             local_port=local_port,
2010             vrf_id=0,
2011             external_port=external_port,
2012             proto=IP_PROTOS.tcp,
2013             flags=flags,
2014         )
2015
2016         # from client VRF1 to service VRF0
2017         p = (
2018             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2019             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2020             / TCP(sport=12346, dport=external_port)
2021         )
2022         self.pg6.add_stream(p)
2023         self.pg_enable_capture(self.pg_interfaces)
2024         self.pg_start()
2025         capture = self.pg0.get_capture(1)
2026         p = capture[0]
2027         try:
2028             ip = p[IP]
2029             tcp = p[TCP]
2030             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2031             self.assertEqual(tcp.dport, local_port)
2032             self.assert_packet_checksums_valid(p)
2033         except:
2034             self.logger.error(ppp("Unexpected or invalid packet:", p))
2035             raise
2036
2037         # from service VRF0 back to client VRF1
2038         p = (
2039             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2040             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2041             / TCP(sport=local_port, dport=12346)
2042         )
2043         self.pg0.add_stream(p)
2044         self.pg_enable_capture(self.pg_interfaces)
2045         self.pg_start()
2046         capture = self.pg6.get_capture(1)
2047         p = capture[0]
2048         try:
2049             ip = p[IP]
2050             tcp = p[TCP]
2051             self.assertEqual(ip.src, self.pg0.local_ip4)
2052             self.assertEqual(tcp.sport, external_port)
2053             self.assert_packet_checksums_valid(p)
2054         except:
2055             self.logger.error(ppp("Unexpected or invalid packet:", p))
2056             raise
2057
2058     def test_multiple_vrf_4(self):
2059         """Multiple VRF - client in VRF0, service in VRF1"""
2060
2061         external_addr = "1.2.3.4"
2062         external_port = 80
2063         local_port = 8080
2064         port = 0
2065
2066         flags = self.config_flags.NAT_IS_INSIDE
2067         self.vapi.nat44_interface_add_del_feature(
2068             sw_if_index=self.pg0.sw_if_index, is_add=1
2069         )
2070         self.vapi.nat44_interface_add_del_feature(
2071             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2072         )
2073         self.vapi.nat44_interface_add_del_feature(
2074             sw_if_index=self.pg5.sw_if_index, is_add=1
2075         )
2076         self.vapi.nat44_interface_add_del_feature(
2077             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2078         )
2079         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2080         self.nat_add_static_mapping(
2081             self.pg5.remote_ip4,
2082             external_addr,
2083             local_port,
2084             external_port,
2085             vrf_id=1,
2086             proto=IP_PROTOS.tcp,
2087             flags=flags,
2088         )
2089
2090         # from client VRF0 to service VRF1
2091         p = (
2092             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2093             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2094             / TCP(sport=12347, dport=external_port)
2095         )
2096         self.pg0.add_stream(p)
2097         self.pg_enable_capture(self.pg_interfaces)
2098         self.pg_start()
2099         capture = self.pg5.get_capture(1)
2100         p = capture[0]
2101         try:
2102             ip = p[IP]
2103             tcp = p[TCP]
2104             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2105             self.assertEqual(tcp.dport, local_port)
2106             self.assert_packet_checksums_valid(p)
2107         except:
2108             self.logger.error(ppp("Unexpected or invalid packet:", p))
2109             raise
2110
2111         # from service VRF1 back to client VRF0
2112         p = (
2113             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2114             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2115             / TCP(sport=local_port, dport=12347)
2116         )
2117         self.pg5.add_stream(p)
2118         self.pg_enable_capture(self.pg_interfaces)
2119         self.pg_start()
2120         capture = self.pg0.get_capture(1)
2121         p = capture[0]
2122         try:
2123             ip = p[IP]
2124             tcp = p[TCP]
2125             self.assertEqual(ip.src, external_addr)
2126             self.assertEqual(tcp.sport, external_port)
2127             self.assert_packet_checksums_valid(p)
2128         except:
2129             self.logger.error(ppp("Unexpected or invalid packet:", p))
2130             raise
2131
2132     def test_multiple_vrf_5(self):
2133         """Multiple VRF - forwarding - no translation"""
2134
2135         external_addr = "1.2.3.4"
2136         external_port = 80
2137         local_port = 8080
2138         port = 0
2139
2140         self.vapi.nat44_forwarding_enable_disable(enable=1)
2141         flags = self.config_flags.NAT_IS_INSIDE
2142         self.vapi.nat44_interface_add_del_feature(
2143             sw_if_index=self.pg0.sw_if_index, is_add=1
2144         )
2145         self.vapi.nat44_interface_add_del_feature(
2146             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2147         )
2148         self.vapi.nat44_interface_add_del_feature(
2149             sw_if_index=self.pg5.sw_if_index, is_add=1
2150         )
2151         self.vapi.nat44_interface_add_del_feature(
2152             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2153         )
2154         self.vapi.nat44_interface_add_del_feature(
2155             sw_if_index=self.pg6.sw_if_index, is_add=1
2156         )
2157         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2158         self.nat_add_static_mapping(
2159             self.pg5.remote_ip4,
2160             external_addr,
2161             local_port,
2162             external_port,
2163             vrf_id=1,
2164             proto=IP_PROTOS.tcp,
2165             flags=flags,
2166         )
2167         self.nat_add_static_mapping(
2168             self.pg0.remote_ip4,
2169             external_sw_if_index=self.pg0.sw_if_index,
2170             local_port=local_port,
2171             vrf_id=0,
2172             external_port=external_port,
2173             proto=IP_PROTOS.tcp,
2174             flags=flags,
2175         )
2176
2177         # from client to server (both VRF1, no translation)
2178         p = (
2179             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2180             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2181             / TCP(sport=12348, dport=local_port)
2182         )
2183         self.pg6.add_stream(p)
2184         self.pg_enable_capture(self.pg_interfaces)
2185         self.pg_start()
2186         capture = self.pg5.get_capture(1)
2187         p = capture[0]
2188         try:
2189             ip = p[IP]
2190             tcp = p[TCP]
2191             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2192             self.assertEqual(tcp.dport, local_port)
2193             self.assert_packet_checksums_valid(p)
2194         except:
2195             self.logger.error(ppp("Unexpected or invalid packet:", p))
2196             raise
2197
2198         # from server back to client (both VRF1, no translation)
2199         p = (
2200             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2201             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2202             / TCP(sport=local_port, dport=12348)
2203         )
2204         self.pg5.add_stream(p)
2205         self.pg_enable_capture(self.pg_interfaces)
2206         self.pg_start()
2207         capture = self.pg6.get_capture(1)
2208         p = capture[0]
2209         try:
2210             ip = p[IP]
2211             tcp = p[TCP]
2212             self.assertEqual(ip.src, self.pg5.remote_ip4)
2213             self.assertEqual(tcp.sport, local_port)
2214             self.assert_packet_checksums_valid(p)
2215         except:
2216             self.logger.error(ppp("Unexpected or invalid packet:", p))
2217             raise
2218
2219         # from client VRF1 to server VRF0 (no translation)
2220         p = (
2221             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2222             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2223             / TCP(sport=local_port, dport=12349)
2224         )
2225         self.pg0.add_stream(p)
2226         self.pg_enable_capture(self.pg_interfaces)
2227         self.pg_start()
2228         capture = self.pg6.get_capture(1)
2229         p = capture[0]
2230         try:
2231             ip = p[IP]
2232             tcp = p[TCP]
2233             self.assertEqual(ip.src, self.pg0.remote_ip4)
2234             self.assertEqual(tcp.sport, local_port)
2235             self.assert_packet_checksums_valid(p)
2236         except:
2237             self.logger.error(ppp("Unexpected or invalid packet:", p))
2238             raise
2239
2240         # from server VRF0 back to client VRF1 (no translation)
2241         p = (
2242             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2243             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2244             / TCP(sport=local_port, dport=12349)
2245         )
2246         self.pg0.add_stream(p)
2247         self.pg_enable_capture(self.pg_interfaces)
2248         self.pg_start()
2249         capture = self.pg6.get_capture(1)
2250         p = capture[0]
2251         try:
2252             ip = p[IP]
2253             tcp = p[TCP]
2254             self.assertEqual(ip.src, self.pg0.remote_ip4)
2255             self.assertEqual(tcp.sport, local_port)
2256             self.assert_packet_checksums_valid(p)
2257         except:
2258             self.logger.error(ppp("Unexpected or invalid packet:", p))
2259             raise
2260
2261         # from client VRF0 to server VRF1 (no translation)
2262         p = (
2263             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2264             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2265             / TCP(sport=12344, dport=local_port)
2266         )
2267         self.pg0.add_stream(p)
2268         self.pg_enable_capture(self.pg_interfaces)
2269         self.pg_start()
2270         capture = self.pg5.get_capture(1)
2271         p = capture[0]
2272         try:
2273             ip = p[IP]
2274             tcp = p[TCP]
2275             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2276             self.assertEqual(tcp.dport, local_port)
2277             self.assert_packet_checksums_valid(p)
2278         except:
2279             self.logger.error(ppp("Unexpected or invalid packet:", p))
2280             raise
2281
2282         # from server VRF1 back to client VRF0 (no translation)
2283         p = (
2284             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2285             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2286             / TCP(sport=local_port, dport=12344)
2287         )
2288         self.pg5.add_stream(p)
2289         self.pg_enable_capture(self.pg_interfaces)
2290         self.pg_start()
2291         capture = self.pg0.get_capture(1)
2292         p = capture[0]
2293         try:
2294             ip = p[IP]
2295             tcp = p[TCP]
2296             self.assertEqual(ip.src, self.pg5.remote_ip4)
2297             self.assertEqual(tcp.sport, local_port)
2298             self.assert_packet_checksums_valid(p)
2299         except:
2300             self.logger.error(ppp("Unexpected or invalid packet:", p))
2301             raise
2302
2303     def test_outside_address_distribution(self):
2304         """Outside address distribution based on source address"""
2305
2306         x = 100
2307         nat_addresses = []
2308
2309         for i in range(1, x):
2310             a = "10.0.0.%d" % i
2311             nat_addresses.append(a)
2312
2313         self.nat_add_inside_interface(self.pg0)
2314         self.nat_add_outside_interface(self.pg1)
2315
2316         self.vapi.nat44_add_del_address_range(
2317             first_ip_address=nat_addresses[0],
2318             last_ip_address=nat_addresses[-1],
2319             vrf_id=0xFFFFFFFF,
2320             is_add=1,
2321             flags=0,
2322         )
2323
2324         self.pg0.generate_remote_hosts(x)
2325
2326         pkts = []
2327         for i in range(x):
2328             info = self.create_packet_info(self.pg0, self.pg1)
2329             payload = self.info_to_payload(info)
2330             p = (
2331                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2332                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2333                 / UDP(sport=7000 + i, dport=8000 + i)
2334                 / Raw(payload)
2335             )
2336             info.data = p
2337             pkts.append(p)
2338
2339         self.pg0.add_stream(pkts)
2340         self.pg_enable_capture(self.pg_interfaces)
2341         self.pg_start()
2342         recvd = self.pg1.get_capture(len(pkts))
2343         for p_recvd in recvd:
2344             payload_info = self.payload_to_info(p_recvd[Raw])
2345             packet_index = payload_info.index
2346             info = self._packet_infos[packet_index]
2347             self.assertTrue(info is not None)
2348             self.assertEqual(packet_index, info.index)
2349             p_sent = info.data
2350             packed = socket.inet_aton(p_sent[IP].src)
2351             numeric = struct.unpack("!L", packed)[0]
2352             numeric = socket.htonl(numeric)
2353             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2354             self.assertEqual(
2355                 a,
2356                 p_recvd[IP].src,
2357                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2358                 % (p_sent[IP].src, p_recvd[IP].src, a),
2359             )
2360
2361     def test_dynamic_edge_ports(self):
2362         """NAT44ED dynamic translation test: edge ports"""
2363
2364         worker_count = self.vpp_worker_count or 1
2365         port_offset = 1024
2366         port_per_thread = (65536 - port_offset) // worker_count
2367         port_count = port_per_thread * worker_count
2368
2369         # worker thread edge ports
2370         thread_edge_ports = {0, port_offset - 1, 65535}
2371         for i in range(0, worker_count):
2372             port_thread_offset = (port_per_thread * i) + port_offset
2373             for port_range_offset in [0, port_per_thread - 1]:
2374                 port = port_thread_offset + port_range_offset
2375                 thread_edge_ports.add(port)
2376         thread_drop_ports = set(
2377             filter(
2378                 lambda x: x not in range(port_offset, port_offset + port_count),
2379                 thread_edge_ports,
2380             )
2381         )
2382
2383         in_if = self.pg7
2384         out_if = self.pg8
2385
2386         self.nat_add_address(self.nat_addr)
2387
2388         try:
2389             self.configure_ip4_interface(in_if, hosts=worker_count)
2390             self.configure_ip4_interface(out_if)
2391
2392             self.nat_add_inside_interface(in_if)
2393             self.nat_add_outside_interface(out_if)
2394
2395             # in2out
2396             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2397             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2398             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2399             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2400
2401             pkt_count = worker_count * len(thread_edge_ports)
2402
2403             i2o_pkts = [[] for x in range(0, worker_count)]
2404             for i in range(0, worker_count):
2405                 remote_host = in_if.remote_hosts[i]
2406                 for port in thread_edge_ports:
2407                     p = (
2408                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2409                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2410                         / TCP(sport=port, dport=port)
2411                     )
2412                     i2o_pkts[i].append(p)
2413
2414                     p = (
2415                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2416                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2417                         / UDP(sport=port, dport=port)
2418                     )
2419                     i2o_pkts[i].append(p)
2420
2421                     p = (
2422                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2423                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2424                         / ICMP(id=port, seq=port, type="echo-request")
2425                     )
2426                     i2o_pkts[i].append(p)
2427
2428             for i in range(0, worker_count):
2429                 if len(i2o_pkts[i]) > 0:
2430                     in_if.add_stream(i2o_pkts[i], worker=i)
2431
2432             self.pg_enable_capture(self.pg_interfaces)
2433             self.pg_start()
2434             capture = out_if.get_capture(pkt_count * 3)
2435             for packet in capture:
2436                 self.assert_packet_checksums_valid(packet)
2437                 if packet.haslayer(TCP):
2438                     self.assert_in_range(
2439                         packet[TCP].sport,
2440                         port_offset,
2441                         port_offset + port_count,
2442                         "src TCP port",
2443                     )
2444                 elif packet.haslayer(UDP):
2445                     self.assert_in_range(
2446                         packet[UDP].sport,
2447                         port_offset,
2448                         port_offset + port_count,
2449                         "src UDP port",
2450                     )
2451                 elif packet.haslayer(ICMP):
2452                     self.assert_in_range(
2453                         packet[ICMP].id,
2454                         port_offset,
2455                         port_offset + port_count,
2456                         "ICMP id",
2457                     )
2458                 else:
2459                     self.fail(
2460                         ppp("Unexpected or invalid packet (outside network):", packet)
2461                     )
2462
2463             if_idx = in_if.sw_if_index
2464             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2465             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2466             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2467             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2468
2469             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2470             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2471             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2472             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2473
2474             # out2in
2475             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2476             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2477             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2478             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2479             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2480
2481             # replies to unchanged thread ports should pass on each worker,
2482             # excluding packets outside dynamic port range
2483             drop_count = worker_count * len(thread_drop_ports)
2484             pass_count = worker_count * len(thread_edge_ports) - drop_count
2485
2486             o2i_pkts = [[] for x in range(0, worker_count)]
2487             for i in range(0, worker_count):
2488                 for port in thread_edge_ports:
2489                     p = (
2490                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2491                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2492                         / TCP(sport=port, dport=port)
2493                     )
2494                     o2i_pkts[i].append(p)
2495
2496                     p = (
2497                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2498                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2499                         / UDP(sport=port, dport=port)
2500                     )
2501                     o2i_pkts[i].append(p)
2502
2503                     p = (
2504                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2505                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2506                         / ICMP(id=port, seq=port, type="echo-reply")
2507                     )
2508                     o2i_pkts[i].append(p)
2509
2510             for i in range(0, worker_count):
2511                 if len(o2i_pkts[i]) > 0:
2512                     out_if.add_stream(o2i_pkts[i], worker=i)
2513
2514             self.pg_enable_capture(self.pg_interfaces)
2515             self.pg_start()
2516             capture = in_if.get_capture(pass_count * 3)
2517             for packet in capture:
2518                 self.assert_packet_checksums_valid(packet)
2519                 if packet.haslayer(TCP):
2520                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2521                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2522                 elif packet.haslayer(UDP):
2523                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2524                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2525                 elif packet.haslayer(ICMP):
2526                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2527                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2528                 else:
2529                     self.fail(
2530                         ppp("Unexpected or invalid packet (inside network):", packet)
2531                     )
2532
2533             if_idx = out_if.sw_if_index
2534             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2535             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2536             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2537             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2538             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2539
2540             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2541             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2542             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2543             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2544             self.assertEqual(
2545                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2546             )
2547
2548         finally:
2549             in_if.unconfig()
2550             out_if.unconfig()
2551
2552
2553 class TestNAT44EDMW(TestNAT44ED):
2554     """NAT44ED MW Test Case"""
2555
2556     vpp_worker_count = 4
2557     max_sessions = 5000
2558
2559     def test_dynamic(self):
2560         """NAT44ED dynamic translation test"""
2561         pkt_count = 1500
2562         tcp_port_offset = 20
2563         udp_port_offset = 20
2564         icmp_id_offset = 20
2565
2566         self.nat_add_address(self.nat_addr)
2567         self.nat_add_inside_interface(self.pg0)
2568         self.nat_add_outside_interface(self.pg1)
2569
2570         # in2out
2571         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2572         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2573         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2574         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2575
2576         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2577
2578         for i in range(pkt_count):
2579             p = (
2580                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2581                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2582                 / TCP(sport=tcp_port_offset + i, dport=20)
2583             )
2584             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2585
2586             p = (
2587                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2588                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2589                 / UDP(sport=udp_port_offset + i, dport=20)
2590             )
2591             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2592
2593             p = (
2594                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2595                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2596                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2597             )
2598             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2599
2600         for i in range(0, self.vpp_worker_count):
2601             if len(i2o_pkts[i]) > 0:
2602                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2603
2604         self.pg_enable_capture(self.pg_interfaces)
2605         self.pg_start()
2606         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2607
2608         if_idx = self.pg0.sw_if_index
2609         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2610         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2611         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2612         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2613
2614         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2615         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2616         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2617         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2618
2619         self.logger.info(self.vapi.cli("show trace"))
2620
2621         # out2in
2622         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2623         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2624         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2625         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2626
2627         recvd_tcp_ports = set()
2628         recvd_udp_ports = set()
2629         recvd_icmp_ids = set()
2630
2631         for p in capture:
2632             if TCP in p:
2633                 recvd_tcp_ports.add(p[TCP].sport)
2634             if UDP in p:
2635                 recvd_udp_ports.add(p[UDP].sport)
2636             if ICMP in p:
2637                 recvd_icmp_ids.add(p[ICMP].id)
2638
2639         recvd_tcp_ports = list(recvd_tcp_ports)
2640         recvd_udp_ports = list(recvd_udp_ports)
2641         recvd_icmp_ids = list(recvd_icmp_ids)
2642
2643         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2644         for i in range(pkt_count):
2645             p = (
2646                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2647                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2648                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2649             )
2650             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2651
2652             p = (
2653                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2654                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2655                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2656             )
2657             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2658
2659             p = (
2660                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2661                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2662                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2663             )
2664             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2665
2666         for i in range(0, self.vpp_worker_count):
2667             if len(o2i_pkts[i]) > 0:
2668                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2669
2670         self.pg_enable_capture(self.pg_interfaces)
2671         self.pg_start()
2672         capture = self.pg0.get_capture(pkt_count * 3)
2673         for packet in capture:
2674             try:
2675                 self.assert_packet_checksums_valid(packet)
2676                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2677                 if packet.haslayer(TCP):
2678                     self.assert_in_range(
2679                         packet[TCP].dport,
2680                         tcp_port_offset,
2681                         tcp_port_offset + pkt_count,
2682                         "dst TCP port",
2683                     )
2684                 elif packet.haslayer(UDP):
2685                     self.assert_in_range(
2686                         packet[UDP].dport,
2687                         udp_port_offset,
2688                         udp_port_offset + pkt_count,
2689                         "dst UDP port",
2690                     )
2691                 else:
2692                     self.assert_in_range(
2693                         packet[ICMP].id,
2694                         icmp_id_offset,
2695                         icmp_id_offset + pkt_count,
2696                         "ICMP id",
2697                     )
2698             except:
2699                 self.logger.error(
2700                     ppp("Unexpected or invalid packet (inside network):", packet)
2701                 )
2702                 raise
2703
2704         if_idx = self.pg1.sw_if_index
2705         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2706         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2707         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2708         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2709
2710         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2711         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2712         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2713         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2714
2715         sc = self.statistics["/nat44-ed/total-sessions"]
2716         self.assertEqual(
2717             sc[:, 0].sum(),
2718             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2719         )
2720
2721     def test_frag_in_order(self):
2722         """NAT44ED translate fragments arriving in order"""
2723
2724         self.nat_add_address(self.nat_addr)
2725         self.nat_add_inside_interface(self.pg0)
2726         self.nat_add_outside_interface(self.pg1)
2727
2728         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2729         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2730         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2731
2732     def test_frag_in_order_do_not_translate(self):
2733         """NAT44ED don't translate fragments arriving in order"""
2734
2735         self.nat_add_address(self.nat_addr)
2736         self.nat_add_inside_interface(self.pg0)
2737         self.nat_add_outside_interface(self.pg1)
2738         self.vapi.nat44_forwarding_enable_disable(enable=True)
2739
2740         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2741
2742     def test_frag_out_of_order(self):
2743         """NAT44ED translate fragments arriving out of order"""
2744
2745         self.nat_add_address(self.nat_addr)
2746         self.nat_add_inside_interface(self.pg0)
2747         self.nat_add_outside_interface(self.pg1)
2748
2749         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2750         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2751         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2752
2753     def test_frag_in_order_in_plus_out(self):
2754         """NAT44ED in+out interface fragments in order"""
2755
2756         in_port = self.random_port()
2757         out_port = self.random_port()
2758
2759         self.nat_add_address(self.nat_addr)
2760         self.nat_add_inside_interface(self.pg0)
2761         self.nat_add_outside_interface(self.pg0)
2762         self.nat_add_inside_interface(self.pg1)
2763         self.nat_add_outside_interface(self.pg1)
2764
2765         # add static mappings for server
2766         self.nat_add_static_mapping(
2767             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2768         )
2769         self.nat_add_static_mapping(
2770             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2771         )
2772         self.nat_add_static_mapping(
2773             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2774         )
2775
2776         # run tests for each protocol
2777         self.frag_in_order_in_plus_out(
2778             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2779         )
2780         self.frag_in_order_in_plus_out(
2781             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2782         )
2783         self.frag_in_order_in_plus_out(
2784             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2785         )
2786
2787     def test_frag_out_of_order_in_plus_out(self):
2788         """NAT44ED in+out interface fragments out of order"""
2789
2790         in_port = self.random_port()
2791         out_port = self.random_port()
2792
2793         self.nat_add_address(self.nat_addr)
2794         self.nat_add_inside_interface(self.pg0)
2795         self.nat_add_outside_interface(self.pg0)
2796         self.nat_add_inside_interface(self.pg1)
2797         self.nat_add_outside_interface(self.pg1)
2798
2799         # add static mappings for server
2800         self.nat_add_static_mapping(
2801             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2802         )
2803         self.nat_add_static_mapping(
2804             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2805         )
2806         self.nat_add_static_mapping(
2807             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2808         )
2809
2810         # run tests for each protocol
2811         self.frag_out_of_order_in_plus_out(
2812             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2813         )
2814         self.frag_out_of_order_in_plus_out(
2815             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2816         )
2817         self.frag_out_of_order_in_plus_out(
2818             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2819         )
2820
2821     def test_reass_hairpinning(self):
2822         """NAT44ED fragments hairpinning"""
2823
2824         server_addr = self.pg0.remote_hosts[1].ip4
2825
2826         host_in_port = self.random_port()
2827         server_in_port = self.random_port()
2828         server_out_port = self.random_port()
2829
2830         self.nat_add_address(self.nat_addr)
2831         self.nat_add_inside_interface(self.pg0)
2832         self.nat_add_outside_interface(self.pg1)
2833
2834         # add static mapping for server
2835         self.nat_add_static_mapping(
2836             server_addr,
2837             self.nat_addr,
2838             server_in_port,
2839             server_out_port,
2840             proto=IP_PROTOS.tcp,
2841         )
2842         self.nat_add_static_mapping(
2843             server_addr,
2844             self.nat_addr,
2845             server_in_port,
2846             server_out_port,
2847             proto=IP_PROTOS.udp,
2848         )
2849         self.nat_add_static_mapping(server_addr, self.nat_addr)
2850
2851         self.reass_hairpinning(
2852             server_addr,
2853             server_in_port,
2854             server_out_port,
2855             host_in_port,
2856             proto=IP_PROTOS.tcp,
2857             ignore_port=True,
2858         )
2859         self.reass_hairpinning(
2860             server_addr,
2861             server_in_port,
2862             server_out_port,
2863             host_in_port,
2864             proto=IP_PROTOS.udp,
2865             ignore_port=True,
2866         )
2867         self.reass_hairpinning(
2868             server_addr,
2869             server_in_port,
2870             server_out_port,
2871             host_in_port,
2872             proto=IP_PROTOS.icmp,
2873             ignore_port=True,
2874         )
2875
2876     def test_session_limit_per_vrf(self):
2877         """NAT44ED per vrf session limit"""
2878
2879         inside = self.pg0
2880         inside_vrf10 = self.pg2
2881         outside = self.pg1
2882
2883         limit = 5
2884
2885         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2886         # non existing vrf_id makes process core dump
2887         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2888
2889         self.nat_add_inside_interface(inside)
2890         self.nat_add_inside_interface(inside_vrf10)
2891         self.nat_add_outside_interface(outside)
2892
2893         # vrf independent
2894         self.nat_add_interface_address(outside)
2895
2896         # BUG: causing core dump - when bad vrf_id is specified
2897         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2898
2899         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2900         inside_vrf10.add_stream(stream)
2901
2902         self.pg_enable_capture(self.pg_interfaces)
2903         self.pg_start()
2904
2905         capture = outside.get_capture(limit)
2906
2907         stream = self.create_tcp_stream(inside, outside, limit * 2)
2908         inside.add_stream(stream)
2909
2910         self.pg_enable_capture(self.pg_interfaces)
2911         self.pg_start()
2912
2913         capture = outside.get_capture(len(stream))
2914
2915     def test_show_max_translations(self):
2916         """NAT44ED API test - max translations per thread"""
2917         config = self.vapi.nat44_show_running_config()
2918         self.assertEqual(self.max_sessions, config.sessions)
2919
2920     def test_lru_cleanup(self):
2921         """NAT44ED LRU cleanup algorithm"""
2922
2923         self.nat_add_address(self.nat_addr)
2924         self.nat_add_inside_interface(self.pg0)
2925         self.nat_add_outside_interface(self.pg1)
2926
2927         self.vapi.nat_set_timeouts(
2928             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2929         )
2930
2931         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2932         pkts = []
2933         for i in range(0, self.max_sessions - 1):
2934             p = (
2935                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2936                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2937                 / UDP(sport=7000 + i, dport=80)
2938             )
2939             pkts.append(p)
2940
2941         self.pg0.add_stream(pkts)
2942         self.pg_enable_capture(self.pg_interfaces)
2943         self.pg_start()
2944         self.pg1.get_capture(len(pkts))
2945         self.virtual_sleep(1.5, "wait for timeouts")
2946
2947         pkts = []
2948         for i in range(0, self.max_sessions - 1):
2949             p = (
2950                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2951                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2952                 / ICMP(id=8000 + i, type="echo-request")
2953             )
2954             pkts.append(p)
2955
2956         self.pg0.add_stream(pkts)
2957         self.pg_enable_capture(self.pg_interfaces)
2958         self.pg_start()
2959         self.pg1.get_capture(len(pkts))
2960
2961     def test_session_rst_timeout(self):
2962         """NAT44ED session RST timeouts"""
2963
2964         self.nat_add_address(self.nat_addr)
2965         self.nat_add_inside_interface(self.pg0)
2966         self.nat_add_outside_interface(self.pg1)
2967
2968         self.vapi.nat_set_timeouts(
2969             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
2970         )
2971
2972         self.init_tcp_session(
2973             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
2974         )
2975         p = (
2976             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2977             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2978             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
2979         )
2980         self.send_and_expect(self.pg0, p, self.pg1)
2981
2982         self.virtual_sleep(6)
2983
2984         # The session is already closed
2985         p = (
2986             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2987             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2988             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
2989         )
2990         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
2991
2992         # The session can be re-opened
2993         p = (
2994             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2995             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2996             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
2997         )
2998         self.send_and_expect(self.pg0, p, self.pg1)
2999
3000     def test_session_rst_established_timeout(self):
3001         """NAT44ED session RST timeouts"""
3002
3003         self.nat_add_address(self.nat_addr)
3004         self.nat_add_inside_interface(self.pg0)
3005         self.nat_add_outside_interface(self.pg1)
3006
3007         self.vapi.nat_set_timeouts(
3008             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3009         )
3010
3011         self.init_tcp_session(
3012             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3013         )
3014
3015         # Wait at least the transitory time, the session is in established
3016         # state anyway. RST followed by a data packet should move it to
3017         # transitory state.
3018         self.virtual_sleep(6)
3019         p = (
3020             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3021             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3022             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3023         )
3024         self.send_and_expect(self.pg0, p, self.pg1)
3025
3026         p = (
3027             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3028             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3029             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3030         )
3031         self.send_and_expect(self.pg0, p, self.pg1)
3032
3033         # State is transitory, session should be closed after 6 seconds
3034         self.virtual_sleep(6)
3035
3036         p = (
3037             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3038             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3039             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3040         )
3041         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3042
3043     def test_dynamic_out_of_ports(self):
3044         """NAT44ED dynamic translation test: out of ports"""
3045
3046         self.nat_add_inside_interface(self.pg0)
3047         self.nat_add_outside_interface(self.pg1)
3048
3049         # in2out and no NAT addresses added
3050         pkts = self.create_stream_in(self.pg0, self.pg1)
3051
3052         self.send_and_assert_no_replies(
3053             self.pg0,
3054             pkts,
3055             msg="i2o pkts",
3056             stats_diff=self.no_diff
3057             | {
3058                 "err": {
3059                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3060                 },
3061                 self.pg0.sw_if_index: {
3062                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3063                 },
3064             },
3065         )
3066
3067         # in2out after NAT addresses added
3068         self.nat_add_address(self.nat_addr)
3069
3070         tcpn, udpn, icmpn = (
3071             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3072         )
3073
3074         self.send_and_expect(
3075             self.pg0,
3076             pkts,
3077             self.pg1,
3078             msg="i2o pkts",
3079             stats_diff=self.no_diff
3080             | {
3081                 "err": {
3082                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3083                 },
3084                 self.pg0.sw_if_index: {
3085                     "/nat44-ed/in2out/slowpath/drops": 0,
3086                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3087                     "/nat44-ed/in2out/slowpath/udp": udpn,
3088                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3089                 },
3090             },
3091         )
3092
3093     def test_unknown_proto(self):
3094         """NAT44ED translate packet with unknown protocol"""
3095
3096         self.nat_add_address(self.nat_addr)
3097         self.nat_add_inside_interface(self.pg0)
3098         self.nat_add_outside_interface(self.pg1)
3099
3100         # in2out
3101         p = (
3102             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3103             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3104             / TCP(sport=self.tcp_port_in, dport=20)
3105         )
3106         self.pg0.add_stream(p)
3107         self.pg_enable_capture(self.pg_interfaces)
3108         self.pg_start()
3109         p = self.pg1.get_capture(1)
3110
3111         p = (
3112             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3113             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3114             / GRE()
3115             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3116             / TCP(sport=1234, dport=1234)
3117         )
3118         self.pg0.add_stream(p)
3119         self.pg_enable_capture(self.pg_interfaces)
3120         self.pg_start()
3121         p = self.pg1.get_capture(1)
3122         packet = p[0]
3123         try:
3124             self.assertEqual(packet[IP].src, self.nat_addr)
3125             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3126             self.assertEqual(packet.haslayer(GRE), 1)
3127             self.assert_packet_checksums_valid(packet)
3128         except:
3129             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3130             raise
3131
3132         # out2in
3133         p = (
3134             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3135             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3136             / GRE()
3137             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3138             / TCP(sport=1234, dport=1234)
3139         )
3140         self.pg1.add_stream(p)
3141         self.pg_enable_capture(self.pg_interfaces)
3142         self.pg_start()
3143         p = self.pg0.get_capture(1)
3144         packet = p[0]
3145         try:
3146             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3147             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3148             self.assertEqual(packet.haslayer(GRE), 1)
3149             self.assert_packet_checksums_valid(packet)
3150         except:
3151             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3152             raise
3153
3154     def test_hairpinning_unknown_proto(self):
3155         """NAT44ED translate packet with unknown protocol - hairpinning"""
3156         host = self.pg0.remote_hosts[0]
3157         server = self.pg0.remote_hosts[1]
3158         host_in_port = 1234
3159         server_out_port = 8765
3160         server_nat_ip = "10.0.0.11"
3161
3162         self.nat_add_address(self.nat_addr)
3163         self.nat_add_inside_interface(self.pg0)
3164         self.nat_add_outside_interface(self.pg1)
3165
3166         # add static mapping for server
3167         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3168
3169         # host to server
3170         p = (
3171             Ether(src=host.mac, dst=self.pg0.local_mac)
3172             / IP(src=host.ip4, dst=server_nat_ip)
3173             / TCP(sport=host_in_port, dport=server_out_port)
3174         )
3175         self.pg0.add_stream(p)
3176         self.pg_enable_capture(self.pg_interfaces)
3177         self.pg_start()
3178         self.pg0.get_capture(1)
3179
3180         p = (
3181             Ether(dst=self.pg0.local_mac, src=host.mac)
3182             / IP(src=host.ip4, dst=server_nat_ip)
3183             / GRE()
3184             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3185             / TCP(sport=1234, dport=1234)
3186         )
3187         self.pg0.add_stream(p)
3188         self.pg_enable_capture(self.pg_interfaces)
3189         self.pg_start()
3190         p = self.pg0.get_capture(1)
3191         packet = p[0]
3192         try:
3193             self.assertEqual(packet[IP].src, self.nat_addr)
3194             self.assertEqual(packet[IP].dst, server.ip4)
3195             self.assertEqual(packet.haslayer(GRE), 1)
3196             self.assert_packet_checksums_valid(packet)
3197         except:
3198             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3199             raise
3200
3201         # server to host
3202         p = (
3203             Ether(dst=self.pg0.local_mac, src=server.mac)
3204             / IP(src=server.ip4, dst=self.nat_addr)
3205             / GRE()
3206             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3207             / TCP(sport=1234, dport=1234)
3208         )
3209         self.pg0.add_stream(p)
3210         self.pg_enable_capture(self.pg_interfaces)
3211         self.pg_start()
3212         p = self.pg0.get_capture(1)
3213         packet = p[0]
3214         try:
3215             self.assertEqual(packet[IP].src, server_nat_ip)
3216             self.assertEqual(packet[IP].dst, host.ip4)
3217             self.assertEqual(packet.haslayer(GRE), 1)
3218             self.assert_packet_checksums_valid(packet)
3219         except:
3220             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3221             raise
3222
3223     def test_output_feature_and_service(self):
3224         """NAT44ED interface output feature and services"""
3225         external_addr = "1.2.3.4"
3226         external_port = 80
3227         local_port = 8080
3228
3229         self.vapi.nat44_forwarding_enable_disable(enable=1)
3230         self.nat_add_address(self.nat_addr)
3231         flags = self.config_flags.NAT_IS_ADDR_ONLY
3232         self.vapi.nat44_add_del_identity_mapping(
3233             ip_address=self.pg1.remote_ip4,
3234             sw_if_index=0xFFFFFFFF,
3235             flags=flags,
3236             is_add=1,
3237         )
3238         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3239         self.nat_add_static_mapping(
3240             self.pg0.remote_ip4,
3241             external_addr,
3242             local_port,
3243             external_port,
3244             proto=IP_PROTOS.tcp,
3245             flags=flags,
3246         )
3247
3248         self.nat_add_inside_interface(self.pg0)
3249         self.nat_add_outside_interface(self.pg0)
3250         self.vapi.nat44_ed_add_del_output_interface(
3251             sw_if_index=self.pg1.sw_if_index, is_add=1
3252         )
3253
3254         # from client to service
3255         p = (
3256             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3257             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3258             / TCP(sport=12345, dport=external_port)
3259         )
3260         self.pg1.add_stream(p)
3261         self.pg_enable_capture(self.pg_interfaces)
3262         self.pg_start()
3263         capture = self.pg0.get_capture(1)
3264         p = capture[0]
3265         try:
3266             ip = p[IP]
3267             tcp = p[TCP]
3268             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3269             self.assertEqual(tcp.dport, local_port)
3270             self.assert_packet_checksums_valid(p)
3271         except:
3272             self.logger.error(ppp("Unexpected or invalid packet:", p))
3273             raise
3274
3275         # from service back to client
3276         p = (
3277             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3278             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3279             / TCP(sport=local_port, dport=12345)
3280         )
3281         self.pg0.add_stream(p)
3282         self.pg_enable_capture(self.pg_interfaces)
3283         self.pg_start()
3284         capture = self.pg1.get_capture(1)
3285         p = capture[0]
3286         try:
3287             ip = p[IP]
3288             tcp = p[TCP]
3289             self.assertEqual(ip.src, external_addr)
3290             self.assertEqual(tcp.sport, external_port)
3291             self.assert_packet_checksums_valid(p)
3292         except:
3293             self.logger.error(ppp("Unexpected or invalid packet:", p))
3294             raise
3295
3296         # from local network host to external network
3297         pkts = self.create_stream_in(self.pg0, self.pg1)
3298         self.pg0.add_stream(pkts)
3299         self.pg_enable_capture(self.pg_interfaces)
3300         self.pg_start()
3301         capture = self.pg1.get_capture(len(pkts))
3302         self.verify_capture_out(capture, ignore_port=True)
3303         pkts = self.create_stream_in(self.pg0, self.pg1)
3304         self.pg0.add_stream(pkts)
3305         self.pg_enable_capture(self.pg_interfaces)
3306         self.pg_start()
3307         capture = self.pg1.get_capture(len(pkts))
3308         self.verify_capture_out(capture, ignore_port=True)
3309
3310         # from external network back to local network host
3311         pkts = self.create_stream_out(self.pg1)
3312         self.pg1.add_stream(pkts)
3313         self.pg_enable_capture(self.pg_interfaces)
3314         self.pg_start()
3315         capture = self.pg0.get_capture(len(pkts))
3316         self.verify_capture_in(capture, self.pg0)
3317
3318     def test_output_feature_and_service3(self):
3319         """NAT44ED interface output feature and DST NAT"""
3320         external_addr = "1.2.3.4"
3321         external_port = 80
3322         local_port = 8080
3323
3324         self.vapi.nat44_forwarding_enable_disable(enable=1)
3325         self.nat_add_address(self.nat_addr)
3326         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3327         self.nat_add_static_mapping(
3328             self.pg1.remote_ip4,
3329             external_addr,
3330             local_port,
3331             external_port,
3332             proto=IP_PROTOS.tcp,
3333             flags=flags,
3334         )
3335
3336         self.nat_add_inside_interface(self.pg0)
3337         self.nat_add_outside_interface(self.pg0)
3338         self.vapi.nat44_ed_add_del_output_interface(
3339             sw_if_index=self.pg1.sw_if_index, is_add=1
3340         )
3341
3342         p = (
3343             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3344             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3345             / TCP(sport=12345, dport=external_port)
3346         )
3347         self.pg0.add_stream(p)
3348         self.pg_enable_capture(self.pg_interfaces)
3349         self.pg_start()
3350         capture = self.pg1.get_capture(1)
3351         p = capture[0]
3352         try:
3353             ip = p[IP]
3354             tcp = p[TCP]
3355             self.assertEqual(ip.src, self.pg0.remote_ip4)
3356             self.assertEqual(tcp.sport, 12345)
3357             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3358             self.assertEqual(tcp.dport, local_port)
3359             self.assert_packet_checksums_valid(p)
3360         except:
3361             self.logger.error(ppp("Unexpected or invalid packet:", p))
3362             raise
3363
3364         p = (
3365             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3366             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3367             / TCP(sport=local_port, dport=12345)
3368         )
3369         self.pg1.add_stream(p)
3370         self.pg_enable_capture(self.pg_interfaces)
3371         self.pg_start()
3372         capture = self.pg0.get_capture(1)
3373         p = capture[0]
3374         try:
3375             ip = p[IP]
3376             tcp = p[TCP]
3377             self.assertEqual(ip.src, external_addr)
3378             self.assertEqual(tcp.sport, external_port)
3379             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3380             self.assertEqual(tcp.dport, 12345)
3381             self.assert_packet_checksums_valid(p)
3382         except:
3383             self.logger.error(ppp("Unexpected or invalid packet:", p))
3384             raise
3385
3386     def test_self_twice_nat_lb_negative(self):
3387         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3388         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3389
3390     def test_self_twice_nat_negative(self):
3391         """NAT44ED Self Twice NAT (negative test)"""
3392         self.twice_nat_common(self_twice_nat=True)
3393
3394     def test_static_lb_multi_clients(self):
3395         """NAT44ED local service load balancing - multiple clients"""
3396
3397         external_addr = self.nat_addr
3398         external_port = 80
3399         local_port = 8080
3400         server1 = self.pg0.remote_hosts[0]
3401         server2 = self.pg0.remote_hosts[1]
3402         server3 = self.pg0.remote_hosts[2]
3403
3404         locals = [
3405             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3406             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3407         ]
3408
3409         flags = self.config_flags.NAT_IS_INSIDE
3410         self.vapi.nat44_interface_add_del_feature(
3411             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3412         )
3413         self.vapi.nat44_interface_add_del_feature(
3414             sw_if_index=self.pg1.sw_if_index, is_add=1
3415         )
3416
3417         self.nat_add_address(self.nat_addr)
3418         self.vapi.nat44_add_del_lb_static_mapping(
3419             is_add=1,
3420             external_addr=external_addr,
3421             external_port=external_port,
3422             protocol=IP_PROTOS.tcp,
3423             local_num=len(locals),
3424             locals=locals,
3425         )
3426
3427         server1_n = 0
3428         server2_n = 0
3429         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3430         pkts = []
3431         for client in clients:
3432             p = (
3433                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3434                 / IP(src=client, dst=self.nat_addr)
3435                 / TCP(sport=12345, dport=external_port)
3436             )
3437             pkts.append(p)
3438         self.pg1.add_stream(pkts)
3439         self.pg_enable_capture(self.pg_interfaces)
3440         self.pg_start()
3441         capture = self.pg0.get_capture(len(pkts))
3442         for p in capture:
3443             if p[IP].dst == server1.ip4:
3444                 server1_n += 1
3445             else:
3446                 server2_n += 1
3447         self.assertGreaterEqual(server1_n, server2_n)
3448
3449         local = {
3450             "addr": server3.ip4,
3451             "port": local_port,
3452             "probability": 20,
3453             "vrf_id": 0,
3454         }
3455
3456         # add new back-end
3457         self.vapi.nat44_lb_static_mapping_add_del_local(
3458             is_add=1,
3459             external_addr=external_addr,
3460             external_port=external_port,
3461             local=local,
3462             protocol=IP_PROTOS.tcp,
3463         )
3464         server1_n = 0
3465         server2_n = 0
3466         server3_n = 0
3467         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3468         pkts = []
3469         for client in clients:
3470             p = (
3471                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3472                 / IP(src=client, dst=self.nat_addr)
3473                 / TCP(sport=12346, dport=external_port)
3474             )
3475             pkts.append(p)
3476         self.assertGreater(len(pkts), 0)
3477         self.pg1.add_stream(pkts)
3478         self.pg_enable_capture(self.pg_interfaces)
3479         self.pg_start()
3480         capture = self.pg0.get_capture(len(pkts))
3481         for p in capture:
3482             if p[IP].dst == server1.ip4:
3483                 server1_n += 1
3484             elif p[IP].dst == server2.ip4:
3485                 server2_n += 1
3486             else:
3487                 server3_n += 1
3488         self.assertGreater(server1_n, 0)
3489         self.assertGreater(server2_n, 0)
3490         self.assertGreater(server3_n, 0)
3491
3492         local = {
3493             "addr": server2.ip4,
3494             "port": local_port,
3495             "probability": 10,
3496             "vrf_id": 0,
3497         }
3498
3499         # remove one back-end
3500         self.vapi.nat44_lb_static_mapping_add_del_local(
3501             is_add=0,
3502             external_addr=external_addr,
3503             external_port=external_port,
3504             local=local,
3505             protocol=IP_PROTOS.tcp,
3506         )
3507         server1_n = 0
3508         server2_n = 0
3509         server3_n = 0
3510         self.pg1.add_stream(pkts)
3511         self.pg_enable_capture(self.pg_interfaces)
3512         self.pg_start()
3513         capture = self.pg0.get_capture(len(pkts))
3514         for p in capture:
3515             if p[IP].dst == server1.ip4:
3516                 server1_n += 1
3517             elif p[IP].dst == server2.ip4:
3518                 server2_n += 1
3519             else:
3520                 server3_n += 1
3521         self.assertGreater(server1_n, 0)
3522         self.assertEqual(server2_n, 0)
3523         self.assertGreater(server3_n, 0)
3524
3525     # put zzz in front of syslog test name so that it runs as a last test
3526     # setting syslog sender cannot be undone and if it is set, it messes
3527     # with self.send_and_assert_no_replies functionality
3528     def test_zzz_syslog_sess(self):
3529         """NAT44ED Test syslog session creation and deletion"""
3530         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3531         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3532
3533         self.nat_add_address(self.nat_addr)
3534         self.nat_add_inside_interface(self.pg0)
3535         self.nat_add_outside_interface(self.pg1)
3536
3537         p = (
3538             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3539             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3540             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3541         )
3542         self.pg0.add_stream(p)
3543         self.pg_enable_capture(self.pg_interfaces)
3544         self.pg_start()
3545         capture = self.pg1.get_capture(1)
3546         self.tcp_port_out = capture[0][TCP].sport
3547         capture = self.pg3.get_capture(1)
3548         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3549
3550         self.pg_enable_capture(self.pg_interfaces)
3551         self.pg_start()
3552         self.nat_add_address(self.nat_addr, is_add=0)
3553         capture = self.pg3.get_capture(1)
3554         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3555
3556     # put zzz in front of syslog test name so that it runs as a last test
3557     # setting syslog sender cannot be undone and if it is set, it messes
3558     # with self.send_and_assert_no_replies functionality
3559     def test_zzz_syslog_sess_reopen(self):
3560         """Syslog events for session reopen"""
3561         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3562         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3563
3564         self.nat_add_address(self.nat_addr)
3565         self.nat_add_inside_interface(self.pg0)
3566         self.nat_add_outside_interface(self.pg1)
3567
3568         # SYN in2out
3569         p = (
3570             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3571             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3572             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3573         )
3574         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3575         self.tcp_port_out = capture[0][TCP].sport
3576         capture = self.pg3.get_capture(1)
3577         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3578
3579         # SYN out2in
3580         p = (
3581             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3582             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3583             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3584         )
3585         self.send_and_expect(self.pg1, p, self.pg0)
3586
3587         p = (
3588             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3589             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3590             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3591         )
3592         self.send_and_expect(self.pg0, p, self.pg1)
3593
3594         # FIN in2out
3595         p = (
3596             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3597             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3598             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3599         )
3600         self.send_and_expect(self.pg0, p, self.pg1)
3601
3602         # FIN out2in
3603         p = (
3604             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3605             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3606             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3607         )
3608         self.send_and_expect(self.pg1, p, self.pg0)
3609
3610         self.init_tcp_session(
3611             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3612         )
3613
3614         # 2 records should be produced - first one del & add
3615         capture = self.pg3.get_capture(2)
3616         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3617         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3618
3619     def test_twice_nat_interface_addr(self):
3620         """NAT44ED Acquire twice NAT addresses from interface"""
3621         flags = self.config_flags.NAT_IS_TWICE_NAT
3622         self.vapi.nat44_add_del_interface_addr(
3623             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3624         )
3625
3626         # no address in NAT pool
3627         adresses = self.vapi.nat44_address_dump()
3628         self.assertEqual(0, len(adresses))
3629
3630         # configure interface address and check NAT address pool
3631         self.pg11.config_ip4()
3632         adresses = self.vapi.nat44_address_dump()
3633         self.assertEqual(1, len(adresses))
3634         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3635         self.assertEqual(adresses[0].flags, flags)
3636
3637         # remove interface address and check NAT address pool
3638         self.pg11.unconfig_ip4()
3639         adresses = self.vapi.nat44_address_dump()
3640         self.assertEqual(0, len(adresses))
3641
3642     def test_output_feature_stateful_acl(self):
3643         """NAT44ED output feature works with stateful ACL"""
3644
3645         self.nat_add_address(self.nat_addr)
3646         self.vapi.nat44_ed_add_del_output_interface(
3647             sw_if_index=self.pg1.sw_if_index, is_add=1
3648         )
3649
3650         # First ensure that the NAT is working sans ACL
3651
3652         # send packets out2in, no sessions yet so packets should drop
3653         pkts_out2in = self.create_stream_out(self.pg1)
3654         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3655
3656         # send packets into inside intf, ensure received via outside intf
3657         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3658         capture = self.send_and_expect(
3659             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3660         )
3661         self.verify_capture_out(capture, ignore_port=True)
3662
3663         # send out2in again, with sessions created it should work now
3664         pkts_out2in = self.create_stream_out(self.pg1)
3665         capture = self.send_and_expect(
3666             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3667         )
3668         self.verify_capture_in(capture, self.pg0)
3669
3670         # Create an ACL blocking everything
3671         out2in_deny_rule = AclRule(is_permit=0)
3672         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3673         out2in_acl.add_vpp_config()
3674
3675         # create an ACL to permit/reflect everything
3676         in2out_reflect_rule = AclRule(is_permit=2)
3677         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3678         in2out_acl.add_vpp_config()
3679
3680         # apply as input acl on interface and confirm it blocks everything
3681         acl_if = VppAclInterface(
3682             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3683         )
3684         acl_if.add_vpp_config()
3685         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3686
3687         # apply output acl
3688         acl_if.acls = [out2in_acl, in2out_acl]
3689         acl_if.add_vpp_config()
3690         # send in2out to generate ACL state (NAT state was created earlier)
3691         capture = self.send_and_expect(
3692             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3693         )
3694         self.verify_capture_out(capture, ignore_port=True)
3695
3696         # send out2in again. ACL state exists so it should work now.
3697         # TCP packets with the syn flag set also need the ack flag
3698         for p in pkts_out2in:
3699             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3700                 p[TCP].flags |= 0x10
3701         capture = self.send_and_expect(
3702             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3703         )
3704         self.verify_capture_in(capture, self.pg0)
3705         self.logger.info(self.vapi.cli("show trace"))
3706
3707     def test_tcp_close(self):
3708         """NAT44ED Close TCP session from inside network - output feature"""
3709         config = self.vapi.nat44_show_running_config()
3710         old_timeouts = config.timeouts
3711         new_transitory = 2
3712         self.vapi.nat_set_timeouts(
3713             udp=old_timeouts.udp,
3714             tcp_established=old_timeouts.tcp_established,
3715             icmp=old_timeouts.icmp,
3716             tcp_transitory=new_transitory,
3717         )
3718
3719         self.vapi.nat44_forwarding_enable_disable(enable=1)
3720         self.nat_add_address(self.pg1.local_ip4)
3721         twice_nat_addr = "10.0.1.3"
3722         service_ip = "192.168.16.150"
3723         self.nat_add_address(twice_nat_addr, twice_nat=1)
3724
3725         flags = self.config_flags.NAT_IS_INSIDE
3726         self.vapi.nat44_interface_add_del_feature(
3727             sw_if_index=self.pg0.sw_if_index, is_add=1
3728         )
3729         self.vapi.nat44_interface_add_del_feature(
3730             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3731         )
3732         self.vapi.nat44_ed_add_del_output_interface(
3733             is_add=1, sw_if_index=self.pg1.sw_if_index
3734         )
3735
3736         flags = (
3737             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3738         )
3739         self.nat_add_static_mapping(
3740             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3741         )
3742         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3743         start_sessnum = len(sessions)
3744
3745         # SYN packet out->in
3746         p = (
3747             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3748             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3749             / TCP(sport=33898, dport=80, flags="S")
3750         )
3751         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3752         p = capture[0]
3753         tcp_port = p[TCP].sport
3754
3755         # SYN + ACK packet in->out
3756         p = (
3757             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3758             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3759             / TCP(sport=80, dport=tcp_port, flags="SA")
3760         )
3761         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3762
3763         # ACK packet out->in
3764         p = (
3765             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3766             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3767             / TCP(sport=33898, dport=80, flags="A")
3768         )
3769         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3770
3771         # FIN packet in -> out
3772         p = (
3773             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3774             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3775             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3776         )
3777         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3778
3779         # FIN+ACK packet out -> in
3780         p = (
3781             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3782             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3783             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3784         )
3785         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3786
3787         # ACK packet in -> out
3788         p = (
3789             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3790             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3791             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3792         )
3793         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3794
3795         # session now in transitory timeout, but traffic still flows
3796         # try FIN packet out->in
3797         p = (
3798             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3799             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3800             / TCP(sport=33898, dport=80, flags="F")
3801         )
3802         self.pg1.add_stream(p)
3803         self.pg_enable_capture(self.pg_interfaces)
3804         self.pg_start()
3805
3806         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3807         self.pg0.get_capture(1)
3808
3809         # session should still exist
3810         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3811         self.assertEqual(len(sessions) - start_sessnum, 1)
3812
3813         # send FIN+ACK packet out -> in - will cause session to be wiped
3814         # but won't create a new session
3815         p = (
3816             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3817             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3818             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3819         )
3820         self.send_and_assert_no_replies(self.pg1, p)
3821         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3822         self.assertEqual(len(sessions) - start_sessnum, 0)
3823
3824     def test_tcp_session_close_in(self):
3825         """NAT44ED Close TCP session from inside network"""
3826
3827         in_port = self.tcp_port_in
3828         out_port = 10505
3829         ext_port = self.tcp_external_port
3830
3831         self.nat_add_address(self.nat_addr)
3832         self.nat_add_inside_interface(self.pg0)
3833         self.nat_add_outside_interface(self.pg1)
3834         self.nat_add_static_mapping(
3835             self.pg0.remote_ip4,
3836             self.nat_addr,
3837             in_port,
3838             out_port,
3839             proto=IP_PROTOS.tcp,
3840             flags=self.config_flags.NAT_IS_TWICE_NAT,
3841         )
3842
3843         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3844         session_n = len(sessions)
3845
3846         self.vapi.nat_set_timeouts(
3847             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3848         )
3849
3850         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3851
3852         # FIN packet in -> out
3853         p = (
3854             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3855             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3856             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3857         )
3858         self.send_and_expect(self.pg0, p, self.pg1)
3859         pkts = []
3860
3861         # ACK packet out -> in
3862         p = (
3863             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3864             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3865             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3866         )
3867         pkts.append(p)
3868
3869         # FIN packet out -> in
3870         p = (
3871             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3872             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3873             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3874         )
3875         pkts.append(p)
3876
3877         self.send_and_expect(self.pg1, pkts, self.pg0)
3878
3879         # ACK packet in -> out
3880         p = (
3881             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3882             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3883             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3884         )
3885         self.send_and_expect(self.pg0, p, self.pg1)
3886
3887         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3888         self.assertEqual(len(sessions) - session_n, 1)
3889
3890         # retransmit FIN packet out -> in
3891         p = (
3892             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3893             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3894             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3895         )
3896
3897         self.send_and_expect(self.pg1, p, self.pg0)
3898
3899         # retransmit ACK packet in -> out
3900         p = (
3901             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3902             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3903             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3904         )
3905         self.send_and_expect(self.pg0, p, self.pg1)
3906
3907         self.virtual_sleep(3)
3908         # retransmit ACK packet in -> out - this will cause session to be wiped
3909         p = (
3910             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3911             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3912             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3913         )
3914         self.send_and_assert_no_replies(self.pg0, p)
3915         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3916         self.assertEqual(len(sessions) - session_n, 0)
3917
3918     def test_tcp_session_close_out(self):
3919         """NAT44ED Close TCP session from outside network"""
3920
3921         in_port = self.tcp_port_in
3922         out_port = 10505
3923         ext_port = self.tcp_external_port
3924
3925         self.nat_add_address(self.nat_addr)
3926         self.nat_add_inside_interface(self.pg0)
3927         self.nat_add_outside_interface(self.pg1)
3928         self.nat_add_static_mapping(
3929             self.pg0.remote_ip4,
3930             self.nat_addr,
3931             in_port,
3932             out_port,
3933             proto=IP_PROTOS.tcp,
3934             flags=self.config_flags.NAT_IS_TWICE_NAT,
3935         )
3936
3937         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3938         session_n = len(sessions)
3939
3940         self.vapi.nat_set_timeouts(
3941             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3942         )
3943
3944         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3945
3946         # FIN packet out -> in
3947         p = (
3948             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3949             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3950             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
3951         )
3952         self.pg1.add_stream(p)
3953         self.pg_enable_capture(self.pg_interfaces)
3954         self.pg_start()
3955         self.pg0.get_capture(1)
3956
3957         # FIN+ACK packet in -> out
3958         p = (
3959             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3960             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3961             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
3962         )
3963
3964         self.pg0.add_stream(p)
3965         self.pg_enable_capture(self.pg_interfaces)
3966         self.pg_start()
3967         self.pg1.get_capture(1)
3968
3969         # ACK packet out -> in
3970         p = (
3971             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3972             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3973             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
3974         )
3975         self.pg1.add_stream(p)
3976         self.pg_enable_capture(self.pg_interfaces)
3977         self.pg_start()
3978         self.pg0.get_capture(1)
3979
3980         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3981         self.assertEqual(len(sessions) - session_n, 1)
3982
3983         # retransmit FIN packet out -> in
3984         p = (
3985             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3986             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3987             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3988         )
3989         self.send_and_expect(self.pg1, p, self.pg0)
3990
3991         # retransmit ACK packet in -> out
3992         p = (
3993             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3994             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3995             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3996         )
3997         self.send_and_expect(self.pg0, p, self.pg1)
3998
3999         self.virtual_sleep(3)
4000         # retransmit ACK packet in -> out - this will cause session to be wiped
4001         p = (
4002             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4003             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4004             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4005         )
4006         self.send_and_assert_no_replies(self.pg0, p)
4007         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4008         self.assertEqual(len(sessions) - session_n, 0)
4009
4010     def test_tcp_session_close_simultaneous(self):
4011         """Simultaneous TCP close from both sides"""
4012
4013         in_port = self.tcp_port_in
4014         ext_port = 10505
4015
4016         self.nat_add_address(self.nat_addr)
4017         self.nat_add_inside_interface(self.pg0)
4018         self.nat_add_outside_interface(self.pg1)
4019         self.nat_add_static_mapping(
4020             self.pg0.remote_ip4,
4021             self.nat_addr,
4022             in_port,
4023             ext_port,
4024             proto=IP_PROTOS.tcp,
4025             flags=self.config_flags.NAT_IS_TWICE_NAT,
4026         )
4027
4028         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4029         session_n = len(sessions)
4030
4031         self.vapi.nat_set_timeouts(
4032             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4033         )
4034
4035         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4036
4037         # FIN packet in -> out
4038         p = (
4039             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4040             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4041             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4042         )
4043         self.send_and_expect(self.pg0, p, self.pg1)
4044
4045         # FIN packet out -> in
4046         p = (
4047             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4048             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4049             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4050         )
4051         self.send_and_expect(self.pg1, p, self.pg0)
4052
4053         # ACK packet in -> out
4054         p = (
4055             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4056             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4057             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4058         )
4059         self.send_and_expect(self.pg0, p, self.pg1)
4060
4061         # ACK packet out -> in
4062         p = (
4063             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4064             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4065             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4066         )
4067         self.send_and_expect(self.pg1, p, self.pg0)
4068
4069         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4070         self.assertEqual(len(sessions) - session_n, 1)
4071
4072         # retransmit FIN packet out -> in
4073         p = (
4074             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4075             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4076             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4077         )
4078         self.send_and_expect(self.pg1, p, self.pg0)
4079
4080         # retransmit ACK packet in -> out
4081         p = (
4082             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4083             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4084             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4085         )
4086         self.send_and_expect(self.pg0, p, self.pg1)
4087
4088         self.virtual_sleep(3)
4089         # retransmit ACK packet in -> out - this will cause session to be wiped
4090         p = (
4091             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4092             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4093             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4094         )
4095         self.pg_send(self.pg0, p)
4096         self.send_and_assert_no_replies(self.pg0, p)
4097         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4098         self.assertEqual(len(sessions) - session_n, 0)
4099
4100     def test_tcp_session_half_reopen_inside(self):
4101         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4102         in_port = self.tcp_port_in
4103         ext_port = 10505
4104
4105         self.nat_add_address(self.nat_addr)
4106         self.nat_add_inside_interface(self.pg0)
4107         self.nat_add_outside_interface(self.pg1)
4108         self.nat_add_static_mapping(
4109             self.pg0.remote_ip4,
4110             self.nat_addr,
4111             in_port,
4112             ext_port,
4113             proto=IP_PROTOS.tcp,
4114             flags=self.config_flags.NAT_IS_TWICE_NAT,
4115         )
4116
4117         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4118         session_n = len(sessions)
4119
4120         self.vapi.nat_set_timeouts(
4121             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4122         )
4123
4124         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4125
4126         # FIN packet in -> out
4127         p = (
4128             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4129             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4130             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4131         )
4132         self.send_and_expect(self.pg0, p, self.pg1)
4133
4134         # FIN packet out -> in
4135         p = (
4136             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4137             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4138             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4139         )
4140         self.send_and_expect(self.pg1, p, self.pg0)
4141
4142         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4143         self.assertEqual(len(sessions) - session_n, 1)
4144
4145         # send SYN packet in -> out
4146         p = (
4147             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4148             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4149             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4150         )
4151         self.send_and_expect(self.pg0, p, self.pg1)
4152
4153         self.virtual_sleep(3)
4154         # send ACK packet in -> out - session should be wiped
4155         p = (
4156             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4157             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4158             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4159         )
4160         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4161         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4162         self.assertEqual(len(sessions) - session_n, 0)
4163
4164     def test_tcp_session_half_reopen_outside(self):
4165         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4166         in_port = self.tcp_port_in
4167         ext_port = 10505
4168
4169         self.nat_add_address(self.nat_addr)
4170         self.nat_add_inside_interface(self.pg0)
4171         self.nat_add_outside_interface(self.pg1)
4172         self.nat_add_static_mapping(
4173             self.pg0.remote_ip4,
4174             self.nat_addr,
4175             in_port,
4176             ext_port,
4177             proto=IP_PROTOS.tcp,
4178             flags=self.config_flags.NAT_IS_TWICE_NAT,
4179         )
4180
4181         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4182         session_n = len(sessions)
4183
4184         self.vapi.nat_set_timeouts(
4185             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4186         )
4187
4188         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4189
4190         # FIN packet in -> out
4191         p = (
4192             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4193             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4194             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4195         )
4196         self.send_and_expect(self.pg0, p, self.pg1)
4197
4198         # FIN packet out -> in
4199         p = (
4200             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4201             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4202             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4203         )
4204         self.send_and_expect(self.pg1, p, self.pg0)
4205
4206         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4207         self.assertEqual(len(sessions) - session_n, 1)
4208
4209         # send SYN packet out -> in
4210         p = (
4211             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4212             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4213             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4214         )
4215         self.send_and_expect(self.pg1, p, self.pg0)
4216
4217         self.virtual_sleep(3)
4218         # send ACK packet in -> out - session should be wiped
4219         p = (
4220             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4221             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4222             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4223         )
4224         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4225         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4226         self.assertEqual(len(sessions) - session_n, 0)
4227
4228     def test_tcp_session_reopen(self):
4229         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4230         in_port = self.tcp_port_in
4231         ext_port = 10505
4232
4233         self.nat_add_address(self.nat_addr)
4234         self.nat_add_inside_interface(self.pg0)
4235         self.nat_add_outside_interface(self.pg1)
4236         self.nat_add_static_mapping(
4237             self.pg0.remote_ip4,
4238             self.nat_addr,
4239             in_port,
4240             ext_port,
4241             proto=IP_PROTOS.tcp,
4242             flags=self.config_flags.NAT_IS_TWICE_NAT,
4243         )
4244
4245         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4246         session_n = len(sessions)
4247
4248         self.vapi.nat_set_timeouts(
4249             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4250         )
4251
4252         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4253
4254         # FIN packet in -> out
4255         p = (
4256             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4257             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4258             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4259         )
4260         self.send_and_expect(self.pg0, p, self.pg1)
4261
4262         # FIN packet out -> in
4263         p = (
4264             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4265             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4266             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4267         )
4268         self.send_and_expect(self.pg1, p, self.pg0)
4269
4270         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4271         self.assertEqual(len(sessions) - session_n, 1)
4272
4273         # send SYN packet out -> in
4274         p = (
4275             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4276             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4277             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4278         )
4279         self.send_and_expect(self.pg1, p, self.pg0)
4280
4281         # send SYN packet in -> out
4282         p = (
4283             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4284             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4285             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4286         )
4287         self.send_and_expect(self.pg0, p, self.pg1)
4288
4289         # send ACK packet out -> in
4290         p = (
4291             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4292             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4293             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4294         )
4295         self.send_and_expect(self.pg1, p, self.pg0)
4296
4297         self.virtual_sleep(3)
4298         # send ACK packet in -> out - should be forwarded and session alive
4299         p = (
4300             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4301             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4302             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4303         )
4304         self.send_and_expect(self.pg0, p, self.pg1)
4305         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4306         self.assertEqual(len(sessions) - session_n, 1)
4307
4308     def test_dynamic_vrf(self):
4309         """NAT44ED dynamic translation test: different VRF"""
4310
4311         vrf_id_in = 33
4312         vrf_id_out = 34
4313
4314         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4315
4316         try:
4317             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4318             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4319
4320             self.nat_add_inside_interface(self.pg7)
4321             self.nat_add_outside_interface(self.pg8)
4322
4323             # just basic stuff nothing special
4324             pkts = self.create_stream_in(self.pg7, self.pg8)
4325             self.pg7.add_stream(pkts)
4326             self.pg_enable_capture(self.pg_interfaces)
4327             self.pg_start()
4328             capture = self.pg8.get_capture(len(pkts))
4329             self.verify_capture_out(capture, ignore_port=True)
4330
4331             pkts = self.create_stream_out(self.pg8)
4332             self.pg8.add_stream(pkts)
4333             self.pg_enable_capture(self.pg_interfaces)
4334             self.pg_start()
4335             capture = self.pg7.get_capture(len(pkts))
4336             self.verify_capture_in(capture, self.pg7)
4337
4338         finally:
4339             self.pg7.unconfig()
4340             self.pg8.unconfig()
4341
4342             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4343             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4344
4345     def test_dynamic_output_feature_vrf(self):
4346         """NAT44ED dynamic translation test: output-feature, VRF"""
4347
4348         # other then default (0)
4349         new_vrf_id = 22
4350
4351         self.nat_add_address(self.nat_addr)
4352         self.vapi.nat44_ed_add_del_output_interface(
4353             sw_if_index=self.pg8.sw_if_index, is_add=1
4354         )
4355         try:
4356             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4357             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4358
4359             # in2out
4360             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4361             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4362             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4363             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4364
4365             pkts = self.create_stream_in(self.pg7, self.pg8)
4366             self.pg7.add_stream(pkts)
4367             self.pg_enable_capture(self.pg_interfaces)
4368             self.pg_start()
4369             capture = self.pg8.get_capture(len(pkts))
4370             self.verify_capture_out(capture, ignore_port=True)
4371
4372             if_idx = self.pg8.sw_if_index
4373             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4374             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4375             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4376             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4377             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4378             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4379             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4380             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4381
4382             # out2in
4383             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4384             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4385             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4386             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4387
4388             pkts = self.create_stream_out(self.pg8)
4389             self.pg8.add_stream(pkts)
4390             self.pg_enable_capture(self.pg_interfaces)
4391             self.pg_start()
4392             capture = self.pg7.get_capture(len(pkts))
4393             self.verify_capture_in(capture, self.pg7)
4394
4395             if_idx = self.pg8.sw_if_index
4396             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4397             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4398             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4399             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4400             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4401             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4402             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4403             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4404
4405             sessions = self.statistics["/nat44-ed/total-sessions"]
4406             self.assertEqual(sessions[:, 0].sum(), 3)
4407
4408         finally:
4409             self.pg7.unconfig()
4410             self.pg8.unconfig()
4411
4412             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4413
4414     def test_next_src_nat(self):
4415         """NAT44ED On way back forward packet to nat44-in2out node."""
4416
4417         twice_nat_addr = "10.0.1.3"
4418         external_port = 80
4419         local_port = 8080
4420         post_twice_nat_port = 0
4421
4422         self.vapi.nat44_forwarding_enable_disable(enable=1)
4423         self.nat_add_address(twice_nat_addr, twice_nat=1)
4424         flags = (
4425             self.config_flags.NAT_IS_OUT2IN_ONLY
4426             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4427         )
4428         self.nat_add_static_mapping(
4429             self.pg6.remote_ip4,
4430             self.pg1.remote_ip4,
4431             local_port,
4432             external_port,
4433             proto=IP_PROTOS.tcp,
4434             vrf_id=1,
4435             flags=flags,
4436         )
4437         self.vapi.nat44_interface_add_del_feature(
4438             sw_if_index=self.pg6.sw_if_index, is_add=1
4439         )
4440
4441         p = (
4442             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4443             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4444             / TCP(sport=12345, dport=external_port)
4445         )
4446         self.pg6.add_stream(p)
4447         self.pg_enable_capture(self.pg_interfaces)
4448         self.pg_start()
4449         capture = self.pg6.get_capture(1)
4450         p = capture[0]
4451         try:
4452             ip = p[IP]
4453             tcp = p[TCP]
4454             self.assertEqual(ip.src, twice_nat_addr)
4455             self.assertNotEqual(tcp.sport, 12345)
4456             post_twice_nat_port = tcp.sport
4457             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4458             self.assertEqual(tcp.dport, local_port)
4459             self.assert_packet_checksums_valid(p)
4460         except:
4461             self.logger.error(ppp("Unexpected or invalid packet:", p))
4462             raise
4463
4464         p = (
4465             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4466             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4467             / TCP(sport=local_port, dport=post_twice_nat_port)
4468         )
4469         self.pg6.add_stream(p)
4470         self.pg_enable_capture(self.pg_interfaces)
4471         self.pg_start()
4472         capture = self.pg6.get_capture(1)
4473         p = capture[0]
4474         try:
4475             ip = p[IP]
4476             tcp = p[TCP]
4477             self.assertEqual(ip.src, self.pg1.remote_ip4)
4478             self.assertEqual(tcp.sport, external_port)
4479             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4480             self.assertEqual(tcp.dport, 12345)
4481             self.assert_packet_checksums_valid(p)
4482         except:
4483             self.logger.error(ppp("Unexpected or invalid packet:", p))
4484             raise
4485
4486     def test_one_armed_nat44_static(self):
4487         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4488
4489         remote_host = self.pg4.remote_hosts[0]
4490         local_host = self.pg4.remote_hosts[1]
4491         external_port = 80
4492         local_port = 8080
4493         eh_port_in = 0
4494
4495         self.vapi.nat44_forwarding_enable_disable(enable=1)
4496         self.nat_add_address(self.nat_addr, twice_nat=1)
4497         flags = (
4498             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4499         )
4500         self.nat_add_static_mapping(
4501             local_host.ip4,
4502             self.nat_addr,
4503             local_port,
4504             external_port,
4505             proto=IP_PROTOS.tcp,
4506             flags=flags,
4507         )
4508         flags = self.config_flags.NAT_IS_INSIDE
4509         self.vapi.nat44_interface_add_del_feature(
4510             sw_if_index=self.pg4.sw_if_index, is_add=1
4511         )
4512         self.vapi.nat44_interface_add_del_feature(
4513             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4514         )
4515
4516         # from client to service
4517         p = (
4518             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4519             / IP(src=remote_host.ip4, dst=self.nat_addr)
4520             / TCP(sport=12345, dport=external_port)
4521         )
4522         self.pg4.add_stream(p)
4523         self.pg_enable_capture(self.pg_interfaces)
4524         self.pg_start()
4525         capture = self.pg4.get_capture(1)
4526         p = capture[0]
4527         try:
4528             ip = p[IP]
4529             tcp = p[TCP]
4530             self.assertEqual(ip.dst, local_host.ip4)
4531             self.assertEqual(ip.src, self.nat_addr)
4532             self.assertEqual(tcp.dport, local_port)
4533             self.assertNotEqual(tcp.sport, 12345)
4534             eh_port_in = tcp.sport
4535             self.assert_packet_checksums_valid(p)
4536         except:
4537             self.logger.error(ppp("Unexpected or invalid packet:", p))
4538             raise
4539
4540         # from service back to client
4541         p = (
4542             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4543             / IP(src=local_host.ip4, dst=self.nat_addr)
4544             / TCP(sport=local_port, dport=eh_port_in)
4545         )
4546         self.pg4.add_stream(p)
4547         self.pg_enable_capture(self.pg_interfaces)
4548         self.pg_start()
4549         capture = self.pg4.get_capture(1)
4550         p = capture[0]
4551         try:
4552             ip = p[IP]
4553             tcp = p[TCP]
4554             self.assertEqual(ip.src, self.nat_addr)
4555             self.assertEqual(ip.dst, remote_host.ip4)
4556             self.assertEqual(tcp.sport, external_port)
4557             self.assertEqual(tcp.dport, 12345)
4558             self.assert_packet_checksums_valid(p)
4559         except:
4560             self.logger.error(ppp("Unexpected or invalid packet:", p))
4561             raise
4562
4563     def test_icmp_error_fwd_outbound(self):
4564         """NAT44ED ICMP error outbound with forwarding enabled"""
4565
4566         # Ensure that an outbound ICMP error message is properly associated
4567         # with the inbound forward bypass session it is related to.
4568         payload = "H" * 10
4569
4570         self.nat_add_address(self.nat_addr)
4571         self.nat_add_inside_interface(self.pg0)
4572         self.nat_add_outside_interface(self.pg1)
4573
4574         # enable forwarding and initiate connection out2in
4575         self.vapi.nat44_forwarding_enable_disable(enable=1)
4576         p1 = (
4577             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4578             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4579             / UDP(sport=21, dport=20)
4580             / payload
4581         )
4582
4583         self.pg1.add_stream(p1)
4584         self.pg_enable_capture(self.pg_interfaces)
4585         self.pg_start()
4586         capture = self.pg0.get_capture(1)[0]
4587
4588         self.logger.info(self.vapi.cli("show nat44 sessions"))
4589
4590         # reply with ICMP error message in2out
4591         # We cannot reliably retrieve forward bypass sessions via the API.
4592         # session dumps for a user will only look on the worker that the
4593         # user is supposed to be mapped to in2out. The forward bypass session
4594         # is not necessarily created on that worker.
4595         p2 = (
4596             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4597             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4598             / ICMP(type="dest-unreach", code="port-unreachable")
4599             / capture[IP:]
4600         )
4601
4602         self.pg0.add_stream(p2)
4603         self.pg_enable_capture(self.pg_interfaces)
4604         self.pg_start()
4605         capture = self.pg1.get_capture(1)[0]
4606
4607         self.logger.info(self.vapi.cli("show nat44 sessions"))
4608
4609         self.logger.info(ppp("p1 packet:", p1))
4610         self.logger.info(ppp("p2 packet:", p2))
4611         self.logger.info(ppp("capture packet:", capture))
4612
4613     def test_tcp_session_open_retransmit1(self):
4614         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4615
4616         The client does not receive the [SYN,ACK] or the
4617         ACK from the client is lost. Therefore, the [SYN, ACK]
4618         is retransmitted by the server.
4619         """
4620
4621         in_port = self.tcp_port_in
4622         ext_port = self.tcp_external_port
4623         payload = "H" * 10
4624
4625         self.nat_add_address(self.nat_addr)
4626         self.nat_add_inside_interface(self.pg0)
4627         self.nat_add_outside_interface(self.pg1)
4628
4629         self.vapi.nat_set_timeouts(
4630             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4631         )
4632         # SYN packet in->out
4633         p = (
4634             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4635             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4636             / TCP(sport=in_port, dport=ext_port, flags="S")
4637         )
4638         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4639         out_port = p[TCP].sport
4640
4641         # SYN + ACK packet out->in
4642         p = (
4643             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4644             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4645             / TCP(sport=ext_port, dport=out_port, flags="SA")
4646         )
4647         self.send_and_expect(self.pg1, p, self.pg0)
4648
4649         # ACK in->out does not arrive
4650
4651         # resent SYN + ACK packet out->in
4652         p = (
4653             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4654             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4655             / TCP(sport=ext_port, dport=out_port, flags="SA")
4656         )
4657         self.send_and_expect(self.pg1, p, self.pg0)
4658
4659         # ACK packet in->out
4660         p = (
4661             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4662             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4663             / TCP(sport=in_port, dport=ext_port, flags="A")
4664         )
4665         self.send_and_expect(self.pg0, p, self.pg1)
4666
4667         # Verify that the data can be transmitted after the transitory time
4668         self.virtual_sleep(6)
4669
4670         p = (
4671             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4672             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4673             / TCP(sport=in_port, dport=ext_port, flags="PA")
4674             / Raw(payload)
4675         )
4676         self.send_and_expect(self.pg0, p, self.pg1)
4677
4678     def test_tcp_session_open_retransmit2(self):
4679         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4680
4681         The ACK is lost to the server after the TCP session is opened.
4682         Data is sent by the client, then the [SYN,ACK] is
4683         retransmitted by the server.
4684         """
4685
4686         in_port = self.tcp_port_in
4687         ext_port = self.tcp_external_port
4688         payload = "H" * 10
4689
4690         self.nat_add_address(self.nat_addr)
4691         self.nat_add_inside_interface(self.pg0)
4692         self.nat_add_outside_interface(self.pg1)
4693
4694         self.vapi.nat_set_timeouts(
4695             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4696         )
4697         # SYN packet in->out
4698         p = (
4699             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4700             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4701             / TCP(sport=in_port, dport=ext_port, flags="S")
4702         )
4703         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4704         out_port = p[TCP].sport
4705
4706         # SYN + ACK packet out->in
4707         p = (
4708             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4709             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4710             / TCP(sport=ext_port, dport=out_port, flags="SA")
4711         )
4712         self.send_and_expect(self.pg1, p, self.pg0)
4713
4714         # ACK packet in->out -- not received by the server
4715         p = (
4716             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4717             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4718             / TCP(sport=in_port, dport=ext_port, flags="A")
4719         )
4720         self.send_and_expect(self.pg0, p, self.pg1)
4721
4722         # PUSH + ACK packet in->out
4723         p = (
4724             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4725             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4726             / TCP(sport=in_port, dport=ext_port, flags="PA")
4727             / Raw(payload)
4728         )
4729         self.send_and_expect(self.pg0, p, self.pg1)
4730
4731         # resent SYN + ACK packet out->in
4732         p = (
4733             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4734             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4735             / TCP(sport=ext_port, dport=out_port, flags="SA")
4736         )
4737         self.send_and_expect(self.pg1, p, self.pg0)
4738
4739         # resent ACK packet in->out
4740         p = (
4741             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4742             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4743             / TCP(sport=in_port, dport=ext_port, flags="A")
4744         )
4745         self.send_and_expect(self.pg0, p, self.pg1)
4746
4747         # resent PUSH + ACK packet in->out
4748         p = (
4749             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4750             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4751             / TCP(sport=in_port, dport=ext_port, flags="PA")
4752             / Raw(payload)
4753         )
4754         self.send_and_expect(self.pg0, p, self.pg1)
4755
4756         # ACK packet out->in
4757         p = (
4758             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4759             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4760             / TCP(sport=ext_port, dport=out_port, flags="A")
4761         )
4762         self.send_and_expect(self.pg1, p, self.pg0)
4763
4764         # Verify that the data can be transmitted after the transitory time
4765         self.virtual_sleep(6)
4766
4767         p = (
4768             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4769             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4770             / TCP(sport=in_port, dport=ext_port, flags="PA")
4771             / Raw(payload)
4772         )
4773         self.send_and_expect(self.pg0, p, self.pg1)
4774
4775
4776 if __name__ == "__main__":
4777     unittest.main(testRunner=VppTestRunner)