vppinfra: add bit_extract_u32 and bit_extract_u64
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import re
8 import scapy.compat
9 from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
10 from framework import VppTestCase, VppTestRunner, VppLoInterface
11 from scapy.data import IP_PROTOS
12 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
13 from scapy.layers.inet import IPerror, TCPerror
14 from scapy.layers.l2 import Ether
15 from scapy.packet import Raw
16 from syslog_rfc5424_parser import SyslogMessage, ParseError
17 from syslog_rfc5424_parser.constants import SyslogSeverity
18 from util import ppp, pr, ip4_range
19 from vpp_acl import AclRule, VppAcl, VppAclInterface
20 from vpp_ip_route import VppIpRoute, VppRoutePath
21 from vpp_papi import VppEnum
22 from util import StatsDiff
23
24
25 class TestNAT44ED(VppTestCase):
26     """NAT44ED Test Case"""
27
28     nat_addr = "10.0.10.3"
29
30     tcp_port_in = 6303
31     tcp_port_out = 6303
32
33     udp_port_in = 6304
34     udp_port_out = 6304
35
36     icmp_id_in = 6305
37     icmp_id_out = 6305
38
39     tcp_external_port = 80
40
41     max_sessions = 100
42
    def setUp(self):
        """Run the base-class setup, then enable the NAT44-ED plugin."""
        super().setUp()
        self.plugin_enable()
46
    def tearDown(self):
        """Run the base-class teardown, then disable the plugin.

        The plugin is only disabled when ``self.vpp_dead`` is falsy.
        """
        super().tearDown()
        if not self.vpp_dead:
            self.plugin_disable()
51
52     def plugin_enable(self, max_sessions=None):
53         max_sessions = max_sessions or self.max_sessions
54         self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
55
    def plugin_disable(self):
        """Disable the NAT44-ED plugin through the VPP API."""
        self.vapi.nat44_ed_plugin_enable_disable(enable=0)
58
59     @property
    def config_flags(self):
        """Shorthand for ``VppEnum.vl_api_nat_config_flags_t``."""
        return VppEnum.vl_api_nat_config_flags_t
62
63     @property
    def nat44_config_flags(self):
        """Shorthand for ``VppEnum.vl_api_nat44_config_flags_t``."""
        return VppEnum.vl_api_nat44_config_flags_t
66
67     @property
    def syslog_severity(self):
        """Shorthand for ``VppEnum.vl_api_syslog_severity_t``."""
        return VppEnum.vl_api_syslog_severity_t
70
71     @property
    def server_addr(self):
        """IPv4 address of the first remote host on ``pg1``."""
        return self.pg1.remote_hosts[0].ip4
74
75     @staticmethod
    def random_port():
        """Return a random integer in the inclusive range 1024..65535."""
        return randint(1024, 65535)
78
79     @staticmethod
80     def proto2layer(proto):
81         if proto == IP_PROTOS.tcp:
82             return TCP
83         elif proto == IP_PROTOS.udp:
84             return UDP
85         elif proto == IP_PROTOS.icmp:
86             return ICMP
87         else:
88             raise Exception("Unsupported protocol")
89
90     @classmethod
    def create_and_add_ip4_table(cls, i, table_id=0):
        """Add IP table ``table_id`` through the API and bind ``i`` to it for IPv4."""
        cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
        i.set_table_ip4(table_id)
94
95     @classmethod
96     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
97         if table_id:
98             cls.create_and_add_ip4_table(i, table_id)
99
100         i.admin_up()
101         i.config_ip4()
102         i.resolve_arp()
103
104         if hosts:
105             i.generate_remote_hosts(hosts)
106             i.configure_ipv4_neighbors()
107
108     @classmethod
    def nat_add_interface_address(cls, i):
        """Call ``nat44_add_del_interface_addr`` with ``is_add=1`` for interface ``i``."""
        cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
111
    def nat_add_inside_interface(self, i):
        """Add the NAT44 interface feature on ``i`` with the ``NAT_IS_INSIDE`` flag."""
        self.vapi.nat44_interface_add_del_feature(
            flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
        )
116
    def nat_add_outside_interface(self, i):
        """Add the NAT44 interface feature on ``i`` with the ``NAT_IS_OUTSIDE`` flag."""
        self.vapi.nat44_interface_add_del_feature(
            flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
        )
121
122     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
123         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
124         self.vapi.nat44_add_del_address_range(
125             first_ip_address=address,
126             last_ip_address=address,
127             vrf_id=vrf_id,
128             is_add=is_add,
129             flags=flags,
130         )
131
132     def nat_add_static_mapping(
133         self,
134         local_ip,
135         external_ip="0.0.0.0",
136         local_port=0,
137         external_port=0,
138         vrf_id=0,
139         is_add=1,
140         external_sw_if_index=0xFFFFFFFF,
141         proto=0,
142         tag="",
143         flags=0,
144     ):
145
146         if not (local_port and external_port):
147             flags |= self.config_flags.NAT_IS_ADDR_ONLY
148
149         self.vapi.nat44_add_del_static_mapping(
150             is_add=is_add,
151             local_ip_address=local_ip,
152             external_ip_address=external_ip,
153             external_sw_if_index=external_sw_if_index,
154             local_port=local_port,
155             external_port=external_port,
156             vrf_id=vrf_id,
157             protocol=proto,
158             flags=flags,
159             tag=tag,
160         )
161
162     @classmethod
163     def setUpClass(cls):
164         super().setUpClass()
165         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
166             return
167
168         cls.create_pg_interfaces(range(12))
169         cls.interfaces = list(cls.pg_interfaces[:4])
170
171         cls.create_and_add_ip4_table(cls.pg2, 10)
172
173         for i in cls.interfaces:
174             cls.configure_ip4_interface(i, hosts=3)
175
176         # test specific (test-multiple-vrf)
177         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
178
179         # test specific (test-one-armed-nat44-static)
180         cls.pg4.generate_remote_hosts(2)
181         cls.pg4.config_ip4()
182         cls.vapi.sw_interface_add_del_address(
183             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
184         )
185         cls.pg4.admin_up()
186         cls.pg4.resolve_arp()
187         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
188         cls.pg4.resolve_arp()
189
190         # test specific interface (pg5)
191         cls.pg5._local_ip4 = "10.1.1.1"
192         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
193         cls.pg5.set_table_ip4(1)
194         cls.pg5.config_ip4()
195         cls.pg5.admin_up()
196         cls.pg5.resolve_arp()
197
198         # test specific interface (pg6)
199         cls.pg6._local_ip4 = "10.1.2.1"
200         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
201         cls.pg6.set_table_ip4(1)
202         cls.pg6.config_ip4()
203         cls.pg6.admin_up()
204         cls.pg6.resolve_arp()
205
206         rl = list()
207
208         rl.append(
209             VppIpRoute(
210                 cls,
211                 "0.0.0.0",
212                 0,
213                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
214                 register=False,
215                 table_id=1,
216             )
217         )
218         rl.append(
219             VppIpRoute(
220                 cls,
221                 "0.0.0.0",
222                 0,
223                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
224                 register=False,
225             )
226         )
227         rl.append(
228             VppIpRoute(
229                 cls,
230                 cls.pg5.remote_ip4,
231                 32,
232                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
233                 register=False,
234                 table_id=1,
235             )
236         )
237         rl.append(
238             VppIpRoute(
239                 cls,
240                 cls.pg6.remote_ip4,
241                 32,
242                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
243                 register=False,
244                 table_id=1,
245             )
246         )
247         rl.append(
248             VppIpRoute(
249                 cls,
250                 cls.pg6.remote_ip4,
251                 16,
252                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
253                 register=False,
254                 table_id=0,
255             )
256         )
257
258         for r in rl:
259             r.add_vpp_config()
260
261         cls.no_diff = StatsDiff(
262             {
263                 pg.sw_if_index: {
264                     "/nat44-ed/in2out/fastpath/tcp": 0,
265                     "/nat44-ed/in2out/fastpath/udp": 0,
266                     "/nat44-ed/in2out/fastpath/icmp": 0,
267                     "/nat44-ed/in2out/fastpath/drops": 0,
268                     "/nat44-ed/in2out/slowpath/tcp": 0,
269                     "/nat44-ed/in2out/slowpath/udp": 0,
270                     "/nat44-ed/in2out/slowpath/icmp": 0,
271                     "/nat44-ed/in2out/slowpath/drops": 0,
272                     "/nat44-ed/in2out/fastpath/tcp": 0,
273                     "/nat44-ed/in2out/fastpath/udp": 0,
274                     "/nat44-ed/in2out/fastpath/icmp": 0,
275                     "/nat44-ed/in2out/fastpath/drops": 0,
276                     "/nat44-ed/in2out/slowpath/tcp": 0,
277                     "/nat44-ed/in2out/slowpath/udp": 0,
278                     "/nat44-ed/in2out/slowpath/icmp": 0,
279                     "/nat44-ed/in2out/slowpath/drops": 0,
280                 }
281                 for pg in cls.pg_interfaces
282             }
283         )
284
    def get_err_counter(self, path):
        """Return the error counter at ``path`` as reported by ``self.statistics``."""
        return self.statistics.get_err_counter(path)
287
288     def reass_hairpinning(
289         self,
290         server_addr,
291         server_in_port,
292         server_out_port,
293         host_in_port,
294         proto=IP_PROTOS.tcp,
295         ignore_port=False,
296     ):
297         layer = self.proto2layer(proto)
298
299         if proto == IP_PROTOS.tcp:
300             data = b"A" * 4 + b"B" * 16 + b"C" * 3
301         else:
302             data = b"A" * 16 + b"B" * 16 + b"C" * 3
303
304         # send packet from host to server
305         pkts = self.create_stream_frag(
306             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
307         )
308         self.pg0.add_stream(pkts)
309         self.pg_enable_capture(self.pg_interfaces)
310         self.pg_start()
311         frags = self.pg0.get_capture(len(pkts))
312         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
313         if proto != IP_PROTOS.icmp:
314             if not ignore_port:
315                 self.assertNotEqual(p[layer].sport, host_in_port)
316             self.assertEqual(p[layer].dport, server_in_port)
317         else:
318             if not ignore_port:
319                 self.assertNotEqual(p[layer].id, host_in_port)
320         self.assertEqual(data, p[Raw].load)
321
322     def frag_out_of_order(
323         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
324     ):
325         layer = self.proto2layer(proto)
326
327         if proto == IP_PROTOS.tcp:
328             data = b"A" * 4 + b"B" * 16 + b"C" * 3
329         else:
330             data = b"A" * 16 + b"B" * 16 + b"C" * 3
331         self.port_in = self.random_port()
332
333         for i in range(2):
334             # in2out
335             pkts = self.create_stream_frag(
336                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
337             )
338             pkts.reverse()
339             self.pg0.add_stream(pkts)
340             self.pg_enable_capture(self.pg_interfaces)
341             self.pg_start()
342             frags = self.pg1.get_capture(len(pkts))
343             if not dont_translate:
344                 p = self.reass_frags_and_verify(
345                     frags, self.nat_addr, self.pg1.remote_ip4
346                 )
347             else:
348                 p = self.reass_frags_and_verify(
349                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
350                 )
351             if proto != IP_PROTOS.icmp:
352                 if not dont_translate:
353                     self.assertEqual(p[layer].dport, 20)
354                     if not ignore_port:
355                         self.assertNotEqual(p[layer].sport, self.port_in)
356                 else:
357                     self.assertEqual(p[layer].sport, self.port_in)
358             else:
359                 if not ignore_port:
360                     if not dont_translate:
361                         self.assertNotEqual(p[layer].id, self.port_in)
362                     else:
363                         self.assertEqual(p[layer].id, self.port_in)
364             self.assertEqual(data, p[Raw].load)
365
366             # out2in
367             if not dont_translate:
368                 dst_addr = self.nat_addr
369             else:
370                 dst_addr = self.pg0.remote_ip4
371             if proto != IP_PROTOS.icmp:
372                 sport = 20
373                 dport = p[layer].sport
374             else:
375                 sport = p[layer].id
376                 dport = 0
377             pkts = self.create_stream_frag(
378                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
379             )
380             pkts.reverse()
381             self.pg1.add_stream(pkts)
382             self.pg_enable_capture(self.pg_interfaces)
383             self.logger.info(self.vapi.cli("show trace"))
384             self.pg_start()
385             frags = self.pg0.get_capture(len(pkts))
386             p = self.reass_frags_and_verify(
387                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
388             )
389             if proto != IP_PROTOS.icmp:
390                 self.assertEqual(p[layer].sport, 20)
391                 self.assertEqual(p[layer].dport, self.port_in)
392             else:
393                 self.assertEqual(p[layer].id, self.port_in)
394             self.assertEqual(data, p[Raw].load)
395
396     def reass_frags_and_verify(self, frags, src, dst):
397         buffer = BytesIO()
398         for p in frags:
399             self.assertEqual(p[IP].src, src)
400             self.assertEqual(p[IP].dst, dst)
401             self.assert_ip_checksum_valid(p)
402             buffer.seek(p[IP].frag * 8)
403             buffer.write(bytes(p[IP].payload))
404         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
405         if ip.proto == IP_PROTOS.tcp:
406             p = ip / TCP(buffer.getvalue())
407             self.logger.debug(ppp("Reassembled:", p))
408             self.assert_tcp_checksum_valid(p)
409         elif ip.proto == IP_PROTOS.udp:
410             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
411         elif ip.proto == IP_PROTOS.icmp:
412             p = ip / ICMP(buffer.getvalue())
413         return p
414
415     def frag_in_order(
416         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
417     ):
418         layer = self.proto2layer(proto)
419
420         if proto == IP_PROTOS.tcp:
421             data = b"A" * 4 + b"B" * 16 + b"C" * 3
422         else:
423             data = b"A" * 16 + b"B" * 16 + b"C" * 3
424         self.port_in = self.random_port()
425
426         # in2out
427         pkts = self.create_stream_frag(
428             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
429         )
430         self.pg0.add_stream(pkts)
431         self.pg_enable_capture(self.pg_interfaces)
432         self.pg_start()
433         frags = self.pg1.get_capture(len(pkts))
434         if not dont_translate:
435             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
436         else:
437             p = self.reass_frags_and_verify(
438                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
439             )
440         if proto != IP_PROTOS.icmp:
441             if not dont_translate:
442                 self.assertEqual(p[layer].dport, 20)
443                 if not ignore_port:
444                     self.assertNotEqual(p[layer].sport, self.port_in)
445             else:
446                 self.assertEqual(p[layer].sport, self.port_in)
447         else:
448             if not ignore_port:
449                 if not dont_translate:
450                     self.assertNotEqual(p[layer].id, self.port_in)
451                 else:
452                     self.assertEqual(p[layer].id, self.port_in)
453         self.assertEqual(data, p[Raw].load)
454
455         # out2in
456         if not dont_translate:
457             dst_addr = self.nat_addr
458         else:
459             dst_addr = self.pg0.remote_ip4
460         if proto != IP_PROTOS.icmp:
461             sport = 20
462             dport = p[layer].sport
463         else:
464             sport = p[layer].id
465             dport = 0
466         pkts = self.create_stream_frag(
467             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
468         )
469         self.pg1.add_stream(pkts)
470         self.pg_enable_capture(self.pg_interfaces)
471         self.pg_start()
472         frags = self.pg0.get_capture(len(pkts))
473         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
474         if proto != IP_PROTOS.icmp:
475             self.assertEqual(p[layer].sport, 20)
476             self.assertEqual(p[layer].dport, self.port_in)
477         else:
478             self.assertEqual(p[layer].id, self.port_in)
479         self.assertEqual(data, p[Raw].load)
480
481     def verify_capture_out(
482         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
483     ):
484         if nat_ip is None:
485             nat_ip = self.nat_addr
486         for packet in capture:
487             try:
488                 self.assert_packet_checksums_valid(packet)
489                 self.assertEqual(packet[IP].src, nat_ip)
490                 if dst_ip is not None:
491                     self.assertEqual(packet[IP].dst, dst_ip)
492                 if packet.haslayer(TCP):
493                     if not ignore_port:
494                         if same_port:
495                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
496                         else:
497                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
498                     self.tcp_port_out = packet[TCP].sport
499                     self.assert_packet_checksums_valid(packet)
500                 elif packet.haslayer(UDP):
501                     if not ignore_port:
502                         if same_port:
503                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
504                         else:
505                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
506                     self.udp_port_out = packet[UDP].sport
507                 else:
508                     if not ignore_port:
509                         if same_port:
510                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
511                         else:
512                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
513                     self.icmp_id_out = packet[ICMP].id
514                     self.assert_packet_checksums_valid(packet)
515             except:
516                 self.logger.error(
517                     ppp("Unexpected or invalid packet (outside network):", packet)
518                 )
519                 raise
520
521     def verify_capture_in(self, capture, in_if):
522         for packet in capture:
523             try:
524                 self.assert_packet_checksums_valid(packet)
525                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
526                 if packet.haslayer(TCP):
527                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
528                 elif packet.haslayer(UDP):
529                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
530                 else:
531                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
532             except:
533                 self.logger.error(
534                     ppp("Unexpected or invalid packet (inside network):", packet)
535                 )
536                 raise
537
538     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
539         if dst_ip is None:
540             dst_ip = out_if.remote_ip4
541
542         pkts = []
543         # TCP
544         p = (
545             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
546             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
547             / TCP(sport=self.tcp_port_in, dport=20)
548         )
549         pkts.extend([p, p])
550
551         # UDP
552         p = (
553             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
554             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
555             / UDP(sport=self.udp_port_in, dport=20)
556         )
557         pkts.append(p)
558
559         # ICMP
560         p = (
561             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
562             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
563             / ICMP(id=self.icmp_id_in, type="echo-request")
564         )
565         pkts.append(p)
566
567         return pkts
568
569     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
570         if dst_ip is None:
571             dst_ip = self.nat_addr
572         if not use_inside_ports:
573             tcp_port = self.tcp_port_out
574             udp_port = self.udp_port_out
575             icmp_id = self.icmp_id_out
576         else:
577             tcp_port = self.tcp_port_in
578             udp_port = self.udp_port_in
579             icmp_id = self.icmp_id_in
580         pkts = []
581         # TCP
582         p = (
583             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
584             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
585             / TCP(dport=tcp_port, sport=20)
586         )
587         pkts.extend([p, p])
588
589         # UDP
590         p = (
591             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
592             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
593             / UDP(dport=udp_port, sport=20)
594         )
595         pkts.append(p)
596
597         # ICMP
598         p = (
599             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
600             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
601             / ICMP(id=icmp_id, type="echo-reply")
602         )
603         pkts.append(p)
604
605         return pkts
606
607     def create_tcp_stream(self, in_if, out_if, count):
608         pkts = []
609         port = 6303
610
611         for i in range(count):
612             p = (
613                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
614                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
615                 / TCP(sport=port + i, dport=20)
616             )
617             pkts.append(p)
618
619         return pkts
620
621     def create_udp_stream(self, in_if, out_if, count, base_port=6303):
622         return [
623             (
624                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
625                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
626                 / UDP(sport=base_port + i, dport=20)
627             )
628             for i in range(count)
629         ]
630
631     def create_stream_frag(
632         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
633     ):
634         if proto == IP_PROTOS.tcp:
635             p = (
636                 IP(src=src_if.remote_ip4, dst=dst)
637                 / TCP(sport=sport, dport=dport)
638                 / Raw(data)
639             )
640             p = p.__class__(scapy.compat.raw(p))
641             chksum = p[TCP].chksum
642             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
643         elif proto == IP_PROTOS.udp:
644             proto_header = UDP(sport=sport, dport=dport)
645         elif proto == IP_PROTOS.icmp:
646             if not echo_reply:
647                 proto_header = ICMP(id=sport, type="echo-request")
648             else:
649                 proto_header = ICMP(id=sport, type="echo-reply")
650         else:
651             raise Exception("Unsupported protocol")
652         id = self.random_port()
653         pkts = []
654         if proto == IP_PROTOS.tcp:
655             raw = Raw(data[0:4])
656         else:
657             raw = Raw(data[0:16])
658         p = (
659             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
660             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
661             / proto_header
662             / raw
663         )
664         pkts.append(p)
665         if proto == IP_PROTOS.tcp:
666             raw = Raw(data[4:20])
667         else:
668             raw = Raw(data[16:32])
669         p = (
670             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
671             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
672             / raw
673         )
674         pkts.append(p)
675         if proto == IP_PROTOS.tcp:
676             raw = Raw(data[20:])
677         else:
678             raw = Raw(data[32:])
679         p = (
680             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
681             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
682             / raw
683         )
684         pkts.append(p)
685         return pkts
686
687     def frag_in_order_in_plus_out(
688         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
689     ):
690
691         layer = self.proto2layer(proto)
692
693         if proto == IP_PROTOS.tcp:
694             data = b"A" * 4 + b"B" * 16 + b"C" * 3
695         else:
696             data = b"A" * 16 + b"B" * 16 + b"C" * 3
697         port_in = self.random_port()
698
699         for i in range(2):
700             # out2in
701             pkts = self.create_stream_frag(
702                 self.pg0, out_addr, port_in, out_port, data, proto
703             )
704             self.pg0.add_stream(pkts)
705             self.pg_enable_capture(self.pg_interfaces)
706             self.pg_start()
707             frags = self.pg1.get_capture(len(pkts))
708             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
709             if proto != IP_PROTOS.icmp:
710                 self.assertEqual(p[layer].sport, port_in)
711                 self.assertEqual(p[layer].dport, in_port)
712             else:
713                 self.assertEqual(p[layer].id, port_in)
714             self.assertEqual(data, p[Raw].load)
715
716             # in2out
717             if proto != IP_PROTOS.icmp:
718                 pkts = self.create_stream_frag(
719                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
720                 )
721             else:
722                 pkts = self.create_stream_frag(
723                     self.pg1,
724                     self.pg0.remote_ip4,
725                     p[layer].id,
726                     0,
727                     data,
728                     proto,
729                     echo_reply=True,
730                 )
731             self.pg1.add_stream(pkts)
732             self.pg_enable_capture(self.pg_interfaces)
733             self.pg_start()
734             frags = self.pg0.get_capture(len(pkts))
735             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
736             if proto != IP_PROTOS.icmp:
737                 self.assertEqual(p[layer].sport, out_port)
738                 self.assertEqual(p[layer].dport, port_in)
739             else:
740                 self.assertEqual(p[layer].id, port_in)
741             self.assertEqual(data, p[Raw].load)
742
743     def frag_out_of_order_in_plus_out(
744         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
745     ):
746
747         layer = self.proto2layer(proto)
748
749         if proto == IP_PROTOS.tcp:
750             data = b"A" * 4 + b"B" * 16 + b"C" * 3
751         else:
752             data = b"A" * 16 + b"B" * 16 + b"C" * 3
753         port_in = self.random_port()
754
755         for i in range(2):
756             # out2in
757             pkts = self.create_stream_frag(
758                 self.pg0, out_addr, port_in, out_port, data, proto
759             )
760             pkts.reverse()
761             self.pg0.add_stream(pkts)
762             self.pg_enable_capture(self.pg_interfaces)
763             self.pg_start()
764             frags = self.pg1.get_capture(len(pkts))
765             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
766             if proto != IP_PROTOS.icmp:
767                 self.assertEqual(p[layer].dport, in_port)
768                 self.assertEqual(p[layer].sport, port_in)
769                 self.assertEqual(p[layer].dport, in_port)
770             else:
771                 self.assertEqual(p[layer].id, port_in)
772             self.assertEqual(data, p[Raw].load)
773
774             # in2out
775             if proto != IP_PROTOS.icmp:
776                 pkts = self.create_stream_frag(
777                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
778                 )
779             else:
780                 pkts = self.create_stream_frag(
781                     self.pg1,
782                     self.pg0.remote_ip4,
783                     p[layer].id,
784                     0,
785                     data,
786                     proto,
787                     echo_reply=True,
788                 )
789             pkts.reverse()
790             self.pg1.add_stream(pkts)
791             self.pg_enable_capture(self.pg_interfaces)
792             self.pg_start()
793             frags = self.pg0.get_capture(len(pkts))
794             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
795             if proto != IP_PROTOS.icmp:
796                 self.assertEqual(p[layer].sport, out_port)
797                 self.assertEqual(p[layer].dport, port_in)
798             else:
799                 self.assertEqual(p[layer].id, port_in)
800             self.assertEqual(data, p[Raw].load)
801
802     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
803         # SYN packet in->out
804         p = (
805             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
806             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
807             / TCP(sport=in_port, dport=ext_port, flags="S")
808         )
809         in_if.add_stream(p)
810         self.pg_enable_capture(self.pg_interfaces)
811         self.pg_start()
812         capture = out_if.get_capture(1)
813         p = capture[0]
814         out_port = p[TCP].sport
815
816         # SYN + ACK packet out->in
817         p = (
818             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
819             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
820             / TCP(sport=ext_port, dport=out_port, flags="SA")
821         )
822         out_if.add_stream(p)
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825         in_if.get_capture(1)
826
827         # ACK packet in->out
828         p = (
829             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
830             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
831             / TCP(sport=in_port, dport=ext_port, flags="A")
832         )
833         in_if.add_stream(p)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         out_if.get_capture(1)
837
838         return out_port
839
840     def twice_nat_common(
841         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
842     ):
843         twice_nat_addr = "10.0.1.3"
844
845         port_in = 8080
846         if lb:
847             if not same_pg:
848                 port_in1 = port_in
849                 port_in2 = port_in
850             else:
851                 port_in1 = port_in + 1
852                 port_in2 = port_in + 2
853
854         port_out = 80
855         eh_port_out = 4567
856
857         server1 = self.pg0.remote_hosts[0]
858         server2 = self.pg0.remote_hosts[1]
859         if lb and same_pg:
860             server2 = server1
861         if not lb:
862             server = server1
863
864         pg0 = self.pg0
865         if same_pg:
866             pg1 = self.pg0
867         else:
868             pg1 = self.pg1
869
870         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
871
872         self.nat_add_address(self.nat_addr)
873         self.nat_add_address(twice_nat_addr, twice_nat=1)
874
875         flags = 0
876         if self_twice_nat:
877             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
878         else:
879             flags |= self.config_flags.NAT_IS_TWICE_NAT
880
881         if not lb:
882             self.nat_add_static_mapping(
883                 pg0.remote_ip4,
884                 self.nat_addr,
885                 port_in,
886                 port_out,
887                 proto=IP_PROTOS.tcp,
888                 flags=flags,
889             )
890         else:
891             locals = [
892                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
893                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
894             ]
895             out_addr = self.nat_addr
896
897             self.vapi.nat44_add_del_lb_static_mapping(
898                 is_add=1,
899                 flags=flags,
900                 external_addr=out_addr,
901                 external_port=port_out,
902                 protocol=IP_PROTOS.tcp,
903                 local_num=len(locals),
904                 locals=locals,
905             )
906         self.nat_add_inside_interface(pg0)
907         self.nat_add_outside_interface(pg1)
908
909         if same_pg:
910             if not lb:
911                 client = server
912             else:
913                 assert client_id is not None
914                 if client_id == 1:
915                     client = self.pg0.remote_hosts[0]
916                 elif client_id == 2:
917                     client = self.pg0.remote_hosts[1]
918         else:
919             client = pg1.remote_hosts[0]
920         p = (
921             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
922             / IP(src=client.ip4, dst=self.nat_addr)
923             / TCP(sport=eh_port_out, dport=port_out)
924         )
925         pg1.add_stream(p)
926         self.pg_enable_capture(self.pg_interfaces)
927         self.pg_start()
928         capture = pg0.get_capture(1)
929         p = capture[0]
930         try:
931             ip = p[IP]
932             tcp = p[TCP]
933             if lb:
934                 if ip.dst == server1.ip4:
935                     server = server1
936                     port_in = port_in1
937                 else:
938                     server = server2
939                     port_in = port_in2
940             self.assertEqual(ip.dst, server.ip4)
941             if lb and same_pg:
942                 self.assertIn(tcp.dport, [port_in1, port_in2])
943             else:
944                 self.assertEqual(tcp.dport, port_in)
945             if eh_translate:
946                 self.assertEqual(ip.src, twice_nat_addr)
947                 self.assertNotEqual(tcp.sport, eh_port_out)
948             else:
949                 self.assertEqual(ip.src, client.ip4)
950                 self.assertEqual(tcp.sport, eh_port_out)
951             eh_addr_in = ip.src
952             eh_port_in = tcp.sport
953             saved_port_in = tcp.dport
954             self.assert_packet_checksums_valid(p)
955         except:
956             self.logger.error(ppp("Unexpected or invalid packet:", p))
957             raise
958
959         p = (
960             Ether(src=server.mac, dst=pg0.local_mac)
961             / IP(src=server.ip4, dst=eh_addr_in)
962             / TCP(sport=saved_port_in, dport=eh_port_in)
963         )
964         pg0.add_stream(p)
965         self.pg_enable_capture(self.pg_interfaces)
966         self.pg_start()
967         capture = pg1.get_capture(1)
968         p = capture[0]
969         try:
970             ip = p[IP]
971             tcp = p[TCP]
972             self.assertEqual(ip.dst, client.ip4)
973             self.assertEqual(ip.src, self.nat_addr)
974             self.assertEqual(tcp.dport, eh_port_out)
975             self.assertEqual(tcp.sport, port_out)
976             self.assert_packet_checksums_valid(p)
977         except:
978             self.logger.error(ppp("Unexpected or invalid packet:", p))
979             raise
980
981         if eh_translate:
982             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
983             self.assertEqual(len(sessions), 1)
984             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
985             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
986             self.logger.info(self.vapi.cli("show nat44 sessions"))
987             self.vapi.nat44_del_session(
988                 address=sessions[0].inside_ip_address,
989                 port=sessions[0].inside_port,
990                 protocol=sessions[0].protocol,
991                 flags=(
992                     self.config_flags.NAT_IS_INSIDE
993                     | self.config_flags.NAT_IS_EXT_HOST_VALID
994                 ),
995                 ext_host_address=sessions[0].ext_host_nat_address,
996                 ext_host_port=sessions[0].ext_host_nat_port,
997             )
998             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
999             self.assertEqual(len(sessions), 0)
1000
1001     def verify_syslog_sess(self, data, msgid, is_ip6=False):
1002         message = data.decode("utf-8")
1003         try:
1004             message = SyslogMessage.parse(message)
1005         except ParseError as e:
1006             self.logger.error(e)
1007             raise
1008         else:
1009             self.assertEqual(message.severity, SyslogSeverity.info)
1010             self.assertEqual(message.appname, "NAT")
1011             self.assertEqual(message.msgid, msgid)
1012             sd_params = message.sd.get("nsess")
1013             self.assertTrue(sd_params is not None)
1014             if is_ip6:
1015                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1016                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1017             else:
1018                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1019                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1020                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1021             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1022             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1023             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1024             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1025             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1026             self.assertEqual(sd_params.get("SVLAN"), "0")
1027             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1028             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1029
1030     def test_icmp_error(self):
1031         """NAT44ED test ICMP error message with inner header"""
1032
1033         payload = "H" * 10
1034
1035         self.nat_add_address(self.nat_addr)
1036         self.nat_add_inside_interface(self.pg0)
1037         self.nat_add_outside_interface(self.pg1)
1038
1039         # in2out (initiate connection)
1040         p1 = [
1041             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1042             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1043             / UDP(sport=21, dport=20)
1044             / payload,
1045             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1046             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1047             / TCP(sport=21, dport=20, flags="S")
1048             / payload,
1049             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1050             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1051             / ICMP(type="echo-request", id=7777)
1052             / payload,
1053         ]
1054
1055         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1056
1057         # out2in (send error message)
1058         p2 = [
1059             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1060             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1061             / ICMP(type="dest-unreach", code="port-unreachable")
1062             / c[IP:]
1063             for c in capture
1064         ]
1065
1066         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1067
1068         for c in capture:
1069             try:
1070                 assert c[IP].dst == self.pg0.remote_ip4
1071                 assert c[IPerror].src == self.pg0.remote_ip4
1072             except AssertionError as a:
1073                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1074
1075     def test_icmp_echo_reply_trailer(self):
1076         """ICMP echo reply with ethernet trailer"""
1077
1078         self.nat_add_address(self.nat_addr)
1079         self.nat_add_inside_interface(self.pg0)
1080         self.nat_add_outside_interface(self.pg1)
1081
1082         # in2out
1083         p1 = (
1084             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1085             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1086             / ICMP(type=8, id=0xABCD, seq=0)
1087         )
1088
1089         self.pg0.add_stream(p1)
1090         self.pg_enable_capture(self.pg_interfaces)
1091         self.pg_start()
1092         c = self.pg1.get_capture(1)[0]
1093
1094         self.logger.debug(self.vapi.cli("show trace"))
1095
1096         # out2in
1097         p2 = (
1098             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1099             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1100             / ICMP(type=0, id=c[ICMP].id, seq=0)
1101         )
1102
1103         # force checksum calculation
1104         p2 = p2.__class__(bytes(p2))
1105
1106         self.logger.debug(ppp("Packet before modification:", p2))
1107
1108         # hex representation of vss monitoring ethernet trailer
1109         # this seems to be just added to end of packet without modifying
1110         # IP or ICMP lengths / checksums
1111         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1112         # change it so that IP/ICMP is unaffected
1113         p2[IP].len = 28
1114
1115         self.logger.debug(ppp("Packet with added trailer:", p2))
1116
1117         self.pg1.add_stream(p2)
1118         self.pg_enable_capture(self.pg_interfaces)
1119         self.pg_start()
1120
1121         self.pg0.get_capture(1)
1122
1123     def test_users_dump(self):
1124         """NAT44ED API test - nat44_user_dump"""
1125
1126         self.nat_add_address(self.nat_addr)
1127         self.nat_add_inside_interface(self.pg0)
1128         self.nat_add_outside_interface(self.pg1)
1129
1130         self.vapi.nat44_forwarding_enable_disable(enable=1)
1131
1132         local_ip = self.pg0.remote_ip4
1133         external_ip = self.nat_addr
1134         self.nat_add_static_mapping(local_ip, external_ip)
1135
1136         users = self.vapi.nat44_user_dump()
1137         self.assertEqual(len(users), 0)
1138
1139         # in2out - static mapping match
1140
1141         pkts = self.create_stream_out(self.pg1)
1142         self.pg1.add_stream(pkts)
1143         self.pg_enable_capture(self.pg_interfaces)
1144         self.pg_start()
1145         capture = self.pg0.get_capture(len(pkts))
1146         self.verify_capture_in(capture, self.pg0)
1147
1148         pkts = self.create_stream_in(self.pg0, self.pg1)
1149         self.pg0.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152         capture = self.pg1.get_capture(len(pkts))
1153         self.verify_capture_out(capture, same_port=True)
1154
1155         users = self.vapi.nat44_user_dump()
1156         self.assertEqual(len(users), 1)
1157         static_user = users[0]
1158         self.assertEqual(static_user.nstaticsessions, 3)
1159         self.assertEqual(static_user.nsessions, 0)
1160
1161         # in2out - no static mapping match (forwarding test)
1162
1163         host0 = self.pg0.remote_hosts[0]
1164         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1165         try:
1166             pkts = self.create_stream_out(
1167                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1168             )
1169             self.pg1.add_stream(pkts)
1170             self.pg_enable_capture(self.pg_interfaces)
1171             self.pg_start()
1172             capture = self.pg0.get_capture(len(pkts))
1173             self.verify_capture_in(capture, self.pg0)
1174
1175             pkts = self.create_stream_in(self.pg0, self.pg1)
1176             self.pg0.add_stream(pkts)
1177             self.pg_enable_capture(self.pg_interfaces)
1178             self.pg_start()
1179             capture = self.pg1.get_capture(len(pkts))
1180             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1181         finally:
1182             self.pg0.remote_hosts[0] = host0
1183
1184         users = self.vapi.nat44_user_dump()
1185         self.assertEqual(len(users), 2)
1186         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1187             non_static_user = users[1]
1188             static_user = users[0]
1189         else:
1190             non_static_user = users[0]
1191             static_user = users[1]
1192         self.assertEqual(static_user.nstaticsessions, 3)
1193         self.assertEqual(static_user.nsessions, 0)
1194         self.assertEqual(non_static_user.nstaticsessions, 0)
1195         self.assertEqual(non_static_user.nsessions, 3)
1196
1197         users = self.vapi.nat44_user_dump()
1198         self.assertEqual(len(users), 2)
1199         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1200             non_static_user = users[1]
1201             static_user = users[0]
1202         else:
1203             non_static_user = users[0]
1204             static_user = users[1]
1205         self.assertEqual(static_user.nstaticsessions, 3)
1206         self.assertEqual(static_user.nsessions, 0)
1207         self.assertEqual(non_static_user.nstaticsessions, 0)
1208         self.assertEqual(non_static_user.nsessions, 3)
1209
1210     def test_frag_out_of_order_do_not_translate(self):
1211         """NAT44ED don't translate fragments arriving out of order"""
1212         self.nat_add_inside_interface(self.pg0)
1213         self.nat_add_outside_interface(self.pg1)
1214         self.vapi.nat44_forwarding_enable_disable(enable=True)
1215         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1216
1217     def test_forwarding(self):
1218         """NAT44ED forwarding test"""
1219
1220         self.nat_add_inside_interface(self.pg0)
1221         self.nat_add_outside_interface(self.pg1)
1222         self.vapi.nat44_forwarding_enable_disable(enable=1)
1223
1224         real_ip = self.pg0.remote_ip4
1225         alias_ip = self.nat_addr
1226         flags = self.config_flags.NAT_IS_ADDR_ONLY
1227         self.vapi.nat44_add_del_static_mapping(
1228             is_add=1,
1229             local_ip_address=real_ip,
1230             external_ip_address=alias_ip,
1231             external_sw_if_index=0xFFFFFFFF,
1232             flags=flags,
1233         )
1234
1235         try:
1236             # in2out - static mapping match
1237
1238             pkts = self.create_stream_out(self.pg1)
1239             self.pg1.add_stream(pkts)
1240             self.pg_enable_capture(self.pg_interfaces)
1241             self.pg_start()
1242             capture = self.pg0.get_capture(len(pkts))
1243             self.verify_capture_in(capture, self.pg0)
1244
1245             pkts = self.create_stream_in(self.pg0, self.pg1)
1246             self.pg0.add_stream(pkts)
1247             self.pg_enable_capture(self.pg_interfaces)
1248             self.pg_start()
1249             capture = self.pg1.get_capture(len(pkts))
1250             self.verify_capture_out(capture, same_port=True)
1251
1252             # in2out - no static mapping match
1253
1254             host0 = self.pg0.remote_hosts[0]
1255             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1256             try:
1257                 pkts = self.create_stream_out(
1258                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1259                 )
1260                 self.pg1.add_stream(pkts)
1261                 self.pg_enable_capture(self.pg_interfaces)
1262                 self.pg_start()
1263                 capture = self.pg0.get_capture(len(pkts))
1264                 self.verify_capture_in(capture, self.pg0)
1265
1266                 pkts = self.create_stream_in(self.pg0, self.pg1)
1267                 self.pg0.add_stream(pkts)
1268                 self.pg_enable_capture(self.pg_interfaces)
1269                 self.pg_start()
1270                 capture = self.pg1.get_capture(len(pkts))
1271                 self.verify_capture_out(
1272                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1273                 )
1274             finally:
1275                 self.pg0.remote_hosts[0] = host0
1276
1277             user = self.pg0.remote_hosts[1]
1278             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1279             self.assertEqual(len(sessions), 3)
1280             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1281             self.vapi.nat44_del_session(
1282                 address=sessions[0].inside_ip_address,
1283                 port=sessions[0].inside_port,
1284                 protocol=sessions[0].protocol,
1285                 flags=(
1286                     self.config_flags.NAT_IS_INSIDE
1287                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1288                 ),
1289                 ext_host_address=sessions[0].ext_host_address,
1290                 ext_host_port=sessions[0].ext_host_port,
1291             )
1292             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1293             self.assertEqual(len(sessions), 2)
1294
1295         finally:
1296             self.vapi.nat44_forwarding_enable_disable(enable=0)
1297             flags = self.config_flags.NAT_IS_ADDR_ONLY
1298             self.vapi.nat44_add_del_static_mapping(
1299                 is_add=0,
1300                 local_ip_address=real_ip,
1301                 external_ip_address=alias_ip,
1302                 external_sw_if_index=0xFFFFFFFF,
1303                 flags=flags,
1304             )
1305
1306     def test_output_feature_and_service2(self):
1307         """NAT44ED interface output feature and service host direct access"""
1308         self.vapi.nat44_forwarding_enable_disable(enable=1)
1309         self.nat_add_address(self.nat_addr)
1310
1311         self.vapi.nat44_ed_add_del_output_interface(
1312             sw_if_index=self.pg1.sw_if_index, is_add=1
1313         )
1314
1315         # session initiated from service host - translate
1316         pkts = self.create_stream_in(self.pg0, self.pg1)
1317         self.pg0.add_stream(pkts)
1318         self.pg_enable_capture(self.pg_interfaces)
1319         self.pg_start()
1320         capture = self.pg1.get_capture(len(pkts))
1321         self.verify_capture_out(capture, ignore_port=True)
1322
1323         pkts = self.create_stream_out(self.pg1)
1324         self.pg1.add_stream(pkts)
1325         self.pg_enable_capture(self.pg_interfaces)
1326         self.pg_start()
1327         capture = self.pg0.get_capture(len(pkts))
1328         self.verify_capture_in(capture, self.pg0)
1329
1330         # session initiated from remote host - do not translate
1331         tcp_port_in = self.tcp_port_in
1332         udp_port_in = self.udp_port_in
1333         icmp_id_in = self.icmp_id_in
1334
1335         self.tcp_port_in = 60303
1336         self.udp_port_in = 60304
1337         self.icmp_id_in = 60305
1338
1339         try:
1340             pkts = self.create_stream_out(
1341                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1342             )
1343             self.pg1.add_stream(pkts)
1344             self.pg_enable_capture(self.pg_interfaces)
1345             self.pg_start()
1346             capture = self.pg0.get_capture(len(pkts))
1347             self.verify_capture_in(capture, self.pg0)
1348
1349             pkts = self.create_stream_in(self.pg0, self.pg1)
1350             self.pg0.add_stream(pkts)
1351             self.pg_enable_capture(self.pg_interfaces)
1352             self.pg_start()
1353             capture = self.pg1.get_capture(len(pkts))
1354             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1355         finally:
1356             self.tcp_port_in = tcp_port_in
1357             self.udp_port_in = udp_port_in
1358             self.icmp_id_in = icmp_id_in
1359
1360     def test_twice_nat(self):
1361         """NAT44ED Twice NAT"""
1362         self.twice_nat_common()
1363
1364     def test_self_twice_nat_positive(self):
1365         """NAT44ED Self Twice NAT (positive test)"""
1366         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1367
1368     def test_self_twice_nat_lb_positive(self):
1369         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1370         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1371
1372     def test_twice_nat_lb(self):
1373         """NAT44ED Twice NAT local service load balancing"""
1374         self.twice_nat_common(lb=True)
1375
1376     def test_output_feature(self):
1377         """NAT44ED interface output feature (in2out postrouting)"""
1378         self.vapi.nat44_forwarding_enable_disable(enable=1)
1379         self.nat_add_address(self.nat_addr)
1380
1381         self.nat_add_outside_interface(self.pg0)
1382         self.vapi.nat44_ed_add_del_output_interface(
1383             sw_if_index=self.pg1.sw_if_index, is_add=1
1384         )
1385
1386         # in2out
1387         pkts = self.create_stream_in(self.pg0, self.pg1)
1388         self.pg0.add_stream(pkts)
1389         self.pg_enable_capture(self.pg_interfaces)
1390         self.pg_start()
1391         capture = self.pg1.get_capture(len(pkts))
1392         self.verify_capture_out(capture, ignore_port=True)
1393         self.logger.debug(self.vapi.cli("show trace"))
1394
1395         # out2in
1396         pkts = self.create_stream_out(self.pg1)
1397         self.pg1.add_stream(pkts)
1398         self.pg_enable_capture(self.pg_interfaces)
1399         self.pg_start()
1400         capture = self.pg0.get_capture(len(pkts))
1401         self.verify_capture_in(capture, self.pg0)
1402         self.logger.debug(self.vapi.cli("show trace"))
1403
1404         # in2out
1405         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1406         self.pg0.add_stream(pkts)
1407         self.pg_enable_capture(self.pg_interfaces)
1408         self.pg_start()
1409         capture = self.pg1.get_capture(len(pkts))
1410         self.verify_capture_out(capture, ignore_port=True)
1411         self.logger.debug(self.vapi.cli("show trace"))
1412
1413         # out2in
1414         pkts = self.create_stream_out(self.pg1, ttl=2)
1415         self.pg1.add_stream(pkts)
1416         self.pg_enable_capture(self.pg_interfaces)
1417         self.pg_start()
1418         capture = self.pg0.get_capture(len(pkts))
1419         self.verify_capture_in(capture, self.pg0)
1420         self.logger.debug(self.vapi.cli("show trace"))
1421
1422         # in2out
1423         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1424         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1425         for p in capture:
1426             self.assertIn(ICMP, p)
1427             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1428
1429     def test_static_with_port_out2(self):
1430         """NAT44ED 1:1 NAPT asymmetrical rule"""
1431
1432         external_port = 80
1433         local_port = 8080
1434
1435         self.vapi.nat44_forwarding_enable_disable(enable=1)
1436         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1437         self.nat_add_static_mapping(
1438             self.pg0.remote_ip4,
1439             self.nat_addr,
1440             local_port,
1441             external_port,
1442             proto=IP_PROTOS.tcp,
1443             flags=flags,
1444         )
1445
1446         self.nat_add_inside_interface(self.pg0)
1447         self.nat_add_outside_interface(self.pg1)
1448
1449         # from client to service
1450         p = (
1451             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1452             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1453             / TCP(sport=12345, dport=external_port)
1454         )
1455         self.pg1.add_stream(p)
1456         self.pg_enable_capture(self.pg_interfaces)
1457         self.pg_start()
1458         capture = self.pg0.get_capture(1)
1459         p = capture[0]
1460         try:
1461             ip = p[IP]
1462             tcp = p[TCP]
1463             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1464             self.assertEqual(tcp.dport, local_port)
1465             self.assert_packet_checksums_valid(p)
1466         except:
1467             self.logger.error(ppp("Unexpected or invalid packet:", p))
1468             raise
1469
1470         # ICMP error
1471         p = (
1472             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1473             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1474             / ICMP(type=11)
1475             / capture[0][IP]
1476         )
1477         self.pg0.add_stream(p)
1478         self.pg_enable_capture(self.pg_interfaces)
1479         self.pg_start()
1480         capture = self.pg1.get_capture(1)
1481         p = capture[0]
1482         try:
1483             self.assertEqual(p[IP].src, self.nat_addr)
1484             inner = p[IPerror]
1485             self.assertEqual(inner.dst, self.nat_addr)
1486             self.assertEqual(inner[TCPerror].dport, external_port)
1487         except:
1488             self.logger.error(ppp("Unexpected or invalid packet:", p))
1489             raise
1490
1491         # from service back to client
1492         p = (
1493             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1494             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1495             / TCP(sport=local_port, dport=12345)
1496         )
1497         self.pg0.add_stream(p)
1498         self.pg_enable_capture(self.pg_interfaces)
1499         self.pg_start()
1500         capture = self.pg1.get_capture(1)
1501         p = capture[0]
1502         try:
1503             ip = p[IP]
1504             tcp = p[TCP]
1505             self.assertEqual(ip.src, self.nat_addr)
1506             self.assertEqual(tcp.sport, external_port)
1507             self.assert_packet_checksums_valid(p)
1508         except:
1509             self.logger.error(ppp("Unexpected or invalid packet:", p))
1510             raise
1511
1512         # ICMP error
1513         p = (
1514             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1515             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1516             / ICMP(type=11)
1517             / capture[0][IP]
1518         )
1519         self.pg1.add_stream(p)
1520         self.pg_enable_capture(self.pg_interfaces)
1521         self.pg_start()
1522         capture = self.pg0.get_capture(1)
1523         p = capture[0]
1524         try:
1525             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1526             inner = p[IPerror]
1527             self.assertEqual(inner.src, self.pg0.remote_ip4)
1528             self.assertEqual(inner[TCPerror].sport, local_port)
1529         except:
1530             self.logger.error(ppp("Unexpected or invalid packet:", p))
1531             raise
1532
1533         # from client to server (no translation)
1534         p = (
1535             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1536             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1537             / TCP(sport=12346, dport=local_port)
1538         )
1539         self.pg1.add_stream(p)
1540         self.pg_enable_capture(self.pg_interfaces)
1541         self.pg_start()
1542         capture = self.pg0.get_capture(1)
1543         p = capture[0]
1544         try:
1545             ip = p[IP]
1546             tcp = p[TCP]
1547             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1548             self.assertEqual(tcp.dport, local_port)
1549             self.assert_packet_checksums_valid(p)
1550         except:
1551             self.logger.error(ppp("Unexpected or invalid packet:", p))
1552             raise
1553
1554         # from service back to client (no translation)
1555         p = (
1556             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1557             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1558             / TCP(sport=local_port, dport=12346)
1559         )
1560         self.pg0.add_stream(p)
1561         self.pg_enable_capture(self.pg_interfaces)
1562         self.pg_start()
1563         capture = self.pg1.get_capture(1)
1564         p = capture[0]
1565         try:
1566             ip = p[IP]
1567             tcp = p[TCP]
1568             self.assertEqual(ip.src, self.pg0.remote_ip4)
1569             self.assertEqual(tcp.sport, local_port)
1570             self.assert_packet_checksums_valid(p)
1571         except:
1572             self.logger.error(ppp("Unexpected or invalid packet:", p))
1573             raise
1574
1575     def test_static_lb(self):
1576         """NAT44ED local service load balancing"""
1577         external_addr_n = self.nat_addr
1578         external_port = 80
1579         local_port = 8080
1580         server1 = self.pg0.remote_hosts[0]
1581         server2 = self.pg0.remote_hosts[1]
1582
1583         locals = [
1584             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1585             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1586         ]
1587
1588         self.nat_add_address(self.nat_addr)
1589         self.vapi.nat44_add_del_lb_static_mapping(
1590             is_add=1,
1591             external_addr=external_addr_n,
1592             external_port=external_port,
1593             protocol=IP_PROTOS.tcp,
1594             local_num=len(locals),
1595             locals=locals,
1596         )
1597         flags = self.config_flags.NAT_IS_INSIDE
1598         self.vapi.nat44_interface_add_del_feature(
1599             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1600         )
1601         self.vapi.nat44_interface_add_del_feature(
1602             sw_if_index=self.pg1.sw_if_index, is_add=1
1603         )
1604
1605         # from client to service
1606         p = (
1607             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1608             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1609             / TCP(sport=12345, dport=external_port)
1610         )
1611         self.pg1.add_stream(p)
1612         self.pg_enable_capture(self.pg_interfaces)
1613         self.pg_start()
1614         capture = self.pg0.get_capture(1)
1615         p = capture[0]
1616         server = None
1617         try:
1618             ip = p[IP]
1619             tcp = p[TCP]
1620             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1621             if ip.dst == server1.ip4:
1622                 server = server1
1623             else:
1624                 server = server2
1625             self.assertEqual(tcp.dport, local_port)
1626             self.assert_packet_checksums_valid(p)
1627         except:
1628             self.logger.error(ppp("Unexpected or invalid packet:", p))
1629             raise
1630
1631         # from service back to client
1632         p = (
1633             Ether(src=server.mac, dst=self.pg0.local_mac)
1634             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1635             / TCP(sport=local_port, dport=12345)
1636         )
1637         self.pg0.add_stream(p)
1638         self.pg_enable_capture(self.pg_interfaces)
1639         self.pg_start()
1640         capture = self.pg1.get_capture(1)
1641         p = capture[0]
1642         try:
1643             ip = p[IP]
1644             tcp = p[TCP]
1645             self.assertEqual(ip.src, self.nat_addr)
1646             self.assertEqual(tcp.sport, external_port)
1647             self.assert_packet_checksums_valid(p)
1648         except:
1649             self.logger.error(ppp("Unexpected or invalid packet:", p))
1650             raise
1651
1652         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1653         self.assertEqual(len(sessions), 1)
1654         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1655         self.vapi.nat44_del_session(
1656             address=sessions[0].inside_ip_address,
1657             port=sessions[0].inside_port,
1658             protocol=sessions[0].protocol,
1659             flags=(
1660                 self.config_flags.NAT_IS_INSIDE
1661                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1662             ),
1663             ext_host_address=sessions[0].ext_host_address,
1664             ext_host_port=sessions[0].ext_host_port,
1665         )
1666         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1667         self.assertEqual(len(sessions), 0)
1668
1669     def test_static_lb_2(self):
1670         """NAT44ED local service load balancing (asymmetrical rule)"""
1671         external_addr = self.nat_addr
1672         external_port = 80
1673         local_port = 8080
1674         server1 = self.pg0.remote_hosts[0]
1675         server2 = self.pg0.remote_hosts[1]
1676
1677         locals = [
1678             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1679             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1680         ]
1681
1682         self.vapi.nat44_forwarding_enable_disable(enable=1)
1683         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1684         self.vapi.nat44_add_del_lb_static_mapping(
1685             is_add=1,
1686             flags=flags,
1687             external_addr=external_addr,
1688             external_port=external_port,
1689             protocol=IP_PROTOS.tcp,
1690             local_num=len(locals),
1691             locals=locals,
1692         )
1693         flags = self.config_flags.NAT_IS_INSIDE
1694         self.vapi.nat44_interface_add_del_feature(
1695             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1696         )
1697         self.vapi.nat44_interface_add_del_feature(
1698             sw_if_index=self.pg1.sw_if_index, is_add=1
1699         )
1700
1701         # from client to service
1702         p = (
1703             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1704             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1705             / TCP(sport=12345, dport=external_port)
1706         )
1707         self.pg1.add_stream(p)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg0.get_capture(1)
1711         p = capture[0]
1712         server = None
1713         try:
1714             ip = p[IP]
1715             tcp = p[TCP]
1716             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1717             if ip.dst == server1.ip4:
1718                 server = server1
1719             else:
1720                 server = server2
1721             self.assertEqual(tcp.dport, local_port)
1722             self.assert_packet_checksums_valid(p)
1723         except:
1724             self.logger.error(ppp("Unexpected or invalid packet:", p))
1725             raise
1726
1727         # from service back to client
1728         p = (
1729             Ether(src=server.mac, dst=self.pg0.local_mac)
1730             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1731             / TCP(sport=local_port, dport=12345)
1732         )
1733         self.pg0.add_stream(p)
1734         self.pg_enable_capture(self.pg_interfaces)
1735         self.pg_start()
1736         capture = self.pg1.get_capture(1)
1737         p = capture[0]
1738         try:
1739             ip = p[IP]
1740             tcp = p[TCP]
1741             self.assertEqual(ip.src, self.nat_addr)
1742             self.assertEqual(tcp.sport, external_port)
1743             self.assert_packet_checksums_valid(p)
1744         except:
1745             self.logger.error(ppp("Unexpected or invalid packet:", p))
1746             raise
1747
1748         # from client to server (no translation)
1749         p = (
1750             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1751             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1752             / TCP(sport=12346, dport=local_port)
1753         )
1754         self.pg1.add_stream(p)
1755         self.pg_enable_capture(self.pg_interfaces)
1756         self.pg_start()
1757         capture = self.pg0.get_capture(1)
1758         p = capture[0]
1759         server = None
1760         try:
1761             ip = p[IP]
1762             tcp = p[TCP]
1763             self.assertEqual(ip.dst, server1.ip4)
1764             self.assertEqual(tcp.dport, local_port)
1765             self.assert_packet_checksums_valid(p)
1766         except:
1767             self.logger.error(ppp("Unexpected or invalid packet:", p))
1768             raise
1769
1770         # from service back to client (no translation)
1771         p = (
1772             Ether(src=server1.mac, dst=self.pg0.local_mac)
1773             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1774             / TCP(sport=local_port, dport=12346)
1775         )
1776         self.pg0.add_stream(p)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         capture = self.pg1.get_capture(1)
1780         p = capture[0]
1781         try:
1782             ip = p[IP]
1783             tcp = p[TCP]
1784             self.assertEqual(ip.src, server1.ip4)
1785             self.assertEqual(tcp.sport, local_port)
1786             self.assert_packet_checksums_valid(p)
1787         except:
1788             self.logger.error(ppp("Unexpected or invalid packet:", p))
1789             raise
1790
1791     def test_lb_affinity(self):
1792         """NAT44ED local service load balancing affinity"""
1793         external_addr = self.nat_addr
1794         external_port = 80
1795         local_port = 8080
1796         server1 = self.pg0.remote_hosts[0]
1797         server2 = self.pg0.remote_hosts[1]
1798
1799         locals = [
1800             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1801             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1802         ]
1803
1804         self.nat_add_address(self.nat_addr)
1805         self.vapi.nat44_add_del_lb_static_mapping(
1806             is_add=1,
1807             external_addr=external_addr,
1808             external_port=external_port,
1809             protocol=IP_PROTOS.tcp,
1810             affinity=10800,
1811             local_num=len(locals),
1812             locals=locals,
1813         )
1814         flags = self.config_flags.NAT_IS_INSIDE
1815         self.vapi.nat44_interface_add_del_feature(
1816             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1817         )
1818         self.vapi.nat44_interface_add_del_feature(
1819             sw_if_index=self.pg1.sw_if_index, is_add=1
1820         )
1821
1822         p = (
1823             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1824             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1825             / TCP(sport=1025, dport=external_port)
1826         )
1827         self.pg1.add_stream(p)
1828         self.pg_enable_capture(self.pg_interfaces)
1829         self.pg_start()
1830         capture = self.pg0.get_capture(1)
1831         backend = capture[0][IP].dst
1832
1833         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1834         self.assertEqual(len(sessions), 1)
1835         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1836         self.vapi.nat44_del_session(
1837             address=sessions[0].inside_ip_address,
1838             port=sessions[0].inside_port,
1839             protocol=sessions[0].protocol,
1840             flags=(
1841                 self.config_flags.NAT_IS_INSIDE
1842                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1843             ),
1844             ext_host_address=sessions[0].ext_host_address,
1845             ext_host_port=sessions[0].ext_host_port,
1846         )
1847
1848         pkts = []
1849         for port in range(1030, 1100):
1850             p = (
1851                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1852                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1853                 / TCP(sport=port, dport=external_port)
1854             )
1855             pkts.append(p)
1856         self.pg1.add_stream(pkts)
1857         self.pg_enable_capture(self.pg_interfaces)
1858         self.pg_start()
1859         capture = self.pg0.get_capture(len(pkts))
1860         for p in capture:
1861             self.assertEqual(p[IP].dst, backend)
1862
1863     def test_multiple_vrf_1(self):
1864         """Multiple VRF - both client & service in VRF1"""
1865
1866         external_addr = "1.2.3.4"
1867         external_port = 80
1868         local_port = 8080
1869         port = 0
1870
1871         flags = self.config_flags.NAT_IS_INSIDE
1872         self.vapi.nat44_interface_add_del_feature(
1873             sw_if_index=self.pg5.sw_if_index, is_add=1
1874         )
1875         self.vapi.nat44_interface_add_del_feature(
1876             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1877         )
1878         self.vapi.nat44_interface_add_del_feature(
1879             sw_if_index=self.pg6.sw_if_index, is_add=1
1880         )
1881         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1882         self.nat_add_static_mapping(
1883             self.pg5.remote_ip4,
1884             external_addr,
1885             local_port,
1886             external_port,
1887             vrf_id=1,
1888             proto=IP_PROTOS.tcp,
1889             flags=flags,
1890         )
1891
1892         p = (
1893             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1894             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1895             / TCP(sport=12345, dport=external_port)
1896         )
1897         self.pg6.add_stream(p)
1898         self.pg_enable_capture(self.pg_interfaces)
1899         self.pg_start()
1900         capture = self.pg5.get_capture(1)
1901         p = capture[0]
1902         try:
1903             ip = p[IP]
1904             tcp = p[TCP]
1905             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1906             self.assertEqual(tcp.dport, local_port)
1907             self.assert_packet_checksums_valid(p)
1908         except:
1909             self.logger.error(ppp("Unexpected or invalid packet:", p))
1910             raise
1911
1912         p = (
1913             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1914             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1915             / TCP(sport=local_port, dport=12345)
1916         )
1917         self.pg5.add_stream(p)
1918         self.pg_enable_capture(self.pg_interfaces)
1919         self.pg_start()
1920         capture = self.pg6.get_capture(1)
1921         p = capture[0]
1922         try:
1923             ip = p[IP]
1924             tcp = p[TCP]
1925             self.assertEqual(ip.src, external_addr)
1926             self.assertEqual(tcp.sport, external_port)
1927             self.assert_packet_checksums_valid(p)
1928         except:
1929             self.logger.error(ppp("Unexpected or invalid packet:", p))
1930             raise
1931
1932     def test_multiple_vrf_2(self):
1933         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1934
1935         external_addr = "1.2.3.4"
1936         external_port = 80
1937         local_port = 8080
1938         port = 0
1939
1940         self.nat_add_address(self.nat_addr)
1941         flags = self.config_flags.NAT_IS_INSIDE
1942         self.vapi.nat44_ed_add_del_output_interface(
1943             sw_if_index=self.pg1.sw_if_index, is_add=1
1944         )
1945         self.vapi.nat44_interface_add_del_feature(
1946             sw_if_index=self.pg5.sw_if_index, is_add=1
1947         )
1948         self.vapi.nat44_interface_add_del_feature(
1949             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1950         )
1951         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1952         self.nat_add_static_mapping(
1953             self.pg5.remote_ip4,
1954             external_addr,
1955             local_port,
1956             external_port,
1957             vrf_id=1,
1958             proto=IP_PROTOS.tcp,
1959             flags=flags,
1960         )
1961
1962         p = (
1963             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1964             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1965             / TCP(sport=2345, dport=22)
1966         )
1967         self.pg5.add_stream(p)
1968         self.pg_enable_capture(self.pg_interfaces)
1969         self.pg_start()
1970         capture = self.pg1.get_capture(1)
1971         p = capture[0]
1972         try:
1973             ip = p[IP]
1974             tcp = p[TCP]
1975             self.assertEqual(ip.src, self.nat_addr)
1976             self.assert_packet_checksums_valid(p)
1977             port = tcp.sport
1978         except:
1979             self.logger.error(ppp("Unexpected or invalid packet:", p))
1980             raise
1981
1982         p = (
1983             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1984             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1985             / TCP(sport=22, dport=port)
1986         )
1987         self.pg1.add_stream(p)
1988         self.pg_enable_capture(self.pg_interfaces)
1989         self.pg_start()
1990         capture = self.pg5.get_capture(1)
1991         p = capture[0]
1992         try:
1993             ip = p[IP]
1994             tcp = p[TCP]
1995             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1996             self.assertEqual(tcp.dport, 2345)
1997             self.assert_packet_checksums_valid(p)
1998         except:
1999             self.logger.error(ppp("Unexpected or invalid packet:", p))
2000             raise
2001
2002     def test_multiple_vrf_3(self):
2003         """Multiple VRF - client in VRF1, service in VRF0"""
2004
2005         external_addr = "1.2.3.4"
2006         external_port = 80
2007         local_port = 8080
2008         port = 0
2009
2010         flags = self.config_flags.NAT_IS_INSIDE
2011         self.vapi.nat44_interface_add_del_feature(
2012             sw_if_index=self.pg0.sw_if_index, is_add=1
2013         )
2014         self.vapi.nat44_interface_add_del_feature(
2015             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2016         )
2017         self.vapi.nat44_interface_add_del_feature(
2018             sw_if_index=self.pg6.sw_if_index, is_add=1
2019         )
2020         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2021         self.nat_add_static_mapping(
2022             self.pg0.remote_ip4,
2023             external_sw_if_index=self.pg0.sw_if_index,
2024             local_port=local_port,
2025             vrf_id=0,
2026             external_port=external_port,
2027             proto=IP_PROTOS.tcp,
2028             flags=flags,
2029         )
2030
2031         # from client VRF1 to service VRF0
2032         p = (
2033             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2034             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2035             / TCP(sport=12346, dport=external_port)
2036         )
2037         self.pg6.add_stream(p)
2038         self.pg_enable_capture(self.pg_interfaces)
2039         self.pg_start()
2040         capture = self.pg0.get_capture(1)
2041         p = capture[0]
2042         try:
2043             ip = p[IP]
2044             tcp = p[TCP]
2045             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2046             self.assertEqual(tcp.dport, local_port)
2047             self.assert_packet_checksums_valid(p)
2048         except:
2049             self.logger.error(ppp("Unexpected or invalid packet:", p))
2050             raise
2051
2052         # from service VRF0 back to client VRF1
2053         p = (
2054             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2055             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2056             / TCP(sport=local_port, dport=12346)
2057         )
2058         self.pg0.add_stream(p)
2059         self.pg_enable_capture(self.pg_interfaces)
2060         self.pg_start()
2061         capture = self.pg6.get_capture(1)
2062         p = capture[0]
2063         try:
2064             ip = p[IP]
2065             tcp = p[TCP]
2066             self.assertEqual(ip.src, self.pg0.local_ip4)
2067             self.assertEqual(tcp.sport, external_port)
2068             self.assert_packet_checksums_valid(p)
2069         except:
2070             self.logger.error(ppp("Unexpected or invalid packet:", p))
2071             raise
2072
2073     def test_multiple_vrf_4(self):
2074         """Multiple VRF - client in VRF0, service in VRF1"""
2075
2076         external_addr = "1.2.3.4"
2077         external_port = 80
2078         local_port = 8080
2079         port = 0
2080
2081         flags = self.config_flags.NAT_IS_INSIDE
2082         self.vapi.nat44_interface_add_del_feature(
2083             sw_if_index=self.pg0.sw_if_index, is_add=1
2084         )
2085         self.vapi.nat44_interface_add_del_feature(
2086             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2087         )
2088         self.vapi.nat44_interface_add_del_feature(
2089             sw_if_index=self.pg5.sw_if_index, is_add=1
2090         )
2091         self.vapi.nat44_interface_add_del_feature(
2092             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2093         )
2094         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2095         self.nat_add_static_mapping(
2096             self.pg5.remote_ip4,
2097             external_addr,
2098             local_port,
2099             external_port,
2100             vrf_id=1,
2101             proto=IP_PROTOS.tcp,
2102             flags=flags,
2103         )
2104
2105         # from client VRF0 to service VRF1
2106         p = (
2107             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2108             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2109             / TCP(sport=12347, dport=external_port)
2110         )
2111         self.pg0.add_stream(p)
2112         self.pg_enable_capture(self.pg_interfaces)
2113         self.pg_start()
2114         capture = self.pg5.get_capture(1)
2115         p = capture[0]
2116         try:
2117             ip = p[IP]
2118             tcp = p[TCP]
2119             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2120             self.assertEqual(tcp.dport, local_port)
2121             self.assert_packet_checksums_valid(p)
2122         except:
2123             self.logger.error(ppp("Unexpected or invalid packet:", p))
2124             raise
2125
2126         # from service VRF1 back to client VRF0
2127         p = (
2128             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2129             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2130             / TCP(sport=local_port, dport=12347)
2131         )
2132         self.pg5.add_stream(p)
2133         self.pg_enable_capture(self.pg_interfaces)
2134         self.pg_start()
2135         capture = self.pg0.get_capture(1)
2136         p = capture[0]
2137         try:
2138             ip = p[IP]
2139             tcp = p[TCP]
2140             self.assertEqual(ip.src, external_addr)
2141             self.assertEqual(tcp.sport, external_port)
2142             self.assert_packet_checksums_valid(p)
2143         except:
2144             self.logger.error(ppp("Unexpected or invalid packet:", p))
2145             raise
2146
2147     def test_multiple_vrf_5(self):
2148         """Multiple VRF - forwarding - no translation"""
2149
2150         external_addr = "1.2.3.4"
2151         external_port = 80
2152         local_port = 8080
2153         port = 0
2154
2155         self.vapi.nat44_forwarding_enable_disable(enable=1)
2156         flags = self.config_flags.NAT_IS_INSIDE
2157         self.vapi.nat44_interface_add_del_feature(
2158             sw_if_index=self.pg0.sw_if_index, is_add=1
2159         )
2160         self.vapi.nat44_interface_add_del_feature(
2161             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2162         )
2163         self.vapi.nat44_interface_add_del_feature(
2164             sw_if_index=self.pg5.sw_if_index, is_add=1
2165         )
2166         self.vapi.nat44_interface_add_del_feature(
2167             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2168         )
2169         self.vapi.nat44_interface_add_del_feature(
2170             sw_if_index=self.pg6.sw_if_index, is_add=1
2171         )
2172         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2173         self.nat_add_static_mapping(
2174             self.pg5.remote_ip4,
2175             external_addr,
2176             local_port,
2177             external_port,
2178             vrf_id=1,
2179             proto=IP_PROTOS.tcp,
2180             flags=flags,
2181         )
2182         self.nat_add_static_mapping(
2183             self.pg0.remote_ip4,
2184             external_sw_if_index=self.pg0.sw_if_index,
2185             local_port=local_port,
2186             vrf_id=0,
2187             external_port=external_port,
2188             proto=IP_PROTOS.tcp,
2189             flags=flags,
2190         )
2191
2192         # from client to server (both VRF1, no translation)
2193         p = (
2194             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2195             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2196             / TCP(sport=12348, dport=local_port)
2197         )
2198         self.pg6.add_stream(p)
2199         self.pg_enable_capture(self.pg_interfaces)
2200         self.pg_start()
2201         capture = self.pg5.get_capture(1)
2202         p = capture[0]
2203         try:
2204             ip = p[IP]
2205             tcp = p[TCP]
2206             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2207             self.assertEqual(tcp.dport, local_port)
2208             self.assert_packet_checksums_valid(p)
2209         except:
2210             self.logger.error(ppp("Unexpected or invalid packet:", p))
2211             raise
2212
2213         # from server back to client (both VRF1, no translation)
2214         p = (
2215             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2216             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2217             / TCP(sport=local_port, dport=12348)
2218         )
2219         self.pg5.add_stream(p)
2220         self.pg_enable_capture(self.pg_interfaces)
2221         self.pg_start()
2222         capture = self.pg6.get_capture(1)
2223         p = capture[0]
2224         try:
2225             ip = p[IP]
2226             tcp = p[TCP]
2227             self.assertEqual(ip.src, self.pg5.remote_ip4)
2228             self.assertEqual(tcp.sport, local_port)
2229             self.assert_packet_checksums_valid(p)
2230         except:
2231             self.logger.error(ppp("Unexpected or invalid packet:", p))
2232             raise
2233
2234         # from client VRF1 to server VRF0 (no translation)
2235         p = (
2236             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2237             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2238             / TCP(sport=local_port, dport=12349)
2239         )
2240         self.pg0.add_stream(p)
2241         self.pg_enable_capture(self.pg_interfaces)
2242         self.pg_start()
2243         capture = self.pg6.get_capture(1)
2244         p = capture[0]
2245         try:
2246             ip = p[IP]
2247             tcp = p[TCP]
2248             self.assertEqual(ip.src, self.pg0.remote_ip4)
2249             self.assertEqual(tcp.sport, local_port)
2250             self.assert_packet_checksums_valid(p)
2251         except:
2252             self.logger.error(ppp("Unexpected or invalid packet:", p))
2253             raise
2254
2255         # from server VRF0 back to client VRF1 (no translation)
2256         p = (
2257             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2258             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2259             / TCP(sport=local_port, dport=12349)
2260         )
2261         self.pg0.add_stream(p)
2262         self.pg_enable_capture(self.pg_interfaces)
2263         self.pg_start()
2264         capture = self.pg6.get_capture(1)
2265         p = capture[0]
2266         try:
2267             ip = p[IP]
2268             tcp = p[TCP]
2269             self.assertEqual(ip.src, self.pg0.remote_ip4)
2270             self.assertEqual(tcp.sport, local_port)
2271             self.assert_packet_checksums_valid(p)
2272         except:
2273             self.logger.error(ppp("Unexpected or invalid packet:", p))
2274             raise
2275
2276         # from client VRF0 to server VRF1 (no translation)
2277         p = (
2278             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2279             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2280             / TCP(sport=12344, dport=local_port)
2281         )
2282         self.pg0.add_stream(p)
2283         self.pg_enable_capture(self.pg_interfaces)
2284         self.pg_start()
2285         capture = self.pg5.get_capture(1)
2286         p = capture[0]
2287         try:
2288             ip = p[IP]
2289             tcp = p[TCP]
2290             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2291             self.assertEqual(tcp.dport, local_port)
2292             self.assert_packet_checksums_valid(p)
2293         except:
2294             self.logger.error(ppp("Unexpected or invalid packet:", p))
2295             raise
2296
2297         # from server VRF1 back to client VRF0 (no translation)
2298         p = (
2299             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2300             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2301             / TCP(sport=local_port, dport=12344)
2302         )
2303         self.pg5.add_stream(p)
2304         self.pg_enable_capture(self.pg_interfaces)
2305         self.pg_start()
2306         capture = self.pg0.get_capture(1)
2307         p = capture[0]
2308         try:
2309             ip = p[IP]
2310             tcp = p[TCP]
2311             self.assertEqual(ip.src, self.pg5.remote_ip4)
2312             self.assertEqual(tcp.sport, local_port)
2313             self.assert_packet_checksums_valid(p)
2314         except:
2315             self.logger.error(ppp("Unexpected or invalid packet:", p))
2316             raise
2317
2318     def test_outside_address_distribution(self):
2319         """Outside address distribution based on source address"""
2320
2321         x = 100
2322         nat_addresses = []
2323
2324         for i in range(1, x):
2325             a = "10.0.0.%d" % i
2326             nat_addresses.append(a)
2327
2328         self.nat_add_inside_interface(self.pg0)
2329         self.nat_add_outside_interface(self.pg1)
2330
2331         self.vapi.nat44_add_del_address_range(
2332             first_ip_address=nat_addresses[0],
2333             last_ip_address=nat_addresses[-1],
2334             vrf_id=0xFFFFFFFF,
2335             is_add=1,
2336             flags=0,
2337         )
2338
2339         self.pg0.generate_remote_hosts(x)
2340
2341         pkts = []
2342         for i in range(x):
2343             info = self.create_packet_info(self.pg0, self.pg1)
2344             payload = self.info_to_payload(info)
2345             p = (
2346                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2347                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2348                 / UDP(sport=7000 + i, dport=8000 + i)
2349                 / Raw(payload)
2350             )
2351             info.data = p
2352             pkts.append(p)
2353
2354         self.pg0.add_stream(pkts)
2355         self.pg_enable_capture(self.pg_interfaces)
2356         self.pg_start()
2357         recvd = self.pg1.get_capture(len(pkts))
2358         for p_recvd in recvd:
2359             payload_info = self.payload_to_info(p_recvd[Raw])
2360             packet_index = payload_info.index
2361             info = self._packet_infos[packet_index]
2362             self.assertTrue(info is not None)
2363             self.assertEqual(packet_index, info.index)
2364             p_sent = info.data
2365             packed = socket.inet_aton(p_sent[IP].src)
2366             numeric = struct.unpack("!L", packed)[0]
2367             numeric = socket.htonl(numeric)
2368             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2369             self.assertEqual(
2370                 a,
2371                 p_recvd[IP].src,
2372                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2373                 % (p_sent[IP].src, p_recvd[IP].src, a),
2374             )
2375
2376     def test_dynamic_edge_ports(self):
2377         """NAT44ED dynamic translation test: edge ports"""
2378
2379         worker_count = self.vpp_worker_count or 1
2380         port_offset = 1024
2381         port_per_thread = (65536 - port_offset) // worker_count
2382         port_count = port_per_thread * worker_count
2383
2384         # worker thread edge ports
2385         thread_edge_ports = {0, port_offset - 1, 65535}
2386         for i in range(0, worker_count):
2387             port_thread_offset = (port_per_thread * i) + port_offset
2388             for port_range_offset in [0, port_per_thread - 1]:
2389                 port = port_thread_offset + port_range_offset
2390                 thread_edge_ports.add(port)
2391         thread_drop_ports = set(
2392             filter(
2393                 lambda x: x not in range(port_offset, port_offset + port_count),
2394                 thread_edge_ports,
2395             )
2396         )
2397
2398         in_if = self.pg7
2399         out_if = self.pg8
2400
2401         self.nat_add_address(self.nat_addr)
2402
2403         try:
2404             self.configure_ip4_interface(in_if, hosts=worker_count)
2405             self.configure_ip4_interface(out_if)
2406
2407             self.nat_add_inside_interface(in_if)
2408             self.nat_add_outside_interface(out_if)
2409
2410             # in2out
2411             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2412             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2413             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2414             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2415
2416             pkt_count = worker_count * len(thread_edge_ports)
2417
2418             i2o_pkts = [[] for x in range(0, worker_count)]
2419             for i in range(0, worker_count):
2420                 remote_host = in_if.remote_hosts[i]
2421                 for port in thread_edge_ports:
2422                     p = (
2423                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2424                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2425                         / TCP(sport=port, dport=port)
2426                     )
2427                     i2o_pkts[i].append(p)
2428
2429                     p = (
2430                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2431                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2432                         / UDP(sport=port, dport=port)
2433                     )
2434                     i2o_pkts[i].append(p)
2435
2436                     p = (
2437                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2438                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2439                         / ICMP(id=port, seq=port, type="echo-request")
2440                     )
2441                     i2o_pkts[i].append(p)
2442
2443             for i in range(0, worker_count):
2444                 if len(i2o_pkts[i]) > 0:
2445                     in_if.add_stream(i2o_pkts[i], worker=i)
2446
2447             self.pg_enable_capture(self.pg_interfaces)
2448             self.pg_start()
2449             capture = out_if.get_capture(pkt_count * 3)
2450             for packet in capture:
2451                 self.assert_packet_checksums_valid(packet)
2452                 if packet.haslayer(TCP):
2453                     self.assert_in_range(
2454                         packet[TCP].sport,
2455                         port_offset,
2456                         port_offset + port_count,
2457                         "src TCP port",
2458                     )
2459                 elif packet.haslayer(UDP):
2460                     self.assert_in_range(
2461                         packet[UDP].sport,
2462                         port_offset,
2463                         port_offset + port_count,
2464                         "src UDP port",
2465                     )
2466                 elif packet.haslayer(ICMP):
2467                     self.assert_in_range(
2468                         packet[ICMP].id,
2469                         port_offset,
2470                         port_offset + port_count,
2471                         "ICMP id",
2472                     )
2473                 else:
2474                     self.fail(
2475                         ppp("Unexpected or invalid packet (outside network):", packet)
2476                     )
2477
2478             if_idx = in_if.sw_if_index
2479             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2480             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2481             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2482             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2483
2484             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2485             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2486             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2487             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2488
2489             # out2in
2490             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2491             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2492             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2493             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2494             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2495
2496             # replies to unchanged thread ports should pass on each worker,
2497             # excluding packets outside dynamic port range
2498             drop_count = worker_count * len(thread_drop_ports)
2499             pass_count = worker_count * len(thread_edge_ports) - drop_count
2500
2501             o2i_pkts = [[] for x in range(0, worker_count)]
2502             for i in range(0, worker_count):
2503                 for port in thread_edge_ports:
2504                     p = (
2505                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2506                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2507                         / TCP(sport=port, dport=port)
2508                     )
2509                     o2i_pkts[i].append(p)
2510
2511                     p = (
2512                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2513                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2514                         / UDP(sport=port, dport=port)
2515                     )
2516                     o2i_pkts[i].append(p)
2517
2518                     p = (
2519                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2520                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2521                         / ICMP(id=port, seq=port, type="echo-reply")
2522                     )
2523                     o2i_pkts[i].append(p)
2524
2525             for i in range(0, worker_count):
2526                 if len(o2i_pkts[i]) > 0:
2527                     out_if.add_stream(o2i_pkts[i], worker=i)
2528
2529             self.pg_enable_capture(self.pg_interfaces)
2530             self.pg_start()
2531             capture = in_if.get_capture(pass_count * 3)
2532             for packet in capture:
2533                 self.assert_packet_checksums_valid(packet)
2534                 if packet.haslayer(TCP):
2535                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2536                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2537                 elif packet.haslayer(UDP):
2538                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2539                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2540                 elif packet.haslayer(ICMP):
2541                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2542                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2543                 else:
2544                     self.fail(
2545                         ppp("Unexpected or invalid packet (inside network):", packet)
2546                     )
2547
2548             if_idx = out_if.sw_if_index
2549             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2550             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2551             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2552             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2553             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2554
2555             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2556             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2557             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2558             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2559             self.assertEqual(
2560                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2561             )
2562
2563         finally:
2564             in_if.unconfig()
2565             out_if.unconfig()
2566
2567     def test_delete_interface(self):
2568         """NAT44ED delete nat interface"""
2569
2570         self.nat_add_address(self.nat_addr)
2571
2572         interfaces = self.create_loopback_interfaces(4)
2573         self.nat_add_outside_interface(interfaces[0])
2574         self.nat_add_inside_interface(interfaces[1])
2575         self.nat_add_outside_interface(interfaces[2])
2576         self.nat_add_inside_interface(interfaces[2])
2577         self.vapi.nat44_ed_add_del_output_interface(
2578             sw_if_index=interfaces[3].sw_if_index, is_add=1
2579         )
2580
2581         nat_sw_if_indices = [
2582             i.sw_if_index
2583             for i in self.vapi.nat44_interface_dump()
2584             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
2585         ]
2586         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
2587
2588         loopbacks = []
2589         for i in interfaces:
2590             # delete nat-enabled interface
2591             self.assertIn(i.sw_if_index, nat_sw_if_indices)
2592             i.remove_vpp_config()
2593
2594             # create interface with the same index
2595             lo = VppLoInterface(self)
2596             loopbacks.append(lo)
2597             self.assertEqual(lo.sw_if_index, i.sw_if_index)
2598
2599             # check interface is not nat-enabled
2600             nat_sw_if_indices = [
2601                 i.sw_if_index
2602                 for i in self.vapi.nat44_interface_dump()
2603                 + list(
2604                     self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
2605                 )
2606             ]
2607             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
2608
2609         for i in loopbacks:
2610             i.remove_vpp_config()
2611
2612
2613 @tag_fixme_ubuntu2204
2614 class TestNAT44EDMW(TestNAT44ED):
2615     """NAT44ED MW Test Case"""
2616
2617     vpp_worker_count = 4
2618     max_sessions = 5000
2619
2620     def test_dynamic(self):
2621         """NAT44ED dynamic translation test"""
2622         pkt_count = 1500
2623         tcp_port_offset = 20
2624         udp_port_offset = 20
2625         icmp_id_offset = 20
2626
2627         self.nat_add_address(self.nat_addr)
2628         self.nat_add_inside_interface(self.pg0)
2629         self.nat_add_outside_interface(self.pg1)
2630
2631         # in2out
2632         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2633         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2634         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2635         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2636
2637         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2638
2639         for i in range(pkt_count):
2640             p = (
2641                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2642                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2643                 / TCP(sport=tcp_port_offset + i, dport=20)
2644             )
2645             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2646
2647             p = (
2648                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2649                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2650                 / UDP(sport=udp_port_offset + i, dport=20)
2651             )
2652             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2653
2654             p = (
2655                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2656                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2657                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2658             )
2659             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2660
2661         for i in range(0, self.vpp_worker_count):
2662             if len(i2o_pkts[i]) > 0:
2663                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2664
2665         self.pg_enable_capture(self.pg_interfaces)
2666         self.pg_start()
2667         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2668
2669         if_idx = self.pg0.sw_if_index
2670         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2671         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2672         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2673         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2674
2675         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2676         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2677         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2678         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2679
2680         self.logger.info(self.vapi.cli("show trace"))
2681
2682         # out2in
2683         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2684         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2685         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2686         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2687
2688         recvd_tcp_ports = set()
2689         recvd_udp_ports = set()
2690         recvd_icmp_ids = set()
2691
2692         for p in capture:
2693             if TCP in p:
2694                 recvd_tcp_ports.add(p[TCP].sport)
2695             if UDP in p:
2696                 recvd_udp_ports.add(p[UDP].sport)
2697             if ICMP in p:
2698                 recvd_icmp_ids.add(p[ICMP].id)
2699
2700         recvd_tcp_ports = list(recvd_tcp_ports)
2701         recvd_udp_ports = list(recvd_udp_ports)
2702         recvd_icmp_ids = list(recvd_icmp_ids)
2703
2704         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2705         for i in range(pkt_count):
2706             p = (
2707                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2708                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2709                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2710             )
2711             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2712
2713             p = (
2714                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2715                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2716                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2717             )
2718             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2719
2720             p = (
2721                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2722                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2723                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2724             )
2725             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2726
2727         for i in range(0, self.vpp_worker_count):
2728             if len(o2i_pkts[i]) > 0:
2729                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2730
2731         self.pg_enable_capture(self.pg_interfaces)
2732         self.pg_start()
2733         capture = self.pg0.get_capture(pkt_count * 3)
2734         for packet in capture:
2735             try:
2736                 self.assert_packet_checksums_valid(packet)
2737                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2738                 if packet.haslayer(TCP):
2739                     self.assert_in_range(
2740                         packet[TCP].dport,
2741                         tcp_port_offset,
2742                         tcp_port_offset + pkt_count,
2743                         "dst TCP port",
2744                     )
2745                 elif packet.haslayer(UDP):
2746                     self.assert_in_range(
2747                         packet[UDP].dport,
2748                         udp_port_offset,
2749                         udp_port_offset + pkt_count,
2750                         "dst UDP port",
2751                     )
2752                 else:
2753                     self.assert_in_range(
2754                         packet[ICMP].id,
2755                         icmp_id_offset,
2756                         icmp_id_offset + pkt_count,
2757                         "ICMP id",
2758                     )
2759             except:
2760                 self.logger.error(
2761                     ppp("Unexpected or invalid packet (inside network):", packet)
2762                 )
2763                 raise
2764
2765         if_idx = self.pg1.sw_if_index
2766         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2767         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2768         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2769         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2770
2771         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2772         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2773         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2774         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2775
2776         sc = self.statistics["/nat44-ed/total-sessions"]
2777         self.assertEqual(
2778             sc[:, 0].sum(),
2779             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2780         )
2781
2782     def test_frag_in_order(self):
2783         """NAT44ED translate fragments arriving in order"""
2784
2785         self.nat_add_address(self.nat_addr)
2786         self.nat_add_inside_interface(self.pg0)
2787         self.nat_add_outside_interface(self.pg1)
2788
2789         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2790         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2791         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2792
2793     def test_frag_in_order_do_not_translate(self):
2794         """NAT44ED don't translate fragments arriving in order"""
2795
2796         self.nat_add_address(self.nat_addr)
2797         self.nat_add_inside_interface(self.pg0)
2798         self.nat_add_outside_interface(self.pg1)
2799         self.vapi.nat44_forwarding_enable_disable(enable=True)
2800
2801         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2802
2803     def test_frag_out_of_order(self):
2804         """NAT44ED translate fragments arriving out of order"""
2805
2806         self.nat_add_address(self.nat_addr)
2807         self.nat_add_inside_interface(self.pg0)
2808         self.nat_add_outside_interface(self.pg1)
2809
2810         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2811         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2812         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2813
2814     def test_frag_in_order_in_plus_out(self):
2815         """NAT44ED in+out interface fragments in order"""
2816
2817         in_port = self.random_port()
2818         out_port = self.random_port()
2819
2820         self.nat_add_address(self.nat_addr)
2821         self.nat_add_inside_interface(self.pg0)
2822         self.nat_add_outside_interface(self.pg0)
2823         self.nat_add_inside_interface(self.pg1)
2824         self.nat_add_outside_interface(self.pg1)
2825
2826         # add static mappings for server
2827         self.nat_add_static_mapping(
2828             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2829         )
2830         self.nat_add_static_mapping(
2831             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2832         )
2833         self.nat_add_static_mapping(
2834             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2835         )
2836
2837         # run tests for each protocol
2838         self.frag_in_order_in_plus_out(
2839             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2840         )
2841         self.frag_in_order_in_plus_out(
2842             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2843         )
2844         self.frag_in_order_in_plus_out(
2845             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2846         )
2847
2848     def test_frag_out_of_order_in_plus_out(self):
2849         """NAT44ED in+out interface fragments out of order"""
2850
2851         in_port = self.random_port()
2852         out_port = self.random_port()
2853
2854         self.nat_add_address(self.nat_addr)
2855         self.nat_add_inside_interface(self.pg0)
2856         self.nat_add_outside_interface(self.pg0)
2857         self.nat_add_inside_interface(self.pg1)
2858         self.nat_add_outside_interface(self.pg1)
2859
2860         # add static mappings for server
2861         self.nat_add_static_mapping(
2862             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2863         )
2864         self.nat_add_static_mapping(
2865             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2866         )
2867         self.nat_add_static_mapping(
2868             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2869         )
2870
2871         # run tests for each protocol
2872         self.frag_out_of_order_in_plus_out(
2873             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2874         )
2875         self.frag_out_of_order_in_plus_out(
2876             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2877         )
2878         self.frag_out_of_order_in_plus_out(
2879             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2880         )
2881
2882     def test_reass_hairpinning(self):
2883         """NAT44ED fragments hairpinning"""
2884
2885         server_addr = self.pg0.remote_hosts[1].ip4
2886
2887         host_in_port = self.random_port()
2888         server_in_port = self.random_port()
2889         server_out_port = self.random_port()
2890
2891         self.nat_add_address(self.nat_addr)
2892         self.nat_add_inside_interface(self.pg0)
2893         self.nat_add_outside_interface(self.pg1)
2894
2895         # add static mapping for server
2896         self.nat_add_static_mapping(
2897             server_addr,
2898             self.nat_addr,
2899             server_in_port,
2900             server_out_port,
2901             proto=IP_PROTOS.tcp,
2902         )
2903         self.nat_add_static_mapping(
2904             server_addr,
2905             self.nat_addr,
2906             server_in_port,
2907             server_out_port,
2908             proto=IP_PROTOS.udp,
2909         )
2910         self.nat_add_static_mapping(server_addr, self.nat_addr)
2911
2912         self.reass_hairpinning(
2913             server_addr,
2914             server_in_port,
2915             server_out_port,
2916             host_in_port,
2917             proto=IP_PROTOS.tcp,
2918             ignore_port=True,
2919         )
2920         self.reass_hairpinning(
2921             server_addr,
2922             server_in_port,
2923             server_out_port,
2924             host_in_port,
2925             proto=IP_PROTOS.udp,
2926             ignore_port=True,
2927         )
2928         self.reass_hairpinning(
2929             server_addr,
2930             server_in_port,
2931             server_out_port,
2932             host_in_port,
2933             proto=IP_PROTOS.icmp,
2934             ignore_port=True,
2935         )
2936
2937     def test_session_limit_per_vrf(self):
2938         """NAT44ED per vrf session limit"""
2939
2940         inside = self.pg0
2941         inside_vrf10 = self.pg2
2942         outside = self.pg1
2943
2944         limit = 5
2945
2946         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2947         # non existing vrf_id makes process core dump
2948         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2949
2950         self.nat_add_inside_interface(inside)
2951         self.nat_add_inside_interface(inside_vrf10)
2952         self.nat_add_outside_interface(outside)
2953
2954         # vrf independent
2955         self.nat_add_interface_address(outside)
2956
2957         # BUG: causing core dump - when bad vrf_id is specified
2958         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2959
2960         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2961         inside_vrf10.add_stream(stream)
2962
2963         self.pg_enable_capture(self.pg_interfaces)
2964         self.pg_start()
2965
2966         capture = outside.get_capture(limit)
2967
2968         stream = self.create_tcp_stream(inside, outside, limit * 2)
2969         inside.add_stream(stream)
2970
2971         self.pg_enable_capture(self.pg_interfaces)
2972         self.pg_start()
2973
2974         capture = outside.get_capture(len(stream))
2975
2976     def test_show_max_translations(self):
2977         """NAT44ED API test - max translations per thread"""
2978         config = self.vapi.nat44_show_running_config()
2979         self.assertEqual(self.max_sessions, config.sessions)
2980
2981     def test_lru_cleanup(self):
2982         """NAT44ED LRU cleanup algorithm"""
2983
2984         self.nat_add_address(self.nat_addr)
2985         self.nat_add_inside_interface(self.pg0)
2986         self.nat_add_outside_interface(self.pg1)
2987
2988         self.vapi.nat_set_timeouts(
2989             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2990         )
2991
2992         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2993         pkts = []
2994         for i in range(0, self.max_sessions - 1):
2995             p = (
2996                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2997                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2998                 / UDP(sport=7000 + i, dport=80)
2999             )
3000             pkts.append(p)
3001
3002         self.pg0.add_stream(pkts)
3003         self.pg_enable_capture(self.pg_interfaces)
3004         self.pg_start()
3005         self.pg1.get_capture(len(pkts))
3006         self.virtual_sleep(1.5, "wait for timeouts")
3007
3008         pkts = []
3009         for i in range(0, self.max_sessions - 1):
3010             p = (
3011                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3012                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
3013                 / ICMP(id=8000 + i, type="echo-request")
3014             )
3015             pkts.append(p)
3016
3017         self.pg0.add_stream(pkts)
3018         self.pg_enable_capture(self.pg_interfaces)
3019         self.pg_start()
3020         self.pg1.get_capture(len(pkts))
3021
3022     def test_session_rst_timeout(self):
3023         """NAT44ED session RST timeouts"""
3024
3025         self.nat_add_address(self.nat_addr)
3026         self.nat_add_inside_interface(self.pg0)
3027         self.nat_add_outside_interface(self.pg1)
3028
3029         self.vapi.nat_set_timeouts(
3030             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3031         )
3032
3033         self.init_tcp_session(
3034             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3035         )
3036         p = (
3037             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3038             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3039             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3040         )
3041         self.send_and_expect(self.pg0, p, self.pg1)
3042
3043         self.virtual_sleep(6)
3044
3045         # The session is already closed
3046         p = (
3047             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3048             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3049             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3050         )
3051         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3052
3053         # The session can be re-opened
3054         p = (
3055             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3056             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3057             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3058         )
3059         self.send_and_expect(self.pg0, p, self.pg1)
3060
3061     def test_session_rst_established_timeout(self):
3062         """NAT44ED session RST timeouts"""
3063
3064         self.nat_add_address(self.nat_addr)
3065         self.nat_add_inside_interface(self.pg0)
3066         self.nat_add_outside_interface(self.pg1)
3067
3068         self.vapi.nat_set_timeouts(
3069             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3070         )
3071
3072         self.init_tcp_session(
3073             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3074         )
3075
3076         # Wait at least the transitory time, the session is in established
3077         # state anyway. RST followed by a data packet should move it to
3078         # transitory state.
3079         self.virtual_sleep(6)
3080         p = (
3081             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3082             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3083             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3084         )
3085         self.send_and_expect(self.pg0, p, self.pg1)
3086
3087         p = (
3088             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3089             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3090             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3091         )
3092         self.send_and_expect(self.pg0, p, self.pg1)
3093
3094         # State is transitory, session should be closed after 6 seconds
3095         self.virtual_sleep(6)
3096
3097         p = (
3098             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3099             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3100             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3101         )
3102         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3103
3104     def test_dynamic_out_of_ports(self):
3105         """NAT44ED dynamic translation test: out of ports"""
3106
3107         self.nat_add_inside_interface(self.pg0)
3108         self.nat_add_outside_interface(self.pg1)
3109
3110         # in2out and no NAT addresses added
3111         pkts = self.create_stream_in(self.pg0, self.pg1)
3112
3113         self.send_and_assert_no_replies(
3114             self.pg0,
3115             pkts,
3116             msg="i2o pkts",
3117             stats_diff=self.no_diff
3118             | {
3119                 "err": {
3120                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3121                 },
3122                 self.pg0.sw_if_index: {
3123                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3124                 },
3125             },
3126         )
3127
3128         # in2out after NAT addresses added
3129         self.nat_add_address(self.nat_addr)
3130
3131         tcpn, udpn, icmpn = (
3132             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3133         )
3134
3135         self.send_and_expect(
3136             self.pg0,
3137             pkts,
3138             self.pg1,
3139             msg="i2o pkts",
3140             stats_diff=self.no_diff
3141             | {
3142                 "err": {
3143                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3144                 },
3145                 self.pg0.sw_if_index: {
3146                     "/nat44-ed/in2out/slowpath/drops": 0,
3147                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3148                     "/nat44-ed/in2out/slowpath/udp": udpn,
3149                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3150                 },
3151             },
3152         )
3153
3154     def test_unknown_proto(self):
3155         """NAT44ED translate packet with unknown protocol"""
3156
3157         self.nat_add_address(self.nat_addr)
3158         self.nat_add_inside_interface(self.pg0)
3159         self.nat_add_outside_interface(self.pg1)
3160
3161         # in2out
3162         p = (
3163             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3164             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3165             / TCP(sport=self.tcp_port_in, dport=20)
3166         )
3167         self.pg0.add_stream(p)
3168         self.pg_enable_capture(self.pg_interfaces)
3169         self.pg_start()
3170         p = self.pg1.get_capture(1)
3171
3172         p = (
3173             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3174             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3175             / GRE()
3176             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3177             / TCP(sport=1234, dport=1234)
3178         )
3179         self.pg0.add_stream(p)
3180         self.pg_enable_capture(self.pg_interfaces)
3181         self.pg_start()
3182         p = self.pg1.get_capture(1)
3183         packet = p[0]
3184         try:
3185             self.assertEqual(packet[IP].src, self.nat_addr)
3186             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3187             self.assertEqual(packet.haslayer(GRE), 1)
3188             self.assert_packet_checksums_valid(packet)
3189         except:
3190             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3191             raise
3192
3193         # out2in
3194         p = (
3195             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3196             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3197             / GRE()
3198             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3199             / TCP(sport=1234, dport=1234)
3200         )
3201         self.pg1.add_stream(p)
3202         self.pg_enable_capture(self.pg_interfaces)
3203         self.pg_start()
3204         p = self.pg0.get_capture(1)
3205         packet = p[0]
3206         try:
3207             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3208             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3209             self.assertEqual(packet.haslayer(GRE), 1)
3210             self.assert_packet_checksums_valid(packet)
3211         except:
3212             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3213             raise
3214
3215     def test_hairpinning_unknown_proto(self):
3216         """NAT44ED translate packet with unknown protocol - hairpinning"""
3217         host = self.pg0.remote_hosts[0]
3218         server = self.pg0.remote_hosts[1]
3219         host_in_port = 1234
3220         server_out_port = 8765
3221         server_nat_ip = "10.0.0.11"
3222
3223         self.nat_add_address(self.nat_addr)
3224         self.nat_add_inside_interface(self.pg0)
3225         self.nat_add_outside_interface(self.pg1)
3226
3227         # add static mapping for server
3228         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3229
3230         # host to server
3231         p = (
3232             Ether(src=host.mac, dst=self.pg0.local_mac)
3233             / IP(src=host.ip4, dst=server_nat_ip)
3234             / TCP(sport=host_in_port, dport=server_out_port)
3235         )
3236         self.pg0.add_stream(p)
3237         self.pg_enable_capture(self.pg_interfaces)
3238         self.pg_start()
3239         self.pg0.get_capture(1)
3240
3241         p = (
3242             Ether(dst=self.pg0.local_mac, src=host.mac)
3243             / IP(src=host.ip4, dst=server_nat_ip)
3244             / GRE()
3245             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3246             / TCP(sport=1234, dport=1234)
3247         )
3248         self.pg0.add_stream(p)
3249         self.pg_enable_capture(self.pg_interfaces)
3250         self.pg_start()
3251         p = self.pg0.get_capture(1)
3252         packet = p[0]
3253         try:
3254             self.assertEqual(packet[IP].src, self.nat_addr)
3255             self.assertEqual(packet[IP].dst, server.ip4)
3256             self.assertEqual(packet.haslayer(GRE), 1)
3257             self.assert_packet_checksums_valid(packet)
3258         except:
3259             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3260             raise
3261
3262         # server to host
3263         p = (
3264             Ether(dst=self.pg0.local_mac, src=server.mac)
3265             / IP(src=server.ip4, dst=self.nat_addr)
3266             / GRE()
3267             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3268             / TCP(sport=1234, dport=1234)
3269         )
3270         self.pg0.add_stream(p)
3271         self.pg_enable_capture(self.pg_interfaces)
3272         self.pg_start()
3273         p = self.pg0.get_capture(1)
3274         packet = p[0]
3275         try:
3276             self.assertEqual(packet[IP].src, server_nat_ip)
3277             self.assertEqual(packet[IP].dst, host.ip4)
3278             self.assertEqual(packet.haslayer(GRE), 1)
3279             self.assert_packet_checksums_valid(packet)
3280         except:
3281             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3282             raise
3283
3284     def test_output_feature_and_service(self):
3285         """NAT44ED interface output feature and services"""
3286         external_addr = "1.2.3.4"
3287         external_port = 80
3288         local_port = 8080
3289
3290         self.vapi.nat44_forwarding_enable_disable(enable=1)
3291         self.nat_add_address(self.nat_addr)
3292         flags = self.config_flags.NAT_IS_ADDR_ONLY
3293         self.vapi.nat44_add_del_identity_mapping(
3294             ip_address=self.pg1.remote_ip4,
3295             sw_if_index=0xFFFFFFFF,
3296             flags=flags,
3297             is_add=1,
3298         )
3299         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3300         self.nat_add_static_mapping(
3301             self.pg0.remote_ip4,
3302             external_addr,
3303             local_port,
3304             external_port,
3305             proto=IP_PROTOS.tcp,
3306             flags=flags,
3307         )
3308
3309         self.nat_add_inside_interface(self.pg0)
3310         self.nat_add_outside_interface(self.pg0)
3311         self.vapi.nat44_ed_add_del_output_interface(
3312             sw_if_index=self.pg1.sw_if_index, is_add=1
3313         )
3314
3315         # from client to service
3316         p = (
3317             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3318             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3319             / TCP(sport=12345, dport=external_port)
3320         )
3321         self.pg1.add_stream(p)
3322         self.pg_enable_capture(self.pg_interfaces)
3323         self.pg_start()
3324         capture = self.pg0.get_capture(1)
3325         p = capture[0]
3326         try:
3327             ip = p[IP]
3328             tcp = p[TCP]
3329             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3330             self.assertEqual(tcp.dport, local_port)
3331             self.assert_packet_checksums_valid(p)
3332         except:
3333             self.logger.error(ppp("Unexpected or invalid packet:", p))
3334             raise
3335
3336         # from service back to client
3337         p = (
3338             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3339             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3340             / TCP(sport=local_port, dport=12345)
3341         )
3342         self.pg0.add_stream(p)
3343         self.pg_enable_capture(self.pg_interfaces)
3344         self.pg_start()
3345         capture = self.pg1.get_capture(1)
3346         p = capture[0]
3347         try:
3348             ip = p[IP]
3349             tcp = p[TCP]
3350             self.assertEqual(ip.src, external_addr)
3351             self.assertEqual(tcp.sport, external_port)
3352             self.assert_packet_checksums_valid(p)
3353         except:
3354             self.logger.error(ppp("Unexpected or invalid packet:", p))
3355             raise
3356
3357         # from local network host to external network
3358         pkts = self.create_stream_in(self.pg0, self.pg1)
3359         self.pg0.add_stream(pkts)
3360         self.pg_enable_capture(self.pg_interfaces)
3361         self.pg_start()
3362         capture = self.pg1.get_capture(len(pkts))
3363         self.verify_capture_out(capture, ignore_port=True)
3364         pkts = self.create_stream_in(self.pg0, self.pg1)
3365         self.pg0.add_stream(pkts)
3366         self.pg_enable_capture(self.pg_interfaces)
3367         self.pg_start()
3368         capture = self.pg1.get_capture(len(pkts))
3369         self.verify_capture_out(capture, ignore_port=True)
3370
3371         # from external network back to local network host
3372         pkts = self.create_stream_out(self.pg1)
3373         self.pg1.add_stream(pkts)
3374         self.pg_enable_capture(self.pg_interfaces)
3375         self.pg_start()
3376         capture = self.pg0.get_capture(len(pkts))
3377         self.verify_capture_in(capture, self.pg0)
3378
3379     def test_output_feature_and_service3(self):
3380         """NAT44ED interface output feature and DST NAT"""
3381         external_addr = "1.2.3.4"
3382         external_port = 80
3383         local_port = 8080
3384
3385         self.vapi.nat44_forwarding_enable_disable(enable=1)
3386         self.nat_add_address(self.nat_addr)
3387         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3388         self.nat_add_static_mapping(
3389             self.pg1.remote_ip4,
3390             external_addr,
3391             local_port,
3392             external_port,
3393             proto=IP_PROTOS.tcp,
3394             flags=flags,
3395         )
3396
3397         self.nat_add_inside_interface(self.pg0)
3398         self.nat_add_outside_interface(self.pg0)
3399         self.vapi.nat44_ed_add_del_output_interface(
3400             sw_if_index=self.pg1.sw_if_index, is_add=1
3401         )
3402
3403         p = (
3404             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3405             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3406             / TCP(sport=12345, dport=external_port)
3407         )
3408         self.pg0.add_stream(p)
3409         self.pg_enable_capture(self.pg_interfaces)
3410         self.pg_start()
3411         capture = self.pg1.get_capture(1)
3412         p = capture[0]
3413         try:
3414             ip = p[IP]
3415             tcp = p[TCP]
3416             self.assertEqual(ip.src, self.pg0.remote_ip4)
3417             self.assertEqual(tcp.sport, 12345)
3418             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3419             self.assertEqual(tcp.dport, local_port)
3420             self.assert_packet_checksums_valid(p)
3421         except:
3422             self.logger.error(ppp("Unexpected or invalid packet:", p))
3423             raise
3424
3425         p = (
3426             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3427             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3428             / TCP(sport=local_port, dport=12345)
3429         )
3430         self.pg1.add_stream(p)
3431         self.pg_enable_capture(self.pg_interfaces)
3432         self.pg_start()
3433         capture = self.pg0.get_capture(1)
3434         p = capture[0]
3435         try:
3436             ip = p[IP]
3437             tcp = p[TCP]
3438             self.assertEqual(ip.src, external_addr)
3439             self.assertEqual(tcp.sport, external_port)
3440             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3441             self.assertEqual(tcp.dport, 12345)
3442             self.assert_packet_checksums_valid(p)
3443         except:
3444             self.logger.error(ppp("Unexpected or invalid packet:", p))
3445             raise
3446
3447     def test_self_twice_nat_lb_negative(self):
3448         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3449         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3450
3451     def test_self_twice_nat_negative(self):
3452         """NAT44ED Self Twice NAT (negative test)"""
3453         self.twice_nat_common(self_twice_nat=True)
3454
3455     def test_static_lb_multi_clients(self):
3456         """NAT44ED local service load balancing - multiple clients"""
3457
3458         external_addr = self.nat_addr
3459         external_port = 80
3460         local_port = 8080
3461         server1 = self.pg0.remote_hosts[0]
3462         server2 = self.pg0.remote_hosts[1]
3463         server3 = self.pg0.remote_hosts[2]
3464
3465         locals = [
3466             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3467             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3468         ]
3469
3470         flags = self.config_flags.NAT_IS_INSIDE
3471         self.vapi.nat44_interface_add_del_feature(
3472             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3473         )
3474         self.vapi.nat44_interface_add_del_feature(
3475             sw_if_index=self.pg1.sw_if_index, is_add=1
3476         )
3477
3478         self.nat_add_address(self.nat_addr)
3479         self.vapi.nat44_add_del_lb_static_mapping(
3480             is_add=1,
3481             external_addr=external_addr,
3482             external_port=external_port,
3483             protocol=IP_PROTOS.tcp,
3484             local_num=len(locals),
3485             locals=locals,
3486         )
3487
3488         server1_n = 0
3489         server2_n = 0
3490         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3491         pkts = []
3492         for client in clients:
3493             p = (
3494                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3495                 / IP(src=client, dst=self.nat_addr)
3496                 / TCP(sport=12345, dport=external_port)
3497             )
3498             pkts.append(p)
3499         self.pg1.add_stream(pkts)
3500         self.pg_enable_capture(self.pg_interfaces)
3501         self.pg_start()
3502         capture = self.pg0.get_capture(len(pkts))
3503         for p in capture:
3504             if p[IP].dst == server1.ip4:
3505                 server1_n += 1
3506             else:
3507                 server2_n += 1
3508         self.assertGreaterEqual(server1_n, server2_n)
3509
3510         local = {
3511             "addr": server3.ip4,
3512             "port": local_port,
3513             "probability": 20,
3514             "vrf_id": 0,
3515         }
3516
3517         # add new back-end
3518         self.vapi.nat44_lb_static_mapping_add_del_local(
3519             is_add=1,
3520             external_addr=external_addr,
3521             external_port=external_port,
3522             local=local,
3523             protocol=IP_PROTOS.tcp,
3524         )
3525         server1_n = 0
3526         server2_n = 0
3527         server3_n = 0
3528         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3529         pkts = []
3530         for client in clients:
3531             p = (
3532                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3533                 / IP(src=client, dst=self.nat_addr)
3534                 / TCP(sport=12346, dport=external_port)
3535             )
3536             pkts.append(p)
3537         self.assertGreater(len(pkts), 0)
3538         self.pg1.add_stream(pkts)
3539         self.pg_enable_capture(self.pg_interfaces)
3540         self.pg_start()
3541         capture = self.pg0.get_capture(len(pkts))
3542         for p in capture:
3543             if p[IP].dst == server1.ip4:
3544                 server1_n += 1
3545             elif p[IP].dst == server2.ip4:
3546                 server2_n += 1
3547             else:
3548                 server3_n += 1
3549         self.assertGreater(server1_n, 0)
3550         self.assertGreater(server2_n, 0)
3551         self.assertGreater(server3_n, 0)
3552
3553         local = {
3554             "addr": server2.ip4,
3555             "port": local_port,
3556             "probability": 10,
3557             "vrf_id": 0,
3558         }
3559
3560         # remove one back-end
3561         self.vapi.nat44_lb_static_mapping_add_del_local(
3562             is_add=0,
3563             external_addr=external_addr,
3564             external_port=external_port,
3565             local=local,
3566             protocol=IP_PROTOS.tcp,
3567         )
3568         server1_n = 0
3569         server2_n = 0
3570         server3_n = 0
3571         self.pg1.add_stream(pkts)
3572         self.pg_enable_capture(self.pg_interfaces)
3573         self.pg_start()
3574         capture = self.pg0.get_capture(len(pkts))
3575         for p in capture:
3576             if p[IP].dst == server1.ip4:
3577                 server1_n += 1
3578             elif p[IP].dst == server2.ip4:
3579                 server2_n += 1
3580             else:
3581                 server3_n += 1
3582         self.assertGreater(server1_n, 0)
3583         self.assertEqual(server2_n, 0)
3584         self.assertGreater(server3_n, 0)
3585
    # Prefix the syslog test name with "zzz" so that it runs last: setting
    # the syslog sender cannot be undone, and once it is set it interferes
    # with self.send_and_assert_no_replies functionality.
3589     def test_zzz_syslog_sess(self):
3590         """NAT44ED Test syslog session creation and deletion"""
3591         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3592         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3593
3594         self.nat_add_address(self.nat_addr)
3595         self.nat_add_inside_interface(self.pg0)
3596         self.nat_add_outside_interface(self.pg1)
3597
3598         p = (
3599             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3600             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3601             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3602         )
3603         self.pg0.add_stream(p)
3604         self.pg_enable_capture(self.pg_interfaces)
3605         self.pg_start()
3606         capture = self.pg1.get_capture(1)
3607         self.tcp_port_out = capture[0][TCP].sport
3608         capture = self.pg3.get_capture(1)
3609         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3610
3611         self.pg_enable_capture(self.pg_interfaces)
3612         self.pg_start()
3613         self.nat_add_address(self.nat_addr, is_add=0)
3614         capture = self.pg3.get_capture(1)
3615         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3616
    # Prefix the syslog test name with "zzz" so that it runs last: setting
    # the syslog sender cannot be undone, and once it is set it interferes
    # with self.send_and_assert_no_replies functionality.
3620     def test_zzz_syslog_sess_reopen(self):
3621         """Syslog events for session reopen"""
3622         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3623         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3624
3625         self.nat_add_address(self.nat_addr)
3626         self.nat_add_inside_interface(self.pg0)
3627         self.nat_add_outside_interface(self.pg1)
3628
3629         # SYN in2out
3630         p = (
3631             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3632             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3633             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3634         )
3635         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3636         self.tcp_port_out = capture[0][TCP].sport
3637         capture = self.pg3.get_capture(1)
3638         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3639
3640         # SYN out2in
3641         p = (
3642             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3643             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3644             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3645         )
3646         self.send_and_expect(self.pg1, p, self.pg0)
3647
3648         p = (
3649             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3650             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3651             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3652         )
3653         self.send_and_expect(self.pg0, p, self.pg1)
3654
3655         # FIN in2out
3656         p = (
3657             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3658             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3659             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3660         )
3661         self.send_and_expect(self.pg0, p, self.pg1)
3662
3663         # FIN out2in
3664         p = (
3665             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3666             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3667             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3668         )
3669         self.send_and_expect(self.pg1, p, self.pg0)
3670
3671         self.init_tcp_session(
3672             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3673         )
3674
3675         # 2 records should be produced - first one del & add
3676         capture = self.pg3.get_capture(2)
3677         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3678         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3679
3680     def test_twice_nat_interface_addr(self):
3681         """NAT44ED Acquire twice NAT addresses from interface"""
3682         flags = self.config_flags.NAT_IS_TWICE_NAT
3683         self.vapi.nat44_add_del_interface_addr(
3684             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3685         )
3686
3687         # no address in NAT pool
3688         adresses = self.vapi.nat44_address_dump()
3689         self.assertEqual(0, len(adresses))
3690
3691         # configure interface address and check NAT address pool
3692         self.pg11.config_ip4()
3693         adresses = self.vapi.nat44_address_dump()
3694         self.assertEqual(1, len(adresses))
3695         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3696         self.assertEqual(adresses[0].flags, flags)
3697
3698         # remove interface address and check NAT address pool
3699         self.pg11.unconfig_ip4()
3700         adresses = self.vapi.nat44_address_dump()
3701         self.assertEqual(0, len(adresses))
3702
3703     def test_output_feature_stateful_acl(self):
3704         """NAT44ED output feature works with stateful ACL"""
3705
3706         self.nat_add_address(self.nat_addr)
3707         self.vapi.nat44_ed_add_del_output_interface(
3708             sw_if_index=self.pg1.sw_if_index, is_add=1
3709         )
3710
3711         # First ensure that the NAT is working sans ACL
3712
3713         # send packets out2in, no sessions yet so packets should drop
3714         pkts_out2in = self.create_stream_out(self.pg1)
3715         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3716
3717         # send packets into inside intf, ensure received via outside intf
3718         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3719         capture = self.send_and_expect(
3720             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3721         )
3722         self.verify_capture_out(capture, ignore_port=True)
3723
3724         # send out2in again, with sessions created it should work now
3725         pkts_out2in = self.create_stream_out(self.pg1)
3726         capture = self.send_and_expect(
3727             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3728         )
3729         self.verify_capture_in(capture, self.pg0)
3730
3731         # Create an ACL blocking everything
3732         out2in_deny_rule = AclRule(is_permit=0)
3733         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3734         out2in_acl.add_vpp_config()
3735
3736         # create an ACL to permit/reflect everything
3737         in2out_reflect_rule = AclRule(is_permit=2)
3738         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3739         in2out_acl.add_vpp_config()
3740
3741         # apply as input acl on interface and confirm it blocks everything
3742         acl_if = VppAclInterface(
3743             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3744         )
3745         acl_if.add_vpp_config()
3746         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3747
3748         # apply output acl
3749         acl_if.acls = [out2in_acl, in2out_acl]
3750         acl_if.add_vpp_config()
3751         # send in2out to generate ACL state (NAT state was created earlier)
3752         capture = self.send_and_expect(
3753             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3754         )
3755         self.verify_capture_out(capture, ignore_port=True)
3756
3757         # send out2in again. ACL state exists so it should work now.
3758         # TCP packets with the syn flag set also need the ack flag
3759         for p in pkts_out2in:
3760             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3761                 p[TCP].flags |= 0x10
3762         capture = self.send_and_expect(
3763             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3764         )
3765         self.verify_capture_in(capture, self.pg0)
3766         self.logger.info(self.vapi.cli("show trace"))
3767
3768     def test_tcp_close(self):
3769         """NAT44ED Close TCP session from inside network - output feature"""
3770         config = self.vapi.nat44_show_running_config()
3771         old_timeouts = config.timeouts
3772         new_transitory = 2
3773         self.vapi.nat_set_timeouts(
3774             udp=old_timeouts.udp,
3775             tcp_established=old_timeouts.tcp_established,
3776             icmp=old_timeouts.icmp,
3777             tcp_transitory=new_transitory,
3778         )
3779
3780         self.vapi.nat44_forwarding_enable_disable(enable=1)
3781         self.nat_add_address(self.pg1.local_ip4)
3782         twice_nat_addr = "10.0.1.3"
3783         service_ip = "192.168.16.150"
3784         self.nat_add_address(twice_nat_addr, twice_nat=1)
3785
3786         flags = self.config_flags.NAT_IS_INSIDE
3787         self.vapi.nat44_interface_add_del_feature(
3788             sw_if_index=self.pg0.sw_if_index, is_add=1
3789         )
3790         self.vapi.nat44_interface_add_del_feature(
3791             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3792         )
3793         self.vapi.nat44_ed_add_del_output_interface(
3794             is_add=1, sw_if_index=self.pg1.sw_if_index
3795         )
3796
3797         flags = (
3798             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3799         )
3800         self.nat_add_static_mapping(
3801             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3802         )
3803         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3804         start_sessnum = len(sessions)
3805
3806         # SYN packet out->in
3807         p = (
3808             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3809             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3810             / TCP(sport=33898, dport=80, flags="S")
3811         )
3812         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3813         p = capture[0]
3814         tcp_port = p[TCP].sport
3815
3816         # SYN + ACK packet in->out
3817         p = (
3818             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3819             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3820             / TCP(sport=80, dport=tcp_port, flags="SA")
3821         )
3822         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3823
3824         # ACK packet out->in
3825         p = (
3826             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3827             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3828             / TCP(sport=33898, dport=80, flags="A")
3829         )
3830         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3831
3832         # FIN packet in -> out
3833         p = (
3834             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3835             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3836             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3837         )
3838         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3839
3840         # FIN+ACK packet out -> in
3841         p = (
3842             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3843             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3844             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3845         )
3846         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3847
3848         # ACK packet in -> out
3849         p = (
3850             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3851             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3852             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3853         )
3854         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3855
3856         # session now in transitory timeout, but traffic still flows
3857         # try FIN packet out->in
3858         p = (
3859             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3860             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3861             / TCP(sport=33898, dport=80, flags="F")
3862         )
3863         self.pg1.add_stream(p)
3864         self.pg_enable_capture(self.pg_interfaces)
3865         self.pg_start()
3866
3867         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3868         self.pg0.get_capture(1)
3869
3870         # session should still exist
3871         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3872         self.assertEqual(len(sessions) - start_sessnum, 1)
3873
3874         # send FIN+ACK packet out -> in - will cause session to be wiped
3875         # but won't create a new session
3876         p = (
3877             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3878             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3879             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3880         )
3881         self.send_and_assert_no_replies(self.pg1, p)
3882         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3883         self.assertEqual(len(sessions) - start_sessnum, 0)
3884
3885     def test_tcp_session_close_in(self):
3886         """NAT44ED Close TCP session from inside network"""
3887
3888         in_port = self.tcp_port_in
3889         out_port = 10505
3890         ext_port = self.tcp_external_port
3891
3892         self.nat_add_address(self.nat_addr)
3893         self.nat_add_inside_interface(self.pg0)
3894         self.nat_add_outside_interface(self.pg1)
3895         self.nat_add_static_mapping(
3896             self.pg0.remote_ip4,
3897             self.nat_addr,
3898             in_port,
3899             out_port,
3900             proto=IP_PROTOS.tcp,
3901             flags=self.config_flags.NAT_IS_TWICE_NAT,
3902         )
3903
3904         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3905         session_n = len(sessions)
3906
3907         self.vapi.nat_set_timeouts(
3908             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3909         )
3910
3911         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3912
3913         # FIN packet in -> out
3914         p = (
3915             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3916             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3917             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3918         )
3919         self.send_and_expect(self.pg0, p, self.pg1)
3920         pkts = []
3921
3922         # ACK packet out -> in
3923         p = (
3924             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3925             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3926             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3927         )
3928         pkts.append(p)
3929
3930         # FIN packet out -> in
3931         p = (
3932             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3933             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3934             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3935         )
3936         pkts.append(p)
3937
3938         self.send_and_expect(self.pg1, pkts, self.pg0)
3939
3940         # ACK packet in -> out
3941         p = (
3942             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3943             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3944             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3945         )
3946         self.send_and_expect(self.pg0, p, self.pg1)
3947
3948         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3949         self.assertEqual(len(sessions) - session_n, 1)
3950
3951         # retransmit FIN packet out -> in
3952         p = (
3953             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3954             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3955             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3956         )
3957
3958         self.send_and_expect(self.pg1, p, self.pg0)
3959
3960         # retransmit ACK packet in -> out
3961         p = (
3962             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3963             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3964             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3965         )
3966         self.send_and_expect(self.pg0, p, self.pg1)
3967
3968         self.virtual_sleep(3)
3969         # retransmit ACK packet in -> out - this will cause session to be wiped
3970         p = (
3971             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3972             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3973             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3974         )
3975         self.send_and_assert_no_replies(self.pg0, p)
3976         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3977         self.assertEqual(len(sessions) - session_n, 0)
3978
3979     def test_tcp_session_close_out(self):
3980         """NAT44ED Close TCP session from outside network"""
3981
3982         in_port = self.tcp_port_in
3983         out_port = 10505
3984         ext_port = self.tcp_external_port
3985
3986         self.nat_add_address(self.nat_addr)
3987         self.nat_add_inside_interface(self.pg0)
3988         self.nat_add_outside_interface(self.pg1)
3989         self.nat_add_static_mapping(
3990             self.pg0.remote_ip4,
3991             self.nat_addr,
3992             in_port,
3993             out_port,
3994             proto=IP_PROTOS.tcp,
3995             flags=self.config_flags.NAT_IS_TWICE_NAT,
3996         )
3997
3998         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3999         session_n = len(sessions)
4000
4001         self.vapi.nat_set_timeouts(
4002             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4003         )
4004
4005         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4006
4007         # FIN packet out -> in
4008         p = (
4009             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4010             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4011             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
4012         )
4013         self.pg1.add_stream(p)
4014         self.pg_enable_capture(self.pg_interfaces)
4015         self.pg_start()
4016         self.pg0.get_capture(1)
4017
4018         # FIN+ACK packet in -> out
4019         p = (
4020             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4021             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4022             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
4023         )
4024
4025         self.pg0.add_stream(p)
4026         self.pg_enable_capture(self.pg_interfaces)
4027         self.pg_start()
4028         self.pg1.get_capture(1)
4029
4030         # ACK packet out -> in
4031         p = (
4032             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4033             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4034             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
4035         )
4036         self.pg1.add_stream(p)
4037         self.pg_enable_capture(self.pg_interfaces)
4038         self.pg_start()
4039         self.pg0.get_capture(1)
4040
4041         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4042         self.assertEqual(len(sessions) - session_n, 1)
4043
4044         # retransmit FIN packet out -> in
4045         p = (
4046             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4047             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4048             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4049         )
4050         self.send_and_expect(self.pg1, p, self.pg0)
4051
4052         # retransmit ACK packet in -> out
4053         p = (
4054             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4055             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4056             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4057         )
4058         self.send_and_expect(self.pg0, p, self.pg1)
4059
4060         self.virtual_sleep(3)
4061         # retransmit ACK packet in -> out - this will cause session to be wiped
4062         p = (
4063             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4064             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4065             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4066         )
4067         self.send_and_assert_no_replies(self.pg0, p)
4068         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4069         self.assertEqual(len(sessions) - session_n, 0)
4070
4071     def test_tcp_session_close_simultaneous(self):
4072         """Simultaneous TCP close from both sides"""
4073
4074         in_port = self.tcp_port_in
4075         ext_port = 10505
4076
4077         self.nat_add_address(self.nat_addr)
4078         self.nat_add_inside_interface(self.pg0)
4079         self.nat_add_outside_interface(self.pg1)
4080         self.nat_add_static_mapping(
4081             self.pg0.remote_ip4,
4082             self.nat_addr,
4083             in_port,
4084             ext_port,
4085             proto=IP_PROTOS.tcp,
4086             flags=self.config_flags.NAT_IS_TWICE_NAT,
4087         )
4088
4089         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4090         session_n = len(sessions)
4091
4092         self.vapi.nat_set_timeouts(
4093             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4094         )
4095
4096         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4097
4098         # FIN packet in -> out
4099         p = (
4100             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4101             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4102             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4103         )
4104         self.send_and_expect(self.pg0, p, self.pg1)
4105
4106         # FIN packet out -> in
4107         p = (
4108             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4109             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4110             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4111         )
4112         self.send_and_expect(self.pg1, p, self.pg0)
4113
4114         # ACK packet in -> out
4115         p = (
4116             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4117             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4118             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4119         )
4120         self.send_and_expect(self.pg0, p, self.pg1)
4121
4122         # ACK packet out -> in
4123         p = (
4124             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4125             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4126             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4127         )
4128         self.send_and_expect(self.pg1, p, self.pg0)
4129
4130         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4131         self.assertEqual(len(sessions) - session_n, 1)
4132
4133         # retransmit FIN packet out -> in
4134         p = (
4135             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4136             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4137             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4138         )
4139         self.send_and_expect(self.pg1, p, self.pg0)
4140
4141         # retransmit ACK packet in -> out
4142         p = (
4143             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4144             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4145             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4146         )
4147         self.send_and_expect(self.pg0, p, self.pg1)
4148
4149         self.virtual_sleep(3)
4150         # retransmit ACK packet in -> out - this will cause session to be wiped
4151         p = (
4152             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4153             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4154             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4155         )
4156         self.pg_send(self.pg0, p)
4157         self.send_and_assert_no_replies(self.pg0, p)
4158         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4159         self.assertEqual(len(sessions) - session_n, 0)
4160
4161     def test_tcp_session_half_reopen_inside(self):
4162         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4163         in_port = self.tcp_port_in
4164         ext_port = 10505
4165
4166         self.nat_add_address(self.nat_addr)
4167         self.nat_add_inside_interface(self.pg0)
4168         self.nat_add_outside_interface(self.pg1)
4169         self.nat_add_static_mapping(
4170             self.pg0.remote_ip4,
4171             self.nat_addr,
4172             in_port,
4173             ext_port,
4174             proto=IP_PROTOS.tcp,
4175             flags=self.config_flags.NAT_IS_TWICE_NAT,
4176         )
4177
4178         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4179         session_n = len(sessions)
4180
4181         self.vapi.nat_set_timeouts(
4182             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4183         )
4184
4185         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4186
4187         # FIN packet in -> out
4188         p = (
4189             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4190             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4191             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4192         )
4193         self.send_and_expect(self.pg0, p, self.pg1)
4194
4195         # FIN packet out -> in
4196         p = (
4197             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4198             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4199             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4200         )
4201         self.send_and_expect(self.pg1, p, self.pg0)
4202
4203         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4204         self.assertEqual(len(sessions) - session_n, 1)
4205
4206         # send SYN packet in -> out
4207         p = (
4208             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4209             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4210             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4211         )
4212         self.send_and_expect(self.pg0, p, self.pg1)
4213
4214         self.virtual_sleep(3)
4215         # send ACK packet in -> out - session should be wiped
4216         p = (
4217             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4218             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4219             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4220         )
4221         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4222         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4223         self.assertEqual(len(sessions) - session_n, 0)
4224
4225     def test_tcp_session_half_reopen_outside(self):
4226         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4227         in_port = self.tcp_port_in
4228         ext_port = 10505
4229
4230         self.nat_add_address(self.nat_addr)
4231         self.nat_add_inside_interface(self.pg0)
4232         self.nat_add_outside_interface(self.pg1)
4233         self.nat_add_static_mapping(
4234             self.pg0.remote_ip4,
4235             self.nat_addr,
4236             in_port,
4237             ext_port,
4238             proto=IP_PROTOS.tcp,
4239             flags=self.config_flags.NAT_IS_TWICE_NAT,
4240         )
4241
4242         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4243         session_n = len(sessions)
4244
4245         self.vapi.nat_set_timeouts(
4246             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4247         )
4248
4249         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4250
4251         # FIN packet in -> out
4252         p = (
4253             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4254             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4255             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4256         )
4257         self.send_and_expect(self.pg0, p, self.pg1)
4258
4259         # FIN packet out -> in
4260         p = (
4261             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4262             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4263             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4264         )
4265         self.send_and_expect(self.pg1, p, self.pg0)
4266
4267         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4268         self.assertEqual(len(sessions) - session_n, 1)
4269
4270         # send SYN packet out -> in
4271         p = (
4272             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4273             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4274             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4275         )
4276         self.send_and_expect(self.pg1, p, self.pg0)
4277
4278         self.virtual_sleep(3)
4279         # send ACK packet in -> out - session should be wiped
4280         p = (
4281             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4282             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4283             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4284         )
4285         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4286         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4287         self.assertEqual(len(sessions) - session_n, 0)
4288
4289     def test_tcp_session_reopen(self):
4290         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4291         in_port = self.tcp_port_in
4292         ext_port = 10505
4293
4294         self.nat_add_address(self.nat_addr)
4295         self.nat_add_inside_interface(self.pg0)
4296         self.nat_add_outside_interface(self.pg1)
4297         self.nat_add_static_mapping(
4298             self.pg0.remote_ip4,
4299             self.nat_addr,
4300             in_port,
4301             ext_port,
4302             proto=IP_PROTOS.tcp,
4303             flags=self.config_flags.NAT_IS_TWICE_NAT,
4304         )
4305
4306         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4307         session_n = len(sessions)
4308
4309         self.vapi.nat_set_timeouts(
4310             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4311         )
4312
4313         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4314
4315         # FIN packet in -> out
4316         p = (
4317             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4318             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4319             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4320         )
4321         self.send_and_expect(self.pg0, p, self.pg1)
4322
4323         # FIN packet out -> in
4324         p = (
4325             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4326             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4327             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4328         )
4329         self.send_and_expect(self.pg1, p, self.pg0)
4330
4331         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4332         self.assertEqual(len(sessions) - session_n, 1)
4333
4334         # send SYN packet out -> in
4335         p = (
4336             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4337             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4338             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4339         )
4340         self.send_and_expect(self.pg1, p, self.pg0)
4341
4342         # send SYN packet in -> out
4343         p = (
4344             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4345             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4346             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4347         )
4348         self.send_and_expect(self.pg0, p, self.pg1)
4349
4350         # send ACK packet out -> in
4351         p = (
4352             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4353             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4354             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4355         )
4356         self.send_and_expect(self.pg1, p, self.pg0)
4357
4358         self.virtual_sleep(3)
4359         # send ACK packet in -> out - should be forwarded and session alive
4360         p = (
4361             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4362             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4363             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4364         )
4365         self.send_and_expect(self.pg0, p, self.pg1)
4366         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4367         self.assertEqual(len(sessions) - session_n, 1)
4368
4369     def test_dynamic_vrf(self):
4370         """NAT44ED dynamic translation test: different VRF"""
4371
4372         vrf_id_in = 33
4373         vrf_id_out = 34
4374
4375         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4376
4377         try:
4378             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4379             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4380
4381             self.nat_add_inside_interface(self.pg7)
4382             self.nat_add_outside_interface(self.pg8)
4383
4384             # just basic stuff nothing special
4385             pkts = self.create_stream_in(self.pg7, self.pg8)
4386             self.pg7.add_stream(pkts)
4387             self.pg_enable_capture(self.pg_interfaces)
4388             self.pg_start()
4389             capture = self.pg8.get_capture(len(pkts))
4390             self.verify_capture_out(capture, ignore_port=True)
4391
4392             pkts = self.create_stream_out(self.pg8)
4393             self.pg8.add_stream(pkts)
4394             self.pg_enable_capture(self.pg_interfaces)
4395             self.pg_start()
4396             capture = self.pg7.get_capture(len(pkts))
4397             self.verify_capture_in(capture, self.pg7)
4398
4399         finally:
4400             self.pg7.unconfig()
4401             self.pg8.unconfig()
4402
4403             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4404             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4405
4406     def test_dynamic_output_feature_vrf(self):
4407         """NAT44ED dynamic translation test: output-feature, VRF"""
4408
4409         # other then default (0)
4410         new_vrf_id = 22
4411
4412         self.nat_add_address(self.nat_addr)
4413         self.vapi.nat44_ed_add_del_output_interface(
4414             sw_if_index=self.pg8.sw_if_index, is_add=1
4415         )
4416         try:
4417             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4418             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4419
4420             # in2out
4421             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4422             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4423             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4424             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4425
4426             pkts = self.create_stream_in(self.pg7, self.pg8)
4427             self.pg7.add_stream(pkts)
4428             self.pg_enable_capture(self.pg_interfaces)
4429             self.pg_start()
4430             capture = self.pg8.get_capture(len(pkts))
4431             self.verify_capture_out(capture, ignore_port=True)
4432
4433             if_idx = self.pg8.sw_if_index
4434             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4435             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4436             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4437             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4438             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4439             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4440             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4441             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4442
4443             # out2in
4444             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4445             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4446             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4447             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4448
4449             pkts = self.create_stream_out(self.pg8)
4450             self.pg8.add_stream(pkts)
4451             self.pg_enable_capture(self.pg_interfaces)
4452             self.pg_start()
4453             capture = self.pg7.get_capture(len(pkts))
4454             self.verify_capture_in(capture, self.pg7)
4455
4456             if_idx = self.pg8.sw_if_index
4457             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4458             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4459             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4460             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4461             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4462             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4463             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4464             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4465
4466             sessions = self.statistics["/nat44-ed/total-sessions"]
4467             self.assertEqual(sessions[:, 0].sum(), 3)
4468
4469         finally:
4470             self.pg7.unconfig()
4471             self.pg8.unconfig()
4472
4473             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4474
4475     def test_next_src_nat(self):
4476         """NAT44ED On way back forward packet to nat44-in2out node."""
4477
4478         twice_nat_addr = "10.0.1.3"
4479         external_port = 80
4480         local_port = 8080
4481         post_twice_nat_port = 0
4482
4483         self.vapi.nat44_forwarding_enable_disable(enable=1)
4484         self.nat_add_address(twice_nat_addr, twice_nat=1)
4485         flags = (
4486             self.config_flags.NAT_IS_OUT2IN_ONLY
4487             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4488         )
4489         self.nat_add_static_mapping(
4490             self.pg6.remote_ip4,
4491             self.pg1.remote_ip4,
4492             local_port,
4493             external_port,
4494             proto=IP_PROTOS.tcp,
4495             vrf_id=1,
4496             flags=flags,
4497         )
4498         self.vapi.nat44_interface_add_del_feature(
4499             sw_if_index=self.pg6.sw_if_index, is_add=1
4500         )
4501
4502         p = (
4503             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4504             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4505             / TCP(sport=12345, dport=external_port)
4506         )
4507         self.pg6.add_stream(p)
4508         self.pg_enable_capture(self.pg_interfaces)
4509         self.pg_start()
4510         capture = self.pg6.get_capture(1)
4511         p = capture[0]
4512         try:
4513             ip = p[IP]
4514             tcp = p[TCP]
4515             self.assertEqual(ip.src, twice_nat_addr)
4516             self.assertNotEqual(tcp.sport, 12345)
4517             post_twice_nat_port = tcp.sport
4518             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4519             self.assertEqual(tcp.dport, local_port)
4520             self.assert_packet_checksums_valid(p)
4521         except:
4522             self.logger.error(ppp("Unexpected or invalid packet:", p))
4523             raise
4524
4525         p = (
4526             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4527             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4528             / TCP(sport=local_port, dport=post_twice_nat_port)
4529         )
4530         self.pg6.add_stream(p)
4531         self.pg_enable_capture(self.pg_interfaces)
4532         self.pg_start()
4533         capture = self.pg6.get_capture(1)
4534         p = capture[0]
4535         try:
4536             ip = p[IP]
4537             tcp = p[TCP]
4538             self.assertEqual(ip.src, self.pg1.remote_ip4)
4539             self.assertEqual(tcp.sport, external_port)
4540             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4541             self.assertEqual(tcp.dport, 12345)
4542             self.assert_packet_checksums_valid(p)
4543         except:
4544             self.logger.error(ppp("Unexpected or invalid packet:", p))
4545             raise
4546
4547     def test_one_armed_nat44_static(self):
4548         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4549
4550         remote_host = self.pg4.remote_hosts[0]
4551         local_host = self.pg4.remote_hosts[1]
4552         external_port = 80
4553         local_port = 8080
4554         eh_port_in = 0
4555
4556         self.vapi.nat44_forwarding_enable_disable(enable=1)
4557         self.nat_add_address(self.nat_addr, twice_nat=1)
4558         flags = (
4559             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4560         )
4561         self.nat_add_static_mapping(
4562             local_host.ip4,
4563             self.nat_addr,
4564             local_port,
4565             external_port,
4566             proto=IP_PROTOS.tcp,
4567             flags=flags,
4568         )
4569         flags = self.config_flags.NAT_IS_INSIDE
4570         self.vapi.nat44_interface_add_del_feature(
4571             sw_if_index=self.pg4.sw_if_index, is_add=1
4572         )
4573         self.vapi.nat44_interface_add_del_feature(
4574             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4575         )
4576
4577         # from client to service
4578         p = (
4579             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4580             / IP(src=remote_host.ip4, dst=self.nat_addr)
4581             / TCP(sport=12345, dport=external_port)
4582         )
4583         self.pg4.add_stream(p)
4584         self.pg_enable_capture(self.pg_interfaces)
4585         self.pg_start()
4586         capture = self.pg4.get_capture(1)
4587         p = capture[0]
4588         try:
4589             ip = p[IP]
4590             tcp = p[TCP]
4591             self.assertEqual(ip.dst, local_host.ip4)
4592             self.assertEqual(ip.src, self.nat_addr)
4593             self.assertEqual(tcp.dport, local_port)
4594             self.assertNotEqual(tcp.sport, 12345)
4595             eh_port_in = tcp.sport
4596             self.assert_packet_checksums_valid(p)
4597         except:
4598             self.logger.error(ppp("Unexpected or invalid packet:", p))
4599             raise
4600
4601         # from service back to client
4602         p = (
4603             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4604             / IP(src=local_host.ip4, dst=self.nat_addr)
4605             / TCP(sport=local_port, dport=eh_port_in)
4606         )
4607         self.pg4.add_stream(p)
4608         self.pg_enable_capture(self.pg_interfaces)
4609         self.pg_start()
4610         capture = self.pg4.get_capture(1)
4611         p = capture[0]
4612         try:
4613             ip = p[IP]
4614             tcp = p[TCP]
4615             self.assertEqual(ip.src, self.nat_addr)
4616             self.assertEqual(ip.dst, remote_host.ip4)
4617             self.assertEqual(tcp.sport, external_port)
4618             self.assertEqual(tcp.dport, 12345)
4619             self.assert_packet_checksums_valid(p)
4620         except:
4621             self.logger.error(ppp("Unexpected or invalid packet:", p))
4622             raise
4623
4624     def test_icmp_error_fwd_outbound(self):
4625         """NAT44ED ICMP error outbound with forwarding enabled"""
4626
4627         # Ensure that an outbound ICMP error message is properly associated
4628         # with the inbound forward bypass session it is related to.
4629         payload = "H" * 10
4630
4631         self.nat_add_address(self.nat_addr)
4632         self.nat_add_inside_interface(self.pg0)
4633         self.nat_add_outside_interface(self.pg1)
4634
4635         # enable forwarding and initiate connection out2in
4636         self.vapi.nat44_forwarding_enable_disable(enable=1)
4637         p1 = (
4638             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4639             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4640             / UDP(sport=21, dport=20)
4641             / payload
4642         )
4643
4644         self.pg1.add_stream(p1)
4645         self.pg_enable_capture(self.pg_interfaces)
4646         self.pg_start()
4647         capture = self.pg0.get_capture(1)[0]
4648
4649         self.logger.info(self.vapi.cli("show nat44 sessions"))
4650
4651         # reply with ICMP error message in2out
4652         # We cannot reliably retrieve forward bypass sessions via the API.
4653         # session dumps for a user will only look on the worker that the
4654         # user is supposed to be mapped to in2out. The forward bypass session
4655         # is not necessarily created on that worker.
4656         p2 = (
4657             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4658             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4659             / ICMP(type="dest-unreach", code="port-unreachable")
4660             / capture[IP:]
4661         )
4662
4663         self.pg0.add_stream(p2)
4664         self.pg_enable_capture(self.pg_interfaces)
4665         self.pg_start()
4666         capture = self.pg1.get_capture(1)[0]
4667
4668         self.logger.info(self.vapi.cli("show nat44 sessions"))
4669
4670         self.logger.info(ppp("p1 packet:", p1))
4671         self.logger.info(ppp("p2 packet:", p2))
4672         self.logger.info(ppp("capture packet:", capture))
4673
4674     def test_tcp_session_open_retransmit1(self):
4675         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4676
4677         The client does not receive the [SYN,ACK] or the
4678         ACK from the client is lost. Therefore, the [SYN, ACK]
4679         is retransmitted by the server.
4680         """
4681
4682         in_port = self.tcp_port_in
4683         ext_port = self.tcp_external_port
4684         payload = "H" * 10
4685
4686         self.nat_add_address(self.nat_addr)
4687         self.nat_add_inside_interface(self.pg0)
4688         self.nat_add_outside_interface(self.pg1)
4689
4690         self.vapi.nat_set_timeouts(
4691             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4692         )
4693         # SYN packet in->out
4694         p = (
4695             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4696             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4697             / TCP(sport=in_port, dport=ext_port, flags="S")
4698         )
4699         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4700         out_port = p[TCP].sport
4701
4702         # SYN + ACK packet out->in
4703         p = (
4704             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4705             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4706             / TCP(sport=ext_port, dport=out_port, flags="SA")
4707         )
4708         self.send_and_expect(self.pg1, p, self.pg0)
4709
4710         # ACK in->out does not arrive
4711
4712         # resent SYN + ACK packet out->in
4713         p = (
4714             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4715             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4716             / TCP(sport=ext_port, dport=out_port, flags="SA")
4717         )
4718         self.send_and_expect(self.pg1, p, self.pg0)
4719
4720         # ACK packet in->out
4721         p = (
4722             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4723             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4724             / TCP(sport=in_port, dport=ext_port, flags="A")
4725         )
4726         self.send_and_expect(self.pg0, p, self.pg1)
4727
4728         # Verify that the data can be transmitted after the transitory time
4729         self.virtual_sleep(6)
4730
4731         p = (
4732             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4733             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4734             / TCP(sport=in_port, dport=ext_port, flags="PA")
4735             / Raw(payload)
4736         )
4737         self.send_and_expect(self.pg0, p, self.pg1)
4738
4739     def test_tcp_session_open_retransmit2(self):
4740         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4741
4742         The ACK is lost to the server after the TCP session is opened.
4743         Data is sent by the client, then the [SYN,ACK] is
4744         retransmitted by the server.
4745         """
4746
4747         in_port = self.tcp_port_in
4748         ext_port = self.tcp_external_port
4749         payload = "H" * 10
4750
4751         self.nat_add_address(self.nat_addr)
4752         self.nat_add_inside_interface(self.pg0)
4753         self.nat_add_outside_interface(self.pg1)
4754
4755         self.vapi.nat_set_timeouts(
4756             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4757         )
4758         # SYN packet in->out
4759         p = (
4760             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4761             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4762             / TCP(sport=in_port, dport=ext_port, flags="S")
4763         )
4764         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4765         out_port = p[TCP].sport
4766
4767         # SYN + ACK packet out->in
4768         p = (
4769             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4770             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4771             / TCP(sport=ext_port, dport=out_port, flags="SA")
4772         )
4773         self.send_and_expect(self.pg1, p, self.pg0)
4774
4775         # ACK packet in->out -- not received by the server
4776         p = (
4777             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4778             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4779             / TCP(sport=in_port, dport=ext_port, flags="A")
4780         )
4781         self.send_and_expect(self.pg0, p, self.pg1)
4782
4783         # PUSH + ACK packet in->out
4784         p = (
4785             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4786             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4787             / TCP(sport=in_port, dport=ext_port, flags="PA")
4788             / Raw(payload)
4789         )
4790         self.send_and_expect(self.pg0, p, self.pg1)
4791
4792         # resent SYN + ACK packet out->in
4793         p = (
4794             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4795             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4796             / TCP(sport=ext_port, dport=out_port, flags="SA")
4797         )
4798         self.send_and_expect(self.pg1, p, self.pg0)
4799
4800         # resent ACK packet in->out
4801         p = (
4802             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4803             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4804             / TCP(sport=in_port, dport=ext_port, flags="A")
4805         )
4806         self.send_and_expect(self.pg0, p, self.pg1)
4807
4808         # resent PUSH + ACK packet in->out
4809         p = (
4810             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4811             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4812             / TCP(sport=in_port, dport=ext_port, flags="PA")
4813             / Raw(payload)
4814         )
4815         self.send_and_expect(self.pg0, p, self.pg1)
4816
4817         # ACK packet out->in
4818         p = (
4819             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4820             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4821             / TCP(sport=ext_port, dport=out_port, flags="A")
4822         )
4823         self.send_and_expect(self.pg1, p, self.pg0)
4824
4825         # Verify that the data can be transmitted after the transitory time
4826         self.virtual_sleep(6)
4827
4828         p = (
4829             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4830             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4831             / TCP(sport=in_port, dport=ext_port, flags="PA")
4832             / Raw(payload)
4833         )
4834         self.send_and_expect(self.pg0, p, self.pg1)
4835
4836     def test_dynamic_ports_exhausted(self):
4837         """NAT44ED dynamic translation test: address ports exhaused"""
4838
4839         sessions_per_batch = 128
4840         n_available_ports = 65536 - 1024
4841         n_sessions = n_available_ports + 2 * sessions_per_batch
4842
4843         # set high enough session limit for ports to be exhausted
4844         self.plugin_disable()
4845         self.plugin_enable(max_sessions=n_sessions)
4846
4847         self.nat_add_inside_interface(self.pg0)
4848         self.nat_add_outside_interface(self.pg1)
4849
4850         # set timeouts to high for sessions to reallistically expire
4851         config = self.vapi.nat44_show_running_config()
4852         old_timeouts = config.timeouts
4853         self.vapi.nat_set_timeouts(
4854             udp=21600,
4855             tcp_established=old_timeouts.tcp_established,
4856             tcp_transitory=old_timeouts.tcp_transitory,
4857             icmp=old_timeouts.icmp,
4858         )
4859
4860         # in2out after NAT addresses added
4861         self.nat_add_address(self.nat_addr)
4862
4863         for i in range(n_sessions // sessions_per_batch):
4864             pkts = self.create_udp_stream(
4865                 self.pg0,
4866                 self.pg1,
4867                 sessions_per_batch,
4868                 base_port=i * sessions_per_batch + 100,
4869             )
4870
4871             self.pg0.add_stream(pkts)
4872             self.pg_start()
4873
4874             err = self.statistics.get_err_counter(
4875                 "/err/nat44-ed-in2out-slowpath/out of ports"
4876             )
4877             if err > sessions_per_batch:
4878                 break
4879
4880         # Check for ports to be used no more than once
4881         ports = set()
4882         sessions = self.vapi.cli("show nat44 sessions")
4883         rx = re.compile(
4884             f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
4885         )
4886         for line in sessions.splitlines():
4887             m = rx.match(line)
4888             if m:
4889                 port = int(m.groups()[0])
4890                 self.assertNotIn(port, ports)
4891                 ports.add(port)
4892
4893         self.assertGreaterEqual(err, sessions_per_batch)
4894
4895
# When this module is executed directly as a script, run its tests through
# unittest with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)