hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import re
8 import scapy.compat
9 from framework import VppTestCase, VppLoInterface
10 from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204
11 from scapy.data import IP_PROTOS
12 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
13 from scapy.layers.inet import IPerror, TCPerror
14 from scapy.layers.l2 import Ether
15 from scapy.packet import Raw
16 from statistics import variance
17 from syslog_rfc5424_parser import SyslogMessage, ParseError
18 from syslog_rfc5424_parser.constants import SyslogSeverity
19 from util import ppp, pr, ip4_range
20 from vpp_acl import AclRule, VppAcl, VppAclInterface
21 from vpp_ip_route import VppIpRoute, VppRoutePath
22 from vpp_papi import VppEnum
23 from util import StatsDiff
24
25
26 class TestNAT44ED(VppTestCase):
27     """NAT44ED Test Case"""
28
29     nat_addr = "10.0.10.3"
30
31     tcp_port_in = 6303
32     tcp_port_out = 6303
33
34     udp_port_in = 6304
35     udp_port_out = 6304
36
37     icmp_id_in = 6305
38     icmp_id_out = 6305
39
40     tcp_external_port = 80
41
42     max_sessions = 100
43
    def setUp(self):
        """Run the base-class set-up, then enable the NAT44-ED plugin."""
        super().setUp()
        self.plugin_enable()
47
48     def tearDown(self):
49         super().tearDown()
50         if not self.vpp_dead:
51             self.plugin_disable()
52
53     def plugin_enable(self, max_sessions=None):
54         max_sessions = max_sessions or self.max_sessions
55         self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
56
    def plugin_disable(self):
        """Disable the NAT44-ED plugin through the VPP API."""
        self.vapi.nat44_ed_plugin_enable_disable(enable=0)
59
    @property
    def config_flags(self):
        """Shortcut to the ``vl_api_nat_config_flags_t`` enum from VppEnum."""
        return VppEnum.vl_api_nat_config_flags_t
63
    @property
    def nat44_config_flags(self):
        """Shortcut to the ``vl_api_nat44_config_flags_t`` enum from VppEnum."""
        return VppEnum.vl_api_nat44_config_flags_t
67
    @property
    def syslog_severity(self):
        """Shortcut to the ``vl_api_syslog_severity_t`` enum from VppEnum."""
        return VppEnum.vl_api_syslog_severity_t
71
    @property
    def server_addr(self):
        """IPv4 address of the first remote host on ``pg1``."""
        return self.pg1.remote_hosts[0].ip4
75
    @staticmethod
    def random_port():
        """Return a random integer in the inclusive range 1024-65535."""
        return randint(1024, 65535)
79
80     @staticmethod
81     def proto2layer(proto):
82         if proto == IP_PROTOS.tcp:
83             return TCP
84         elif proto == IP_PROTOS.udp:
85             return UDP
86         elif proto == IP_PROTOS.icmp:
87             return ICMP
88         else:
89             raise Exception("Unsupported protocol")
90
    @classmethod
    def create_and_add_ip4_table(cls, i, table_id=0):
        """Add IP table ``table_id`` and bind interface ``i`` to it for IPv4.

        :param i: interface whose IPv4 table is set
        :param table_id: id of the table to add (default 0)
        """
        cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
        i.set_table_ip4(table_id)
95
96     @classmethod
97     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
98         if table_id:
99             cls.create_and_add_ip4_table(i, table_id)
100
101         i.admin_up()
102         i.config_ip4()
103         i.resolve_arp()
104
105         if hosts:
106             i.generate_remote_hosts(hosts)
107             i.configure_ipv4_neighbors()
108
    @classmethod
    def nat_add_interface_address(cls, i):
        """Call ``nat44_add_del_interface_addr`` with ``is_add=1`` for interface ``i``."""
        cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
112
113     def nat_add_inside_interface(self, i):
114         self.vapi.nat44_interface_add_del_feature(
115             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
116         )
117
118     def nat_add_outside_interface(self, i):
119         self.vapi.nat44_interface_add_del_feature(
120             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
121         )
122
123     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
124         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
125         self.vapi.nat44_add_del_address_range(
126             first_ip_address=address,
127             last_ip_address=address,
128             vrf_id=vrf_id,
129             is_add=is_add,
130             flags=flags,
131         )
132
133     def nat_add_static_mapping(
134         self,
135         local_ip,
136         external_ip="0.0.0.0",
137         local_port=0,
138         external_port=0,
139         vrf_id=0,
140         is_add=1,
141         external_sw_if_index=0xFFFFFFFF,
142         proto=0,
143         tag="",
144         flags=0,
145     ):
146         if not (local_port and external_port):
147             flags |= self.config_flags.NAT_IS_ADDR_ONLY
148
149         self.vapi.nat44_add_del_static_mapping(
150             is_add=is_add,
151             local_ip_address=local_ip,
152             external_ip_address=external_ip,
153             external_sw_if_index=external_sw_if_index,
154             local_port=local_port,
155             external_port=external_port,
156             vrf_id=vrf_id,
157             protocol=proto,
158             flags=flags,
159             tag=tag,
160         )
161
162     @classmethod
163     def setUpClass(cls):
164         super().setUpClass()
165         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
166             return
167
168         cls.create_pg_interfaces(range(12))
169         cls.interfaces = list(cls.pg_interfaces[:4])
170
171         cls.create_and_add_ip4_table(cls.pg2, 10)
172
173         for i in cls.interfaces:
174             cls.configure_ip4_interface(i, hosts=3)
175
176         # test specific (test-multiple-vrf)
177         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
178
179         # test specific (test-one-armed-nat44-static)
180         cls.pg4.generate_remote_hosts(2)
181         cls.pg4.config_ip4()
182         cls.vapi.sw_interface_add_del_address(
183             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
184         )
185         cls.pg4.admin_up()
186         cls.pg4.resolve_arp()
187         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
188         cls.pg4.resolve_arp()
189
190         # test specific interface (pg5)
191         cls.pg5._local_ip4 = "10.1.1.1"
192         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
193         cls.pg5.set_table_ip4(1)
194         cls.pg5.config_ip4()
195         cls.pg5.admin_up()
196         cls.pg5.resolve_arp()
197
198         # test specific interface (pg6)
199         cls.pg6._local_ip4 = "10.1.2.1"
200         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
201         cls.pg6.set_table_ip4(1)
202         cls.pg6.config_ip4()
203         cls.pg6.admin_up()
204         cls.pg6.resolve_arp()
205
206         rl = list()
207
208         rl.append(
209             VppIpRoute(
210                 cls,
211                 "0.0.0.0",
212                 0,
213                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
214                 register=False,
215                 table_id=1,
216             )
217         )
218         rl.append(
219             VppIpRoute(
220                 cls,
221                 "0.0.0.0",
222                 0,
223                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
224                 register=False,
225             )
226         )
227         rl.append(
228             VppIpRoute(
229                 cls,
230                 cls.pg5.remote_ip4,
231                 32,
232                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
233                 register=False,
234                 table_id=1,
235             )
236         )
237         rl.append(
238             VppIpRoute(
239                 cls,
240                 cls.pg6.remote_ip4,
241                 32,
242                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
243                 register=False,
244                 table_id=1,
245             )
246         )
247         rl.append(
248             VppIpRoute(
249                 cls,
250                 cls.pg6.remote_ip4,
251                 16,
252                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
253                 register=False,
254                 table_id=0,
255             )
256         )
257
258         for r in rl:
259             r.add_vpp_config()
260
261         cls.no_diff = StatsDiff(
262             {
263                 pg.sw_if_index: {
264                     "/nat44-ed/in2out/fastpath/tcp": 0,
265                     "/nat44-ed/in2out/fastpath/udp": 0,
266                     "/nat44-ed/in2out/fastpath/icmp": 0,
267                     "/nat44-ed/in2out/fastpath/drops": 0,
268                     "/nat44-ed/in2out/slowpath/tcp": 0,
269                     "/nat44-ed/in2out/slowpath/udp": 0,
270                     "/nat44-ed/in2out/slowpath/icmp": 0,
271                     "/nat44-ed/in2out/slowpath/drops": 0,
272                     "/nat44-ed/in2out/fastpath/tcp": 0,
273                     "/nat44-ed/in2out/fastpath/udp": 0,
274                     "/nat44-ed/in2out/fastpath/icmp": 0,
275                     "/nat44-ed/in2out/fastpath/drops": 0,
276                     "/nat44-ed/in2out/slowpath/tcp": 0,
277                     "/nat44-ed/in2out/slowpath/udp": 0,
278                     "/nat44-ed/in2out/slowpath/icmp": 0,
279                     "/nat44-ed/in2out/slowpath/drops": 0,
280                 }
281                 for pg in cls.pg_interfaces
282             }
283         )
284
    def get_err_counter(self, path):
        """Return ``self.statistics.get_err_counter(path)``."""
        return self.statistics.get_err_counter(path)
287
288     def reass_hairpinning(
289         self,
290         server_addr,
291         server_in_port,
292         server_out_port,
293         host_in_port,
294         proto=IP_PROTOS.tcp,
295         ignore_port=False,
296     ):
297         layer = self.proto2layer(proto)
298
299         if proto == IP_PROTOS.tcp:
300             data = b"A" * 4 + b"B" * 16 + b"C" * 3
301         else:
302             data = b"A" * 16 + b"B" * 16 + b"C" * 3
303
304         # send packet from host to server
305         pkts = self.create_stream_frag(
306             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
307         )
308         self.pg0.add_stream(pkts)
309         self.pg_enable_capture(self.pg_interfaces)
310         self.pg_start()
311         frags = self.pg0.get_capture(len(pkts))
312         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
313         if proto != IP_PROTOS.icmp:
314             if not ignore_port:
315                 self.assertNotEqual(p[layer].sport, host_in_port)
316             self.assertEqual(p[layer].dport, server_in_port)
317         else:
318             if not ignore_port:
319                 self.assertNotEqual(p[layer].id, host_in_port)
320         self.assertEqual(data, p[Raw].load)
321
322     def frag_out_of_order(
323         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
324     ):
325         layer = self.proto2layer(proto)
326
327         if proto == IP_PROTOS.tcp:
328             data = b"A" * 4 + b"B" * 16 + b"C" * 3
329         else:
330             data = b"A" * 16 + b"B" * 16 + b"C" * 3
331         self.port_in = self.random_port()
332
333         for i in range(2):
334             # in2out
335             pkts = self.create_stream_frag(
336                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
337             )
338             pkts.reverse()
339             self.pg0.add_stream(pkts)
340             self.pg_enable_capture(self.pg_interfaces)
341             self.pg_start()
342             frags = self.pg1.get_capture(len(pkts))
343             if not dont_translate:
344                 p = self.reass_frags_and_verify(
345                     frags, self.nat_addr, self.pg1.remote_ip4
346                 )
347             else:
348                 p = self.reass_frags_and_verify(
349                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
350                 )
351             if proto != IP_PROTOS.icmp:
352                 if not dont_translate:
353                     self.assertEqual(p[layer].dport, 20)
354                     if not ignore_port:
355                         self.assertNotEqual(p[layer].sport, self.port_in)
356                 else:
357                     self.assertEqual(p[layer].sport, self.port_in)
358             else:
359                 if not ignore_port:
360                     if not dont_translate:
361                         self.assertNotEqual(p[layer].id, self.port_in)
362                     else:
363                         self.assertEqual(p[layer].id, self.port_in)
364             self.assertEqual(data, p[Raw].load)
365
366             # out2in
367             if not dont_translate:
368                 dst_addr = self.nat_addr
369             else:
370                 dst_addr = self.pg0.remote_ip4
371             if proto != IP_PROTOS.icmp:
372                 sport = 20
373                 dport = p[layer].sport
374             else:
375                 sport = p[layer].id
376                 dport = 0
377             pkts = self.create_stream_frag(
378                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
379             )
380             pkts.reverse()
381             self.pg1.add_stream(pkts)
382             self.pg_enable_capture(self.pg_interfaces)
383             self.logger.info(self.vapi.cli("show trace"))
384             self.pg_start()
385             frags = self.pg0.get_capture(len(pkts))
386             p = self.reass_frags_and_verify(
387                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
388             )
389             if proto != IP_PROTOS.icmp:
390                 self.assertEqual(p[layer].sport, 20)
391                 self.assertEqual(p[layer].dport, self.port_in)
392             else:
393                 self.assertEqual(p[layer].id, self.port_in)
394             self.assertEqual(data, p[Raw].load)
395
396     def reass_frags_and_verify(self, frags, src, dst):
397         buffer = BytesIO()
398         for p in frags:
399             self.assertEqual(p[IP].src, src)
400             self.assertEqual(p[IP].dst, dst)
401             self.assert_ip_checksum_valid(p)
402             buffer.seek(p[IP].frag * 8)
403             buffer.write(bytes(p[IP].payload))
404         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
405         if ip.proto == IP_PROTOS.tcp:
406             p = ip / TCP(buffer.getvalue())
407             self.logger.debug(ppp("Reassembled:", p))
408             self.assert_tcp_checksum_valid(p)
409         elif ip.proto == IP_PROTOS.udp:
410             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
411         elif ip.proto == IP_PROTOS.icmp:
412             p = ip / ICMP(buffer.getvalue())
413         return p
414
415     def frag_in_order(
416         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
417     ):
418         layer = self.proto2layer(proto)
419
420         if proto == IP_PROTOS.tcp:
421             data = b"A" * 4 + b"B" * 16 + b"C" * 3
422         else:
423             data = b"A" * 16 + b"B" * 16 + b"C" * 3
424         self.port_in = self.random_port()
425
426         # in2out
427         pkts = self.create_stream_frag(
428             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
429         )
430         self.pg0.add_stream(pkts)
431         self.pg_enable_capture(self.pg_interfaces)
432         self.pg_start()
433         frags = self.pg1.get_capture(len(pkts))
434         if not dont_translate:
435             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
436         else:
437             p = self.reass_frags_and_verify(
438                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
439             )
440         if proto != IP_PROTOS.icmp:
441             if not dont_translate:
442                 self.assertEqual(p[layer].dport, 20)
443                 if not ignore_port:
444                     self.assertNotEqual(p[layer].sport, self.port_in)
445             else:
446                 self.assertEqual(p[layer].sport, self.port_in)
447         else:
448             if not ignore_port:
449                 if not dont_translate:
450                     self.assertNotEqual(p[layer].id, self.port_in)
451                 else:
452                     self.assertEqual(p[layer].id, self.port_in)
453         self.assertEqual(data, p[Raw].load)
454
455         # out2in
456         if not dont_translate:
457             dst_addr = self.nat_addr
458         else:
459             dst_addr = self.pg0.remote_ip4
460         if proto != IP_PROTOS.icmp:
461             sport = 20
462             dport = p[layer].sport
463         else:
464             sport = p[layer].id
465             dport = 0
466         pkts = self.create_stream_frag(
467             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
468         )
469         self.pg1.add_stream(pkts)
470         self.pg_enable_capture(self.pg_interfaces)
471         self.pg_start()
472         frags = self.pg0.get_capture(len(pkts))
473         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
474         if proto != IP_PROTOS.icmp:
475             self.assertEqual(p[layer].sport, 20)
476             self.assertEqual(p[layer].dport, self.port_in)
477         else:
478             self.assertEqual(p[layer].id, self.port_in)
479         self.assertEqual(data, p[Raw].load)
480
481     def verify_capture_out(
482         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
483     ):
484         if nat_ip is None:
485             nat_ip = self.nat_addr
486         for packet in capture:
487             try:
488                 self.assert_packet_checksums_valid(packet)
489                 self.assertEqual(packet[IP].src, nat_ip)
490                 if dst_ip is not None:
491                     self.assertEqual(packet[IP].dst, dst_ip)
492                 if packet.haslayer(TCP):
493                     if not ignore_port:
494                         if same_port:
495                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
496                         else:
497                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
498                     self.tcp_port_out = packet[TCP].sport
499                     self.assert_packet_checksums_valid(packet)
500                 elif packet.haslayer(UDP):
501                     if not ignore_port:
502                         if same_port:
503                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
504                         else:
505                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
506                     self.udp_port_out = packet[UDP].sport
507                 else:
508                     if not ignore_port:
509                         if same_port:
510                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
511                         else:
512                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
513                     self.icmp_id_out = packet[ICMP].id
514                     self.assert_packet_checksums_valid(packet)
515             except:
516                 self.logger.error(
517                     ppp("Unexpected or invalid packet (outside network):", packet)
518                 )
519                 raise
520
521     def verify_capture_in(self, capture, in_if):
522         for packet in capture:
523             try:
524                 self.assert_packet_checksums_valid(packet)
525                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
526                 if packet.haslayer(TCP):
527                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
528                 elif packet.haslayer(UDP):
529                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
530                 else:
531                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
532             except:
533                 self.logger.error(
534                     ppp("Unexpected or invalid packet (inside network):", packet)
535                 )
536                 raise
537
538     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
539         if dst_ip is None:
540             dst_ip = out_if.remote_ip4
541
542         pkts = []
543         # TCP
544         p = (
545             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
546             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
547             / TCP(sport=self.tcp_port_in, dport=20)
548         )
549         pkts.extend([p, p])
550
551         # UDP
552         p = (
553             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
554             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
555             / UDP(sport=self.udp_port_in, dport=20)
556         )
557         pkts.append(p)
558
559         # ICMP
560         p = (
561             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
562             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
563             / ICMP(id=self.icmp_id_in, type="echo-request")
564         )
565         pkts.append(p)
566
567         return pkts
568
569     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
570         if dst_ip is None:
571             dst_ip = self.nat_addr
572         if not use_inside_ports:
573             tcp_port = self.tcp_port_out
574             udp_port = self.udp_port_out
575             icmp_id = self.icmp_id_out
576         else:
577             tcp_port = self.tcp_port_in
578             udp_port = self.udp_port_in
579             icmp_id = self.icmp_id_in
580         pkts = []
581         # TCP
582         p = (
583             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
584             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
585             / TCP(dport=tcp_port, sport=20)
586         )
587         pkts.extend([p, p])
588
589         # UDP
590         p = (
591             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
592             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
593             / UDP(dport=udp_port, sport=20)
594         )
595         pkts.append(p)
596
597         # ICMP
598         p = (
599             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
600             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
601             / ICMP(id=icmp_id, type="echo-reply")
602         )
603         pkts.append(p)
604
605         return pkts
606
607     def create_tcp_stream(self, in_if, out_if, count):
608         pkts = []
609         port = 6303
610
611         for i in range(count):
612             p = (
613                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
614                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
615                 / TCP(sport=port + i, dport=20)
616             )
617             pkts.append(p)
618
619         return pkts
620
621     def create_udp_stream(self, in_if, out_if, count, base_port=6303):
622         return [
623             (
624                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
625                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
626                 / UDP(sport=base_port + i, dport=20)
627             )
628             for i in range(count)
629         ]
630
631     def create_stream_frag(
632         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
633     ):
634         if proto == IP_PROTOS.tcp:
635             p = (
636                 IP(src=src_if.remote_ip4, dst=dst)
637                 / TCP(sport=sport, dport=dport)
638                 / Raw(data)
639             )
640             p = p.__class__(scapy.compat.raw(p))
641             chksum = p[TCP].chksum
642             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
643         elif proto == IP_PROTOS.udp:
644             proto_header = UDP(sport=sport, dport=dport)
645         elif proto == IP_PROTOS.icmp:
646             if not echo_reply:
647                 proto_header = ICMP(id=sport, type="echo-request")
648             else:
649                 proto_header = ICMP(id=sport, type="echo-reply")
650         else:
651             raise Exception("Unsupported protocol")
652         id = self.random_port()
653         pkts = []
654         if proto == IP_PROTOS.tcp:
655             raw = Raw(data[0:4])
656         else:
657             raw = Raw(data[0:16])
658         p = (
659             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
660             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
661             / proto_header
662             / raw
663         )
664         pkts.append(p)
665         if proto == IP_PROTOS.tcp:
666             raw = Raw(data[4:20])
667         else:
668             raw = Raw(data[16:32])
669         p = (
670             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
671             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
672             / raw
673         )
674         pkts.append(p)
675         if proto == IP_PROTOS.tcp:
676             raw = Raw(data[20:])
677         else:
678             raw = Raw(data[32:])
679         p = (
680             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
681             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
682             / raw
683         )
684         pkts.append(p)
685         return pkts
686
687     def frag_in_order_in_plus_out(
688         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
689     ):
690         layer = self.proto2layer(proto)
691
692         if proto == IP_PROTOS.tcp:
693             data = b"A" * 4 + b"B" * 16 + b"C" * 3
694         else:
695             data = b"A" * 16 + b"B" * 16 + b"C" * 3
696         port_in = self.random_port()
697
698         for i in range(2):
699             # out2in
700             pkts = self.create_stream_frag(
701                 self.pg0, out_addr, port_in, out_port, data, proto
702             )
703             self.pg0.add_stream(pkts)
704             self.pg_enable_capture(self.pg_interfaces)
705             self.pg_start()
706             frags = self.pg1.get_capture(len(pkts))
707             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
708             if proto != IP_PROTOS.icmp:
709                 self.assertEqual(p[layer].sport, port_in)
710                 self.assertEqual(p[layer].dport, in_port)
711             else:
712                 self.assertEqual(p[layer].id, port_in)
713             self.assertEqual(data, p[Raw].load)
714
715             # in2out
716             if proto != IP_PROTOS.icmp:
717                 pkts = self.create_stream_frag(
718                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
719                 )
720             else:
721                 pkts = self.create_stream_frag(
722                     self.pg1,
723                     self.pg0.remote_ip4,
724                     p[layer].id,
725                     0,
726                     data,
727                     proto,
728                     echo_reply=True,
729                 )
730             self.pg1.add_stream(pkts)
731             self.pg_enable_capture(self.pg_interfaces)
732             self.pg_start()
733             frags = self.pg0.get_capture(len(pkts))
734             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
735             if proto != IP_PROTOS.icmp:
736                 self.assertEqual(p[layer].sport, out_port)
737                 self.assertEqual(p[layer].dport, port_in)
738             else:
739                 self.assertEqual(p[layer].id, port_in)
740             self.assertEqual(data, p[Raw].load)
741
742     def frag_out_of_order_in_plus_out(
743         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
744     ):
745         layer = self.proto2layer(proto)
746
747         if proto == IP_PROTOS.tcp:
748             data = b"A" * 4 + b"B" * 16 + b"C" * 3
749         else:
750             data = b"A" * 16 + b"B" * 16 + b"C" * 3
751         port_in = self.random_port()
752
753         for i in range(2):
754             # out2in
755             pkts = self.create_stream_frag(
756                 self.pg0, out_addr, port_in, out_port, data, proto
757             )
758             pkts.reverse()
759             self.pg0.add_stream(pkts)
760             self.pg_enable_capture(self.pg_interfaces)
761             self.pg_start()
762             frags = self.pg1.get_capture(len(pkts))
763             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
764             if proto != IP_PROTOS.icmp:
765                 self.assertEqual(p[layer].dport, in_port)
766                 self.assertEqual(p[layer].sport, port_in)
767                 self.assertEqual(p[layer].dport, in_port)
768             else:
769                 self.assertEqual(p[layer].id, port_in)
770             self.assertEqual(data, p[Raw].load)
771
772             # in2out
773             if proto != IP_PROTOS.icmp:
774                 pkts = self.create_stream_frag(
775                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
776                 )
777             else:
778                 pkts = self.create_stream_frag(
779                     self.pg1,
780                     self.pg0.remote_ip4,
781                     p[layer].id,
782                     0,
783                     data,
784                     proto,
785                     echo_reply=True,
786                 )
787             pkts.reverse()
788             self.pg1.add_stream(pkts)
789             self.pg_enable_capture(self.pg_interfaces)
790             self.pg_start()
791             frags = self.pg0.get_capture(len(pkts))
792             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
793             if proto != IP_PROTOS.icmp:
794                 self.assertEqual(p[layer].sport, out_port)
795                 self.assertEqual(p[layer].dport, port_in)
796             else:
797                 self.assertEqual(p[layer].id, port_in)
798             self.assertEqual(data, p[Raw].load)
799
800     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
801         # SYN packet in->out
802         p = (
803             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
804             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
805             / TCP(sport=in_port, dport=ext_port, flags="S")
806         )
807         in_if.add_stream(p)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         capture = out_if.get_capture(1)
811         p = capture[0]
812         out_port = p[TCP].sport
813
814         # SYN + ACK packet out->in
815         p = (
816             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
817             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
818             / TCP(sport=ext_port, dport=out_port, flags="SA")
819         )
820         out_if.add_stream(p)
821         self.pg_enable_capture(self.pg_interfaces)
822         self.pg_start()
823         in_if.get_capture(1)
824
825         # ACK packet in->out
826         p = (
827             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
828             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
829             / TCP(sport=in_port, dport=ext_port, flags="A")
830         )
831         in_if.add_stream(p)
832         self.pg_enable_capture(self.pg_interfaces)
833         self.pg_start()
834         out_if.get_capture(1)
835
836         return out_port
837
838     def twice_nat_common(
839         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
840     ):
841         twice_nat_addr = "10.0.1.3"
842
843         port_in = 8080
844         if lb:
845             if not same_pg:
846                 port_in1 = port_in
847                 port_in2 = port_in
848             else:
849                 port_in1 = port_in + 1
850                 port_in2 = port_in + 2
851
852         port_out = 80
853         eh_port_out = 4567
854
855         server1 = self.pg0.remote_hosts[0]
856         server2 = self.pg0.remote_hosts[1]
857         if lb and same_pg:
858             server2 = server1
859         if not lb:
860             server = server1
861
862         pg0 = self.pg0
863         if same_pg:
864             pg1 = self.pg0
865         else:
866             pg1 = self.pg1
867
868         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
869
870         self.nat_add_address(self.nat_addr)
871         self.nat_add_address(twice_nat_addr, twice_nat=1)
872
873         flags = 0
874         if self_twice_nat:
875             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
876         else:
877             flags |= self.config_flags.NAT_IS_TWICE_NAT
878
879         if not lb:
880             self.nat_add_static_mapping(
881                 pg0.remote_ip4,
882                 self.nat_addr,
883                 port_in,
884                 port_out,
885                 proto=IP_PROTOS.tcp,
886                 flags=flags,
887             )
888         else:
889             locals = [
890                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
891                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
892             ]
893             out_addr = self.nat_addr
894
895             self.vapi.nat44_add_del_lb_static_mapping(
896                 is_add=1,
897                 flags=flags,
898                 external_addr=out_addr,
899                 external_port=port_out,
900                 protocol=IP_PROTOS.tcp,
901                 local_num=len(locals),
902                 locals=locals,
903             )
904         self.nat_add_inside_interface(pg0)
905         self.nat_add_outside_interface(pg1)
906
907         if same_pg:
908             if not lb:
909                 client = server
910             else:
911                 assert client_id is not None
912                 if client_id == 1:
913                     client = self.pg0.remote_hosts[0]
914                 elif client_id == 2:
915                     client = self.pg0.remote_hosts[1]
916         else:
917             client = pg1.remote_hosts[0]
918         p = (
919             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
920             / IP(src=client.ip4, dst=self.nat_addr)
921             / TCP(sport=eh_port_out, dport=port_out)
922         )
923         pg1.add_stream(p)
924         self.pg_enable_capture(self.pg_interfaces)
925         self.pg_start()
926         capture = pg0.get_capture(1)
927         p = capture[0]
928         try:
929             ip = p[IP]
930             tcp = p[TCP]
931             if lb:
932                 if ip.dst == server1.ip4:
933                     server = server1
934                     port_in = port_in1
935                 else:
936                     server = server2
937                     port_in = port_in2
938             self.assertEqual(ip.dst, server.ip4)
939             if lb and same_pg:
940                 self.assertIn(tcp.dport, [port_in1, port_in2])
941             else:
942                 self.assertEqual(tcp.dport, port_in)
943             if eh_translate:
944                 self.assertEqual(ip.src, twice_nat_addr)
945                 self.assertNotEqual(tcp.sport, eh_port_out)
946             else:
947                 self.assertEqual(ip.src, client.ip4)
948                 self.assertEqual(tcp.sport, eh_port_out)
949             eh_addr_in = ip.src
950             eh_port_in = tcp.sport
951             saved_port_in = tcp.dport
952             self.assert_packet_checksums_valid(p)
953         except:
954             self.logger.error(ppp("Unexpected or invalid packet:", p))
955             raise
956
957         p = (
958             Ether(src=server.mac, dst=pg0.local_mac)
959             / IP(src=server.ip4, dst=eh_addr_in)
960             / TCP(sport=saved_port_in, dport=eh_port_in)
961         )
962         pg0.add_stream(p)
963         self.pg_enable_capture(self.pg_interfaces)
964         self.pg_start()
965         capture = pg1.get_capture(1)
966         p = capture[0]
967         try:
968             ip = p[IP]
969             tcp = p[TCP]
970             self.assertEqual(ip.dst, client.ip4)
971             self.assertEqual(ip.src, self.nat_addr)
972             self.assertEqual(tcp.dport, eh_port_out)
973             self.assertEqual(tcp.sport, port_out)
974             self.assert_packet_checksums_valid(p)
975         except:
976             self.logger.error(ppp("Unexpected or invalid packet:", p))
977             raise
978
979         if eh_translate:
980             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
981             self.assertEqual(len(sessions), 1)
982             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
983             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
984             self.logger.info(self.vapi.cli("show nat44 sessions"))
985             self.vapi.nat44_del_session(
986                 address=sessions[0].inside_ip_address,
987                 port=sessions[0].inside_port,
988                 protocol=sessions[0].protocol,
989                 flags=(
990                     self.config_flags.NAT_IS_INSIDE
991                     | self.config_flags.NAT_IS_EXT_HOST_VALID
992                 ),
993                 ext_host_address=sessions[0].ext_host_nat_address,
994                 ext_host_port=sessions[0].ext_host_nat_port,
995             )
996             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
997             self.assertEqual(len(sessions), 0)
998
999     def verify_syslog_sess(self, data, msgid, is_ip6=False):
1000         message = data.decode("utf-8")
1001         try:
1002             message = SyslogMessage.parse(message)
1003         except ParseError as e:
1004             self.logger.error(e)
1005             raise
1006         else:
1007             self.assertEqual(message.severity, SyslogSeverity.info)
1008             self.assertEqual(message.appname, "NAT")
1009             self.assertEqual(message.msgid, msgid)
1010             sd_params = message.sd.get("nsess")
1011             self.assertTrue(sd_params is not None)
1012             if is_ip6:
1013                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1014                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1015             else:
1016                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1017                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1018                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1019             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1020             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1021             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1022             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1023             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1024             self.assertEqual(sd_params.get("SVLAN"), "0")
1025             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1026             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1027
1028     def test_icmp_error(self):
1029         """NAT44ED test ICMP error message with inner header"""
1030
1031         payload = "H" * 10
1032
1033         self.nat_add_address(self.nat_addr)
1034         self.nat_add_inside_interface(self.pg0)
1035         self.nat_add_outside_interface(self.pg1)
1036
1037         # in2out (initiate connection)
1038         p1 = [
1039             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1040             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1041             / UDP(sport=21, dport=20)
1042             / payload,
1043             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1044             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1045             / TCP(sport=21, dport=20, flags="S")
1046             / payload,
1047             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1048             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1049             / ICMP(type="echo-request", id=7777)
1050             / payload,
1051         ]
1052
1053         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1054
1055         # out2in (send error message)
1056         p2 = [
1057             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1058             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1059             / ICMP(type="dest-unreach", code="port-unreachable")
1060             / c[IP:]
1061             for c in capture
1062         ]
1063
1064         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1065
1066         for c in capture:
1067             try:
1068                 assert c[IP].dst == self.pg0.remote_ip4
1069                 assert c[IPerror].src == self.pg0.remote_ip4
1070             except AssertionError as a:
1071                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1072
1073     def test_icmp_echo_reply_trailer(self):
1074         """ICMP echo reply with ethernet trailer"""
1075
1076         self.nat_add_address(self.nat_addr)
1077         self.nat_add_inside_interface(self.pg0)
1078         self.nat_add_outside_interface(self.pg1)
1079
1080         # in2out
1081         p1 = (
1082             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1083             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1084             / ICMP(type=8, id=0xABCD, seq=0)
1085         )
1086
1087         self.pg0.add_stream(p1)
1088         self.pg_enable_capture(self.pg_interfaces)
1089         self.pg_start()
1090         c = self.pg1.get_capture(1)[0]
1091
1092         self.logger.debug(self.vapi.cli("show trace"))
1093
1094         # out2in
1095         p2 = (
1096             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1097             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1098             / ICMP(type=0, id=c[ICMP].id, seq=0)
1099         )
1100
1101         # force checksum calculation
1102         p2 = p2.__class__(bytes(p2))
1103
1104         self.logger.debug(ppp("Packet before modification:", p2))
1105
1106         # hex representation of vss monitoring ethernet trailer
1107         # this seems to be just added to end of packet without modifying
1108         # IP or ICMP lengths / checksums
1109         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1110         # change it so that IP/ICMP is unaffected
1111         p2[IP].len = 28
1112
1113         self.logger.debug(ppp("Packet with added trailer:", p2))
1114
1115         self.pg1.add_stream(p2)
1116         self.pg_enable_capture(self.pg_interfaces)
1117         self.pg_start()
1118
1119         self.pg0.get_capture(1)
1120
1121     def test_users_dump(self):
1122         """NAT44ED API test - nat44_user_dump"""
1123
1124         self.nat_add_address(self.nat_addr)
1125         self.nat_add_inside_interface(self.pg0)
1126         self.nat_add_outside_interface(self.pg1)
1127
1128         self.vapi.nat44_forwarding_enable_disable(enable=1)
1129
1130         local_ip = self.pg0.remote_ip4
1131         external_ip = self.nat_addr
1132         self.nat_add_static_mapping(local_ip, external_ip)
1133
1134         users = self.vapi.nat44_user_dump()
1135         self.assertEqual(len(users), 0)
1136
1137         # in2out - static mapping match
1138
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146         pkts = self.create_stream_in(self.pg0, self.pg1)
1147         self.pg0.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150         capture = self.pg1.get_capture(len(pkts))
1151         self.verify_capture_out(capture, same_port=True)
1152
1153         users = self.vapi.nat44_user_dump()
1154         self.assertEqual(len(users), 1)
1155         static_user = users[0]
1156         self.assertEqual(static_user.nstaticsessions, 3)
1157         self.assertEqual(static_user.nsessions, 0)
1158
1159         # in2out - no static mapping match (forwarding test)
1160
1161         host0 = self.pg0.remote_hosts[0]
1162         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1163         try:
1164             pkts = self.create_stream_out(
1165                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1166             )
1167             self.pg1.add_stream(pkts)
1168             self.pg_enable_capture(self.pg_interfaces)
1169             self.pg_start()
1170             capture = self.pg0.get_capture(len(pkts))
1171             self.verify_capture_in(capture, self.pg0)
1172
1173             pkts = self.create_stream_in(self.pg0, self.pg1)
1174             self.pg0.add_stream(pkts)
1175             self.pg_enable_capture(self.pg_interfaces)
1176             self.pg_start()
1177             capture = self.pg1.get_capture(len(pkts))
1178             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1179         finally:
1180             self.pg0.remote_hosts[0] = host0
1181
1182         users = self.vapi.nat44_user_dump()
1183         self.assertEqual(len(users), 2)
1184         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1185             non_static_user = users[1]
1186             static_user = users[0]
1187         else:
1188             non_static_user = users[0]
1189             static_user = users[1]
1190         self.assertEqual(static_user.nstaticsessions, 3)
1191         self.assertEqual(static_user.nsessions, 0)
1192         self.assertEqual(non_static_user.nstaticsessions, 0)
1193         self.assertEqual(non_static_user.nsessions, 3)
1194
1195         users = self.vapi.nat44_user_dump()
1196         self.assertEqual(len(users), 2)
1197         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1198             non_static_user = users[1]
1199             static_user = users[0]
1200         else:
1201             non_static_user = users[0]
1202             static_user = users[1]
1203         self.assertEqual(static_user.nstaticsessions, 3)
1204         self.assertEqual(static_user.nsessions, 0)
1205         self.assertEqual(non_static_user.nstaticsessions, 0)
1206         self.assertEqual(non_static_user.nsessions, 3)
1207
1208     def test_frag_out_of_order_do_not_translate(self):
1209         """NAT44ED don't translate fragments arriving out of order"""
1210         self.nat_add_inside_interface(self.pg0)
1211         self.nat_add_outside_interface(self.pg1)
1212         self.vapi.nat44_forwarding_enable_disable(enable=True)
1213         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1214
1215     def test_forwarding(self):
1216         """NAT44ED forwarding test"""
1217
1218         self.nat_add_inside_interface(self.pg0)
1219         self.nat_add_outside_interface(self.pg1)
1220         self.vapi.nat44_forwarding_enable_disable(enable=1)
1221
1222         real_ip = self.pg0.remote_ip4
1223         alias_ip = self.nat_addr
1224         flags = self.config_flags.NAT_IS_ADDR_ONLY
1225         self.vapi.nat44_add_del_static_mapping(
1226             is_add=1,
1227             local_ip_address=real_ip,
1228             external_ip_address=alias_ip,
1229             external_sw_if_index=0xFFFFFFFF,
1230             flags=flags,
1231         )
1232
1233         try:
1234             # in2out - static mapping match
1235
1236             pkts = self.create_stream_out(self.pg1)
1237             self.pg1.add_stream(pkts)
1238             self.pg_enable_capture(self.pg_interfaces)
1239             self.pg_start()
1240             capture = self.pg0.get_capture(len(pkts))
1241             self.verify_capture_in(capture, self.pg0)
1242
1243             pkts = self.create_stream_in(self.pg0, self.pg1)
1244             self.pg0.add_stream(pkts)
1245             self.pg_enable_capture(self.pg_interfaces)
1246             self.pg_start()
1247             capture = self.pg1.get_capture(len(pkts))
1248             self.verify_capture_out(capture, same_port=True)
1249
1250             # in2out - no static mapping match
1251
1252             host0 = self.pg0.remote_hosts[0]
1253             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1254             try:
1255                 pkts = self.create_stream_out(
1256                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1257                 )
1258                 self.pg1.add_stream(pkts)
1259                 self.pg_enable_capture(self.pg_interfaces)
1260                 self.pg_start()
1261                 capture = self.pg0.get_capture(len(pkts))
1262                 self.verify_capture_in(capture, self.pg0)
1263
1264                 pkts = self.create_stream_in(self.pg0, self.pg1)
1265                 self.pg0.add_stream(pkts)
1266                 self.pg_enable_capture(self.pg_interfaces)
1267                 self.pg_start()
1268                 capture = self.pg1.get_capture(len(pkts))
1269                 self.verify_capture_out(
1270                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1271                 )
1272             finally:
1273                 self.pg0.remote_hosts[0] = host0
1274
1275             user = self.pg0.remote_hosts[1]
1276             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1277             self.assertEqual(len(sessions), 3)
1278             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1279             self.vapi.nat44_del_session(
1280                 address=sessions[0].inside_ip_address,
1281                 port=sessions[0].inside_port,
1282                 protocol=sessions[0].protocol,
1283                 flags=(
1284                     self.config_flags.NAT_IS_INSIDE
1285                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1286                 ),
1287                 ext_host_address=sessions[0].ext_host_address,
1288                 ext_host_port=sessions[0].ext_host_port,
1289             )
1290             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1291             self.assertEqual(len(sessions), 2)
1292
1293         finally:
1294             self.vapi.nat44_forwarding_enable_disable(enable=0)
1295             flags = self.config_flags.NAT_IS_ADDR_ONLY
1296             self.vapi.nat44_add_del_static_mapping(
1297                 is_add=0,
1298                 local_ip_address=real_ip,
1299                 external_ip_address=alias_ip,
1300                 external_sw_if_index=0xFFFFFFFF,
1301                 flags=flags,
1302             )
1303
1304     def test_output_feature_and_service2(self):
1305         """NAT44ED interface output feature and service host direct access"""
1306         self.vapi.nat44_forwarding_enable_disable(enable=1)
1307         self.nat_add_address(self.nat_addr)
1308
1309         self.vapi.nat44_ed_add_del_output_interface(
1310             sw_if_index=self.pg1.sw_if_index, is_add=1
1311         )
1312
1313         # session initiated from service host - translate
1314         pkts = self.create_stream_in(self.pg0, self.pg1)
1315         self.pg0.add_stream(pkts)
1316         self.pg_enable_capture(self.pg_interfaces)
1317         self.pg_start()
1318         capture = self.pg1.get_capture(len(pkts))
1319         self.verify_capture_out(capture, ignore_port=True)
1320
1321         pkts = self.create_stream_out(self.pg1)
1322         self.pg1.add_stream(pkts)
1323         self.pg_enable_capture(self.pg_interfaces)
1324         self.pg_start()
1325         capture = self.pg0.get_capture(len(pkts))
1326         self.verify_capture_in(capture, self.pg0)
1327
1328         # session initiated from remote host - do not translate
1329         tcp_port_in = self.tcp_port_in
1330         udp_port_in = self.udp_port_in
1331         icmp_id_in = self.icmp_id_in
1332
1333         self.tcp_port_in = 60303
1334         self.udp_port_in = 60304
1335         self.icmp_id_in = 60305
1336
1337         try:
1338             pkts = self.create_stream_out(
1339                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1340             )
1341             self.pg1.add_stream(pkts)
1342             self.pg_enable_capture(self.pg_interfaces)
1343             self.pg_start()
1344             capture = self.pg0.get_capture(len(pkts))
1345             self.verify_capture_in(capture, self.pg0)
1346
1347             pkts = self.create_stream_in(self.pg0, self.pg1)
1348             self.pg0.add_stream(pkts)
1349             self.pg_enable_capture(self.pg_interfaces)
1350             self.pg_start()
1351             capture = self.pg1.get_capture(len(pkts))
1352             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1353         finally:
1354             self.tcp_port_in = tcp_port_in
1355             self.udp_port_in = udp_port_in
1356             self.icmp_id_in = icmp_id_in
1357
1358     def test_twice_nat(self):
1359         """NAT44ED Twice NAT"""
1360         self.twice_nat_common()
1361
1362     def test_self_twice_nat_positive(self):
1363         """NAT44ED Self Twice NAT (positive test)"""
1364         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1365
1366     def test_self_twice_nat_lb_positive(self):
1367         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1368         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1369
1370     def test_twice_nat_lb(self):
1371         """NAT44ED Twice NAT local service load balancing"""
1372         self.twice_nat_common(lb=True)
1373
1374     def test_output_feature(self):
1375         """NAT44ED interface output feature (in2out postrouting)"""
1376         self.vapi.nat44_forwarding_enable_disable(enable=1)
1377         self.nat_add_address(self.nat_addr)
1378
1379         self.nat_add_outside_interface(self.pg0)
1380         self.vapi.nat44_ed_add_del_output_interface(
1381             sw_if_index=self.pg1.sw_if_index, is_add=1
1382         )
1383
1384         # in2out
1385         pkts = self.create_stream_in(self.pg0, self.pg1)
1386         self.pg0.add_stream(pkts)
1387         self.pg_enable_capture(self.pg_interfaces)
1388         self.pg_start()
1389         capture = self.pg1.get_capture(len(pkts))
1390         self.verify_capture_out(capture, ignore_port=True)
1391         self.logger.debug(self.vapi.cli("show trace"))
1392
1393         # out2in
1394         pkts = self.create_stream_out(self.pg1)
1395         self.pg1.add_stream(pkts)
1396         self.pg_enable_capture(self.pg_interfaces)
1397         self.pg_start()
1398         capture = self.pg0.get_capture(len(pkts))
1399         self.verify_capture_in(capture, self.pg0)
1400         self.logger.debug(self.vapi.cli("show trace"))
1401
1402         # in2out
1403         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1404         self.pg0.add_stream(pkts)
1405         self.pg_enable_capture(self.pg_interfaces)
1406         self.pg_start()
1407         capture = self.pg1.get_capture(len(pkts))
1408         self.verify_capture_out(capture, ignore_port=True)
1409         self.logger.debug(self.vapi.cli("show trace"))
1410
1411         # out2in
1412         pkts = self.create_stream_out(self.pg1, ttl=2)
1413         self.pg1.add_stream(pkts)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg0.get_capture(len(pkts))
1417         self.verify_capture_in(capture, self.pg0)
1418         self.logger.debug(self.vapi.cli("show trace"))
1419
1420         # in2out
1421         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1422         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1423         for p in capture:
1424             self.assertIn(ICMP, p)
1425             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1426
1427     def test_static_with_port_out2(self):
1428         """NAT44ED 1:1 NAPT asymmetrical rule"""
1429
1430         external_port = 80
1431         local_port = 8080
1432
1433         self.vapi.nat44_forwarding_enable_disable(enable=1)
1434         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1435         self.nat_add_static_mapping(
1436             self.pg0.remote_ip4,
1437             self.nat_addr,
1438             local_port,
1439             external_port,
1440             proto=IP_PROTOS.tcp,
1441             flags=flags,
1442         )
1443
1444         self.nat_add_inside_interface(self.pg0)
1445         self.nat_add_outside_interface(self.pg1)
1446
1447         # from client to service
1448         p = (
1449             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1450             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1451             / TCP(sport=12345, dport=external_port)
1452         )
1453         self.pg1.add_stream(p)
1454         self.pg_enable_capture(self.pg_interfaces)
1455         self.pg_start()
1456         capture = self.pg0.get_capture(1)
1457         p = capture[0]
1458         try:
1459             ip = p[IP]
1460             tcp = p[TCP]
1461             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1462             self.assertEqual(tcp.dport, local_port)
1463             self.assert_packet_checksums_valid(p)
1464         except:
1465             self.logger.error(ppp("Unexpected or invalid packet:", p))
1466             raise
1467
1468         # ICMP error
1469         p = (
1470             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1471             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1472             / ICMP(type=11)
1473             / capture[0][IP]
1474         )
1475         self.pg0.add_stream(p)
1476         self.pg_enable_capture(self.pg_interfaces)
1477         self.pg_start()
1478         capture = self.pg1.get_capture(1)
1479         p = capture[0]
1480         try:
1481             self.assertEqual(p[IP].src, self.nat_addr)
1482             inner = p[IPerror]
1483             self.assertEqual(inner.dst, self.nat_addr)
1484             self.assertEqual(inner[TCPerror].dport, external_port)
1485         except:
1486             self.logger.error(ppp("Unexpected or invalid packet:", p))
1487             raise
1488
1489         # from service back to client
1490         p = (
1491             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1492             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1493             / TCP(sport=local_port, dport=12345)
1494         )
1495         self.pg0.add_stream(p)
1496         self.pg_enable_capture(self.pg_interfaces)
1497         self.pg_start()
1498         capture = self.pg1.get_capture(1)
1499         p = capture[0]
1500         try:
1501             ip = p[IP]
1502             tcp = p[TCP]
1503             self.assertEqual(ip.src, self.nat_addr)
1504             self.assertEqual(tcp.sport, external_port)
1505             self.assert_packet_checksums_valid(p)
1506         except:
1507             self.logger.error(ppp("Unexpected or invalid packet:", p))
1508             raise
1509
1510         # ICMP error
1511         p = (
1512             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1513             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1514             / ICMP(type=11)
1515             / capture[0][IP]
1516         )
1517         self.pg1.add_stream(p)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(1)
1521         p = capture[0]
1522         try:
1523             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1524             inner = p[IPerror]
1525             self.assertEqual(inner.src, self.pg0.remote_ip4)
1526             self.assertEqual(inner[TCPerror].sport, local_port)
1527         except:
1528             self.logger.error(ppp("Unexpected or invalid packet:", p))
1529             raise
1530
1531         # from client to server (no translation)
1532         p = (
1533             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1534             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1535             / TCP(sport=12346, dport=local_port)
1536         )
1537         self.pg1.add_stream(p)
1538         self.pg_enable_capture(self.pg_interfaces)
1539         self.pg_start()
1540         capture = self.pg0.get_capture(1)
1541         p = capture[0]
1542         try:
1543             ip = p[IP]
1544             tcp = p[TCP]
1545             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1546             self.assertEqual(tcp.dport, local_port)
1547             self.assert_packet_checksums_valid(p)
1548         except:
1549             self.logger.error(ppp("Unexpected or invalid packet:", p))
1550             raise
1551
1552         # from service back to client (no translation)
1553         p = (
1554             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1555             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1556             / TCP(sport=local_port, dport=12346)
1557         )
1558         self.pg0.add_stream(p)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg1.get_capture(1)
1562         p = capture[0]
1563         try:
1564             ip = p[IP]
1565             tcp = p[TCP]
1566             self.assertEqual(ip.src, self.pg0.remote_ip4)
1567             self.assertEqual(tcp.sport, local_port)
1568             self.assert_packet_checksums_valid(p)
1569         except:
1570             self.logger.error(ppp("Unexpected or invalid packet:", p))
1571             raise
1572
1573     def test_static_lb(self):
1574         """NAT44ED local service load balancing"""
1575         external_addr_n = self.nat_addr
1576         external_port = 80
1577         local_port = 8080
1578         server1 = self.pg0.remote_hosts[0]
1579         server2 = self.pg0.remote_hosts[1]
1580
1581         locals = [
1582             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1583             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1584         ]
1585
1586         self.nat_add_address(self.nat_addr)
1587         self.vapi.nat44_add_del_lb_static_mapping(
1588             is_add=1,
1589             external_addr=external_addr_n,
1590             external_port=external_port,
1591             protocol=IP_PROTOS.tcp,
1592             local_num=len(locals),
1593             locals=locals,
1594         )
1595         flags = self.config_flags.NAT_IS_INSIDE
1596         self.vapi.nat44_interface_add_del_feature(
1597             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1598         )
1599         self.vapi.nat44_interface_add_del_feature(
1600             sw_if_index=self.pg1.sw_if_index, is_add=1
1601         )
1602
1603         # from client to service
1604         p = (
1605             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1606             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1607             / TCP(sport=12345, dport=external_port)
1608         )
1609         self.pg1.add_stream(p)
1610         self.pg_enable_capture(self.pg_interfaces)
1611         self.pg_start()
1612         capture = self.pg0.get_capture(1)
1613         p = capture[0]
1614         server = None
1615         try:
1616             ip = p[IP]
1617             tcp = p[TCP]
1618             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1619             if ip.dst == server1.ip4:
1620                 server = server1
1621             else:
1622                 server = server2
1623             self.assertEqual(tcp.dport, local_port)
1624             self.assert_packet_checksums_valid(p)
1625         except:
1626             self.logger.error(ppp("Unexpected or invalid packet:", p))
1627             raise
1628
1629         # from service back to client
1630         p = (
1631             Ether(src=server.mac, dst=self.pg0.local_mac)
1632             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1633             / TCP(sport=local_port, dport=12345)
1634         )
1635         self.pg0.add_stream(p)
1636         self.pg_enable_capture(self.pg_interfaces)
1637         self.pg_start()
1638         capture = self.pg1.get_capture(1)
1639         p = capture[0]
1640         try:
1641             ip = p[IP]
1642             tcp = p[TCP]
1643             self.assertEqual(ip.src, self.nat_addr)
1644             self.assertEqual(tcp.sport, external_port)
1645             self.assert_packet_checksums_valid(p)
1646         except:
1647             self.logger.error(ppp("Unexpected or invalid packet:", p))
1648             raise
1649
1650         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1651         self.assertEqual(len(sessions), 1)
1652         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1653         self.vapi.nat44_del_session(
1654             address=sessions[0].inside_ip_address,
1655             port=sessions[0].inside_port,
1656             protocol=sessions[0].protocol,
1657             flags=(
1658                 self.config_flags.NAT_IS_INSIDE
1659                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1660             ),
1661             ext_host_address=sessions[0].ext_host_address,
1662             ext_host_port=sessions[0].ext_host_port,
1663         )
1664         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1665         self.assertEqual(len(sessions), 0)
1666
1667     def test_static_lb_2(self):
1668         """NAT44ED local service load balancing (asymmetrical rule)"""
1669         external_addr = self.nat_addr
1670         external_port = 80
1671         local_port = 8080
1672         server1 = self.pg0.remote_hosts[0]
1673         server2 = self.pg0.remote_hosts[1]
1674
1675         locals = [
1676             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1677             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1678         ]
1679
1680         self.vapi.nat44_forwarding_enable_disable(enable=1)
1681         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1682         self.vapi.nat44_add_del_lb_static_mapping(
1683             is_add=1,
1684             flags=flags,
1685             external_addr=external_addr,
1686             external_port=external_port,
1687             protocol=IP_PROTOS.tcp,
1688             local_num=len(locals),
1689             locals=locals,
1690         )
1691         flags = self.config_flags.NAT_IS_INSIDE
1692         self.vapi.nat44_interface_add_del_feature(
1693             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1694         )
1695         self.vapi.nat44_interface_add_del_feature(
1696             sw_if_index=self.pg1.sw_if_index, is_add=1
1697         )
1698
1699         # from client to service
1700         p = (
1701             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1702             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1703             / TCP(sport=12345, dport=external_port)
1704         )
1705         self.pg1.add_stream(p)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg0.get_capture(1)
1709         p = capture[0]
1710         server = None
1711         try:
1712             ip = p[IP]
1713             tcp = p[TCP]
1714             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1715             if ip.dst == server1.ip4:
1716                 server = server1
1717             else:
1718                 server = server2
1719             self.assertEqual(tcp.dport, local_port)
1720             self.assert_packet_checksums_valid(p)
1721         except:
1722             self.logger.error(ppp("Unexpected or invalid packet:", p))
1723             raise
1724
1725         # from service back to client
1726         p = (
1727             Ether(src=server.mac, dst=self.pg0.local_mac)
1728             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1729             / TCP(sport=local_port, dport=12345)
1730         )
1731         self.pg0.add_stream(p)
1732         self.pg_enable_capture(self.pg_interfaces)
1733         self.pg_start()
1734         capture = self.pg1.get_capture(1)
1735         p = capture[0]
1736         try:
1737             ip = p[IP]
1738             tcp = p[TCP]
1739             self.assertEqual(ip.src, self.nat_addr)
1740             self.assertEqual(tcp.sport, external_port)
1741             self.assert_packet_checksums_valid(p)
1742         except:
1743             self.logger.error(ppp("Unexpected or invalid packet:", p))
1744             raise
1745
1746         # from client to server (no translation)
1747         p = (
1748             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1749             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1750             / TCP(sport=12346, dport=local_port)
1751         )
1752         self.pg1.add_stream(p)
1753         self.pg_enable_capture(self.pg_interfaces)
1754         self.pg_start()
1755         capture = self.pg0.get_capture(1)
1756         p = capture[0]
1757         server = None
1758         try:
1759             ip = p[IP]
1760             tcp = p[TCP]
1761             self.assertEqual(ip.dst, server1.ip4)
1762             self.assertEqual(tcp.dport, local_port)
1763             self.assert_packet_checksums_valid(p)
1764         except:
1765             self.logger.error(ppp("Unexpected or invalid packet:", p))
1766             raise
1767
1768         # from service back to client (no translation)
1769         p = (
1770             Ether(src=server1.mac, dst=self.pg0.local_mac)
1771             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1772             / TCP(sport=local_port, dport=12346)
1773         )
1774         self.pg0.add_stream(p)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         capture = self.pg1.get_capture(1)
1778         p = capture[0]
1779         try:
1780             ip = p[IP]
1781             tcp = p[TCP]
1782             self.assertEqual(ip.src, server1.ip4)
1783             self.assertEqual(tcp.sport, local_port)
1784             self.assert_packet_checksums_valid(p)
1785         except:
1786             self.logger.error(ppp("Unexpected or invalid packet:", p))
1787             raise
1788
1789     def test_lb_affinity(self):
1790         """NAT44ED local service load balancing affinity"""
1791         external_addr = self.nat_addr
1792         external_port = 80
1793         local_port = 8080
1794         server1 = self.pg0.remote_hosts[0]
1795         server2 = self.pg0.remote_hosts[1]
1796
1797         locals = [
1798             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1799             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1800         ]
1801
1802         self.nat_add_address(self.nat_addr)
1803         self.vapi.nat44_add_del_lb_static_mapping(
1804             is_add=1,
1805             external_addr=external_addr,
1806             external_port=external_port,
1807             protocol=IP_PROTOS.tcp,
1808             affinity=10800,
1809             local_num=len(locals),
1810             locals=locals,
1811         )
1812         flags = self.config_flags.NAT_IS_INSIDE
1813         self.vapi.nat44_interface_add_del_feature(
1814             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1815         )
1816         self.vapi.nat44_interface_add_del_feature(
1817             sw_if_index=self.pg1.sw_if_index, is_add=1
1818         )
1819
1820         p = (
1821             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1822             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1823             / TCP(sport=1025, dport=external_port)
1824         )
1825         self.pg1.add_stream(p)
1826         self.pg_enable_capture(self.pg_interfaces)
1827         self.pg_start()
1828         capture = self.pg0.get_capture(1)
1829         backend = capture[0][IP].dst
1830
1831         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1832         self.assertEqual(len(sessions), 1)
1833         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1834         self.vapi.nat44_del_session(
1835             address=sessions[0].inside_ip_address,
1836             port=sessions[0].inside_port,
1837             protocol=sessions[0].protocol,
1838             flags=(
1839                 self.config_flags.NAT_IS_INSIDE
1840                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1841             ),
1842             ext_host_address=sessions[0].ext_host_address,
1843             ext_host_port=sessions[0].ext_host_port,
1844         )
1845
1846         pkts = []
1847         for port in range(1030, 1100):
1848             p = (
1849                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1850                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1851                 / TCP(sport=port, dport=external_port)
1852             )
1853             pkts.append(p)
1854         self.pg1.add_stream(pkts)
1855         self.pg_enable_capture(self.pg_interfaces)
1856         self.pg_start()
1857         capture = self.pg0.get_capture(len(pkts))
1858         for p in capture:
1859             self.assertEqual(p[IP].dst, backend)
1860
1861     def test_multiple_vrf_1(self):
1862         """Multiple VRF - both client & service in VRF1"""
1863
1864         external_addr = "1.2.3.4"
1865         external_port = 80
1866         local_port = 8080
1867         port = 0
1868
1869         flags = self.config_flags.NAT_IS_INSIDE
1870         self.vapi.nat44_interface_add_del_feature(
1871             sw_if_index=self.pg5.sw_if_index, is_add=1
1872         )
1873         self.vapi.nat44_interface_add_del_feature(
1874             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1875         )
1876         self.vapi.nat44_interface_add_del_feature(
1877             sw_if_index=self.pg6.sw_if_index, is_add=1
1878         )
1879         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1880         self.nat_add_static_mapping(
1881             self.pg5.remote_ip4,
1882             external_addr,
1883             local_port,
1884             external_port,
1885             vrf_id=1,
1886             proto=IP_PROTOS.tcp,
1887             flags=flags,
1888         )
1889
1890         p = (
1891             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1892             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1893             / TCP(sport=12345, dport=external_port)
1894         )
1895         self.pg6.add_stream(p)
1896         self.pg_enable_capture(self.pg_interfaces)
1897         self.pg_start()
1898         capture = self.pg5.get_capture(1)
1899         p = capture[0]
1900         try:
1901             ip = p[IP]
1902             tcp = p[TCP]
1903             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1904             self.assertEqual(tcp.dport, local_port)
1905             self.assert_packet_checksums_valid(p)
1906         except:
1907             self.logger.error(ppp("Unexpected or invalid packet:", p))
1908             raise
1909
1910         p = (
1911             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1912             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1913             / TCP(sport=local_port, dport=12345)
1914         )
1915         self.pg5.add_stream(p)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg6.get_capture(1)
1919         p = capture[0]
1920         try:
1921             ip = p[IP]
1922             tcp = p[TCP]
1923             self.assertEqual(ip.src, external_addr)
1924             self.assertEqual(tcp.sport, external_port)
1925             self.assert_packet_checksums_valid(p)
1926         except:
1927             self.logger.error(ppp("Unexpected or invalid packet:", p))
1928             raise
1929
1930     def test_multiple_vrf_2(self):
1931         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1932
1933         external_addr = "1.2.3.4"
1934         external_port = 80
1935         local_port = 8080
1936         port = 0
1937
1938         self.nat_add_address(self.nat_addr)
1939         flags = self.config_flags.NAT_IS_INSIDE
1940         self.vapi.nat44_ed_add_del_output_interface(
1941             sw_if_index=self.pg1.sw_if_index, is_add=1
1942         )
1943         self.vapi.nat44_interface_add_del_feature(
1944             sw_if_index=self.pg5.sw_if_index, is_add=1
1945         )
1946         self.vapi.nat44_interface_add_del_feature(
1947             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1948         )
1949         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1950         self.nat_add_static_mapping(
1951             self.pg5.remote_ip4,
1952             external_addr,
1953             local_port,
1954             external_port,
1955             vrf_id=1,
1956             proto=IP_PROTOS.tcp,
1957             flags=flags,
1958         )
1959
1960         p = (
1961             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1962             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1963             / TCP(sport=2345, dport=22)
1964         )
1965         self.pg5.add_stream(p)
1966         self.pg_enable_capture(self.pg_interfaces)
1967         self.pg_start()
1968         capture = self.pg1.get_capture(1)
1969         p = capture[0]
1970         try:
1971             ip = p[IP]
1972             tcp = p[TCP]
1973             self.assertEqual(ip.src, self.nat_addr)
1974             self.assert_packet_checksums_valid(p)
1975             port = tcp.sport
1976         except:
1977             self.logger.error(ppp("Unexpected or invalid packet:", p))
1978             raise
1979
1980         p = (
1981             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1982             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1983             / TCP(sport=22, dport=port)
1984         )
1985         self.pg1.add_stream(p)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg5.get_capture(1)
1989         p = capture[0]
1990         try:
1991             ip = p[IP]
1992             tcp = p[TCP]
1993             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1994             self.assertEqual(tcp.dport, 2345)
1995             self.assert_packet_checksums_valid(p)
1996         except:
1997             self.logger.error(ppp("Unexpected or invalid packet:", p))
1998             raise
1999
2000     def test_multiple_vrf_3(self):
2001         """Multiple VRF - client in VRF1, service in VRF0"""
2002
2003         external_addr = "1.2.3.4"
2004         external_port = 80
2005         local_port = 8080
2006         port = 0
2007
2008         flags = self.config_flags.NAT_IS_INSIDE
2009         self.vapi.nat44_interface_add_del_feature(
2010             sw_if_index=self.pg0.sw_if_index, is_add=1
2011         )
2012         self.vapi.nat44_interface_add_del_feature(
2013             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2014         )
2015         self.vapi.nat44_interface_add_del_feature(
2016             sw_if_index=self.pg6.sw_if_index, is_add=1
2017         )
2018         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2019         self.nat_add_static_mapping(
2020             self.pg0.remote_ip4,
2021             external_sw_if_index=self.pg0.sw_if_index,
2022             local_port=local_port,
2023             vrf_id=0,
2024             external_port=external_port,
2025             proto=IP_PROTOS.tcp,
2026             flags=flags,
2027         )
2028
2029         # from client VRF1 to service VRF0
2030         p = (
2031             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2032             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2033             / TCP(sport=12346, dport=external_port)
2034         )
2035         self.pg6.add_stream(p)
2036         self.pg_enable_capture(self.pg_interfaces)
2037         self.pg_start()
2038         capture = self.pg0.get_capture(1)
2039         p = capture[0]
2040         try:
2041             ip = p[IP]
2042             tcp = p[TCP]
2043             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2044             self.assertEqual(tcp.dport, local_port)
2045             self.assert_packet_checksums_valid(p)
2046         except:
2047             self.logger.error(ppp("Unexpected or invalid packet:", p))
2048             raise
2049
2050         # from service VRF0 back to client VRF1
2051         p = (
2052             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2053             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2054             / TCP(sport=local_port, dport=12346)
2055         )
2056         self.pg0.add_stream(p)
2057         self.pg_enable_capture(self.pg_interfaces)
2058         self.pg_start()
2059         capture = self.pg6.get_capture(1)
2060         p = capture[0]
2061         try:
2062             ip = p[IP]
2063             tcp = p[TCP]
2064             self.assertEqual(ip.src, self.pg0.local_ip4)
2065             self.assertEqual(tcp.sport, external_port)
2066             self.assert_packet_checksums_valid(p)
2067         except:
2068             self.logger.error(ppp("Unexpected or invalid packet:", p))
2069             raise
2070
2071     def test_multiple_vrf_4(self):
2072         """Multiple VRF - client in VRF0, service in VRF1"""
2073
2074         external_addr = "1.2.3.4"
2075         external_port = 80
2076         local_port = 8080
2077         port = 0
2078
2079         flags = self.config_flags.NAT_IS_INSIDE
2080         self.vapi.nat44_interface_add_del_feature(
2081             sw_if_index=self.pg0.sw_if_index, is_add=1
2082         )
2083         self.vapi.nat44_interface_add_del_feature(
2084             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2085         )
2086         self.vapi.nat44_interface_add_del_feature(
2087             sw_if_index=self.pg5.sw_if_index, is_add=1
2088         )
2089         self.vapi.nat44_interface_add_del_feature(
2090             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2091         )
2092         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2093         self.nat_add_static_mapping(
2094             self.pg5.remote_ip4,
2095             external_addr,
2096             local_port,
2097             external_port,
2098             vrf_id=1,
2099             proto=IP_PROTOS.tcp,
2100             flags=flags,
2101         )
2102
2103         # from client VRF0 to service VRF1
2104         p = (
2105             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2106             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2107             / TCP(sport=12347, dport=external_port)
2108         )
2109         self.pg0.add_stream(p)
2110         self.pg_enable_capture(self.pg_interfaces)
2111         self.pg_start()
2112         capture = self.pg5.get_capture(1)
2113         p = capture[0]
2114         try:
2115             ip = p[IP]
2116             tcp = p[TCP]
2117             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2118             self.assertEqual(tcp.dport, local_port)
2119             self.assert_packet_checksums_valid(p)
2120         except:
2121             self.logger.error(ppp("Unexpected or invalid packet:", p))
2122             raise
2123
2124         # from service VRF1 back to client VRF0
2125         p = (
2126             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2127             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2128             / TCP(sport=local_port, dport=12347)
2129         )
2130         self.pg5.add_stream(p)
2131         self.pg_enable_capture(self.pg_interfaces)
2132         self.pg_start()
2133         capture = self.pg0.get_capture(1)
2134         p = capture[0]
2135         try:
2136             ip = p[IP]
2137             tcp = p[TCP]
2138             self.assertEqual(ip.src, external_addr)
2139             self.assertEqual(tcp.sport, external_port)
2140             self.assert_packet_checksums_valid(p)
2141         except:
2142             self.logger.error(ppp("Unexpected or invalid packet:", p))
2143             raise
2144
2145     def test_multiple_vrf_5(self):
2146         """Multiple VRF - forwarding - no translation"""
2147
2148         external_addr = "1.2.3.4"
2149         external_port = 80
2150         local_port = 8080
2151         port = 0
2152
2153         self.vapi.nat44_forwarding_enable_disable(enable=1)
2154         flags = self.config_flags.NAT_IS_INSIDE
2155         self.vapi.nat44_interface_add_del_feature(
2156             sw_if_index=self.pg0.sw_if_index, is_add=1
2157         )
2158         self.vapi.nat44_interface_add_del_feature(
2159             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2160         )
2161         self.vapi.nat44_interface_add_del_feature(
2162             sw_if_index=self.pg5.sw_if_index, is_add=1
2163         )
2164         self.vapi.nat44_interface_add_del_feature(
2165             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2166         )
2167         self.vapi.nat44_interface_add_del_feature(
2168             sw_if_index=self.pg6.sw_if_index, is_add=1
2169         )
2170         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2171         self.nat_add_static_mapping(
2172             self.pg5.remote_ip4,
2173             external_addr,
2174             local_port,
2175             external_port,
2176             vrf_id=1,
2177             proto=IP_PROTOS.tcp,
2178             flags=flags,
2179         )
2180         self.nat_add_static_mapping(
2181             self.pg0.remote_ip4,
2182             external_sw_if_index=self.pg0.sw_if_index,
2183             local_port=local_port,
2184             vrf_id=0,
2185             external_port=external_port,
2186             proto=IP_PROTOS.tcp,
2187             flags=flags,
2188         )
2189
2190         # from client to server (both VRF1, no translation)
2191         p = (
2192             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2193             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2194             / TCP(sport=12348, dport=local_port)
2195         )
2196         self.pg6.add_stream(p)
2197         self.pg_enable_capture(self.pg_interfaces)
2198         self.pg_start()
2199         capture = self.pg5.get_capture(1)
2200         p = capture[0]
2201         try:
2202             ip = p[IP]
2203             tcp = p[TCP]
2204             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2205             self.assertEqual(tcp.dport, local_port)
2206             self.assert_packet_checksums_valid(p)
2207         except:
2208             self.logger.error(ppp("Unexpected or invalid packet:", p))
2209             raise
2210
2211         # from server back to client (both VRF1, no translation)
2212         p = (
2213             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2214             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2215             / TCP(sport=local_port, dport=12348)
2216         )
2217         self.pg5.add_stream(p)
2218         self.pg_enable_capture(self.pg_interfaces)
2219         self.pg_start()
2220         capture = self.pg6.get_capture(1)
2221         p = capture[0]
2222         try:
2223             ip = p[IP]
2224             tcp = p[TCP]
2225             self.assertEqual(ip.src, self.pg5.remote_ip4)
2226             self.assertEqual(tcp.sport, local_port)
2227             self.assert_packet_checksums_valid(p)
2228         except:
2229             self.logger.error(ppp("Unexpected or invalid packet:", p))
2230             raise
2231
2232         # from client VRF1 to server VRF0 (no translation)
2233         p = (
2234             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2235             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2236             / TCP(sport=local_port, dport=12349)
2237         )
2238         self.pg0.add_stream(p)
2239         self.pg_enable_capture(self.pg_interfaces)
2240         self.pg_start()
2241         capture = self.pg6.get_capture(1)
2242         p = capture[0]
2243         try:
2244             ip = p[IP]
2245             tcp = p[TCP]
2246             self.assertEqual(ip.src, self.pg0.remote_ip4)
2247             self.assertEqual(tcp.sport, local_port)
2248             self.assert_packet_checksums_valid(p)
2249         except:
2250             self.logger.error(ppp("Unexpected or invalid packet:", p))
2251             raise
2252
2253         # from server VRF0 back to client VRF1 (no translation)
2254         p = (
2255             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2256             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2257             / TCP(sport=local_port, dport=12349)
2258         )
2259         self.pg0.add_stream(p)
2260         self.pg_enable_capture(self.pg_interfaces)
2261         self.pg_start()
2262         capture = self.pg6.get_capture(1)
2263         p = capture[0]
2264         try:
2265             ip = p[IP]
2266             tcp = p[TCP]
2267             self.assertEqual(ip.src, self.pg0.remote_ip4)
2268             self.assertEqual(tcp.sport, local_port)
2269             self.assert_packet_checksums_valid(p)
2270         except:
2271             self.logger.error(ppp("Unexpected or invalid packet:", p))
2272             raise
2273
2274         # from client VRF0 to server VRF1 (no translation)
2275         p = (
2276             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2277             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2278             / TCP(sport=12344, dport=local_port)
2279         )
2280         self.pg0.add_stream(p)
2281         self.pg_enable_capture(self.pg_interfaces)
2282         self.pg_start()
2283         capture = self.pg5.get_capture(1)
2284         p = capture[0]
2285         try:
2286             ip = p[IP]
2287             tcp = p[TCP]
2288             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2289             self.assertEqual(tcp.dport, local_port)
2290             self.assert_packet_checksums_valid(p)
2291         except:
2292             self.logger.error(ppp("Unexpected or invalid packet:", p))
2293             raise
2294
2295         # from server VRF1 back to client VRF0 (no translation)
2296         p = (
2297             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2298             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2299             / TCP(sport=local_port, dport=12344)
2300         )
2301         self.pg5.add_stream(p)
2302         self.pg_enable_capture(self.pg_interfaces)
2303         self.pg_start()
2304         capture = self.pg0.get_capture(1)
2305         p = capture[0]
2306         try:
2307             ip = p[IP]
2308             tcp = p[TCP]
2309             self.assertEqual(ip.src, self.pg5.remote_ip4)
2310             self.assertEqual(tcp.sport, local_port)
2311             self.assert_packet_checksums_valid(p)
2312         except:
2313             self.logger.error(ppp("Unexpected or invalid packet:", p))
2314             raise
2315
2316     def test_outside_address_distribution(self):
2317         """NAT44ED outside address distribution based on source address"""
2318
2319         addresses = 65
2320         x = 100
2321
2322         nat_addresses = []
2323         nat_distribution = {}
2324         for i in range(1, addresses):
2325             a = "10.0.0.%d" % i
2326             nat_addresses.append(a)
2327             nat_distribution[a] = set()
2328
2329         self.nat_add_inside_interface(self.pg0)
2330         self.nat_add_outside_interface(self.pg1)
2331
2332         self.vapi.nat44_add_del_address_range(
2333             first_ip_address=nat_addresses[0],
2334             last_ip_address=nat_addresses[-1],
2335             vrf_id=0xFFFFFFFF,
2336             is_add=1,
2337             flags=0,
2338         )
2339
2340         self.pg0.generate_remote_hosts(x)
2341
2342         pkts = []
2343         for i in range(x):
2344             info = self.create_packet_info(self.pg0, self.pg1)
2345             payload = self.info_to_payload(info)
2346             p = (
2347                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2348                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2349                 / UDP(sport=7000 + i, dport=8000 + i)
2350                 / Raw(payload)
2351             )
2352             info.data = p
2353             pkts.append(p)
2354
2355         self.pg0.add_stream(pkts)
2356         self.pg_enable_capture(self.pg_interfaces)
2357         self.pg_start()
2358         recvd = self.pg1.get_capture(len(pkts))
2359         for p_recvd in recvd:
2360             payload_info = self.payload_to_info(p_recvd[Raw])
2361             packet_index = payload_info.index
2362             info = self._packet_infos[packet_index]
2363             self.assertTrue(info is not None)
2364             self.assertEqual(packet_index, info.index)
2365             p_sent = info.data
2366             self.assertIn(p_recvd[IP].src, nat_distribution)
2367             nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
2368
2369         var = variance(map(len, nat_distribution.values()), x / addresses)
2370         self.assertLess(var, 0.33, msg="Bad outside address distribution")
2371
2372     def test_dynamic_edge_ports(self):
2373         """NAT44ED dynamic translation test: edge ports"""
2374
2375         worker_count = self.vpp_worker_count or 1
2376         port_offset = 1024
2377         port_per_thread = (65536 - port_offset) // worker_count
2378         port_count = port_per_thread * worker_count
2379
2380         # worker thread edge ports
2381         thread_edge_ports = {0, port_offset - 1, 65535}
2382         for i in range(0, worker_count):
2383             port_thread_offset = (port_per_thread * i) + port_offset
2384             for port_range_offset in [0, port_per_thread - 1]:
2385                 port = port_thread_offset + port_range_offset
2386                 thread_edge_ports.add(port)
2387         thread_drop_ports = set(
2388             filter(
2389                 lambda x: x not in range(port_offset, port_offset + port_count),
2390                 thread_edge_ports,
2391             )
2392         )
2393
2394         in_if = self.pg7
2395         out_if = self.pg8
2396
2397         self.nat_add_address(self.nat_addr)
2398
2399         try:
2400             self.configure_ip4_interface(in_if, hosts=worker_count)
2401             self.configure_ip4_interface(out_if)
2402
2403             self.nat_add_inside_interface(in_if)
2404             self.nat_add_outside_interface(out_if)
2405
2406             # in2out
2407             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2408             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2409             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2410             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2411
2412             pkt_count = worker_count * len(thread_edge_ports)
2413
2414             i2o_pkts = [[] for x in range(0, worker_count)]
2415             for i in range(0, worker_count):
2416                 remote_host = in_if.remote_hosts[i]
2417                 for port in thread_edge_ports:
2418                     p = (
2419                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2420                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2421                         / TCP(sport=port, dport=port)
2422                     )
2423                     i2o_pkts[i].append(p)
2424
2425                     p = (
2426                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2427                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2428                         / UDP(sport=port, dport=port)
2429                     )
2430                     i2o_pkts[i].append(p)
2431
2432                     p = (
2433                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2434                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2435                         / ICMP(id=port, seq=port, type="echo-request")
2436                     )
2437                     i2o_pkts[i].append(p)
2438
2439             for i in range(0, worker_count):
2440                 if len(i2o_pkts[i]) > 0:
2441                     in_if.add_stream(i2o_pkts[i], worker=i)
2442
2443             self.pg_enable_capture(self.pg_interfaces)
2444             self.pg_start()
2445             capture = out_if.get_capture(pkt_count * 3)
2446             for packet in capture:
2447                 self.assert_packet_checksums_valid(packet)
2448                 if packet.haslayer(TCP):
2449                     self.assert_in_range(
2450                         packet[TCP].sport,
2451                         port_offset,
2452                         port_offset + port_count,
2453                         "src TCP port",
2454                     )
2455                 elif packet.haslayer(UDP):
2456                     self.assert_in_range(
2457                         packet[UDP].sport,
2458                         port_offset,
2459                         port_offset + port_count,
2460                         "src UDP port",
2461                     )
2462                 elif packet.haslayer(ICMP):
2463                     self.assert_in_range(
2464                         packet[ICMP].id,
2465                         port_offset,
2466                         port_offset + port_count,
2467                         "ICMP id",
2468                     )
2469                 else:
2470                     self.fail(
2471                         ppp("Unexpected or invalid packet (outside network):", packet)
2472                     )
2473
2474             if_idx = in_if.sw_if_index
2475             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2476             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2477             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2478             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2479
2480             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2481             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2482             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2483             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2484
2485             # out2in
2486             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2487             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2488             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2489             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2490             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2491
2492             # replies to unchanged thread ports should pass on each worker,
2493             # excluding packets outside dynamic port range
2494             drop_count = worker_count * len(thread_drop_ports)
2495             pass_count = worker_count * len(thread_edge_ports) - drop_count
2496
2497             o2i_pkts = [[] for x in range(0, worker_count)]
2498             for i in range(0, worker_count):
2499                 for port in thread_edge_ports:
2500                     p = (
2501                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2502                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2503                         / TCP(sport=port, dport=port)
2504                     )
2505                     o2i_pkts[i].append(p)
2506
2507                     p = (
2508                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2509                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2510                         / UDP(sport=port, dport=port)
2511                     )
2512                     o2i_pkts[i].append(p)
2513
2514                     p = (
2515                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2516                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2517                         / ICMP(id=port, seq=port, type="echo-reply")
2518                     )
2519                     o2i_pkts[i].append(p)
2520
2521             for i in range(0, worker_count):
2522                 if len(o2i_pkts[i]) > 0:
2523                     out_if.add_stream(o2i_pkts[i], worker=i)
2524
2525             self.pg_enable_capture(self.pg_interfaces)
2526             self.pg_start()
2527             capture = in_if.get_capture(pass_count * 3)
2528             for packet in capture:
2529                 self.assert_packet_checksums_valid(packet)
2530                 if packet.haslayer(TCP):
2531                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2532                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2533                 elif packet.haslayer(UDP):
2534                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2535                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2536                 elif packet.haslayer(ICMP):
2537                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2538                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2539                 else:
2540                     self.fail(
2541                         ppp("Unexpected or invalid packet (inside network):", packet)
2542                     )
2543
2544             if_idx = out_if.sw_if_index
2545             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2546             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2547             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2548             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2549             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2550
2551             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2552             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2553             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2554             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2555             self.assertEqual(
2556                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2557             )
2558
2559         finally:
2560             in_if.unconfig()
2561             out_if.unconfig()
2562
2563     def test_delete_interface(self):
2564         """NAT44ED delete nat interface"""
2565
2566         self.nat_add_address(self.nat_addr)
2567
2568         interfaces = self.create_loopback_interfaces(4)
2569         self.nat_add_outside_interface(interfaces[0])
2570         self.nat_add_inside_interface(interfaces[1])
2571         self.nat_add_outside_interface(interfaces[2])
2572         self.nat_add_inside_interface(interfaces[2])
2573         self.vapi.nat44_ed_add_del_output_interface(
2574             sw_if_index=interfaces[3].sw_if_index, is_add=1
2575         )
2576
2577         nat_sw_if_indices = [
2578             i.sw_if_index
2579             for i in self.vapi.nat44_interface_dump()
2580             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
2581         ]
2582         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
2583
2584         loopbacks = []
2585         for i in interfaces:
2586             # delete nat-enabled interface
2587             self.assertIn(i.sw_if_index, nat_sw_if_indices)
2588             i.remove_vpp_config()
2589
2590             # create interface with the same index
2591             lo = VppLoInterface(self)
2592             loopbacks.append(lo)
2593             self.assertEqual(lo.sw_if_index, i.sw_if_index)
2594
2595             # check interface is not nat-enabled
2596             nat_sw_if_indices = [
2597                 i.sw_if_index
2598                 for i in self.vapi.nat44_interface_dump()
2599                 + list(
2600                     self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
2601                 )
2602             ]
2603             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
2604
2605         for i in loopbacks:
2606             i.remove_vpp_config()
2607
2608
2609 @tag_fixme_ubuntu2204
2610 class TestNAT44EDMW(TestNAT44ED):
2611     """NAT44ED MW Test Case"""
2612
2613     vpp_worker_count = 4
2614     max_sessions = 5000
2615
2616     def test_dynamic(self):
2617         """NAT44ED dynamic translation test"""
2618         pkt_count = 1500
2619         tcp_port_offset = 20
2620         udp_port_offset = 20
2621         icmp_id_offset = 20
2622
2623         self.nat_add_address(self.nat_addr)
2624         self.nat_add_inside_interface(self.pg0)
2625         self.nat_add_outside_interface(self.pg1)
2626
2627         # in2out
2628         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2629         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2630         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2631         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2632
2633         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2634
2635         for i in range(pkt_count):
2636             p = (
2637                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2638                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2639                 / TCP(sport=tcp_port_offset + i, dport=20)
2640             )
2641             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2642
2643             p = (
2644                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2645                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2646                 / UDP(sport=udp_port_offset + i, dport=20)
2647             )
2648             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2649
2650             p = (
2651                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2652                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2653                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2654             )
2655             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2656
2657         for i in range(0, self.vpp_worker_count):
2658             if len(i2o_pkts[i]) > 0:
2659                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2660
2661         self.pg_enable_capture(self.pg_interfaces)
2662         self.pg_start()
2663         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2664
2665         if_idx = self.pg0.sw_if_index
2666         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2667         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2668         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2669         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2670
2671         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2672         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2673         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2674         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2675
2676         self.logger.info(self.vapi.cli("show trace"))
2677
2678         # out2in
2679         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2680         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2681         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2682         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2683
2684         recvd_tcp_ports = set()
2685         recvd_udp_ports = set()
2686         recvd_icmp_ids = set()
2687
2688         for p in capture:
2689             if TCP in p:
2690                 recvd_tcp_ports.add(p[TCP].sport)
2691             if UDP in p:
2692                 recvd_udp_ports.add(p[UDP].sport)
2693             if ICMP in p:
2694                 recvd_icmp_ids.add(p[ICMP].id)
2695
2696         recvd_tcp_ports = list(recvd_tcp_ports)
2697         recvd_udp_ports = list(recvd_udp_ports)
2698         recvd_icmp_ids = list(recvd_icmp_ids)
2699
2700         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2701         for i in range(pkt_count):
2702             p = (
2703                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2704                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2705                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2706             )
2707             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2708
2709             p = (
2710                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2711                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2712                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2713             )
2714             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2715
2716             p = (
2717                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2718                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2719                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2720             )
2721             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2722
2723         for i in range(0, self.vpp_worker_count):
2724             if len(o2i_pkts[i]) > 0:
2725                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2726
2727         self.pg_enable_capture(self.pg_interfaces)
2728         self.pg_start()
2729         capture = self.pg0.get_capture(pkt_count * 3)
2730         for packet in capture:
2731             try:
2732                 self.assert_packet_checksums_valid(packet)
2733                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2734                 if packet.haslayer(TCP):
2735                     self.assert_in_range(
2736                         packet[TCP].dport,
2737                         tcp_port_offset,
2738                         tcp_port_offset + pkt_count,
2739                         "dst TCP port",
2740                     )
2741                 elif packet.haslayer(UDP):
2742                     self.assert_in_range(
2743                         packet[UDP].dport,
2744                         udp_port_offset,
2745                         udp_port_offset + pkt_count,
2746                         "dst UDP port",
2747                     )
2748                 else:
2749                     self.assert_in_range(
2750                         packet[ICMP].id,
2751                         icmp_id_offset,
2752                         icmp_id_offset + pkt_count,
2753                         "ICMP id",
2754                     )
2755             except:
2756                 self.logger.error(
2757                     ppp("Unexpected or invalid packet (inside network):", packet)
2758                 )
2759                 raise
2760
2761         if_idx = self.pg1.sw_if_index
2762         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2763         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2764         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2765         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2766
2767         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2768         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2769         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2770         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2771
2772         sc = self.statistics["/nat44-ed/total-sessions"]
2773         self.assertEqual(
2774             sc[:, 0].sum(),
2775             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2776         )
2777
2778     def test_frag_in_order(self):
2779         """NAT44ED translate fragments arriving in order"""
2780
2781         self.nat_add_address(self.nat_addr)
2782         self.nat_add_inside_interface(self.pg0)
2783         self.nat_add_outside_interface(self.pg1)
2784
2785         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2786         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2787         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2788
2789     def test_frag_in_order_do_not_translate(self):
2790         """NAT44ED don't translate fragments arriving in order"""
2791
2792         self.nat_add_address(self.nat_addr)
2793         self.nat_add_inside_interface(self.pg0)
2794         self.nat_add_outside_interface(self.pg1)
2795         self.vapi.nat44_forwarding_enable_disable(enable=True)
2796
2797         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2798
2799     def test_frag_out_of_order(self):
2800         """NAT44ED translate fragments arriving out of order"""
2801
2802         self.nat_add_address(self.nat_addr)
2803         self.nat_add_inside_interface(self.pg0)
2804         self.nat_add_outside_interface(self.pg1)
2805
2806         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2807         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2808         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2809
2810     def test_frag_in_order_in_plus_out(self):
2811         """NAT44ED in+out interface fragments in order"""
2812
2813         in_port = self.random_port()
2814         out_port = self.random_port()
2815
2816         self.nat_add_address(self.nat_addr)
2817         self.nat_add_inside_interface(self.pg0)
2818         self.nat_add_outside_interface(self.pg0)
2819         self.nat_add_inside_interface(self.pg1)
2820         self.nat_add_outside_interface(self.pg1)
2821
2822         # add static mappings for server
2823         self.nat_add_static_mapping(
2824             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2825         )
2826         self.nat_add_static_mapping(
2827             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2828         )
2829         self.nat_add_static_mapping(
2830             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2831         )
2832
2833         # run tests for each protocol
2834         self.frag_in_order_in_plus_out(
2835             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2836         )
2837         self.frag_in_order_in_plus_out(
2838             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2839         )
2840         self.frag_in_order_in_plus_out(
2841             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2842         )
2843
2844     def test_frag_out_of_order_in_plus_out(self):
2845         """NAT44ED in+out interface fragments out of order"""
2846
2847         in_port = self.random_port()
2848         out_port = self.random_port()
2849
2850         self.nat_add_address(self.nat_addr)
2851         self.nat_add_inside_interface(self.pg0)
2852         self.nat_add_outside_interface(self.pg0)
2853         self.nat_add_inside_interface(self.pg1)
2854         self.nat_add_outside_interface(self.pg1)
2855
2856         # add static mappings for server
2857         self.nat_add_static_mapping(
2858             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2859         )
2860         self.nat_add_static_mapping(
2861             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2862         )
2863         self.nat_add_static_mapping(
2864             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2865         )
2866
2867         # run tests for each protocol
2868         self.frag_out_of_order_in_plus_out(
2869             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2870         )
2871         self.frag_out_of_order_in_plus_out(
2872             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2873         )
2874         self.frag_out_of_order_in_plus_out(
2875             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2876         )
2877
2878     def test_reass_hairpinning(self):
2879         """NAT44ED fragments hairpinning"""
2880
2881         server_addr = self.pg0.remote_hosts[1].ip4
2882
2883         host_in_port = self.random_port()
2884         server_in_port = self.random_port()
2885         server_out_port = self.random_port()
2886
2887         self.nat_add_address(self.nat_addr)
2888         self.nat_add_inside_interface(self.pg0)
2889         self.nat_add_outside_interface(self.pg1)
2890
2891         # add static mapping for server
2892         self.nat_add_static_mapping(
2893             server_addr,
2894             self.nat_addr,
2895             server_in_port,
2896             server_out_port,
2897             proto=IP_PROTOS.tcp,
2898         )
2899         self.nat_add_static_mapping(
2900             server_addr,
2901             self.nat_addr,
2902             server_in_port,
2903             server_out_port,
2904             proto=IP_PROTOS.udp,
2905         )
2906         self.nat_add_static_mapping(server_addr, self.nat_addr)
2907
2908         self.reass_hairpinning(
2909             server_addr,
2910             server_in_port,
2911             server_out_port,
2912             host_in_port,
2913             proto=IP_PROTOS.tcp,
2914             ignore_port=True,
2915         )
2916         self.reass_hairpinning(
2917             server_addr,
2918             server_in_port,
2919             server_out_port,
2920             host_in_port,
2921             proto=IP_PROTOS.udp,
2922             ignore_port=True,
2923         )
2924         self.reass_hairpinning(
2925             server_addr,
2926             server_in_port,
2927             server_out_port,
2928             host_in_port,
2929             proto=IP_PROTOS.icmp,
2930             ignore_port=True,
2931         )
2932
2933     def test_session_limit_per_vrf(self):
2934         """NAT44ED per vrf session limit"""
2935
2936         inside = self.pg0
2937         inside_vrf10 = self.pg2
2938         outside = self.pg1
2939
2940         limit = 5
2941
2942         # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions)
2943         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2944
2945         # expect error when bad is specified
2946         with self.vapi.assert_negative_api_retval():
2947             self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20)
2948
2949         self.nat_add_inside_interface(inside)
2950         self.nat_add_inside_interface(inside_vrf10)
2951         self.nat_add_outside_interface(outside)
2952
2953         # vrf independent
2954         self.nat_add_interface_address(outside)
2955
2956         # BUG: causing core dump - when bad vrf_id is specified
2957         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2958
2959         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2960         inside_vrf10.add_stream(stream)
2961
2962         self.pg_enable_capture(self.pg_interfaces)
2963         self.pg_start()
2964
2965         capture = outside.get_capture(limit)
2966
2967         stream = self.create_tcp_stream(inside, outside, limit * 2)
2968         inside.add_stream(stream)
2969
2970         self.pg_enable_capture(self.pg_interfaces)
2971         self.pg_start()
2972
2973         capture = outside.get_capture(len(stream))
2974
2975     def test_show_max_translations(self):
2976         """NAT44ED API test - max translations per thread"""
2977         config = self.vapi.nat44_show_running_config()
2978         self.assertEqual(self.max_sessions, config.sessions)
2979
2980     def test_lru_cleanup(self):
2981         """NAT44ED LRU cleanup algorithm"""
2982
2983         self.nat_add_address(self.nat_addr)
2984         self.nat_add_inside_interface(self.pg0)
2985         self.nat_add_outside_interface(self.pg1)
2986
2987         self.vapi.nat_set_timeouts(
2988             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2989         )
2990
2991         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2992         pkts = []
2993         for i in range(0, self.max_sessions - 1):
2994             p = (
2995                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2996                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2997                 / UDP(sport=7000 + i, dport=80)
2998             )
2999             pkts.append(p)
3000
3001         self.pg0.add_stream(pkts)
3002         self.pg_enable_capture(self.pg_interfaces)
3003         self.pg_start()
3004         self.pg1.get_capture(len(pkts))
3005         self.virtual_sleep(1.5, "wait for timeouts")
3006
3007         pkts = []
3008         for i in range(0, self.max_sessions - 1):
3009             p = (
3010                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3011                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
3012                 / ICMP(id=8000 + i, type="echo-request")
3013             )
3014             pkts.append(p)
3015
3016         self.pg0.add_stream(pkts)
3017         self.pg_enable_capture(self.pg_interfaces)
3018         self.pg_start()
3019         self.pg1.get_capture(len(pkts))
3020
3021     def test_session_rst_timeout(self):
3022         """NAT44ED session RST timeouts"""
3023
3024         self.nat_add_address(self.nat_addr)
3025         self.nat_add_inside_interface(self.pg0)
3026         self.nat_add_outside_interface(self.pg1)
3027
3028         self.vapi.nat_set_timeouts(
3029             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3030         )
3031
3032         self.init_tcp_session(
3033             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3034         )
3035         p = (
3036             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3037             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3038             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3039         )
3040         self.send_and_expect(self.pg0, p, self.pg1)
3041
3042         self.virtual_sleep(6)
3043
3044         # The session is already closed
3045         p = (
3046             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3047             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3048             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3049         )
3050         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3051
3052         # The session can be re-opened
3053         p = (
3054             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3055             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3056             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3057         )
3058         self.send_and_expect(self.pg0, p, self.pg1)
3059
3060     def test_session_rst_established_timeout(self):
3061         """NAT44ED session RST timeouts"""
3062
3063         self.nat_add_address(self.nat_addr)
3064         self.nat_add_inside_interface(self.pg0)
3065         self.nat_add_outside_interface(self.pg1)
3066
3067         self.vapi.nat_set_timeouts(
3068             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3069         )
3070
3071         self.init_tcp_session(
3072             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3073         )
3074
3075         # Wait at least the transitory time, the session is in established
3076         # state anyway. RST followed by a data packet should move it to
3077         # transitory state.
3078         self.virtual_sleep(6)
3079         p = (
3080             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3081             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3082             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3083         )
3084         self.send_and_expect(self.pg0, p, self.pg1)
3085
3086         p = (
3087             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3088             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3089             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3090         )
3091         self.send_and_expect(self.pg0, p, self.pg1)
3092
3093         # State is transitory, session should be closed after 6 seconds
3094         self.virtual_sleep(6)
3095
3096         p = (
3097             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3098             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3099             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3100         )
3101         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3102
3103     def test_dynamic_out_of_ports(self):
3104         """NAT44ED dynamic translation test: out of ports"""
3105
3106         self.nat_add_inside_interface(self.pg0)
3107         self.nat_add_outside_interface(self.pg1)
3108
3109         # in2out and no NAT addresses added
3110         pkts = self.create_stream_in(self.pg0, self.pg1)
3111
3112         self.send_and_assert_no_replies(
3113             self.pg0,
3114             pkts,
3115             msg="i2o pkts",
3116             stats_diff=self.no_diff
3117             | {
3118                 "err": {
3119                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3120                 },
3121                 self.pg0.sw_if_index: {
3122                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3123                 },
3124             },
3125         )
3126
3127         # in2out after NAT addresses added
3128         self.nat_add_address(self.nat_addr)
3129
3130         tcpn, udpn, icmpn = (
3131             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3132         )
3133
3134         self.send_and_expect(
3135             self.pg0,
3136             pkts,
3137             self.pg1,
3138             msg="i2o pkts",
3139             stats_diff=self.no_diff
3140             | {
3141                 "err": {
3142                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3143                 },
3144                 self.pg0.sw_if_index: {
3145                     "/nat44-ed/in2out/slowpath/drops": 0,
3146                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3147                     "/nat44-ed/in2out/slowpath/udp": udpn,
3148                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3149                 },
3150             },
3151         )
3152
3153     def test_unknown_proto(self):
3154         """NAT44ED translate packet with unknown protocol"""
3155
3156         self.nat_add_address(self.nat_addr)
3157         self.nat_add_inside_interface(self.pg0)
3158         self.nat_add_outside_interface(self.pg1)
3159
3160         # in2out
3161         p = (
3162             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3163             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3164             / TCP(sport=self.tcp_port_in, dport=20)
3165         )
3166         self.pg0.add_stream(p)
3167         self.pg_enable_capture(self.pg_interfaces)
3168         self.pg_start()
3169         p = self.pg1.get_capture(1)
3170
3171         p = (
3172             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3173             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3174             / GRE()
3175             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3176             / TCP(sport=1234, dport=1234)
3177         )
3178         self.pg0.add_stream(p)
3179         self.pg_enable_capture(self.pg_interfaces)
3180         self.pg_start()
3181         p = self.pg1.get_capture(1)
3182         packet = p[0]
3183         try:
3184             self.assertEqual(packet[IP].src, self.nat_addr)
3185             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3186             self.assertEqual(packet.haslayer(GRE), 1)
3187             self.assert_packet_checksums_valid(packet)
3188         except:
3189             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3190             raise
3191
3192         # out2in
3193         p = (
3194             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3195             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3196             / GRE()
3197             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3198             / TCP(sport=1234, dport=1234)
3199         )
3200         self.pg1.add_stream(p)
3201         self.pg_enable_capture(self.pg_interfaces)
3202         self.pg_start()
3203         p = self.pg0.get_capture(1)
3204         packet = p[0]
3205         try:
3206             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3207             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3208             self.assertEqual(packet.haslayer(GRE), 1)
3209             self.assert_packet_checksums_valid(packet)
3210         except:
3211             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3212             raise
3213
3214     def test_hairpinning_unknown_proto(self):
3215         """NAT44ED translate packet with unknown protocol - hairpinning"""
3216         host = self.pg0.remote_hosts[0]
3217         server = self.pg0.remote_hosts[1]
3218         host_in_port = 1234
3219         server_out_port = 8765
3220         server_nat_ip = "10.0.0.11"
3221
3222         self.nat_add_address(self.nat_addr)
3223         self.nat_add_inside_interface(self.pg0)
3224         self.nat_add_outside_interface(self.pg1)
3225
3226         # add static mapping for server
3227         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3228
3229         # host to server
3230         p = (
3231             Ether(src=host.mac, dst=self.pg0.local_mac)
3232             / IP(src=host.ip4, dst=server_nat_ip)
3233             / TCP(sport=host_in_port, dport=server_out_port)
3234         )
3235         self.pg0.add_stream(p)
3236         self.pg_enable_capture(self.pg_interfaces)
3237         self.pg_start()
3238         self.pg0.get_capture(1)
3239
3240         p = (
3241             Ether(dst=self.pg0.local_mac, src=host.mac)
3242             / IP(src=host.ip4, dst=server_nat_ip)
3243             / GRE()
3244             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3245             / TCP(sport=1234, dport=1234)
3246         )
3247         self.pg0.add_stream(p)
3248         self.pg_enable_capture(self.pg_interfaces)
3249         self.pg_start()
3250         p = self.pg0.get_capture(1)
3251         packet = p[0]
3252         try:
3253             self.assertEqual(packet[IP].src, self.nat_addr)
3254             self.assertEqual(packet[IP].dst, server.ip4)
3255             self.assertEqual(packet.haslayer(GRE), 1)
3256             self.assert_packet_checksums_valid(packet)
3257         except:
3258             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3259             raise
3260
3261         # server to host
3262         p = (
3263             Ether(dst=self.pg0.local_mac, src=server.mac)
3264             / IP(src=server.ip4, dst=self.nat_addr)
3265             / GRE()
3266             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3267             / TCP(sport=1234, dport=1234)
3268         )
3269         self.pg0.add_stream(p)
3270         self.pg_enable_capture(self.pg_interfaces)
3271         self.pg_start()
3272         p = self.pg0.get_capture(1)
3273         packet = p[0]
3274         try:
3275             self.assertEqual(packet[IP].src, server_nat_ip)
3276             self.assertEqual(packet[IP].dst, host.ip4)
3277             self.assertEqual(packet.haslayer(GRE), 1)
3278             self.assert_packet_checksums_valid(packet)
3279         except:
3280             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3281             raise
3282
3283     def test_output_feature_and_service(self):
3284         """NAT44ED interface output feature and services"""
3285         external_addr = "1.2.3.4"
3286         external_port = 80
3287         local_port = 8080
3288
3289         self.vapi.nat44_forwarding_enable_disable(enable=1)
3290         self.nat_add_address(self.nat_addr)
3291         flags = self.config_flags.NAT_IS_ADDR_ONLY
3292         self.vapi.nat44_add_del_identity_mapping(
3293             ip_address=self.pg1.remote_ip4,
3294             sw_if_index=0xFFFFFFFF,
3295             flags=flags,
3296             is_add=1,
3297         )
3298         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3299         self.nat_add_static_mapping(
3300             self.pg0.remote_ip4,
3301             external_addr,
3302             local_port,
3303             external_port,
3304             proto=IP_PROTOS.tcp,
3305             flags=flags,
3306         )
3307
3308         self.nat_add_inside_interface(self.pg0)
3309         self.nat_add_outside_interface(self.pg0)
3310         self.vapi.nat44_ed_add_del_output_interface(
3311             sw_if_index=self.pg1.sw_if_index, is_add=1
3312         )
3313
3314         # from client to service
3315         p = (
3316             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3317             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3318             / TCP(sport=12345, dport=external_port)
3319         )
3320         self.pg1.add_stream(p)
3321         self.pg_enable_capture(self.pg_interfaces)
3322         self.pg_start()
3323         capture = self.pg0.get_capture(1)
3324         p = capture[0]
3325         try:
3326             ip = p[IP]
3327             tcp = p[TCP]
3328             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3329             self.assertEqual(tcp.dport, local_port)
3330             self.assert_packet_checksums_valid(p)
3331         except:
3332             self.logger.error(ppp("Unexpected or invalid packet:", p))
3333             raise
3334
3335         # from service back to client
3336         p = (
3337             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3338             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3339             / TCP(sport=local_port, dport=12345)
3340         )
3341         self.pg0.add_stream(p)
3342         self.pg_enable_capture(self.pg_interfaces)
3343         self.pg_start()
3344         capture = self.pg1.get_capture(1)
3345         p = capture[0]
3346         try:
3347             ip = p[IP]
3348             tcp = p[TCP]
3349             self.assertEqual(ip.src, external_addr)
3350             self.assertEqual(tcp.sport, external_port)
3351             self.assert_packet_checksums_valid(p)
3352         except:
3353             self.logger.error(ppp("Unexpected or invalid packet:", p))
3354             raise
3355
3356         # from local network host to external network
3357         pkts = self.create_stream_in(self.pg0, self.pg1)
3358         self.pg0.add_stream(pkts)
3359         self.pg_enable_capture(self.pg_interfaces)
3360         self.pg_start()
3361         capture = self.pg1.get_capture(len(pkts))
3362         self.verify_capture_out(capture, ignore_port=True)
3363         pkts = self.create_stream_in(self.pg0, self.pg1)
3364         self.pg0.add_stream(pkts)
3365         self.pg_enable_capture(self.pg_interfaces)
3366         self.pg_start()
3367         capture = self.pg1.get_capture(len(pkts))
3368         self.verify_capture_out(capture, ignore_port=True)
3369
3370         # from external network back to local network host
3371         pkts = self.create_stream_out(self.pg1)
3372         self.pg1.add_stream(pkts)
3373         self.pg_enable_capture(self.pg_interfaces)
3374         self.pg_start()
3375         capture = self.pg0.get_capture(len(pkts))
3376         self.verify_capture_in(capture, self.pg0)
3377
3378     def test_output_feature_and_service3(self):
3379         """NAT44ED interface output feature and DST NAT"""
3380         external_addr = "1.2.3.4"
3381         external_port = 80
3382         local_port = 8080
3383
3384         self.vapi.nat44_forwarding_enable_disable(enable=1)
3385         self.nat_add_address(self.nat_addr)
3386         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3387         self.nat_add_static_mapping(
3388             self.pg1.remote_ip4,
3389             external_addr,
3390             local_port,
3391             external_port,
3392             proto=IP_PROTOS.tcp,
3393             flags=flags,
3394         )
3395
3396         self.nat_add_inside_interface(self.pg0)
3397         self.nat_add_outside_interface(self.pg0)
3398         self.vapi.nat44_ed_add_del_output_interface(
3399             sw_if_index=self.pg1.sw_if_index, is_add=1
3400         )
3401
3402         p = (
3403             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3404             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3405             / TCP(sport=12345, dport=external_port)
3406         )
3407         self.pg0.add_stream(p)
3408         self.pg_enable_capture(self.pg_interfaces)
3409         self.pg_start()
3410         capture = self.pg1.get_capture(1)
3411         p = capture[0]
3412         try:
3413             ip = p[IP]
3414             tcp = p[TCP]
3415             self.assertEqual(ip.src, self.pg0.remote_ip4)
3416             self.assertEqual(tcp.sport, 12345)
3417             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3418             self.assertEqual(tcp.dport, local_port)
3419             self.assert_packet_checksums_valid(p)
3420         except:
3421             self.logger.error(ppp("Unexpected or invalid packet:", p))
3422             raise
3423
3424         p = (
3425             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3426             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3427             / TCP(sport=local_port, dport=12345)
3428         )
3429         self.pg1.add_stream(p)
3430         self.pg_enable_capture(self.pg_interfaces)
3431         self.pg_start()
3432         capture = self.pg0.get_capture(1)
3433         p = capture[0]
3434         try:
3435             ip = p[IP]
3436             tcp = p[TCP]
3437             self.assertEqual(ip.src, external_addr)
3438             self.assertEqual(tcp.sport, external_port)
3439             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3440             self.assertEqual(tcp.dport, 12345)
3441             self.assert_packet_checksums_valid(p)
3442         except:
3443             self.logger.error(ppp("Unexpected or invalid packet:", p))
3444             raise
3445
3446     def test_self_twice_nat_lb_negative(self):
3447         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3448         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3449
3450     def test_self_twice_nat_negative(self):
3451         """NAT44ED Self Twice NAT (negative test)"""
3452         self.twice_nat_common(self_twice_nat=True)
3453
3454     def test_static_lb_multi_clients(self):
3455         """NAT44ED local service load balancing - multiple clients"""
3456
3457         external_addr = self.nat_addr
3458         external_port = 80
3459         local_port = 8080
3460         server1 = self.pg0.remote_hosts[0]
3461         server2 = self.pg0.remote_hosts[1]
3462         server3 = self.pg0.remote_hosts[2]
3463
3464         locals = [
3465             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3466             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3467         ]
3468
3469         flags = self.config_flags.NAT_IS_INSIDE
3470         self.vapi.nat44_interface_add_del_feature(
3471             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3472         )
3473         self.vapi.nat44_interface_add_del_feature(
3474             sw_if_index=self.pg1.sw_if_index, is_add=1
3475         )
3476
3477         self.nat_add_address(self.nat_addr)
3478         self.vapi.nat44_add_del_lb_static_mapping(
3479             is_add=1,
3480             external_addr=external_addr,
3481             external_port=external_port,
3482             protocol=IP_PROTOS.tcp,
3483             local_num=len(locals),
3484             locals=locals,
3485         )
3486
3487         server1_n = 0
3488         server2_n = 0
3489         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3490         pkts = []
3491         for client in clients:
3492             p = (
3493                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3494                 / IP(src=client, dst=self.nat_addr)
3495                 / TCP(sport=12345, dport=external_port)
3496             )
3497             pkts.append(p)
3498         self.pg1.add_stream(pkts)
3499         self.pg_enable_capture(self.pg_interfaces)
3500         self.pg_start()
3501         capture = self.pg0.get_capture(len(pkts))
3502         for p in capture:
3503             if p[IP].dst == server1.ip4:
3504                 server1_n += 1
3505             else:
3506                 server2_n += 1
3507         self.assertGreaterEqual(server1_n, server2_n)
3508
3509         local = {
3510             "addr": server3.ip4,
3511             "port": local_port,
3512             "probability": 20,
3513             "vrf_id": 0,
3514         }
3515
3516         # add new back-end
3517         self.vapi.nat44_lb_static_mapping_add_del_local(
3518             is_add=1,
3519             external_addr=external_addr,
3520             external_port=external_port,
3521             local=local,
3522             protocol=IP_PROTOS.tcp,
3523         )
3524         server1_n = 0
3525         server2_n = 0
3526         server3_n = 0
3527         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3528         pkts = []
3529         for client in clients:
3530             p = (
3531                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3532                 / IP(src=client, dst=self.nat_addr)
3533                 / TCP(sport=12346, dport=external_port)
3534             )
3535             pkts.append(p)
3536         self.assertGreater(len(pkts), 0)
3537         self.pg1.add_stream(pkts)
3538         self.pg_enable_capture(self.pg_interfaces)
3539         self.pg_start()
3540         capture = self.pg0.get_capture(len(pkts))
3541         for p in capture:
3542             if p[IP].dst == server1.ip4:
3543                 server1_n += 1
3544             elif p[IP].dst == server2.ip4:
3545                 server2_n += 1
3546             else:
3547                 server3_n += 1
3548         self.assertGreater(server1_n, 0)
3549         self.assertGreater(server2_n, 0)
3550         self.assertGreater(server3_n, 0)
3551
3552         local = {
3553             "addr": server2.ip4,
3554             "port": local_port,
3555             "probability": 10,
3556             "vrf_id": 0,
3557         }
3558
3559         # remove one back-end
3560         self.vapi.nat44_lb_static_mapping_add_del_local(
3561             is_add=0,
3562             external_addr=external_addr,
3563             external_port=external_port,
3564             local=local,
3565             protocol=IP_PROTOS.tcp,
3566         )
3567         server1_n = 0
3568         server2_n = 0
3569         server3_n = 0
3570         self.pg1.add_stream(pkts)
3571         self.pg_enable_capture(self.pg_interfaces)
3572         self.pg_start()
3573         capture = self.pg0.get_capture(len(pkts))
3574         for p in capture:
3575             if p[IP].dst == server1.ip4:
3576                 server1_n += 1
3577             elif p[IP].dst == server2.ip4:
3578                 server2_n += 1
3579             else:
3580                 server3_n += 1
3581         self.assertGreater(server1_n, 0)
3582         self.assertEqual(server2_n, 0)
3583         self.assertGreater(server3_n, 0)
3584
    # Prefix the syslog test name with "zzz" so that it runs last: setting
    # the syslog sender cannot be undone, and once it is set it interferes
    # with self.send_and_assert_no_replies.
3588     def test_zzz_syslog_sess(self):
3589         """NAT44ED Test syslog session creation and deletion"""
3590         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3591         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3592
3593         self.nat_add_address(self.nat_addr)
3594         self.nat_add_inside_interface(self.pg0)
3595         self.nat_add_outside_interface(self.pg1)
3596
3597         p = (
3598             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3599             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3600             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3601         )
3602         self.pg0.add_stream(p)
3603         self.pg_enable_capture(self.pg_interfaces)
3604         self.pg_start()
3605         capture = self.pg1.get_capture(1)
3606         self.tcp_port_out = capture[0][TCP].sport
3607         capture = self.pg3.get_capture(1)
3608         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3609
3610         self.pg_enable_capture(self.pg_interfaces)
3611         self.pg_start()
3612         self.nat_add_address(self.nat_addr, is_add=0)
3613         capture = self.pg3.get_capture(1)
3614         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3615
    # Prefix the syslog test name with "zzz" so that it runs last: setting
    # the syslog sender cannot be undone, and once it is set it interferes
    # with self.send_and_assert_no_replies.
3619     def test_zzz_syslog_sess_reopen(self):
3620         """Syslog events for session reopen"""
3621         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3622         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3623
3624         self.nat_add_address(self.nat_addr)
3625         self.nat_add_inside_interface(self.pg0)
3626         self.nat_add_outside_interface(self.pg1)
3627
3628         # SYN in2out
3629         p = (
3630             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3631             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3632             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3633         )
3634         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3635         self.tcp_port_out = capture[0][TCP].sport
3636         capture = self.pg3.get_capture(1)
3637         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3638
3639         # SYN out2in
3640         p = (
3641             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3642             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3643             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3644         )
3645         self.send_and_expect(self.pg1, p, self.pg0)
3646
3647         p = (
3648             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3649             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3650             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3651         )
3652         self.send_and_expect(self.pg0, p, self.pg1)
3653
3654         # FIN in2out
3655         p = (
3656             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3657             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3658             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3659         )
3660         self.send_and_expect(self.pg0, p, self.pg1)
3661
3662         # FIN out2in
3663         p = (
3664             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3665             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3666             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3667         )
3668         self.send_and_expect(self.pg1, p, self.pg0)
3669
3670         self.init_tcp_session(
3671             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3672         )
3673
3674         # 2 records should be produced - first one del & add
3675         capture = self.pg3.get_capture(2)
3676         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3677         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3678
3679     def test_twice_nat_interface_addr(self):
3680         """NAT44ED Acquire twice NAT addresses from interface"""
3681         flags = self.config_flags.NAT_IS_TWICE_NAT
3682         self.vapi.nat44_add_del_interface_addr(
3683             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3684         )
3685
3686         # no address in NAT pool
3687         adresses = self.vapi.nat44_address_dump()
3688         self.assertEqual(0, len(adresses))
3689
3690         # configure interface address and check NAT address pool
3691         self.pg11.config_ip4()
3692         adresses = self.vapi.nat44_address_dump()
3693         self.assertEqual(1, len(adresses))
3694         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3695         self.assertEqual(adresses[0].flags, flags)
3696
3697         # remove interface address and check NAT address pool
3698         self.pg11.unconfig_ip4()
3699         adresses = self.vapi.nat44_address_dump()
3700         self.assertEqual(0, len(adresses))
3701
3702     def test_output_feature_stateful_acl(self):
3703         """NAT44ED output feature works with stateful ACL"""
3704
3705         self.nat_add_address(self.nat_addr)
3706         self.vapi.nat44_ed_add_del_output_interface(
3707             sw_if_index=self.pg1.sw_if_index, is_add=1
3708         )
3709
3710         # First ensure that the NAT is working sans ACL
3711
3712         # send packets out2in, no sessions yet so packets should drop
3713         pkts_out2in = self.create_stream_out(self.pg1)
3714         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3715
3716         # send packets into inside intf, ensure received via outside intf
3717         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3718         capture = self.send_and_expect(
3719             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3720         )
3721         self.verify_capture_out(capture, ignore_port=True)
3722
3723         # send out2in again, with sessions created it should work now
3724         pkts_out2in = self.create_stream_out(self.pg1)
3725         capture = self.send_and_expect(
3726             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3727         )
3728         self.verify_capture_in(capture, self.pg0)
3729
3730         # Create an ACL blocking everything
3731         out2in_deny_rule = AclRule(is_permit=0)
3732         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3733         out2in_acl.add_vpp_config()
3734
3735         # create an ACL to permit/reflect everything
3736         in2out_reflect_rule = AclRule(is_permit=2)
3737         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3738         in2out_acl.add_vpp_config()
3739
3740         # apply as input acl on interface and confirm it blocks everything
3741         acl_if = VppAclInterface(
3742             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3743         )
3744         acl_if.add_vpp_config()
3745         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3746
3747         # apply output acl
3748         acl_if.acls = [out2in_acl, in2out_acl]
3749         acl_if.add_vpp_config()
3750         # send in2out to generate ACL state (NAT state was created earlier)
3751         capture = self.send_and_expect(
3752             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3753         )
3754         self.verify_capture_out(capture, ignore_port=True)
3755
3756         # send out2in again. ACL state exists so it should work now.
3757         # TCP packets with the syn flag set also need the ack flag
3758         for p in pkts_out2in:
3759             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3760                 p[TCP].flags |= 0x10
3761         capture = self.send_and_expect(
3762             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3763         )
3764         self.verify_capture_in(capture, self.pg0)
3765         self.logger.info(self.vapi.cli("show trace"))
3766
3767     def test_tcp_close(self):
3768         """NAT44ED Close TCP session from inside network - output feature"""
3769         config = self.vapi.nat44_show_running_config()
3770         old_timeouts = config.timeouts
3771         new_transitory = 2
3772         self.vapi.nat_set_timeouts(
3773             udp=old_timeouts.udp,
3774             tcp_established=old_timeouts.tcp_established,
3775             icmp=old_timeouts.icmp,
3776             tcp_transitory=new_transitory,
3777         )
3778
3779         self.vapi.nat44_forwarding_enable_disable(enable=1)
3780         self.nat_add_address(self.pg1.local_ip4)
3781         twice_nat_addr = "10.0.1.3"
3782         service_ip = "192.168.16.150"
3783         self.nat_add_address(twice_nat_addr, twice_nat=1)
3784
3785         flags = self.config_flags.NAT_IS_INSIDE
3786         self.vapi.nat44_interface_add_del_feature(
3787             sw_if_index=self.pg0.sw_if_index, is_add=1
3788         )
3789         self.vapi.nat44_interface_add_del_feature(
3790             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3791         )
3792         self.vapi.nat44_ed_add_del_output_interface(
3793             is_add=1, sw_if_index=self.pg1.sw_if_index
3794         )
3795
3796         flags = (
3797             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3798         )
3799         self.nat_add_static_mapping(
3800             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3801         )
3802         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3803         start_sessnum = len(sessions)
3804
3805         # SYN packet out->in
3806         p = (
3807             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3808             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3809             / TCP(sport=33898, dport=80, flags="S")
3810         )
3811         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3812         p = capture[0]
3813         tcp_port = p[TCP].sport
3814
3815         # SYN + ACK packet in->out
3816         p = (
3817             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3818             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3819             / TCP(sport=80, dport=tcp_port, flags="SA")
3820         )
3821         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3822
3823         # ACK packet out->in
3824         p = (
3825             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3826             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3827             / TCP(sport=33898, dport=80, flags="A")
3828         )
3829         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3830
3831         # FIN packet in -> out
3832         p = (
3833             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3834             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3835             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3836         )
3837         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3838
3839         # FIN+ACK packet out -> in
3840         p = (
3841             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3842             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3843             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3844         )
3845         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3846
3847         # ACK packet in -> out
3848         p = (
3849             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3850             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3851             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3852         )
3853         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3854
3855         # session now in transitory timeout, but traffic still flows
3856         # try FIN packet out->in
3857         p = (
3858             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3859             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3860             / TCP(sport=33898, dport=80, flags="F")
3861         )
3862         self.pg1.add_stream(p)
3863         self.pg_enable_capture(self.pg_interfaces)
3864         self.pg_start()
3865
3866         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3867         self.pg0.get_capture(1)
3868
3869         # session should still exist
3870         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3871         self.assertEqual(len(sessions) - start_sessnum, 1)
3872
3873         # send FIN+ACK packet out -> in - will cause session to be wiped
3874         # but won't create a new session
3875         p = (
3876             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3877             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3878             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3879         )
3880         self.send_and_assert_no_replies(self.pg1, p)
3881         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3882         self.assertEqual(len(sessions) - start_sessnum, 0)
3883
3884     def test_tcp_session_close_in(self):
3885         """NAT44ED Close TCP session from inside network"""
3886
3887         in_port = self.tcp_port_in
3888         out_port = 10505
3889         ext_port = self.tcp_external_port
3890
3891         self.nat_add_address(self.nat_addr)
3892         self.nat_add_inside_interface(self.pg0)
3893         self.nat_add_outside_interface(self.pg1)
3894         self.nat_add_static_mapping(
3895             self.pg0.remote_ip4,
3896             self.nat_addr,
3897             in_port,
3898             out_port,
3899             proto=IP_PROTOS.tcp,
3900             flags=self.config_flags.NAT_IS_TWICE_NAT,
3901         )
3902
3903         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3904         session_n = len(sessions)
3905
3906         self.vapi.nat_set_timeouts(
3907             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3908         )
3909
3910         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3911
3912         # FIN packet in -> out
3913         p = (
3914             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3915             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3916             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3917         )
3918         self.send_and_expect(self.pg0, p, self.pg1)
3919         pkts = []
3920
3921         # ACK packet out -> in
3922         p = (
3923             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3924             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3925             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3926         )
3927         pkts.append(p)
3928
3929         # FIN packet out -> in
3930         p = (
3931             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3932             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3933             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3934         )
3935         pkts.append(p)
3936
3937         self.send_and_expect(self.pg1, pkts, self.pg0)
3938
3939         # ACK packet in -> out
3940         p = (
3941             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3942             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3943             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3944         )
3945         self.send_and_expect(self.pg0, p, self.pg1)
3946
3947         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3948         self.assertEqual(len(sessions) - session_n, 1)
3949
3950         # retransmit FIN packet out -> in
3951         p = (
3952             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3953             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3954             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3955         )
3956
3957         self.send_and_expect(self.pg1, p, self.pg0)
3958
3959         # retransmit ACK packet in -> out
3960         p = (
3961             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3962             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3963             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3964         )
3965         self.send_and_expect(self.pg0, p, self.pg1)
3966
3967         self.virtual_sleep(3)
3968         # retransmit ACK packet in -> out - this will cause session to be wiped
3969         p = (
3970             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3971             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3972             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3973         )
3974         self.send_and_assert_no_replies(self.pg0, p)
3975         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3976         self.assertEqual(len(sessions) - session_n, 0)
3977
3978     def test_tcp_session_close_out(self):
3979         """NAT44ED Close TCP session from outside network"""
3980
3981         in_port = self.tcp_port_in
3982         out_port = 10505
3983         ext_port = self.tcp_external_port
3984
3985         self.nat_add_address(self.nat_addr)
3986         self.nat_add_inside_interface(self.pg0)
3987         self.nat_add_outside_interface(self.pg1)
3988         self.nat_add_static_mapping(
3989             self.pg0.remote_ip4,
3990             self.nat_addr,
3991             in_port,
3992             out_port,
3993             proto=IP_PROTOS.tcp,
3994             flags=self.config_flags.NAT_IS_TWICE_NAT,
3995         )
3996
3997         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3998         session_n = len(sessions)
3999
4000         self.vapi.nat_set_timeouts(
4001             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4002         )
4003
4004         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4005
4006         # FIN packet out -> in
4007         p = (
4008             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4009             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4010             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
4011         )
4012         self.pg1.add_stream(p)
4013         self.pg_enable_capture(self.pg_interfaces)
4014         self.pg_start()
4015         self.pg0.get_capture(1)
4016
4017         # FIN+ACK packet in -> out
4018         p = (
4019             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4020             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4021             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
4022         )
4023
4024         self.pg0.add_stream(p)
4025         self.pg_enable_capture(self.pg_interfaces)
4026         self.pg_start()
4027         self.pg1.get_capture(1)
4028
4029         # ACK packet out -> in
4030         p = (
4031             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4032             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4033             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
4034         )
4035         self.pg1.add_stream(p)
4036         self.pg_enable_capture(self.pg_interfaces)
4037         self.pg_start()
4038         self.pg0.get_capture(1)
4039
4040         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4041         self.assertEqual(len(sessions) - session_n, 1)
4042
4043         # retransmit FIN packet out -> in
4044         p = (
4045             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4046             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4047             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4048         )
4049         self.send_and_expect(self.pg1, p, self.pg0)
4050
4051         # retransmit ACK packet in -> out
4052         p = (
4053             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4054             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4055             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4056         )
4057         self.send_and_expect(self.pg0, p, self.pg1)
4058
4059         self.virtual_sleep(3)
4060         # retransmit ACK packet in -> out - this will cause session to be wiped
4061         p = (
4062             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4063             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4064             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4065         )
4066         self.send_and_assert_no_replies(self.pg0, p)
4067         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4068         self.assertEqual(len(sessions) - session_n, 0)
4069
4070     def test_tcp_session_close_simultaneous(self):
4071         """Simultaneous TCP close from both sides"""
4072
4073         in_port = self.tcp_port_in
4074         ext_port = 10505
4075
4076         self.nat_add_address(self.nat_addr)
4077         self.nat_add_inside_interface(self.pg0)
4078         self.nat_add_outside_interface(self.pg1)
4079         self.nat_add_static_mapping(
4080             self.pg0.remote_ip4,
4081             self.nat_addr,
4082             in_port,
4083             ext_port,
4084             proto=IP_PROTOS.tcp,
4085             flags=self.config_flags.NAT_IS_TWICE_NAT,
4086         )
4087
4088         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4089         session_n = len(sessions)
4090
4091         self.vapi.nat_set_timeouts(
4092             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4093         )
4094
4095         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4096
4097         # FIN packet in -> out
4098         p = (
4099             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4100             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4101             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4102         )
4103         self.send_and_expect(self.pg0, p, self.pg1)
4104
4105         # FIN packet out -> in
4106         p = (
4107             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4108             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4109             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4110         )
4111         self.send_and_expect(self.pg1, p, self.pg0)
4112
4113         # ACK packet in -> out
4114         p = (
4115             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4116             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4117             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4118         )
4119         self.send_and_expect(self.pg0, p, self.pg1)
4120
4121         # ACK packet out -> in
4122         p = (
4123             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4124             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4125             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4126         )
4127         self.send_and_expect(self.pg1, p, self.pg0)
4128
4129         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4130         self.assertEqual(len(sessions) - session_n, 1)
4131
4132         # retransmit FIN packet out -> in
4133         p = (
4134             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4135             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4136             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4137         )
4138         self.send_and_expect(self.pg1, p, self.pg0)
4139
4140         # retransmit ACK packet in -> out
4141         p = (
4142             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4143             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4144             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4145         )
4146         self.send_and_expect(self.pg0, p, self.pg1)
4147
4148         self.virtual_sleep(3)
4149         # retransmit ACK packet in -> out - this will cause session to be wiped
4150         p = (
4151             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4152             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4153             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4154         )
4155         self.pg_send(self.pg0, p)
4156         self.send_and_assert_no_replies(self.pg0, p)
4157         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4158         self.assertEqual(len(sessions) - session_n, 0)
4159
4160     def test_tcp_session_half_reopen_inside(self):
4161         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4162         in_port = self.tcp_port_in
4163         ext_port = 10505
4164
4165         self.nat_add_address(self.nat_addr)
4166         self.nat_add_inside_interface(self.pg0)
4167         self.nat_add_outside_interface(self.pg1)
4168         self.nat_add_static_mapping(
4169             self.pg0.remote_ip4,
4170             self.nat_addr,
4171             in_port,
4172             ext_port,
4173             proto=IP_PROTOS.tcp,
4174             flags=self.config_flags.NAT_IS_TWICE_NAT,
4175         )
4176
4177         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4178         session_n = len(sessions)
4179
4180         self.vapi.nat_set_timeouts(
4181             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4182         )
4183
4184         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4185
4186         # FIN packet in -> out
4187         p = (
4188             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4189             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4190             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4191         )
4192         self.send_and_expect(self.pg0, p, self.pg1)
4193
4194         # FIN packet out -> in
4195         p = (
4196             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4197             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4198             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4199         )
4200         self.send_and_expect(self.pg1, p, self.pg0)
4201
4202         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4203         self.assertEqual(len(sessions) - session_n, 1)
4204
4205         # send SYN packet in -> out
4206         p = (
4207             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4208             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4209             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4210         )
4211         self.send_and_expect(self.pg0, p, self.pg1)
4212
4213         self.virtual_sleep(3)
4214         # send ACK packet in -> out - session should be wiped
4215         p = (
4216             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4217             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4218             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4219         )
4220         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4221         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4222         self.assertEqual(len(sessions) - session_n, 0)
4223
4224     def test_tcp_session_half_reopen_outside(self):
4225         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4226         in_port = self.tcp_port_in
4227         ext_port = 10505
4228
4229         self.nat_add_address(self.nat_addr)
4230         self.nat_add_inside_interface(self.pg0)
4231         self.nat_add_outside_interface(self.pg1)
4232         self.nat_add_static_mapping(
4233             self.pg0.remote_ip4,
4234             self.nat_addr,
4235             in_port,
4236             ext_port,
4237             proto=IP_PROTOS.tcp,
4238             flags=self.config_flags.NAT_IS_TWICE_NAT,
4239         )
4240
4241         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4242         session_n = len(sessions)
4243
4244         self.vapi.nat_set_timeouts(
4245             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4246         )
4247
4248         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4249
4250         # FIN packet in -> out
4251         p = (
4252             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4253             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4254             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4255         )
4256         self.send_and_expect(self.pg0, p, self.pg1)
4257
4258         # FIN packet out -> in
4259         p = (
4260             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4261             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4262             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4263         )
4264         self.send_and_expect(self.pg1, p, self.pg0)
4265
4266         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4267         self.assertEqual(len(sessions) - session_n, 1)
4268
4269         # send SYN packet out -> in
4270         p = (
4271             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4272             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4273             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4274         )
4275         self.send_and_expect(self.pg1, p, self.pg0)
4276
4277         self.virtual_sleep(3)
4278         # send ACK packet in -> out - session should be wiped
4279         p = (
4280             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4281             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4282             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4283         )
4284         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4285         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4286         self.assertEqual(len(sessions) - session_n, 0)
4287
4288     def test_tcp_session_reopen(self):
4289         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4290         in_port = self.tcp_port_in
4291         ext_port = 10505
4292
4293         self.nat_add_address(self.nat_addr)
4294         self.nat_add_inside_interface(self.pg0)
4295         self.nat_add_outside_interface(self.pg1)
4296         self.nat_add_static_mapping(
4297             self.pg0.remote_ip4,
4298             self.nat_addr,
4299             in_port,
4300             ext_port,
4301             proto=IP_PROTOS.tcp,
4302             flags=self.config_flags.NAT_IS_TWICE_NAT,
4303         )
4304
4305         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4306         session_n = len(sessions)
4307
4308         self.vapi.nat_set_timeouts(
4309             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4310         )
4311
4312         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4313
4314         # FIN packet in -> out
4315         p = (
4316             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4317             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4318             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4319         )
4320         self.send_and_expect(self.pg0, p, self.pg1)
4321
4322         # FIN packet out -> in
4323         p = (
4324             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4325             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4326             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4327         )
4328         self.send_and_expect(self.pg1, p, self.pg0)
4329
4330         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4331         self.assertEqual(len(sessions) - session_n, 1)
4332
4333         # send SYN packet out -> in
4334         p = (
4335             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4336             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4337             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4338         )
4339         self.send_and_expect(self.pg1, p, self.pg0)
4340
4341         # send SYN packet in -> out
4342         p = (
4343             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4344             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4345             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4346         )
4347         self.send_and_expect(self.pg0, p, self.pg1)
4348
4349         # send ACK packet out -> in
4350         p = (
4351             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4352             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4353             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4354         )
4355         self.send_and_expect(self.pg1, p, self.pg0)
4356
4357         self.virtual_sleep(3)
4358         # send ACK packet in -> out - should be forwarded and session alive
4359         p = (
4360             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4361             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4362             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4363         )
4364         self.send_and_expect(self.pg0, p, self.pg1)
4365         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4366         self.assertEqual(len(sessions) - session_n, 1)
4367
4368     def test_dynamic_vrf(self):
4369         """NAT44ED dynamic translation test: different VRF"""
4370
4371         vrf_id_in = 33
4372         vrf_id_out = 34
4373
4374         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4375
4376         try:
4377             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4378             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4379
4380             self.nat_add_inside_interface(self.pg7)
4381             self.nat_add_outside_interface(self.pg8)
4382
4383             # just basic stuff nothing special
4384             pkts = self.create_stream_in(self.pg7, self.pg8)
4385             self.pg7.add_stream(pkts)
4386             self.pg_enable_capture(self.pg_interfaces)
4387             self.pg_start()
4388             capture = self.pg8.get_capture(len(pkts))
4389             self.verify_capture_out(capture, ignore_port=True)
4390
4391             pkts = self.create_stream_out(self.pg8)
4392             self.pg8.add_stream(pkts)
4393             self.pg_enable_capture(self.pg_interfaces)
4394             self.pg_start()
4395             capture = self.pg7.get_capture(len(pkts))
4396             self.verify_capture_in(capture, self.pg7)
4397
4398         finally:
4399             self.pg7.unconfig()
4400             self.pg8.unconfig()
4401
4402             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4403             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4404
4405     def test_dynamic_output_feature_vrf(self):
4406         """NAT44ED dynamic translation test: output-feature, VRF"""
4407
4408         # other then default (0)
4409         new_vrf_id = 22
4410
4411         self.nat_add_address(self.nat_addr)
4412         self.vapi.nat44_ed_add_del_output_interface(
4413             sw_if_index=self.pg8.sw_if_index, is_add=1
4414         )
4415         try:
4416             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4417             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4418
4419             # in2out
4420             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4421             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4422             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4423             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4424
4425             pkts = self.create_stream_in(self.pg7, self.pg8)
4426             self.pg7.add_stream(pkts)
4427             self.pg_enable_capture(self.pg_interfaces)
4428             self.pg_start()
4429             capture = self.pg8.get_capture(len(pkts))
4430             self.verify_capture_out(capture, ignore_port=True)
4431
4432             if_idx = self.pg8.sw_if_index
4433             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4434             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4435             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4436             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4437             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4438             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4439             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4440             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4441
4442             # out2in
4443             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4444             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4445             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4446             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4447
4448             pkts = self.create_stream_out(self.pg8)
4449             self.pg8.add_stream(pkts)
4450             self.pg_enable_capture(self.pg_interfaces)
4451             self.pg_start()
4452             capture = self.pg7.get_capture(len(pkts))
4453             self.verify_capture_in(capture, self.pg7)
4454
4455             if_idx = self.pg8.sw_if_index
4456             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4457             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4458             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4459             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4460             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4461             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4462             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4463             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4464
4465             sessions = self.statistics["/nat44-ed/total-sessions"]
4466             self.assertEqual(sessions[:, 0].sum(), 3)
4467
4468         finally:
4469             self.pg7.unconfig()
4470             self.pg8.unconfig()
4471
4472             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4473
4474     def test_next_src_nat(self):
4475         """NAT44ED On way back forward packet to nat44-in2out node."""
4476
4477         twice_nat_addr = "10.0.1.3"
4478         external_port = 80
4479         local_port = 8080
4480         post_twice_nat_port = 0
4481
4482         self.vapi.nat44_forwarding_enable_disable(enable=1)
4483         self.nat_add_address(twice_nat_addr, twice_nat=1)
4484         flags = (
4485             self.config_flags.NAT_IS_OUT2IN_ONLY
4486             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4487         )
4488         self.nat_add_static_mapping(
4489             self.pg6.remote_ip4,
4490             self.pg1.remote_ip4,
4491             local_port,
4492             external_port,
4493             proto=IP_PROTOS.tcp,
4494             vrf_id=1,
4495             flags=flags,
4496         )
4497         self.vapi.nat44_interface_add_del_feature(
4498             sw_if_index=self.pg6.sw_if_index, is_add=1
4499         )
4500
4501         p = (
4502             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4503             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4504             / TCP(sport=12345, dport=external_port)
4505         )
4506         self.pg6.add_stream(p)
4507         self.pg_enable_capture(self.pg_interfaces)
4508         self.pg_start()
4509         capture = self.pg6.get_capture(1)
4510         p = capture[0]
4511         try:
4512             ip = p[IP]
4513             tcp = p[TCP]
4514             self.assertEqual(ip.src, twice_nat_addr)
4515             self.assertNotEqual(tcp.sport, 12345)
4516             post_twice_nat_port = tcp.sport
4517             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4518             self.assertEqual(tcp.dport, local_port)
4519             self.assert_packet_checksums_valid(p)
4520         except:
4521             self.logger.error(ppp("Unexpected or invalid packet:", p))
4522             raise
4523
4524         p = (
4525             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4526             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4527             / TCP(sport=local_port, dport=post_twice_nat_port)
4528         )
4529         self.pg6.add_stream(p)
4530         self.pg_enable_capture(self.pg_interfaces)
4531         self.pg_start()
4532         capture = self.pg6.get_capture(1)
4533         p = capture[0]
4534         try:
4535             ip = p[IP]
4536             tcp = p[TCP]
4537             self.assertEqual(ip.src, self.pg1.remote_ip4)
4538             self.assertEqual(tcp.sport, external_port)
4539             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4540             self.assertEqual(tcp.dport, 12345)
4541             self.assert_packet_checksums_valid(p)
4542         except:
4543             self.logger.error(ppp("Unexpected or invalid packet:", p))
4544             raise
4545
4546     def test_one_armed_nat44_static(self):
4547         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4548
4549         remote_host = self.pg4.remote_hosts[0]
4550         local_host = self.pg4.remote_hosts[1]
4551         external_port = 80
4552         local_port = 8080
4553         eh_port_in = 0
4554
4555         self.vapi.nat44_forwarding_enable_disable(enable=1)
4556         self.nat_add_address(self.nat_addr, twice_nat=1)
4557         flags = (
4558             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4559         )
4560         self.nat_add_static_mapping(
4561             local_host.ip4,
4562             self.nat_addr,
4563             local_port,
4564             external_port,
4565             proto=IP_PROTOS.tcp,
4566             flags=flags,
4567         )
4568         flags = self.config_flags.NAT_IS_INSIDE
4569         self.vapi.nat44_interface_add_del_feature(
4570             sw_if_index=self.pg4.sw_if_index, is_add=1
4571         )
4572         self.vapi.nat44_interface_add_del_feature(
4573             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4574         )
4575
4576         # from client to service
4577         p = (
4578             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4579             / IP(src=remote_host.ip4, dst=self.nat_addr)
4580             / TCP(sport=12345, dport=external_port)
4581         )
4582         self.pg4.add_stream(p)
4583         self.pg_enable_capture(self.pg_interfaces)
4584         self.pg_start()
4585         capture = self.pg4.get_capture(1)
4586         p = capture[0]
4587         try:
4588             ip = p[IP]
4589             tcp = p[TCP]
4590             self.assertEqual(ip.dst, local_host.ip4)
4591             self.assertEqual(ip.src, self.nat_addr)
4592             self.assertEqual(tcp.dport, local_port)
4593             self.assertNotEqual(tcp.sport, 12345)
4594             eh_port_in = tcp.sport
4595             self.assert_packet_checksums_valid(p)
4596         except:
4597             self.logger.error(ppp("Unexpected or invalid packet:", p))
4598             raise
4599
4600         # from service back to client
4601         p = (
4602             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4603             / IP(src=local_host.ip4, dst=self.nat_addr)
4604             / TCP(sport=local_port, dport=eh_port_in)
4605         )
4606         self.pg4.add_stream(p)
4607         self.pg_enable_capture(self.pg_interfaces)
4608         self.pg_start()
4609         capture = self.pg4.get_capture(1)
4610         p = capture[0]
4611         try:
4612             ip = p[IP]
4613             tcp = p[TCP]
4614             self.assertEqual(ip.src, self.nat_addr)
4615             self.assertEqual(ip.dst, remote_host.ip4)
4616             self.assertEqual(tcp.sport, external_port)
4617             self.assertEqual(tcp.dport, 12345)
4618             self.assert_packet_checksums_valid(p)
4619         except:
4620             self.logger.error(ppp("Unexpected or invalid packet:", p))
4621             raise
4622
4623     def test_icmp_error_fwd_outbound(self):
4624         """NAT44ED ICMP error outbound with forwarding enabled"""
4625
4626         # Ensure that an outbound ICMP error message is properly associated
4627         # with the inbound forward bypass session it is related to.
4628         payload = "H" * 10
4629
4630         self.nat_add_address(self.nat_addr)
4631         self.nat_add_inside_interface(self.pg0)
4632         self.nat_add_outside_interface(self.pg1)
4633
4634         # enable forwarding and initiate connection out2in
4635         self.vapi.nat44_forwarding_enable_disable(enable=1)
4636         p1 = (
4637             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4638             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4639             / UDP(sport=21, dport=20)
4640             / payload
4641         )
4642
4643         self.pg1.add_stream(p1)
4644         self.pg_enable_capture(self.pg_interfaces)
4645         self.pg_start()
4646         capture = self.pg0.get_capture(1)[0]
4647
4648         self.logger.info(self.vapi.cli("show nat44 sessions"))
4649
4650         # reply with ICMP error message in2out
4651         # We cannot reliably retrieve forward bypass sessions via the API.
4652         # session dumps for a user will only look on the worker that the
4653         # user is supposed to be mapped to in2out. The forward bypass session
4654         # is not necessarily created on that worker.
4655         p2 = (
4656             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4657             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4658             / ICMP(type="dest-unreach", code="port-unreachable")
4659             / capture[IP:]
4660         )
4661
4662         self.pg0.add_stream(p2)
4663         self.pg_enable_capture(self.pg_interfaces)
4664         self.pg_start()
4665         capture = self.pg1.get_capture(1)[0]
4666
4667         self.logger.info(self.vapi.cli("show nat44 sessions"))
4668
4669         self.logger.info(ppp("p1 packet:", p1))
4670         self.logger.info(ppp("p2 packet:", p2))
4671         self.logger.info(ppp("capture packet:", capture))
4672
4673     def test_tcp_session_open_retransmit1(self):
4674         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4675
4676         The client does not receive the [SYN,ACK] or the
4677         ACK from the client is lost. Therefore, the [SYN, ACK]
4678         is retransmitted by the server.
4679         """
4680
4681         in_port = self.tcp_port_in
4682         ext_port = self.tcp_external_port
4683         payload = "H" * 10
4684
4685         self.nat_add_address(self.nat_addr)
4686         self.nat_add_inside_interface(self.pg0)
4687         self.nat_add_outside_interface(self.pg1)
4688
4689         self.vapi.nat_set_timeouts(
4690             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4691         )
4692         # SYN packet in->out
4693         p = (
4694             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4695             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4696             / TCP(sport=in_port, dport=ext_port, flags="S")
4697         )
4698         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4699         out_port = p[TCP].sport
4700
4701         # SYN + ACK packet out->in
4702         p = (
4703             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4704             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4705             / TCP(sport=ext_port, dport=out_port, flags="SA")
4706         )
4707         self.send_and_expect(self.pg1, p, self.pg0)
4708
4709         # ACK in->out does not arrive
4710
4711         # resent SYN + ACK packet out->in
4712         p = (
4713             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4714             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4715             / TCP(sport=ext_port, dport=out_port, flags="SA")
4716         )
4717         self.send_and_expect(self.pg1, p, self.pg0)
4718
4719         # ACK packet in->out
4720         p = (
4721             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4722             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4723             / TCP(sport=in_port, dport=ext_port, flags="A")
4724         )
4725         self.send_and_expect(self.pg0, p, self.pg1)
4726
4727         # Verify that the data can be transmitted after the transitory time
4728         self.virtual_sleep(6)
4729
4730         p = (
4731             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4732             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4733             / TCP(sport=in_port, dport=ext_port, flags="PA")
4734             / Raw(payload)
4735         )
4736         self.send_and_expect(self.pg0, p, self.pg1)
4737
4738     def test_tcp_session_open_retransmit2(self):
4739         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4740
4741         The ACK is lost to the server after the TCP session is opened.
4742         Data is sent by the client, then the [SYN,ACK] is
4743         retransmitted by the server.
4744         """
4745
4746         in_port = self.tcp_port_in
4747         ext_port = self.tcp_external_port
4748         payload = "H" * 10
4749
4750         self.nat_add_address(self.nat_addr)
4751         self.nat_add_inside_interface(self.pg0)
4752         self.nat_add_outside_interface(self.pg1)
4753
4754         self.vapi.nat_set_timeouts(
4755             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4756         )
4757         # SYN packet in->out
4758         p = (
4759             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4760             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4761             / TCP(sport=in_port, dport=ext_port, flags="S")
4762         )
4763         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4764         out_port = p[TCP].sport
4765
4766         # SYN + ACK packet out->in
4767         p = (
4768             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4769             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4770             / TCP(sport=ext_port, dport=out_port, flags="SA")
4771         )
4772         self.send_and_expect(self.pg1, p, self.pg0)
4773
4774         # ACK packet in->out -- not received by the server
4775         p = (
4776             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4777             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4778             / TCP(sport=in_port, dport=ext_port, flags="A")
4779         )
4780         self.send_and_expect(self.pg0, p, self.pg1)
4781
4782         # PUSH + ACK packet in->out
4783         p = (
4784             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4785             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4786             / TCP(sport=in_port, dport=ext_port, flags="PA")
4787             / Raw(payload)
4788         )
4789         self.send_and_expect(self.pg0, p, self.pg1)
4790
4791         # resent SYN + ACK packet out->in
4792         p = (
4793             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4794             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4795             / TCP(sport=ext_port, dport=out_port, flags="SA")
4796         )
4797         self.send_and_expect(self.pg1, p, self.pg0)
4798
4799         # resent ACK packet in->out
4800         p = (
4801             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4802             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4803             / TCP(sport=in_port, dport=ext_port, flags="A")
4804         )
4805         self.send_and_expect(self.pg0, p, self.pg1)
4806
4807         # resent PUSH + ACK packet in->out
4808         p = (
4809             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4810             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4811             / TCP(sport=in_port, dport=ext_port, flags="PA")
4812             / Raw(payload)
4813         )
4814         self.send_and_expect(self.pg0, p, self.pg1)
4815
4816         # ACK packet out->in
4817         p = (
4818             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4819             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4820             / TCP(sport=ext_port, dport=out_port, flags="A")
4821         )
4822         self.send_and_expect(self.pg1, p, self.pg0)
4823
4824         # Verify that the data can be transmitted after the transitory time
4825         self.virtual_sleep(6)
4826
4827         p = (
4828             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4829             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4830             / TCP(sport=in_port, dport=ext_port, flags="PA")
4831             / Raw(payload)
4832         )
4833         self.send_and_expect(self.pg0, p, self.pg1)
4834
4835     def test_dynamic_ports_exhausted(self):
4836         """NAT44ED dynamic translation test: address ports exhaused"""
4837
4838         sessions_per_batch = 128
4839         n_available_ports = 65536 - 1024
4840         n_sessions = n_available_ports + 2 * sessions_per_batch
4841
4842         # set high enough session limit for ports to be exhausted
4843         self.plugin_disable()
4844         self.plugin_enable(max_sessions=n_sessions)
4845
4846         self.nat_add_inside_interface(self.pg0)
4847         self.nat_add_outside_interface(self.pg1)
4848
4849         # set timeouts to high for sessions to reallistically expire
4850         config = self.vapi.nat44_show_running_config()
4851         old_timeouts = config.timeouts
4852         self.vapi.nat_set_timeouts(
4853             udp=21600,
4854             tcp_established=old_timeouts.tcp_established,
4855             tcp_transitory=old_timeouts.tcp_transitory,
4856             icmp=old_timeouts.icmp,
4857         )
4858
4859         # in2out after NAT addresses added
4860         self.nat_add_address(self.nat_addr)
4861
4862         for i in range(n_sessions // sessions_per_batch):
4863             pkts = self.create_udp_stream(
4864                 self.pg0,
4865                 self.pg1,
4866                 sessions_per_batch,
4867                 base_port=i * sessions_per_batch + 100,
4868             )
4869
4870             self.pg0.add_stream(pkts)
4871             self.pg_start()
4872
4873             err = self.statistics.get_err_counter(
4874                 "/err/nat44-ed-in2out-slowpath/out of ports"
4875             )
4876             if err > sessions_per_batch:
4877                 break
4878
4879         # Check for ports to be used no more than once
4880         ports = set()
4881         sessions = self.vapi.cli("show nat44 sessions")
4882         rx = re.compile(
4883             f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
4884         )
4885         for line in sessions.splitlines():
4886             m = rx.match(line)
4887             if m:
4888                 port = int(m.groups()[0])
4889                 self.assertNotIn(port, ports)
4890                 ports.add(port)
4891
4892         self.assertGreaterEqual(err, sessions_per_batch)
4893
4894
# When executed directly as a script, run this module's tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)