nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import re
8 import scapy.compat
9 from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
10 from framework import VppTestCase, VppTestRunner, VppLoInterface
11 from scapy.data import IP_PROTOS
12 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
13 from scapy.layers.inet import IPerror, TCPerror
14 from scapy.layers.l2 import Ether
15 from scapy.packet import Raw
16 from statistics import variance
17 from syslog_rfc5424_parser import SyslogMessage, ParseError
18 from syslog_rfc5424_parser.constants import SyslogSeverity
19 from util import ppp, pr, ip4_range
20 from vpp_acl import AclRule, VppAcl, VppAclInterface
21 from vpp_ip_route import VppIpRoute, VppRoutePath
22 from vpp_papi import VppEnum
23 from util import StatsDiff
24
25
26 class TestNAT44ED(VppTestCase):
27     """NAT44ED Test Case"""
28
29     nat_addr = "10.0.10.3"
30
31     tcp_port_in = 6303
32     tcp_port_out = 6303
33
34     udp_port_in = 6304
35     udp_port_out = 6304
36
37     icmp_id_in = 6305
38     icmp_id_out = 6305
39
40     tcp_external_port = 80
41
42     max_sessions = 100
43
44     def setUp(self):
45         super().setUp()
46         self.plugin_enable()
47
48     def tearDown(self):
49         super().tearDown()
50         if not self.vpp_dead:
51             self.plugin_disable()
52
53     def plugin_enable(self, max_sessions=None):
54         max_sessions = max_sessions or self.max_sessions
55         self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
56
57     def plugin_disable(self):
58         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
59
60     @property
61     def config_flags(self):
62         return VppEnum.vl_api_nat_config_flags_t
63
64     @property
65     def nat44_config_flags(self):
66         return VppEnum.vl_api_nat44_config_flags_t
67
68     @property
69     def syslog_severity(self):
70         return VppEnum.vl_api_syslog_severity_t
71
72     @property
73     def server_addr(self):
74         return self.pg1.remote_hosts[0].ip4
75
76     @staticmethod
77     def random_port():
78         return randint(1024, 65535)
79
80     @staticmethod
81     def proto2layer(proto):
82         if proto == IP_PROTOS.tcp:
83             return TCP
84         elif proto == IP_PROTOS.udp:
85             return UDP
86         elif proto == IP_PROTOS.icmp:
87             return ICMP
88         else:
89             raise Exception("Unsupported protocol")
90
91     @classmethod
92     def create_and_add_ip4_table(cls, i, table_id=0):
93         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
94         i.set_table_ip4(table_id)
95
96     @classmethod
97     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
98         if table_id:
99             cls.create_and_add_ip4_table(i, table_id)
100
101         i.admin_up()
102         i.config_ip4()
103         i.resolve_arp()
104
105         if hosts:
106             i.generate_remote_hosts(hosts)
107             i.configure_ipv4_neighbors()
108
109     @classmethod
110     def nat_add_interface_address(cls, i):
111         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
112
113     def nat_add_inside_interface(self, i):
114         self.vapi.nat44_interface_add_del_feature(
115             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
116         )
117
118     def nat_add_outside_interface(self, i):
119         self.vapi.nat44_interface_add_del_feature(
120             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
121         )
122
123     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
124         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
125         self.vapi.nat44_add_del_address_range(
126             first_ip_address=address,
127             last_ip_address=address,
128             vrf_id=vrf_id,
129             is_add=is_add,
130             flags=flags,
131         )
132
133     def nat_add_static_mapping(
134         self,
135         local_ip,
136         external_ip="0.0.0.0",
137         local_port=0,
138         external_port=0,
139         vrf_id=0,
140         is_add=1,
141         external_sw_if_index=0xFFFFFFFF,
142         proto=0,
143         tag="",
144         flags=0,
145     ):
146         if not (local_port and external_port):
147             flags |= self.config_flags.NAT_IS_ADDR_ONLY
148
149         self.vapi.nat44_add_del_static_mapping(
150             is_add=is_add,
151             local_ip_address=local_ip,
152             external_ip_address=external_ip,
153             external_sw_if_index=external_sw_if_index,
154             local_port=local_port,
155             external_port=external_port,
156             vrf_id=vrf_id,
157             protocol=proto,
158             flags=flags,
159             tag=tag,
160         )
161
162     @classmethod
163     def setUpClass(cls):
164         super().setUpClass()
165         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
166             return
167
168         cls.create_pg_interfaces(range(12))
169         cls.interfaces = list(cls.pg_interfaces[:4])
170
171         cls.create_and_add_ip4_table(cls.pg2, 10)
172
173         for i in cls.interfaces:
174             cls.configure_ip4_interface(i, hosts=3)
175
176         # test specific (test-multiple-vrf)
177         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
178
179         # test specific (test-one-armed-nat44-static)
180         cls.pg4.generate_remote_hosts(2)
181         cls.pg4.config_ip4()
182         cls.vapi.sw_interface_add_del_address(
183             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
184         )
185         cls.pg4.admin_up()
186         cls.pg4.resolve_arp()
187         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
188         cls.pg4.resolve_arp()
189
190         # test specific interface (pg5)
191         cls.pg5._local_ip4 = "10.1.1.1"
192         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
193         cls.pg5.set_table_ip4(1)
194         cls.pg5.config_ip4()
195         cls.pg5.admin_up()
196         cls.pg5.resolve_arp()
197
198         # test specific interface (pg6)
199         cls.pg6._local_ip4 = "10.1.2.1"
200         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
201         cls.pg6.set_table_ip4(1)
202         cls.pg6.config_ip4()
203         cls.pg6.admin_up()
204         cls.pg6.resolve_arp()
205
206         rl = list()
207
208         rl.append(
209             VppIpRoute(
210                 cls,
211                 "0.0.0.0",
212                 0,
213                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
214                 register=False,
215                 table_id=1,
216             )
217         )
218         rl.append(
219             VppIpRoute(
220                 cls,
221                 "0.0.0.0",
222                 0,
223                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
224                 register=False,
225             )
226         )
227         rl.append(
228             VppIpRoute(
229                 cls,
230                 cls.pg5.remote_ip4,
231                 32,
232                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
233                 register=False,
234                 table_id=1,
235             )
236         )
237         rl.append(
238             VppIpRoute(
239                 cls,
240                 cls.pg6.remote_ip4,
241                 32,
242                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
243                 register=False,
244                 table_id=1,
245             )
246         )
247         rl.append(
248             VppIpRoute(
249                 cls,
250                 cls.pg6.remote_ip4,
251                 16,
252                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
253                 register=False,
254                 table_id=0,
255             )
256         )
257
258         for r in rl:
259             r.add_vpp_config()
260
261         cls.no_diff = StatsDiff(
262             {
263                 pg.sw_if_index: {
264                     "/nat44-ed/in2out/fastpath/tcp": 0,
265                     "/nat44-ed/in2out/fastpath/udp": 0,
266                     "/nat44-ed/in2out/fastpath/icmp": 0,
267                     "/nat44-ed/in2out/fastpath/drops": 0,
268                     "/nat44-ed/in2out/slowpath/tcp": 0,
269                     "/nat44-ed/in2out/slowpath/udp": 0,
270                     "/nat44-ed/in2out/slowpath/icmp": 0,
271                     "/nat44-ed/in2out/slowpath/drops": 0,
272                     "/nat44-ed/in2out/fastpath/tcp": 0,
273                     "/nat44-ed/in2out/fastpath/udp": 0,
274                     "/nat44-ed/in2out/fastpath/icmp": 0,
275                     "/nat44-ed/in2out/fastpath/drops": 0,
276                     "/nat44-ed/in2out/slowpath/tcp": 0,
277                     "/nat44-ed/in2out/slowpath/udp": 0,
278                     "/nat44-ed/in2out/slowpath/icmp": 0,
279                     "/nat44-ed/in2out/slowpath/drops": 0,
280                 }
281                 for pg in cls.pg_interfaces
282             }
283         )
284
285     def get_err_counter(self, path):
286         return self.statistics.get_err_counter(path)
287
288     def reass_hairpinning(
289         self,
290         server_addr,
291         server_in_port,
292         server_out_port,
293         host_in_port,
294         proto=IP_PROTOS.tcp,
295         ignore_port=False,
296     ):
297         layer = self.proto2layer(proto)
298
299         if proto == IP_PROTOS.tcp:
300             data = b"A" * 4 + b"B" * 16 + b"C" * 3
301         else:
302             data = b"A" * 16 + b"B" * 16 + b"C" * 3
303
304         # send packet from host to server
305         pkts = self.create_stream_frag(
306             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
307         )
308         self.pg0.add_stream(pkts)
309         self.pg_enable_capture(self.pg_interfaces)
310         self.pg_start()
311         frags = self.pg0.get_capture(len(pkts))
312         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
313         if proto != IP_PROTOS.icmp:
314             if not ignore_port:
315                 self.assertNotEqual(p[layer].sport, host_in_port)
316             self.assertEqual(p[layer].dport, server_in_port)
317         else:
318             if not ignore_port:
319                 self.assertNotEqual(p[layer].id, host_in_port)
320         self.assertEqual(data, p[Raw].load)
321
322     def frag_out_of_order(
323         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
324     ):
325         layer = self.proto2layer(proto)
326
327         if proto == IP_PROTOS.tcp:
328             data = b"A" * 4 + b"B" * 16 + b"C" * 3
329         else:
330             data = b"A" * 16 + b"B" * 16 + b"C" * 3
331         self.port_in = self.random_port()
332
333         for i in range(2):
334             # in2out
335             pkts = self.create_stream_frag(
336                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
337             )
338             pkts.reverse()
339             self.pg0.add_stream(pkts)
340             self.pg_enable_capture(self.pg_interfaces)
341             self.pg_start()
342             frags = self.pg1.get_capture(len(pkts))
343             if not dont_translate:
344                 p = self.reass_frags_and_verify(
345                     frags, self.nat_addr, self.pg1.remote_ip4
346                 )
347             else:
348                 p = self.reass_frags_and_verify(
349                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
350                 )
351             if proto != IP_PROTOS.icmp:
352                 if not dont_translate:
353                     self.assertEqual(p[layer].dport, 20)
354                     if not ignore_port:
355                         self.assertNotEqual(p[layer].sport, self.port_in)
356                 else:
357                     self.assertEqual(p[layer].sport, self.port_in)
358             else:
359                 if not ignore_port:
360                     if not dont_translate:
361                         self.assertNotEqual(p[layer].id, self.port_in)
362                     else:
363                         self.assertEqual(p[layer].id, self.port_in)
364             self.assertEqual(data, p[Raw].load)
365
366             # out2in
367             if not dont_translate:
368                 dst_addr = self.nat_addr
369             else:
370                 dst_addr = self.pg0.remote_ip4
371             if proto != IP_PROTOS.icmp:
372                 sport = 20
373                 dport = p[layer].sport
374             else:
375                 sport = p[layer].id
376                 dport = 0
377             pkts = self.create_stream_frag(
378                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
379             )
380             pkts.reverse()
381             self.pg1.add_stream(pkts)
382             self.pg_enable_capture(self.pg_interfaces)
383             self.logger.info(self.vapi.cli("show trace"))
384             self.pg_start()
385             frags = self.pg0.get_capture(len(pkts))
386             p = self.reass_frags_and_verify(
387                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
388             )
389             if proto != IP_PROTOS.icmp:
390                 self.assertEqual(p[layer].sport, 20)
391                 self.assertEqual(p[layer].dport, self.port_in)
392             else:
393                 self.assertEqual(p[layer].id, self.port_in)
394             self.assertEqual(data, p[Raw].load)
395
396     def reass_frags_and_verify(self, frags, src, dst):
397         buffer = BytesIO()
398         for p in frags:
399             self.assertEqual(p[IP].src, src)
400             self.assertEqual(p[IP].dst, dst)
401             self.assert_ip_checksum_valid(p)
402             buffer.seek(p[IP].frag * 8)
403             buffer.write(bytes(p[IP].payload))
404         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
405         if ip.proto == IP_PROTOS.tcp:
406             p = ip / TCP(buffer.getvalue())
407             self.logger.debug(ppp("Reassembled:", p))
408             self.assert_tcp_checksum_valid(p)
409         elif ip.proto == IP_PROTOS.udp:
410             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
411         elif ip.proto == IP_PROTOS.icmp:
412             p = ip / ICMP(buffer.getvalue())
413         return p
414
415     def frag_in_order(
416         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
417     ):
418         layer = self.proto2layer(proto)
419
420         if proto == IP_PROTOS.tcp:
421             data = b"A" * 4 + b"B" * 16 + b"C" * 3
422         else:
423             data = b"A" * 16 + b"B" * 16 + b"C" * 3
424         self.port_in = self.random_port()
425
426         # in2out
427         pkts = self.create_stream_frag(
428             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
429         )
430         self.pg0.add_stream(pkts)
431         self.pg_enable_capture(self.pg_interfaces)
432         self.pg_start()
433         frags = self.pg1.get_capture(len(pkts))
434         if not dont_translate:
435             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
436         else:
437             p = self.reass_frags_and_verify(
438                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
439             )
440         if proto != IP_PROTOS.icmp:
441             if not dont_translate:
442                 self.assertEqual(p[layer].dport, 20)
443                 if not ignore_port:
444                     self.assertNotEqual(p[layer].sport, self.port_in)
445             else:
446                 self.assertEqual(p[layer].sport, self.port_in)
447         else:
448             if not ignore_port:
449                 if not dont_translate:
450                     self.assertNotEqual(p[layer].id, self.port_in)
451                 else:
452                     self.assertEqual(p[layer].id, self.port_in)
453         self.assertEqual(data, p[Raw].load)
454
455         # out2in
456         if not dont_translate:
457             dst_addr = self.nat_addr
458         else:
459             dst_addr = self.pg0.remote_ip4
460         if proto != IP_PROTOS.icmp:
461             sport = 20
462             dport = p[layer].sport
463         else:
464             sport = p[layer].id
465             dport = 0
466         pkts = self.create_stream_frag(
467             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
468         )
469         self.pg1.add_stream(pkts)
470         self.pg_enable_capture(self.pg_interfaces)
471         self.pg_start()
472         frags = self.pg0.get_capture(len(pkts))
473         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
474         if proto != IP_PROTOS.icmp:
475             self.assertEqual(p[layer].sport, 20)
476             self.assertEqual(p[layer].dport, self.port_in)
477         else:
478             self.assertEqual(p[layer].id, self.port_in)
479         self.assertEqual(data, p[Raw].load)
480
481     def verify_capture_out(
482         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
483     ):
484         if nat_ip is None:
485             nat_ip = self.nat_addr
486         for packet in capture:
487             try:
488                 self.assert_packet_checksums_valid(packet)
489                 self.assertEqual(packet[IP].src, nat_ip)
490                 if dst_ip is not None:
491                     self.assertEqual(packet[IP].dst, dst_ip)
492                 if packet.haslayer(TCP):
493                     if not ignore_port:
494                         if same_port:
495                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
496                         else:
497                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
498                     self.tcp_port_out = packet[TCP].sport
499                     self.assert_packet_checksums_valid(packet)
500                 elif packet.haslayer(UDP):
501                     if not ignore_port:
502                         if same_port:
503                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
504                         else:
505                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
506                     self.udp_port_out = packet[UDP].sport
507                 else:
508                     if not ignore_port:
509                         if same_port:
510                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
511                         else:
512                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
513                     self.icmp_id_out = packet[ICMP].id
514                     self.assert_packet_checksums_valid(packet)
515             except:
516                 self.logger.error(
517                     ppp("Unexpected or invalid packet (outside network):", packet)
518                 )
519                 raise
520
521     def verify_capture_in(self, capture, in_if):
522         for packet in capture:
523             try:
524                 self.assert_packet_checksums_valid(packet)
525                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
526                 if packet.haslayer(TCP):
527                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
528                 elif packet.haslayer(UDP):
529                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
530                 else:
531                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
532             except:
533                 self.logger.error(
534                     ppp("Unexpected or invalid packet (inside network):", packet)
535                 )
536                 raise
537
538     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
539         if dst_ip is None:
540             dst_ip = out_if.remote_ip4
541
542         pkts = []
543         # TCP
544         p = (
545             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
546             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
547             / TCP(sport=self.tcp_port_in, dport=20)
548         )
549         pkts.extend([p, p])
550
551         # UDP
552         p = (
553             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
554             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
555             / UDP(sport=self.udp_port_in, dport=20)
556         )
557         pkts.append(p)
558
559         # ICMP
560         p = (
561             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
562             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
563             / ICMP(id=self.icmp_id_in, type="echo-request")
564         )
565         pkts.append(p)
566
567         return pkts
568
569     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
570         if dst_ip is None:
571             dst_ip = self.nat_addr
572         if not use_inside_ports:
573             tcp_port = self.tcp_port_out
574             udp_port = self.udp_port_out
575             icmp_id = self.icmp_id_out
576         else:
577             tcp_port = self.tcp_port_in
578             udp_port = self.udp_port_in
579             icmp_id = self.icmp_id_in
580         pkts = []
581         # TCP
582         p = (
583             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
584             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
585             / TCP(dport=tcp_port, sport=20)
586         )
587         pkts.extend([p, p])
588
589         # UDP
590         p = (
591             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
592             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
593             / UDP(dport=udp_port, sport=20)
594         )
595         pkts.append(p)
596
597         # ICMP
598         p = (
599             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
600             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
601             / ICMP(id=icmp_id, type="echo-reply")
602         )
603         pkts.append(p)
604
605         return pkts
606
607     def create_tcp_stream(self, in_if, out_if, count):
608         pkts = []
609         port = 6303
610
611         for i in range(count):
612             p = (
613                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
614                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
615                 / TCP(sport=port + i, dport=20)
616             )
617             pkts.append(p)
618
619         return pkts
620
621     def create_udp_stream(self, in_if, out_if, count, base_port=6303):
622         return [
623             (
624                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
625                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
626                 / UDP(sport=base_port + i, dport=20)
627             )
628             for i in range(count)
629         ]
630
631     def create_stream_frag(
632         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
633     ):
634         if proto == IP_PROTOS.tcp:
635             p = (
636                 IP(src=src_if.remote_ip4, dst=dst)
637                 / TCP(sport=sport, dport=dport)
638                 / Raw(data)
639             )
640             p = p.__class__(scapy.compat.raw(p))
641             chksum = p[TCP].chksum
642             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
643         elif proto == IP_PROTOS.udp:
644             proto_header = UDP(sport=sport, dport=dport)
645         elif proto == IP_PROTOS.icmp:
646             if not echo_reply:
647                 proto_header = ICMP(id=sport, type="echo-request")
648             else:
649                 proto_header = ICMP(id=sport, type="echo-reply")
650         else:
651             raise Exception("Unsupported protocol")
652         id = self.random_port()
653         pkts = []
654         if proto == IP_PROTOS.tcp:
655             raw = Raw(data[0:4])
656         else:
657             raw = Raw(data[0:16])
658         p = (
659             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
660             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
661             / proto_header
662             / raw
663         )
664         pkts.append(p)
665         if proto == IP_PROTOS.tcp:
666             raw = Raw(data[4:20])
667         else:
668             raw = Raw(data[16:32])
669         p = (
670             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
671             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
672             / raw
673         )
674         pkts.append(p)
675         if proto == IP_PROTOS.tcp:
676             raw = Raw(data[20:])
677         else:
678             raw = Raw(data[32:])
679         p = (
680             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
681             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
682             / raw
683         )
684         pkts.append(p)
685         return pkts
686
687     def frag_in_order_in_plus_out(
688         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
689     ):
690         layer = self.proto2layer(proto)
691
692         if proto == IP_PROTOS.tcp:
693             data = b"A" * 4 + b"B" * 16 + b"C" * 3
694         else:
695             data = b"A" * 16 + b"B" * 16 + b"C" * 3
696         port_in = self.random_port()
697
698         for i in range(2):
699             # out2in
700             pkts = self.create_stream_frag(
701                 self.pg0, out_addr, port_in, out_port, data, proto
702             )
703             self.pg0.add_stream(pkts)
704             self.pg_enable_capture(self.pg_interfaces)
705             self.pg_start()
706             frags = self.pg1.get_capture(len(pkts))
707             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
708             if proto != IP_PROTOS.icmp:
709                 self.assertEqual(p[layer].sport, port_in)
710                 self.assertEqual(p[layer].dport, in_port)
711             else:
712                 self.assertEqual(p[layer].id, port_in)
713             self.assertEqual(data, p[Raw].load)
714
715             # in2out
716             if proto != IP_PROTOS.icmp:
717                 pkts = self.create_stream_frag(
718                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
719                 )
720             else:
721                 pkts = self.create_stream_frag(
722                     self.pg1,
723                     self.pg0.remote_ip4,
724                     p[layer].id,
725                     0,
726                     data,
727                     proto,
728                     echo_reply=True,
729                 )
730             self.pg1.add_stream(pkts)
731             self.pg_enable_capture(self.pg_interfaces)
732             self.pg_start()
733             frags = self.pg0.get_capture(len(pkts))
734             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
735             if proto != IP_PROTOS.icmp:
736                 self.assertEqual(p[layer].sport, out_port)
737                 self.assertEqual(p[layer].dport, port_in)
738             else:
739                 self.assertEqual(p[layer].id, port_in)
740             self.assertEqual(data, p[Raw].load)
741
742     def frag_out_of_order_in_plus_out(
743         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
744     ):
745         layer = self.proto2layer(proto)
746
747         if proto == IP_PROTOS.tcp:
748             data = b"A" * 4 + b"B" * 16 + b"C" * 3
749         else:
750             data = b"A" * 16 + b"B" * 16 + b"C" * 3
751         port_in = self.random_port()
752
753         for i in range(2):
754             # out2in
755             pkts = self.create_stream_frag(
756                 self.pg0, out_addr, port_in, out_port, data, proto
757             )
758             pkts.reverse()
759             self.pg0.add_stream(pkts)
760             self.pg_enable_capture(self.pg_interfaces)
761             self.pg_start()
762             frags = self.pg1.get_capture(len(pkts))
763             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
764             if proto != IP_PROTOS.icmp:
765                 self.assertEqual(p[layer].dport, in_port)
766                 self.assertEqual(p[layer].sport, port_in)
767                 self.assertEqual(p[layer].dport, in_port)
768             else:
769                 self.assertEqual(p[layer].id, port_in)
770             self.assertEqual(data, p[Raw].load)
771
772             # in2out
773             if proto != IP_PROTOS.icmp:
774                 pkts = self.create_stream_frag(
775                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
776                 )
777             else:
778                 pkts = self.create_stream_frag(
779                     self.pg1,
780                     self.pg0.remote_ip4,
781                     p[layer].id,
782                     0,
783                     data,
784                     proto,
785                     echo_reply=True,
786                 )
787             pkts.reverse()
788             self.pg1.add_stream(pkts)
789             self.pg_enable_capture(self.pg_interfaces)
790             self.pg_start()
791             frags = self.pg0.get_capture(len(pkts))
792             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
793             if proto != IP_PROTOS.icmp:
794                 self.assertEqual(p[layer].sport, out_port)
795                 self.assertEqual(p[layer].dport, port_in)
796             else:
797                 self.assertEqual(p[layer].id, port_in)
798             self.assertEqual(data, p[Raw].load)
799
800     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
801         # SYN packet in->out
802         p = (
803             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
804             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
805             / TCP(sport=in_port, dport=ext_port, flags="S")
806         )
807         in_if.add_stream(p)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         capture = out_if.get_capture(1)
811         p = capture[0]
812         out_port = p[TCP].sport
813
814         # SYN + ACK packet out->in
815         p = (
816             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
817             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
818             / TCP(sport=ext_port, dport=out_port, flags="SA")
819         )
820         out_if.add_stream(p)
821         self.pg_enable_capture(self.pg_interfaces)
822         self.pg_start()
823         in_if.get_capture(1)
824
825         # ACK packet in->out
826         p = (
827             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
828             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
829             / TCP(sport=in_port, dport=ext_port, flags="A")
830         )
831         in_if.add_stream(p)
832         self.pg_enable_capture(self.pg_interfaces)
833         self.pg_start()
834         out_if.get_capture(1)
835
836         return out_port
837
838     def twice_nat_common(
839         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
840     ):
841         twice_nat_addr = "10.0.1.3"
842
843         port_in = 8080
844         if lb:
845             if not same_pg:
846                 port_in1 = port_in
847                 port_in2 = port_in
848             else:
849                 port_in1 = port_in + 1
850                 port_in2 = port_in + 2
851
852         port_out = 80
853         eh_port_out = 4567
854
855         server1 = self.pg0.remote_hosts[0]
856         server2 = self.pg0.remote_hosts[1]
857         if lb and same_pg:
858             server2 = server1
859         if not lb:
860             server = server1
861
862         pg0 = self.pg0
863         if same_pg:
864             pg1 = self.pg0
865         else:
866             pg1 = self.pg1
867
868         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
869
870         self.nat_add_address(self.nat_addr)
871         self.nat_add_address(twice_nat_addr, twice_nat=1)
872
873         flags = 0
874         if self_twice_nat:
875             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
876         else:
877             flags |= self.config_flags.NAT_IS_TWICE_NAT
878
879         if not lb:
880             self.nat_add_static_mapping(
881                 pg0.remote_ip4,
882                 self.nat_addr,
883                 port_in,
884                 port_out,
885                 proto=IP_PROTOS.tcp,
886                 flags=flags,
887             )
888         else:
889             locals = [
890                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
891                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
892             ]
893             out_addr = self.nat_addr
894
895             self.vapi.nat44_add_del_lb_static_mapping(
896                 is_add=1,
897                 flags=flags,
898                 external_addr=out_addr,
899                 external_port=port_out,
900                 protocol=IP_PROTOS.tcp,
901                 local_num=len(locals),
902                 locals=locals,
903             )
904         self.nat_add_inside_interface(pg0)
905         self.nat_add_outside_interface(pg1)
906
907         if same_pg:
908             if not lb:
909                 client = server
910             else:
911                 assert client_id is not None
912                 if client_id == 1:
913                     client = self.pg0.remote_hosts[0]
914                 elif client_id == 2:
915                     client = self.pg0.remote_hosts[1]
916         else:
917             client = pg1.remote_hosts[0]
918         p = (
919             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
920             / IP(src=client.ip4, dst=self.nat_addr)
921             / TCP(sport=eh_port_out, dport=port_out)
922         )
923         pg1.add_stream(p)
924         self.pg_enable_capture(self.pg_interfaces)
925         self.pg_start()
926         capture = pg0.get_capture(1)
927         p = capture[0]
928         try:
929             ip = p[IP]
930             tcp = p[TCP]
931             if lb:
932                 if ip.dst == server1.ip4:
933                     server = server1
934                     port_in = port_in1
935                 else:
936                     server = server2
937                     port_in = port_in2
938             self.assertEqual(ip.dst, server.ip4)
939             if lb and same_pg:
940                 self.assertIn(tcp.dport, [port_in1, port_in2])
941             else:
942                 self.assertEqual(tcp.dport, port_in)
943             if eh_translate:
944                 self.assertEqual(ip.src, twice_nat_addr)
945                 self.assertNotEqual(tcp.sport, eh_port_out)
946             else:
947                 self.assertEqual(ip.src, client.ip4)
948                 self.assertEqual(tcp.sport, eh_port_out)
949             eh_addr_in = ip.src
950             eh_port_in = tcp.sport
951             saved_port_in = tcp.dport
952             self.assert_packet_checksums_valid(p)
953         except:
954             self.logger.error(ppp("Unexpected or invalid packet:", p))
955             raise
956
957         p = (
958             Ether(src=server.mac, dst=pg0.local_mac)
959             / IP(src=server.ip4, dst=eh_addr_in)
960             / TCP(sport=saved_port_in, dport=eh_port_in)
961         )
962         pg0.add_stream(p)
963         self.pg_enable_capture(self.pg_interfaces)
964         self.pg_start()
965         capture = pg1.get_capture(1)
966         p = capture[0]
967         try:
968             ip = p[IP]
969             tcp = p[TCP]
970             self.assertEqual(ip.dst, client.ip4)
971             self.assertEqual(ip.src, self.nat_addr)
972             self.assertEqual(tcp.dport, eh_port_out)
973             self.assertEqual(tcp.sport, port_out)
974             self.assert_packet_checksums_valid(p)
975         except:
976             self.logger.error(ppp("Unexpected or invalid packet:", p))
977             raise
978
979         if eh_translate:
980             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
981             self.assertEqual(len(sessions), 1)
982             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
983             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
984             self.logger.info(self.vapi.cli("show nat44 sessions"))
985             self.vapi.nat44_del_session(
986                 address=sessions[0].inside_ip_address,
987                 port=sessions[0].inside_port,
988                 protocol=sessions[0].protocol,
989                 flags=(
990                     self.config_flags.NAT_IS_INSIDE
991                     | self.config_flags.NAT_IS_EXT_HOST_VALID
992                 ),
993                 ext_host_address=sessions[0].ext_host_nat_address,
994                 ext_host_port=sessions[0].ext_host_nat_port,
995             )
996             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
997             self.assertEqual(len(sessions), 0)
998
999     def verify_syslog_sess(self, data, msgid, is_ip6=False):
1000         message = data.decode("utf-8")
1001         try:
1002             message = SyslogMessage.parse(message)
1003         except ParseError as e:
1004             self.logger.error(e)
1005             raise
1006         else:
1007             self.assertEqual(message.severity, SyslogSeverity.info)
1008             self.assertEqual(message.appname, "NAT")
1009             self.assertEqual(message.msgid, msgid)
1010             sd_params = message.sd.get("nsess")
1011             self.assertTrue(sd_params is not None)
1012             if is_ip6:
1013                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1014                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1015             else:
1016                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1017                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1018                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1019             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1020             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1021             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1022             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1023             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1024             self.assertEqual(sd_params.get("SVLAN"), "0")
1025             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1026             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1027
1028     def test_icmp_error(self):
1029         """NAT44ED test ICMP error message with inner header"""
1030
1031         payload = "H" * 10
1032
1033         self.nat_add_address(self.nat_addr)
1034         self.nat_add_inside_interface(self.pg0)
1035         self.nat_add_outside_interface(self.pg1)
1036
1037         # in2out (initiate connection)
1038         p1 = [
1039             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1040             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1041             / UDP(sport=21, dport=20)
1042             / payload,
1043             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1044             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1045             / TCP(sport=21, dport=20, flags="S")
1046             / payload,
1047             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1048             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1049             / ICMP(type="echo-request", id=7777)
1050             / payload,
1051         ]
1052
1053         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1054
1055         # out2in (send error message)
1056         p2 = [
1057             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1058             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1059             / ICMP(type="dest-unreach", code="port-unreachable")
1060             / c[IP:]
1061             for c in capture
1062         ]
1063
1064         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1065
1066         for c in capture:
1067             try:
1068                 assert c[IP].dst == self.pg0.remote_ip4
1069                 assert c[IPerror].src == self.pg0.remote_ip4
1070             except AssertionError as a:
1071                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1072
1073     def test_icmp_echo_reply_trailer(self):
1074         """ICMP echo reply with ethernet trailer"""
1075
1076         self.nat_add_address(self.nat_addr)
1077         self.nat_add_inside_interface(self.pg0)
1078         self.nat_add_outside_interface(self.pg1)
1079
1080         # in2out
1081         p1 = (
1082             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1083             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1084             / ICMP(type=8, id=0xABCD, seq=0)
1085         )
1086
1087         self.pg0.add_stream(p1)
1088         self.pg_enable_capture(self.pg_interfaces)
1089         self.pg_start()
1090         c = self.pg1.get_capture(1)[0]
1091
1092         self.logger.debug(self.vapi.cli("show trace"))
1093
1094         # out2in
1095         p2 = (
1096             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1097             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1098             / ICMP(type=0, id=c[ICMP].id, seq=0)
1099         )
1100
1101         # force checksum calculation
1102         p2 = p2.__class__(bytes(p2))
1103
1104         self.logger.debug(ppp("Packet before modification:", p2))
1105
1106         # hex representation of vss monitoring ethernet trailer
1107         # this seems to be just added to end of packet without modifying
1108         # IP or ICMP lengths / checksums
1109         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1110         # change it so that IP/ICMP is unaffected
1111         p2[IP].len = 28
1112
1113         self.logger.debug(ppp("Packet with added trailer:", p2))
1114
1115         self.pg1.add_stream(p2)
1116         self.pg_enable_capture(self.pg_interfaces)
1117         self.pg_start()
1118
1119         self.pg0.get_capture(1)
1120
1121     def test_users_dump(self):
1122         """NAT44ED API test - nat44_user_dump"""
1123
1124         self.nat_add_address(self.nat_addr)
1125         self.nat_add_inside_interface(self.pg0)
1126         self.nat_add_outside_interface(self.pg1)
1127
1128         self.vapi.nat44_forwarding_enable_disable(enable=1)
1129
1130         local_ip = self.pg0.remote_ip4
1131         external_ip = self.nat_addr
1132         self.nat_add_static_mapping(local_ip, external_ip)
1133
1134         users = self.vapi.nat44_user_dump()
1135         self.assertEqual(len(users), 0)
1136
1137         # in2out - static mapping match
1138
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146         pkts = self.create_stream_in(self.pg0, self.pg1)
1147         self.pg0.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150         capture = self.pg1.get_capture(len(pkts))
1151         self.verify_capture_out(capture, same_port=True)
1152
1153         users = self.vapi.nat44_user_dump()
1154         self.assertEqual(len(users), 1)
1155         static_user = users[0]
1156         self.assertEqual(static_user.nstaticsessions, 3)
1157         self.assertEqual(static_user.nsessions, 0)
1158
1159         # in2out - no static mapping match (forwarding test)
1160
1161         host0 = self.pg0.remote_hosts[0]
1162         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1163         try:
1164             pkts = self.create_stream_out(
1165                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1166             )
1167             self.pg1.add_stream(pkts)
1168             self.pg_enable_capture(self.pg_interfaces)
1169             self.pg_start()
1170             capture = self.pg0.get_capture(len(pkts))
1171             self.verify_capture_in(capture, self.pg0)
1172
1173             pkts = self.create_stream_in(self.pg0, self.pg1)
1174             self.pg0.add_stream(pkts)
1175             self.pg_enable_capture(self.pg_interfaces)
1176             self.pg_start()
1177             capture = self.pg1.get_capture(len(pkts))
1178             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1179         finally:
1180             self.pg0.remote_hosts[0] = host0
1181
1182         users = self.vapi.nat44_user_dump()
1183         self.assertEqual(len(users), 2)
1184         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1185             non_static_user = users[1]
1186             static_user = users[0]
1187         else:
1188             non_static_user = users[0]
1189             static_user = users[1]
1190         self.assertEqual(static_user.nstaticsessions, 3)
1191         self.assertEqual(static_user.nsessions, 0)
1192         self.assertEqual(non_static_user.nstaticsessions, 0)
1193         self.assertEqual(non_static_user.nsessions, 3)
1194
1195         users = self.vapi.nat44_user_dump()
1196         self.assertEqual(len(users), 2)
1197         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1198             non_static_user = users[1]
1199             static_user = users[0]
1200         else:
1201             non_static_user = users[0]
1202             static_user = users[1]
1203         self.assertEqual(static_user.nstaticsessions, 3)
1204         self.assertEqual(static_user.nsessions, 0)
1205         self.assertEqual(non_static_user.nstaticsessions, 0)
1206         self.assertEqual(non_static_user.nsessions, 3)
1207
1208     def test_frag_out_of_order_do_not_translate(self):
1209         """NAT44ED don't translate fragments arriving out of order"""
1210         self.nat_add_inside_interface(self.pg0)
1211         self.nat_add_outside_interface(self.pg1)
1212         self.vapi.nat44_forwarding_enable_disable(enable=True)
1213         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1214
1215     def test_forwarding(self):
1216         """NAT44ED forwarding test"""
1217
1218         self.nat_add_inside_interface(self.pg0)
1219         self.nat_add_outside_interface(self.pg1)
1220         self.vapi.nat44_forwarding_enable_disable(enable=1)
1221
1222         real_ip = self.pg0.remote_ip4
1223         alias_ip = self.nat_addr
1224         flags = self.config_flags.NAT_IS_ADDR_ONLY
1225         self.vapi.nat44_add_del_static_mapping(
1226             is_add=1,
1227             local_ip_address=real_ip,
1228             external_ip_address=alias_ip,
1229             external_sw_if_index=0xFFFFFFFF,
1230             flags=flags,
1231         )
1232
1233         try:
1234             # in2out - static mapping match
1235
1236             pkts = self.create_stream_out(self.pg1)
1237             self.pg1.add_stream(pkts)
1238             self.pg_enable_capture(self.pg_interfaces)
1239             self.pg_start()
1240             capture = self.pg0.get_capture(len(pkts))
1241             self.verify_capture_in(capture, self.pg0)
1242
1243             pkts = self.create_stream_in(self.pg0, self.pg1)
1244             self.pg0.add_stream(pkts)
1245             self.pg_enable_capture(self.pg_interfaces)
1246             self.pg_start()
1247             capture = self.pg1.get_capture(len(pkts))
1248             self.verify_capture_out(capture, same_port=True)
1249
1250             # in2out - no static mapping match
1251
1252             host0 = self.pg0.remote_hosts[0]
1253             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1254             try:
1255                 pkts = self.create_stream_out(
1256                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1257                 )
1258                 self.pg1.add_stream(pkts)
1259                 self.pg_enable_capture(self.pg_interfaces)
1260                 self.pg_start()
1261                 capture = self.pg0.get_capture(len(pkts))
1262                 self.verify_capture_in(capture, self.pg0)
1263
1264                 pkts = self.create_stream_in(self.pg0, self.pg1)
1265                 self.pg0.add_stream(pkts)
1266                 self.pg_enable_capture(self.pg_interfaces)
1267                 self.pg_start()
1268                 capture = self.pg1.get_capture(len(pkts))
1269                 self.verify_capture_out(
1270                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1271                 )
1272             finally:
1273                 self.pg0.remote_hosts[0] = host0
1274
1275             user = self.pg0.remote_hosts[1]
1276             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1277             self.assertEqual(len(sessions), 3)
1278             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1279             self.vapi.nat44_del_session(
1280                 address=sessions[0].inside_ip_address,
1281                 port=sessions[0].inside_port,
1282                 protocol=sessions[0].protocol,
1283                 flags=(
1284                     self.config_flags.NAT_IS_INSIDE
1285                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1286                 ),
1287                 ext_host_address=sessions[0].ext_host_address,
1288                 ext_host_port=sessions[0].ext_host_port,
1289             )
1290             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1291             self.assertEqual(len(sessions), 2)
1292
1293         finally:
1294             self.vapi.nat44_forwarding_enable_disable(enable=0)
1295             flags = self.config_flags.NAT_IS_ADDR_ONLY
1296             self.vapi.nat44_add_del_static_mapping(
1297                 is_add=0,
1298                 local_ip_address=real_ip,
1299                 external_ip_address=alias_ip,
1300                 external_sw_if_index=0xFFFFFFFF,
1301                 flags=flags,
1302             )
1303
1304     def test_output_feature_and_service2(self):
1305         """NAT44ED interface output feature and service host direct access"""
1306         self.vapi.nat44_forwarding_enable_disable(enable=1)
1307         self.nat_add_address(self.nat_addr)
1308
1309         self.vapi.nat44_ed_add_del_output_interface(
1310             sw_if_index=self.pg1.sw_if_index, is_add=1
1311         )
1312
1313         # session initiated from service host - translate
1314         pkts = self.create_stream_in(self.pg0, self.pg1)
1315         self.pg0.add_stream(pkts)
1316         self.pg_enable_capture(self.pg_interfaces)
1317         self.pg_start()
1318         capture = self.pg1.get_capture(len(pkts))
1319         self.verify_capture_out(capture, ignore_port=True)
1320
1321         pkts = self.create_stream_out(self.pg1)
1322         self.pg1.add_stream(pkts)
1323         self.pg_enable_capture(self.pg_interfaces)
1324         self.pg_start()
1325         capture = self.pg0.get_capture(len(pkts))
1326         self.verify_capture_in(capture, self.pg0)
1327
1328         # session initiated from remote host - do not translate
1329         tcp_port_in = self.tcp_port_in
1330         udp_port_in = self.udp_port_in
1331         icmp_id_in = self.icmp_id_in
1332
1333         self.tcp_port_in = 60303
1334         self.udp_port_in = 60304
1335         self.icmp_id_in = 60305
1336
1337         try:
1338             pkts = self.create_stream_out(
1339                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1340             )
1341             self.pg1.add_stream(pkts)
1342             self.pg_enable_capture(self.pg_interfaces)
1343             self.pg_start()
1344             capture = self.pg0.get_capture(len(pkts))
1345             self.verify_capture_in(capture, self.pg0)
1346
1347             pkts = self.create_stream_in(self.pg0, self.pg1)
1348             self.pg0.add_stream(pkts)
1349             self.pg_enable_capture(self.pg_interfaces)
1350             self.pg_start()
1351             capture = self.pg1.get_capture(len(pkts))
1352             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1353         finally:
1354             self.tcp_port_in = tcp_port_in
1355             self.udp_port_in = udp_port_in
1356             self.icmp_id_in = icmp_id_in
1357
1358     def test_twice_nat(self):
1359         """NAT44ED Twice NAT"""
1360         self.twice_nat_common()
1361
1362     def test_self_twice_nat_positive(self):
1363         """NAT44ED Self Twice NAT (positive test)"""
1364         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1365
1366     def test_self_twice_nat_lb_positive(self):
1367         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1368         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1369
1370     def test_twice_nat_lb(self):
1371         """NAT44ED Twice NAT local service load balancing"""
1372         self.twice_nat_common(lb=True)
1373
1374     def test_output_feature(self):
1375         """NAT44ED interface output feature (in2out postrouting)"""
1376         self.vapi.nat44_forwarding_enable_disable(enable=1)
1377         self.nat_add_address(self.nat_addr)
1378
1379         self.nat_add_outside_interface(self.pg0)
1380         self.vapi.nat44_ed_add_del_output_interface(
1381             sw_if_index=self.pg1.sw_if_index, is_add=1
1382         )
1383
1384         # in2out
1385         pkts = self.create_stream_in(self.pg0, self.pg1)
1386         self.pg0.add_stream(pkts)
1387         self.pg_enable_capture(self.pg_interfaces)
1388         self.pg_start()
1389         capture = self.pg1.get_capture(len(pkts))
1390         self.verify_capture_out(capture, ignore_port=True)
1391         self.logger.debug(self.vapi.cli("show trace"))
1392
1393         # out2in
1394         pkts = self.create_stream_out(self.pg1)
1395         self.pg1.add_stream(pkts)
1396         self.pg_enable_capture(self.pg_interfaces)
1397         self.pg_start()
1398         capture = self.pg0.get_capture(len(pkts))
1399         self.verify_capture_in(capture, self.pg0)
1400         self.logger.debug(self.vapi.cli("show trace"))
1401
1402         # in2out
1403         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1404         self.pg0.add_stream(pkts)
1405         self.pg_enable_capture(self.pg_interfaces)
1406         self.pg_start()
1407         capture = self.pg1.get_capture(len(pkts))
1408         self.verify_capture_out(capture, ignore_port=True)
1409         self.logger.debug(self.vapi.cli("show trace"))
1410
1411         # out2in
1412         pkts = self.create_stream_out(self.pg1, ttl=2)
1413         self.pg1.add_stream(pkts)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg0.get_capture(len(pkts))
1417         self.verify_capture_in(capture, self.pg0)
1418         self.logger.debug(self.vapi.cli("show trace"))
1419
1420         # in2out
1421         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1422         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1423         for p in capture:
1424             self.assertIn(ICMP, p)
1425             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1426
1427     def test_static_with_port_out2(self):
1428         """NAT44ED 1:1 NAPT asymmetrical rule"""
1429
1430         external_port = 80
1431         local_port = 8080
1432
1433         self.vapi.nat44_forwarding_enable_disable(enable=1)
1434         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1435         self.nat_add_static_mapping(
1436             self.pg0.remote_ip4,
1437             self.nat_addr,
1438             local_port,
1439             external_port,
1440             proto=IP_PROTOS.tcp,
1441             flags=flags,
1442         )
1443
1444         self.nat_add_inside_interface(self.pg0)
1445         self.nat_add_outside_interface(self.pg1)
1446
1447         # from client to service
1448         p = (
1449             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1450             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1451             / TCP(sport=12345, dport=external_port)
1452         )
1453         self.pg1.add_stream(p)
1454         self.pg_enable_capture(self.pg_interfaces)
1455         self.pg_start()
1456         capture = self.pg0.get_capture(1)
1457         p = capture[0]
1458         try:
1459             ip = p[IP]
1460             tcp = p[TCP]
1461             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1462             self.assertEqual(tcp.dport, local_port)
1463             self.assert_packet_checksums_valid(p)
1464         except:
1465             self.logger.error(ppp("Unexpected or invalid packet:", p))
1466             raise
1467
1468         # ICMP error
1469         p = (
1470             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1471             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1472             / ICMP(type=11)
1473             / capture[0][IP]
1474         )
1475         self.pg0.add_stream(p)
1476         self.pg_enable_capture(self.pg_interfaces)
1477         self.pg_start()
1478         capture = self.pg1.get_capture(1)
1479         p = capture[0]
1480         try:
1481             self.assertEqual(p[IP].src, self.nat_addr)
1482             inner = p[IPerror]
1483             self.assertEqual(inner.dst, self.nat_addr)
1484             self.assertEqual(inner[TCPerror].dport, external_port)
1485         except:
1486             self.logger.error(ppp("Unexpected or invalid packet:", p))
1487             raise
1488
1489         # from service back to client
1490         p = (
1491             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1492             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1493             / TCP(sport=local_port, dport=12345)
1494         )
1495         self.pg0.add_stream(p)
1496         self.pg_enable_capture(self.pg_interfaces)
1497         self.pg_start()
1498         capture = self.pg1.get_capture(1)
1499         p = capture[0]
1500         try:
1501             ip = p[IP]
1502             tcp = p[TCP]
1503             self.assertEqual(ip.src, self.nat_addr)
1504             self.assertEqual(tcp.sport, external_port)
1505             self.assert_packet_checksums_valid(p)
1506         except:
1507             self.logger.error(ppp("Unexpected or invalid packet:", p))
1508             raise
1509
1510         # ICMP error
1511         p = (
1512             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1513             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1514             / ICMP(type=11)
1515             / capture[0][IP]
1516         )
1517         self.pg1.add_stream(p)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(1)
1521         p = capture[0]
1522         try:
1523             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1524             inner = p[IPerror]
1525             self.assertEqual(inner.src, self.pg0.remote_ip4)
1526             self.assertEqual(inner[TCPerror].sport, local_port)
1527         except:
1528             self.logger.error(ppp("Unexpected or invalid packet:", p))
1529             raise
1530
1531         # from client to server (no translation)
1532         p = (
1533             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1534             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1535             / TCP(sport=12346, dport=local_port)
1536         )
1537         self.pg1.add_stream(p)
1538         self.pg_enable_capture(self.pg_interfaces)
1539         self.pg_start()
1540         capture = self.pg0.get_capture(1)
1541         p = capture[0]
1542         try:
1543             ip = p[IP]
1544             tcp = p[TCP]
1545             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1546             self.assertEqual(tcp.dport, local_port)
1547             self.assert_packet_checksums_valid(p)
1548         except:
1549             self.logger.error(ppp("Unexpected or invalid packet:", p))
1550             raise
1551
1552         # from service back to client (no translation)
1553         p = (
1554             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1555             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1556             / TCP(sport=local_port, dport=12346)
1557         )
1558         self.pg0.add_stream(p)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg1.get_capture(1)
1562         p = capture[0]
1563         try:
1564             ip = p[IP]
1565             tcp = p[TCP]
1566             self.assertEqual(ip.src, self.pg0.remote_ip4)
1567             self.assertEqual(tcp.sport, local_port)
1568             self.assert_packet_checksums_valid(p)
1569         except:
1570             self.logger.error(ppp("Unexpected or invalid packet:", p))
1571             raise
1572
1573     def test_static_lb(self):
1574         """NAT44ED local service load balancing"""
1575         external_addr_n = self.nat_addr
1576         external_port = 80
1577         local_port = 8080
1578         server1 = self.pg0.remote_hosts[0]
1579         server2 = self.pg0.remote_hosts[1]
1580
1581         locals = [
1582             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1583             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1584         ]
1585
1586         self.nat_add_address(self.nat_addr)
1587         self.vapi.nat44_add_del_lb_static_mapping(
1588             is_add=1,
1589             external_addr=external_addr_n,
1590             external_port=external_port,
1591             protocol=IP_PROTOS.tcp,
1592             local_num=len(locals),
1593             locals=locals,
1594         )
1595         flags = self.config_flags.NAT_IS_INSIDE
1596         self.vapi.nat44_interface_add_del_feature(
1597             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1598         )
1599         self.vapi.nat44_interface_add_del_feature(
1600             sw_if_index=self.pg1.sw_if_index, is_add=1
1601         )
1602
1603         # from client to service
1604         p = (
1605             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1606             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1607             / TCP(sport=12345, dport=external_port)
1608         )
1609         self.pg1.add_stream(p)
1610         self.pg_enable_capture(self.pg_interfaces)
1611         self.pg_start()
1612         capture = self.pg0.get_capture(1)
1613         p = capture[0]
1614         server = None
1615         try:
1616             ip = p[IP]
1617             tcp = p[TCP]
1618             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1619             if ip.dst == server1.ip4:
1620                 server = server1
1621             else:
1622                 server = server2
1623             self.assertEqual(tcp.dport, local_port)
1624             self.assert_packet_checksums_valid(p)
1625         except:
1626             self.logger.error(ppp("Unexpected or invalid packet:", p))
1627             raise
1628
1629         # from service back to client
1630         p = (
1631             Ether(src=server.mac, dst=self.pg0.local_mac)
1632             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1633             / TCP(sport=local_port, dport=12345)
1634         )
1635         self.pg0.add_stream(p)
1636         self.pg_enable_capture(self.pg_interfaces)
1637         self.pg_start()
1638         capture = self.pg1.get_capture(1)
1639         p = capture[0]
1640         try:
1641             ip = p[IP]
1642             tcp = p[TCP]
1643             self.assertEqual(ip.src, self.nat_addr)
1644             self.assertEqual(tcp.sport, external_port)
1645             self.assert_packet_checksums_valid(p)
1646         except:
1647             self.logger.error(ppp("Unexpected or invalid packet:", p))
1648             raise
1649
1650         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1651         self.assertEqual(len(sessions), 1)
1652         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1653         self.vapi.nat44_del_session(
1654             address=sessions[0].inside_ip_address,
1655             port=sessions[0].inside_port,
1656             protocol=sessions[0].protocol,
1657             flags=(
1658                 self.config_flags.NAT_IS_INSIDE
1659                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1660             ),
1661             ext_host_address=sessions[0].ext_host_address,
1662             ext_host_port=sessions[0].ext_host_port,
1663         )
1664         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1665         self.assertEqual(len(sessions), 0)
1666
1667     def test_static_lb_2(self):
1668         """NAT44ED local service load balancing (asymmetrical rule)"""
1669         external_addr = self.nat_addr
1670         external_port = 80
1671         local_port = 8080
1672         server1 = self.pg0.remote_hosts[0]
1673         server2 = self.pg0.remote_hosts[1]
1674
1675         locals = [
1676             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1677             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1678         ]
1679
1680         self.vapi.nat44_forwarding_enable_disable(enable=1)
1681         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1682         self.vapi.nat44_add_del_lb_static_mapping(
1683             is_add=1,
1684             flags=flags,
1685             external_addr=external_addr,
1686             external_port=external_port,
1687             protocol=IP_PROTOS.tcp,
1688             local_num=len(locals),
1689             locals=locals,
1690         )
1691         flags = self.config_flags.NAT_IS_INSIDE
1692         self.vapi.nat44_interface_add_del_feature(
1693             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1694         )
1695         self.vapi.nat44_interface_add_del_feature(
1696             sw_if_index=self.pg1.sw_if_index, is_add=1
1697         )
1698
1699         # from client to service
1700         p = (
1701             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1702             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1703             / TCP(sport=12345, dport=external_port)
1704         )
1705         self.pg1.add_stream(p)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg0.get_capture(1)
1709         p = capture[0]
1710         server = None
1711         try:
1712             ip = p[IP]
1713             tcp = p[TCP]
1714             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1715             if ip.dst == server1.ip4:
1716                 server = server1
1717             else:
1718                 server = server2
1719             self.assertEqual(tcp.dport, local_port)
1720             self.assert_packet_checksums_valid(p)
1721         except:
1722             self.logger.error(ppp("Unexpected or invalid packet:", p))
1723             raise
1724
1725         # from service back to client
1726         p = (
1727             Ether(src=server.mac, dst=self.pg0.local_mac)
1728             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1729             / TCP(sport=local_port, dport=12345)
1730         )
1731         self.pg0.add_stream(p)
1732         self.pg_enable_capture(self.pg_interfaces)
1733         self.pg_start()
1734         capture = self.pg1.get_capture(1)
1735         p = capture[0]
1736         try:
1737             ip = p[IP]
1738             tcp = p[TCP]
1739             self.assertEqual(ip.src, self.nat_addr)
1740             self.assertEqual(tcp.sport, external_port)
1741             self.assert_packet_checksums_valid(p)
1742         except:
1743             self.logger.error(ppp("Unexpected or invalid packet:", p))
1744             raise
1745
1746         # from client to server (no translation)
1747         p = (
1748             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1749             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1750             / TCP(sport=12346, dport=local_port)
1751         )
1752         self.pg1.add_stream(p)
1753         self.pg_enable_capture(self.pg_interfaces)
1754         self.pg_start()
1755         capture = self.pg0.get_capture(1)
1756         p = capture[0]
1757         server = None
1758         try:
1759             ip = p[IP]
1760             tcp = p[TCP]
1761             self.assertEqual(ip.dst, server1.ip4)
1762             self.assertEqual(tcp.dport, local_port)
1763             self.assert_packet_checksums_valid(p)
1764         except:
1765             self.logger.error(ppp("Unexpected or invalid packet:", p))
1766             raise
1767
1768         # from service back to client (no translation)
1769         p = (
1770             Ether(src=server1.mac, dst=self.pg0.local_mac)
1771             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1772             / TCP(sport=local_port, dport=12346)
1773         )
1774         self.pg0.add_stream(p)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         capture = self.pg1.get_capture(1)
1778         p = capture[0]
1779         try:
1780             ip = p[IP]
1781             tcp = p[TCP]
1782             self.assertEqual(ip.src, server1.ip4)
1783             self.assertEqual(tcp.sport, local_port)
1784             self.assert_packet_checksums_valid(p)
1785         except:
1786             self.logger.error(ppp("Unexpected or invalid packet:", p))
1787             raise
1788
1789     def test_lb_affinity(self):
1790         """NAT44ED local service load balancing affinity"""
1791         external_addr = self.nat_addr
1792         external_port = 80
1793         local_port = 8080
1794         server1 = self.pg0.remote_hosts[0]
1795         server2 = self.pg0.remote_hosts[1]
1796
1797         locals = [
1798             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1799             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1800         ]
1801
1802         self.nat_add_address(self.nat_addr)
1803         self.vapi.nat44_add_del_lb_static_mapping(
1804             is_add=1,
1805             external_addr=external_addr,
1806             external_port=external_port,
1807             protocol=IP_PROTOS.tcp,
1808             affinity=10800,
1809             local_num=len(locals),
1810             locals=locals,
1811         )
1812         flags = self.config_flags.NAT_IS_INSIDE
1813         self.vapi.nat44_interface_add_del_feature(
1814             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1815         )
1816         self.vapi.nat44_interface_add_del_feature(
1817             sw_if_index=self.pg1.sw_if_index, is_add=1
1818         )
1819
1820         p = (
1821             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1822             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1823             / TCP(sport=1025, dport=external_port)
1824         )
1825         self.pg1.add_stream(p)
1826         self.pg_enable_capture(self.pg_interfaces)
1827         self.pg_start()
1828         capture = self.pg0.get_capture(1)
1829         backend = capture[0][IP].dst
1830
1831         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1832         self.assertEqual(len(sessions), 1)
1833         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1834         self.vapi.nat44_del_session(
1835             address=sessions[0].inside_ip_address,
1836             port=sessions[0].inside_port,
1837             protocol=sessions[0].protocol,
1838             flags=(
1839                 self.config_flags.NAT_IS_INSIDE
1840                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1841             ),
1842             ext_host_address=sessions[0].ext_host_address,
1843             ext_host_port=sessions[0].ext_host_port,
1844         )
1845
1846         pkts = []
1847         for port in range(1030, 1100):
1848             p = (
1849                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1850                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1851                 / TCP(sport=port, dport=external_port)
1852             )
1853             pkts.append(p)
1854         self.pg1.add_stream(pkts)
1855         self.pg_enable_capture(self.pg_interfaces)
1856         self.pg_start()
1857         capture = self.pg0.get_capture(len(pkts))
1858         for p in capture:
1859             self.assertEqual(p[IP].dst, backend)
1860
1861     def test_multiple_vrf_1(self):
1862         """Multiple VRF - both client & service in VRF1"""
1863
1864         external_addr = "1.2.3.4"
1865         external_port = 80
1866         local_port = 8080
1867         port = 0
1868
1869         flags = self.config_flags.NAT_IS_INSIDE
1870         self.vapi.nat44_interface_add_del_feature(
1871             sw_if_index=self.pg5.sw_if_index, is_add=1
1872         )
1873         self.vapi.nat44_interface_add_del_feature(
1874             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1875         )
1876         self.vapi.nat44_interface_add_del_feature(
1877             sw_if_index=self.pg6.sw_if_index, is_add=1
1878         )
1879         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1880         self.nat_add_static_mapping(
1881             self.pg5.remote_ip4,
1882             external_addr,
1883             local_port,
1884             external_port,
1885             vrf_id=1,
1886             proto=IP_PROTOS.tcp,
1887             flags=flags,
1888         )
1889
1890         p = (
1891             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1892             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1893             / TCP(sport=12345, dport=external_port)
1894         )
1895         self.pg6.add_stream(p)
1896         self.pg_enable_capture(self.pg_interfaces)
1897         self.pg_start()
1898         capture = self.pg5.get_capture(1)
1899         p = capture[0]
1900         try:
1901             ip = p[IP]
1902             tcp = p[TCP]
1903             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1904             self.assertEqual(tcp.dport, local_port)
1905             self.assert_packet_checksums_valid(p)
1906         except:
1907             self.logger.error(ppp("Unexpected or invalid packet:", p))
1908             raise
1909
1910         p = (
1911             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1912             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1913             / TCP(sport=local_port, dport=12345)
1914         )
1915         self.pg5.add_stream(p)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg6.get_capture(1)
1919         p = capture[0]
1920         try:
1921             ip = p[IP]
1922             tcp = p[TCP]
1923             self.assertEqual(ip.src, external_addr)
1924             self.assertEqual(tcp.sport, external_port)
1925             self.assert_packet_checksums_valid(p)
1926         except:
1927             self.logger.error(ppp("Unexpected or invalid packet:", p))
1928             raise
1929
1930     def test_multiple_vrf_2(self):
1931         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1932
1933         external_addr = "1.2.3.4"
1934         external_port = 80
1935         local_port = 8080
1936         port = 0
1937
1938         self.nat_add_address(self.nat_addr)
1939         flags = self.config_flags.NAT_IS_INSIDE
1940         self.vapi.nat44_ed_add_del_output_interface(
1941             sw_if_index=self.pg1.sw_if_index, is_add=1
1942         )
1943         self.vapi.nat44_interface_add_del_feature(
1944             sw_if_index=self.pg5.sw_if_index, is_add=1
1945         )
1946         self.vapi.nat44_interface_add_del_feature(
1947             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1948         )
1949         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1950         self.nat_add_static_mapping(
1951             self.pg5.remote_ip4,
1952             external_addr,
1953             local_port,
1954             external_port,
1955             vrf_id=1,
1956             proto=IP_PROTOS.tcp,
1957             flags=flags,
1958         )
1959
1960         p = (
1961             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1962             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1963             / TCP(sport=2345, dport=22)
1964         )
1965         self.pg5.add_stream(p)
1966         self.pg_enable_capture(self.pg_interfaces)
1967         self.pg_start()
1968         capture = self.pg1.get_capture(1)
1969         p = capture[0]
1970         try:
1971             ip = p[IP]
1972             tcp = p[TCP]
1973             self.assertEqual(ip.src, self.nat_addr)
1974             self.assert_packet_checksums_valid(p)
1975             port = tcp.sport
1976         except:
1977             self.logger.error(ppp("Unexpected or invalid packet:", p))
1978             raise
1979
1980         p = (
1981             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1982             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1983             / TCP(sport=22, dport=port)
1984         )
1985         self.pg1.add_stream(p)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg5.get_capture(1)
1989         p = capture[0]
1990         try:
1991             ip = p[IP]
1992             tcp = p[TCP]
1993             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1994             self.assertEqual(tcp.dport, 2345)
1995             self.assert_packet_checksums_valid(p)
1996         except:
1997             self.logger.error(ppp("Unexpected or invalid packet:", p))
1998             raise
1999
2000     def test_multiple_vrf_3(self):
2001         """Multiple VRF - client in VRF1, service in VRF0"""
2002
2003         external_addr = "1.2.3.4"
2004         external_port = 80
2005         local_port = 8080
2006         port = 0
2007
2008         flags = self.config_flags.NAT_IS_INSIDE
2009         self.vapi.nat44_interface_add_del_feature(
2010             sw_if_index=self.pg0.sw_if_index, is_add=1
2011         )
2012         self.vapi.nat44_interface_add_del_feature(
2013             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2014         )
2015         self.vapi.nat44_interface_add_del_feature(
2016             sw_if_index=self.pg6.sw_if_index, is_add=1
2017         )
2018         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2019         self.nat_add_static_mapping(
2020             self.pg0.remote_ip4,
2021             external_sw_if_index=self.pg0.sw_if_index,
2022             local_port=local_port,
2023             vrf_id=0,
2024             external_port=external_port,
2025             proto=IP_PROTOS.tcp,
2026             flags=flags,
2027         )
2028
2029         # from client VRF1 to service VRF0
2030         p = (
2031             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2032             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2033             / TCP(sport=12346, dport=external_port)
2034         )
2035         self.pg6.add_stream(p)
2036         self.pg_enable_capture(self.pg_interfaces)
2037         self.pg_start()
2038         capture = self.pg0.get_capture(1)
2039         p = capture[0]
2040         try:
2041             ip = p[IP]
2042             tcp = p[TCP]
2043             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2044             self.assertEqual(tcp.dport, local_port)
2045             self.assert_packet_checksums_valid(p)
2046         except:
2047             self.logger.error(ppp("Unexpected or invalid packet:", p))
2048             raise
2049
2050         # from service VRF0 back to client VRF1
2051         p = (
2052             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2053             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2054             / TCP(sport=local_port, dport=12346)
2055         )
2056         self.pg0.add_stream(p)
2057         self.pg_enable_capture(self.pg_interfaces)
2058         self.pg_start()
2059         capture = self.pg6.get_capture(1)
2060         p = capture[0]
2061         try:
2062             ip = p[IP]
2063             tcp = p[TCP]
2064             self.assertEqual(ip.src, self.pg0.local_ip4)
2065             self.assertEqual(tcp.sport, external_port)
2066             self.assert_packet_checksums_valid(p)
2067         except:
2068             self.logger.error(ppp("Unexpected or invalid packet:", p))
2069             raise
2070
2071     def test_multiple_vrf_4(self):
2072         """Multiple VRF - client in VRF0, service in VRF1"""
2073
2074         external_addr = "1.2.3.4"
2075         external_port = 80
2076         local_port = 8080
2077         port = 0
2078
2079         flags = self.config_flags.NAT_IS_INSIDE
2080         self.vapi.nat44_interface_add_del_feature(
2081             sw_if_index=self.pg0.sw_if_index, is_add=1
2082         )
2083         self.vapi.nat44_interface_add_del_feature(
2084             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2085         )
2086         self.vapi.nat44_interface_add_del_feature(
2087             sw_if_index=self.pg5.sw_if_index, is_add=1
2088         )
2089         self.vapi.nat44_interface_add_del_feature(
2090             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2091         )
2092         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2093         self.nat_add_static_mapping(
2094             self.pg5.remote_ip4,
2095             external_addr,
2096             local_port,
2097             external_port,
2098             vrf_id=1,
2099             proto=IP_PROTOS.tcp,
2100             flags=flags,
2101         )
2102
2103         # from client VRF0 to service VRF1
2104         p = (
2105             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2106             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2107             / TCP(sport=12347, dport=external_port)
2108         )
2109         self.pg0.add_stream(p)
2110         self.pg_enable_capture(self.pg_interfaces)
2111         self.pg_start()
2112         capture = self.pg5.get_capture(1)
2113         p = capture[0]
2114         try:
2115             ip = p[IP]
2116             tcp = p[TCP]
2117             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2118             self.assertEqual(tcp.dport, local_port)
2119             self.assert_packet_checksums_valid(p)
2120         except:
2121             self.logger.error(ppp("Unexpected or invalid packet:", p))
2122             raise
2123
2124         # from service VRF1 back to client VRF0
2125         p = (
2126             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2127             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2128             / TCP(sport=local_port, dport=12347)
2129         )
2130         self.pg5.add_stream(p)
2131         self.pg_enable_capture(self.pg_interfaces)
2132         self.pg_start()
2133         capture = self.pg0.get_capture(1)
2134         p = capture[0]
2135         try:
2136             ip = p[IP]
2137             tcp = p[TCP]
2138             self.assertEqual(ip.src, external_addr)
2139             self.assertEqual(tcp.sport, external_port)
2140             self.assert_packet_checksums_valid(p)
2141         except:
2142             self.logger.error(ppp("Unexpected or invalid packet:", p))
2143             raise
2144
2145     def test_multiple_vrf_5(self):
2146         """Multiple VRF - forwarding - no translation"""
2147
2148         external_addr = "1.2.3.4"
2149         external_port = 80
2150         local_port = 8080
2151         port = 0
2152
2153         self.vapi.nat44_forwarding_enable_disable(enable=1)
2154         flags = self.config_flags.NAT_IS_INSIDE
2155         self.vapi.nat44_interface_add_del_feature(
2156             sw_if_index=self.pg0.sw_if_index, is_add=1
2157         )
2158         self.vapi.nat44_interface_add_del_feature(
2159             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2160         )
2161         self.vapi.nat44_interface_add_del_feature(
2162             sw_if_index=self.pg5.sw_if_index, is_add=1
2163         )
2164         self.vapi.nat44_interface_add_del_feature(
2165             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2166         )
2167         self.vapi.nat44_interface_add_del_feature(
2168             sw_if_index=self.pg6.sw_if_index, is_add=1
2169         )
2170         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2171         self.nat_add_static_mapping(
2172             self.pg5.remote_ip4,
2173             external_addr,
2174             local_port,
2175             external_port,
2176             vrf_id=1,
2177             proto=IP_PROTOS.tcp,
2178             flags=flags,
2179         )
2180         self.nat_add_static_mapping(
2181             self.pg0.remote_ip4,
2182             external_sw_if_index=self.pg0.sw_if_index,
2183             local_port=local_port,
2184             vrf_id=0,
2185             external_port=external_port,
2186             proto=IP_PROTOS.tcp,
2187             flags=flags,
2188         )
2189
2190         # from client to server (both VRF1, no translation)
2191         p = (
2192             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2193             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2194             / TCP(sport=12348, dport=local_port)
2195         )
2196         self.pg6.add_stream(p)
2197         self.pg_enable_capture(self.pg_interfaces)
2198         self.pg_start()
2199         capture = self.pg5.get_capture(1)
2200         p = capture[0]
2201         try:
2202             ip = p[IP]
2203             tcp = p[TCP]
2204             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2205             self.assertEqual(tcp.dport, local_port)
2206             self.assert_packet_checksums_valid(p)
2207         except:
2208             self.logger.error(ppp("Unexpected or invalid packet:", p))
2209             raise
2210
2211         # from server back to client (both VRF1, no translation)
2212         p = (
2213             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2214             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2215             / TCP(sport=local_port, dport=12348)
2216         )
2217         self.pg5.add_stream(p)
2218         self.pg_enable_capture(self.pg_interfaces)
2219         self.pg_start()
2220         capture = self.pg6.get_capture(1)
2221         p = capture[0]
2222         try:
2223             ip = p[IP]
2224             tcp = p[TCP]
2225             self.assertEqual(ip.src, self.pg5.remote_ip4)
2226             self.assertEqual(tcp.sport, local_port)
2227             self.assert_packet_checksums_valid(p)
2228         except:
2229             self.logger.error(ppp("Unexpected or invalid packet:", p))
2230             raise
2231
2232         # from client VRF1 to server VRF0 (no translation)
2233         p = (
2234             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2235             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2236             / TCP(sport=local_port, dport=12349)
2237         )
2238         self.pg0.add_stream(p)
2239         self.pg_enable_capture(self.pg_interfaces)
2240         self.pg_start()
2241         capture = self.pg6.get_capture(1)
2242         p = capture[0]
2243         try:
2244             ip = p[IP]
2245             tcp = p[TCP]
2246             self.assertEqual(ip.src, self.pg0.remote_ip4)
2247             self.assertEqual(tcp.sport, local_port)
2248             self.assert_packet_checksums_valid(p)
2249         except:
2250             self.logger.error(ppp("Unexpected or invalid packet:", p))
2251             raise
2252
2253         # from server VRF0 back to client VRF1 (no translation)
2254         p = (
2255             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2256             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2257             / TCP(sport=local_port, dport=12349)
2258         )
2259         self.pg0.add_stream(p)
2260         self.pg_enable_capture(self.pg_interfaces)
2261         self.pg_start()
2262         capture = self.pg6.get_capture(1)
2263         p = capture[0]
2264         try:
2265             ip = p[IP]
2266             tcp = p[TCP]
2267             self.assertEqual(ip.src, self.pg0.remote_ip4)
2268             self.assertEqual(tcp.sport, local_port)
2269             self.assert_packet_checksums_valid(p)
2270         except:
2271             self.logger.error(ppp("Unexpected or invalid packet:", p))
2272             raise
2273
2274         # from client VRF0 to server VRF1 (no translation)
2275         p = (
2276             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2277             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2278             / TCP(sport=12344, dport=local_port)
2279         )
2280         self.pg0.add_stream(p)
2281         self.pg_enable_capture(self.pg_interfaces)
2282         self.pg_start()
2283         capture = self.pg5.get_capture(1)
2284         p = capture[0]
2285         try:
2286             ip = p[IP]
2287             tcp = p[TCP]
2288             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2289             self.assertEqual(tcp.dport, local_port)
2290             self.assert_packet_checksums_valid(p)
2291         except:
2292             self.logger.error(ppp("Unexpected or invalid packet:", p))
2293             raise
2294
2295         # from server VRF1 back to client VRF0 (no translation)
2296         p = (
2297             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2298             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2299             / TCP(sport=local_port, dport=12344)
2300         )
2301         self.pg5.add_stream(p)
2302         self.pg_enable_capture(self.pg_interfaces)
2303         self.pg_start()
2304         capture = self.pg0.get_capture(1)
2305         p = capture[0]
2306         try:
2307             ip = p[IP]
2308             tcp = p[TCP]
2309             self.assertEqual(ip.src, self.pg5.remote_ip4)
2310             self.assertEqual(tcp.sport, local_port)
2311             self.assert_packet_checksums_valid(p)
2312         except:
2313             self.logger.error(ppp("Unexpected or invalid packet:", p))
2314             raise
2315
2316     def test_outside_address_distribution(self):
2317         """NAT44ED outside address distribution based on source address"""
2318
2319         addresses = 65
2320         x = 100
2321
2322         nat_addresses = []
2323         nat_distribution = {}
2324         for i in range(1, addresses):
2325             a = "10.0.0.%d" % i
2326             nat_addresses.append(a)
2327             nat_distribution[a] = set()
2328
2329         self.nat_add_inside_interface(self.pg0)
2330         self.nat_add_outside_interface(self.pg1)
2331
2332         self.vapi.nat44_add_del_address_range(
2333             first_ip_address=nat_addresses[0],
2334             last_ip_address=nat_addresses[-1],
2335             vrf_id=0xFFFFFFFF,
2336             is_add=1,
2337             flags=0,
2338         )
2339
2340         self.pg0.generate_remote_hosts(x)
2341
2342         pkts = []
2343         for i in range(x):
2344             info = self.create_packet_info(self.pg0, self.pg1)
2345             payload = self.info_to_payload(info)
2346             p = (
2347                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2348                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2349                 / UDP(sport=7000 + i, dport=8000 + i)
2350                 / Raw(payload)
2351             )
2352             info.data = p
2353             pkts.append(p)
2354
2355         self.pg0.add_stream(pkts)
2356         self.pg_enable_capture(self.pg_interfaces)
2357         self.pg_start()
2358         recvd = self.pg1.get_capture(len(pkts))
2359         for p_recvd in recvd:
2360             payload_info = self.payload_to_info(p_recvd[Raw])
2361             packet_index = payload_info.index
2362             info = self._packet_infos[packet_index]
2363             self.assertTrue(info is not None)
2364             self.assertEqual(packet_index, info.index)
2365             p_sent = info.data
2366             self.assertIn(p_recvd[IP].src, nat_distribution)
2367             nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
2368
2369         var = variance(map(len, nat_distribution.values()), x / addresses)
2370         self.assertLess(var, 0.33, msg="Bad outside address distribution")
2371
2372     def test_dynamic_edge_ports(self):
2373         """NAT44ED dynamic translation test: edge ports"""
2374
2375         worker_count = self.vpp_worker_count or 1
2376         port_offset = 1024
2377         port_per_thread = (65536 - port_offset) // worker_count
2378         port_count = port_per_thread * worker_count
2379
2380         # worker thread edge ports
2381         thread_edge_ports = {0, port_offset - 1, 65535}
2382         for i in range(0, worker_count):
2383             port_thread_offset = (port_per_thread * i) + port_offset
2384             for port_range_offset in [0, port_per_thread - 1]:
2385                 port = port_thread_offset + port_range_offset
2386                 thread_edge_ports.add(port)
2387         thread_drop_ports = set(
2388             filter(
2389                 lambda x: x not in range(port_offset, port_offset + port_count),
2390                 thread_edge_ports,
2391             )
2392         )
2393
2394         in_if = self.pg7
2395         out_if = self.pg8
2396
2397         self.nat_add_address(self.nat_addr)
2398
2399         try:
2400             self.configure_ip4_interface(in_if, hosts=worker_count)
2401             self.configure_ip4_interface(out_if)
2402
2403             self.nat_add_inside_interface(in_if)
2404             self.nat_add_outside_interface(out_if)
2405
2406             # in2out
2407             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2408             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2409             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2410             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2411
2412             pkt_count = worker_count * len(thread_edge_ports)
2413
2414             i2o_pkts = [[] for x in range(0, worker_count)]
2415             for i in range(0, worker_count):
2416                 remote_host = in_if.remote_hosts[i]
2417                 for port in thread_edge_ports:
2418                     p = (
2419                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2420                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2421                         / TCP(sport=port, dport=port)
2422                     )
2423                     i2o_pkts[i].append(p)
2424
2425                     p = (
2426                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2427                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2428                         / UDP(sport=port, dport=port)
2429                     )
2430                     i2o_pkts[i].append(p)
2431
2432                     p = (
2433                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2434                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2435                         / ICMP(id=port, seq=port, type="echo-request")
2436                     )
2437                     i2o_pkts[i].append(p)
2438
2439             for i in range(0, worker_count):
2440                 if len(i2o_pkts[i]) > 0:
2441                     in_if.add_stream(i2o_pkts[i], worker=i)
2442
2443             self.pg_enable_capture(self.pg_interfaces)
2444             self.pg_start()
2445             capture = out_if.get_capture(pkt_count * 3)
2446             for packet in capture:
2447                 self.assert_packet_checksums_valid(packet)
2448                 if packet.haslayer(TCP):
2449                     self.assert_in_range(
2450                         packet[TCP].sport,
2451                         port_offset,
2452                         port_offset + port_count,
2453                         "src TCP port",
2454                     )
2455                 elif packet.haslayer(UDP):
2456                     self.assert_in_range(
2457                         packet[UDP].sport,
2458                         port_offset,
2459                         port_offset + port_count,
2460                         "src UDP port",
2461                     )
2462                 elif packet.haslayer(ICMP):
2463                     self.assert_in_range(
2464                         packet[ICMP].id,
2465                         port_offset,
2466                         port_offset + port_count,
2467                         "ICMP id",
2468                     )
2469                 else:
2470                     self.fail(
2471                         ppp("Unexpected or invalid packet (outside network):", packet)
2472                     )
2473
2474             if_idx = in_if.sw_if_index
2475             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2476             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2477             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2478             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2479
2480             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2481             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2482             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2483             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2484
2485             # out2in
2486             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2487             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2488             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2489             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2490             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2491
2492             # replies to unchanged thread ports should pass on each worker,
2493             # excluding packets outside dynamic port range
2494             drop_count = worker_count * len(thread_drop_ports)
2495             pass_count = worker_count * len(thread_edge_ports) - drop_count
2496
2497             o2i_pkts = [[] for x in range(0, worker_count)]
2498             for i in range(0, worker_count):
2499                 for port in thread_edge_ports:
2500                     p = (
2501                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2502                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2503                         / TCP(sport=port, dport=port)
2504                     )
2505                     o2i_pkts[i].append(p)
2506
2507                     p = (
2508                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2509                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2510                         / UDP(sport=port, dport=port)
2511                     )
2512                     o2i_pkts[i].append(p)
2513
2514                     p = (
2515                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2516                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2517                         / ICMP(id=port, seq=port, type="echo-reply")
2518                     )
2519                     o2i_pkts[i].append(p)
2520
2521             for i in range(0, worker_count):
2522                 if len(o2i_pkts[i]) > 0:
2523                     out_if.add_stream(o2i_pkts[i], worker=i)
2524
2525             self.pg_enable_capture(self.pg_interfaces)
2526             self.pg_start()
2527             capture = in_if.get_capture(pass_count * 3)
2528             for packet in capture:
2529                 self.assert_packet_checksums_valid(packet)
2530                 if packet.haslayer(TCP):
2531                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2532                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2533                 elif packet.haslayer(UDP):
2534                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2535                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2536                 elif packet.haslayer(ICMP):
2537                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2538                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2539                 else:
2540                     self.fail(
2541                         ppp("Unexpected or invalid packet (inside network):", packet)
2542                     )
2543
2544             if_idx = out_if.sw_if_index
2545             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2546             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2547             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2548             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2549             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2550
2551             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2552             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2553             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2554             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2555             self.assertEqual(
2556                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2557             )
2558
2559         finally:
2560             in_if.unconfig()
2561             out_if.unconfig()
2562
2563     def test_delete_interface(self):
2564         """NAT44ED delete nat interface"""
2565
2566         self.nat_add_address(self.nat_addr)
2567
2568         interfaces = self.create_loopback_interfaces(4)
2569         self.nat_add_outside_interface(interfaces[0])
2570         self.nat_add_inside_interface(interfaces[1])
2571         self.nat_add_outside_interface(interfaces[2])
2572         self.nat_add_inside_interface(interfaces[2])
2573         self.vapi.nat44_ed_add_del_output_interface(
2574             sw_if_index=interfaces[3].sw_if_index, is_add=1
2575         )
2576
2577         nat_sw_if_indices = [
2578             i.sw_if_index
2579             for i in self.vapi.nat44_interface_dump()
2580             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
2581         ]
2582         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
2583
2584         loopbacks = []
2585         for i in interfaces:
2586             # delete nat-enabled interface
2587             self.assertIn(i.sw_if_index, nat_sw_if_indices)
2588             i.remove_vpp_config()
2589
2590             # create interface with the same index
2591             lo = VppLoInterface(self)
2592             loopbacks.append(lo)
2593             self.assertEqual(lo.sw_if_index, i.sw_if_index)
2594
2595             # check interface is not nat-enabled
2596             nat_sw_if_indices = [
2597                 i.sw_if_index
2598                 for i in self.vapi.nat44_interface_dump()
2599                 + list(
2600                     self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
2601                 )
2602             ]
2603             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
2604
2605         for i in loopbacks:
2606             i.remove_vpp_config()
2607
2608
2609 @tag_fixme_ubuntu2204
2610 class TestNAT44EDMW(TestNAT44ED):
2611     """NAT44ED MW Test Case"""
2612
2613     vpp_worker_count = 4
2614     max_sessions = 5000
2615
2616     def test_dynamic(self):
2617         """NAT44ED dynamic translation test"""
2618         pkt_count = 1500
2619         tcp_port_offset = 20
2620         udp_port_offset = 20
2621         icmp_id_offset = 20
2622
2623         self.nat_add_address(self.nat_addr)
2624         self.nat_add_inside_interface(self.pg0)
2625         self.nat_add_outside_interface(self.pg1)
2626
2627         # in2out
2628         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2629         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2630         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2631         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2632
2633         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2634
2635         for i in range(pkt_count):
2636             p = (
2637                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2638                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2639                 / TCP(sport=tcp_port_offset + i, dport=20)
2640             )
2641             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2642
2643             p = (
2644                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2645                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2646                 / UDP(sport=udp_port_offset + i, dport=20)
2647             )
2648             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2649
2650             p = (
2651                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2652                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2653                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2654             )
2655             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2656
2657         for i in range(0, self.vpp_worker_count):
2658             if len(i2o_pkts[i]) > 0:
2659                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2660
2661         self.pg_enable_capture(self.pg_interfaces)
2662         self.pg_start()
2663         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2664
2665         if_idx = self.pg0.sw_if_index
2666         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2667         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2668         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2669         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2670
2671         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2672         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2673         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2674         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2675
2676         self.logger.info(self.vapi.cli("show trace"))
2677
2678         # out2in
2679         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2680         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2681         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2682         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2683
2684         recvd_tcp_ports = set()
2685         recvd_udp_ports = set()
2686         recvd_icmp_ids = set()
2687
2688         for p in capture:
2689             if TCP in p:
2690                 recvd_tcp_ports.add(p[TCP].sport)
2691             if UDP in p:
2692                 recvd_udp_ports.add(p[UDP].sport)
2693             if ICMP in p:
2694                 recvd_icmp_ids.add(p[ICMP].id)
2695
2696         recvd_tcp_ports = list(recvd_tcp_ports)
2697         recvd_udp_ports = list(recvd_udp_ports)
2698         recvd_icmp_ids = list(recvd_icmp_ids)
2699
2700         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2701         for i in range(pkt_count):
2702             p = (
2703                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2704                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2705                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2706             )
2707             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2708
2709             p = (
2710                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2711                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2712                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2713             )
2714             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2715
2716             p = (
2717                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2718                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2719                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2720             )
2721             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2722
2723         for i in range(0, self.vpp_worker_count):
2724             if len(o2i_pkts[i]) > 0:
2725                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2726
2727         self.pg_enable_capture(self.pg_interfaces)
2728         self.pg_start()
2729         capture = self.pg0.get_capture(pkt_count * 3)
2730         for packet in capture:
2731             try:
2732                 self.assert_packet_checksums_valid(packet)
2733                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2734                 if packet.haslayer(TCP):
2735                     self.assert_in_range(
2736                         packet[TCP].dport,
2737                         tcp_port_offset,
2738                         tcp_port_offset + pkt_count,
2739                         "dst TCP port",
2740                     )
2741                 elif packet.haslayer(UDP):
2742                     self.assert_in_range(
2743                         packet[UDP].dport,
2744                         udp_port_offset,
2745                         udp_port_offset + pkt_count,
2746                         "dst UDP port",
2747                     )
2748                 else:
2749                     self.assert_in_range(
2750                         packet[ICMP].id,
2751                         icmp_id_offset,
2752                         icmp_id_offset + pkt_count,
2753                         "ICMP id",
2754                     )
2755             except:
2756                 self.logger.error(
2757                     ppp("Unexpected or invalid packet (inside network):", packet)
2758                 )
2759                 raise
2760
2761         if_idx = self.pg1.sw_if_index
2762         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2763         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2764         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2765         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2766
2767         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2768         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2769         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2770         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2771
2772         sc = self.statistics["/nat44-ed/total-sessions"]
2773         self.assertEqual(
2774             sc[:, 0].sum(),
2775             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2776         )
2777
2778     def test_frag_in_order(self):
2779         """NAT44ED translate fragments arriving in order"""
2780
2781         self.nat_add_address(self.nat_addr)
2782         self.nat_add_inside_interface(self.pg0)
2783         self.nat_add_outside_interface(self.pg1)
2784
2785         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2786         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2787         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2788
2789     def test_frag_in_order_do_not_translate(self):
2790         """NAT44ED don't translate fragments arriving in order"""
2791
2792         self.nat_add_address(self.nat_addr)
2793         self.nat_add_inside_interface(self.pg0)
2794         self.nat_add_outside_interface(self.pg1)
2795         self.vapi.nat44_forwarding_enable_disable(enable=True)
2796
2797         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2798
2799     def test_frag_out_of_order(self):
2800         """NAT44ED translate fragments arriving out of order"""
2801
2802         self.nat_add_address(self.nat_addr)
2803         self.nat_add_inside_interface(self.pg0)
2804         self.nat_add_outside_interface(self.pg1)
2805
2806         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2807         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2808         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2809
2810     def test_frag_in_order_in_plus_out(self):
2811         """NAT44ED in+out interface fragments in order"""
2812
2813         in_port = self.random_port()
2814         out_port = self.random_port()
2815
2816         self.nat_add_address(self.nat_addr)
2817         self.nat_add_inside_interface(self.pg0)
2818         self.nat_add_outside_interface(self.pg0)
2819         self.nat_add_inside_interface(self.pg1)
2820         self.nat_add_outside_interface(self.pg1)
2821
2822         # add static mappings for server
2823         self.nat_add_static_mapping(
2824             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2825         )
2826         self.nat_add_static_mapping(
2827             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2828         )
2829         self.nat_add_static_mapping(
2830             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2831         )
2832
2833         # run tests for each protocol
2834         self.frag_in_order_in_plus_out(
2835             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2836         )
2837         self.frag_in_order_in_plus_out(
2838             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2839         )
2840         self.frag_in_order_in_plus_out(
2841             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2842         )
2843
2844     def test_frag_out_of_order_in_plus_out(self):
2845         """NAT44ED in+out interface fragments out of order"""
2846
2847         in_port = self.random_port()
2848         out_port = self.random_port()
2849
2850         self.nat_add_address(self.nat_addr)
2851         self.nat_add_inside_interface(self.pg0)
2852         self.nat_add_outside_interface(self.pg0)
2853         self.nat_add_inside_interface(self.pg1)
2854         self.nat_add_outside_interface(self.pg1)
2855
2856         # add static mappings for server
2857         self.nat_add_static_mapping(
2858             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2859         )
2860         self.nat_add_static_mapping(
2861             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2862         )
2863         self.nat_add_static_mapping(
2864             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2865         )
2866
2867         # run tests for each protocol
2868         self.frag_out_of_order_in_plus_out(
2869             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2870         )
2871         self.frag_out_of_order_in_plus_out(
2872             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2873         )
2874         self.frag_out_of_order_in_plus_out(
2875             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2876         )
2877
2878     def test_reass_hairpinning(self):
2879         """NAT44ED fragments hairpinning"""
2880
2881         server_addr = self.pg0.remote_hosts[1].ip4
2882
2883         host_in_port = self.random_port()
2884         server_in_port = self.random_port()
2885         server_out_port = self.random_port()
2886
2887         self.nat_add_address(self.nat_addr)
2888         self.nat_add_inside_interface(self.pg0)
2889         self.nat_add_outside_interface(self.pg1)
2890
2891         # add static mapping for server
2892         self.nat_add_static_mapping(
2893             server_addr,
2894             self.nat_addr,
2895             server_in_port,
2896             server_out_port,
2897             proto=IP_PROTOS.tcp,
2898         )
2899         self.nat_add_static_mapping(
2900             server_addr,
2901             self.nat_addr,
2902             server_in_port,
2903             server_out_port,
2904             proto=IP_PROTOS.udp,
2905         )
2906         self.nat_add_static_mapping(server_addr, self.nat_addr)
2907
2908         self.reass_hairpinning(
2909             server_addr,
2910             server_in_port,
2911             server_out_port,
2912             host_in_port,
2913             proto=IP_PROTOS.tcp,
2914             ignore_port=True,
2915         )
2916         self.reass_hairpinning(
2917             server_addr,
2918             server_in_port,
2919             server_out_port,
2920             host_in_port,
2921             proto=IP_PROTOS.udp,
2922             ignore_port=True,
2923         )
2924         self.reass_hairpinning(
2925             server_addr,
2926             server_in_port,
2927             server_out_port,
2928             host_in_port,
2929             proto=IP_PROTOS.icmp,
2930             ignore_port=True,
2931         )
2932
2933     def test_session_limit_per_vrf(self):
2934         """NAT44ED per vrf session limit"""
2935
2936         inside = self.pg0
2937         inside_vrf10 = self.pg2
2938         outside = self.pg1
2939
2940         limit = 5
2941
2942         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2943         # non existing vrf_id makes process core dump
2944         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2945
2946         self.nat_add_inside_interface(inside)
2947         self.nat_add_inside_interface(inside_vrf10)
2948         self.nat_add_outside_interface(outside)
2949
2950         # vrf independent
2951         self.nat_add_interface_address(outside)
2952
2953         # BUG: causing core dump - when bad vrf_id is specified
2954         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2955
2956         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2957         inside_vrf10.add_stream(stream)
2958
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961
2962         capture = outside.get_capture(limit)
2963
2964         stream = self.create_tcp_stream(inside, outside, limit * 2)
2965         inside.add_stream(stream)
2966
2967         self.pg_enable_capture(self.pg_interfaces)
2968         self.pg_start()
2969
2970         capture = outside.get_capture(len(stream))
2971
2972     def test_show_max_translations(self):
2973         """NAT44ED API test - max translations per thread"""
2974         config = self.vapi.nat44_show_running_config()
2975         self.assertEqual(self.max_sessions, config.sessions)
2976
2977     def test_lru_cleanup(self):
2978         """NAT44ED LRU cleanup algorithm"""
2979
2980         self.nat_add_address(self.nat_addr)
2981         self.nat_add_inside_interface(self.pg0)
2982         self.nat_add_outside_interface(self.pg1)
2983
2984         self.vapi.nat_set_timeouts(
2985             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2986         )
2987
2988         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2989         pkts = []
2990         for i in range(0, self.max_sessions - 1):
2991             p = (
2992                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2993                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2994                 / UDP(sport=7000 + i, dport=80)
2995             )
2996             pkts.append(p)
2997
2998         self.pg0.add_stream(pkts)
2999         self.pg_enable_capture(self.pg_interfaces)
3000         self.pg_start()
3001         self.pg1.get_capture(len(pkts))
3002         self.virtual_sleep(1.5, "wait for timeouts")
3003
3004         pkts = []
3005         for i in range(0, self.max_sessions - 1):
3006             p = (
3007                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3008                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
3009                 / ICMP(id=8000 + i, type="echo-request")
3010             )
3011             pkts.append(p)
3012
3013         self.pg0.add_stream(pkts)
3014         self.pg_enable_capture(self.pg_interfaces)
3015         self.pg_start()
3016         self.pg1.get_capture(len(pkts))
3017
3018     def test_session_rst_timeout(self):
3019         """NAT44ED session RST timeouts"""
3020
3021         self.nat_add_address(self.nat_addr)
3022         self.nat_add_inside_interface(self.pg0)
3023         self.nat_add_outside_interface(self.pg1)
3024
3025         self.vapi.nat_set_timeouts(
3026             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3027         )
3028
3029         self.init_tcp_session(
3030             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3031         )
3032         p = (
3033             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3034             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3035             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3036         )
3037         self.send_and_expect(self.pg0, p, self.pg1)
3038
3039         self.virtual_sleep(6)
3040
3041         # The session is already closed
3042         p = (
3043             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3044             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3045             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3046         )
3047         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3048
3049         # The session can be re-opened
3050         p = (
3051             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3052             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3053             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3054         )
3055         self.send_and_expect(self.pg0, p, self.pg1)
3056
3057     def test_session_rst_established_timeout(self):
3058         """NAT44ED session RST timeouts"""
3059
3060         self.nat_add_address(self.nat_addr)
3061         self.nat_add_inside_interface(self.pg0)
3062         self.nat_add_outside_interface(self.pg1)
3063
3064         self.vapi.nat_set_timeouts(
3065             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3066         )
3067
3068         self.init_tcp_session(
3069             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3070         )
3071
3072         # Wait at least the transitory time, the session is in established
3073         # state anyway. RST followed by a data packet should move it to
3074         # transitory state.
3075         self.virtual_sleep(6)
3076         p = (
3077             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3078             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3079             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3080         )
3081         self.send_and_expect(self.pg0, p, self.pg1)
3082
3083         p = (
3084             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3085             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3086             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3087         )
3088         self.send_and_expect(self.pg0, p, self.pg1)
3089
3090         # State is transitory, session should be closed after 6 seconds
3091         self.virtual_sleep(6)
3092
3093         p = (
3094             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3095             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3096             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3097         )
3098         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3099
3100     def test_dynamic_out_of_ports(self):
3101         """NAT44ED dynamic translation test: out of ports"""
3102
3103         self.nat_add_inside_interface(self.pg0)
3104         self.nat_add_outside_interface(self.pg1)
3105
3106         # in2out and no NAT addresses added
3107         pkts = self.create_stream_in(self.pg0, self.pg1)
3108
3109         self.send_and_assert_no_replies(
3110             self.pg0,
3111             pkts,
3112             msg="i2o pkts",
3113             stats_diff=self.no_diff
3114             | {
3115                 "err": {
3116                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3117                 },
3118                 self.pg0.sw_if_index: {
3119                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3120                 },
3121             },
3122         )
3123
3124         # in2out after NAT addresses added
3125         self.nat_add_address(self.nat_addr)
3126
3127         tcpn, udpn, icmpn = (
3128             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3129         )
3130
3131         self.send_and_expect(
3132             self.pg0,
3133             pkts,
3134             self.pg1,
3135             msg="i2o pkts",
3136             stats_diff=self.no_diff
3137             | {
3138                 "err": {
3139                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3140                 },
3141                 self.pg0.sw_if_index: {
3142                     "/nat44-ed/in2out/slowpath/drops": 0,
3143                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3144                     "/nat44-ed/in2out/slowpath/udp": udpn,
3145                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3146                 },
3147             },
3148         )
3149
3150     def test_unknown_proto(self):
3151         """NAT44ED translate packet with unknown protocol"""
3152
3153         self.nat_add_address(self.nat_addr)
3154         self.nat_add_inside_interface(self.pg0)
3155         self.nat_add_outside_interface(self.pg1)
3156
3157         # in2out
3158         p = (
3159             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3160             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3161             / TCP(sport=self.tcp_port_in, dport=20)
3162         )
3163         self.pg0.add_stream(p)
3164         self.pg_enable_capture(self.pg_interfaces)
3165         self.pg_start()
3166         p = self.pg1.get_capture(1)
3167
3168         p = (
3169             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3170             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3171             / GRE()
3172             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3173             / TCP(sport=1234, dport=1234)
3174         )
3175         self.pg0.add_stream(p)
3176         self.pg_enable_capture(self.pg_interfaces)
3177         self.pg_start()
3178         p = self.pg1.get_capture(1)
3179         packet = p[0]
3180         try:
3181             self.assertEqual(packet[IP].src, self.nat_addr)
3182             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3183             self.assertEqual(packet.haslayer(GRE), 1)
3184             self.assert_packet_checksums_valid(packet)
3185         except:
3186             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3187             raise
3188
3189         # out2in
3190         p = (
3191             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3192             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3193             / GRE()
3194             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3195             / TCP(sport=1234, dport=1234)
3196         )
3197         self.pg1.add_stream(p)
3198         self.pg_enable_capture(self.pg_interfaces)
3199         self.pg_start()
3200         p = self.pg0.get_capture(1)
3201         packet = p[0]
3202         try:
3203             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3204             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3205             self.assertEqual(packet.haslayer(GRE), 1)
3206             self.assert_packet_checksums_valid(packet)
3207         except:
3208             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3209             raise
3210
3211     def test_hairpinning_unknown_proto(self):
3212         """NAT44ED translate packet with unknown protocol - hairpinning"""
3213         host = self.pg0.remote_hosts[0]
3214         server = self.pg0.remote_hosts[1]
3215         host_in_port = 1234
3216         server_out_port = 8765
3217         server_nat_ip = "10.0.0.11"
3218
3219         self.nat_add_address(self.nat_addr)
3220         self.nat_add_inside_interface(self.pg0)
3221         self.nat_add_outside_interface(self.pg1)
3222
3223         # add static mapping for server
3224         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3225
3226         # host to server
3227         p = (
3228             Ether(src=host.mac, dst=self.pg0.local_mac)
3229             / IP(src=host.ip4, dst=server_nat_ip)
3230             / TCP(sport=host_in_port, dport=server_out_port)
3231         )
3232         self.pg0.add_stream(p)
3233         self.pg_enable_capture(self.pg_interfaces)
3234         self.pg_start()
3235         self.pg0.get_capture(1)
3236
3237         p = (
3238             Ether(dst=self.pg0.local_mac, src=host.mac)
3239             / IP(src=host.ip4, dst=server_nat_ip)
3240             / GRE()
3241             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3242             / TCP(sport=1234, dport=1234)
3243         )
3244         self.pg0.add_stream(p)
3245         self.pg_enable_capture(self.pg_interfaces)
3246         self.pg_start()
3247         p = self.pg0.get_capture(1)
3248         packet = p[0]
3249         try:
3250             self.assertEqual(packet[IP].src, self.nat_addr)
3251             self.assertEqual(packet[IP].dst, server.ip4)
3252             self.assertEqual(packet.haslayer(GRE), 1)
3253             self.assert_packet_checksums_valid(packet)
3254         except:
3255             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3256             raise
3257
3258         # server to host
3259         p = (
3260             Ether(dst=self.pg0.local_mac, src=server.mac)
3261             / IP(src=server.ip4, dst=self.nat_addr)
3262             / GRE()
3263             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3264             / TCP(sport=1234, dport=1234)
3265         )
3266         self.pg0.add_stream(p)
3267         self.pg_enable_capture(self.pg_interfaces)
3268         self.pg_start()
3269         p = self.pg0.get_capture(1)
3270         packet = p[0]
3271         try:
3272             self.assertEqual(packet[IP].src, server_nat_ip)
3273             self.assertEqual(packet[IP].dst, host.ip4)
3274             self.assertEqual(packet.haslayer(GRE), 1)
3275             self.assert_packet_checksums_valid(packet)
3276         except:
3277             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3278             raise
3279
3280     def test_output_feature_and_service(self):
3281         """NAT44ED interface output feature and services"""
3282         external_addr = "1.2.3.4"
3283         external_port = 80
3284         local_port = 8080
3285
3286         self.vapi.nat44_forwarding_enable_disable(enable=1)
3287         self.nat_add_address(self.nat_addr)
3288         flags = self.config_flags.NAT_IS_ADDR_ONLY
3289         self.vapi.nat44_add_del_identity_mapping(
3290             ip_address=self.pg1.remote_ip4,
3291             sw_if_index=0xFFFFFFFF,
3292             flags=flags,
3293             is_add=1,
3294         )
3295         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3296         self.nat_add_static_mapping(
3297             self.pg0.remote_ip4,
3298             external_addr,
3299             local_port,
3300             external_port,
3301             proto=IP_PROTOS.tcp,
3302             flags=flags,
3303         )
3304
3305         self.nat_add_inside_interface(self.pg0)
3306         self.nat_add_outside_interface(self.pg0)
3307         self.vapi.nat44_ed_add_del_output_interface(
3308             sw_if_index=self.pg1.sw_if_index, is_add=1
3309         )
3310
3311         # from client to service
3312         p = (
3313             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3314             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3315             / TCP(sport=12345, dport=external_port)
3316         )
3317         self.pg1.add_stream(p)
3318         self.pg_enable_capture(self.pg_interfaces)
3319         self.pg_start()
3320         capture = self.pg0.get_capture(1)
3321         p = capture[0]
3322         try:
3323             ip = p[IP]
3324             tcp = p[TCP]
3325             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3326             self.assertEqual(tcp.dport, local_port)
3327             self.assert_packet_checksums_valid(p)
3328         except:
3329             self.logger.error(ppp("Unexpected or invalid packet:", p))
3330             raise
3331
3332         # from service back to client
3333         p = (
3334             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3335             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3336             / TCP(sport=local_port, dport=12345)
3337         )
3338         self.pg0.add_stream(p)
3339         self.pg_enable_capture(self.pg_interfaces)
3340         self.pg_start()
3341         capture = self.pg1.get_capture(1)
3342         p = capture[0]
3343         try:
3344             ip = p[IP]
3345             tcp = p[TCP]
3346             self.assertEqual(ip.src, external_addr)
3347             self.assertEqual(tcp.sport, external_port)
3348             self.assert_packet_checksums_valid(p)
3349         except:
3350             self.logger.error(ppp("Unexpected or invalid packet:", p))
3351             raise
3352
3353         # from local network host to external network
3354         pkts = self.create_stream_in(self.pg0, self.pg1)
3355         self.pg0.add_stream(pkts)
3356         self.pg_enable_capture(self.pg_interfaces)
3357         self.pg_start()
3358         capture = self.pg1.get_capture(len(pkts))
3359         self.verify_capture_out(capture, ignore_port=True)
3360         pkts = self.create_stream_in(self.pg0, self.pg1)
3361         self.pg0.add_stream(pkts)
3362         self.pg_enable_capture(self.pg_interfaces)
3363         self.pg_start()
3364         capture = self.pg1.get_capture(len(pkts))
3365         self.verify_capture_out(capture, ignore_port=True)
3366
3367         # from external network back to local network host
3368         pkts = self.create_stream_out(self.pg1)
3369         self.pg1.add_stream(pkts)
3370         self.pg_enable_capture(self.pg_interfaces)
3371         self.pg_start()
3372         capture = self.pg0.get_capture(len(pkts))
3373         self.verify_capture_in(capture, self.pg0)
3374
3375     def test_output_feature_and_service3(self):
3376         """NAT44ED interface output feature and DST NAT"""
3377         external_addr = "1.2.3.4"
3378         external_port = 80
3379         local_port = 8080
3380
3381         self.vapi.nat44_forwarding_enable_disable(enable=1)
3382         self.nat_add_address(self.nat_addr)
3383         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3384         self.nat_add_static_mapping(
3385             self.pg1.remote_ip4,
3386             external_addr,
3387             local_port,
3388             external_port,
3389             proto=IP_PROTOS.tcp,
3390             flags=flags,
3391         )
3392
3393         self.nat_add_inside_interface(self.pg0)
3394         self.nat_add_outside_interface(self.pg0)
3395         self.vapi.nat44_ed_add_del_output_interface(
3396             sw_if_index=self.pg1.sw_if_index, is_add=1
3397         )
3398
3399         p = (
3400             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3401             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3402             / TCP(sport=12345, dport=external_port)
3403         )
3404         self.pg0.add_stream(p)
3405         self.pg_enable_capture(self.pg_interfaces)
3406         self.pg_start()
3407         capture = self.pg1.get_capture(1)
3408         p = capture[0]
3409         try:
3410             ip = p[IP]
3411             tcp = p[TCP]
3412             self.assertEqual(ip.src, self.pg0.remote_ip4)
3413             self.assertEqual(tcp.sport, 12345)
3414             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3415             self.assertEqual(tcp.dport, local_port)
3416             self.assert_packet_checksums_valid(p)
3417         except:
3418             self.logger.error(ppp("Unexpected or invalid packet:", p))
3419             raise
3420
3421         p = (
3422             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3423             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3424             / TCP(sport=local_port, dport=12345)
3425         )
3426         self.pg1.add_stream(p)
3427         self.pg_enable_capture(self.pg_interfaces)
3428         self.pg_start()
3429         capture = self.pg0.get_capture(1)
3430         p = capture[0]
3431         try:
3432             ip = p[IP]
3433             tcp = p[TCP]
3434             self.assertEqual(ip.src, external_addr)
3435             self.assertEqual(tcp.sport, external_port)
3436             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3437             self.assertEqual(tcp.dport, 12345)
3438             self.assert_packet_checksums_valid(p)
3439         except:
3440             self.logger.error(ppp("Unexpected or invalid packet:", p))
3441             raise
3442
3443     def test_self_twice_nat_lb_negative(self):
3444         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3445         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3446
3447     def test_self_twice_nat_negative(self):
3448         """NAT44ED Self Twice NAT (negative test)"""
3449         self.twice_nat_common(self_twice_nat=True)
3450
3451     def test_static_lb_multi_clients(self):
3452         """NAT44ED local service load balancing - multiple clients"""
3453
3454         external_addr = self.nat_addr
3455         external_port = 80
3456         local_port = 8080
3457         server1 = self.pg0.remote_hosts[0]
3458         server2 = self.pg0.remote_hosts[1]
3459         server3 = self.pg0.remote_hosts[2]
3460
3461         locals = [
3462             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3463             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3464         ]
3465
3466         flags = self.config_flags.NAT_IS_INSIDE
3467         self.vapi.nat44_interface_add_del_feature(
3468             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3469         )
3470         self.vapi.nat44_interface_add_del_feature(
3471             sw_if_index=self.pg1.sw_if_index, is_add=1
3472         )
3473
3474         self.nat_add_address(self.nat_addr)
3475         self.vapi.nat44_add_del_lb_static_mapping(
3476             is_add=1,
3477             external_addr=external_addr,
3478             external_port=external_port,
3479             protocol=IP_PROTOS.tcp,
3480             local_num=len(locals),
3481             locals=locals,
3482         )
3483
3484         server1_n = 0
3485         server2_n = 0
3486         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3487         pkts = []
3488         for client in clients:
3489             p = (
3490                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3491                 / IP(src=client, dst=self.nat_addr)
3492                 / TCP(sport=12345, dport=external_port)
3493             )
3494             pkts.append(p)
3495         self.pg1.add_stream(pkts)
3496         self.pg_enable_capture(self.pg_interfaces)
3497         self.pg_start()
3498         capture = self.pg0.get_capture(len(pkts))
3499         for p in capture:
3500             if p[IP].dst == server1.ip4:
3501                 server1_n += 1
3502             else:
3503                 server2_n += 1
3504         self.assertGreaterEqual(server1_n, server2_n)
3505
3506         local = {
3507             "addr": server3.ip4,
3508             "port": local_port,
3509             "probability": 20,
3510             "vrf_id": 0,
3511         }
3512
3513         # add new back-end
3514         self.vapi.nat44_lb_static_mapping_add_del_local(
3515             is_add=1,
3516             external_addr=external_addr,
3517             external_port=external_port,
3518             local=local,
3519             protocol=IP_PROTOS.tcp,
3520         )
3521         server1_n = 0
3522         server2_n = 0
3523         server3_n = 0
3524         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3525         pkts = []
3526         for client in clients:
3527             p = (
3528                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3529                 / IP(src=client, dst=self.nat_addr)
3530                 / TCP(sport=12346, dport=external_port)
3531             )
3532             pkts.append(p)
3533         self.assertGreater(len(pkts), 0)
3534         self.pg1.add_stream(pkts)
3535         self.pg_enable_capture(self.pg_interfaces)
3536         self.pg_start()
3537         capture = self.pg0.get_capture(len(pkts))
3538         for p in capture:
3539             if p[IP].dst == server1.ip4:
3540                 server1_n += 1
3541             elif p[IP].dst == server2.ip4:
3542                 server2_n += 1
3543             else:
3544                 server3_n += 1
3545         self.assertGreater(server1_n, 0)
3546         self.assertGreater(server2_n, 0)
3547         self.assertGreater(server3_n, 0)
3548
3549         local = {
3550             "addr": server2.ip4,
3551             "port": local_port,
3552             "probability": 10,
3553             "vrf_id": 0,
3554         }
3555
3556         # remove one back-end
3557         self.vapi.nat44_lb_static_mapping_add_del_local(
3558             is_add=0,
3559             external_addr=external_addr,
3560             external_port=external_port,
3561             local=local,
3562             protocol=IP_PROTOS.tcp,
3563         )
3564         server1_n = 0
3565         server2_n = 0
3566         server3_n = 0
3567         self.pg1.add_stream(pkts)
3568         self.pg_enable_capture(self.pg_interfaces)
3569         self.pg_start()
3570         capture = self.pg0.get_capture(len(pkts))
3571         for p in capture:
3572             if p[IP].dst == server1.ip4:
3573                 server1_n += 1
3574             elif p[IP].dst == server2.ip4:
3575                 server2_n += 1
3576             else:
3577                 server3_n += 1
3578         self.assertGreater(server1_n, 0)
3579         self.assertEqual(server2_n, 0)
3580         self.assertGreater(server3_n, 0)
3581
3582     # put zzz in front of syslog test name so that it runs as a last test
3583     # setting syslog sender cannot be undone and if it is set, it messes
3584     # with self.send_and_assert_no_replies functionality
3585     def test_zzz_syslog_sess(self):
3586         """NAT44ED Test syslog session creation and deletion"""
3587         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3588         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3589
3590         self.nat_add_address(self.nat_addr)
3591         self.nat_add_inside_interface(self.pg0)
3592         self.nat_add_outside_interface(self.pg1)
3593
3594         p = (
3595             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3596             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3597             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3598         )
3599         self.pg0.add_stream(p)
3600         self.pg_enable_capture(self.pg_interfaces)
3601         self.pg_start()
3602         capture = self.pg1.get_capture(1)
3603         self.tcp_port_out = capture[0][TCP].sport
3604         capture = self.pg3.get_capture(1)
3605         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3606
3607         self.pg_enable_capture(self.pg_interfaces)
3608         self.pg_start()
3609         self.nat_add_address(self.nat_addr, is_add=0)
3610         capture = self.pg3.get_capture(1)
3611         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3612
3613     # put zzz in front of syslog test name so that it runs as a last test
3614     # setting syslog sender cannot be undone and if it is set, it messes
3615     # with self.send_and_assert_no_replies functionality
3616     def test_zzz_syslog_sess_reopen(self):
3617         """Syslog events for session reopen"""
3618         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3619         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3620
3621         self.nat_add_address(self.nat_addr)
3622         self.nat_add_inside_interface(self.pg0)
3623         self.nat_add_outside_interface(self.pg1)
3624
3625         # SYN in2out
3626         p = (
3627             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3628             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3629             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3630         )
3631         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3632         self.tcp_port_out = capture[0][TCP].sport
3633         capture = self.pg3.get_capture(1)
3634         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3635
3636         # SYN out2in
3637         p = (
3638             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3639             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3640             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3641         )
3642         self.send_and_expect(self.pg1, p, self.pg0)
3643
3644         p = (
3645             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3646             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3647             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3648         )
3649         self.send_and_expect(self.pg0, p, self.pg1)
3650
3651         # FIN in2out
3652         p = (
3653             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3654             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3655             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3656         )
3657         self.send_and_expect(self.pg0, p, self.pg1)
3658
3659         # FIN out2in
3660         p = (
3661             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3662             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3663             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3664         )
3665         self.send_and_expect(self.pg1, p, self.pg0)
3666
3667         self.init_tcp_session(
3668             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3669         )
3670
3671         # 2 records should be produced - first one del & add
3672         capture = self.pg3.get_capture(2)
3673         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3674         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3675
3676     def test_twice_nat_interface_addr(self):
3677         """NAT44ED Acquire twice NAT addresses from interface"""
3678         flags = self.config_flags.NAT_IS_TWICE_NAT
3679         self.vapi.nat44_add_del_interface_addr(
3680             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3681         )
3682
3683         # no address in NAT pool
3684         adresses = self.vapi.nat44_address_dump()
3685         self.assertEqual(0, len(adresses))
3686
3687         # configure interface address and check NAT address pool
3688         self.pg11.config_ip4()
3689         adresses = self.vapi.nat44_address_dump()
3690         self.assertEqual(1, len(adresses))
3691         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3692         self.assertEqual(adresses[0].flags, flags)
3693
3694         # remove interface address and check NAT address pool
3695         self.pg11.unconfig_ip4()
3696         adresses = self.vapi.nat44_address_dump()
3697         self.assertEqual(0, len(adresses))
3698
3699     def test_output_feature_stateful_acl(self):
3700         """NAT44ED output feature works with stateful ACL"""
3701
3702         self.nat_add_address(self.nat_addr)
3703         self.vapi.nat44_ed_add_del_output_interface(
3704             sw_if_index=self.pg1.sw_if_index, is_add=1
3705         )
3706
3707         # First ensure that the NAT is working sans ACL
3708
3709         # send packets out2in, no sessions yet so packets should drop
3710         pkts_out2in = self.create_stream_out(self.pg1)
3711         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3712
3713         # send packets into inside intf, ensure received via outside intf
3714         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3715         capture = self.send_and_expect(
3716             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3717         )
3718         self.verify_capture_out(capture, ignore_port=True)
3719
3720         # send out2in again, with sessions created it should work now
3721         pkts_out2in = self.create_stream_out(self.pg1)
3722         capture = self.send_and_expect(
3723             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3724         )
3725         self.verify_capture_in(capture, self.pg0)
3726
3727         # Create an ACL blocking everything
3728         out2in_deny_rule = AclRule(is_permit=0)
3729         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3730         out2in_acl.add_vpp_config()
3731
3732         # create an ACL to permit/reflect everything
3733         in2out_reflect_rule = AclRule(is_permit=2)
3734         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3735         in2out_acl.add_vpp_config()
3736
3737         # apply as input acl on interface and confirm it blocks everything
3738         acl_if = VppAclInterface(
3739             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3740         )
3741         acl_if.add_vpp_config()
3742         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3743
3744         # apply output acl
3745         acl_if.acls = [out2in_acl, in2out_acl]
3746         acl_if.add_vpp_config()
3747         # send in2out to generate ACL state (NAT state was created earlier)
3748         capture = self.send_and_expect(
3749             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3750         )
3751         self.verify_capture_out(capture, ignore_port=True)
3752
3753         # send out2in again. ACL state exists so it should work now.
3754         # TCP packets with the syn flag set also need the ack flag
3755         for p in pkts_out2in:
3756             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3757                 p[TCP].flags |= 0x10
3758         capture = self.send_and_expect(
3759             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3760         )
3761         self.verify_capture_in(capture, self.pg0)
3762         self.logger.info(self.vapi.cli("show trace"))
3763
3764     def test_tcp_close(self):
3765         """NAT44ED Close TCP session from inside network - output feature"""
3766         config = self.vapi.nat44_show_running_config()
3767         old_timeouts = config.timeouts
3768         new_transitory = 2
3769         self.vapi.nat_set_timeouts(
3770             udp=old_timeouts.udp,
3771             tcp_established=old_timeouts.tcp_established,
3772             icmp=old_timeouts.icmp,
3773             tcp_transitory=new_transitory,
3774         )
3775
3776         self.vapi.nat44_forwarding_enable_disable(enable=1)
3777         self.nat_add_address(self.pg1.local_ip4)
3778         twice_nat_addr = "10.0.1.3"
3779         service_ip = "192.168.16.150"
3780         self.nat_add_address(twice_nat_addr, twice_nat=1)
3781
3782         flags = self.config_flags.NAT_IS_INSIDE
3783         self.vapi.nat44_interface_add_del_feature(
3784             sw_if_index=self.pg0.sw_if_index, is_add=1
3785         )
3786         self.vapi.nat44_interface_add_del_feature(
3787             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3788         )
3789         self.vapi.nat44_ed_add_del_output_interface(
3790             is_add=1, sw_if_index=self.pg1.sw_if_index
3791         )
3792
3793         flags = (
3794             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3795         )
3796         self.nat_add_static_mapping(
3797             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3798         )
3799         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3800         start_sessnum = len(sessions)
3801
3802         # SYN packet out->in
3803         p = (
3804             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3805             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3806             / TCP(sport=33898, dport=80, flags="S")
3807         )
3808         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3809         p = capture[0]
3810         tcp_port = p[TCP].sport
3811
3812         # SYN + ACK packet in->out
3813         p = (
3814             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3815             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3816             / TCP(sport=80, dport=tcp_port, flags="SA")
3817         )
3818         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3819
3820         # ACK packet out->in
3821         p = (
3822             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3823             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3824             / TCP(sport=33898, dport=80, flags="A")
3825         )
3826         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3827
3828         # FIN packet in -> out
3829         p = (
3830             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3831             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3832             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3833         )
3834         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3835
3836         # FIN+ACK packet out -> in
3837         p = (
3838             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3839             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3840             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3841         )
3842         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3843
3844         # ACK packet in -> out
3845         p = (
3846             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3847             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3848             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3849         )
3850         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3851
3852         # session now in transitory timeout, but traffic still flows
3853         # try FIN packet out->in
3854         p = (
3855             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3856             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3857             / TCP(sport=33898, dport=80, flags="F")
3858         )
3859         self.pg1.add_stream(p)
3860         self.pg_enable_capture(self.pg_interfaces)
3861         self.pg_start()
3862
3863         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3864         self.pg0.get_capture(1)
3865
3866         # session should still exist
3867         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3868         self.assertEqual(len(sessions) - start_sessnum, 1)
3869
3870         # send FIN+ACK packet out -> in - will cause session to be wiped
3871         # but won't create a new session
3872         p = (
3873             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3874             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3875             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3876         )
3877         self.send_and_assert_no_replies(self.pg1, p)
3878         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3879         self.assertEqual(len(sessions) - start_sessnum, 0)
3880
3881     def test_tcp_session_close_in(self):
3882         """NAT44ED Close TCP session from inside network"""
3883
3884         in_port = self.tcp_port_in
3885         out_port = 10505
3886         ext_port = self.tcp_external_port
3887
3888         self.nat_add_address(self.nat_addr)
3889         self.nat_add_inside_interface(self.pg0)
3890         self.nat_add_outside_interface(self.pg1)
3891         self.nat_add_static_mapping(
3892             self.pg0.remote_ip4,
3893             self.nat_addr,
3894             in_port,
3895             out_port,
3896             proto=IP_PROTOS.tcp,
3897             flags=self.config_flags.NAT_IS_TWICE_NAT,
3898         )
3899
3900         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3901         session_n = len(sessions)
3902
3903         self.vapi.nat_set_timeouts(
3904             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3905         )
3906
3907         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3908
3909         # FIN packet in -> out
3910         p = (
3911             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3912             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3913             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3914         )
3915         self.send_and_expect(self.pg0, p, self.pg1)
3916         pkts = []
3917
3918         # ACK packet out -> in
3919         p = (
3920             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3921             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3922             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3923         )
3924         pkts.append(p)
3925
3926         # FIN packet out -> in
3927         p = (
3928             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3929             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3930             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3931         )
3932         pkts.append(p)
3933
3934         self.send_and_expect(self.pg1, pkts, self.pg0)
3935
3936         # ACK packet in -> out
3937         p = (
3938             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3939             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3940             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3941         )
3942         self.send_and_expect(self.pg0, p, self.pg1)
3943
3944         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3945         self.assertEqual(len(sessions) - session_n, 1)
3946
3947         # retransmit FIN packet out -> in
3948         p = (
3949             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3950             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3951             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3952         )
3953
3954         self.send_and_expect(self.pg1, p, self.pg0)
3955
3956         # retransmit ACK packet in -> out
3957         p = (
3958             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3959             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3960             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3961         )
3962         self.send_and_expect(self.pg0, p, self.pg1)
3963
3964         self.virtual_sleep(3)
3965         # retransmit ACK packet in -> out - this will cause session to be wiped
3966         p = (
3967             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3968             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3969             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3970         )
3971         self.send_and_assert_no_replies(self.pg0, p)
3972         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3973         self.assertEqual(len(sessions) - session_n, 0)
3974
3975     def test_tcp_session_close_out(self):
3976         """NAT44ED Close TCP session from outside network"""
3977
3978         in_port = self.tcp_port_in
3979         out_port = 10505
3980         ext_port = self.tcp_external_port
3981
3982         self.nat_add_address(self.nat_addr)
3983         self.nat_add_inside_interface(self.pg0)
3984         self.nat_add_outside_interface(self.pg1)
3985         self.nat_add_static_mapping(
3986             self.pg0.remote_ip4,
3987             self.nat_addr,
3988             in_port,
3989             out_port,
3990             proto=IP_PROTOS.tcp,
3991             flags=self.config_flags.NAT_IS_TWICE_NAT,
3992         )
3993
3994         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3995         session_n = len(sessions)
3996
3997         self.vapi.nat_set_timeouts(
3998             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3999         )
4000
4001         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4002
4003         # FIN packet out -> in
4004         p = (
4005             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4006             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4007             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
4008         )
4009         self.pg1.add_stream(p)
4010         self.pg_enable_capture(self.pg_interfaces)
4011         self.pg_start()
4012         self.pg0.get_capture(1)
4013
4014         # FIN+ACK packet in -> out
4015         p = (
4016             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4017             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4018             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
4019         )
4020
4021         self.pg0.add_stream(p)
4022         self.pg_enable_capture(self.pg_interfaces)
4023         self.pg_start()
4024         self.pg1.get_capture(1)
4025
4026         # ACK packet out -> in
4027         p = (
4028             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4029             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4030             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
4031         )
4032         self.pg1.add_stream(p)
4033         self.pg_enable_capture(self.pg_interfaces)
4034         self.pg_start()
4035         self.pg0.get_capture(1)
4036
4037         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4038         self.assertEqual(len(sessions) - session_n, 1)
4039
4040         # retransmit FIN packet out -> in
4041         p = (
4042             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4043             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4044             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4045         )
4046         self.send_and_expect(self.pg1, p, self.pg0)
4047
4048         # retransmit ACK packet in -> out
4049         p = (
4050             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4051             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4052             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4053         )
4054         self.send_and_expect(self.pg0, p, self.pg1)
4055
4056         self.virtual_sleep(3)
4057         # retransmit ACK packet in -> out - this will cause session to be wiped
4058         p = (
4059             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4060             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4061             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4062         )
4063         self.send_and_assert_no_replies(self.pg0, p)
4064         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4065         self.assertEqual(len(sessions) - session_n, 0)
4066
4067     def test_tcp_session_close_simultaneous(self):
4068         """Simultaneous TCP close from both sides"""
4069
4070         in_port = self.tcp_port_in
4071         ext_port = 10505
4072
4073         self.nat_add_address(self.nat_addr)
4074         self.nat_add_inside_interface(self.pg0)
4075         self.nat_add_outside_interface(self.pg1)
4076         self.nat_add_static_mapping(
4077             self.pg0.remote_ip4,
4078             self.nat_addr,
4079             in_port,
4080             ext_port,
4081             proto=IP_PROTOS.tcp,
4082             flags=self.config_flags.NAT_IS_TWICE_NAT,
4083         )
4084
4085         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4086         session_n = len(sessions)
4087
4088         self.vapi.nat_set_timeouts(
4089             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4090         )
4091
4092         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4093
4094         # FIN packet in -> out
4095         p = (
4096             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4097             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4098             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4099         )
4100         self.send_and_expect(self.pg0, p, self.pg1)
4101
4102         # FIN packet out -> in
4103         p = (
4104             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4105             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4106             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4107         )
4108         self.send_and_expect(self.pg1, p, self.pg0)
4109
4110         # ACK packet in -> out
4111         p = (
4112             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4113             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4114             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4115         )
4116         self.send_and_expect(self.pg0, p, self.pg1)
4117
4118         # ACK packet out -> in
4119         p = (
4120             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4121             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4122             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4123         )
4124         self.send_and_expect(self.pg1, p, self.pg0)
4125
4126         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4127         self.assertEqual(len(sessions) - session_n, 1)
4128
4129         # retransmit FIN packet out -> in
4130         p = (
4131             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4132             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4133             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4134         )
4135         self.send_and_expect(self.pg1, p, self.pg0)
4136
4137         # retransmit ACK packet in -> out
4138         p = (
4139             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4140             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4141             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4142         )
4143         self.send_and_expect(self.pg0, p, self.pg1)
4144
4145         self.virtual_sleep(3)
4146         # retransmit ACK packet in -> out - this will cause session to be wiped
4147         p = (
4148             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4149             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4150             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4151         )
4152         self.pg_send(self.pg0, p)
4153         self.send_and_assert_no_replies(self.pg0, p)
4154         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4155         self.assertEqual(len(sessions) - session_n, 0)
4156
4157     def test_tcp_session_half_reopen_inside(self):
4158         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4159         in_port = self.tcp_port_in
4160         ext_port = 10505
4161
4162         self.nat_add_address(self.nat_addr)
4163         self.nat_add_inside_interface(self.pg0)
4164         self.nat_add_outside_interface(self.pg1)
4165         self.nat_add_static_mapping(
4166             self.pg0.remote_ip4,
4167             self.nat_addr,
4168             in_port,
4169             ext_port,
4170             proto=IP_PROTOS.tcp,
4171             flags=self.config_flags.NAT_IS_TWICE_NAT,
4172         )
4173
4174         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4175         session_n = len(sessions)
4176
4177         self.vapi.nat_set_timeouts(
4178             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4179         )
4180
4181         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4182
4183         # FIN packet in -> out
4184         p = (
4185             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4186             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4187             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4188         )
4189         self.send_and_expect(self.pg0, p, self.pg1)
4190
4191         # FIN packet out -> in
4192         p = (
4193             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4194             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4195             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4196         )
4197         self.send_and_expect(self.pg1, p, self.pg0)
4198
4199         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4200         self.assertEqual(len(sessions) - session_n, 1)
4201
4202         # send SYN packet in -> out
4203         p = (
4204             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4205             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4206             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4207         )
4208         self.send_and_expect(self.pg0, p, self.pg1)
4209
4210         self.virtual_sleep(3)
4211         # send ACK packet in -> out - session should be wiped
4212         p = (
4213             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4214             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4215             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4216         )
4217         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4218         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4219         self.assertEqual(len(sessions) - session_n, 0)
4220
4221     def test_tcp_session_half_reopen_outside(self):
4222         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4223         in_port = self.tcp_port_in
4224         ext_port = 10505
4225
4226         self.nat_add_address(self.nat_addr)
4227         self.nat_add_inside_interface(self.pg0)
4228         self.nat_add_outside_interface(self.pg1)
4229         self.nat_add_static_mapping(
4230             self.pg0.remote_ip4,
4231             self.nat_addr,
4232             in_port,
4233             ext_port,
4234             proto=IP_PROTOS.tcp,
4235             flags=self.config_flags.NAT_IS_TWICE_NAT,
4236         )
4237
4238         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4239         session_n = len(sessions)
4240
4241         self.vapi.nat_set_timeouts(
4242             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4243         )
4244
4245         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4246
4247         # FIN packet in -> out
4248         p = (
4249             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4250             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4251             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4252         )
4253         self.send_and_expect(self.pg0, p, self.pg1)
4254
4255         # FIN packet out -> in
4256         p = (
4257             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4258             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4259             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4260         )
4261         self.send_and_expect(self.pg1, p, self.pg0)
4262
4263         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4264         self.assertEqual(len(sessions) - session_n, 1)
4265
4266         # send SYN packet out -> in
4267         p = (
4268             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4269             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4270             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4271         )
4272         self.send_and_expect(self.pg1, p, self.pg0)
4273
4274         self.virtual_sleep(3)
4275         # send ACK packet in -> out - session should be wiped
4276         p = (
4277             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4278             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4279             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4280         )
4281         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4282         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4283         self.assertEqual(len(sessions) - session_n, 0)
4284
4285     def test_tcp_session_reopen(self):
4286         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4287         in_port = self.tcp_port_in
4288         ext_port = 10505
4289
4290         self.nat_add_address(self.nat_addr)
4291         self.nat_add_inside_interface(self.pg0)
4292         self.nat_add_outside_interface(self.pg1)
4293         self.nat_add_static_mapping(
4294             self.pg0.remote_ip4,
4295             self.nat_addr,
4296             in_port,
4297             ext_port,
4298             proto=IP_PROTOS.tcp,
4299             flags=self.config_flags.NAT_IS_TWICE_NAT,
4300         )
4301
4302         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4303         session_n = len(sessions)
4304
4305         self.vapi.nat_set_timeouts(
4306             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4307         )
4308
4309         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4310
4311         # FIN packet in -> out
4312         p = (
4313             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4314             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4315             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4316         )
4317         self.send_and_expect(self.pg0, p, self.pg1)
4318
4319         # FIN packet out -> in
4320         p = (
4321             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4322             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4323             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4324         )
4325         self.send_and_expect(self.pg1, p, self.pg0)
4326
4327         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4328         self.assertEqual(len(sessions) - session_n, 1)
4329
4330         # send SYN packet out -> in
4331         p = (
4332             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4333             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4334             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4335         )
4336         self.send_and_expect(self.pg1, p, self.pg0)
4337
4338         # send SYN packet in -> out
4339         p = (
4340             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4341             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4342             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4343         )
4344         self.send_and_expect(self.pg0, p, self.pg1)
4345
4346         # send ACK packet out -> in
4347         p = (
4348             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4349             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4350             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4351         )
4352         self.send_and_expect(self.pg1, p, self.pg0)
4353
4354         self.virtual_sleep(3)
4355         # send ACK packet in -> out - should be forwarded and session alive
4356         p = (
4357             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4358             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4359             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4360         )
4361         self.send_and_expect(self.pg0, p, self.pg1)
4362         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4363         self.assertEqual(len(sessions) - session_n, 1)
4364
4365     def test_dynamic_vrf(self):
4366         """NAT44ED dynamic translation test: different VRF"""
4367
4368         vrf_id_in = 33
4369         vrf_id_out = 34
4370
4371         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4372
4373         try:
4374             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4375             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4376
4377             self.nat_add_inside_interface(self.pg7)
4378             self.nat_add_outside_interface(self.pg8)
4379
4380             # just basic stuff nothing special
4381             pkts = self.create_stream_in(self.pg7, self.pg8)
4382             self.pg7.add_stream(pkts)
4383             self.pg_enable_capture(self.pg_interfaces)
4384             self.pg_start()
4385             capture = self.pg8.get_capture(len(pkts))
4386             self.verify_capture_out(capture, ignore_port=True)
4387
4388             pkts = self.create_stream_out(self.pg8)
4389             self.pg8.add_stream(pkts)
4390             self.pg_enable_capture(self.pg_interfaces)
4391             self.pg_start()
4392             capture = self.pg7.get_capture(len(pkts))
4393             self.verify_capture_in(capture, self.pg7)
4394
4395         finally:
4396             self.pg7.unconfig()
4397             self.pg8.unconfig()
4398
4399             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4400             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4401
4402     def test_dynamic_output_feature_vrf(self):
4403         """NAT44ED dynamic translation test: output-feature, VRF"""
4404
4405         # other then default (0)
4406         new_vrf_id = 22
4407
4408         self.nat_add_address(self.nat_addr)
4409         self.vapi.nat44_ed_add_del_output_interface(
4410             sw_if_index=self.pg8.sw_if_index, is_add=1
4411         )
4412         try:
4413             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4414             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4415
4416             # in2out
4417             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4418             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4419             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4420             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4421
4422             pkts = self.create_stream_in(self.pg7, self.pg8)
4423             self.pg7.add_stream(pkts)
4424             self.pg_enable_capture(self.pg_interfaces)
4425             self.pg_start()
4426             capture = self.pg8.get_capture(len(pkts))
4427             self.verify_capture_out(capture, ignore_port=True)
4428
4429             if_idx = self.pg8.sw_if_index
4430             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4431             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4432             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4433             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4434             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4435             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4436             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4437             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4438
4439             # out2in
4440             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4441             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4442             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4443             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4444
4445             pkts = self.create_stream_out(self.pg8)
4446             self.pg8.add_stream(pkts)
4447             self.pg_enable_capture(self.pg_interfaces)
4448             self.pg_start()
4449             capture = self.pg7.get_capture(len(pkts))
4450             self.verify_capture_in(capture, self.pg7)
4451
4452             if_idx = self.pg8.sw_if_index
4453             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4454             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4455             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4456             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4457             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4458             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4459             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4460             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4461
4462             sessions = self.statistics["/nat44-ed/total-sessions"]
4463             self.assertEqual(sessions[:, 0].sum(), 3)
4464
4465         finally:
4466             self.pg7.unconfig()
4467             self.pg8.unconfig()
4468
4469             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4470
4471     def test_next_src_nat(self):
4472         """NAT44ED On way back forward packet to nat44-in2out node."""
4473
4474         twice_nat_addr = "10.0.1.3"
4475         external_port = 80
4476         local_port = 8080
4477         post_twice_nat_port = 0
4478
4479         self.vapi.nat44_forwarding_enable_disable(enable=1)
4480         self.nat_add_address(twice_nat_addr, twice_nat=1)
4481         flags = (
4482             self.config_flags.NAT_IS_OUT2IN_ONLY
4483             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4484         )
4485         self.nat_add_static_mapping(
4486             self.pg6.remote_ip4,
4487             self.pg1.remote_ip4,
4488             local_port,
4489             external_port,
4490             proto=IP_PROTOS.tcp,
4491             vrf_id=1,
4492             flags=flags,
4493         )
4494         self.vapi.nat44_interface_add_del_feature(
4495             sw_if_index=self.pg6.sw_if_index, is_add=1
4496         )
4497
4498         p = (
4499             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4500             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4501             / TCP(sport=12345, dport=external_port)
4502         )
4503         self.pg6.add_stream(p)
4504         self.pg_enable_capture(self.pg_interfaces)
4505         self.pg_start()
4506         capture = self.pg6.get_capture(1)
4507         p = capture[0]
4508         try:
4509             ip = p[IP]
4510             tcp = p[TCP]
4511             self.assertEqual(ip.src, twice_nat_addr)
4512             self.assertNotEqual(tcp.sport, 12345)
4513             post_twice_nat_port = tcp.sport
4514             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4515             self.assertEqual(tcp.dport, local_port)
4516             self.assert_packet_checksums_valid(p)
4517         except:
4518             self.logger.error(ppp("Unexpected or invalid packet:", p))
4519             raise
4520
4521         p = (
4522             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4523             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4524             / TCP(sport=local_port, dport=post_twice_nat_port)
4525         )
4526         self.pg6.add_stream(p)
4527         self.pg_enable_capture(self.pg_interfaces)
4528         self.pg_start()
4529         capture = self.pg6.get_capture(1)
4530         p = capture[0]
4531         try:
4532             ip = p[IP]
4533             tcp = p[TCP]
4534             self.assertEqual(ip.src, self.pg1.remote_ip4)
4535             self.assertEqual(tcp.sport, external_port)
4536             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4537             self.assertEqual(tcp.dport, 12345)
4538             self.assert_packet_checksums_valid(p)
4539         except:
4540             self.logger.error(ppp("Unexpected or invalid packet:", p))
4541             raise
4542
4543     def test_one_armed_nat44_static(self):
4544         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4545
4546         remote_host = self.pg4.remote_hosts[0]
4547         local_host = self.pg4.remote_hosts[1]
4548         external_port = 80
4549         local_port = 8080
4550         eh_port_in = 0
4551
4552         self.vapi.nat44_forwarding_enable_disable(enable=1)
4553         self.nat_add_address(self.nat_addr, twice_nat=1)
4554         flags = (
4555             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4556         )
4557         self.nat_add_static_mapping(
4558             local_host.ip4,
4559             self.nat_addr,
4560             local_port,
4561             external_port,
4562             proto=IP_PROTOS.tcp,
4563             flags=flags,
4564         )
4565         flags = self.config_flags.NAT_IS_INSIDE
4566         self.vapi.nat44_interface_add_del_feature(
4567             sw_if_index=self.pg4.sw_if_index, is_add=1
4568         )
4569         self.vapi.nat44_interface_add_del_feature(
4570             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4571         )
4572
4573         # from client to service
4574         p = (
4575             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4576             / IP(src=remote_host.ip4, dst=self.nat_addr)
4577             / TCP(sport=12345, dport=external_port)
4578         )
4579         self.pg4.add_stream(p)
4580         self.pg_enable_capture(self.pg_interfaces)
4581         self.pg_start()
4582         capture = self.pg4.get_capture(1)
4583         p = capture[0]
4584         try:
4585             ip = p[IP]
4586             tcp = p[TCP]
4587             self.assertEqual(ip.dst, local_host.ip4)
4588             self.assertEqual(ip.src, self.nat_addr)
4589             self.assertEqual(tcp.dport, local_port)
4590             self.assertNotEqual(tcp.sport, 12345)
4591             eh_port_in = tcp.sport
4592             self.assert_packet_checksums_valid(p)
4593         except:
4594             self.logger.error(ppp("Unexpected or invalid packet:", p))
4595             raise
4596
4597         # from service back to client
4598         p = (
4599             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4600             / IP(src=local_host.ip4, dst=self.nat_addr)
4601             / TCP(sport=local_port, dport=eh_port_in)
4602         )
4603         self.pg4.add_stream(p)
4604         self.pg_enable_capture(self.pg_interfaces)
4605         self.pg_start()
4606         capture = self.pg4.get_capture(1)
4607         p = capture[0]
4608         try:
4609             ip = p[IP]
4610             tcp = p[TCP]
4611             self.assertEqual(ip.src, self.nat_addr)
4612             self.assertEqual(ip.dst, remote_host.ip4)
4613             self.assertEqual(tcp.sport, external_port)
4614             self.assertEqual(tcp.dport, 12345)
4615             self.assert_packet_checksums_valid(p)
4616         except:
4617             self.logger.error(ppp("Unexpected or invalid packet:", p))
4618             raise
4619
4620     def test_icmp_error_fwd_outbound(self):
4621         """NAT44ED ICMP error outbound with forwarding enabled"""
4622
4623         # Ensure that an outbound ICMP error message is properly associated
4624         # with the inbound forward bypass session it is related to.
4625         payload = "H" * 10
4626
4627         self.nat_add_address(self.nat_addr)
4628         self.nat_add_inside_interface(self.pg0)
4629         self.nat_add_outside_interface(self.pg1)
4630
4631         # enable forwarding and initiate connection out2in
4632         self.vapi.nat44_forwarding_enable_disable(enable=1)
4633         p1 = (
4634             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4635             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4636             / UDP(sport=21, dport=20)
4637             / payload
4638         )
4639
4640         self.pg1.add_stream(p1)
4641         self.pg_enable_capture(self.pg_interfaces)
4642         self.pg_start()
4643         capture = self.pg0.get_capture(1)[0]
4644
4645         self.logger.info(self.vapi.cli("show nat44 sessions"))
4646
4647         # reply with ICMP error message in2out
4648         # We cannot reliably retrieve forward bypass sessions via the API.
4649         # session dumps for a user will only look on the worker that the
4650         # user is supposed to be mapped to in2out. The forward bypass session
4651         # is not necessarily created on that worker.
4652         p2 = (
4653             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4654             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4655             / ICMP(type="dest-unreach", code="port-unreachable")
4656             / capture[IP:]
4657         )
4658
4659         self.pg0.add_stream(p2)
4660         self.pg_enable_capture(self.pg_interfaces)
4661         self.pg_start()
4662         capture = self.pg1.get_capture(1)[0]
4663
4664         self.logger.info(self.vapi.cli("show nat44 sessions"))
4665
4666         self.logger.info(ppp("p1 packet:", p1))
4667         self.logger.info(ppp("p2 packet:", p2))
4668         self.logger.info(ppp("capture packet:", capture))
4669
4670     def test_tcp_session_open_retransmit1(self):
4671         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4672
4673         The client does not receive the [SYN,ACK] or the
4674         ACK from the client is lost. Therefore, the [SYN, ACK]
4675         is retransmitted by the server.
4676         """
4677
4678         in_port = self.tcp_port_in
4679         ext_port = self.tcp_external_port
4680         payload = "H" * 10
4681
4682         self.nat_add_address(self.nat_addr)
4683         self.nat_add_inside_interface(self.pg0)
4684         self.nat_add_outside_interface(self.pg1)
4685
4686         self.vapi.nat_set_timeouts(
4687             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4688         )
4689         # SYN packet in->out
4690         p = (
4691             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4692             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4693             / TCP(sport=in_port, dport=ext_port, flags="S")
4694         )
4695         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4696         out_port = p[TCP].sport
4697
4698         # SYN + ACK packet out->in
4699         p = (
4700             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4701             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4702             / TCP(sport=ext_port, dport=out_port, flags="SA")
4703         )
4704         self.send_and_expect(self.pg1, p, self.pg0)
4705
4706         # ACK in->out does not arrive
4707
4708         # resent SYN + ACK packet out->in
4709         p = (
4710             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4711             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4712             / TCP(sport=ext_port, dport=out_port, flags="SA")
4713         )
4714         self.send_and_expect(self.pg1, p, self.pg0)
4715
4716         # ACK packet in->out
4717         p = (
4718             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4719             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4720             / TCP(sport=in_port, dport=ext_port, flags="A")
4721         )
4722         self.send_and_expect(self.pg0, p, self.pg1)
4723
4724         # Verify that the data can be transmitted after the transitory time
4725         self.virtual_sleep(6)
4726
4727         p = (
4728             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4729             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4730             / TCP(sport=in_port, dport=ext_port, flags="PA")
4731             / Raw(payload)
4732         )
4733         self.send_and_expect(self.pg0, p, self.pg1)
4734
4735     def test_tcp_session_open_retransmit2(self):
4736         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4737
4738         The ACK is lost to the server after the TCP session is opened.
4739         Data is sent by the client, then the [SYN,ACK] is
4740         retransmitted by the server.
4741         """
4742
4743         in_port = self.tcp_port_in
4744         ext_port = self.tcp_external_port
4745         payload = "H" * 10
4746
4747         self.nat_add_address(self.nat_addr)
4748         self.nat_add_inside_interface(self.pg0)
4749         self.nat_add_outside_interface(self.pg1)
4750
4751         self.vapi.nat_set_timeouts(
4752             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4753         )
4754         # SYN packet in->out
4755         p = (
4756             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4757             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4758             / TCP(sport=in_port, dport=ext_port, flags="S")
4759         )
4760         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4761         out_port = p[TCP].sport
4762
4763         # SYN + ACK packet out->in
4764         p = (
4765             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4766             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4767             / TCP(sport=ext_port, dport=out_port, flags="SA")
4768         )
4769         self.send_and_expect(self.pg1, p, self.pg0)
4770
4771         # ACK packet in->out -- not received by the server
4772         p = (
4773             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4774             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4775             / TCP(sport=in_port, dport=ext_port, flags="A")
4776         )
4777         self.send_and_expect(self.pg0, p, self.pg1)
4778
4779         # PUSH + ACK packet in->out
4780         p = (
4781             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4782             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4783             / TCP(sport=in_port, dport=ext_port, flags="PA")
4784             / Raw(payload)
4785         )
4786         self.send_and_expect(self.pg0, p, self.pg1)
4787
4788         # resent SYN + ACK packet out->in
4789         p = (
4790             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4791             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4792             / TCP(sport=ext_port, dport=out_port, flags="SA")
4793         )
4794         self.send_and_expect(self.pg1, p, self.pg0)
4795
4796         # resent ACK packet in->out
4797         p = (
4798             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4799             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4800             / TCP(sport=in_port, dport=ext_port, flags="A")
4801         )
4802         self.send_and_expect(self.pg0, p, self.pg1)
4803
4804         # resent PUSH + ACK packet in->out
4805         p = (
4806             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4807             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4808             / TCP(sport=in_port, dport=ext_port, flags="PA")
4809             / Raw(payload)
4810         )
4811         self.send_and_expect(self.pg0, p, self.pg1)
4812
4813         # ACK packet out->in
4814         p = (
4815             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4816             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4817             / TCP(sport=ext_port, dport=out_port, flags="A")
4818         )
4819         self.send_and_expect(self.pg1, p, self.pg0)
4820
4821         # Verify that the data can be transmitted after the transitory time
4822         self.virtual_sleep(6)
4823
4824         p = (
4825             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4826             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4827             / TCP(sport=in_port, dport=ext_port, flags="PA")
4828             / Raw(payload)
4829         )
4830         self.send_and_expect(self.pg0, p, self.pg1)
4831
4832     def test_dynamic_ports_exhausted(self):
4833         """NAT44ED dynamic translation test: address ports exhaused"""
4834
4835         sessions_per_batch = 128
4836         n_available_ports = 65536 - 1024
4837         n_sessions = n_available_ports + 2 * sessions_per_batch
4838
4839         # set high enough session limit for ports to be exhausted
4840         self.plugin_disable()
4841         self.plugin_enable(max_sessions=n_sessions)
4842
4843         self.nat_add_inside_interface(self.pg0)
4844         self.nat_add_outside_interface(self.pg1)
4845
4846         # set timeouts to high for sessions to reallistically expire
4847         config = self.vapi.nat44_show_running_config()
4848         old_timeouts = config.timeouts
4849         self.vapi.nat_set_timeouts(
4850             udp=21600,
4851             tcp_established=old_timeouts.tcp_established,
4852             tcp_transitory=old_timeouts.tcp_transitory,
4853             icmp=old_timeouts.icmp,
4854         )
4855
4856         # in2out after NAT addresses added
4857         self.nat_add_address(self.nat_addr)
4858
4859         for i in range(n_sessions // sessions_per_batch):
4860             pkts = self.create_udp_stream(
4861                 self.pg0,
4862                 self.pg1,
4863                 sessions_per_batch,
4864                 base_port=i * sessions_per_batch + 100,
4865             )
4866
4867             self.pg0.add_stream(pkts)
4868             self.pg_start()
4869
4870             err = self.statistics.get_err_counter(
4871                 "/err/nat44-ed-in2out-slowpath/out of ports"
4872             )
4873             if err > sessions_per_batch:
4874                 break
4875
4876         # Check for ports to be used no more than once
4877         ports = set()
4878         sessions = self.vapi.cli("show nat44 sessions")
4879         rx = re.compile(
4880             f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
4881         )
4882         for line in sessions.splitlines():
4883             m = rx.match(line)
4884             if m:
4885                 port = int(m.groups()[0])
4886                 self.assertNotIn(port, ports)
4887                 ports.add(port)
4888
4889         self.assertGreaterEqual(err, sessions_per_batch)
4890
4891
4892 if __name__ == "__main__":
4893     unittest.main(testRunner=VppTestRunner)