tests docs: update python3 venv packages
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import re
8 import scapy.compat
9 from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
10 from framework import VppTestCase, VppTestRunner, VppLoInterface
11 from scapy.data import IP_PROTOS
12 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
13 from scapy.layers.inet import IPerror, TCPerror
14 from scapy.layers.l2 import Ether
15 from scapy.packet import Raw
16 from syslog_rfc5424_parser import SyslogMessage, ParseError
17 from syslog_rfc5424_parser.constants import SyslogSeverity
18 from util import ppp, pr, ip4_range
19 from vpp_acl import AclRule, VppAcl, VppAclInterface
20 from vpp_ip_route import VppIpRoute, VppRoutePath
21 from vpp_papi import VppEnum
22 from util import StatsDiff
23
24
25 class TestNAT44ED(VppTestCase):
26     """NAT44ED Test Case"""
27
28     nat_addr = "10.0.10.3"
29
30     tcp_port_in = 6303
31     tcp_port_out = 6303
32
33     udp_port_in = 6304
34     udp_port_out = 6304
35
36     icmp_id_in = 6305
37     icmp_id_out = 6305
38
39     tcp_external_port = 80
40
41     max_sessions = 100
42
43     def setUp(self):
44         super().setUp()
45         self.plugin_enable()
46
47     def tearDown(self):
48         super().tearDown()
49         if not self.vpp_dead:
50             self.plugin_disable()
51
52     def plugin_enable(self, max_sessions=None):
53         max_sessions = max_sessions or self.max_sessions
54         self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
55
56     def plugin_disable(self):
57         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
58
59     @property
60     def config_flags(self):
61         return VppEnum.vl_api_nat_config_flags_t
62
63     @property
64     def nat44_config_flags(self):
65         return VppEnum.vl_api_nat44_config_flags_t
66
67     @property
68     def syslog_severity(self):
69         return VppEnum.vl_api_syslog_severity_t
70
71     @property
72     def server_addr(self):
73         return self.pg1.remote_hosts[0].ip4
74
75     @staticmethod
76     def random_port():
77         return randint(1024, 65535)
78
79     @staticmethod
80     def proto2layer(proto):
81         if proto == IP_PROTOS.tcp:
82             return TCP
83         elif proto == IP_PROTOS.udp:
84             return UDP
85         elif proto == IP_PROTOS.icmp:
86             return ICMP
87         else:
88             raise Exception("Unsupported protocol")
89
90     @classmethod
91     def create_and_add_ip4_table(cls, i, table_id=0):
92         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
93         i.set_table_ip4(table_id)
94
95     @classmethod
96     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
97         if table_id:
98             cls.create_and_add_ip4_table(i, table_id)
99
100         i.admin_up()
101         i.config_ip4()
102         i.resolve_arp()
103
104         if hosts:
105             i.generate_remote_hosts(hosts)
106             i.configure_ipv4_neighbors()
107
108     @classmethod
109     def nat_add_interface_address(cls, i):
110         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
111
112     def nat_add_inside_interface(self, i):
113         self.vapi.nat44_interface_add_del_feature(
114             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
115         )
116
117     def nat_add_outside_interface(self, i):
118         self.vapi.nat44_interface_add_del_feature(
119             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
120         )
121
122     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
123         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
124         self.vapi.nat44_add_del_address_range(
125             first_ip_address=address,
126             last_ip_address=address,
127             vrf_id=vrf_id,
128             is_add=is_add,
129             flags=flags,
130         )
131
132     def nat_add_static_mapping(
133         self,
134         local_ip,
135         external_ip="0.0.0.0",
136         local_port=0,
137         external_port=0,
138         vrf_id=0,
139         is_add=1,
140         external_sw_if_index=0xFFFFFFFF,
141         proto=0,
142         tag="",
143         flags=0,
144     ):
145         if not (local_port and external_port):
146             flags |= self.config_flags.NAT_IS_ADDR_ONLY
147
148         self.vapi.nat44_add_del_static_mapping(
149             is_add=is_add,
150             local_ip_address=local_ip,
151             external_ip_address=external_ip,
152             external_sw_if_index=external_sw_if_index,
153             local_port=local_port,
154             external_port=external_port,
155             vrf_id=vrf_id,
156             protocol=proto,
157             flags=flags,
158             tag=tag,
159         )
160
161     @classmethod
162     def setUpClass(cls):
163         super().setUpClass()
164         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
165             return
166
167         cls.create_pg_interfaces(range(12))
168         cls.interfaces = list(cls.pg_interfaces[:4])
169
170         cls.create_and_add_ip4_table(cls.pg2, 10)
171
172         for i in cls.interfaces:
173             cls.configure_ip4_interface(i, hosts=3)
174
175         # test specific (test-multiple-vrf)
176         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
177
178         # test specific (test-one-armed-nat44-static)
179         cls.pg4.generate_remote_hosts(2)
180         cls.pg4.config_ip4()
181         cls.vapi.sw_interface_add_del_address(
182             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
183         )
184         cls.pg4.admin_up()
185         cls.pg4.resolve_arp()
186         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
187         cls.pg4.resolve_arp()
188
189         # test specific interface (pg5)
190         cls.pg5._local_ip4 = "10.1.1.1"
191         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
192         cls.pg5.set_table_ip4(1)
193         cls.pg5.config_ip4()
194         cls.pg5.admin_up()
195         cls.pg5.resolve_arp()
196
197         # test specific interface (pg6)
198         cls.pg6._local_ip4 = "10.1.2.1"
199         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
200         cls.pg6.set_table_ip4(1)
201         cls.pg6.config_ip4()
202         cls.pg6.admin_up()
203         cls.pg6.resolve_arp()
204
205         rl = list()
206
207         rl.append(
208             VppIpRoute(
209                 cls,
210                 "0.0.0.0",
211                 0,
212                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
213                 register=False,
214                 table_id=1,
215             )
216         )
217         rl.append(
218             VppIpRoute(
219                 cls,
220                 "0.0.0.0",
221                 0,
222                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
223                 register=False,
224             )
225         )
226         rl.append(
227             VppIpRoute(
228                 cls,
229                 cls.pg5.remote_ip4,
230                 32,
231                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
232                 register=False,
233                 table_id=1,
234             )
235         )
236         rl.append(
237             VppIpRoute(
238                 cls,
239                 cls.pg6.remote_ip4,
240                 32,
241                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
242                 register=False,
243                 table_id=1,
244             )
245         )
246         rl.append(
247             VppIpRoute(
248                 cls,
249                 cls.pg6.remote_ip4,
250                 16,
251                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
252                 register=False,
253                 table_id=0,
254             )
255         )
256
257         for r in rl:
258             r.add_vpp_config()
259
260         cls.no_diff = StatsDiff(
261             {
262                 pg.sw_if_index: {
263                     "/nat44-ed/in2out/fastpath/tcp": 0,
264                     "/nat44-ed/in2out/fastpath/udp": 0,
265                     "/nat44-ed/in2out/fastpath/icmp": 0,
266                     "/nat44-ed/in2out/fastpath/drops": 0,
267                     "/nat44-ed/in2out/slowpath/tcp": 0,
268                     "/nat44-ed/in2out/slowpath/udp": 0,
269                     "/nat44-ed/in2out/slowpath/icmp": 0,
270                     "/nat44-ed/in2out/slowpath/drops": 0,
271                     "/nat44-ed/in2out/fastpath/tcp": 0,
272                     "/nat44-ed/in2out/fastpath/udp": 0,
273                     "/nat44-ed/in2out/fastpath/icmp": 0,
274                     "/nat44-ed/in2out/fastpath/drops": 0,
275                     "/nat44-ed/in2out/slowpath/tcp": 0,
276                     "/nat44-ed/in2out/slowpath/udp": 0,
277                     "/nat44-ed/in2out/slowpath/icmp": 0,
278                     "/nat44-ed/in2out/slowpath/drops": 0,
279                 }
280                 for pg in cls.pg_interfaces
281             }
282         )
283
284     def get_err_counter(self, path):
285         return self.statistics.get_err_counter(path)
286
287     def reass_hairpinning(
288         self,
289         server_addr,
290         server_in_port,
291         server_out_port,
292         host_in_port,
293         proto=IP_PROTOS.tcp,
294         ignore_port=False,
295     ):
296         layer = self.proto2layer(proto)
297
298         if proto == IP_PROTOS.tcp:
299             data = b"A" * 4 + b"B" * 16 + b"C" * 3
300         else:
301             data = b"A" * 16 + b"B" * 16 + b"C" * 3
302
303         # send packet from host to server
304         pkts = self.create_stream_frag(
305             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
306         )
307         self.pg0.add_stream(pkts)
308         self.pg_enable_capture(self.pg_interfaces)
309         self.pg_start()
310         frags = self.pg0.get_capture(len(pkts))
311         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
312         if proto != IP_PROTOS.icmp:
313             if not ignore_port:
314                 self.assertNotEqual(p[layer].sport, host_in_port)
315             self.assertEqual(p[layer].dport, server_in_port)
316         else:
317             if not ignore_port:
318                 self.assertNotEqual(p[layer].id, host_in_port)
319         self.assertEqual(data, p[Raw].load)
320
321     def frag_out_of_order(
322         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
323     ):
324         layer = self.proto2layer(proto)
325
326         if proto == IP_PROTOS.tcp:
327             data = b"A" * 4 + b"B" * 16 + b"C" * 3
328         else:
329             data = b"A" * 16 + b"B" * 16 + b"C" * 3
330         self.port_in = self.random_port()
331
332         for i in range(2):
333             # in2out
334             pkts = self.create_stream_frag(
335                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
336             )
337             pkts.reverse()
338             self.pg0.add_stream(pkts)
339             self.pg_enable_capture(self.pg_interfaces)
340             self.pg_start()
341             frags = self.pg1.get_capture(len(pkts))
342             if not dont_translate:
343                 p = self.reass_frags_and_verify(
344                     frags, self.nat_addr, self.pg1.remote_ip4
345                 )
346             else:
347                 p = self.reass_frags_and_verify(
348                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
349                 )
350             if proto != IP_PROTOS.icmp:
351                 if not dont_translate:
352                     self.assertEqual(p[layer].dport, 20)
353                     if not ignore_port:
354                         self.assertNotEqual(p[layer].sport, self.port_in)
355                 else:
356                     self.assertEqual(p[layer].sport, self.port_in)
357             else:
358                 if not ignore_port:
359                     if not dont_translate:
360                         self.assertNotEqual(p[layer].id, self.port_in)
361                     else:
362                         self.assertEqual(p[layer].id, self.port_in)
363             self.assertEqual(data, p[Raw].load)
364
365             # out2in
366             if not dont_translate:
367                 dst_addr = self.nat_addr
368             else:
369                 dst_addr = self.pg0.remote_ip4
370             if proto != IP_PROTOS.icmp:
371                 sport = 20
372                 dport = p[layer].sport
373             else:
374                 sport = p[layer].id
375                 dport = 0
376             pkts = self.create_stream_frag(
377                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
378             )
379             pkts.reverse()
380             self.pg1.add_stream(pkts)
381             self.pg_enable_capture(self.pg_interfaces)
382             self.logger.info(self.vapi.cli("show trace"))
383             self.pg_start()
384             frags = self.pg0.get_capture(len(pkts))
385             p = self.reass_frags_and_verify(
386                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
387             )
388             if proto != IP_PROTOS.icmp:
389                 self.assertEqual(p[layer].sport, 20)
390                 self.assertEqual(p[layer].dport, self.port_in)
391             else:
392                 self.assertEqual(p[layer].id, self.port_in)
393             self.assertEqual(data, p[Raw].load)
394
395     def reass_frags_and_verify(self, frags, src, dst):
396         buffer = BytesIO()
397         for p in frags:
398             self.assertEqual(p[IP].src, src)
399             self.assertEqual(p[IP].dst, dst)
400             self.assert_ip_checksum_valid(p)
401             buffer.seek(p[IP].frag * 8)
402             buffer.write(bytes(p[IP].payload))
403         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
404         if ip.proto == IP_PROTOS.tcp:
405             p = ip / TCP(buffer.getvalue())
406             self.logger.debug(ppp("Reassembled:", p))
407             self.assert_tcp_checksum_valid(p)
408         elif ip.proto == IP_PROTOS.udp:
409             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
410         elif ip.proto == IP_PROTOS.icmp:
411             p = ip / ICMP(buffer.getvalue())
412         return p
413
414     def frag_in_order(
415         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
416     ):
417         layer = self.proto2layer(proto)
418
419         if proto == IP_PROTOS.tcp:
420             data = b"A" * 4 + b"B" * 16 + b"C" * 3
421         else:
422             data = b"A" * 16 + b"B" * 16 + b"C" * 3
423         self.port_in = self.random_port()
424
425         # in2out
426         pkts = self.create_stream_frag(
427             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
428         )
429         self.pg0.add_stream(pkts)
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432         frags = self.pg1.get_capture(len(pkts))
433         if not dont_translate:
434             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
435         else:
436             p = self.reass_frags_and_verify(
437                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
438             )
439         if proto != IP_PROTOS.icmp:
440             if not dont_translate:
441                 self.assertEqual(p[layer].dport, 20)
442                 if not ignore_port:
443                     self.assertNotEqual(p[layer].sport, self.port_in)
444             else:
445                 self.assertEqual(p[layer].sport, self.port_in)
446         else:
447             if not ignore_port:
448                 if not dont_translate:
449                     self.assertNotEqual(p[layer].id, self.port_in)
450                 else:
451                     self.assertEqual(p[layer].id, self.port_in)
452         self.assertEqual(data, p[Raw].load)
453
454         # out2in
455         if not dont_translate:
456             dst_addr = self.nat_addr
457         else:
458             dst_addr = self.pg0.remote_ip4
459         if proto != IP_PROTOS.icmp:
460             sport = 20
461             dport = p[layer].sport
462         else:
463             sport = p[layer].id
464             dport = 0
465         pkts = self.create_stream_frag(
466             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
467         )
468         self.pg1.add_stream(pkts)
469         self.pg_enable_capture(self.pg_interfaces)
470         self.pg_start()
471         frags = self.pg0.get_capture(len(pkts))
472         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
473         if proto != IP_PROTOS.icmp:
474             self.assertEqual(p[layer].sport, 20)
475             self.assertEqual(p[layer].dport, self.port_in)
476         else:
477             self.assertEqual(p[layer].id, self.port_in)
478         self.assertEqual(data, p[Raw].load)
479
480     def verify_capture_out(
481         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
482     ):
483         if nat_ip is None:
484             nat_ip = self.nat_addr
485         for packet in capture:
486             try:
487                 self.assert_packet_checksums_valid(packet)
488                 self.assertEqual(packet[IP].src, nat_ip)
489                 if dst_ip is not None:
490                     self.assertEqual(packet[IP].dst, dst_ip)
491                 if packet.haslayer(TCP):
492                     if not ignore_port:
493                         if same_port:
494                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
495                         else:
496                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
497                     self.tcp_port_out = packet[TCP].sport
498                     self.assert_packet_checksums_valid(packet)
499                 elif packet.haslayer(UDP):
500                     if not ignore_port:
501                         if same_port:
502                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
503                         else:
504                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
505                     self.udp_port_out = packet[UDP].sport
506                 else:
507                     if not ignore_port:
508                         if same_port:
509                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
510                         else:
511                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
512                     self.icmp_id_out = packet[ICMP].id
513                     self.assert_packet_checksums_valid(packet)
514             except:
515                 self.logger.error(
516                     ppp("Unexpected or invalid packet (outside network):", packet)
517                 )
518                 raise
519
520     def verify_capture_in(self, capture, in_if):
521         for packet in capture:
522             try:
523                 self.assert_packet_checksums_valid(packet)
524                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
525                 if packet.haslayer(TCP):
526                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
527                 elif packet.haslayer(UDP):
528                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
529                 else:
530                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
531             except:
532                 self.logger.error(
533                     ppp("Unexpected or invalid packet (inside network):", packet)
534                 )
535                 raise
536
537     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
538         if dst_ip is None:
539             dst_ip = out_if.remote_ip4
540
541         pkts = []
542         # TCP
543         p = (
544             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
545             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
546             / TCP(sport=self.tcp_port_in, dport=20)
547         )
548         pkts.extend([p, p])
549
550         # UDP
551         p = (
552             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
553             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
554             / UDP(sport=self.udp_port_in, dport=20)
555         )
556         pkts.append(p)
557
558         # ICMP
559         p = (
560             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
561             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
562             / ICMP(id=self.icmp_id_in, type="echo-request")
563         )
564         pkts.append(p)
565
566         return pkts
567
568     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
569         if dst_ip is None:
570             dst_ip = self.nat_addr
571         if not use_inside_ports:
572             tcp_port = self.tcp_port_out
573             udp_port = self.udp_port_out
574             icmp_id = self.icmp_id_out
575         else:
576             tcp_port = self.tcp_port_in
577             udp_port = self.udp_port_in
578             icmp_id = self.icmp_id_in
579         pkts = []
580         # TCP
581         p = (
582             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
583             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
584             / TCP(dport=tcp_port, sport=20)
585         )
586         pkts.extend([p, p])
587
588         # UDP
589         p = (
590             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
591             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
592             / UDP(dport=udp_port, sport=20)
593         )
594         pkts.append(p)
595
596         # ICMP
597         p = (
598             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
599             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
600             / ICMP(id=icmp_id, type="echo-reply")
601         )
602         pkts.append(p)
603
604         return pkts
605
606     def create_tcp_stream(self, in_if, out_if, count):
607         pkts = []
608         port = 6303
609
610         for i in range(count):
611             p = (
612                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
613                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
614                 / TCP(sport=port + i, dport=20)
615             )
616             pkts.append(p)
617
618         return pkts
619
620     def create_udp_stream(self, in_if, out_if, count, base_port=6303):
621         return [
622             (
623                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
624                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
625                 / UDP(sport=base_port + i, dport=20)
626             )
627             for i in range(count)
628         ]
629
630     def create_stream_frag(
631         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
632     ):
633         if proto == IP_PROTOS.tcp:
634             p = (
635                 IP(src=src_if.remote_ip4, dst=dst)
636                 / TCP(sport=sport, dport=dport)
637                 / Raw(data)
638             )
639             p = p.__class__(scapy.compat.raw(p))
640             chksum = p[TCP].chksum
641             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
642         elif proto == IP_PROTOS.udp:
643             proto_header = UDP(sport=sport, dport=dport)
644         elif proto == IP_PROTOS.icmp:
645             if not echo_reply:
646                 proto_header = ICMP(id=sport, type="echo-request")
647             else:
648                 proto_header = ICMP(id=sport, type="echo-reply")
649         else:
650             raise Exception("Unsupported protocol")
651         id = self.random_port()
652         pkts = []
653         if proto == IP_PROTOS.tcp:
654             raw = Raw(data[0:4])
655         else:
656             raw = Raw(data[0:16])
657         p = (
658             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
659             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
660             / proto_header
661             / raw
662         )
663         pkts.append(p)
664         if proto == IP_PROTOS.tcp:
665             raw = Raw(data[4:20])
666         else:
667             raw = Raw(data[16:32])
668         p = (
669             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
670             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
671             / raw
672         )
673         pkts.append(p)
674         if proto == IP_PROTOS.tcp:
675             raw = Raw(data[20:])
676         else:
677             raw = Raw(data[32:])
678         p = (
679             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
680             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
681             / raw
682         )
683         pkts.append(p)
684         return pkts
685
686     def frag_in_order_in_plus_out(
687         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
688     ):
689         layer = self.proto2layer(proto)
690
691         if proto == IP_PROTOS.tcp:
692             data = b"A" * 4 + b"B" * 16 + b"C" * 3
693         else:
694             data = b"A" * 16 + b"B" * 16 + b"C" * 3
695         port_in = self.random_port()
696
697         for i in range(2):
698             # out2in
699             pkts = self.create_stream_frag(
700                 self.pg0, out_addr, port_in, out_port, data, proto
701             )
702             self.pg0.add_stream(pkts)
703             self.pg_enable_capture(self.pg_interfaces)
704             self.pg_start()
705             frags = self.pg1.get_capture(len(pkts))
706             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
707             if proto != IP_PROTOS.icmp:
708                 self.assertEqual(p[layer].sport, port_in)
709                 self.assertEqual(p[layer].dport, in_port)
710             else:
711                 self.assertEqual(p[layer].id, port_in)
712             self.assertEqual(data, p[Raw].load)
713
714             # in2out
715             if proto != IP_PROTOS.icmp:
716                 pkts = self.create_stream_frag(
717                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
718                 )
719             else:
720                 pkts = self.create_stream_frag(
721                     self.pg1,
722                     self.pg0.remote_ip4,
723                     p[layer].id,
724                     0,
725                     data,
726                     proto,
727                     echo_reply=True,
728                 )
729             self.pg1.add_stream(pkts)
730             self.pg_enable_capture(self.pg_interfaces)
731             self.pg_start()
732             frags = self.pg0.get_capture(len(pkts))
733             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
734             if proto != IP_PROTOS.icmp:
735                 self.assertEqual(p[layer].sport, out_port)
736                 self.assertEqual(p[layer].dport, port_in)
737             else:
738                 self.assertEqual(p[layer].id, port_in)
739             self.assertEqual(data, p[Raw].load)
740
741     def frag_out_of_order_in_plus_out(
742         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
743     ):
744         layer = self.proto2layer(proto)
745
746         if proto == IP_PROTOS.tcp:
747             data = b"A" * 4 + b"B" * 16 + b"C" * 3
748         else:
749             data = b"A" * 16 + b"B" * 16 + b"C" * 3
750         port_in = self.random_port()
751
752         for i in range(2):
753             # out2in
754             pkts = self.create_stream_frag(
755                 self.pg0, out_addr, port_in, out_port, data, proto
756             )
757             pkts.reverse()
758             self.pg0.add_stream(pkts)
759             self.pg_enable_capture(self.pg_interfaces)
760             self.pg_start()
761             frags = self.pg1.get_capture(len(pkts))
762             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
763             if proto != IP_PROTOS.icmp:
764                 self.assertEqual(p[layer].dport, in_port)
765                 self.assertEqual(p[layer].sport, port_in)
766                 self.assertEqual(p[layer].dport, in_port)
767             else:
768                 self.assertEqual(p[layer].id, port_in)
769             self.assertEqual(data, p[Raw].load)
770
771             # in2out
772             if proto != IP_PROTOS.icmp:
773                 pkts = self.create_stream_frag(
774                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
775                 )
776             else:
777                 pkts = self.create_stream_frag(
778                     self.pg1,
779                     self.pg0.remote_ip4,
780                     p[layer].id,
781                     0,
782                     data,
783                     proto,
784                     echo_reply=True,
785                 )
786             pkts.reverse()
787             self.pg1.add_stream(pkts)
788             self.pg_enable_capture(self.pg_interfaces)
789             self.pg_start()
790             frags = self.pg0.get_capture(len(pkts))
791             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
792             if proto != IP_PROTOS.icmp:
793                 self.assertEqual(p[layer].sport, out_port)
794                 self.assertEqual(p[layer].dport, port_in)
795             else:
796                 self.assertEqual(p[layer].id, port_in)
797             self.assertEqual(data, p[Raw].load)
798
799     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
800         # SYN packet in->out
801         p = (
802             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
803             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
804             / TCP(sport=in_port, dport=ext_port, flags="S")
805         )
806         in_if.add_stream(p)
807         self.pg_enable_capture(self.pg_interfaces)
808         self.pg_start()
809         capture = out_if.get_capture(1)
810         p = capture[0]
811         out_port = p[TCP].sport
812
813         # SYN + ACK packet out->in
814         p = (
815             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
816             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
817             / TCP(sport=ext_port, dport=out_port, flags="SA")
818         )
819         out_if.add_stream(p)
820         self.pg_enable_capture(self.pg_interfaces)
821         self.pg_start()
822         in_if.get_capture(1)
823
824         # ACK packet in->out
825         p = (
826             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
827             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
828             / TCP(sport=in_port, dport=ext_port, flags="A")
829         )
830         in_if.add_stream(p)
831         self.pg_enable_capture(self.pg_interfaces)
832         self.pg_start()
833         out_if.get_capture(1)
834
835         return out_port
836
837     def twice_nat_common(
838         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
839     ):
840         twice_nat_addr = "10.0.1.3"
841
842         port_in = 8080
843         if lb:
844             if not same_pg:
845                 port_in1 = port_in
846                 port_in2 = port_in
847             else:
848                 port_in1 = port_in + 1
849                 port_in2 = port_in + 2
850
851         port_out = 80
852         eh_port_out = 4567
853
854         server1 = self.pg0.remote_hosts[0]
855         server2 = self.pg0.remote_hosts[1]
856         if lb and same_pg:
857             server2 = server1
858         if not lb:
859             server = server1
860
861         pg0 = self.pg0
862         if same_pg:
863             pg1 = self.pg0
864         else:
865             pg1 = self.pg1
866
867         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
868
869         self.nat_add_address(self.nat_addr)
870         self.nat_add_address(twice_nat_addr, twice_nat=1)
871
872         flags = 0
873         if self_twice_nat:
874             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
875         else:
876             flags |= self.config_flags.NAT_IS_TWICE_NAT
877
878         if not lb:
879             self.nat_add_static_mapping(
880                 pg0.remote_ip4,
881                 self.nat_addr,
882                 port_in,
883                 port_out,
884                 proto=IP_PROTOS.tcp,
885                 flags=flags,
886             )
887         else:
888             locals = [
889                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
890                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
891             ]
892             out_addr = self.nat_addr
893
894             self.vapi.nat44_add_del_lb_static_mapping(
895                 is_add=1,
896                 flags=flags,
897                 external_addr=out_addr,
898                 external_port=port_out,
899                 protocol=IP_PROTOS.tcp,
900                 local_num=len(locals),
901                 locals=locals,
902             )
903         self.nat_add_inside_interface(pg0)
904         self.nat_add_outside_interface(pg1)
905
906         if same_pg:
907             if not lb:
908                 client = server
909             else:
910                 assert client_id is not None
911                 if client_id == 1:
912                     client = self.pg0.remote_hosts[0]
913                 elif client_id == 2:
914                     client = self.pg0.remote_hosts[1]
915         else:
916             client = pg1.remote_hosts[0]
917         p = (
918             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
919             / IP(src=client.ip4, dst=self.nat_addr)
920             / TCP(sport=eh_port_out, dport=port_out)
921         )
922         pg1.add_stream(p)
923         self.pg_enable_capture(self.pg_interfaces)
924         self.pg_start()
925         capture = pg0.get_capture(1)
926         p = capture[0]
927         try:
928             ip = p[IP]
929             tcp = p[TCP]
930             if lb:
931                 if ip.dst == server1.ip4:
932                     server = server1
933                     port_in = port_in1
934                 else:
935                     server = server2
936                     port_in = port_in2
937             self.assertEqual(ip.dst, server.ip4)
938             if lb and same_pg:
939                 self.assertIn(tcp.dport, [port_in1, port_in2])
940             else:
941                 self.assertEqual(tcp.dport, port_in)
942             if eh_translate:
943                 self.assertEqual(ip.src, twice_nat_addr)
944                 self.assertNotEqual(tcp.sport, eh_port_out)
945             else:
946                 self.assertEqual(ip.src, client.ip4)
947                 self.assertEqual(tcp.sport, eh_port_out)
948             eh_addr_in = ip.src
949             eh_port_in = tcp.sport
950             saved_port_in = tcp.dport
951             self.assert_packet_checksums_valid(p)
952         except:
953             self.logger.error(ppp("Unexpected or invalid packet:", p))
954             raise
955
956         p = (
957             Ether(src=server.mac, dst=pg0.local_mac)
958             / IP(src=server.ip4, dst=eh_addr_in)
959             / TCP(sport=saved_port_in, dport=eh_port_in)
960         )
961         pg0.add_stream(p)
962         self.pg_enable_capture(self.pg_interfaces)
963         self.pg_start()
964         capture = pg1.get_capture(1)
965         p = capture[0]
966         try:
967             ip = p[IP]
968             tcp = p[TCP]
969             self.assertEqual(ip.dst, client.ip4)
970             self.assertEqual(ip.src, self.nat_addr)
971             self.assertEqual(tcp.dport, eh_port_out)
972             self.assertEqual(tcp.sport, port_out)
973             self.assert_packet_checksums_valid(p)
974         except:
975             self.logger.error(ppp("Unexpected or invalid packet:", p))
976             raise
977
978         if eh_translate:
979             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
980             self.assertEqual(len(sessions), 1)
981             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
982             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
983             self.logger.info(self.vapi.cli("show nat44 sessions"))
984             self.vapi.nat44_del_session(
985                 address=sessions[0].inside_ip_address,
986                 port=sessions[0].inside_port,
987                 protocol=sessions[0].protocol,
988                 flags=(
989                     self.config_flags.NAT_IS_INSIDE
990                     | self.config_flags.NAT_IS_EXT_HOST_VALID
991                 ),
992                 ext_host_address=sessions[0].ext_host_nat_address,
993                 ext_host_port=sessions[0].ext_host_nat_port,
994             )
995             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
996             self.assertEqual(len(sessions), 0)
997
998     def verify_syslog_sess(self, data, msgid, is_ip6=False):
999         message = data.decode("utf-8")
1000         try:
1001             message = SyslogMessage.parse(message)
1002         except ParseError as e:
1003             self.logger.error(e)
1004             raise
1005         else:
1006             self.assertEqual(message.severity, SyslogSeverity.info)
1007             self.assertEqual(message.appname, "NAT")
1008             self.assertEqual(message.msgid, msgid)
1009             sd_params = message.sd.get("nsess")
1010             self.assertTrue(sd_params is not None)
1011             if is_ip6:
1012                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1013                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1014             else:
1015                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1016                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1017                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1018             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1019             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1020             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1021             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1022             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1023             self.assertEqual(sd_params.get("SVLAN"), "0")
1024             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1025             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1026
1027     def test_icmp_error(self):
1028         """NAT44ED test ICMP error message with inner header"""
1029
1030         payload = "H" * 10
1031
1032         self.nat_add_address(self.nat_addr)
1033         self.nat_add_inside_interface(self.pg0)
1034         self.nat_add_outside_interface(self.pg1)
1035
1036         # in2out (initiate connection)
1037         p1 = [
1038             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1039             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1040             / UDP(sport=21, dport=20)
1041             / payload,
1042             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1043             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1044             / TCP(sport=21, dport=20, flags="S")
1045             / payload,
1046             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1047             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1048             / ICMP(type="echo-request", id=7777)
1049             / payload,
1050         ]
1051
1052         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1053
1054         # out2in (send error message)
1055         p2 = [
1056             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1057             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1058             / ICMP(type="dest-unreach", code="port-unreachable")
1059             / c[IP:]
1060             for c in capture
1061         ]
1062
1063         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1064
1065         for c in capture:
1066             try:
1067                 assert c[IP].dst == self.pg0.remote_ip4
1068                 assert c[IPerror].src == self.pg0.remote_ip4
1069             except AssertionError as a:
1070                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1071
1072     def test_icmp_echo_reply_trailer(self):
1073         """ICMP echo reply with ethernet trailer"""
1074
1075         self.nat_add_address(self.nat_addr)
1076         self.nat_add_inside_interface(self.pg0)
1077         self.nat_add_outside_interface(self.pg1)
1078
1079         # in2out
1080         p1 = (
1081             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1082             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1083             / ICMP(type=8, id=0xABCD, seq=0)
1084         )
1085
1086         self.pg0.add_stream(p1)
1087         self.pg_enable_capture(self.pg_interfaces)
1088         self.pg_start()
1089         c = self.pg1.get_capture(1)[0]
1090
1091         self.logger.debug(self.vapi.cli("show trace"))
1092
1093         # out2in
1094         p2 = (
1095             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1096             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1097             / ICMP(type=0, id=c[ICMP].id, seq=0)
1098         )
1099
1100         # force checksum calculation
1101         p2 = p2.__class__(bytes(p2))
1102
1103         self.logger.debug(ppp("Packet before modification:", p2))
1104
1105         # hex representation of vss monitoring ethernet trailer
1106         # this seems to be just added to end of packet without modifying
1107         # IP or ICMP lengths / checksums
1108         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1109         # change it so that IP/ICMP is unaffected
1110         p2[IP].len = 28
1111
1112         self.logger.debug(ppp("Packet with added trailer:", p2))
1113
1114         self.pg1.add_stream(p2)
1115         self.pg_enable_capture(self.pg_interfaces)
1116         self.pg_start()
1117
1118         self.pg0.get_capture(1)
1119
1120     def test_users_dump(self):
1121         """NAT44ED API test - nat44_user_dump"""
1122
1123         self.nat_add_address(self.nat_addr)
1124         self.nat_add_inside_interface(self.pg0)
1125         self.nat_add_outside_interface(self.pg1)
1126
1127         self.vapi.nat44_forwarding_enable_disable(enable=1)
1128
1129         local_ip = self.pg0.remote_ip4
1130         external_ip = self.nat_addr
1131         self.nat_add_static_mapping(local_ip, external_ip)
1132
1133         users = self.vapi.nat44_user_dump()
1134         self.assertEqual(len(users), 0)
1135
1136         # in2out - static mapping match
1137
1138         pkts = self.create_stream_out(self.pg1)
1139         self.pg1.add_stream(pkts)
1140         self.pg_enable_capture(self.pg_interfaces)
1141         self.pg_start()
1142         capture = self.pg0.get_capture(len(pkts))
1143         self.verify_capture_in(capture, self.pg0)
1144
1145         pkts = self.create_stream_in(self.pg0, self.pg1)
1146         self.pg0.add_stream(pkts)
1147         self.pg_enable_capture(self.pg_interfaces)
1148         self.pg_start()
1149         capture = self.pg1.get_capture(len(pkts))
1150         self.verify_capture_out(capture, same_port=True)
1151
1152         users = self.vapi.nat44_user_dump()
1153         self.assertEqual(len(users), 1)
1154         static_user = users[0]
1155         self.assertEqual(static_user.nstaticsessions, 3)
1156         self.assertEqual(static_user.nsessions, 0)
1157
1158         # in2out - no static mapping match (forwarding test)
1159
1160         host0 = self.pg0.remote_hosts[0]
1161         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1162         try:
1163             pkts = self.create_stream_out(
1164                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1165             )
1166             self.pg1.add_stream(pkts)
1167             self.pg_enable_capture(self.pg_interfaces)
1168             self.pg_start()
1169             capture = self.pg0.get_capture(len(pkts))
1170             self.verify_capture_in(capture, self.pg0)
1171
1172             pkts = self.create_stream_in(self.pg0, self.pg1)
1173             self.pg0.add_stream(pkts)
1174             self.pg_enable_capture(self.pg_interfaces)
1175             self.pg_start()
1176             capture = self.pg1.get_capture(len(pkts))
1177             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1178         finally:
1179             self.pg0.remote_hosts[0] = host0
1180
1181         users = self.vapi.nat44_user_dump()
1182         self.assertEqual(len(users), 2)
1183         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1184             non_static_user = users[1]
1185             static_user = users[0]
1186         else:
1187             non_static_user = users[0]
1188             static_user = users[1]
1189         self.assertEqual(static_user.nstaticsessions, 3)
1190         self.assertEqual(static_user.nsessions, 0)
1191         self.assertEqual(non_static_user.nstaticsessions, 0)
1192         self.assertEqual(non_static_user.nsessions, 3)
1193
1194         users = self.vapi.nat44_user_dump()
1195         self.assertEqual(len(users), 2)
1196         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1197             non_static_user = users[1]
1198             static_user = users[0]
1199         else:
1200             non_static_user = users[0]
1201             static_user = users[1]
1202         self.assertEqual(static_user.nstaticsessions, 3)
1203         self.assertEqual(static_user.nsessions, 0)
1204         self.assertEqual(non_static_user.nstaticsessions, 0)
1205         self.assertEqual(non_static_user.nsessions, 3)
1206
1207     def test_frag_out_of_order_do_not_translate(self):
1208         """NAT44ED don't translate fragments arriving out of order"""
1209         self.nat_add_inside_interface(self.pg0)
1210         self.nat_add_outside_interface(self.pg1)
1211         self.vapi.nat44_forwarding_enable_disable(enable=True)
1212         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1213
1214     def test_forwarding(self):
1215         """NAT44ED forwarding test"""
1216
1217         self.nat_add_inside_interface(self.pg0)
1218         self.nat_add_outside_interface(self.pg1)
1219         self.vapi.nat44_forwarding_enable_disable(enable=1)
1220
1221         real_ip = self.pg0.remote_ip4
1222         alias_ip = self.nat_addr
1223         flags = self.config_flags.NAT_IS_ADDR_ONLY
1224         self.vapi.nat44_add_del_static_mapping(
1225             is_add=1,
1226             local_ip_address=real_ip,
1227             external_ip_address=alias_ip,
1228             external_sw_if_index=0xFFFFFFFF,
1229             flags=flags,
1230         )
1231
1232         try:
1233             # in2out - static mapping match
1234
1235             pkts = self.create_stream_out(self.pg1)
1236             self.pg1.add_stream(pkts)
1237             self.pg_enable_capture(self.pg_interfaces)
1238             self.pg_start()
1239             capture = self.pg0.get_capture(len(pkts))
1240             self.verify_capture_in(capture, self.pg0)
1241
1242             pkts = self.create_stream_in(self.pg0, self.pg1)
1243             self.pg0.add_stream(pkts)
1244             self.pg_enable_capture(self.pg_interfaces)
1245             self.pg_start()
1246             capture = self.pg1.get_capture(len(pkts))
1247             self.verify_capture_out(capture, same_port=True)
1248
1249             # in2out - no static mapping match
1250
1251             host0 = self.pg0.remote_hosts[0]
1252             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1253             try:
1254                 pkts = self.create_stream_out(
1255                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1256                 )
1257                 self.pg1.add_stream(pkts)
1258                 self.pg_enable_capture(self.pg_interfaces)
1259                 self.pg_start()
1260                 capture = self.pg0.get_capture(len(pkts))
1261                 self.verify_capture_in(capture, self.pg0)
1262
1263                 pkts = self.create_stream_in(self.pg0, self.pg1)
1264                 self.pg0.add_stream(pkts)
1265                 self.pg_enable_capture(self.pg_interfaces)
1266                 self.pg_start()
1267                 capture = self.pg1.get_capture(len(pkts))
1268                 self.verify_capture_out(
1269                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1270                 )
1271             finally:
1272                 self.pg0.remote_hosts[0] = host0
1273
1274             user = self.pg0.remote_hosts[1]
1275             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1276             self.assertEqual(len(sessions), 3)
1277             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1278             self.vapi.nat44_del_session(
1279                 address=sessions[0].inside_ip_address,
1280                 port=sessions[0].inside_port,
1281                 protocol=sessions[0].protocol,
1282                 flags=(
1283                     self.config_flags.NAT_IS_INSIDE
1284                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1285                 ),
1286                 ext_host_address=sessions[0].ext_host_address,
1287                 ext_host_port=sessions[0].ext_host_port,
1288             )
1289             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1290             self.assertEqual(len(sessions), 2)
1291
1292         finally:
1293             self.vapi.nat44_forwarding_enable_disable(enable=0)
1294             flags = self.config_flags.NAT_IS_ADDR_ONLY
1295             self.vapi.nat44_add_del_static_mapping(
1296                 is_add=0,
1297                 local_ip_address=real_ip,
1298                 external_ip_address=alias_ip,
1299                 external_sw_if_index=0xFFFFFFFF,
1300                 flags=flags,
1301             )
1302
1303     def test_output_feature_and_service2(self):
1304         """NAT44ED interface output feature and service host direct access"""
1305         self.vapi.nat44_forwarding_enable_disable(enable=1)
1306         self.nat_add_address(self.nat_addr)
1307
1308         self.vapi.nat44_ed_add_del_output_interface(
1309             sw_if_index=self.pg1.sw_if_index, is_add=1
1310         )
1311
1312         # session initiated from service host - translate
1313         pkts = self.create_stream_in(self.pg0, self.pg1)
1314         self.pg0.add_stream(pkts)
1315         self.pg_enable_capture(self.pg_interfaces)
1316         self.pg_start()
1317         capture = self.pg1.get_capture(len(pkts))
1318         self.verify_capture_out(capture, ignore_port=True)
1319
1320         pkts = self.create_stream_out(self.pg1)
1321         self.pg1.add_stream(pkts)
1322         self.pg_enable_capture(self.pg_interfaces)
1323         self.pg_start()
1324         capture = self.pg0.get_capture(len(pkts))
1325         self.verify_capture_in(capture, self.pg0)
1326
1327         # session initiated from remote host - do not translate
1328         tcp_port_in = self.tcp_port_in
1329         udp_port_in = self.udp_port_in
1330         icmp_id_in = self.icmp_id_in
1331
1332         self.tcp_port_in = 60303
1333         self.udp_port_in = 60304
1334         self.icmp_id_in = 60305
1335
1336         try:
1337             pkts = self.create_stream_out(
1338                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1339             )
1340             self.pg1.add_stream(pkts)
1341             self.pg_enable_capture(self.pg_interfaces)
1342             self.pg_start()
1343             capture = self.pg0.get_capture(len(pkts))
1344             self.verify_capture_in(capture, self.pg0)
1345
1346             pkts = self.create_stream_in(self.pg0, self.pg1)
1347             self.pg0.add_stream(pkts)
1348             self.pg_enable_capture(self.pg_interfaces)
1349             self.pg_start()
1350             capture = self.pg1.get_capture(len(pkts))
1351             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1352         finally:
1353             self.tcp_port_in = tcp_port_in
1354             self.udp_port_in = udp_port_in
1355             self.icmp_id_in = icmp_id_in
1356
1357     def test_twice_nat(self):
1358         """NAT44ED Twice NAT"""
1359         self.twice_nat_common()
1360
1361     def test_self_twice_nat_positive(self):
1362         """NAT44ED Self Twice NAT (positive test)"""
1363         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1364
1365     def test_self_twice_nat_lb_positive(self):
1366         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1367         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1368
1369     def test_twice_nat_lb(self):
1370         """NAT44ED Twice NAT local service load balancing"""
1371         self.twice_nat_common(lb=True)
1372
1373     def test_output_feature(self):
1374         """NAT44ED interface output feature (in2out postrouting)"""
1375         self.vapi.nat44_forwarding_enable_disable(enable=1)
1376         self.nat_add_address(self.nat_addr)
1377
1378         self.nat_add_outside_interface(self.pg0)
1379         self.vapi.nat44_ed_add_del_output_interface(
1380             sw_if_index=self.pg1.sw_if_index, is_add=1
1381         )
1382
1383         # in2out
1384         pkts = self.create_stream_in(self.pg0, self.pg1)
1385         self.pg0.add_stream(pkts)
1386         self.pg_enable_capture(self.pg_interfaces)
1387         self.pg_start()
1388         capture = self.pg1.get_capture(len(pkts))
1389         self.verify_capture_out(capture, ignore_port=True)
1390         self.logger.debug(self.vapi.cli("show trace"))
1391
1392         # out2in
1393         pkts = self.create_stream_out(self.pg1)
1394         self.pg1.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg0.get_capture(len(pkts))
1398         self.verify_capture_in(capture, self.pg0)
1399         self.logger.debug(self.vapi.cli("show trace"))
1400
1401         # in2out
1402         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1403         self.pg0.add_stream(pkts)
1404         self.pg_enable_capture(self.pg_interfaces)
1405         self.pg_start()
1406         capture = self.pg1.get_capture(len(pkts))
1407         self.verify_capture_out(capture, ignore_port=True)
1408         self.logger.debug(self.vapi.cli("show trace"))
1409
1410         # out2in
1411         pkts = self.create_stream_out(self.pg1, ttl=2)
1412         self.pg1.add_stream(pkts)
1413         self.pg_enable_capture(self.pg_interfaces)
1414         self.pg_start()
1415         capture = self.pg0.get_capture(len(pkts))
1416         self.verify_capture_in(capture, self.pg0)
1417         self.logger.debug(self.vapi.cli("show trace"))
1418
1419         # in2out
1420         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1421         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1422         for p in capture:
1423             self.assertIn(ICMP, p)
1424             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1425
1426     def test_static_with_port_out2(self):
1427         """NAT44ED 1:1 NAPT asymmetrical rule"""
1428
1429         external_port = 80
1430         local_port = 8080
1431
1432         self.vapi.nat44_forwarding_enable_disable(enable=1)
1433         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1434         self.nat_add_static_mapping(
1435             self.pg0.remote_ip4,
1436             self.nat_addr,
1437             local_port,
1438             external_port,
1439             proto=IP_PROTOS.tcp,
1440             flags=flags,
1441         )
1442
1443         self.nat_add_inside_interface(self.pg0)
1444         self.nat_add_outside_interface(self.pg1)
1445
1446         # from client to service
1447         p = (
1448             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1449             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1450             / TCP(sport=12345, dport=external_port)
1451         )
1452         self.pg1.add_stream(p)
1453         self.pg_enable_capture(self.pg_interfaces)
1454         self.pg_start()
1455         capture = self.pg0.get_capture(1)
1456         p = capture[0]
1457         try:
1458             ip = p[IP]
1459             tcp = p[TCP]
1460             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1461             self.assertEqual(tcp.dport, local_port)
1462             self.assert_packet_checksums_valid(p)
1463         except:
1464             self.logger.error(ppp("Unexpected or invalid packet:", p))
1465             raise
1466
1467         # ICMP error
1468         p = (
1469             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1470             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1471             / ICMP(type=11)
1472             / capture[0][IP]
1473         )
1474         self.pg0.add_stream(p)
1475         self.pg_enable_capture(self.pg_interfaces)
1476         self.pg_start()
1477         capture = self.pg1.get_capture(1)
1478         p = capture[0]
1479         try:
1480             self.assertEqual(p[IP].src, self.nat_addr)
1481             inner = p[IPerror]
1482             self.assertEqual(inner.dst, self.nat_addr)
1483             self.assertEqual(inner[TCPerror].dport, external_port)
1484         except:
1485             self.logger.error(ppp("Unexpected or invalid packet:", p))
1486             raise
1487
1488         # from service back to client
1489         p = (
1490             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1491             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1492             / TCP(sport=local_port, dport=12345)
1493         )
1494         self.pg0.add_stream(p)
1495         self.pg_enable_capture(self.pg_interfaces)
1496         self.pg_start()
1497         capture = self.pg1.get_capture(1)
1498         p = capture[0]
1499         try:
1500             ip = p[IP]
1501             tcp = p[TCP]
1502             self.assertEqual(ip.src, self.nat_addr)
1503             self.assertEqual(tcp.sport, external_port)
1504             self.assert_packet_checksums_valid(p)
1505         except:
1506             self.logger.error(ppp("Unexpected or invalid packet:", p))
1507             raise
1508
1509         # ICMP error
1510         p = (
1511             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1512             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1513             / ICMP(type=11)
1514             / capture[0][IP]
1515         )
1516         self.pg1.add_stream(p)
1517         self.pg_enable_capture(self.pg_interfaces)
1518         self.pg_start()
1519         capture = self.pg0.get_capture(1)
1520         p = capture[0]
1521         try:
1522             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1523             inner = p[IPerror]
1524             self.assertEqual(inner.src, self.pg0.remote_ip4)
1525             self.assertEqual(inner[TCPerror].sport, local_port)
1526         except:
1527             self.logger.error(ppp("Unexpected or invalid packet:", p))
1528             raise
1529
1530         # from client to server (no translation)
1531         p = (
1532             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1533             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1534             / TCP(sport=12346, dport=local_port)
1535         )
1536         self.pg1.add_stream(p)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(1)
1540         p = capture[0]
1541         try:
1542             ip = p[IP]
1543             tcp = p[TCP]
1544             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1545             self.assertEqual(tcp.dport, local_port)
1546             self.assert_packet_checksums_valid(p)
1547         except:
1548             self.logger.error(ppp("Unexpected or invalid packet:", p))
1549             raise
1550
1551         # from service back to client (no translation)
1552         p = (
1553             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1554             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1555             / TCP(sport=local_port, dport=12346)
1556         )
1557         self.pg0.add_stream(p)
1558         self.pg_enable_capture(self.pg_interfaces)
1559         self.pg_start()
1560         capture = self.pg1.get_capture(1)
1561         p = capture[0]
1562         try:
1563             ip = p[IP]
1564             tcp = p[TCP]
1565             self.assertEqual(ip.src, self.pg0.remote_ip4)
1566             self.assertEqual(tcp.sport, local_port)
1567             self.assert_packet_checksums_valid(p)
1568         except:
1569             self.logger.error(ppp("Unexpected or invalid packet:", p))
1570             raise
1571
1572     def test_static_lb(self):
1573         """NAT44ED local service load balancing"""
1574         external_addr_n = self.nat_addr
1575         external_port = 80
1576         local_port = 8080
1577         server1 = self.pg0.remote_hosts[0]
1578         server2 = self.pg0.remote_hosts[1]
1579
1580         locals = [
1581             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1582             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1583         ]
1584
1585         self.nat_add_address(self.nat_addr)
1586         self.vapi.nat44_add_del_lb_static_mapping(
1587             is_add=1,
1588             external_addr=external_addr_n,
1589             external_port=external_port,
1590             protocol=IP_PROTOS.tcp,
1591             local_num=len(locals),
1592             locals=locals,
1593         )
1594         flags = self.config_flags.NAT_IS_INSIDE
1595         self.vapi.nat44_interface_add_del_feature(
1596             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1597         )
1598         self.vapi.nat44_interface_add_del_feature(
1599             sw_if_index=self.pg1.sw_if_index, is_add=1
1600         )
1601
1602         # from client to service
1603         p = (
1604             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1605             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1606             / TCP(sport=12345, dport=external_port)
1607         )
1608         self.pg1.add_stream(p)
1609         self.pg_enable_capture(self.pg_interfaces)
1610         self.pg_start()
1611         capture = self.pg0.get_capture(1)
1612         p = capture[0]
1613         server = None
1614         try:
1615             ip = p[IP]
1616             tcp = p[TCP]
1617             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1618             if ip.dst == server1.ip4:
1619                 server = server1
1620             else:
1621                 server = server2
1622             self.assertEqual(tcp.dport, local_port)
1623             self.assert_packet_checksums_valid(p)
1624         except:
1625             self.logger.error(ppp("Unexpected or invalid packet:", p))
1626             raise
1627
1628         # from service back to client
1629         p = (
1630             Ether(src=server.mac, dst=self.pg0.local_mac)
1631             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1632             / TCP(sport=local_port, dport=12345)
1633         )
1634         self.pg0.add_stream(p)
1635         self.pg_enable_capture(self.pg_interfaces)
1636         self.pg_start()
1637         capture = self.pg1.get_capture(1)
1638         p = capture[0]
1639         try:
1640             ip = p[IP]
1641             tcp = p[TCP]
1642             self.assertEqual(ip.src, self.nat_addr)
1643             self.assertEqual(tcp.sport, external_port)
1644             self.assert_packet_checksums_valid(p)
1645         except:
1646             self.logger.error(ppp("Unexpected or invalid packet:", p))
1647             raise
1648
1649         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1650         self.assertEqual(len(sessions), 1)
1651         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1652         self.vapi.nat44_del_session(
1653             address=sessions[0].inside_ip_address,
1654             port=sessions[0].inside_port,
1655             protocol=sessions[0].protocol,
1656             flags=(
1657                 self.config_flags.NAT_IS_INSIDE
1658                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1659             ),
1660             ext_host_address=sessions[0].ext_host_address,
1661             ext_host_port=sessions[0].ext_host_port,
1662         )
1663         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1664         self.assertEqual(len(sessions), 0)
1665
1666     def test_static_lb_2(self):
1667         """NAT44ED local service load balancing (asymmetrical rule)"""
1668         external_addr = self.nat_addr
1669         external_port = 80
1670         local_port = 8080
1671         server1 = self.pg0.remote_hosts[0]
1672         server2 = self.pg0.remote_hosts[1]
1673
1674         locals = [
1675             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1676             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1677         ]
1678
1679         self.vapi.nat44_forwarding_enable_disable(enable=1)
1680         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1681         self.vapi.nat44_add_del_lb_static_mapping(
1682             is_add=1,
1683             flags=flags,
1684             external_addr=external_addr,
1685             external_port=external_port,
1686             protocol=IP_PROTOS.tcp,
1687             local_num=len(locals),
1688             locals=locals,
1689         )
1690         flags = self.config_flags.NAT_IS_INSIDE
1691         self.vapi.nat44_interface_add_del_feature(
1692             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1693         )
1694         self.vapi.nat44_interface_add_del_feature(
1695             sw_if_index=self.pg1.sw_if_index, is_add=1
1696         )
1697
1698         # from client to service
1699         p = (
1700             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1701             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1702             / TCP(sport=12345, dport=external_port)
1703         )
1704         self.pg1.add_stream(p)
1705         self.pg_enable_capture(self.pg_interfaces)
1706         self.pg_start()
1707         capture = self.pg0.get_capture(1)
1708         p = capture[0]
1709         server = None
1710         try:
1711             ip = p[IP]
1712             tcp = p[TCP]
1713             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1714             if ip.dst == server1.ip4:
1715                 server = server1
1716             else:
1717                 server = server2
1718             self.assertEqual(tcp.dport, local_port)
1719             self.assert_packet_checksums_valid(p)
1720         except:
1721             self.logger.error(ppp("Unexpected or invalid packet:", p))
1722             raise
1723
1724         # from service back to client
1725         p = (
1726             Ether(src=server.mac, dst=self.pg0.local_mac)
1727             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1728             / TCP(sport=local_port, dport=12345)
1729         )
1730         self.pg0.add_stream(p)
1731         self.pg_enable_capture(self.pg_interfaces)
1732         self.pg_start()
1733         capture = self.pg1.get_capture(1)
1734         p = capture[0]
1735         try:
1736             ip = p[IP]
1737             tcp = p[TCP]
1738             self.assertEqual(ip.src, self.nat_addr)
1739             self.assertEqual(tcp.sport, external_port)
1740             self.assert_packet_checksums_valid(p)
1741         except:
1742             self.logger.error(ppp("Unexpected or invalid packet:", p))
1743             raise
1744
1745         # from client to server (no translation)
1746         p = (
1747             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1748             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1749             / TCP(sport=12346, dport=local_port)
1750         )
1751         self.pg1.add_stream(p)
1752         self.pg_enable_capture(self.pg_interfaces)
1753         self.pg_start()
1754         capture = self.pg0.get_capture(1)
1755         p = capture[0]
1756         server = None
1757         try:
1758             ip = p[IP]
1759             tcp = p[TCP]
1760             self.assertEqual(ip.dst, server1.ip4)
1761             self.assertEqual(tcp.dport, local_port)
1762             self.assert_packet_checksums_valid(p)
1763         except:
1764             self.logger.error(ppp("Unexpected or invalid packet:", p))
1765             raise
1766
1767         # from service back to client (no translation)
1768         p = (
1769             Ether(src=server1.mac, dst=self.pg0.local_mac)
1770             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1771             / TCP(sport=local_port, dport=12346)
1772         )
1773         self.pg0.add_stream(p)
1774         self.pg_enable_capture(self.pg_interfaces)
1775         self.pg_start()
1776         capture = self.pg1.get_capture(1)
1777         p = capture[0]
1778         try:
1779             ip = p[IP]
1780             tcp = p[TCP]
1781             self.assertEqual(ip.src, server1.ip4)
1782             self.assertEqual(tcp.sport, local_port)
1783             self.assert_packet_checksums_valid(p)
1784         except:
1785             self.logger.error(ppp("Unexpected or invalid packet:", p))
1786             raise
1787
1788     def test_lb_affinity(self):
1789         """NAT44ED local service load balancing affinity"""
1790         external_addr = self.nat_addr
1791         external_port = 80
1792         local_port = 8080
1793         server1 = self.pg0.remote_hosts[0]
1794         server2 = self.pg0.remote_hosts[1]
1795
1796         locals = [
1797             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1798             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1799         ]
1800
1801         self.nat_add_address(self.nat_addr)
1802         self.vapi.nat44_add_del_lb_static_mapping(
1803             is_add=1,
1804             external_addr=external_addr,
1805             external_port=external_port,
1806             protocol=IP_PROTOS.tcp,
1807             affinity=10800,
1808             local_num=len(locals),
1809             locals=locals,
1810         )
1811         flags = self.config_flags.NAT_IS_INSIDE
1812         self.vapi.nat44_interface_add_del_feature(
1813             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1814         )
1815         self.vapi.nat44_interface_add_del_feature(
1816             sw_if_index=self.pg1.sw_if_index, is_add=1
1817         )
1818
1819         p = (
1820             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1821             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1822             / TCP(sport=1025, dport=external_port)
1823         )
1824         self.pg1.add_stream(p)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg0.get_capture(1)
1828         backend = capture[0][IP].dst
1829
1830         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1831         self.assertEqual(len(sessions), 1)
1832         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1833         self.vapi.nat44_del_session(
1834             address=sessions[0].inside_ip_address,
1835             port=sessions[0].inside_port,
1836             protocol=sessions[0].protocol,
1837             flags=(
1838                 self.config_flags.NAT_IS_INSIDE
1839                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1840             ),
1841             ext_host_address=sessions[0].ext_host_address,
1842             ext_host_port=sessions[0].ext_host_port,
1843         )
1844
1845         pkts = []
1846         for port in range(1030, 1100):
1847             p = (
1848                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1849                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1850                 / TCP(sport=port, dport=external_port)
1851             )
1852             pkts.append(p)
1853         self.pg1.add_stream(pkts)
1854         self.pg_enable_capture(self.pg_interfaces)
1855         self.pg_start()
1856         capture = self.pg0.get_capture(len(pkts))
1857         for p in capture:
1858             self.assertEqual(p[IP].dst, backend)
1859
1860     def test_multiple_vrf_1(self):
1861         """Multiple VRF - both client & service in VRF1"""
1862
1863         external_addr = "1.2.3.4"
1864         external_port = 80
1865         local_port = 8080
1866         port = 0
1867
1868         flags = self.config_flags.NAT_IS_INSIDE
1869         self.vapi.nat44_interface_add_del_feature(
1870             sw_if_index=self.pg5.sw_if_index, is_add=1
1871         )
1872         self.vapi.nat44_interface_add_del_feature(
1873             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1874         )
1875         self.vapi.nat44_interface_add_del_feature(
1876             sw_if_index=self.pg6.sw_if_index, is_add=1
1877         )
1878         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1879         self.nat_add_static_mapping(
1880             self.pg5.remote_ip4,
1881             external_addr,
1882             local_port,
1883             external_port,
1884             vrf_id=1,
1885             proto=IP_PROTOS.tcp,
1886             flags=flags,
1887         )
1888
1889         p = (
1890             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1891             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1892             / TCP(sport=12345, dport=external_port)
1893         )
1894         self.pg6.add_stream(p)
1895         self.pg_enable_capture(self.pg_interfaces)
1896         self.pg_start()
1897         capture = self.pg5.get_capture(1)
1898         p = capture[0]
1899         try:
1900             ip = p[IP]
1901             tcp = p[TCP]
1902             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1903             self.assertEqual(tcp.dport, local_port)
1904             self.assert_packet_checksums_valid(p)
1905         except:
1906             self.logger.error(ppp("Unexpected or invalid packet:", p))
1907             raise
1908
1909         p = (
1910             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1911             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1912             / TCP(sport=local_port, dport=12345)
1913         )
1914         self.pg5.add_stream(p)
1915         self.pg_enable_capture(self.pg_interfaces)
1916         self.pg_start()
1917         capture = self.pg6.get_capture(1)
1918         p = capture[0]
1919         try:
1920             ip = p[IP]
1921             tcp = p[TCP]
1922             self.assertEqual(ip.src, external_addr)
1923             self.assertEqual(tcp.sport, external_port)
1924             self.assert_packet_checksums_valid(p)
1925         except:
1926             self.logger.error(ppp("Unexpected or invalid packet:", p))
1927             raise
1928
1929     def test_multiple_vrf_2(self):
1930         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1931
1932         external_addr = "1.2.3.4"
1933         external_port = 80
1934         local_port = 8080
1935         port = 0
1936
1937         self.nat_add_address(self.nat_addr)
1938         flags = self.config_flags.NAT_IS_INSIDE
1939         self.vapi.nat44_ed_add_del_output_interface(
1940             sw_if_index=self.pg1.sw_if_index, is_add=1
1941         )
1942         self.vapi.nat44_interface_add_del_feature(
1943             sw_if_index=self.pg5.sw_if_index, is_add=1
1944         )
1945         self.vapi.nat44_interface_add_del_feature(
1946             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1947         )
1948         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1949         self.nat_add_static_mapping(
1950             self.pg5.remote_ip4,
1951             external_addr,
1952             local_port,
1953             external_port,
1954             vrf_id=1,
1955             proto=IP_PROTOS.tcp,
1956             flags=flags,
1957         )
1958
1959         p = (
1960             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1961             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1962             / TCP(sport=2345, dport=22)
1963         )
1964         self.pg5.add_stream(p)
1965         self.pg_enable_capture(self.pg_interfaces)
1966         self.pg_start()
1967         capture = self.pg1.get_capture(1)
1968         p = capture[0]
1969         try:
1970             ip = p[IP]
1971             tcp = p[TCP]
1972             self.assertEqual(ip.src, self.nat_addr)
1973             self.assert_packet_checksums_valid(p)
1974             port = tcp.sport
1975         except:
1976             self.logger.error(ppp("Unexpected or invalid packet:", p))
1977             raise
1978
1979         p = (
1980             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1981             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1982             / TCP(sport=22, dport=port)
1983         )
1984         self.pg1.add_stream(p)
1985         self.pg_enable_capture(self.pg_interfaces)
1986         self.pg_start()
1987         capture = self.pg5.get_capture(1)
1988         p = capture[0]
1989         try:
1990             ip = p[IP]
1991             tcp = p[TCP]
1992             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1993             self.assertEqual(tcp.dport, 2345)
1994             self.assert_packet_checksums_valid(p)
1995         except:
1996             self.logger.error(ppp("Unexpected or invalid packet:", p))
1997             raise
1998
1999     def test_multiple_vrf_3(self):
2000         """Multiple VRF - client in VRF1, service in VRF0"""
2001
2002         external_addr = "1.2.3.4"
2003         external_port = 80
2004         local_port = 8080
2005         port = 0
2006
2007         flags = self.config_flags.NAT_IS_INSIDE
2008         self.vapi.nat44_interface_add_del_feature(
2009             sw_if_index=self.pg0.sw_if_index, is_add=1
2010         )
2011         self.vapi.nat44_interface_add_del_feature(
2012             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2013         )
2014         self.vapi.nat44_interface_add_del_feature(
2015             sw_if_index=self.pg6.sw_if_index, is_add=1
2016         )
2017         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2018         self.nat_add_static_mapping(
2019             self.pg0.remote_ip4,
2020             external_sw_if_index=self.pg0.sw_if_index,
2021             local_port=local_port,
2022             vrf_id=0,
2023             external_port=external_port,
2024             proto=IP_PROTOS.tcp,
2025             flags=flags,
2026         )
2027
2028         # from client VRF1 to service VRF0
2029         p = (
2030             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2031             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2032             / TCP(sport=12346, dport=external_port)
2033         )
2034         self.pg6.add_stream(p)
2035         self.pg_enable_capture(self.pg_interfaces)
2036         self.pg_start()
2037         capture = self.pg0.get_capture(1)
2038         p = capture[0]
2039         try:
2040             ip = p[IP]
2041             tcp = p[TCP]
2042             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2043             self.assertEqual(tcp.dport, local_port)
2044             self.assert_packet_checksums_valid(p)
2045         except:
2046             self.logger.error(ppp("Unexpected or invalid packet:", p))
2047             raise
2048
2049         # from service VRF0 back to client VRF1
2050         p = (
2051             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2052             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2053             / TCP(sport=local_port, dport=12346)
2054         )
2055         self.pg0.add_stream(p)
2056         self.pg_enable_capture(self.pg_interfaces)
2057         self.pg_start()
2058         capture = self.pg6.get_capture(1)
2059         p = capture[0]
2060         try:
2061             ip = p[IP]
2062             tcp = p[TCP]
2063             self.assertEqual(ip.src, self.pg0.local_ip4)
2064             self.assertEqual(tcp.sport, external_port)
2065             self.assert_packet_checksums_valid(p)
2066         except:
2067             self.logger.error(ppp("Unexpected or invalid packet:", p))
2068             raise
2069
2070     def test_multiple_vrf_4(self):
2071         """Multiple VRF - client in VRF0, service in VRF1"""
2072
2073         external_addr = "1.2.3.4"
2074         external_port = 80
2075         local_port = 8080
2076         port = 0
2077
2078         flags = self.config_flags.NAT_IS_INSIDE
2079         self.vapi.nat44_interface_add_del_feature(
2080             sw_if_index=self.pg0.sw_if_index, is_add=1
2081         )
2082         self.vapi.nat44_interface_add_del_feature(
2083             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2084         )
2085         self.vapi.nat44_interface_add_del_feature(
2086             sw_if_index=self.pg5.sw_if_index, is_add=1
2087         )
2088         self.vapi.nat44_interface_add_del_feature(
2089             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2090         )
2091         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2092         self.nat_add_static_mapping(
2093             self.pg5.remote_ip4,
2094             external_addr,
2095             local_port,
2096             external_port,
2097             vrf_id=1,
2098             proto=IP_PROTOS.tcp,
2099             flags=flags,
2100         )
2101
2102         # from client VRF0 to service VRF1
2103         p = (
2104             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2105             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2106             / TCP(sport=12347, dport=external_port)
2107         )
2108         self.pg0.add_stream(p)
2109         self.pg_enable_capture(self.pg_interfaces)
2110         self.pg_start()
2111         capture = self.pg5.get_capture(1)
2112         p = capture[0]
2113         try:
2114             ip = p[IP]
2115             tcp = p[TCP]
2116             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2117             self.assertEqual(tcp.dport, local_port)
2118             self.assert_packet_checksums_valid(p)
2119         except:
2120             self.logger.error(ppp("Unexpected or invalid packet:", p))
2121             raise
2122
2123         # from service VRF1 back to client VRF0
2124         p = (
2125             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2126             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2127             / TCP(sport=local_port, dport=12347)
2128         )
2129         self.pg5.add_stream(p)
2130         self.pg_enable_capture(self.pg_interfaces)
2131         self.pg_start()
2132         capture = self.pg0.get_capture(1)
2133         p = capture[0]
2134         try:
2135             ip = p[IP]
2136             tcp = p[TCP]
2137             self.assertEqual(ip.src, external_addr)
2138             self.assertEqual(tcp.sport, external_port)
2139             self.assert_packet_checksums_valid(p)
2140         except:
2141             self.logger.error(ppp("Unexpected or invalid packet:", p))
2142             raise
2143
2144     def test_multiple_vrf_5(self):
2145         """Multiple VRF - forwarding - no translation"""
2146
2147         external_addr = "1.2.3.4"
2148         external_port = 80
2149         local_port = 8080
2150         port = 0
2151
2152         self.vapi.nat44_forwarding_enable_disable(enable=1)
2153         flags = self.config_flags.NAT_IS_INSIDE
2154         self.vapi.nat44_interface_add_del_feature(
2155             sw_if_index=self.pg0.sw_if_index, is_add=1
2156         )
2157         self.vapi.nat44_interface_add_del_feature(
2158             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2159         )
2160         self.vapi.nat44_interface_add_del_feature(
2161             sw_if_index=self.pg5.sw_if_index, is_add=1
2162         )
2163         self.vapi.nat44_interface_add_del_feature(
2164             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2165         )
2166         self.vapi.nat44_interface_add_del_feature(
2167             sw_if_index=self.pg6.sw_if_index, is_add=1
2168         )
2169         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2170         self.nat_add_static_mapping(
2171             self.pg5.remote_ip4,
2172             external_addr,
2173             local_port,
2174             external_port,
2175             vrf_id=1,
2176             proto=IP_PROTOS.tcp,
2177             flags=flags,
2178         )
2179         self.nat_add_static_mapping(
2180             self.pg0.remote_ip4,
2181             external_sw_if_index=self.pg0.sw_if_index,
2182             local_port=local_port,
2183             vrf_id=0,
2184             external_port=external_port,
2185             proto=IP_PROTOS.tcp,
2186             flags=flags,
2187         )
2188
2189         # from client to server (both VRF1, no translation)
2190         p = (
2191             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2192             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2193             / TCP(sport=12348, dport=local_port)
2194         )
2195         self.pg6.add_stream(p)
2196         self.pg_enable_capture(self.pg_interfaces)
2197         self.pg_start()
2198         capture = self.pg5.get_capture(1)
2199         p = capture[0]
2200         try:
2201             ip = p[IP]
2202             tcp = p[TCP]
2203             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2204             self.assertEqual(tcp.dport, local_port)
2205             self.assert_packet_checksums_valid(p)
2206         except:
2207             self.logger.error(ppp("Unexpected or invalid packet:", p))
2208             raise
2209
2210         # from server back to client (both VRF1, no translation)
2211         p = (
2212             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2213             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2214             / TCP(sport=local_port, dport=12348)
2215         )
2216         self.pg5.add_stream(p)
2217         self.pg_enable_capture(self.pg_interfaces)
2218         self.pg_start()
2219         capture = self.pg6.get_capture(1)
2220         p = capture[0]
2221         try:
2222             ip = p[IP]
2223             tcp = p[TCP]
2224             self.assertEqual(ip.src, self.pg5.remote_ip4)
2225             self.assertEqual(tcp.sport, local_port)
2226             self.assert_packet_checksums_valid(p)
2227         except:
2228             self.logger.error(ppp("Unexpected or invalid packet:", p))
2229             raise
2230
2231         # from client VRF1 to server VRF0 (no translation)
2232         p = (
2233             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2234             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2235             / TCP(sport=local_port, dport=12349)
2236         )
2237         self.pg0.add_stream(p)
2238         self.pg_enable_capture(self.pg_interfaces)
2239         self.pg_start()
2240         capture = self.pg6.get_capture(1)
2241         p = capture[0]
2242         try:
2243             ip = p[IP]
2244             tcp = p[TCP]
2245             self.assertEqual(ip.src, self.pg0.remote_ip4)
2246             self.assertEqual(tcp.sport, local_port)
2247             self.assert_packet_checksums_valid(p)
2248         except:
2249             self.logger.error(ppp("Unexpected or invalid packet:", p))
2250             raise
2251
2252         # from server VRF0 back to client VRF1 (no translation)
2253         p = (
2254             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2255             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2256             / TCP(sport=local_port, dport=12349)
2257         )
2258         self.pg0.add_stream(p)
2259         self.pg_enable_capture(self.pg_interfaces)
2260         self.pg_start()
2261         capture = self.pg6.get_capture(1)
2262         p = capture[0]
2263         try:
2264             ip = p[IP]
2265             tcp = p[TCP]
2266             self.assertEqual(ip.src, self.pg0.remote_ip4)
2267             self.assertEqual(tcp.sport, local_port)
2268             self.assert_packet_checksums_valid(p)
2269         except:
2270             self.logger.error(ppp("Unexpected or invalid packet:", p))
2271             raise
2272
2273         # from client VRF0 to server VRF1 (no translation)
2274         p = (
2275             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2276             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2277             / TCP(sport=12344, dport=local_port)
2278         )
2279         self.pg0.add_stream(p)
2280         self.pg_enable_capture(self.pg_interfaces)
2281         self.pg_start()
2282         capture = self.pg5.get_capture(1)
2283         p = capture[0]
2284         try:
2285             ip = p[IP]
2286             tcp = p[TCP]
2287             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2288             self.assertEqual(tcp.dport, local_port)
2289             self.assert_packet_checksums_valid(p)
2290         except:
2291             self.logger.error(ppp("Unexpected or invalid packet:", p))
2292             raise
2293
2294         # from server VRF1 back to client VRF0 (no translation)
2295         p = (
2296             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2297             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2298             / TCP(sport=local_port, dport=12344)
2299         )
2300         self.pg5.add_stream(p)
2301         self.pg_enable_capture(self.pg_interfaces)
2302         self.pg_start()
2303         capture = self.pg0.get_capture(1)
2304         p = capture[0]
2305         try:
2306             ip = p[IP]
2307             tcp = p[TCP]
2308             self.assertEqual(ip.src, self.pg5.remote_ip4)
2309             self.assertEqual(tcp.sport, local_port)
2310             self.assert_packet_checksums_valid(p)
2311         except:
2312             self.logger.error(ppp("Unexpected or invalid packet:", p))
2313             raise
2314
2315     def test_outside_address_distribution(self):
2316         """Outside address distribution based on source address"""
2317
2318         x = 100
2319         nat_addresses = []
2320
2321         for i in range(1, x):
2322             a = "10.0.0.%d" % i
2323             nat_addresses.append(a)
2324
2325         self.nat_add_inside_interface(self.pg0)
2326         self.nat_add_outside_interface(self.pg1)
2327
2328         self.vapi.nat44_add_del_address_range(
2329             first_ip_address=nat_addresses[0],
2330             last_ip_address=nat_addresses[-1],
2331             vrf_id=0xFFFFFFFF,
2332             is_add=1,
2333             flags=0,
2334         )
2335
2336         self.pg0.generate_remote_hosts(x)
2337
2338         pkts = []
2339         for i in range(x):
2340             info = self.create_packet_info(self.pg0, self.pg1)
2341             payload = self.info_to_payload(info)
2342             p = (
2343                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2344                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2345                 / UDP(sport=7000 + i, dport=8000 + i)
2346                 / Raw(payload)
2347             )
2348             info.data = p
2349             pkts.append(p)
2350
2351         self.pg0.add_stream(pkts)
2352         self.pg_enable_capture(self.pg_interfaces)
2353         self.pg_start()
2354         recvd = self.pg1.get_capture(len(pkts))
2355         for p_recvd in recvd:
2356             payload_info = self.payload_to_info(p_recvd[Raw])
2357             packet_index = payload_info.index
2358             info = self._packet_infos[packet_index]
2359             self.assertTrue(info is not None)
2360             self.assertEqual(packet_index, info.index)
2361             p_sent = info.data
2362             packed = socket.inet_aton(p_sent[IP].src)
2363             numeric = struct.unpack("!L", packed)[0]
2364             numeric = socket.htonl(numeric)
2365             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2366             self.assertEqual(
2367                 a,
2368                 p_recvd[IP].src,
2369                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2370                 % (p_sent[IP].src, p_recvd[IP].src, a),
2371             )
2372
2373     def test_dynamic_edge_ports(self):
2374         """NAT44ED dynamic translation test: edge ports"""
2375
2376         worker_count = self.vpp_worker_count or 1
2377         port_offset = 1024
2378         port_per_thread = (65536 - port_offset) // worker_count
2379         port_count = port_per_thread * worker_count
2380
2381         # worker thread edge ports
2382         thread_edge_ports = {0, port_offset - 1, 65535}
2383         for i in range(0, worker_count):
2384             port_thread_offset = (port_per_thread * i) + port_offset
2385             for port_range_offset in [0, port_per_thread - 1]:
2386                 port = port_thread_offset + port_range_offset
2387                 thread_edge_ports.add(port)
2388         thread_drop_ports = set(
2389             filter(
2390                 lambda x: x not in range(port_offset, port_offset + port_count),
2391                 thread_edge_ports,
2392             )
2393         )
2394
2395         in_if = self.pg7
2396         out_if = self.pg8
2397
2398         self.nat_add_address(self.nat_addr)
2399
2400         try:
2401             self.configure_ip4_interface(in_if, hosts=worker_count)
2402             self.configure_ip4_interface(out_if)
2403
2404             self.nat_add_inside_interface(in_if)
2405             self.nat_add_outside_interface(out_if)
2406
2407             # in2out
2408             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2409             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2410             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2411             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2412
2413             pkt_count = worker_count * len(thread_edge_ports)
2414
2415             i2o_pkts = [[] for x in range(0, worker_count)]
2416             for i in range(0, worker_count):
2417                 remote_host = in_if.remote_hosts[i]
2418                 for port in thread_edge_ports:
2419                     p = (
2420                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2421                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2422                         / TCP(sport=port, dport=port)
2423                     )
2424                     i2o_pkts[i].append(p)
2425
2426                     p = (
2427                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2428                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2429                         / UDP(sport=port, dport=port)
2430                     )
2431                     i2o_pkts[i].append(p)
2432
2433                     p = (
2434                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2435                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2436                         / ICMP(id=port, seq=port, type="echo-request")
2437                     )
2438                     i2o_pkts[i].append(p)
2439
2440             for i in range(0, worker_count):
2441                 if len(i2o_pkts[i]) > 0:
2442                     in_if.add_stream(i2o_pkts[i], worker=i)
2443
2444             self.pg_enable_capture(self.pg_interfaces)
2445             self.pg_start()
2446             capture = out_if.get_capture(pkt_count * 3)
2447             for packet in capture:
2448                 self.assert_packet_checksums_valid(packet)
2449                 if packet.haslayer(TCP):
2450                     self.assert_in_range(
2451                         packet[TCP].sport,
2452                         port_offset,
2453                         port_offset + port_count,
2454                         "src TCP port",
2455                     )
2456                 elif packet.haslayer(UDP):
2457                     self.assert_in_range(
2458                         packet[UDP].sport,
2459                         port_offset,
2460                         port_offset + port_count,
2461                         "src UDP port",
2462                     )
2463                 elif packet.haslayer(ICMP):
2464                     self.assert_in_range(
2465                         packet[ICMP].id,
2466                         port_offset,
2467                         port_offset + port_count,
2468                         "ICMP id",
2469                     )
2470                 else:
2471                     self.fail(
2472                         ppp("Unexpected or invalid packet (outside network):", packet)
2473                     )
2474
2475             if_idx = in_if.sw_if_index
2476             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2477             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2478             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2479             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2480
2481             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2482             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2483             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2484             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2485
2486             # out2in
2487             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2488             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2489             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2490             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2491             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2492
2493             # replies to unchanged thread ports should pass on each worker,
2494             # excluding packets outside dynamic port range
2495             drop_count = worker_count * len(thread_drop_ports)
2496             pass_count = worker_count * len(thread_edge_ports) - drop_count
2497
2498             o2i_pkts = [[] for x in range(0, worker_count)]
2499             for i in range(0, worker_count):
2500                 for port in thread_edge_ports:
2501                     p = (
2502                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2503                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2504                         / TCP(sport=port, dport=port)
2505                     )
2506                     o2i_pkts[i].append(p)
2507
2508                     p = (
2509                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2510                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2511                         / UDP(sport=port, dport=port)
2512                     )
2513                     o2i_pkts[i].append(p)
2514
2515                     p = (
2516                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2517                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2518                         / ICMP(id=port, seq=port, type="echo-reply")
2519                     )
2520                     o2i_pkts[i].append(p)
2521
2522             for i in range(0, worker_count):
2523                 if len(o2i_pkts[i]) > 0:
2524                     out_if.add_stream(o2i_pkts[i], worker=i)
2525
2526             self.pg_enable_capture(self.pg_interfaces)
2527             self.pg_start()
2528             capture = in_if.get_capture(pass_count * 3)
2529             for packet in capture:
2530                 self.assert_packet_checksums_valid(packet)
2531                 if packet.haslayer(TCP):
2532                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2533                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2534                 elif packet.haslayer(UDP):
2535                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2536                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2537                 elif packet.haslayer(ICMP):
2538                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2539                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2540                 else:
2541                     self.fail(
2542                         ppp("Unexpected or invalid packet (inside network):", packet)
2543                     )
2544
2545             if_idx = out_if.sw_if_index
2546             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2547             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2548             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2549             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2550             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2551
2552             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2553             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2554             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2555             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2556             self.assertEqual(
2557                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2558             )
2559
2560         finally:
2561             in_if.unconfig()
2562             out_if.unconfig()
2563
2564     def test_delete_interface(self):
2565         """NAT44ED delete nat interface"""
2566
2567         self.nat_add_address(self.nat_addr)
2568
2569         interfaces = self.create_loopback_interfaces(4)
2570         self.nat_add_outside_interface(interfaces[0])
2571         self.nat_add_inside_interface(interfaces[1])
2572         self.nat_add_outside_interface(interfaces[2])
2573         self.nat_add_inside_interface(interfaces[2])
2574         self.vapi.nat44_ed_add_del_output_interface(
2575             sw_if_index=interfaces[3].sw_if_index, is_add=1
2576         )
2577
2578         nat_sw_if_indices = [
2579             i.sw_if_index
2580             for i in self.vapi.nat44_interface_dump()
2581             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
2582         ]
2583         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
2584
2585         loopbacks = []
2586         for i in interfaces:
2587             # delete nat-enabled interface
2588             self.assertIn(i.sw_if_index, nat_sw_if_indices)
2589             i.remove_vpp_config()
2590
2591             # create interface with the same index
2592             lo = VppLoInterface(self)
2593             loopbacks.append(lo)
2594             self.assertEqual(lo.sw_if_index, i.sw_if_index)
2595
2596             # check interface is not nat-enabled
2597             nat_sw_if_indices = [
2598                 i.sw_if_index
2599                 for i in self.vapi.nat44_interface_dump()
2600                 + list(
2601                     self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
2602                 )
2603             ]
2604             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
2605
2606         for i in loopbacks:
2607             i.remove_vpp_config()
2608
2609
2610 @tag_fixme_ubuntu2204
2611 class TestNAT44EDMW(TestNAT44ED):
2612     """NAT44ED MW Test Case"""
2613
2614     vpp_worker_count = 4
2615     max_sessions = 5000
2616
2617     def test_dynamic(self):
2618         """NAT44ED dynamic translation test"""
2619         pkt_count = 1500
2620         tcp_port_offset = 20
2621         udp_port_offset = 20
2622         icmp_id_offset = 20
2623
2624         self.nat_add_address(self.nat_addr)
2625         self.nat_add_inside_interface(self.pg0)
2626         self.nat_add_outside_interface(self.pg1)
2627
2628         # in2out
2629         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2630         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2631         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2632         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2633
2634         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2635
2636         for i in range(pkt_count):
2637             p = (
2638                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2639                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2640                 / TCP(sport=tcp_port_offset + i, dport=20)
2641             )
2642             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2643
2644             p = (
2645                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2646                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2647                 / UDP(sport=udp_port_offset + i, dport=20)
2648             )
2649             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2650
2651             p = (
2652                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2653                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2654                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2655             )
2656             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2657
2658         for i in range(0, self.vpp_worker_count):
2659             if len(i2o_pkts[i]) > 0:
2660                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2661
2662         self.pg_enable_capture(self.pg_interfaces)
2663         self.pg_start()
2664         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2665
2666         if_idx = self.pg0.sw_if_index
2667         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2668         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2669         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2670         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2671
2672         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2673         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2674         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2675         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2676
2677         self.logger.info(self.vapi.cli("show trace"))
2678
2679         # out2in
2680         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2681         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2682         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2683         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2684
2685         recvd_tcp_ports = set()
2686         recvd_udp_ports = set()
2687         recvd_icmp_ids = set()
2688
2689         for p in capture:
2690             if TCP in p:
2691                 recvd_tcp_ports.add(p[TCP].sport)
2692             if UDP in p:
2693                 recvd_udp_ports.add(p[UDP].sport)
2694             if ICMP in p:
2695                 recvd_icmp_ids.add(p[ICMP].id)
2696
2697         recvd_tcp_ports = list(recvd_tcp_ports)
2698         recvd_udp_ports = list(recvd_udp_ports)
2699         recvd_icmp_ids = list(recvd_icmp_ids)
2700
2701         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2702         for i in range(pkt_count):
2703             p = (
2704                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2705                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2706                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2707             )
2708             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2709
2710             p = (
2711                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2712                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2713                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2714             )
2715             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2716
2717             p = (
2718                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2719                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2720                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2721             )
2722             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2723
2724         for i in range(0, self.vpp_worker_count):
2725             if len(o2i_pkts[i]) > 0:
2726                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2727
2728         self.pg_enable_capture(self.pg_interfaces)
2729         self.pg_start()
2730         capture = self.pg0.get_capture(pkt_count * 3)
2731         for packet in capture:
2732             try:
2733                 self.assert_packet_checksums_valid(packet)
2734                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2735                 if packet.haslayer(TCP):
2736                     self.assert_in_range(
2737                         packet[TCP].dport,
2738                         tcp_port_offset,
2739                         tcp_port_offset + pkt_count,
2740                         "dst TCP port",
2741                     )
2742                 elif packet.haslayer(UDP):
2743                     self.assert_in_range(
2744                         packet[UDP].dport,
2745                         udp_port_offset,
2746                         udp_port_offset + pkt_count,
2747                         "dst UDP port",
2748                     )
2749                 else:
2750                     self.assert_in_range(
2751                         packet[ICMP].id,
2752                         icmp_id_offset,
2753                         icmp_id_offset + pkt_count,
2754                         "ICMP id",
2755                     )
2756             except:
2757                 self.logger.error(
2758                     ppp("Unexpected or invalid packet (inside network):", packet)
2759                 )
2760                 raise
2761
2762         if_idx = self.pg1.sw_if_index
2763         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2764         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2765         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2766         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2767
2768         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2769         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2770         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2771         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2772
2773         sc = self.statistics["/nat44-ed/total-sessions"]
2774         self.assertEqual(
2775             sc[:, 0].sum(),
2776             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2777         )
2778
2779     def test_frag_in_order(self):
2780         """NAT44ED translate fragments arriving in order"""
2781
2782         self.nat_add_address(self.nat_addr)
2783         self.nat_add_inside_interface(self.pg0)
2784         self.nat_add_outside_interface(self.pg1)
2785
2786         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2787         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2788         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2789
2790     def test_frag_in_order_do_not_translate(self):
2791         """NAT44ED don't translate fragments arriving in order"""
2792
2793         self.nat_add_address(self.nat_addr)
2794         self.nat_add_inside_interface(self.pg0)
2795         self.nat_add_outside_interface(self.pg1)
2796         self.vapi.nat44_forwarding_enable_disable(enable=True)
2797
2798         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2799
2800     def test_frag_out_of_order(self):
2801         """NAT44ED translate fragments arriving out of order"""
2802
2803         self.nat_add_address(self.nat_addr)
2804         self.nat_add_inside_interface(self.pg0)
2805         self.nat_add_outside_interface(self.pg1)
2806
2807         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2808         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2809         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2810
2811     def test_frag_in_order_in_plus_out(self):
2812         """NAT44ED in+out interface fragments in order"""
2813
2814         in_port = self.random_port()
2815         out_port = self.random_port()
2816
2817         self.nat_add_address(self.nat_addr)
2818         self.nat_add_inside_interface(self.pg0)
2819         self.nat_add_outside_interface(self.pg0)
2820         self.nat_add_inside_interface(self.pg1)
2821         self.nat_add_outside_interface(self.pg1)
2822
2823         # add static mappings for server
2824         self.nat_add_static_mapping(
2825             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2826         )
2827         self.nat_add_static_mapping(
2828             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2829         )
2830         self.nat_add_static_mapping(
2831             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2832         )
2833
2834         # run tests for each protocol
2835         self.frag_in_order_in_plus_out(
2836             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2837         )
2838         self.frag_in_order_in_plus_out(
2839             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2840         )
2841         self.frag_in_order_in_plus_out(
2842             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2843         )
2844
2845     def test_frag_out_of_order_in_plus_out(self):
2846         """NAT44ED in+out interface fragments out of order"""
2847
2848         in_port = self.random_port()
2849         out_port = self.random_port()
2850
2851         self.nat_add_address(self.nat_addr)
2852         self.nat_add_inside_interface(self.pg0)
2853         self.nat_add_outside_interface(self.pg0)
2854         self.nat_add_inside_interface(self.pg1)
2855         self.nat_add_outside_interface(self.pg1)
2856
2857         # add static mappings for server
2858         self.nat_add_static_mapping(
2859             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2860         )
2861         self.nat_add_static_mapping(
2862             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2863         )
2864         self.nat_add_static_mapping(
2865             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2866         )
2867
2868         # run tests for each protocol
2869         self.frag_out_of_order_in_plus_out(
2870             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2871         )
2872         self.frag_out_of_order_in_plus_out(
2873             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2874         )
2875         self.frag_out_of_order_in_plus_out(
2876             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2877         )
2878
2879     def test_reass_hairpinning(self):
2880         """NAT44ED fragments hairpinning"""
2881
2882         server_addr = self.pg0.remote_hosts[1].ip4
2883
2884         host_in_port = self.random_port()
2885         server_in_port = self.random_port()
2886         server_out_port = self.random_port()
2887
2888         self.nat_add_address(self.nat_addr)
2889         self.nat_add_inside_interface(self.pg0)
2890         self.nat_add_outside_interface(self.pg1)
2891
2892         # add static mapping for server
2893         self.nat_add_static_mapping(
2894             server_addr,
2895             self.nat_addr,
2896             server_in_port,
2897             server_out_port,
2898             proto=IP_PROTOS.tcp,
2899         )
2900         self.nat_add_static_mapping(
2901             server_addr,
2902             self.nat_addr,
2903             server_in_port,
2904             server_out_port,
2905             proto=IP_PROTOS.udp,
2906         )
2907         self.nat_add_static_mapping(server_addr, self.nat_addr)
2908
2909         self.reass_hairpinning(
2910             server_addr,
2911             server_in_port,
2912             server_out_port,
2913             host_in_port,
2914             proto=IP_PROTOS.tcp,
2915             ignore_port=True,
2916         )
2917         self.reass_hairpinning(
2918             server_addr,
2919             server_in_port,
2920             server_out_port,
2921             host_in_port,
2922             proto=IP_PROTOS.udp,
2923             ignore_port=True,
2924         )
2925         self.reass_hairpinning(
2926             server_addr,
2927             server_in_port,
2928             server_out_port,
2929             host_in_port,
2930             proto=IP_PROTOS.icmp,
2931             ignore_port=True,
2932         )
2933
2934     def test_session_limit_per_vrf(self):
2935         """NAT44ED per vrf session limit"""
2936
2937         inside = self.pg0
2938         inside_vrf10 = self.pg2
2939         outside = self.pg1
2940
2941         limit = 5
2942
2943         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2944         # non existing vrf_id makes process core dump
2945         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2946
2947         self.nat_add_inside_interface(inside)
2948         self.nat_add_inside_interface(inside_vrf10)
2949         self.nat_add_outside_interface(outside)
2950
2951         # vrf independent
2952         self.nat_add_interface_address(outside)
2953
2954         # BUG: causing core dump - when bad vrf_id is specified
2955         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2956
2957         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2958         inside_vrf10.add_stream(stream)
2959
2960         self.pg_enable_capture(self.pg_interfaces)
2961         self.pg_start()
2962
2963         capture = outside.get_capture(limit)
2964
2965         stream = self.create_tcp_stream(inside, outside, limit * 2)
2966         inside.add_stream(stream)
2967
2968         self.pg_enable_capture(self.pg_interfaces)
2969         self.pg_start()
2970
2971         capture = outside.get_capture(len(stream))
2972
2973     def test_show_max_translations(self):
2974         """NAT44ED API test - max translations per thread"""
2975         config = self.vapi.nat44_show_running_config()
2976         self.assertEqual(self.max_sessions, config.sessions)
2977
2978     def test_lru_cleanup(self):
2979         """NAT44ED LRU cleanup algorithm"""
2980
2981         self.nat_add_address(self.nat_addr)
2982         self.nat_add_inside_interface(self.pg0)
2983         self.nat_add_outside_interface(self.pg1)
2984
2985         self.vapi.nat_set_timeouts(
2986             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2987         )
2988
2989         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2990         pkts = []
2991         for i in range(0, self.max_sessions - 1):
2992             p = (
2993                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2994                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2995                 / UDP(sport=7000 + i, dport=80)
2996             )
2997             pkts.append(p)
2998
2999         self.pg0.add_stream(pkts)
3000         self.pg_enable_capture(self.pg_interfaces)
3001         self.pg_start()
3002         self.pg1.get_capture(len(pkts))
3003         self.virtual_sleep(1.5, "wait for timeouts")
3004
3005         pkts = []
3006         for i in range(0, self.max_sessions - 1):
3007             p = (
3008                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3009                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
3010                 / ICMP(id=8000 + i, type="echo-request")
3011             )
3012             pkts.append(p)
3013
3014         self.pg0.add_stream(pkts)
3015         self.pg_enable_capture(self.pg_interfaces)
3016         self.pg_start()
3017         self.pg1.get_capture(len(pkts))
3018
3019     def test_session_rst_timeout(self):
3020         """NAT44ED session RST timeouts"""
3021
3022         self.nat_add_address(self.nat_addr)
3023         self.nat_add_inside_interface(self.pg0)
3024         self.nat_add_outside_interface(self.pg1)
3025
3026         self.vapi.nat_set_timeouts(
3027             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3028         )
3029
3030         self.init_tcp_session(
3031             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3032         )
3033         p = (
3034             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3035             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3036             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3037         )
3038         self.send_and_expect(self.pg0, p, self.pg1)
3039
3040         self.virtual_sleep(6)
3041
3042         # The session is already closed
3043         p = (
3044             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3045             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3046             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3047         )
3048         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3049
3050         # The session can be re-opened
3051         p = (
3052             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3053             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3054             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3055         )
3056         self.send_and_expect(self.pg0, p, self.pg1)
3057
3058     def test_session_rst_established_timeout(self):
3059         """NAT44ED session RST timeouts"""
3060
3061         self.nat_add_address(self.nat_addr)
3062         self.nat_add_inside_interface(self.pg0)
3063         self.nat_add_outside_interface(self.pg1)
3064
3065         self.vapi.nat_set_timeouts(
3066             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3067         )
3068
3069         self.init_tcp_session(
3070             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3071         )
3072
3073         # Wait at least the transitory time, the session is in established
3074         # state anyway. RST followed by a data packet should move it to
3075         # transitory state.
3076         self.virtual_sleep(6)
3077         p = (
3078             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3079             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3080             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3081         )
3082         self.send_and_expect(self.pg0, p, self.pg1)
3083
3084         p = (
3085             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3086             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3087             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3088         )
3089         self.send_and_expect(self.pg0, p, self.pg1)
3090
3091         # State is transitory, session should be closed after 6 seconds
3092         self.virtual_sleep(6)
3093
3094         p = (
3095             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3096             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3097             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3098         )
3099         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3100
3101     def test_dynamic_out_of_ports(self):
3102         """NAT44ED dynamic translation test: out of ports"""
3103
3104         self.nat_add_inside_interface(self.pg0)
3105         self.nat_add_outside_interface(self.pg1)
3106
3107         # in2out and no NAT addresses added
3108         pkts = self.create_stream_in(self.pg0, self.pg1)
3109
3110         self.send_and_assert_no_replies(
3111             self.pg0,
3112             pkts,
3113             msg="i2o pkts",
3114             stats_diff=self.no_diff
3115             | {
3116                 "err": {
3117                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3118                 },
3119                 self.pg0.sw_if_index: {
3120                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3121                 },
3122             },
3123         )
3124
3125         # in2out after NAT addresses added
3126         self.nat_add_address(self.nat_addr)
3127
3128         tcpn, udpn, icmpn = (
3129             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3130         )
3131
3132         self.send_and_expect(
3133             self.pg0,
3134             pkts,
3135             self.pg1,
3136             msg="i2o pkts",
3137             stats_diff=self.no_diff
3138             | {
3139                 "err": {
3140                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3141                 },
3142                 self.pg0.sw_if_index: {
3143                     "/nat44-ed/in2out/slowpath/drops": 0,
3144                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3145                     "/nat44-ed/in2out/slowpath/udp": udpn,
3146                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3147                 },
3148             },
3149         )
3150
3151     def test_unknown_proto(self):
3152         """NAT44ED translate packet with unknown protocol"""
3153
3154         self.nat_add_address(self.nat_addr)
3155         self.nat_add_inside_interface(self.pg0)
3156         self.nat_add_outside_interface(self.pg1)
3157
3158         # in2out
3159         p = (
3160             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3161             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3162             / TCP(sport=self.tcp_port_in, dport=20)
3163         )
3164         self.pg0.add_stream(p)
3165         self.pg_enable_capture(self.pg_interfaces)
3166         self.pg_start()
3167         p = self.pg1.get_capture(1)
3168
3169         p = (
3170             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3171             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3172             / GRE()
3173             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3174             / TCP(sport=1234, dport=1234)
3175         )
3176         self.pg0.add_stream(p)
3177         self.pg_enable_capture(self.pg_interfaces)
3178         self.pg_start()
3179         p = self.pg1.get_capture(1)
3180         packet = p[0]
3181         try:
3182             self.assertEqual(packet[IP].src, self.nat_addr)
3183             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3184             self.assertEqual(packet.haslayer(GRE), 1)
3185             self.assert_packet_checksums_valid(packet)
3186         except:
3187             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3188             raise
3189
3190         # out2in
3191         p = (
3192             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3193             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3194             / GRE()
3195             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3196             / TCP(sport=1234, dport=1234)
3197         )
3198         self.pg1.add_stream(p)
3199         self.pg_enable_capture(self.pg_interfaces)
3200         self.pg_start()
3201         p = self.pg0.get_capture(1)
3202         packet = p[0]
3203         try:
3204             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3205             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3206             self.assertEqual(packet.haslayer(GRE), 1)
3207             self.assert_packet_checksums_valid(packet)
3208         except:
3209             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3210             raise
3211
3212     def test_hairpinning_unknown_proto(self):
3213         """NAT44ED translate packet with unknown protocol - hairpinning"""
3214         host = self.pg0.remote_hosts[0]
3215         server = self.pg0.remote_hosts[1]
3216         host_in_port = 1234
3217         server_out_port = 8765
3218         server_nat_ip = "10.0.0.11"
3219
3220         self.nat_add_address(self.nat_addr)
3221         self.nat_add_inside_interface(self.pg0)
3222         self.nat_add_outside_interface(self.pg1)
3223
3224         # add static mapping for server
3225         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3226
3227         # host to server
3228         p = (
3229             Ether(src=host.mac, dst=self.pg0.local_mac)
3230             / IP(src=host.ip4, dst=server_nat_ip)
3231             / TCP(sport=host_in_port, dport=server_out_port)
3232         )
3233         self.pg0.add_stream(p)
3234         self.pg_enable_capture(self.pg_interfaces)
3235         self.pg_start()
3236         self.pg0.get_capture(1)
3237
3238         p = (
3239             Ether(dst=self.pg0.local_mac, src=host.mac)
3240             / IP(src=host.ip4, dst=server_nat_ip)
3241             / GRE()
3242             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3243             / TCP(sport=1234, dport=1234)
3244         )
3245         self.pg0.add_stream(p)
3246         self.pg_enable_capture(self.pg_interfaces)
3247         self.pg_start()
3248         p = self.pg0.get_capture(1)
3249         packet = p[0]
3250         try:
3251             self.assertEqual(packet[IP].src, self.nat_addr)
3252             self.assertEqual(packet[IP].dst, server.ip4)
3253             self.assertEqual(packet.haslayer(GRE), 1)
3254             self.assert_packet_checksums_valid(packet)
3255         except:
3256             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3257             raise
3258
3259         # server to host
3260         p = (
3261             Ether(dst=self.pg0.local_mac, src=server.mac)
3262             / IP(src=server.ip4, dst=self.nat_addr)
3263             / GRE()
3264             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3265             / TCP(sport=1234, dport=1234)
3266         )
3267         self.pg0.add_stream(p)
3268         self.pg_enable_capture(self.pg_interfaces)
3269         self.pg_start()
3270         p = self.pg0.get_capture(1)
3271         packet = p[0]
3272         try:
3273             self.assertEqual(packet[IP].src, server_nat_ip)
3274             self.assertEqual(packet[IP].dst, host.ip4)
3275             self.assertEqual(packet.haslayer(GRE), 1)
3276             self.assert_packet_checksums_valid(packet)
3277         except:
3278             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3279             raise
3280
3281     def test_output_feature_and_service(self):
3282         """NAT44ED interface output feature and services"""
3283         external_addr = "1.2.3.4"
3284         external_port = 80
3285         local_port = 8080
3286
3287         self.vapi.nat44_forwarding_enable_disable(enable=1)
3288         self.nat_add_address(self.nat_addr)
3289         flags = self.config_flags.NAT_IS_ADDR_ONLY
3290         self.vapi.nat44_add_del_identity_mapping(
3291             ip_address=self.pg1.remote_ip4,
3292             sw_if_index=0xFFFFFFFF,
3293             flags=flags,
3294             is_add=1,
3295         )
3296         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3297         self.nat_add_static_mapping(
3298             self.pg0.remote_ip4,
3299             external_addr,
3300             local_port,
3301             external_port,
3302             proto=IP_PROTOS.tcp,
3303             flags=flags,
3304         )
3305
3306         self.nat_add_inside_interface(self.pg0)
3307         self.nat_add_outside_interface(self.pg0)
3308         self.vapi.nat44_ed_add_del_output_interface(
3309             sw_if_index=self.pg1.sw_if_index, is_add=1
3310         )
3311
3312         # from client to service
3313         p = (
3314             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3315             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3316             / TCP(sport=12345, dport=external_port)
3317         )
3318         self.pg1.add_stream(p)
3319         self.pg_enable_capture(self.pg_interfaces)
3320         self.pg_start()
3321         capture = self.pg0.get_capture(1)
3322         p = capture[0]
3323         try:
3324             ip = p[IP]
3325             tcp = p[TCP]
3326             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3327             self.assertEqual(tcp.dport, local_port)
3328             self.assert_packet_checksums_valid(p)
3329         except:
3330             self.logger.error(ppp("Unexpected or invalid packet:", p))
3331             raise
3332
3333         # from service back to client
3334         p = (
3335             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3336             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3337             / TCP(sport=local_port, dport=12345)
3338         )
3339         self.pg0.add_stream(p)
3340         self.pg_enable_capture(self.pg_interfaces)
3341         self.pg_start()
3342         capture = self.pg1.get_capture(1)
3343         p = capture[0]
3344         try:
3345             ip = p[IP]
3346             tcp = p[TCP]
3347             self.assertEqual(ip.src, external_addr)
3348             self.assertEqual(tcp.sport, external_port)
3349             self.assert_packet_checksums_valid(p)
3350         except:
3351             self.logger.error(ppp("Unexpected or invalid packet:", p))
3352             raise
3353
3354         # from local network host to external network
3355         pkts = self.create_stream_in(self.pg0, self.pg1)
3356         self.pg0.add_stream(pkts)
3357         self.pg_enable_capture(self.pg_interfaces)
3358         self.pg_start()
3359         capture = self.pg1.get_capture(len(pkts))
3360         self.verify_capture_out(capture, ignore_port=True)
3361         pkts = self.create_stream_in(self.pg0, self.pg1)
3362         self.pg0.add_stream(pkts)
3363         self.pg_enable_capture(self.pg_interfaces)
3364         self.pg_start()
3365         capture = self.pg1.get_capture(len(pkts))
3366         self.verify_capture_out(capture, ignore_port=True)
3367
3368         # from external network back to local network host
3369         pkts = self.create_stream_out(self.pg1)
3370         self.pg1.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         capture = self.pg0.get_capture(len(pkts))
3374         self.verify_capture_in(capture, self.pg0)
3375
3376     def test_output_feature_and_service3(self):
3377         """NAT44ED interface output feature and DST NAT"""
3378         external_addr = "1.2.3.4"
3379         external_port = 80
3380         local_port = 8080
3381
3382         self.vapi.nat44_forwarding_enable_disable(enable=1)
3383         self.nat_add_address(self.nat_addr)
3384         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3385         self.nat_add_static_mapping(
3386             self.pg1.remote_ip4,
3387             external_addr,
3388             local_port,
3389             external_port,
3390             proto=IP_PROTOS.tcp,
3391             flags=flags,
3392         )
3393
3394         self.nat_add_inside_interface(self.pg0)
3395         self.nat_add_outside_interface(self.pg0)
3396         self.vapi.nat44_ed_add_del_output_interface(
3397             sw_if_index=self.pg1.sw_if_index, is_add=1
3398         )
3399
3400         p = (
3401             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3402             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3403             / TCP(sport=12345, dport=external_port)
3404         )
3405         self.pg0.add_stream(p)
3406         self.pg_enable_capture(self.pg_interfaces)
3407         self.pg_start()
3408         capture = self.pg1.get_capture(1)
3409         p = capture[0]
3410         try:
3411             ip = p[IP]
3412             tcp = p[TCP]
3413             self.assertEqual(ip.src, self.pg0.remote_ip4)
3414             self.assertEqual(tcp.sport, 12345)
3415             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3416             self.assertEqual(tcp.dport, local_port)
3417             self.assert_packet_checksums_valid(p)
3418         except:
3419             self.logger.error(ppp("Unexpected or invalid packet:", p))
3420             raise
3421
3422         p = (
3423             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3424             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3425             / TCP(sport=local_port, dport=12345)
3426         )
3427         self.pg1.add_stream(p)
3428         self.pg_enable_capture(self.pg_interfaces)
3429         self.pg_start()
3430         capture = self.pg0.get_capture(1)
3431         p = capture[0]
3432         try:
3433             ip = p[IP]
3434             tcp = p[TCP]
3435             self.assertEqual(ip.src, external_addr)
3436             self.assertEqual(tcp.sport, external_port)
3437             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3438             self.assertEqual(tcp.dport, 12345)
3439             self.assert_packet_checksums_valid(p)
3440         except:
3441             self.logger.error(ppp("Unexpected or invalid packet:", p))
3442             raise
3443
3444     def test_self_twice_nat_lb_negative(self):
3445         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3446         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3447
3448     def test_self_twice_nat_negative(self):
3449         """NAT44ED Self Twice NAT (negative test)"""
3450         self.twice_nat_common(self_twice_nat=True)
3451
3452     def test_static_lb_multi_clients(self):
3453         """NAT44ED local service load balancing - multiple clients"""
3454
3455         external_addr = self.nat_addr
3456         external_port = 80
3457         local_port = 8080
3458         server1 = self.pg0.remote_hosts[0]
3459         server2 = self.pg0.remote_hosts[1]
3460         server3 = self.pg0.remote_hosts[2]
3461
3462         locals = [
3463             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3464             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3465         ]
3466
3467         flags = self.config_flags.NAT_IS_INSIDE
3468         self.vapi.nat44_interface_add_del_feature(
3469             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3470         )
3471         self.vapi.nat44_interface_add_del_feature(
3472             sw_if_index=self.pg1.sw_if_index, is_add=1
3473         )
3474
3475         self.nat_add_address(self.nat_addr)
3476         self.vapi.nat44_add_del_lb_static_mapping(
3477             is_add=1,
3478             external_addr=external_addr,
3479             external_port=external_port,
3480             protocol=IP_PROTOS.tcp,
3481             local_num=len(locals),
3482             locals=locals,
3483         )
3484
3485         server1_n = 0
3486         server2_n = 0
3487         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3488         pkts = []
3489         for client in clients:
3490             p = (
3491                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3492                 / IP(src=client, dst=self.nat_addr)
3493                 / TCP(sport=12345, dport=external_port)
3494             )
3495             pkts.append(p)
3496         self.pg1.add_stream(pkts)
3497         self.pg_enable_capture(self.pg_interfaces)
3498         self.pg_start()
3499         capture = self.pg0.get_capture(len(pkts))
3500         for p in capture:
3501             if p[IP].dst == server1.ip4:
3502                 server1_n += 1
3503             else:
3504                 server2_n += 1
3505         self.assertGreaterEqual(server1_n, server2_n)
3506
3507         local = {
3508             "addr": server3.ip4,
3509             "port": local_port,
3510             "probability": 20,
3511             "vrf_id": 0,
3512         }
3513
3514         # add new back-end
3515         self.vapi.nat44_lb_static_mapping_add_del_local(
3516             is_add=1,
3517             external_addr=external_addr,
3518             external_port=external_port,
3519             local=local,
3520             protocol=IP_PROTOS.tcp,
3521         )
3522         server1_n = 0
3523         server2_n = 0
3524         server3_n = 0
3525         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3526         pkts = []
3527         for client in clients:
3528             p = (
3529                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3530                 / IP(src=client, dst=self.nat_addr)
3531                 / TCP(sport=12346, dport=external_port)
3532             )
3533             pkts.append(p)
3534         self.assertGreater(len(pkts), 0)
3535         self.pg1.add_stream(pkts)
3536         self.pg_enable_capture(self.pg_interfaces)
3537         self.pg_start()
3538         capture = self.pg0.get_capture(len(pkts))
3539         for p in capture:
3540             if p[IP].dst == server1.ip4:
3541                 server1_n += 1
3542             elif p[IP].dst == server2.ip4:
3543                 server2_n += 1
3544             else:
3545                 server3_n += 1
3546         self.assertGreater(server1_n, 0)
3547         self.assertGreater(server2_n, 0)
3548         self.assertGreater(server3_n, 0)
3549
3550         local = {
3551             "addr": server2.ip4,
3552             "port": local_port,
3553             "probability": 10,
3554             "vrf_id": 0,
3555         }
3556
3557         # remove one back-end
3558         self.vapi.nat44_lb_static_mapping_add_del_local(
3559             is_add=0,
3560             external_addr=external_addr,
3561             external_port=external_port,
3562             local=local,
3563             protocol=IP_PROTOS.tcp,
3564         )
3565         server1_n = 0
3566         server2_n = 0
3567         server3_n = 0
3568         self.pg1.add_stream(pkts)
3569         self.pg_enable_capture(self.pg_interfaces)
3570         self.pg_start()
3571         capture = self.pg0.get_capture(len(pkts))
3572         for p in capture:
3573             if p[IP].dst == server1.ip4:
3574                 server1_n += 1
3575             elif p[IP].dst == server2.ip4:
3576                 server2_n += 1
3577             else:
3578                 server3_n += 1
3579         self.assertGreater(server1_n, 0)
3580         self.assertEqual(server2_n, 0)
3581         self.assertGreater(server3_n, 0)
3582
3583     # put zzz in front of syslog test name so that it runs as a last test
3584     # setting syslog sender cannot be undone and if it is set, it messes
3585     # with self.send_and_assert_no_replies functionality
3586     def test_zzz_syslog_sess(self):
3587         """NAT44ED Test syslog session creation and deletion"""
3588         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3589         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3590
3591         self.nat_add_address(self.nat_addr)
3592         self.nat_add_inside_interface(self.pg0)
3593         self.nat_add_outside_interface(self.pg1)
3594
3595         p = (
3596             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3597             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3598             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3599         )
3600         self.pg0.add_stream(p)
3601         self.pg_enable_capture(self.pg_interfaces)
3602         self.pg_start()
3603         capture = self.pg1.get_capture(1)
3604         self.tcp_port_out = capture[0][TCP].sport
3605         capture = self.pg3.get_capture(1)
3606         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3607
3608         self.pg_enable_capture(self.pg_interfaces)
3609         self.pg_start()
3610         self.nat_add_address(self.nat_addr, is_add=0)
3611         capture = self.pg3.get_capture(1)
3612         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3613
3614     # put zzz in front of syslog test name so that it runs as a last test
3615     # setting syslog sender cannot be undone and if it is set, it messes
3616     # with self.send_and_assert_no_replies functionality
3617     def test_zzz_syslog_sess_reopen(self):
3618         """Syslog events for session reopen"""
3619         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3620         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3621
3622         self.nat_add_address(self.nat_addr)
3623         self.nat_add_inside_interface(self.pg0)
3624         self.nat_add_outside_interface(self.pg1)
3625
3626         # SYN in2out
3627         p = (
3628             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3629             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3630             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3631         )
3632         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3633         self.tcp_port_out = capture[0][TCP].sport
3634         capture = self.pg3.get_capture(1)
3635         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3636
3637         # SYN out2in
3638         p = (
3639             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3640             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3641             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3642         )
3643         self.send_and_expect(self.pg1, p, self.pg0)
3644
3645         p = (
3646             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3647             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3648             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3649         )
3650         self.send_and_expect(self.pg0, p, self.pg1)
3651
3652         # FIN in2out
3653         p = (
3654             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3655             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3656             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3657         )
3658         self.send_and_expect(self.pg0, p, self.pg1)
3659
3660         # FIN out2in
3661         p = (
3662             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3663             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3664             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3665         )
3666         self.send_and_expect(self.pg1, p, self.pg0)
3667
3668         self.init_tcp_session(
3669             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3670         )
3671
3672         # 2 records should be produced - first one del & add
3673         capture = self.pg3.get_capture(2)
3674         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3675         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3676
3677     def test_twice_nat_interface_addr(self):
3678         """NAT44ED Acquire twice NAT addresses from interface"""
3679         flags = self.config_flags.NAT_IS_TWICE_NAT
3680         self.vapi.nat44_add_del_interface_addr(
3681             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3682         )
3683
3684         # no address in NAT pool
3685         adresses = self.vapi.nat44_address_dump()
3686         self.assertEqual(0, len(adresses))
3687
3688         # configure interface address and check NAT address pool
3689         self.pg11.config_ip4()
3690         adresses = self.vapi.nat44_address_dump()
3691         self.assertEqual(1, len(adresses))
3692         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3693         self.assertEqual(adresses[0].flags, flags)
3694
3695         # remove interface address and check NAT address pool
3696         self.pg11.unconfig_ip4()
3697         adresses = self.vapi.nat44_address_dump()
3698         self.assertEqual(0, len(adresses))
3699
3700     def test_output_feature_stateful_acl(self):
3701         """NAT44ED output feature works with stateful ACL"""
3702
3703         self.nat_add_address(self.nat_addr)
3704         self.vapi.nat44_ed_add_del_output_interface(
3705             sw_if_index=self.pg1.sw_if_index, is_add=1
3706         )
3707
3708         # First ensure that the NAT is working sans ACL
3709
3710         # send packets out2in, no sessions yet so packets should drop
3711         pkts_out2in = self.create_stream_out(self.pg1)
3712         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3713
3714         # send packets into inside intf, ensure received via outside intf
3715         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3716         capture = self.send_and_expect(
3717             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3718         )
3719         self.verify_capture_out(capture, ignore_port=True)
3720
3721         # send out2in again, with sessions created it should work now
3722         pkts_out2in = self.create_stream_out(self.pg1)
3723         capture = self.send_and_expect(
3724             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3725         )
3726         self.verify_capture_in(capture, self.pg0)
3727
3728         # Create an ACL blocking everything
3729         out2in_deny_rule = AclRule(is_permit=0)
3730         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3731         out2in_acl.add_vpp_config()
3732
3733         # create an ACL to permit/reflect everything
3734         in2out_reflect_rule = AclRule(is_permit=2)
3735         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3736         in2out_acl.add_vpp_config()
3737
3738         # apply as input acl on interface and confirm it blocks everything
3739         acl_if = VppAclInterface(
3740             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3741         )
3742         acl_if.add_vpp_config()
3743         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3744
3745         # apply output acl
3746         acl_if.acls = [out2in_acl, in2out_acl]
3747         acl_if.add_vpp_config()
3748         # send in2out to generate ACL state (NAT state was created earlier)
3749         capture = self.send_and_expect(
3750             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3751         )
3752         self.verify_capture_out(capture, ignore_port=True)
3753
3754         # send out2in again. ACL state exists so it should work now.
3755         # TCP packets with the syn flag set also need the ack flag
3756         for p in pkts_out2in:
3757             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3758                 p[TCP].flags |= 0x10
3759         capture = self.send_and_expect(
3760             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3761         )
3762         self.verify_capture_in(capture, self.pg0)
3763         self.logger.info(self.vapi.cli("show trace"))
3764
3765     def test_tcp_close(self):
3766         """NAT44ED Close TCP session from inside network - output feature"""
3767         config = self.vapi.nat44_show_running_config()
3768         old_timeouts = config.timeouts
3769         new_transitory = 2
3770         self.vapi.nat_set_timeouts(
3771             udp=old_timeouts.udp,
3772             tcp_established=old_timeouts.tcp_established,
3773             icmp=old_timeouts.icmp,
3774             tcp_transitory=new_transitory,
3775         )
3776
3777         self.vapi.nat44_forwarding_enable_disable(enable=1)
3778         self.nat_add_address(self.pg1.local_ip4)
3779         twice_nat_addr = "10.0.1.3"
3780         service_ip = "192.168.16.150"
3781         self.nat_add_address(twice_nat_addr, twice_nat=1)
3782
3783         flags = self.config_flags.NAT_IS_INSIDE
3784         self.vapi.nat44_interface_add_del_feature(
3785             sw_if_index=self.pg0.sw_if_index, is_add=1
3786         )
3787         self.vapi.nat44_interface_add_del_feature(
3788             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3789         )
3790         self.vapi.nat44_ed_add_del_output_interface(
3791             is_add=1, sw_if_index=self.pg1.sw_if_index
3792         )
3793
3794         flags = (
3795             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3796         )
3797         self.nat_add_static_mapping(
3798             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3799         )
3800         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3801         start_sessnum = len(sessions)
3802
3803         # SYN packet out->in
3804         p = (
3805             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3806             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3807             / TCP(sport=33898, dport=80, flags="S")
3808         )
3809         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3810         p = capture[0]
3811         tcp_port = p[TCP].sport
3812
3813         # SYN + ACK packet in->out
3814         p = (
3815             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3816             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3817             / TCP(sport=80, dport=tcp_port, flags="SA")
3818         )
3819         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3820
3821         # ACK packet out->in
3822         p = (
3823             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3824             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3825             / TCP(sport=33898, dport=80, flags="A")
3826         )
3827         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3828
3829         # FIN packet in -> out
3830         p = (
3831             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3832             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3833             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3834         )
3835         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3836
3837         # FIN+ACK packet out -> in
3838         p = (
3839             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3840             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3841             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3842         )
3843         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3844
3845         # ACK packet in -> out
3846         p = (
3847             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3848             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3849             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3850         )
3851         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3852
3853         # session now in transitory timeout, but traffic still flows
3854         # try FIN packet out->in
3855         p = (
3856             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3857             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3858             / TCP(sport=33898, dport=80, flags="F")
3859         )
3860         self.pg1.add_stream(p)
3861         self.pg_enable_capture(self.pg_interfaces)
3862         self.pg_start()
3863
3864         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3865         self.pg0.get_capture(1)
3866
3867         # session should still exist
3868         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3869         self.assertEqual(len(sessions) - start_sessnum, 1)
3870
3871         # send FIN+ACK packet out -> in - will cause session to be wiped
3872         # but won't create a new session
3873         p = (
3874             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3875             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3876             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3877         )
3878         self.send_and_assert_no_replies(self.pg1, p)
3879         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3880         self.assertEqual(len(sessions) - start_sessnum, 0)
3881
3882     def test_tcp_session_close_in(self):
3883         """NAT44ED Close TCP session from inside network"""
3884
3885         in_port = self.tcp_port_in
3886         out_port = 10505
3887         ext_port = self.tcp_external_port
3888
3889         self.nat_add_address(self.nat_addr)
3890         self.nat_add_inside_interface(self.pg0)
3891         self.nat_add_outside_interface(self.pg1)
3892         self.nat_add_static_mapping(
3893             self.pg0.remote_ip4,
3894             self.nat_addr,
3895             in_port,
3896             out_port,
3897             proto=IP_PROTOS.tcp,
3898             flags=self.config_flags.NAT_IS_TWICE_NAT,
3899         )
3900
3901         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3902         session_n = len(sessions)
3903
3904         self.vapi.nat_set_timeouts(
3905             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3906         )
3907
3908         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3909
3910         # FIN packet in -> out
3911         p = (
3912             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3913             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3914             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3915         )
3916         self.send_and_expect(self.pg0, p, self.pg1)
3917         pkts = []
3918
3919         # ACK packet out -> in
3920         p = (
3921             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3922             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3923             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3924         )
3925         pkts.append(p)
3926
3927         # FIN packet out -> in
3928         p = (
3929             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3930             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3931             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3932         )
3933         pkts.append(p)
3934
3935         self.send_and_expect(self.pg1, pkts, self.pg0)
3936
3937         # ACK packet in -> out
3938         p = (
3939             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3940             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3941             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3942         )
3943         self.send_and_expect(self.pg0, p, self.pg1)
3944
3945         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3946         self.assertEqual(len(sessions) - session_n, 1)
3947
3948         # retransmit FIN packet out -> in
3949         p = (
3950             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3951             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3952             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3953         )
3954
3955         self.send_and_expect(self.pg1, p, self.pg0)
3956
3957         # retransmit ACK packet in -> out
3958         p = (
3959             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3960             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3961             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3962         )
3963         self.send_and_expect(self.pg0, p, self.pg1)
3964
3965         self.virtual_sleep(3)
3966         # retransmit ACK packet in -> out - this will cause session to be wiped
3967         p = (
3968             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3969             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3970             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3971         )
3972         self.send_and_assert_no_replies(self.pg0, p)
3973         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3974         self.assertEqual(len(sessions) - session_n, 0)
3975
3976     def test_tcp_session_close_out(self):
3977         """NAT44ED Close TCP session from outside network"""
3978
3979         in_port = self.tcp_port_in
3980         out_port = 10505
3981         ext_port = self.tcp_external_port
3982
3983         self.nat_add_address(self.nat_addr)
3984         self.nat_add_inside_interface(self.pg0)
3985         self.nat_add_outside_interface(self.pg1)
3986         self.nat_add_static_mapping(
3987             self.pg0.remote_ip4,
3988             self.nat_addr,
3989             in_port,
3990             out_port,
3991             proto=IP_PROTOS.tcp,
3992             flags=self.config_flags.NAT_IS_TWICE_NAT,
3993         )
3994
3995         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3996         session_n = len(sessions)
3997
3998         self.vapi.nat_set_timeouts(
3999             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4000         )
4001
4002         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4003
4004         # FIN packet out -> in
4005         p = (
4006             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4007             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4008             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
4009         )
4010         self.pg1.add_stream(p)
4011         self.pg_enable_capture(self.pg_interfaces)
4012         self.pg_start()
4013         self.pg0.get_capture(1)
4014
4015         # FIN+ACK packet in -> out
4016         p = (
4017             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4018             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4019             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
4020         )
4021
4022         self.pg0.add_stream(p)
4023         self.pg_enable_capture(self.pg_interfaces)
4024         self.pg_start()
4025         self.pg1.get_capture(1)
4026
4027         # ACK packet out -> in
4028         p = (
4029             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4030             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4031             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
4032         )
4033         self.pg1.add_stream(p)
4034         self.pg_enable_capture(self.pg_interfaces)
4035         self.pg_start()
4036         self.pg0.get_capture(1)
4037
4038         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4039         self.assertEqual(len(sessions) - session_n, 1)
4040
4041         # retransmit FIN packet out -> in
4042         p = (
4043             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4044             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4045             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4046         )
4047         self.send_and_expect(self.pg1, p, self.pg0)
4048
4049         # retransmit ACK packet in -> out
4050         p = (
4051             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4052             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4053             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4054         )
4055         self.send_and_expect(self.pg0, p, self.pg1)
4056
4057         self.virtual_sleep(3)
4058         # retransmit ACK packet in -> out - this will cause session to be wiped
4059         p = (
4060             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4061             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4062             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4063         )
4064         self.send_and_assert_no_replies(self.pg0, p)
4065         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4066         self.assertEqual(len(sessions) - session_n, 0)
4067
4068     def test_tcp_session_close_simultaneous(self):
4069         """Simultaneous TCP close from both sides"""
4070
4071         in_port = self.tcp_port_in
4072         ext_port = 10505
4073
4074         self.nat_add_address(self.nat_addr)
4075         self.nat_add_inside_interface(self.pg0)
4076         self.nat_add_outside_interface(self.pg1)
4077         self.nat_add_static_mapping(
4078             self.pg0.remote_ip4,
4079             self.nat_addr,
4080             in_port,
4081             ext_port,
4082             proto=IP_PROTOS.tcp,
4083             flags=self.config_flags.NAT_IS_TWICE_NAT,
4084         )
4085
4086         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4087         session_n = len(sessions)
4088
4089         self.vapi.nat_set_timeouts(
4090             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4091         )
4092
4093         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4094
4095         # FIN packet in -> out
4096         p = (
4097             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4098             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4099             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4100         )
4101         self.send_and_expect(self.pg0, p, self.pg1)
4102
4103         # FIN packet out -> in
4104         p = (
4105             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4106             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4107             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4108         )
4109         self.send_and_expect(self.pg1, p, self.pg0)
4110
4111         # ACK packet in -> out
4112         p = (
4113             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4114             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4115             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4116         )
4117         self.send_and_expect(self.pg0, p, self.pg1)
4118
4119         # ACK packet out -> in
4120         p = (
4121             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4122             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4123             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4124         )
4125         self.send_and_expect(self.pg1, p, self.pg0)
4126
4127         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4128         self.assertEqual(len(sessions) - session_n, 1)
4129
4130         # retransmit FIN packet out -> in
4131         p = (
4132             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4133             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4134             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4135         )
4136         self.send_and_expect(self.pg1, p, self.pg0)
4137
4138         # retransmit ACK packet in -> out
4139         p = (
4140             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4141             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4142             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4143         )
4144         self.send_and_expect(self.pg0, p, self.pg1)
4145
4146         self.virtual_sleep(3)
4147         # retransmit ACK packet in -> out - this will cause session to be wiped
4148         p = (
4149             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4150             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4151             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4152         )
4153         self.pg_send(self.pg0, p)
4154         self.send_and_assert_no_replies(self.pg0, p)
4155         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4156         self.assertEqual(len(sessions) - session_n, 0)
4157
4158     def test_tcp_session_half_reopen_inside(self):
4159         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4160         in_port = self.tcp_port_in
4161         ext_port = 10505
4162
4163         self.nat_add_address(self.nat_addr)
4164         self.nat_add_inside_interface(self.pg0)
4165         self.nat_add_outside_interface(self.pg1)
4166         self.nat_add_static_mapping(
4167             self.pg0.remote_ip4,
4168             self.nat_addr,
4169             in_port,
4170             ext_port,
4171             proto=IP_PROTOS.tcp,
4172             flags=self.config_flags.NAT_IS_TWICE_NAT,
4173         )
4174
4175         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4176         session_n = len(sessions)
4177
4178         self.vapi.nat_set_timeouts(
4179             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4180         )
4181
4182         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4183
4184         # FIN packet in -> out
4185         p = (
4186             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4187             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4188             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4189         )
4190         self.send_and_expect(self.pg0, p, self.pg1)
4191
4192         # FIN packet out -> in
4193         p = (
4194             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4195             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4196             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4197         )
4198         self.send_and_expect(self.pg1, p, self.pg0)
4199
4200         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4201         self.assertEqual(len(sessions) - session_n, 1)
4202
4203         # send SYN packet in -> out
4204         p = (
4205             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4206             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4207             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4208         )
4209         self.send_and_expect(self.pg0, p, self.pg1)
4210
4211         self.virtual_sleep(3)
4212         # send ACK packet in -> out - session should be wiped
4213         p = (
4214             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4215             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4216             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4217         )
4218         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4219         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4220         self.assertEqual(len(sessions) - session_n, 0)
4221
4222     def test_tcp_session_half_reopen_outside(self):
4223         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4224         in_port = self.tcp_port_in
4225         ext_port = 10505
4226
4227         self.nat_add_address(self.nat_addr)
4228         self.nat_add_inside_interface(self.pg0)
4229         self.nat_add_outside_interface(self.pg1)
4230         self.nat_add_static_mapping(
4231             self.pg0.remote_ip4,
4232             self.nat_addr,
4233             in_port,
4234             ext_port,
4235             proto=IP_PROTOS.tcp,
4236             flags=self.config_flags.NAT_IS_TWICE_NAT,
4237         )
4238
4239         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4240         session_n = len(sessions)
4241
4242         self.vapi.nat_set_timeouts(
4243             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4244         )
4245
4246         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4247
4248         # FIN packet in -> out
4249         p = (
4250             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4251             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4252             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4253         )
4254         self.send_and_expect(self.pg0, p, self.pg1)
4255
4256         # FIN packet out -> in
4257         p = (
4258             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4259             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4260             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4261         )
4262         self.send_and_expect(self.pg1, p, self.pg0)
4263
4264         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4265         self.assertEqual(len(sessions) - session_n, 1)
4266
4267         # send SYN packet out -> in
4268         p = (
4269             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4270             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4271             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4272         )
4273         self.send_and_expect(self.pg1, p, self.pg0)
4274
4275         self.virtual_sleep(3)
4276         # send ACK packet in -> out - session should be wiped
4277         p = (
4278             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4279             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4280             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4281         )
4282         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4283         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4284         self.assertEqual(len(sessions) - session_n, 0)
4285
4286     def test_tcp_session_reopen(self):
4287         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4288         in_port = self.tcp_port_in
4289         ext_port = 10505
4290
4291         self.nat_add_address(self.nat_addr)
4292         self.nat_add_inside_interface(self.pg0)
4293         self.nat_add_outside_interface(self.pg1)
4294         self.nat_add_static_mapping(
4295             self.pg0.remote_ip4,
4296             self.nat_addr,
4297             in_port,
4298             ext_port,
4299             proto=IP_PROTOS.tcp,
4300             flags=self.config_flags.NAT_IS_TWICE_NAT,
4301         )
4302
4303         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4304         session_n = len(sessions)
4305
4306         self.vapi.nat_set_timeouts(
4307             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4308         )
4309
4310         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4311
4312         # FIN packet in -> out
4313         p = (
4314             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4315             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4316             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4317         )
4318         self.send_and_expect(self.pg0, p, self.pg1)
4319
4320         # FIN packet out -> in
4321         p = (
4322             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4323             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4324             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4325         )
4326         self.send_and_expect(self.pg1, p, self.pg0)
4327
4328         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4329         self.assertEqual(len(sessions) - session_n, 1)
4330
4331         # send SYN packet out -> in
4332         p = (
4333             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4334             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4335             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4336         )
4337         self.send_and_expect(self.pg1, p, self.pg0)
4338
4339         # send SYN packet in -> out
4340         p = (
4341             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4342             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4343             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4344         )
4345         self.send_and_expect(self.pg0, p, self.pg1)
4346
4347         # send ACK packet out -> in
4348         p = (
4349             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4350             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4351             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4352         )
4353         self.send_and_expect(self.pg1, p, self.pg0)
4354
4355         self.virtual_sleep(3)
4356         # send ACK packet in -> out - should be forwarded and session alive
4357         p = (
4358             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4359             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4360             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4361         )
4362         self.send_and_expect(self.pg0, p, self.pg1)
4363         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4364         self.assertEqual(len(sessions) - session_n, 1)
4365
4366     def test_dynamic_vrf(self):
4367         """NAT44ED dynamic translation test: different VRF"""
4368
4369         vrf_id_in = 33
4370         vrf_id_out = 34
4371
4372         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4373
4374         try:
4375             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4376             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4377
4378             self.nat_add_inside_interface(self.pg7)
4379             self.nat_add_outside_interface(self.pg8)
4380
4381             # just basic stuff nothing special
4382             pkts = self.create_stream_in(self.pg7, self.pg8)
4383             self.pg7.add_stream(pkts)
4384             self.pg_enable_capture(self.pg_interfaces)
4385             self.pg_start()
4386             capture = self.pg8.get_capture(len(pkts))
4387             self.verify_capture_out(capture, ignore_port=True)
4388
4389             pkts = self.create_stream_out(self.pg8)
4390             self.pg8.add_stream(pkts)
4391             self.pg_enable_capture(self.pg_interfaces)
4392             self.pg_start()
4393             capture = self.pg7.get_capture(len(pkts))
4394             self.verify_capture_in(capture, self.pg7)
4395
4396         finally:
4397             self.pg7.unconfig()
4398             self.pg8.unconfig()
4399
4400             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4401             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4402
4403     def test_dynamic_output_feature_vrf(self):
4404         """NAT44ED dynamic translation test: output-feature, VRF"""
4405
4406         # other then default (0)
4407         new_vrf_id = 22
4408
4409         self.nat_add_address(self.nat_addr)
4410         self.vapi.nat44_ed_add_del_output_interface(
4411             sw_if_index=self.pg8.sw_if_index, is_add=1
4412         )
4413         try:
4414             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4415             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4416
4417             # in2out
4418             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4419             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4420             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4421             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4422
4423             pkts = self.create_stream_in(self.pg7, self.pg8)
4424             self.pg7.add_stream(pkts)
4425             self.pg_enable_capture(self.pg_interfaces)
4426             self.pg_start()
4427             capture = self.pg8.get_capture(len(pkts))
4428             self.verify_capture_out(capture, ignore_port=True)
4429
4430             if_idx = self.pg8.sw_if_index
4431             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4432             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4433             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4434             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4435             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4436             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4437             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4438             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4439
4440             # out2in
4441             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4442             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4443             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4444             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4445
4446             pkts = self.create_stream_out(self.pg8)
4447             self.pg8.add_stream(pkts)
4448             self.pg_enable_capture(self.pg_interfaces)
4449             self.pg_start()
4450             capture = self.pg7.get_capture(len(pkts))
4451             self.verify_capture_in(capture, self.pg7)
4452
4453             if_idx = self.pg8.sw_if_index
4454             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4455             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4456             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4457             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4458             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4459             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4460             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4461             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4462
4463             sessions = self.statistics["/nat44-ed/total-sessions"]
4464             self.assertEqual(sessions[:, 0].sum(), 3)
4465
4466         finally:
4467             self.pg7.unconfig()
4468             self.pg8.unconfig()
4469
4470             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4471
4472     def test_next_src_nat(self):
4473         """NAT44ED On way back forward packet to nat44-in2out node."""
4474
4475         twice_nat_addr = "10.0.1.3"
4476         external_port = 80
4477         local_port = 8080
4478         post_twice_nat_port = 0
4479
4480         self.vapi.nat44_forwarding_enable_disable(enable=1)
4481         self.nat_add_address(twice_nat_addr, twice_nat=1)
4482         flags = (
4483             self.config_flags.NAT_IS_OUT2IN_ONLY
4484             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4485         )
4486         self.nat_add_static_mapping(
4487             self.pg6.remote_ip4,
4488             self.pg1.remote_ip4,
4489             local_port,
4490             external_port,
4491             proto=IP_PROTOS.tcp,
4492             vrf_id=1,
4493             flags=flags,
4494         )
4495         self.vapi.nat44_interface_add_del_feature(
4496             sw_if_index=self.pg6.sw_if_index, is_add=1
4497         )
4498
4499         p = (
4500             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4501             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4502             / TCP(sport=12345, dport=external_port)
4503         )
4504         self.pg6.add_stream(p)
4505         self.pg_enable_capture(self.pg_interfaces)
4506         self.pg_start()
4507         capture = self.pg6.get_capture(1)
4508         p = capture[0]
4509         try:
4510             ip = p[IP]
4511             tcp = p[TCP]
4512             self.assertEqual(ip.src, twice_nat_addr)
4513             self.assertNotEqual(tcp.sport, 12345)
4514             post_twice_nat_port = tcp.sport
4515             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4516             self.assertEqual(tcp.dport, local_port)
4517             self.assert_packet_checksums_valid(p)
4518         except:
4519             self.logger.error(ppp("Unexpected or invalid packet:", p))
4520             raise
4521
4522         p = (
4523             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4524             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4525             / TCP(sport=local_port, dport=post_twice_nat_port)
4526         )
4527         self.pg6.add_stream(p)
4528         self.pg_enable_capture(self.pg_interfaces)
4529         self.pg_start()
4530         capture = self.pg6.get_capture(1)
4531         p = capture[0]
4532         try:
4533             ip = p[IP]
4534             tcp = p[TCP]
4535             self.assertEqual(ip.src, self.pg1.remote_ip4)
4536             self.assertEqual(tcp.sport, external_port)
4537             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4538             self.assertEqual(tcp.dport, 12345)
4539             self.assert_packet_checksums_valid(p)
4540         except:
4541             self.logger.error(ppp("Unexpected or invalid packet:", p))
4542             raise
4543
4544     def test_one_armed_nat44_static(self):
4545         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4546
4547         remote_host = self.pg4.remote_hosts[0]
4548         local_host = self.pg4.remote_hosts[1]
4549         external_port = 80
4550         local_port = 8080
4551         eh_port_in = 0
4552
4553         self.vapi.nat44_forwarding_enable_disable(enable=1)
4554         self.nat_add_address(self.nat_addr, twice_nat=1)
4555         flags = (
4556             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4557         )
4558         self.nat_add_static_mapping(
4559             local_host.ip4,
4560             self.nat_addr,
4561             local_port,
4562             external_port,
4563             proto=IP_PROTOS.tcp,
4564             flags=flags,
4565         )
4566         flags = self.config_flags.NAT_IS_INSIDE
4567         self.vapi.nat44_interface_add_del_feature(
4568             sw_if_index=self.pg4.sw_if_index, is_add=1
4569         )
4570         self.vapi.nat44_interface_add_del_feature(
4571             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4572         )
4573
4574         # from client to service
4575         p = (
4576             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4577             / IP(src=remote_host.ip4, dst=self.nat_addr)
4578             / TCP(sport=12345, dport=external_port)
4579         )
4580         self.pg4.add_stream(p)
4581         self.pg_enable_capture(self.pg_interfaces)
4582         self.pg_start()
4583         capture = self.pg4.get_capture(1)
4584         p = capture[0]
4585         try:
4586             ip = p[IP]
4587             tcp = p[TCP]
4588             self.assertEqual(ip.dst, local_host.ip4)
4589             self.assertEqual(ip.src, self.nat_addr)
4590             self.assertEqual(tcp.dport, local_port)
4591             self.assertNotEqual(tcp.sport, 12345)
4592             eh_port_in = tcp.sport
4593             self.assert_packet_checksums_valid(p)
4594         except:
4595             self.logger.error(ppp("Unexpected or invalid packet:", p))
4596             raise
4597
4598         # from service back to client
4599         p = (
4600             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4601             / IP(src=local_host.ip4, dst=self.nat_addr)
4602             / TCP(sport=local_port, dport=eh_port_in)
4603         )
4604         self.pg4.add_stream(p)
4605         self.pg_enable_capture(self.pg_interfaces)
4606         self.pg_start()
4607         capture = self.pg4.get_capture(1)
4608         p = capture[0]
4609         try:
4610             ip = p[IP]
4611             tcp = p[TCP]
4612             self.assertEqual(ip.src, self.nat_addr)
4613             self.assertEqual(ip.dst, remote_host.ip4)
4614             self.assertEqual(tcp.sport, external_port)
4615             self.assertEqual(tcp.dport, 12345)
4616             self.assert_packet_checksums_valid(p)
4617         except:
4618             self.logger.error(ppp("Unexpected or invalid packet:", p))
4619             raise
4620
4621     def test_icmp_error_fwd_outbound(self):
4622         """NAT44ED ICMP error outbound with forwarding enabled"""
4623
4624         # Ensure that an outbound ICMP error message is properly associated
4625         # with the inbound forward bypass session it is related to.
4626         payload = "H" * 10
4627
4628         self.nat_add_address(self.nat_addr)
4629         self.nat_add_inside_interface(self.pg0)
4630         self.nat_add_outside_interface(self.pg1)
4631
4632         # enable forwarding and initiate connection out2in
4633         self.vapi.nat44_forwarding_enable_disable(enable=1)
4634         p1 = (
4635             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4636             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4637             / UDP(sport=21, dport=20)
4638             / payload
4639         )
4640
4641         self.pg1.add_stream(p1)
4642         self.pg_enable_capture(self.pg_interfaces)
4643         self.pg_start()
4644         capture = self.pg0.get_capture(1)[0]
4645
4646         self.logger.info(self.vapi.cli("show nat44 sessions"))
4647
4648         # reply with ICMP error message in2out
4649         # We cannot reliably retrieve forward bypass sessions via the API.
4650         # session dumps for a user will only look on the worker that the
4651         # user is supposed to be mapped to in2out. The forward bypass session
4652         # is not necessarily created on that worker.
4653         p2 = (
4654             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4655             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4656             / ICMP(type="dest-unreach", code="port-unreachable")
4657             / capture[IP:]
4658         )
4659
4660         self.pg0.add_stream(p2)
4661         self.pg_enable_capture(self.pg_interfaces)
4662         self.pg_start()
4663         capture = self.pg1.get_capture(1)[0]
4664
4665         self.logger.info(self.vapi.cli("show nat44 sessions"))
4666
4667         self.logger.info(ppp("p1 packet:", p1))
4668         self.logger.info(ppp("p2 packet:", p2))
4669         self.logger.info(ppp("capture packet:", capture))
4670
4671     def test_tcp_session_open_retransmit1(self):
4672         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4673
4674         The client does not receive the [SYN,ACK] or the
4675         ACK from the client is lost. Therefore, the [SYN, ACK]
4676         is retransmitted by the server.
4677         """
4678
4679         in_port = self.tcp_port_in
4680         ext_port = self.tcp_external_port
4681         payload = "H" * 10
4682
4683         self.nat_add_address(self.nat_addr)
4684         self.nat_add_inside_interface(self.pg0)
4685         self.nat_add_outside_interface(self.pg1)
4686
4687         self.vapi.nat_set_timeouts(
4688             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4689         )
4690         # SYN packet in->out
4691         p = (
4692             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4693             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4694             / TCP(sport=in_port, dport=ext_port, flags="S")
4695         )
4696         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4697         out_port = p[TCP].sport
4698
4699         # SYN + ACK packet out->in
4700         p = (
4701             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4702             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4703             / TCP(sport=ext_port, dport=out_port, flags="SA")
4704         )
4705         self.send_and_expect(self.pg1, p, self.pg0)
4706
4707         # ACK in->out does not arrive
4708
4709         # resent SYN + ACK packet out->in
4710         p = (
4711             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4712             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4713             / TCP(sport=ext_port, dport=out_port, flags="SA")
4714         )
4715         self.send_and_expect(self.pg1, p, self.pg0)
4716
4717         # ACK packet in->out
4718         p = (
4719             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4720             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4721             / TCP(sport=in_port, dport=ext_port, flags="A")
4722         )
4723         self.send_and_expect(self.pg0, p, self.pg1)
4724
4725         # Verify that the data can be transmitted after the transitory time
4726         self.virtual_sleep(6)
4727
4728         p = (
4729             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4730             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4731             / TCP(sport=in_port, dport=ext_port, flags="PA")
4732             / Raw(payload)
4733         )
4734         self.send_and_expect(self.pg0, p, self.pg1)
4735
4736     def test_tcp_session_open_retransmit2(self):
4737         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4738
4739         The ACK is lost to the server after the TCP session is opened.
4740         Data is sent by the client, then the [SYN,ACK] is
4741         retransmitted by the server.
4742         """
4743
4744         in_port = self.tcp_port_in
4745         ext_port = self.tcp_external_port
4746         payload = "H" * 10
4747
4748         self.nat_add_address(self.nat_addr)
4749         self.nat_add_inside_interface(self.pg0)
4750         self.nat_add_outside_interface(self.pg1)
4751
4752         self.vapi.nat_set_timeouts(
4753             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4754         )
4755         # SYN packet in->out
4756         p = (
4757             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4758             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4759             / TCP(sport=in_port, dport=ext_port, flags="S")
4760         )
4761         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4762         out_port = p[TCP].sport
4763
4764         # SYN + ACK packet out->in
4765         p = (
4766             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4767             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4768             / TCP(sport=ext_port, dport=out_port, flags="SA")
4769         )
4770         self.send_and_expect(self.pg1, p, self.pg0)
4771
4772         # ACK packet in->out -- not received by the server
4773         p = (
4774             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4775             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4776             / TCP(sport=in_port, dport=ext_port, flags="A")
4777         )
4778         self.send_and_expect(self.pg0, p, self.pg1)
4779
4780         # PUSH + ACK packet in->out
4781         p = (
4782             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4783             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4784             / TCP(sport=in_port, dport=ext_port, flags="PA")
4785             / Raw(payload)
4786         )
4787         self.send_and_expect(self.pg0, p, self.pg1)
4788
4789         # resent SYN + ACK packet out->in
4790         p = (
4791             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4792             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4793             / TCP(sport=ext_port, dport=out_port, flags="SA")
4794         )
4795         self.send_and_expect(self.pg1, p, self.pg0)
4796
4797         # resent ACK packet in->out
4798         p = (
4799             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4800             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4801             / TCP(sport=in_port, dport=ext_port, flags="A")
4802         )
4803         self.send_and_expect(self.pg0, p, self.pg1)
4804
4805         # resent PUSH + ACK packet in->out
4806         p = (
4807             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4808             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4809             / TCP(sport=in_port, dport=ext_port, flags="PA")
4810             / Raw(payload)
4811         )
4812         self.send_and_expect(self.pg0, p, self.pg1)
4813
4814         # ACK packet out->in
4815         p = (
4816             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4817             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4818             / TCP(sport=ext_port, dport=out_port, flags="A")
4819         )
4820         self.send_and_expect(self.pg1, p, self.pg0)
4821
4822         # Verify that the data can be transmitted after the transitory time
4823         self.virtual_sleep(6)
4824
4825         p = (
4826             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4827             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4828             / TCP(sport=in_port, dport=ext_port, flags="PA")
4829             / Raw(payload)
4830         )
4831         self.send_and_expect(self.pg0, p, self.pg1)
4832
4833     def test_dynamic_ports_exhausted(self):
4834         """NAT44ED dynamic translation test: address ports exhaused"""
4835
4836         sessions_per_batch = 128
4837         n_available_ports = 65536 - 1024
4838         n_sessions = n_available_ports + 2 * sessions_per_batch
4839
4840         # set high enough session limit for ports to be exhausted
4841         self.plugin_disable()
4842         self.plugin_enable(max_sessions=n_sessions)
4843
4844         self.nat_add_inside_interface(self.pg0)
4845         self.nat_add_outside_interface(self.pg1)
4846
4847         # set timeouts to high for sessions to reallistically expire
4848         config = self.vapi.nat44_show_running_config()
4849         old_timeouts = config.timeouts
4850         self.vapi.nat_set_timeouts(
4851             udp=21600,
4852             tcp_established=old_timeouts.tcp_established,
4853             tcp_transitory=old_timeouts.tcp_transitory,
4854             icmp=old_timeouts.icmp,
4855         )
4856
4857         # in2out after NAT addresses added
4858         self.nat_add_address(self.nat_addr)
4859
4860         for i in range(n_sessions // sessions_per_batch):
4861             pkts = self.create_udp_stream(
4862                 self.pg0,
4863                 self.pg1,
4864                 sessions_per_batch,
4865                 base_port=i * sessions_per_batch + 100,
4866             )
4867
4868             self.pg0.add_stream(pkts)
4869             self.pg_start()
4870
4871             err = self.statistics.get_err_counter(
4872                 "/err/nat44-ed-in2out-slowpath/out of ports"
4873             )
4874             if err > sessions_per_batch:
4875                 break
4876
4877         # Check for ports to be used no more than once
4878         ports = set()
4879         sessions = self.vapi.cli("show nat44 sessions")
4880         rx = re.compile(
4881             f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
4882         )
4883         for line in sessions.splitlines():
4884             m = rx.match(line)
4885             if m:
4886                 port = int(m.groups()[0])
4887                 self.assertNotIn(port, ports)
4888                 ports.add(port)
4889
4890         self.assertGreaterEqual(err, sessions_per_batch)
4891
4892
4893 if __name__ == "__main__":
4894     unittest.main(testRunner=VppTestRunner)