8a8c96870f45e47fdff9e31c6ab60bc2d0e17419
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import scapy.compat
8 from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
9 from framework import VppTestCase, VppTestRunner, VppLoInterface
10 from scapy.data import IP_PROTOS
11 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
12 from scapy.layers.inet import IPerror, TCPerror
13 from scapy.layers.l2 import Ether
14 from scapy.packet import Raw
15 from syslog_rfc5424_parser import SyslogMessage, ParseError
16 from syslog_rfc5424_parser.constants import SyslogSeverity
17 from util import ppp, pr, ip4_range
18 from vpp_acl import AclRule, VppAcl, VppAclInterface
19 from vpp_ip_route import VppIpRoute, VppRoutePath
20 from vpp_papi import VppEnum
21 from util import StatsDiff
22
23
24 class TestNAT44ED(VppTestCase):
25     """NAT44ED Test Case"""
26
27     nat_addr = "10.0.10.3"
28
29     tcp_port_in = 6303
30     tcp_port_out = 6303
31
32     udp_port_in = 6304
33     udp_port_out = 6304
34
35     icmp_id_in = 6305
36     icmp_id_out = 6305
37
38     tcp_external_port = 80
39
40     max_sessions = 100
41
42     def setUp(self):
43         super().setUp()
44         self.plugin_enable()
45
46     def tearDown(self):
47         super().tearDown()
48         if not self.vpp_dead:
49             self.plugin_disable()
50
51     def plugin_enable(self):
52         self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
53
54     def plugin_disable(self):
55         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
56
57     @property
58     def config_flags(self):
59         return VppEnum.vl_api_nat_config_flags_t
60
61     @property
62     def nat44_config_flags(self):
63         return VppEnum.vl_api_nat44_config_flags_t
64
65     @property
66     def syslog_severity(self):
67         return VppEnum.vl_api_syslog_severity_t
68
69     @property
70     def server_addr(self):
71         return self.pg1.remote_hosts[0].ip4
72
73     @staticmethod
74     def random_port():
75         return randint(1024, 65535)
76
77     @staticmethod
78     def proto2layer(proto):
79         if proto == IP_PROTOS.tcp:
80             return TCP
81         elif proto == IP_PROTOS.udp:
82             return UDP
83         elif proto == IP_PROTOS.icmp:
84             return ICMP
85         else:
86             raise Exception("Unsupported protocol")
87
88     @classmethod
89     def create_and_add_ip4_table(cls, i, table_id=0):
90         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
91         i.set_table_ip4(table_id)
92
93     @classmethod
94     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
95         if table_id:
96             cls.create_and_add_ip4_table(i, table_id)
97
98         i.admin_up()
99         i.config_ip4()
100         i.resolve_arp()
101
102         if hosts:
103             i.generate_remote_hosts(hosts)
104             i.configure_ipv4_neighbors()
105
106     @classmethod
107     def nat_add_interface_address(cls, i):
108         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
109
110     def nat_add_inside_interface(self, i):
111         self.vapi.nat44_interface_add_del_feature(
112             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
113         )
114
115     def nat_add_outside_interface(self, i):
116         self.vapi.nat44_interface_add_del_feature(
117             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
118         )
119
120     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
121         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
122         self.vapi.nat44_add_del_address_range(
123             first_ip_address=address,
124             last_ip_address=address,
125             vrf_id=vrf_id,
126             is_add=is_add,
127             flags=flags,
128         )
129
130     def nat_add_static_mapping(
131         self,
132         local_ip,
133         external_ip="0.0.0.0",
134         local_port=0,
135         external_port=0,
136         vrf_id=0,
137         is_add=1,
138         external_sw_if_index=0xFFFFFFFF,
139         proto=0,
140         tag="",
141         flags=0,
142     ):
143
144         if not (local_port and external_port):
145             flags |= self.config_flags.NAT_IS_ADDR_ONLY
146
147         self.vapi.nat44_add_del_static_mapping(
148             is_add=is_add,
149             local_ip_address=local_ip,
150             external_ip_address=external_ip,
151             external_sw_if_index=external_sw_if_index,
152             local_port=local_port,
153             external_port=external_port,
154             vrf_id=vrf_id,
155             protocol=proto,
156             flags=flags,
157             tag=tag,
158         )
159
160     @classmethod
161     def setUpClass(cls):
162         super().setUpClass()
163         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
164             return
165
166         cls.create_pg_interfaces(range(12))
167         cls.interfaces = list(cls.pg_interfaces[:4])
168
169         cls.create_and_add_ip4_table(cls.pg2, 10)
170
171         for i in cls.interfaces:
172             cls.configure_ip4_interface(i, hosts=3)
173
174         # test specific (test-multiple-vrf)
175         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
176
177         # test specific (test-one-armed-nat44-static)
178         cls.pg4.generate_remote_hosts(2)
179         cls.pg4.config_ip4()
180         cls.vapi.sw_interface_add_del_address(
181             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
182         )
183         cls.pg4.admin_up()
184         cls.pg4.resolve_arp()
185         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
186         cls.pg4.resolve_arp()
187
188         # test specific interface (pg5)
189         cls.pg5._local_ip4 = "10.1.1.1"
190         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
191         cls.pg5.set_table_ip4(1)
192         cls.pg5.config_ip4()
193         cls.pg5.admin_up()
194         cls.pg5.resolve_arp()
195
196         # test specific interface (pg6)
197         cls.pg6._local_ip4 = "10.1.2.1"
198         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
199         cls.pg6.set_table_ip4(1)
200         cls.pg6.config_ip4()
201         cls.pg6.admin_up()
202         cls.pg6.resolve_arp()
203
204         rl = list()
205
206         rl.append(
207             VppIpRoute(
208                 cls,
209                 "0.0.0.0",
210                 0,
211                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
212                 register=False,
213                 table_id=1,
214             )
215         )
216         rl.append(
217             VppIpRoute(
218                 cls,
219                 "0.0.0.0",
220                 0,
221                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
222                 register=False,
223             )
224         )
225         rl.append(
226             VppIpRoute(
227                 cls,
228                 cls.pg5.remote_ip4,
229                 32,
230                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
231                 register=False,
232                 table_id=1,
233             )
234         )
235         rl.append(
236             VppIpRoute(
237                 cls,
238                 cls.pg6.remote_ip4,
239                 32,
240                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
241                 register=False,
242                 table_id=1,
243             )
244         )
245         rl.append(
246             VppIpRoute(
247                 cls,
248                 cls.pg6.remote_ip4,
249                 16,
250                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
251                 register=False,
252                 table_id=0,
253             )
254         )
255
256         for r in rl:
257             r.add_vpp_config()
258
259         cls.no_diff = StatsDiff(
260             {
261                 pg.sw_if_index: {
262                     "/nat44-ed/in2out/fastpath/tcp": 0,
263                     "/nat44-ed/in2out/fastpath/udp": 0,
264                     "/nat44-ed/in2out/fastpath/icmp": 0,
265                     "/nat44-ed/in2out/fastpath/drops": 0,
266                     "/nat44-ed/in2out/slowpath/tcp": 0,
267                     "/nat44-ed/in2out/slowpath/udp": 0,
268                     "/nat44-ed/in2out/slowpath/icmp": 0,
269                     "/nat44-ed/in2out/slowpath/drops": 0,
270                     "/nat44-ed/in2out/fastpath/tcp": 0,
271                     "/nat44-ed/in2out/fastpath/udp": 0,
272                     "/nat44-ed/in2out/fastpath/icmp": 0,
273                     "/nat44-ed/in2out/fastpath/drops": 0,
274                     "/nat44-ed/in2out/slowpath/tcp": 0,
275                     "/nat44-ed/in2out/slowpath/udp": 0,
276                     "/nat44-ed/in2out/slowpath/icmp": 0,
277                     "/nat44-ed/in2out/slowpath/drops": 0,
278                 }
279                 for pg in cls.pg_interfaces
280             }
281         )
282
283     def get_err_counter(self, path):
284         return self.statistics.get_err_counter(path)
285
286     def reass_hairpinning(
287         self,
288         server_addr,
289         server_in_port,
290         server_out_port,
291         host_in_port,
292         proto=IP_PROTOS.tcp,
293         ignore_port=False,
294     ):
295         layer = self.proto2layer(proto)
296
297         if proto == IP_PROTOS.tcp:
298             data = b"A" * 4 + b"B" * 16 + b"C" * 3
299         else:
300             data = b"A" * 16 + b"B" * 16 + b"C" * 3
301
302         # send packet from host to server
303         pkts = self.create_stream_frag(
304             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
305         )
306         self.pg0.add_stream(pkts)
307         self.pg_enable_capture(self.pg_interfaces)
308         self.pg_start()
309         frags = self.pg0.get_capture(len(pkts))
310         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
311         if proto != IP_PROTOS.icmp:
312             if not ignore_port:
313                 self.assertNotEqual(p[layer].sport, host_in_port)
314             self.assertEqual(p[layer].dport, server_in_port)
315         else:
316             if not ignore_port:
317                 self.assertNotEqual(p[layer].id, host_in_port)
318         self.assertEqual(data, p[Raw].load)
319
320     def frag_out_of_order(
321         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
322     ):
323         layer = self.proto2layer(proto)
324
325         if proto == IP_PROTOS.tcp:
326             data = b"A" * 4 + b"B" * 16 + b"C" * 3
327         else:
328             data = b"A" * 16 + b"B" * 16 + b"C" * 3
329         self.port_in = self.random_port()
330
331         for i in range(2):
332             # in2out
333             pkts = self.create_stream_frag(
334                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
335             )
336             pkts.reverse()
337             self.pg0.add_stream(pkts)
338             self.pg_enable_capture(self.pg_interfaces)
339             self.pg_start()
340             frags = self.pg1.get_capture(len(pkts))
341             if not dont_translate:
342                 p = self.reass_frags_and_verify(
343                     frags, self.nat_addr, self.pg1.remote_ip4
344                 )
345             else:
346                 p = self.reass_frags_and_verify(
347                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
348                 )
349             if proto != IP_PROTOS.icmp:
350                 if not dont_translate:
351                     self.assertEqual(p[layer].dport, 20)
352                     if not ignore_port:
353                         self.assertNotEqual(p[layer].sport, self.port_in)
354                 else:
355                     self.assertEqual(p[layer].sport, self.port_in)
356             else:
357                 if not ignore_port:
358                     if not dont_translate:
359                         self.assertNotEqual(p[layer].id, self.port_in)
360                     else:
361                         self.assertEqual(p[layer].id, self.port_in)
362             self.assertEqual(data, p[Raw].load)
363
364             # out2in
365             if not dont_translate:
366                 dst_addr = self.nat_addr
367             else:
368                 dst_addr = self.pg0.remote_ip4
369             if proto != IP_PROTOS.icmp:
370                 sport = 20
371                 dport = p[layer].sport
372             else:
373                 sport = p[layer].id
374                 dport = 0
375             pkts = self.create_stream_frag(
376                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
377             )
378             pkts.reverse()
379             self.pg1.add_stream(pkts)
380             self.pg_enable_capture(self.pg_interfaces)
381             self.logger.info(self.vapi.cli("show trace"))
382             self.pg_start()
383             frags = self.pg0.get_capture(len(pkts))
384             p = self.reass_frags_and_verify(
385                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
386             )
387             if proto != IP_PROTOS.icmp:
388                 self.assertEqual(p[layer].sport, 20)
389                 self.assertEqual(p[layer].dport, self.port_in)
390             else:
391                 self.assertEqual(p[layer].id, self.port_in)
392             self.assertEqual(data, p[Raw].load)
393
394     def reass_frags_and_verify(self, frags, src, dst):
395         buffer = BytesIO()
396         for p in frags:
397             self.assertEqual(p[IP].src, src)
398             self.assertEqual(p[IP].dst, dst)
399             self.assert_ip_checksum_valid(p)
400             buffer.seek(p[IP].frag * 8)
401             buffer.write(bytes(p[IP].payload))
402         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
403         if ip.proto == IP_PROTOS.tcp:
404             p = ip / TCP(buffer.getvalue())
405             self.logger.debug(ppp("Reassembled:", p))
406             self.assert_tcp_checksum_valid(p)
407         elif ip.proto == IP_PROTOS.udp:
408             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
409         elif ip.proto == IP_PROTOS.icmp:
410             p = ip / ICMP(buffer.getvalue())
411         return p
412
413     def frag_in_order(
414         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
415     ):
416         layer = self.proto2layer(proto)
417
418         if proto == IP_PROTOS.tcp:
419             data = b"A" * 4 + b"B" * 16 + b"C" * 3
420         else:
421             data = b"A" * 16 + b"B" * 16 + b"C" * 3
422         self.port_in = self.random_port()
423
424         # in2out
425         pkts = self.create_stream_frag(
426             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
427         )
428         self.pg0.add_stream(pkts)
429         self.pg_enable_capture(self.pg_interfaces)
430         self.pg_start()
431         frags = self.pg1.get_capture(len(pkts))
432         if not dont_translate:
433             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
434         else:
435             p = self.reass_frags_and_verify(
436                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
437             )
438         if proto != IP_PROTOS.icmp:
439             if not dont_translate:
440                 self.assertEqual(p[layer].dport, 20)
441                 if not ignore_port:
442                     self.assertNotEqual(p[layer].sport, self.port_in)
443             else:
444                 self.assertEqual(p[layer].sport, self.port_in)
445         else:
446             if not ignore_port:
447                 if not dont_translate:
448                     self.assertNotEqual(p[layer].id, self.port_in)
449                 else:
450                     self.assertEqual(p[layer].id, self.port_in)
451         self.assertEqual(data, p[Raw].load)
452
453         # out2in
454         if not dont_translate:
455             dst_addr = self.nat_addr
456         else:
457             dst_addr = self.pg0.remote_ip4
458         if proto != IP_PROTOS.icmp:
459             sport = 20
460             dport = p[layer].sport
461         else:
462             sport = p[layer].id
463             dport = 0
464         pkts = self.create_stream_frag(
465             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
466         )
467         self.pg1.add_stream(pkts)
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470         frags = self.pg0.get_capture(len(pkts))
471         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
472         if proto != IP_PROTOS.icmp:
473             self.assertEqual(p[layer].sport, 20)
474             self.assertEqual(p[layer].dport, self.port_in)
475         else:
476             self.assertEqual(p[layer].id, self.port_in)
477         self.assertEqual(data, p[Raw].load)
478
479     def verify_capture_out(
480         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
481     ):
482         if nat_ip is None:
483             nat_ip = self.nat_addr
484         for packet in capture:
485             try:
486                 self.assert_packet_checksums_valid(packet)
487                 self.assertEqual(packet[IP].src, nat_ip)
488                 if dst_ip is not None:
489                     self.assertEqual(packet[IP].dst, dst_ip)
490                 if packet.haslayer(TCP):
491                     if not ignore_port:
492                         if same_port:
493                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
494                         else:
495                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
496                     self.tcp_port_out = packet[TCP].sport
497                     self.assert_packet_checksums_valid(packet)
498                 elif packet.haslayer(UDP):
499                     if not ignore_port:
500                         if same_port:
501                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
502                         else:
503                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
504                     self.udp_port_out = packet[UDP].sport
505                 else:
506                     if not ignore_port:
507                         if same_port:
508                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
509                         else:
510                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
511                     self.icmp_id_out = packet[ICMP].id
512                     self.assert_packet_checksums_valid(packet)
513             except:
514                 self.logger.error(
515                     ppp("Unexpected or invalid packet (outside network):", packet)
516                 )
517                 raise
518
519     def verify_capture_in(self, capture, in_if):
520         for packet in capture:
521             try:
522                 self.assert_packet_checksums_valid(packet)
523                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
524                 if packet.haslayer(TCP):
525                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
526                 elif packet.haslayer(UDP):
527                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
528                 else:
529                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
530             except:
531                 self.logger.error(
532                     ppp("Unexpected or invalid packet (inside network):", packet)
533                 )
534                 raise
535
536     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
537         if dst_ip is None:
538             dst_ip = out_if.remote_ip4
539
540         pkts = []
541         # TCP
542         p = (
543             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
544             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
545             / TCP(sport=self.tcp_port_in, dport=20)
546         )
547         pkts.extend([p, p])
548
549         # UDP
550         p = (
551             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
552             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
553             / UDP(sport=self.udp_port_in, dport=20)
554         )
555         pkts.append(p)
556
557         # ICMP
558         p = (
559             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
560             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
561             / ICMP(id=self.icmp_id_in, type="echo-request")
562         )
563         pkts.append(p)
564
565         return pkts
566
567     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
568         if dst_ip is None:
569             dst_ip = self.nat_addr
570         if not use_inside_ports:
571             tcp_port = self.tcp_port_out
572             udp_port = self.udp_port_out
573             icmp_id = self.icmp_id_out
574         else:
575             tcp_port = self.tcp_port_in
576             udp_port = self.udp_port_in
577             icmp_id = self.icmp_id_in
578         pkts = []
579         # TCP
580         p = (
581             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
582             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
583             / TCP(dport=tcp_port, sport=20)
584         )
585         pkts.extend([p, p])
586
587         # UDP
588         p = (
589             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
590             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
591             / UDP(dport=udp_port, sport=20)
592         )
593         pkts.append(p)
594
595         # ICMP
596         p = (
597             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
598             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
599             / ICMP(id=icmp_id, type="echo-reply")
600         )
601         pkts.append(p)
602
603         return pkts
604
605     def create_tcp_stream(self, in_if, out_if, count):
606         pkts = []
607         port = 6303
608
609         for i in range(count):
610             p = (
611                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
612                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
613                 / TCP(sport=port + i, dport=20)
614             )
615             pkts.append(p)
616
617         return pkts
618
619     def create_stream_frag(
620         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
621     ):
622         if proto == IP_PROTOS.tcp:
623             p = (
624                 IP(src=src_if.remote_ip4, dst=dst)
625                 / TCP(sport=sport, dport=dport)
626                 / Raw(data)
627             )
628             p = p.__class__(scapy.compat.raw(p))
629             chksum = p[TCP].chksum
630             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
631         elif proto == IP_PROTOS.udp:
632             proto_header = UDP(sport=sport, dport=dport)
633         elif proto == IP_PROTOS.icmp:
634             if not echo_reply:
635                 proto_header = ICMP(id=sport, type="echo-request")
636             else:
637                 proto_header = ICMP(id=sport, type="echo-reply")
638         else:
639             raise Exception("Unsupported protocol")
640         id = self.random_port()
641         pkts = []
642         if proto == IP_PROTOS.tcp:
643             raw = Raw(data[0:4])
644         else:
645             raw = Raw(data[0:16])
646         p = (
647             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
648             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
649             / proto_header
650             / raw
651         )
652         pkts.append(p)
653         if proto == IP_PROTOS.tcp:
654             raw = Raw(data[4:20])
655         else:
656             raw = Raw(data[16:32])
657         p = (
658             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
659             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
660             / raw
661         )
662         pkts.append(p)
663         if proto == IP_PROTOS.tcp:
664             raw = Raw(data[20:])
665         else:
666             raw = Raw(data[32:])
667         p = (
668             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
669             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
670             / raw
671         )
672         pkts.append(p)
673         return pkts
674
675     def frag_in_order_in_plus_out(
676         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
677     ):
678
679         layer = self.proto2layer(proto)
680
681         if proto == IP_PROTOS.tcp:
682             data = b"A" * 4 + b"B" * 16 + b"C" * 3
683         else:
684             data = b"A" * 16 + b"B" * 16 + b"C" * 3
685         port_in = self.random_port()
686
687         for i in range(2):
688             # out2in
689             pkts = self.create_stream_frag(
690                 self.pg0, out_addr, port_in, out_port, data, proto
691             )
692             self.pg0.add_stream(pkts)
693             self.pg_enable_capture(self.pg_interfaces)
694             self.pg_start()
695             frags = self.pg1.get_capture(len(pkts))
696             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
697             if proto != IP_PROTOS.icmp:
698                 self.assertEqual(p[layer].sport, port_in)
699                 self.assertEqual(p[layer].dport, in_port)
700             else:
701                 self.assertEqual(p[layer].id, port_in)
702             self.assertEqual(data, p[Raw].load)
703
704             # in2out
705             if proto != IP_PROTOS.icmp:
706                 pkts = self.create_stream_frag(
707                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
708                 )
709             else:
710                 pkts = self.create_stream_frag(
711                     self.pg1,
712                     self.pg0.remote_ip4,
713                     p[layer].id,
714                     0,
715                     data,
716                     proto,
717                     echo_reply=True,
718                 )
719             self.pg1.add_stream(pkts)
720             self.pg_enable_capture(self.pg_interfaces)
721             self.pg_start()
722             frags = self.pg0.get_capture(len(pkts))
723             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
724             if proto != IP_PROTOS.icmp:
725                 self.assertEqual(p[layer].sport, out_port)
726                 self.assertEqual(p[layer].dport, port_in)
727             else:
728                 self.assertEqual(p[layer].id, port_in)
729             self.assertEqual(data, p[Raw].load)
730
731     def frag_out_of_order_in_plus_out(
732         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
733     ):
734
735         layer = self.proto2layer(proto)
736
737         if proto == IP_PROTOS.tcp:
738             data = b"A" * 4 + b"B" * 16 + b"C" * 3
739         else:
740             data = b"A" * 16 + b"B" * 16 + b"C" * 3
741         port_in = self.random_port()
742
743         for i in range(2):
744             # out2in
745             pkts = self.create_stream_frag(
746                 self.pg0, out_addr, port_in, out_port, data, proto
747             )
748             pkts.reverse()
749             self.pg0.add_stream(pkts)
750             self.pg_enable_capture(self.pg_interfaces)
751             self.pg_start()
752             frags = self.pg1.get_capture(len(pkts))
753             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
754             if proto != IP_PROTOS.icmp:
755                 self.assertEqual(p[layer].dport, in_port)
756                 self.assertEqual(p[layer].sport, port_in)
757                 self.assertEqual(p[layer].dport, in_port)
758             else:
759                 self.assertEqual(p[layer].id, port_in)
760             self.assertEqual(data, p[Raw].load)
761
762             # in2out
763             if proto != IP_PROTOS.icmp:
764                 pkts = self.create_stream_frag(
765                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
766                 )
767             else:
768                 pkts = self.create_stream_frag(
769                     self.pg1,
770                     self.pg0.remote_ip4,
771                     p[layer].id,
772                     0,
773                     data,
774                     proto,
775                     echo_reply=True,
776                 )
777             pkts.reverse()
778             self.pg1.add_stream(pkts)
779             self.pg_enable_capture(self.pg_interfaces)
780             self.pg_start()
781             frags = self.pg0.get_capture(len(pkts))
782             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
783             if proto != IP_PROTOS.icmp:
784                 self.assertEqual(p[layer].sport, out_port)
785                 self.assertEqual(p[layer].dport, port_in)
786             else:
787                 self.assertEqual(p[layer].id, port_in)
788             self.assertEqual(data, p[Raw].load)
789
790     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
791         # SYN packet in->out
792         p = (
793             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
794             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
795             / TCP(sport=in_port, dport=ext_port, flags="S")
796         )
797         in_if.add_stream(p)
798         self.pg_enable_capture(self.pg_interfaces)
799         self.pg_start()
800         capture = out_if.get_capture(1)
801         p = capture[0]
802         out_port = p[TCP].sport
803
804         # SYN + ACK packet out->in
805         p = (
806             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
807             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
808             / TCP(sport=ext_port, dport=out_port, flags="SA")
809         )
810         out_if.add_stream(p)
811         self.pg_enable_capture(self.pg_interfaces)
812         self.pg_start()
813         in_if.get_capture(1)
814
815         # ACK packet in->out
816         p = (
817             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
818             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
819             / TCP(sport=in_port, dport=ext_port, flags="A")
820         )
821         in_if.add_stream(p)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         out_if.get_capture(1)
825
826         return out_port
827
828     def twice_nat_common(
829         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
830     ):
831         twice_nat_addr = "10.0.1.3"
832
833         port_in = 8080
834         if lb:
835             if not same_pg:
836                 port_in1 = port_in
837                 port_in2 = port_in
838             else:
839                 port_in1 = port_in + 1
840                 port_in2 = port_in + 2
841
842         port_out = 80
843         eh_port_out = 4567
844
845         server1 = self.pg0.remote_hosts[0]
846         server2 = self.pg0.remote_hosts[1]
847         if lb and same_pg:
848             server2 = server1
849         if not lb:
850             server = server1
851
852         pg0 = self.pg0
853         if same_pg:
854             pg1 = self.pg0
855         else:
856             pg1 = self.pg1
857
858         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
859
860         self.nat_add_address(self.nat_addr)
861         self.nat_add_address(twice_nat_addr, twice_nat=1)
862
863         flags = 0
864         if self_twice_nat:
865             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
866         else:
867             flags |= self.config_flags.NAT_IS_TWICE_NAT
868
869         if not lb:
870             self.nat_add_static_mapping(
871                 pg0.remote_ip4,
872                 self.nat_addr,
873                 port_in,
874                 port_out,
875                 proto=IP_PROTOS.tcp,
876                 flags=flags,
877             )
878         else:
879             locals = [
880                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
881                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
882             ]
883             out_addr = self.nat_addr
884
885             self.vapi.nat44_add_del_lb_static_mapping(
886                 is_add=1,
887                 flags=flags,
888                 external_addr=out_addr,
889                 external_port=port_out,
890                 protocol=IP_PROTOS.tcp,
891                 local_num=len(locals),
892                 locals=locals,
893             )
894         self.nat_add_inside_interface(pg0)
895         self.nat_add_outside_interface(pg1)
896
897         if same_pg:
898             if not lb:
899                 client = server
900             else:
901                 assert client_id is not None
902                 if client_id == 1:
903                     client = self.pg0.remote_hosts[0]
904                 elif client_id == 2:
905                     client = self.pg0.remote_hosts[1]
906         else:
907             client = pg1.remote_hosts[0]
908         p = (
909             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
910             / IP(src=client.ip4, dst=self.nat_addr)
911             / TCP(sport=eh_port_out, dport=port_out)
912         )
913         pg1.add_stream(p)
914         self.pg_enable_capture(self.pg_interfaces)
915         self.pg_start()
916         capture = pg0.get_capture(1)
917         p = capture[0]
918         try:
919             ip = p[IP]
920             tcp = p[TCP]
921             if lb:
922                 if ip.dst == server1.ip4:
923                     server = server1
924                     port_in = port_in1
925                 else:
926                     server = server2
927                     port_in = port_in2
928             self.assertEqual(ip.dst, server.ip4)
929             if lb and same_pg:
930                 self.assertIn(tcp.dport, [port_in1, port_in2])
931             else:
932                 self.assertEqual(tcp.dport, port_in)
933             if eh_translate:
934                 self.assertEqual(ip.src, twice_nat_addr)
935                 self.assertNotEqual(tcp.sport, eh_port_out)
936             else:
937                 self.assertEqual(ip.src, client.ip4)
938                 self.assertEqual(tcp.sport, eh_port_out)
939             eh_addr_in = ip.src
940             eh_port_in = tcp.sport
941             saved_port_in = tcp.dport
942             self.assert_packet_checksums_valid(p)
943         except:
944             self.logger.error(ppp("Unexpected or invalid packet:", p))
945             raise
946
947         p = (
948             Ether(src=server.mac, dst=pg0.local_mac)
949             / IP(src=server.ip4, dst=eh_addr_in)
950             / TCP(sport=saved_port_in, dport=eh_port_in)
951         )
952         pg0.add_stream(p)
953         self.pg_enable_capture(self.pg_interfaces)
954         self.pg_start()
955         capture = pg1.get_capture(1)
956         p = capture[0]
957         try:
958             ip = p[IP]
959             tcp = p[TCP]
960             self.assertEqual(ip.dst, client.ip4)
961             self.assertEqual(ip.src, self.nat_addr)
962             self.assertEqual(tcp.dport, eh_port_out)
963             self.assertEqual(tcp.sport, port_out)
964             self.assert_packet_checksums_valid(p)
965         except:
966             self.logger.error(ppp("Unexpected or invalid packet:", p))
967             raise
968
969         if eh_translate:
970             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
971             self.assertEqual(len(sessions), 1)
972             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
973             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
974             self.logger.info(self.vapi.cli("show nat44 sessions"))
975             self.vapi.nat44_del_session(
976                 address=sessions[0].inside_ip_address,
977                 port=sessions[0].inside_port,
978                 protocol=sessions[0].protocol,
979                 flags=(
980                     self.config_flags.NAT_IS_INSIDE
981                     | self.config_flags.NAT_IS_EXT_HOST_VALID
982                 ),
983                 ext_host_address=sessions[0].ext_host_nat_address,
984                 ext_host_port=sessions[0].ext_host_nat_port,
985             )
986             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
987             self.assertEqual(len(sessions), 0)
988
989     def verify_syslog_sess(self, data, msgid, is_ip6=False):
990         message = data.decode("utf-8")
991         try:
992             message = SyslogMessage.parse(message)
993         except ParseError as e:
994             self.logger.error(e)
995             raise
996         else:
997             self.assertEqual(message.severity, SyslogSeverity.info)
998             self.assertEqual(message.appname, "NAT")
999             self.assertEqual(message.msgid, msgid)
1000             sd_params = message.sd.get("nsess")
1001             self.assertTrue(sd_params is not None)
1002             if is_ip6:
1003                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1004                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1005             else:
1006                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1007                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1008                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1009             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1010             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1011             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1012             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1013             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1014             self.assertEqual(sd_params.get("SVLAN"), "0")
1015             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1016             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1017
1018     def test_icmp_error(self):
1019         """NAT44ED test ICMP error message with inner header"""
1020
1021         payload = "H" * 10
1022
1023         self.nat_add_address(self.nat_addr)
1024         self.nat_add_inside_interface(self.pg0)
1025         self.nat_add_outside_interface(self.pg1)
1026
1027         # in2out (initiate connection)
1028         p1 = [
1029             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1030             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1031             / UDP(sport=21, dport=20)
1032             / payload,
1033             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1034             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1035             / TCP(sport=21, dport=20, flags="S")
1036             / payload,
1037             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1038             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1039             / ICMP(type="echo-request", id=7777)
1040             / payload,
1041         ]
1042
1043         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1044
1045         # out2in (send error message)
1046         p2 = [
1047             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1048             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1049             / ICMP(type="dest-unreach", code="port-unreachable")
1050             / c[IP:]
1051             for c in capture
1052         ]
1053
1054         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1055
1056         for c in capture:
1057             try:
1058                 assert c[IP].dst == self.pg0.remote_ip4
1059                 assert c[IPerror].src == self.pg0.remote_ip4
1060             except AssertionError as a:
1061                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1062
1063     def test_icmp_echo_reply_trailer(self):
1064         """ICMP echo reply with ethernet trailer"""
1065
1066         self.nat_add_address(self.nat_addr)
1067         self.nat_add_inside_interface(self.pg0)
1068         self.nat_add_outside_interface(self.pg1)
1069
1070         # in2out
1071         p1 = (
1072             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1073             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1074             / ICMP(type=8, id=0xABCD, seq=0)
1075         )
1076
1077         self.pg0.add_stream(p1)
1078         self.pg_enable_capture(self.pg_interfaces)
1079         self.pg_start()
1080         c = self.pg1.get_capture(1)[0]
1081
1082         self.logger.debug(self.vapi.cli("show trace"))
1083
1084         # out2in
1085         p2 = (
1086             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1087             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1088             / ICMP(type=0, id=c[ICMP].id, seq=0)
1089         )
1090
1091         # force checksum calculation
1092         p2 = p2.__class__(bytes(p2))
1093
1094         self.logger.debug(ppp("Packet before modification:", p2))
1095
1096         # hex representation of vss monitoring ethernet trailer
1097         # this seems to be just added to end of packet without modifying
1098         # IP or ICMP lengths / checksums
1099         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1100         # change it so that IP/ICMP is unaffected
1101         p2[IP].len = 28
1102
1103         self.logger.debug(ppp("Packet with added trailer:", p2))
1104
1105         self.pg1.add_stream(p2)
1106         self.pg_enable_capture(self.pg_interfaces)
1107         self.pg_start()
1108
1109         self.pg0.get_capture(1)
1110
1111     def test_users_dump(self):
1112         """NAT44ED API test - nat44_user_dump"""
1113
1114         self.nat_add_address(self.nat_addr)
1115         self.nat_add_inside_interface(self.pg0)
1116         self.nat_add_outside_interface(self.pg1)
1117
1118         self.vapi.nat44_forwarding_enable_disable(enable=1)
1119
1120         local_ip = self.pg0.remote_ip4
1121         external_ip = self.nat_addr
1122         self.nat_add_static_mapping(local_ip, external_ip)
1123
1124         users = self.vapi.nat44_user_dump()
1125         self.assertEqual(len(users), 0)
1126
1127         # in2out - static mapping match
1128
1129         pkts = self.create_stream_out(self.pg1)
1130         self.pg1.add_stream(pkts)
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133         capture = self.pg0.get_capture(len(pkts))
1134         self.verify_capture_in(capture, self.pg0)
1135
1136         pkts = self.create_stream_in(self.pg0, self.pg1)
1137         self.pg0.add_stream(pkts)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg1.get_capture(len(pkts))
1141         self.verify_capture_out(capture, same_port=True)
1142
1143         users = self.vapi.nat44_user_dump()
1144         self.assertEqual(len(users), 1)
1145         static_user = users[0]
1146         self.assertEqual(static_user.nstaticsessions, 3)
1147         self.assertEqual(static_user.nsessions, 0)
1148
1149         # in2out - no static mapping match (forwarding test)
1150
1151         host0 = self.pg0.remote_hosts[0]
1152         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1153         try:
1154             pkts = self.create_stream_out(
1155                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1156             )
1157             self.pg1.add_stream(pkts)
1158             self.pg_enable_capture(self.pg_interfaces)
1159             self.pg_start()
1160             capture = self.pg0.get_capture(len(pkts))
1161             self.verify_capture_in(capture, self.pg0)
1162
1163             pkts = self.create_stream_in(self.pg0, self.pg1)
1164             self.pg0.add_stream(pkts)
1165             self.pg_enable_capture(self.pg_interfaces)
1166             self.pg_start()
1167             capture = self.pg1.get_capture(len(pkts))
1168             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1169         finally:
1170             self.pg0.remote_hosts[0] = host0
1171
1172         users = self.vapi.nat44_user_dump()
1173         self.assertEqual(len(users), 2)
1174         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1175             non_static_user = users[1]
1176             static_user = users[0]
1177         else:
1178             non_static_user = users[0]
1179             static_user = users[1]
1180         self.assertEqual(static_user.nstaticsessions, 3)
1181         self.assertEqual(static_user.nsessions, 0)
1182         self.assertEqual(non_static_user.nstaticsessions, 0)
1183         self.assertEqual(non_static_user.nsessions, 3)
1184
1185         users = self.vapi.nat44_user_dump()
1186         self.assertEqual(len(users), 2)
1187         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1188             non_static_user = users[1]
1189             static_user = users[0]
1190         else:
1191             non_static_user = users[0]
1192             static_user = users[1]
1193         self.assertEqual(static_user.nstaticsessions, 3)
1194         self.assertEqual(static_user.nsessions, 0)
1195         self.assertEqual(non_static_user.nstaticsessions, 0)
1196         self.assertEqual(non_static_user.nsessions, 3)
1197
1198     def test_frag_out_of_order_do_not_translate(self):
1199         """NAT44ED don't translate fragments arriving out of order"""
1200         self.nat_add_inside_interface(self.pg0)
1201         self.nat_add_outside_interface(self.pg1)
1202         self.vapi.nat44_forwarding_enable_disable(enable=True)
1203         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1204
1205     def test_forwarding(self):
1206         """NAT44ED forwarding test"""
1207
1208         self.nat_add_inside_interface(self.pg0)
1209         self.nat_add_outside_interface(self.pg1)
1210         self.vapi.nat44_forwarding_enable_disable(enable=1)
1211
1212         real_ip = self.pg0.remote_ip4
1213         alias_ip = self.nat_addr
1214         flags = self.config_flags.NAT_IS_ADDR_ONLY
1215         self.vapi.nat44_add_del_static_mapping(
1216             is_add=1,
1217             local_ip_address=real_ip,
1218             external_ip_address=alias_ip,
1219             external_sw_if_index=0xFFFFFFFF,
1220             flags=flags,
1221         )
1222
1223         try:
1224             # in2out - static mapping match
1225
1226             pkts = self.create_stream_out(self.pg1)
1227             self.pg1.add_stream(pkts)
1228             self.pg_enable_capture(self.pg_interfaces)
1229             self.pg_start()
1230             capture = self.pg0.get_capture(len(pkts))
1231             self.verify_capture_in(capture, self.pg0)
1232
1233             pkts = self.create_stream_in(self.pg0, self.pg1)
1234             self.pg0.add_stream(pkts)
1235             self.pg_enable_capture(self.pg_interfaces)
1236             self.pg_start()
1237             capture = self.pg1.get_capture(len(pkts))
1238             self.verify_capture_out(capture, same_port=True)
1239
1240             # in2out - no static mapping match
1241
1242             host0 = self.pg0.remote_hosts[0]
1243             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1244             try:
1245                 pkts = self.create_stream_out(
1246                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1247                 )
1248                 self.pg1.add_stream(pkts)
1249                 self.pg_enable_capture(self.pg_interfaces)
1250                 self.pg_start()
1251                 capture = self.pg0.get_capture(len(pkts))
1252                 self.verify_capture_in(capture, self.pg0)
1253
1254                 pkts = self.create_stream_in(self.pg0, self.pg1)
1255                 self.pg0.add_stream(pkts)
1256                 self.pg_enable_capture(self.pg_interfaces)
1257                 self.pg_start()
1258                 capture = self.pg1.get_capture(len(pkts))
1259                 self.verify_capture_out(
1260                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1261                 )
1262             finally:
1263                 self.pg0.remote_hosts[0] = host0
1264
1265             user = self.pg0.remote_hosts[1]
1266             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1267             self.assertEqual(len(sessions), 3)
1268             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1269             self.vapi.nat44_del_session(
1270                 address=sessions[0].inside_ip_address,
1271                 port=sessions[0].inside_port,
1272                 protocol=sessions[0].protocol,
1273                 flags=(
1274                     self.config_flags.NAT_IS_INSIDE
1275                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1276                 ),
1277                 ext_host_address=sessions[0].ext_host_address,
1278                 ext_host_port=sessions[0].ext_host_port,
1279             )
1280             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1281             self.assertEqual(len(sessions), 2)
1282
1283         finally:
1284             self.vapi.nat44_forwarding_enable_disable(enable=0)
1285             flags = self.config_flags.NAT_IS_ADDR_ONLY
1286             self.vapi.nat44_add_del_static_mapping(
1287                 is_add=0,
1288                 local_ip_address=real_ip,
1289                 external_ip_address=alias_ip,
1290                 external_sw_if_index=0xFFFFFFFF,
1291                 flags=flags,
1292             )
1293
1294     def test_output_feature_and_service2(self):
1295         """NAT44ED interface output feature and service host direct access"""
1296         self.vapi.nat44_forwarding_enable_disable(enable=1)
1297         self.nat_add_address(self.nat_addr)
1298
1299         self.vapi.nat44_ed_add_del_output_interface(
1300             sw_if_index=self.pg1.sw_if_index, is_add=1
1301         )
1302
1303         # session initiated from service host - translate
1304         pkts = self.create_stream_in(self.pg0, self.pg1)
1305         self.pg0.add_stream(pkts)
1306         self.pg_enable_capture(self.pg_interfaces)
1307         self.pg_start()
1308         capture = self.pg1.get_capture(len(pkts))
1309         self.verify_capture_out(capture, ignore_port=True)
1310
1311         pkts = self.create_stream_out(self.pg1)
1312         self.pg1.add_stream(pkts)
1313         self.pg_enable_capture(self.pg_interfaces)
1314         self.pg_start()
1315         capture = self.pg0.get_capture(len(pkts))
1316         self.verify_capture_in(capture, self.pg0)
1317
1318         # session initiated from remote host - do not translate
1319         tcp_port_in = self.tcp_port_in
1320         udp_port_in = self.udp_port_in
1321         icmp_id_in = self.icmp_id_in
1322
1323         self.tcp_port_in = 60303
1324         self.udp_port_in = 60304
1325         self.icmp_id_in = 60305
1326
1327         try:
1328             pkts = self.create_stream_out(
1329                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1330             )
1331             self.pg1.add_stream(pkts)
1332             self.pg_enable_capture(self.pg_interfaces)
1333             self.pg_start()
1334             capture = self.pg0.get_capture(len(pkts))
1335             self.verify_capture_in(capture, self.pg0)
1336
1337             pkts = self.create_stream_in(self.pg0, self.pg1)
1338             self.pg0.add_stream(pkts)
1339             self.pg_enable_capture(self.pg_interfaces)
1340             self.pg_start()
1341             capture = self.pg1.get_capture(len(pkts))
1342             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1343         finally:
1344             self.tcp_port_in = tcp_port_in
1345             self.udp_port_in = udp_port_in
1346             self.icmp_id_in = icmp_id_in
1347
1348     def test_twice_nat(self):
1349         """NAT44ED Twice NAT"""
1350         self.twice_nat_common()
1351
1352     def test_self_twice_nat_positive(self):
1353         """NAT44ED Self Twice NAT (positive test)"""
1354         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1355
1356     def test_self_twice_nat_lb_positive(self):
1357         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1358         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1359
1360     def test_twice_nat_lb(self):
1361         """NAT44ED Twice NAT local service load balancing"""
1362         self.twice_nat_common(lb=True)
1363
1364     def test_output_feature(self):
1365         """NAT44ED interface output feature (in2out postrouting)"""
1366         self.vapi.nat44_forwarding_enable_disable(enable=1)
1367         self.nat_add_address(self.nat_addr)
1368
1369         self.nat_add_outside_interface(self.pg0)
1370         self.vapi.nat44_ed_add_del_output_interface(
1371             sw_if_index=self.pg1.sw_if_index, is_add=1
1372         )
1373
1374         # in2out
1375         pkts = self.create_stream_in(self.pg0, self.pg1)
1376         self.pg0.add_stream(pkts)
1377         self.pg_enable_capture(self.pg_interfaces)
1378         self.pg_start()
1379         capture = self.pg1.get_capture(len(pkts))
1380         self.verify_capture_out(capture, ignore_port=True)
1381         self.logger.debug(self.vapi.cli("show trace"))
1382
1383         # out2in
1384         pkts = self.create_stream_out(self.pg1)
1385         self.pg1.add_stream(pkts)
1386         self.pg_enable_capture(self.pg_interfaces)
1387         self.pg_start()
1388         capture = self.pg0.get_capture(len(pkts))
1389         self.verify_capture_in(capture, self.pg0)
1390         self.logger.debug(self.vapi.cli("show trace"))
1391
1392         # in2out
1393         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1394         self.pg0.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg1.get_capture(len(pkts))
1398         self.verify_capture_out(capture, ignore_port=True)
1399         self.logger.debug(self.vapi.cli("show trace"))
1400
1401         # out2in
1402         pkts = self.create_stream_out(self.pg1, ttl=2)
1403         self.pg1.add_stream(pkts)
1404         self.pg_enable_capture(self.pg_interfaces)
1405         self.pg_start()
1406         capture = self.pg0.get_capture(len(pkts))
1407         self.verify_capture_in(capture, self.pg0)
1408         self.logger.debug(self.vapi.cli("show trace"))
1409
1410         # in2out
1411         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1412         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1413         for p in capture:
1414             self.assertIn(ICMP, p)
1415             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1416
1417     def test_static_with_port_out2(self):
1418         """NAT44ED 1:1 NAPT asymmetrical rule"""
1419
1420         external_port = 80
1421         local_port = 8080
1422
1423         self.vapi.nat44_forwarding_enable_disable(enable=1)
1424         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1425         self.nat_add_static_mapping(
1426             self.pg0.remote_ip4,
1427             self.nat_addr,
1428             local_port,
1429             external_port,
1430             proto=IP_PROTOS.tcp,
1431             flags=flags,
1432         )
1433
1434         self.nat_add_inside_interface(self.pg0)
1435         self.nat_add_outside_interface(self.pg1)
1436
1437         # from client to service
1438         p = (
1439             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1440             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1441             / TCP(sport=12345, dport=external_port)
1442         )
1443         self.pg1.add_stream(p)
1444         self.pg_enable_capture(self.pg_interfaces)
1445         self.pg_start()
1446         capture = self.pg0.get_capture(1)
1447         p = capture[0]
1448         try:
1449             ip = p[IP]
1450             tcp = p[TCP]
1451             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1452             self.assertEqual(tcp.dport, local_port)
1453             self.assert_packet_checksums_valid(p)
1454         except:
1455             self.logger.error(ppp("Unexpected or invalid packet:", p))
1456             raise
1457
1458         # ICMP error
1459         p = (
1460             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1461             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1462             / ICMP(type=11)
1463             / capture[0][IP]
1464         )
1465         self.pg0.add_stream(p)
1466         self.pg_enable_capture(self.pg_interfaces)
1467         self.pg_start()
1468         capture = self.pg1.get_capture(1)
1469         p = capture[0]
1470         try:
1471             self.assertEqual(p[IP].src, self.nat_addr)
1472             inner = p[IPerror]
1473             self.assertEqual(inner.dst, self.nat_addr)
1474             self.assertEqual(inner[TCPerror].dport, external_port)
1475         except:
1476             self.logger.error(ppp("Unexpected or invalid packet:", p))
1477             raise
1478
1479         # from service back to client
1480         p = (
1481             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1482             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1483             / TCP(sport=local_port, dport=12345)
1484         )
1485         self.pg0.add_stream(p)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg1.get_capture(1)
1489         p = capture[0]
1490         try:
1491             ip = p[IP]
1492             tcp = p[TCP]
1493             self.assertEqual(ip.src, self.nat_addr)
1494             self.assertEqual(tcp.sport, external_port)
1495             self.assert_packet_checksums_valid(p)
1496         except:
1497             self.logger.error(ppp("Unexpected or invalid packet:", p))
1498             raise
1499
1500         # ICMP error
1501         p = (
1502             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1503             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1504             / ICMP(type=11)
1505             / capture[0][IP]
1506         )
1507         self.pg1.add_stream(p)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510         capture = self.pg0.get_capture(1)
1511         p = capture[0]
1512         try:
1513             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1514             inner = p[IPerror]
1515             self.assertEqual(inner.src, self.pg0.remote_ip4)
1516             self.assertEqual(inner[TCPerror].sport, local_port)
1517         except:
1518             self.logger.error(ppp("Unexpected or invalid packet:", p))
1519             raise
1520
1521         # from client to server (no translation)
1522         p = (
1523             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1524             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1525             / TCP(sport=12346, dport=local_port)
1526         )
1527         self.pg1.add_stream(p)
1528         self.pg_enable_capture(self.pg_interfaces)
1529         self.pg_start()
1530         capture = self.pg0.get_capture(1)
1531         p = capture[0]
1532         try:
1533             ip = p[IP]
1534             tcp = p[TCP]
1535             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1536             self.assertEqual(tcp.dport, local_port)
1537             self.assert_packet_checksums_valid(p)
1538         except:
1539             self.logger.error(ppp("Unexpected or invalid packet:", p))
1540             raise
1541
1542         # from service back to client (no translation)
1543         p = (
1544             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1545             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1546             / TCP(sport=local_port, dport=12346)
1547         )
1548         self.pg0.add_stream(p)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg1.get_capture(1)
1552         p = capture[0]
1553         try:
1554             ip = p[IP]
1555             tcp = p[TCP]
1556             self.assertEqual(ip.src, self.pg0.remote_ip4)
1557             self.assertEqual(tcp.sport, local_port)
1558             self.assert_packet_checksums_valid(p)
1559         except:
1560             self.logger.error(ppp("Unexpected or invalid packet:", p))
1561             raise
1562
1563     def test_static_lb(self):
1564         """NAT44ED local service load balancing"""
1565         external_addr_n = self.nat_addr
1566         external_port = 80
1567         local_port = 8080
1568         server1 = self.pg0.remote_hosts[0]
1569         server2 = self.pg0.remote_hosts[1]
1570
1571         locals = [
1572             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1573             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1574         ]
1575
1576         self.nat_add_address(self.nat_addr)
1577         self.vapi.nat44_add_del_lb_static_mapping(
1578             is_add=1,
1579             external_addr=external_addr_n,
1580             external_port=external_port,
1581             protocol=IP_PROTOS.tcp,
1582             local_num=len(locals),
1583             locals=locals,
1584         )
1585         flags = self.config_flags.NAT_IS_INSIDE
1586         self.vapi.nat44_interface_add_del_feature(
1587             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1588         )
1589         self.vapi.nat44_interface_add_del_feature(
1590             sw_if_index=self.pg1.sw_if_index, is_add=1
1591         )
1592
1593         # from client to service
1594         p = (
1595             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1596             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1597             / TCP(sport=12345, dport=external_port)
1598         )
1599         self.pg1.add_stream(p)
1600         self.pg_enable_capture(self.pg_interfaces)
1601         self.pg_start()
1602         capture = self.pg0.get_capture(1)
1603         p = capture[0]
1604         server = None
1605         try:
1606             ip = p[IP]
1607             tcp = p[TCP]
1608             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1609             if ip.dst == server1.ip4:
1610                 server = server1
1611             else:
1612                 server = server2
1613             self.assertEqual(tcp.dport, local_port)
1614             self.assert_packet_checksums_valid(p)
1615         except:
1616             self.logger.error(ppp("Unexpected or invalid packet:", p))
1617             raise
1618
1619         # from service back to client
1620         p = (
1621             Ether(src=server.mac, dst=self.pg0.local_mac)
1622             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1623             / TCP(sport=local_port, dport=12345)
1624         )
1625         self.pg0.add_stream(p)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg1.get_capture(1)
1629         p = capture[0]
1630         try:
1631             ip = p[IP]
1632             tcp = p[TCP]
1633             self.assertEqual(ip.src, self.nat_addr)
1634             self.assertEqual(tcp.sport, external_port)
1635             self.assert_packet_checksums_valid(p)
1636         except:
1637             self.logger.error(ppp("Unexpected or invalid packet:", p))
1638             raise
1639
1640         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1641         self.assertEqual(len(sessions), 1)
1642         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1643         self.vapi.nat44_del_session(
1644             address=sessions[0].inside_ip_address,
1645             port=sessions[0].inside_port,
1646             protocol=sessions[0].protocol,
1647             flags=(
1648                 self.config_flags.NAT_IS_INSIDE
1649                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1650             ),
1651             ext_host_address=sessions[0].ext_host_address,
1652             ext_host_port=sessions[0].ext_host_port,
1653         )
1654         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1655         self.assertEqual(len(sessions), 0)
1656
1657     def test_static_lb_2(self):
1658         """NAT44ED local service load balancing (asymmetrical rule)"""
1659         external_addr = self.nat_addr
1660         external_port = 80
1661         local_port = 8080
1662         server1 = self.pg0.remote_hosts[0]
1663         server2 = self.pg0.remote_hosts[1]
1664
1665         locals = [
1666             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1667             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1668         ]
1669
1670         self.vapi.nat44_forwarding_enable_disable(enable=1)
1671         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1672         self.vapi.nat44_add_del_lb_static_mapping(
1673             is_add=1,
1674             flags=flags,
1675             external_addr=external_addr,
1676             external_port=external_port,
1677             protocol=IP_PROTOS.tcp,
1678             local_num=len(locals),
1679             locals=locals,
1680         )
1681         flags = self.config_flags.NAT_IS_INSIDE
1682         self.vapi.nat44_interface_add_del_feature(
1683             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1684         )
1685         self.vapi.nat44_interface_add_del_feature(
1686             sw_if_index=self.pg1.sw_if_index, is_add=1
1687         )
1688
1689         # from client to service
1690         p = (
1691             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1692             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1693             / TCP(sport=12345, dport=external_port)
1694         )
1695         self.pg1.add_stream(p)
1696         self.pg_enable_capture(self.pg_interfaces)
1697         self.pg_start()
1698         capture = self.pg0.get_capture(1)
1699         p = capture[0]
1700         server = None
1701         try:
1702             ip = p[IP]
1703             tcp = p[TCP]
1704             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1705             if ip.dst == server1.ip4:
1706                 server = server1
1707             else:
1708                 server = server2
1709             self.assertEqual(tcp.dport, local_port)
1710             self.assert_packet_checksums_valid(p)
1711         except:
1712             self.logger.error(ppp("Unexpected or invalid packet:", p))
1713             raise
1714
1715         # from service back to client
1716         p = (
1717             Ether(src=server.mac, dst=self.pg0.local_mac)
1718             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1719             / TCP(sport=local_port, dport=12345)
1720         )
1721         self.pg0.add_stream(p)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         capture = self.pg1.get_capture(1)
1725         p = capture[0]
1726         try:
1727             ip = p[IP]
1728             tcp = p[TCP]
1729             self.assertEqual(ip.src, self.nat_addr)
1730             self.assertEqual(tcp.sport, external_port)
1731             self.assert_packet_checksums_valid(p)
1732         except:
1733             self.logger.error(ppp("Unexpected or invalid packet:", p))
1734             raise
1735
1736         # from client to server (no translation)
1737         p = (
1738             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1739             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1740             / TCP(sport=12346, dport=local_port)
1741         )
1742         self.pg1.add_stream(p)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg0.get_capture(1)
1746         p = capture[0]
1747         server = None
1748         try:
1749             ip = p[IP]
1750             tcp = p[TCP]
1751             self.assertEqual(ip.dst, server1.ip4)
1752             self.assertEqual(tcp.dport, local_port)
1753             self.assert_packet_checksums_valid(p)
1754         except:
1755             self.logger.error(ppp("Unexpected or invalid packet:", p))
1756             raise
1757
1758         # from service back to client (no translation)
1759         p = (
1760             Ether(src=server1.mac, dst=self.pg0.local_mac)
1761             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1762             / TCP(sport=local_port, dport=12346)
1763         )
1764         self.pg0.add_stream(p)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg1.get_capture(1)
1768         p = capture[0]
1769         try:
1770             ip = p[IP]
1771             tcp = p[TCP]
1772             self.assertEqual(ip.src, server1.ip4)
1773             self.assertEqual(tcp.sport, local_port)
1774             self.assert_packet_checksums_valid(p)
1775         except:
1776             self.logger.error(ppp("Unexpected or invalid packet:", p))
1777             raise
1778
1779     def test_lb_affinity(self):
1780         """NAT44ED local service load balancing affinity"""
1781         external_addr = self.nat_addr
1782         external_port = 80
1783         local_port = 8080
1784         server1 = self.pg0.remote_hosts[0]
1785         server2 = self.pg0.remote_hosts[1]
1786
1787         locals = [
1788             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1789             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1790         ]
1791
1792         self.nat_add_address(self.nat_addr)
1793         self.vapi.nat44_add_del_lb_static_mapping(
1794             is_add=1,
1795             external_addr=external_addr,
1796             external_port=external_port,
1797             protocol=IP_PROTOS.tcp,
1798             affinity=10800,
1799             local_num=len(locals),
1800             locals=locals,
1801         )
1802         flags = self.config_flags.NAT_IS_INSIDE
1803         self.vapi.nat44_interface_add_del_feature(
1804             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1805         )
1806         self.vapi.nat44_interface_add_del_feature(
1807             sw_if_index=self.pg1.sw_if_index, is_add=1
1808         )
1809
1810         p = (
1811             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1812             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1813             / TCP(sport=1025, dport=external_port)
1814         )
1815         self.pg1.add_stream(p)
1816         self.pg_enable_capture(self.pg_interfaces)
1817         self.pg_start()
1818         capture = self.pg0.get_capture(1)
1819         backend = capture[0][IP].dst
1820
1821         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1822         self.assertEqual(len(sessions), 1)
1823         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1824         self.vapi.nat44_del_session(
1825             address=sessions[0].inside_ip_address,
1826             port=sessions[0].inside_port,
1827             protocol=sessions[0].protocol,
1828             flags=(
1829                 self.config_flags.NAT_IS_INSIDE
1830                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1831             ),
1832             ext_host_address=sessions[0].ext_host_address,
1833             ext_host_port=sessions[0].ext_host_port,
1834         )
1835
1836         pkts = []
1837         for port in range(1030, 1100):
1838             p = (
1839                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1840                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1841                 / TCP(sport=port, dport=external_port)
1842             )
1843             pkts.append(p)
1844         self.pg1.add_stream(pkts)
1845         self.pg_enable_capture(self.pg_interfaces)
1846         self.pg_start()
1847         capture = self.pg0.get_capture(len(pkts))
1848         for p in capture:
1849             self.assertEqual(p[IP].dst, backend)
1850
1851     def test_multiple_vrf_1(self):
1852         """Multiple VRF - both client & service in VRF1"""
1853
1854         external_addr = "1.2.3.4"
1855         external_port = 80
1856         local_port = 8080
1857         port = 0
1858
1859         flags = self.config_flags.NAT_IS_INSIDE
1860         self.vapi.nat44_interface_add_del_feature(
1861             sw_if_index=self.pg5.sw_if_index, is_add=1
1862         )
1863         self.vapi.nat44_interface_add_del_feature(
1864             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1865         )
1866         self.vapi.nat44_interface_add_del_feature(
1867             sw_if_index=self.pg6.sw_if_index, is_add=1
1868         )
1869         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1870         self.nat_add_static_mapping(
1871             self.pg5.remote_ip4,
1872             external_addr,
1873             local_port,
1874             external_port,
1875             vrf_id=1,
1876             proto=IP_PROTOS.tcp,
1877             flags=flags,
1878         )
1879
1880         p = (
1881             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1882             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1883             / TCP(sport=12345, dport=external_port)
1884         )
1885         self.pg6.add_stream(p)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg5.get_capture(1)
1889         p = capture[0]
1890         try:
1891             ip = p[IP]
1892             tcp = p[TCP]
1893             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1894             self.assertEqual(tcp.dport, local_port)
1895             self.assert_packet_checksums_valid(p)
1896         except:
1897             self.logger.error(ppp("Unexpected or invalid packet:", p))
1898             raise
1899
1900         p = (
1901             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1902             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1903             / TCP(sport=local_port, dport=12345)
1904         )
1905         self.pg5.add_stream(p)
1906         self.pg_enable_capture(self.pg_interfaces)
1907         self.pg_start()
1908         capture = self.pg6.get_capture(1)
1909         p = capture[0]
1910         try:
1911             ip = p[IP]
1912             tcp = p[TCP]
1913             self.assertEqual(ip.src, external_addr)
1914             self.assertEqual(tcp.sport, external_port)
1915             self.assert_packet_checksums_valid(p)
1916         except:
1917             self.logger.error(ppp("Unexpected or invalid packet:", p))
1918             raise
1919
1920     def test_multiple_vrf_2(self):
1921         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1922
1923         external_addr = "1.2.3.4"
1924         external_port = 80
1925         local_port = 8080
1926         port = 0
1927
1928         self.nat_add_address(self.nat_addr)
1929         flags = self.config_flags.NAT_IS_INSIDE
1930         self.vapi.nat44_ed_add_del_output_interface(
1931             sw_if_index=self.pg1.sw_if_index, is_add=1
1932         )
1933         self.vapi.nat44_interface_add_del_feature(
1934             sw_if_index=self.pg5.sw_if_index, is_add=1
1935         )
1936         self.vapi.nat44_interface_add_del_feature(
1937             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1938         )
1939         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1940         self.nat_add_static_mapping(
1941             self.pg5.remote_ip4,
1942             external_addr,
1943             local_port,
1944             external_port,
1945             vrf_id=1,
1946             proto=IP_PROTOS.tcp,
1947             flags=flags,
1948         )
1949
1950         p = (
1951             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1952             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1953             / TCP(sport=2345, dport=22)
1954         )
1955         self.pg5.add_stream(p)
1956         self.pg_enable_capture(self.pg_interfaces)
1957         self.pg_start()
1958         capture = self.pg1.get_capture(1)
1959         p = capture[0]
1960         try:
1961             ip = p[IP]
1962             tcp = p[TCP]
1963             self.assertEqual(ip.src, self.nat_addr)
1964             self.assert_packet_checksums_valid(p)
1965             port = tcp.sport
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", p))
1968             raise
1969
1970         p = (
1971             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1972             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1973             / TCP(sport=22, dport=port)
1974         )
1975         self.pg1.add_stream(p)
1976         self.pg_enable_capture(self.pg_interfaces)
1977         self.pg_start()
1978         capture = self.pg5.get_capture(1)
1979         p = capture[0]
1980         try:
1981             ip = p[IP]
1982             tcp = p[TCP]
1983             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1984             self.assertEqual(tcp.dport, 2345)
1985             self.assert_packet_checksums_valid(p)
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", p))
1988             raise
1989
1990     def test_multiple_vrf_3(self):
1991         """Multiple VRF - client in VRF1, service in VRF0"""
1992
1993         external_addr = "1.2.3.4"
1994         external_port = 80
1995         local_port = 8080
1996         port = 0
1997
1998         flags = self.config_flags.NAT_IS_INSIDE
1999         self.vapi.nat44_interface_add_del_feature(
2000             sw_if_index=self.pg0.sw_if_index, is_add=1
2001         )
2002         self.vapi.nat44_interface_add_del_feature(
2003             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2004         )
2005         self.vapi.nat44_interface_add_del_feature(
2006             sw_if_index=self.pg6.sw_if_index, is_add=1
2007         )
2008         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2009         self.nat_add_static_mapping(
2010             self.pg0.remote_ip4,
2011             external_sw_if_index=self.pg0.sw_if_index,
2012             local_port=local_port,
2013             vrf_id=0,
2014             external_port=external_port,
2015             proto=IP_PROTOS.tcp,
2016             flags=flags,
2017         )
2018
2019         # from client VRF1 to service VRF0
2020         p = (
2021             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2022             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2023             / TCP(sport=12346, dport=external_port)
2024         )
2025         self.pg6.add_stream(p)
2026         self.pg_enable_capture(self.pg_interfaces)
2027         self.pg_start()
2028         capture = self.pg0.get_capture(1)
2029         p = capture[0]
2030         try:
2031             ip = p[IP]
2032             tcp = p[TCP]
2033             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2034             self.assertEqual(tcp.dport, local_port)
2035             self.assert_packet_checksums_valid(p)
2036         except:
2037             self.logger.error(ppp("Unexpected or invalid packet:", p))
2038             raise
2039
2040         # from service VRF0 back to client VRF1
2041         p = (
2042             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2043             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2044             / TCP(sport=local_port, dport=12346)
2045         )
2046         self.pg0.add_stream(p)
2047         self.pg_enable_capture(self.pg_interfaces)
2048         self.pg_start()
2049         capture = self.pg6.get_capture(1)
2050         p = capture[0]
2051         try:
2052             ip = p[IP]
2053             tcp = p[TCP]
2054             self.assertEqual(ip.src, self.pg0.local_ip4)
2055             self.assertEqual(tcp.sport, external_port)
2056             self.assert_packet_checksums_valid(p)
2057         except:
2058             self.logger.error(ppp("Unexpected or invalid packet:", p))
2059             raise
2060
2061     def test_multiple_vrf_4(self):
2062         """Multiple VRF - client in VRF0, service in VRF1"""
2063
2064         external_addr = "1.2.3.4"
2065         external_port = 80
2066         local_port = 8080
2067         port = 0
2068
2069         flags = self.config_flags.NAT_IS_INSIDE
2070         self.vapi.nat44_interface_add_del_feature(
2071             sw_if_index=self.pg0.sw_if_index, is_add=1
2072         )
2073         self.vapi.nat44_interface_add_del_feature(
2074             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2075         )
2076         self.vapi.nat44_interface_add_del_feature(
2077             sw_if_index=self.pg5.sw_if_index, is_add=1
2078         )
2079         self.vapi.nat44_interface_add_del_feature(
2080             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2081         )
2082         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2083         self.nat_add_static_mapping(
2084             self.pg5.remote_ip4,
2085             external_addr,
2086             local_port,
2087             external_port,
2088             vrf_id=1,
2089             proto=IP_PROTOS.tcp,
2090             flags=flags,
2091         )
2092
2093         # from client VRF0 to service VRF1
2094         p = (
2095             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2096             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2097             / TCP(sport=12347, dport=external_port)
2098         )
2099         self.pg0.add_stream(p)
2100         self.pg_enable_capture(self.pg_interfaces)
2101         self.pg_start()
2102         capture = self.pg5.get_capture(1)
2103         p = capture[0]
2104         try:
2105             ip = p[IP]
2106             tcp = p[TCP]
2107             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2108             self.assertEqual(tcp.dport, local_port)
2109             self.assert_packet_checksums_valid(p)
2110         except:
2111             self.logger.error(ppp("Unexpected or invalid packet:", p))
2112             raise
2113
2114         # from service VRF1 back to client VRF0
2115         p = (
2116             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2117             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2118             / TCP(sport=local_port, dport=12347)
2119         )
2120         self.pg5.add_stream(p)
2121         self.pg_enable_capture(self.pg_interfaces)
2122         self.pg_start()
2123         capture = self.pg0.get_capture(1)
2124         p = capture[0]
2125         try:
2126             ip = p[IP]
2127             tcp = p[TCP]
2128             self.assertEqual(ip.src, external_addr)
2129             self.assertEqual(tcp.sport, external_port)
2130             self.assert_packet_checksums_valid(p)
2131         except:
2132             self.logger.error(ppp("Unexpected or invalid packet:", p))
2133             raise
2134
2135     def test_multiple_vrf_5(self):
2136         """Multiple VRF - forwarding - no translation"""
2137
2138         external_addr = "1.2.3.4"
2139         external_port = 80
2140         local_port = 8080
2141         port = 0
2142
2143         self.vapi.nat44_forwarding_enable_disable(enable=1)
2144         flags = self.config_flags.NAT_IS_INSIDE
2145         self.vapi.nat44_interface_add_del_feature(
2146             sw_if_index=self.pg0.sw_if_index, is_add=1
2147         )
2148         self.vapi.nat44_interface_add_del_feature(
2149             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2150         )
2151         self.vapi.nat44_interface_add_del_feature(
2152             sw_if_index=self.pg5.sw_if_index, is_add=1
2153         )
2154         self.vapi.nat44_interface_add_del_feature(
2155             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2156         )
2157         self.vapi.nat44_interface_add_del_feature(
2158             sw_if_index=self.pg6.sw_if_index, is_add=1
2159         )
2160         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2161         self.nat_add_static_mapping(
2162             self.pg5.remote_ip4,
2163             external_addr,
2164             local_port,
2165             external_port,
2166             vrf_id=1,
2167             proto=IP_PROTOS.tcp,
2168             flags=flags,
2169         )
2170         self.nat_add_static_mapping(
2171             self.pg0.remote_ip4,
2172             external_sw_if_index=self.pg0.sw_if_index,
2173             local_port=local_port,
2174             vrf_id=0,
2175             external_port=external_port,
2176             proto=IP_PROTOS.tcp,
2177             flags=flags,
2178         )
2179
2180         # from client to server (both VRF1, no translation)
2181         p = (
2182             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2183             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2184             / TCP(sport=12348, dport=local_port)
2185         )
2186         self.pg6.add_stream(p)
2187         self.pg_enable_capture(self.pg_interfaces)
2188         self.pg_start()
2189         capture = self.pg5.get_capture(1)
2190         p = capture[0]
2191         try:
2192             ip = p[IP]
2193             tcp = p[TCP]
2194             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2195             self.assertEqual(tcp.dport, local_port)
2196             self.assert_packet_checksums_valid(p)
2197         except:
2198             self.logger.error(ppp("Unexpected or invalid packet:", p))
2199             raise
2200
2201         # from server back to client (both VRF1, no translation)
2202         p = (
2203             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2204             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2205             / TCP(sport=local_port, dport=12348)
2206         )
2207         self.pg5.add_stream(p)
2208         self.pg_enable_capture(self.pg_interfaces)
2209         self.pg_start()
2210         capture = self.pg6.get_capture(1)
2211         p = capture[0]
2212         try:
2213             ip = p[IP]
2214             tcp = p[TCP]
2215             self.assertEqual(ip.src, self.pg5.remote_ip4)
2216             self.assertEqual(tcp.sport, local_port)
2217             self.assert_packet_checksums_valid(p)
2218         except:
2219             self.logger.error(ppp("Unexpected or invalid packet:", p))
2220             raise
2221
2222         # from client VRF1 to server VRF0 (no translation)
2223         p = (
2224             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2225             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2226             / TCP(sport=local_port, dport=12349)
2227         )
2228         self.pg0.add_stream(p)
2229         self.pg_enable_capture(self.pg_interfaces)
2230         self.pg_start()
2231         capture = self.pg6.get_capture(1)
2232         p = capture[0]
2233         try:
2234             ip = p[IP]
2235             tcp = p[TCP]
2236             self.assertEqual(ip.src, self.pg0.remote_ip4)
2237             self.assertEqual(tcp.sport, local_port)
2238             self.assert_packet_checksums_valid(p)
2239         except:
2240             self.logger.error(ppp("Unexpected or invalid packet:", p))
2241             raise
2242
2243         # from server VRF0 back to client VRF1 (no translation)
2244         p = (
2245             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2246             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2247             / TCP(sport=local_port, dport=12349)
2248         )
2249         self.pg0.add_stream(p)
2250         self.pg_enable_capture(self.pg_interfaces)
2251         self.pg_start()
2252         capture = self.pg6.get_capture(1)
2253         p = capture[0]
2254         try:
2255             ip = p[IP]
2256             tcp = p[TCP]
2257             self.assertEqual(ip.src, self.pg0.remote_ip4)
2258             self.assertEqual(tcp.sport, local_port)
2259             self.assert_packet_checksums_valid(p)
2260         except:
2261             self.logger.error(ppp("Unexpected or invalid packet:", p))
2262             raise
2263
2264         # from client VRF0 to server VRF1 (no translation)
2265         p = (
2266             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2267             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2268             / TCP(sport=12344, dport=local_port)
2269         )
2270         self.pg0.add_stream(p)
2271         self.pg_enable_capture(self.pg_interfaces)
2272         self.pg_start()
2273         capture = self.pg5.get_capture(1)
2274         p = capture[0]
2275         try:
2276             ip = p[IP]
2277             tcp = p[TCP]
2278             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2279             self.assertEqual(tcp.dport, local_port)
2280             self.assert_packet_checksums_valid(p)
2281         except:
2282             self.logger.error(ppp("Unexpected or invalid packet:", p))
2283             raise
2284
2285         # from server VRF1 back to client VRF0 (no translation)
2286         p = (
2287             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2288             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2289             / TCP(sport=local_port, dport=12344)
2290         )
2291         self.pg5.add_stream(p)
2292         self.pg_enable_capture(self.pg_interfaces)
2293         self.pg_start()
2294         capture = self.pg0.get_capture(1)
2295         p = capture[0]
2296         try:
2297             ip = p[IP]
2298             tcp = p[TCP]
2299             self.assertEqual(ip.src, self.pg5.remote_ip4)
2300             self.assertEqual(tcp.sport, local_port)
2301             self.assert_packet_checksums_valid(p)
2302         except:
2303             self.logger.error(ppp("Unexpected or invalid packet:", p))
2304             raise
2305
2306     def test_outside_address_distribution(self):
2307         """Outside address distribution based on source address"""
2308
2309         x = 100
2310         nat_addresses = []
2311
2312         for i in range(1, x):
2313             a = "10.0.0.%d" % i
2314             nat_addresses.append(a)
2315
2316         self.nat_add_inside_interface(self.pg0)
2317         self.nat_add_outside_interface(self.pg1)
2318
2319         self.vapi.nat44_add_del_address_range(
2320             first_ip_address=nat_addresses[0],
2321             last_ip_address=nat_addresses[-1],
2322             vrf_id=0xFFFFFFFF,
2323             is_add=1,
2324             flags=0,
2325         )
2326
2327         self.pg0.generate_remote_hosts(x)
2328
2329         pkts = []
2330         for i in range(x):
2331             info = self.create_packet_info(self.pg0, self.pg1)
2332             payload = self.info_to_payload(info)
2333             p = (
2334                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2335                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2336                 / UDP(sport=7000 + i, dport=8000 + i)
2337                 / Raw(payload)
2338             )
2339             info.data = p
2340             pkts.append(p)
2341
2342         self.pg0.add_stream(pkts)
2343         self.pg_enable_capture(self.pg_interfaces)
2344         self.pg_start()
2345         recvd = self.pg1.get_capture(len(pkts))
2346         for p_recvd in recvd:
2347             payload_info = self.payload_to_info(p_recvd[Raw])
2348             packet_index = payload_info.index
2349             info = self._packet_infos[packet_index]
2350             self.assertTrue(info is not None)
2351             self.assertEqual(packet_index, info.index)
2352             p_sent = info.data
2353             packed = socket.inet_aton(p_sent[IP].src)
2354             numeric = struct.unpack("!L", packed)[0]
2355             numeric = socket.htonl(numeric)
2356             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2357             self.assertEqual(
2358                 a,
2359                 p_recvd[IP].src,
2360                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2361                 % (p_sent[IP].src, p_recvd[IP].src, a),
2362             )
2363
2364     def test_dynamic_edge_ports(self):
2365         """NAT44ED dynamic translation test: edge ports"""
2366
2367         worker_count = self.vpp_worker_count or 1
2368         port_offset = 1024
2369         port_per_thread = (65536 - port_offset) // worker_count
2370         port_count = port_per_thread * worker_count
2371
2372         # worker thread edge ports
2373         thread_edge_ports = {0, port_offset - 1, 65535}
2374         for i in range(0, worker_count):
2375             port_thread_offset = (port_per_thread * i) + port_offset
2376             for port_range_offset in [0, port_per_thread - 1]:
2377                 port = port_thread_offset + port_range_offset
2378                 thread_edge_ports.add(port)
2379         thread_drop_ports = set(
2380             filter(
2381                 lambda x: x not in range(port_offset, port_offset + port_count),
2382                 thread_edge_ports,
2383             )
2384         )
2385
2386         in_if = self.pg7
2387         out_if = self.pg8
2388
2389         self.nat_add_address(self.nat_addr)
2390
2391         try:
2392             self.configure_ip4_interface(in_if, hosts=worker_count)
2393             self.configure_ip4_interface(out_if)
2394
2395             self.nat_add_inside_interface(in_if)
2396             self.nat_add_outside_interface(out_if)
2397
2398             # in2out
2399             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2400             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2401             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2402             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2403
2404             pkt_count = worker_count * len(thread_edge_ports)
2405
2406             i2o_pkts = [[] for x in range(0, worker_count)]
2407             for i in range(0, worker_count):
2408                 remote_host = in_if.remote_hosts[i]
2409                 for port in thread_edge_ports:
2410                     p = (
2411                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2412                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2413                         / TCP(sport=port, dport=port)
2414                     )
2415                     i2o_pkts[i].append(p)
2416
2417                     p = (
2418                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2419                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2420                         / UDP(sport=port, dport=port)
2421                     )
2422                     i2o_pkts[i].append(p)
2423
2424                     p = (
2425                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2426                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2427                         / ICMP(id=port, seq=port, type="echo-request")
2428                     )
2429                     i2o_pkts[i].append(p)
2430
2431             for i in range(0, worker_count):
2432                 if len(i2o_pkts[i]) > 0:
2433                     in_if.add_stream(i2o_pkts[i], worker=i)
2434
2435             self.pg_enable_capture(self.pg_interfaces)
2436             self.pg_start()
2437             capture = out_if.get_capture(pkt_count * 3)
2438             for packet in capture:
2439                 self.assert_packet_checksums_valid(packet)
2440                 if packet.haslayer(TCP):
2441                     self.assert_in_range(
2442                         packet[TCP].sport,
2443                         port_offset,
2444                         port_offset + port_count,
2445                         "src TCP port",
2446                     )
2447                 elif packet.haslayer(UDP):
2448                     self.assert_in_range(
2449                         packet[UDP].sport,
2450                         port_offset,
2451                         port_offset + port_count,
2452                         "src UDP port",
2453                     )
2454                 elif packet.haslayer(ICMP):
2455                     self.assert_in_range(
2456                         packet[ICMP].id,
2457                         port_offset,
2458                         port_offset + port_count,
2459                         "ICMP id",
2460                     )
2461                 else:
2462                     self.fail(
2463                         ppp("Unexpected or invalid packet (outside network):", packet)
2464                     )
2465
2466             if_idx = in_if.sw_if_index
2467             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2468             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2469             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2470             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2471
2472             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2473             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2474             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2475             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2476
2477             # out2in
2478             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2479             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2480             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2481             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2482             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2483
2484             # replies to unchanged thread ports should pass on each worker,
2485             # excluding packets outside dynamic port range
2486             drop_count = worker_count * len(thread_drop_ports)
2487             pass_count = worker_count * len(thread_edge_ports) - drop_count
2488
2489             o2i_pkts = [[] for x in range(0, worker_count)]
2490             for i in range(0, worker_count):
2491                 for port in thread_edge_ports:
2492                     p = (
2493                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2494                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2495                         / TCP(sport=port, dport=port)
2496                     )
2497                     o2i_pkts[i].append(p)
2498
2499                     p = (
2500                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2501                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2502                         / UDP(sport=port, dport=port)
2503                     )
2504                     o2i_pkts[i].append(p)
2505
2506                     p = (
2507                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2508                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2509                         / ICMP(id=port, seq=port, type="echo-reply")
2510                     )
2511                     o2i_pkts[i].append(p)
2512
2513             for i in range(0, worker_count):
2514                 if len(o2i_pkts[i]) > 0:
2515                     out_if.add_stream(o2i_pkts[i], worker=i)
2516
2517             self.pg_enable_capture(self.pg_interfaces)
2518             self.pg_start()
2519             capture = in_if.get_capture(pass_count * 3)
2520             for packet in capture:
2521                 self.assert_packet_checksums_valid(packet)
2522                 if packet.haslayer(TCP):
2523                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2524                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2525                 elif packet.haslayer(UDP):
2526                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2527                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2528                 elif packet.haslayer(ICMP):
2529                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2530                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2531                 else:
2532                     self.fail(
2533                         ppp("Unexpected or invalid packet (inside network):", packet)
2534                     )
2535
2536             if_idx = out_if.sw_if_index
2537             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2538             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2539             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2540             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2541             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2542
2543             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2544             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2545             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2546             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2547             self.assertEqual(
2548                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2549             )
2550
2551         finally:
2552             in_if.unconfig()
2553             out_if.unconfig()
2554
2555     def test_delete_interface(self):
2556         """NAT44ED delete nat interface"""
2557
2558         self.nat_add_address(self.nat_addr)
2559
2560         interfaces = self.create_loopback_interfaces(4)
2561         self.nat_add_outside_interface(interfaces[0])
2562         self.nat_add_inside_interface(interfaces[1])
2563         self.nat_add_outside_interface(interfaces[2])
2564         self.nat_add_inside_interface(interfaces[2])
2565         self.vapi.nat44_ed_add_del_output_interface(
2566             sw_if_index=interfaces[3].sw_if_index, is_add=1
2567         )
2568
2569         nat_sw_if_indices = [
2570             i.sw_if_index
2571             for i in self.vapi.nat44_interface_dump()
2572             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
2573         ]
2574         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
2575
2576         loopbacks = []
2577         for i in interfaces:
2578             # delete nat-enabled interface
2579             self.assertIn(i.sw_if_index, nat_sw_if_indices)
2580             i.remove_vpp_config()
2581
2582             # create interface with the same index
2583             lo = VppLoInterface(self)
2584             loopbacks.append(lo)
2585             self.assertEqual(lo.sw_if_index, i.sw_if_index)
2586
2587             # check interface is not nat-enabled
2588             nat_sw_if_indices = [
2589                 i.sw_if_index
2590                 for i in self.vapi.nat44_interface_dump()
2591                 + list(
2592                     self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
2593                 )
2594             ]
2595             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
2596
2597         for i in loopbacks:
2598             i.remove_vpp_config()
2599
2600
2601 @tag_fixme_ubuntu2204
2602 class TestNAT44EDMW(TestNAT44ED):
2603     """NAT44ED MW Test Case"""
2604
2605     vpp_worker_count = 4
2606     max_sessions = 5000
2607
2608     def test_dynamic(self):
2609         """NAT44ED dynamic translation test"""
2610         pkt_count = 1500
2611         tcp_port_offset = 20
2612         udp_port_offset = 20
2613         icmp_id_offset = 20
2614
2615         self.nat_add_address(self.nat_addr)
2616         self.nat_add_inside_interface(self.pg0)
2617         self.nat_add_outside_interface(self.pg1)
2618
2619         # in2out
2620         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2621         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2622         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2623         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2624
2625         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2626
2627         for i in range(pkt_count):
2628             p = (
2629                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2630                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2631                 / TCP(sport=tcp_port_offset + i, dport=20)
2632             )
2633             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2634
2635             p = (
2636                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2637                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2638                 / UDP(sport=udp_port_offset + i, dport=20)
2639             )
2640             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2641
2642             p = (
2643                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2644                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2645                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2646             )
2647             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2648
2649         for i in range(0, self.vpp_worker_count):
2650             if len(i2o_pkts[i]) > 0:
2651                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2652
2653         self.pg_enable_capture(self.pg_interfaces)
2654         self.pg_start()
2655         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2656
2657         if_idx = self.pg0.sw_if_index
2658         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2659         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2660         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2661         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2662
2663         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2664         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2665         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2666         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2667
2668         self.logger.info(self.vapi.cli("show trace"))
2669
2670         # out2in
2671         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2672         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2673         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2674         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2675
2676         recvd_tcp_ports = set()
2677         recvd_udp_ports = set()
2678         recvd_icmp_ids = set()
2679
2680         for p in capture:
2681             if TCP in p:
2682                 recvd_tcp_ports.add(p[TCP].sport)
2683             if UDP in p:
2684                 recvd_udp_ports.add(p[UDP].sport)
2685             if ICMP in p:
2686                 recvd_icmp_ids.add(p[ICMP].id)
2687
2688         recvd_tcp_ports = list(recvd_tcp_ports)
2689         recvd_udp_ports = list(recvd_udp_ports)
2690         recvd_icmp_ids = list(recvd_icmp_ids)
2691
2692         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2693         for i in range(pkt_count):
2694             p = (
2695                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2696                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2697                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2698             )
2699             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2700
2701             p = (
2702                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2703                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2704                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2705             )
2706             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2707
2708             p = (
2709                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2710                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2711                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2712             )
2713             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2714
2715         for i in range(0, self.vpp_worker_count):
2716             if len(o2i_pkts[i]) > 0:
2717                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2718
2719         self.pg_enable_capture(self.pg_interfaces)
2720         self.pg_start()
2721         capture = self.pg0.get_capture(pkt_count * 3)
2722         for packet in capture:
2723             try:
2724                 self.assert_packet_checksums_valid(packet)
2725                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2726                 if packet.haslayer(TCP):
2727                     self.assert_in_range(
2728                         packet[TCP].dport,
2729                         tcp_port_offset,
2730                         tcp_port_offset + pkt_count,
2731                         "dst TCP port",
2732                     )
2733                 elif packet.haslayer(UDP):
2734                     self.assert_in_range(
2735                         packet[UDP].dport,
2736                         udp_port_offset,
2737                         udp_port_offset + pkt_count,
2738                         "dst UDP port",
2739                     )
2740                 else:
2741                     self.assert_in_range(
2742                         packet[ICMP].id,
2743                         icmp_id_offset,
2744                         icmp_id_offset + pkt_count,
2745                         "ICMP id",
2746                     )
2747             except:
2748                 self.logger.error(
2749                     ppp("Unexpected or invalid packet (inside network):", packet)
2750                 )
2751                 raise
2752
2753         if_idx = self.pg1.sw_if_index
2754         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2755         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2756         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2757         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2758
2759         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2760         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2761         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2762         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2763
2764         sc = self.statistics["/nat44-ed/total-sessions"]
2765         self.assertEqual(
2766             sc[:, 0].sum(),
2767             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2768         )
2769
2770     def test_frag_in_order(self):
2771         """NAT44ED translate fragments arriving in order"""
2772
2773         self.nat_add_address(self.nat_addr)
2774         self.nat_add_inside_interface(self.pg0)
2775         self.nat_add_outside_interface(self.pg1)
2776
2777         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2778         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2779         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2780
2781     def test_frag_in_order_do_not_translate(self):
2782         """NAT44ED don't translate fragments arriving in order"""
2783
2784         self.nat_add_address(self.nat_addr)
2785         self.nat_add_inside_interface(self.pg0)
2786         self.nat_add_outside_interface(self.pg1)
2787         self.vapi.nat44_forwarding_enable_disable(enable=True)
2788
2789         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2790
2791     def test_frag_out_of_order(self):
2792         """NAT44ED translate fragments arriving out of order"""
2793
2794         self.nat_add_address(self.nat_addr)
2795         self.nat_add_inside_interface(self.pg0)
2796         self.nat_add_outside_interface(self.pg1)
2797
2798         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2799         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2800         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2801
2802     def test_frag_in_order_in_plus_out(self):
2803         """NAT44ED in+out interface fragments in order"""
2804
2805         in_port = self.random_port()
2806         out_port = self.random_port()
2807
2808         self.nat_add_address(self.nat_addr)
2809         self.nat_add_inside_interface(self.pg0)
2810         self.nat_add_outside_interface(self.pg0)
2811         self.nat_add_inside_interface(self.pg1)
2812         self.nat_add_outside_interface(self.pg1)
2813
2814         # add static mappings for server
2815         self.nat_add_static_mapping(
2816             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2817         )
2818         self.nat_add_static_mapping(
2819             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2820         )
2821         self.nat_add_static_mapping(
2822             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2823         )
2824
2825         # run tests for each protocol
2826         self.frag_in_order_in_plus_out(
2827             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2828         )
2829         self.frag_in_order_in_plus_out(
2830             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2831         )
2832         self.frag_in_order_in_plus_out(
2833             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2834         )
2835
2836     def test_frag_out_of_order_in_plus_out(self):
2837         """NAT44ED in+out interface fragments out of order"""
2838
2839         in_port = self.random_port()
2840         out_port = self.random_port()
2841
2842         self.nat_add_address(self.nat_addr)
2843         self.nat_add_inside_interface(self.pg0)
2844         self.nat_add_outside_interface(self.pg0)
2845         self.nat_add_inside_interface(self.pg1)
2846         self.nat_add_outside_interface(self.pg1)
2847
2848         # add static mappings for server
2849         self.nat_add_static_mapping(
2850             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2851         )
2852         self.nat_add_static_mapping(
2853             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2854         )
2855         self.nat_add_static_mapping(
2856             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2857         )
2858
2859         # run tests for each protocol
2860         self.frag_out_of_order_in_plus_out(
2861             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2862         )
2863         self.frag_out_of_order_in_plus_out(
2864             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2865         )
2866         self.frag_out_of_order_in_plus_out(
2867             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2868         )
2869
2870     def test_reass_hairpinning(self):
2871         """NAT44ED fragments hairpinning"""
2872
2873         server_addr = self.pg0.remote_hosts[1].ip4
2874
2875         host_in_port = self.random_port()
2876         server_in_port = self.random_port()
2877         server_out_port = self.random_port()
2878
2879         self.nat_add_address(self.nat_addr)
2880         self.nat_add_inside_interface(self.pg0)
2881         self.nat_add_outside_interface(self.pg1)
2882
2883         # add static mapping for server
2884         self.nat_add_static_mapping(
2885             server_addr,
2886             self.nat_addr,
2887             server_in_port,
2888             server_out_port,
2889             proto=IP_PROTOS.tcp,
2890         )
2891         self.nat_add_static_mapping(
2892             server_addr,
2893             self.nat_addr,
2894             server_in_port,
2895             server_out_port,
2896             proto=IP_PROTOS.udp,
2897         )
2898         self.nat_add_static_mapping(server_addr, self.nat_addr)
2899
2900         self.reass_hairpinning(
2901             server_addr,
2902             server_in_port,
2903             server_out_port,
2904             host_in_port,
2905             proto=IP_PROTOS.tcp,
2906             ignore_port=True,
2907         )
2908         self.reass_hairpinning(
2909             server_addr,
2910             server_in_port,
2911             server_out_port,
2912             host_in_port,
2913             proto=IP_PROTOS.udp,
2914             ignore_port=True,
2915         )
2916         self.reass_hairpinning(
2917             server_addr,
2918             server_in_port,
2919             server_out_port,
2920             host_in_port,
2921             proto=IP_PROTOS.icmp,
2922             ignore_port=True,
2923         )
2924
2925     def test_session_limit_per_vrf(self):
2926         """NAT44ED per vrf session limit"""
2927
2928         inside = self.pg0
2929         inside_vrf10 = self.pg2
2930         outside = self.pg1
2931
2932         limit = 5
2933
2934         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2935         # non existing vrf_id makes process core dump
2936         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2937
2938         self.nat_add_inside_interface(inside)
2939         self.nat_add_inside_interface(inside_vrf10)
2940         self.nat_add_outside_interface(outside)
2941
2942         # vrf independent
2943         self.nat_add_interface_address(outside)
2944
2945         # BUG: causing core dump - when bad vrf_id is specified
2946         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2947
2948         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2949         inside_vrf10.add_stream(stream)
2950
2951         self.pg_enable_capture(self.pg_interfaces)
2952         self.pg_start()
2953
2954         capture = outside.get_capture(limit)
2955
2956         stream = self.create_tcp_stream(inside, outside, limit * 2)
2957         inside.add_stream(stream)
2958
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961
2962         capture = outside.get_capture(len(stream))
2963
2964     def test_show_max_translations(self):
2965         """NAT44ED API test - max translations per thread"""
2966         config = self.vapi.nat44_show_running_config()
2967         self.assertEqual(self.max_sessions, config.sessions)
2968
2969     def test_lru_cleanup(self):
2970         """NAT44ED LRU cleanup algorithm"""
2971
2972         self.nat_add_address(self.nat_addr)
2973         self.nat_add_inside_interface(self.pg0)
2974         self.nat_add_outside_interface(self.pg1)
2975
2976         self.vapi.nat_set_timeouts(
2977             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2978         )
2979
2980         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2981         pkts = []
2982         for i in range(0, self.max_sessions - 1):
2983             p = (
2984                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2985                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2986                 / UDP(sport=7000 + i, dport=80)
2987             )
2988             pkts.append(p)
2989
2990         self.pg0.add_stream(pkts)
2991         self.pg_enable_capture(self.pg_interfaces)
2992         self.pg_start()
2993         self.pg1.get_capture(len(pkts))
2994         self.virtual_sleep(1.5, "wait for timeouts")
2995
2996         pkts = []
2997         for i in range(0, self.max_sessions - 1):
2998             p = (
2999                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3000                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
3001                 / ICMP(id=8000 + i, type="echo-request")
3002             )
3003             pkts.append(p)
3004
3005         self.pg0.add_stream(pkts)
3006         self.pg_enable_capture(self.pg_interfaces)
3007         self.pg_start()
3008         self.pg1.get_capture(len(pkts))
3009
3010     def test_session_rst_timeout(self):
3011         """NAT44ED session RST timeouts"""
3012
3013         self.nat_add_address(self.nat_addr)
3014         self.nat_add_inside_interface(self.pg0)
3015         self.nat_add_outside_interface(self.pg1)
3016
3017         self.vapi.nat_set_timeouts(
3018             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3019         )
3020
3021         self.init_tcp_session(
3022             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3023         )
3024         p = (
3025             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3026             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3027             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3028         )
3029         self.send_and_expect(self.pg0, p, self.pg1)
3030
3031         self.virtual_sleep(6)
3032
3033         # The session is already closed
3034         p = (
3035             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3036             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3037             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3038         )
3039         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3040
3041         # The session can be re-opened
3042         p = (
3043             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3044             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3045             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3046         )
3047         self.send_and_expect(self.pg0, p, self.pg1)
3048
3049     def test_session_rst_established_timeout(self):
3050         """NAT44ED session RST timeouts"""
3051
3052         self.nat_add_address(self.nat_addr)
3053         self.nat_add_inside_interface(self.pg0)
3054         self.nat_add_outside_interface(self.pg1)
3055
3056         self.vapi.nat_set_timeouts(
3057             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3058         )
3059
3060         self.init_tcp_session(
3061             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3062         )
3063
3064         # Wait at least the transitory time, the session is in established
3065         # state anyway. RST followed by a data packet should move it to
3066         # transitory state.
3067         self.virtual_sleep(6)
3068         p = (
3069             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3070             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3071             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3072         )
3073         self.send_and_expect(self.pg0, p, self.pg1)
3074
3075         p = (
3076             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3077             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3078             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3079         )
3080         self.send_and_expect(self.pg0, p, self.pg1)
3081
3082         # State is transitory, session should be closed after 6 seconds
3083         self.virtual_sleep(6)
3084
3085         p = (
3086             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3087             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3088             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3089         )
3090         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3091
3092     def test_dynamic_out_of_ports(self):
3093         """NAT44ED dynamic translation test: out of ports"""
3094
3095         self.nat_add_inside_interface(self.pg0)
3096         self.nat_add_outside_interface(self.pg1)
3097
3098         # in2out and no NAT addresses added
3099         pkts = self.create_stream_in(self.pg0, self.pg1)
3100
3101         self.send_and_assert_no_replies(
3102             self.pg0,
3103             pkts,
3104             msg="i2o pkts",
3105             stats_diff=self.no_diff
3106             | {
3107                 "err": {
3108                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3109                 },
3110                 self.pg0.sw_if_index: {
3111                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3112                 },
3113             },
3114         )
3115
3116         # in2out after NAT addresses added
3117         self.nat_add_address(self.nat_addr)
3118
3119         tcpn, udpn, icmpn = (
3120             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3121         )
3122
3123         self.send_and_expect(
3124             self.pg0,
3125             pkts,
3126             self.pg1,
3127             msg="i2o pkts",
3128             stats_diff=self.no_diff
3129             | {
3130                 "err": {
3131                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3132                 },
3133                 self.pg0.sw_if_index: {
3134                     "/nat44-ed/in2out/slowpath/drops": 0,
3135                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3136                     "/nat44-ed/in2out/slowpath/udp": udpn,
3137                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3138                 },
3139             },
3140         )
3141
3142     def test_unknown_proto(self):
3143         """NAT44ED translate packet with unknown protocol"""
3144
3145         self.nat_add_address(self.nat_addr)
3146         self.nat_add_inside_interface(self.pg0)
3147         self.nat_add_outside_interface(self.pg1)
3148
3149         # in2out
3150         p = (
3151             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3152             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3153             / TCP(sport=self.tcp_port_in, dport=20)
3154         )
3155         self.pg0.add_stream(p)
3156         self.pg_enable_capture(self.pg_interfaces)
3157         self.pg_start()
3158         p = self.pg1.get_capture(1)
3159
3160         p = (
3161             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3162             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3163             / GRE()
3164             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3165             / TCP(sport=1234, dport=1234)
3166         )
3167         self.pg0.add_stream(p)
3168         self.pg_enable_capture(self.pg_interfaces)
3169         self.pg_start()
3170         p = self.pg1.get_capture(1)
3171         packet = p[0]
3172         try:
3173             self.assertEqual(packet[IP].src, self.nat_addr)
3174             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3175             self.assertEqual(packet.haslayer(GRE), 1)
3176             self.assert_packet_checksums_valid(packet)
3177         except:
3178             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3179             raise
3180
3181         # out2in
3182         p = (
3183             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3184             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3185             / GRE()
3186             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3187             / TCP(sport=1234, dport=1234)
3188         )
3189         self.pg1.add_stream(p)
3190         self.pg_enable_capture(self.pg_interfaces)
3191         self.pg_start()
3192         p = self.pg0.get_capture(1)
3193         packet = p[0]
3194         try:
3195             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3196             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3197             self.assertEqual(packet.haslayer(GRE), 1)
3198             self.assert_packet_checksums_valid(packet)
3199         except:
3200             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3201             raise
3202
3203     def test_hairpinning_unknown_proto(self):
3204         """NAT44ED translate packet with unknown protocol - hairpinning"""
3205         host = self.pg0.remote_hosts[0]
3206         server = self.pg0.remote_hosts[1]
3207         host_in_port = 1234
3208         server_out_port = 8765
3209         server_nat_ip = "10.0.0.11"
3210
3211         self.nat_add_address(self.nat_addr)
3212         self.nat_add_inside_interface(self.pg0)
3213         self.nat_add_outside_interface(self.pg1)
3214
3215         # add static mapping for server
3216         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3217
3218         # host to server
3219         p = (
3220             Ether(src=host.mac, dst=self.pg0.local_mac)
3221             / IP(src=host.ip4, dst=server_nat_ip)
3222             / TCP(sport=host_in_port, dport=server_out_port)
3223         )
3224         self.pg0.add_stream(p)
3225         self.pg_enable_capture(self.pg_interfaces)
3226         self.pg_start()
3227         self.pg0.get_capture(1)
3228
3229         p = (
3230             Ether(dst=self.pg0.local_mac, src=host.mac)
3231             / IP(src=host.ip4, dst=server_nat_ip)
3232             / GRE()
3233             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3234             / TCP(sport=1234, dport=1234)
3235         )
3236         self.pg0.add_stream(p)
3237         self.pg_enable_capture(self.pg_interfaces)
3238         self.pg_start()
3239         p = self.pg0.get_capture(1)
3240         packet = p[0]
3241         try:
3242             self.assertEqual(packet[IP].src, self.nat_addr)
3243             self.assertEqual(packet[IP].dst, server.ip4)
3244             self.assertEqual(packet.haslayer(GRE), 1)
3245             self.assert_packet_checksums_valid(packet)
3246         except:
3247             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3248             raise
3249
3250         # server to host
3251         p = (
3252             Ether(dst=self.pg0.local_mac, src=server.mac)
3253             / IP(src=server.ip4, dst=self.nat_addr)
3254             / GRE()
3255             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3256             / TCP(sport=1234, dport=1234)
3257         )
3258         self.pg0.add_stream(p)
3259         self.pg_enable_capture(self.pg_interfaces)
3260         self.pg_start()
3261         p = self.pg0.get_capture(1)
3262         packet = p[0]
3263         try:
3264             self.assertEqual(packet[IP].src, server_nat_ip)
3265             self.assertEqual(packet[IP].dst, host.ip4)
3266             self.assertEqual(packet.haslayer(GRE), 1)
3267             self.assert_packet_checksums_valid(packet)
3268         except:
3269             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3270             raise
3271
3272     def test_output_feature_and_service(self):
3273         """NAT44ED interface output feature and services"""
3274         external_addr = "1.2.3.4"
3275         external_port = 80
3276         local_port = 8080
3277
3278         self.vapi.nat44_forwarding_enable_disable(enable=1)
3279         self.nat_add_address(self.nat_addr)
3280         flags = self.config_flags.NAT_IS_ADDR_ONLY
3281         self.vapi.nat44_add_del_identity_mapping(
3282             ip_address=self.pg1.remote_ip4,
3283             sw_if_index=0xFFFFFFFF,
3284             flags=flags,
3285             is_add=1,
3286         )
3287         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3288         self.nat_add_static_mapping(
3289             self.pg0.remote_ip4,
3290             external_addr,
3291             local_port,
3292             external_port,
3293             proto=IP_PROTOS.tcp,
3294             flags=flags,
3295         )
3296
3297         self.nat_add_inside_interface(self.pg0)
3298         self.nat_add_outside_interface(self.pg0)
3299         self.vapi.nat44_ed_add_del_output_interface(
3300             sw_if_index=self.pg1.sw_if_index, is_add=1
3301         )
3302
3303         # from client to service
3304         p = (
3305             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3306             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3307             / TCP(sport=12345, dport=external_port)
3308         )
3309         self.pg1.add_stream(p)
3310         self.pg_enable_capture(self.pg_interfaces)
3311         self.pg_start()
3312         capture = self.pg0.get_capture(1)
3313         p = capture[0]
3314         try:
3315             ip = p[IP]
3316             tcp = p[TCP]
3317             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3318             self.assertEqual(tcp.dport, local_port)
3319             self.assert_packet_checksums_valid(p)
3320         except:
3321             self.logger.error(ppp("Unexpected or invalid packet:", p))
3322             raise
3323
3324         # from service back to client
3325         p = (
3326             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3327             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3328             / TCP(sport=local_port, dport=12345)
3329         )
3330         self.pg0.add_stream(p)
3331         self.pg_enable_capture(self.pg_interfaces)
3332         self.pg_start()
3333         capture = self.pg1.get_capture(1)
3334         p = capture[0]
3335         try:
3336             ip = p[IP]
3337             tcp = p[TCP]
3338             self.assertEqual(ip.src, external_addr)
3339             self.assertEqual(tcp.sport, external_port)
3340             self.assert_packet_checksums_valid(p)
3341         except:
3342             self.logger.error(ppp("Unexpected or invalid packet:", p))
3343             raise
3344
3345         # from local network host to external network
3346         pkts = self.create_stream_in(self.pg0, self.pg1)
3347         self.pg0.add_stream(pkts)
3348         self.pg_enable_capture(self.pg_interfaces)
3349         self.pg_start()
3350         capture = self.pg1.get_capture(len(pkts))
3351         self.verify_capture_out(capture, ignore_port=True)
3352         pkts = self.create_stream_in(self.pg0, self.pg1)
3353         self.pg0.add_stream(pkts)
3354         self.pg_enable_capture(self.pg_interfaces)
3355         self.pg_start()
3356         capture = self.pg1.get_capture(len(pkts))
3357         self.verify_capture_out(capture, ignore_port=True)
3358
3359         # from external network back to local network host
3360         pkts = self.create_stream_out(self.pg1)
3361         self.pg1.add_stream(pkts)
3362         self.pg_enable_capture(self.pg_interfaces)
3363         self.pg_start()
3364         capture = self.pg0.get_capture(len(pkts))
3365         self.verify_capture_in(capture, self.pg0)
3366
3367     def test_output_feature_and_service3(self):
3368         """NAT44ED interface output feature and DST NAT"""
3369         external_addr = "1.2.3.4"
3370         external_port = 80
3371         local_port = 8080
3372
3373         self.vapi.nat44_forwarding_enable_disable(enable=1)
3374         self.nat_add_address(self.nat_addr)
3375         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3376         self.nat_add_static_mapping(
3377             self.pg1.remote_ip4,
3378             external_addr,
3379             local_port,
3380             external_port,
3381             proto=IP_PROTOS.tcp,
3382             flags=flags,
3383         )
3384
3385         self.nat_add_inside_interface(self.pg0)
3386         self.nat_add_outside_interface(self.pg0)
3387         self.vapi.nat44_ed_add_del_output_interface(
3388             sw_if_index=self.pg1.sw_if_index, is_add=1
3389         )
3390
3391         p = (
3392             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3393             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3394             / TCP(sport=12345, dport=external_port)
3395         )
3396         self.pg0.add_stream(p)
3397         self.pg_enable_capture(self.pg_interfaces)
3398         self.pg_start()
3399         capture = self.pg1.get_capture(1)
3400         p = capture[0]
3401         try:
3402             ip = p[IP]
3403             tcp = p[TCP]
3404             self.assertEqual(ip.src, self.pg0.remote_ip4)
3405             self.assertEqual(tcp.sport, 12345)
3406             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3407             self.assertEqual(tcp.dport, local_port)
3408             self.assert_packet_checksums_valid(p)
3409         except:
3410             self.logger.error(ppp("Unexpected or invalid packet:", p))
3411             raise
3412
3413         p = (
3414             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3415             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3416             / TCP(sport=local_port, dport=12345)
3417         )
3418         self.pg1.add_stream(p)
3419         self.pg_enable_capture(self.pg_interfaces)
3420         self.pg_start()
3421         capture = self.pg0.get_capture(1)
3422         p = capture[0]
3423         try:
3424             ip = p[IP]
3425             tcp = p[TCP]
3426             self.assertEqual(ip.src, external_addr)
3427             self.assertEqual(tcp.sport, external_port)
3428             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3429             self.assertEqual(tcp.dport, 12345)
3430             self.assert_packet_checksums_valid(p)
3431         except:
3432             self.logger.error(ppp("Unexpected or invalid packet:", p))
3433             raise
3434
3435     def test_self_twice_nat_lb_negative(self):
3436         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3437         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3438
3439     def test_self_twice_nat_negative(self):
3440         """NAT44ED Self Twice NAT (negative test)"""
3441         self.twice_nat_common(self_twice_nat=True)
3442
3443     def test_static_lb_multi_clients(self):
3444         """NAT44ED local service load balancing - multiple clients"""
3445
3446         external_addr = self.nat_addr
3447         external_port = 80
3448         local_port = 8080
3449         server1 = self.pg0.remote_hosts[0]
3450         server2 = self.pg0.remote_hosts[1]
3451         server3 = self.pg0.remote_hosts[2]
3452
3453         locals = [
3454             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3455             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3456         ]
3457
3458         flags = self.config_flags.NAT_IS_INSIDE
3459         self.vapi.nat44_interface_add_del_feature(
3460             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3461         )
3462         self.vapi.nat44_interface_add_del_feature(
3463             sw_if_index=self.pg1.sw_if_index, is_add=1
3464         )
3465
3466         self.nat_add_address(self.nat_addr)
3467         self.vapi.nat44_add_del_lb_static_mapping(
3468             is_add=1,
3469             external_addr=external_addr,
3470             external_port=external_port,
3471             protocol=IP_PROTOS.tcp,
3472             local_num=len(locals),
3473             locals=locals,
3474         )
3475
3476         server1_n = 0
3477         server2_n = 0
3478         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3479         pkts = []
3480         for client in clients:
3481             p = (
3482                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3483                 / IP(src=client, dst=self.nat_addr)
3484                 / TCP(sport=12345, dport=external_port)
3485             )
3486             pkts.append(p)
3487         self.pg1.add_stream(pkts)
3488         self.pg_enable_capture(self.pg_interfaces)
3489         self.pg_start()
3490         capture = self.pg0.get_capture(len(pkts))
3491         for p in capture:
3492             if p[IP].dst == server1.ip4:
3493                 server1_n += 1
3494             else:
3495                 server2_n += 1
3496         self.assertGreaterEqual(server1_n, server2_n)
3497
3498         local = {
3499             "addr": server3.ip4,
3500             "port": local_port,
3501             "probability": 20,
3502             "vrf_id": 0,
3503         }
3504
3505         # add new back-end
3506         self.vapi.nat44_lb_static_mapping_add_del_local(
3507             is_add=1,
3508             external_addr=external_addr,
3509             external_port=external_port,
3510             local=local,
3511             protocol=IP_PROTOS.tcp,
3512         )
3513         server1_n = 0
3514         server2_n = 0
3515         server3_n = 0
3516         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3517         pkts = []
3518         for client in clients:
3519             p = (
3520                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3521                 / IP(src=client, dst=self.nat_addr)
3522                 / TCP(sport=12346, dport=external_port)
3523             )
3524             pkts.append(p)
3525         self.assertGreater(len(pkts), 0)
3526         self.pg1.add_stream(pkts)
3527         self.pg_enable_capture(self.pg_interfaces)
3528         self.pg_start()
3529         capture = self.pg0.get_capture(len(pkts))
3530         for p in capture:
3531             if p[IP].dst == server1.ip4:
3532                 server1_n += 1
3533             elif p[IP].dst == server2.ip4:
3534                 server2_n += 1
3535             else:
3536                 server3_n += 1
3537         self.assertGreater(server1_n, 0)
3538         self.assertGreater(server2_n, 0)
3539         self.assertGreater(server3_n, 0)
3540
3541         local = {
3542             "addr": server2.ip4,
3543             "port": local_port,
3544             "probability": 10,
3545             "vrf_id": 0,
3546         }
3547
3548         # remove one back-end
3549         self.vapi.nat44_lb_static_mapping_add_del_local(
3550             is_add=0,
3551             external_addr=external_addr,
3552             external_port=external_port,
3553             local=local,
3554             protocol=IP_PROTOS.tcp,
3555         )
3556         server1_n = 0
3557         server2_n = 0
3558         server3_n = 0
3559         self.pg1.add_stream(pkts)
3560         self.pg_enable_capture(self.pg_interfaces)
3561         self.pg_start()
3562         capture = self.pg0.get_capture(len(pkts))
3563         for p in capture:
3564             if p[IP].dst == server1.ip4:
3565                 server1_n += 1
3566             elif p[IP].dst == server2.ip4:
3567                 server2_n += 1
3568             else:
3569                 server3_n += 1
3570         self.assertGreater(server1_n, 0)
3571         self.assertEqual(server2_n, 0)
3572         self.assertGreater(server3_n, 0)
3573
    # The "zzz" prefix in the test name makes this syslog test run last:
    # setting the syslog sender cannot be undone, and once it is set it
    # interferes with the self.send_and_assert_no_replies functionality.
3577     def test_zzz_syslog_sess(self):
3578         """NAT44ED Test syslog session creation and deletion"""
3579         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3580         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3581
3582         self.nat_add_address(self.nat_addr)
3583         self.nat_add_inside_interface(self.pg0)
3584         self.nat_add_outside_interface(self.pg1)
3585
3586         p = (
3587             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3588             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3589             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3590         )
3591         self.pg0.add_stream(p)
3592         self.pg_enable_capture(self.pg_interfaces)
3593         self.pg_start()
3594         capture = self.pg1.get_capture(1)
3595         self.tcp_port_out = capture[0][TCP].sport
3596         capture = self.pg3.get_capture(1)
3597         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3598
3599         self.pg_enable_capture(self.pg_interfaces)
3600         self.pg_start()
3601         self.nat_add_address(self.nat_addr, is_add=0)
3602         capture = self.pg3.get_capture(1)
3603         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3604
    # The "zzz" prefix in the test name makes this syslog test run last:
    # setting the syslog sender cannot be undone, and once it is set it
    # interferes with the self.send_and_assert_no_replies functionality.
3608     def test_zzz_syslog_sess_reopen(self):
3609         """Syslog events for session reopen"""
3610         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3611         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3612
3613         self.nat_add_address(self.nat_addr)
3614         self.nat_add_inside_interface(self.pg0)
3615         self.nat_add_outside_interface(self.pg1)
3616
3617         # SYN in2out
3618         p = (
3619             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3620             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3621             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3622         )
3623         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3624         self.tcp_port_out = capture[0][TCP].sport
3625         capture = self.pg3.get_capture(1)
3626         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3627
3628         # SYN out2in
3629         p = (
3630             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3631             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3632             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3633         )
3634         self.send_and_expect(self.pg1, p, self.pg0)
3635
3636         p = (
3637             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3638             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3639             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3640         )
3641         self.send_and_expect(self.pg0, p, self.pg1)
3642
3643         # FIN in2out
3644         p = (
3645             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3646             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3647             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3648         )
3649         self.send_and_expect(self.pg0, p, self.pg1)
3650
3651         # FIN out2in
3652         p = (
3653             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3654             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3655             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3656         )
3657         self.send_and_expect(self.pg1, p, self.pg0)
3658
3659         self.init_tcp_session(
3660             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3661         )
3662
3663         # 2 records should be produced - first one del & add
3664         capture = self.pg3.get_capture(2)
3665         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3666         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3667
3668     def test_twice_nat_interface_addr(self):
3669         """NAT44ED Acquire twice NAT addresses from interface"""
3670         flags = self.config_flags.NAT_IS_TWICE_NAT
3671         self.vapi.nat44_add_del_interface_addr(
3672             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3673         )
3674
3675         # no address in NAT pool
3676         adresses = self.vapi.nat44_address_dump()
3677         self.assertEqual(0, len(adresses))
3678
3679         # configure interface address and check NAT address pool
3680         self.pg11.config_ip4()
3681         adresses = self.vapi.nat44_address_dump()
3682         self.assertEqual(1, len(adresses))
3683         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3684         self.assertEqual(adresses[0].flags, flags)
3685
3686         # remove interface address and check NAT address pool
3687         self.pg11.unconfig_ip4()
3688         adresses = self.vapi.nat44_address_dump()
3689         self.assertEqual(0, len(adresses))
3690
3691     def test_output_feature_stateful_acl(self):
3692         """NAT44ED output feature works with stateful ACL"""
3693
3694         self.nat_add_address(self.nat_addr)
3695         self.vapi.nat44_ed_add_del_output_interface(
3696             sw_if_index=self.pg1.sw_if_index, is_add=1
3697         )
3698
3699         # First ensure that the NAT is working sans ACL
3700
3701         # send packets out2in, no sessions yet so packets should drop
3702         pkts_out2in = self.create_stream_out(self.pg1)
3703         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3704
3705         # send packets into inside intf, ensure received via outside intf
3706         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3707         capture = self.send_and_expect(
3708             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3709         )
3710         self.verify_capture_out(capture, ignore_port=True)
3711
3712         # send out2in again, with sessions created it should work now
3713         pkts_out2in = self.create_stream_out(self.pg1)
3714         capture = self.send_and_expect(
3715             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3716         )
3717         self.verify_capture_in(capture, self.pg0)
3718
3719         # Create an ACL blocking everything
3720         out2in_deny_rule = AclRule(is_permit=0)
3721         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3722         out2in_acl.add_vpp_config()
3723
3724         # create an ACL to permit/reflect everything
3725         in2out_reflect_rule = AclRule(is_permit=2)
3726         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3727         in2out_acl.add_vpp_config()
3728
3729         # apply as input acl on interface and confirm it blocks everything
3730         acl_if = VppAclInterface(
3731             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3732         )
3733         acl_if.add_vpp_config()
3734         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3735
3736         # apply output acl
3737         acl_if.acls = [out2in_acl, in2out_acl]
3738         acl_if.add_vpp_config()
3739         # send in2out to generate ACL state (NAT state was created earlier)
3740         capture = self.send_and_expect(
3741             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3742         )
3743         self.verify_capture_out(capture, ignore_port=True)
3744
3745         # send out2in again. ACL state exists so it should work now.
3746         # TCP packets with the syn flag set also need the ack flag
3747         for p in pkts_out2in:
3748             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3749                 p[TCP].flags |= 0x10
3750         capture = self.send_and_expect(
3751             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3752         )
3753         self.verify_capture_in(capture, self.pg0)
3754         self.logger.info(self.vapi.cli("show trace"))
3755
3756     def test_tcp_close(self):
3757         """NAT44ED Close TCP session from inside network - output feature"""
3758         config = self.vapi.nat44_show_running_config()
3759         old_timeouts = config.timeouts
3760         new_transitory = 2
3761         self.vapi.nat_set_timeouts(
3762             udp=old_timeouts.udp,
3763             tcp_established=old_timeouts.tcp_established,
3764             icmp=old_timeouts.icmp,
3765             tcp_transitory=new_transitory,
3766         )
3767
3768         self.vapi.nat44_forwarding_enable_disable(enable=1)
3769         self.nat_add_address(self.pg1.local_ip4)
3770         twice_nat_addr = "10.0.1.3"
3771         service_ip = "192.168.16.150"
3772         self.nat_add_address(twice_nat_addr, twice_nat=1)
3773
3774         flags = self.config_flags.NAT_IS_INSIDE
3775         self.vapi.nat44_interface_add_del_feature(
3776             sw_if_index=self.pg0.sw_if_index, is_add=1
3777         )
3778         self.vapi.nat44_interface_add_del_feature(
3779             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3780         )
3781         self.vapi.nat44_ed_add_del_output_interface(
3782             is_add=1, sw_if_index=self.pg1.sw_if_index
3783         )
3784
3785         flags = (
3786             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3787         )
3788         self.nat_add_static_mapping(
3789             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3790         )
3791         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3792         start_sessnum = len(sessions)
3793
3794         # SYN packet out->in
3795         p = (
3796             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3797             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3798             / TCP(sport=33898, dport=80, flags="S")
3799         )
3800         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3801         p = capture[0]
3802         tcp_port = p[TCP].sport
3803
3804         # SYN + ACK packet in->out
3805         p = (
3806             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3807             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3808             / TCP(sport=80, dport=tcp_port, flags="SA")
3809         )
3810         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3811
3812         # ACK packet out->in
3813         p = (
3814             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3815             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3816             / TCP(sport=33898, dport=80, flags="A")
3817         )
3818         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3819
3820         # FIN packet in -> out
3821         p = (
3822             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3823             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3824             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3825         )
3826         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3827
3828         # FIN+ACK packet out -> in
3829         p = (
3830             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3831             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3832             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3833         )
3834         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3835
3836         # ACK packet in -> out
3837         p = (
3838             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3839             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3840             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3841         )
3842         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3843
3844         # session now in transitory timeout, but traffic still flows
3845         # try FIN packet out->in
3846         p = (
3847             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3848             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3849             / TCP(sport=33898, dport=80, flags="F")
3850         )
3851         self.pg1.add_stream(p)
3852         self.pg_enable_capture(self.pg_interfaces)
3853         self.pg_start()
3854
3855         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3856         self.pg0.get_capture(1)
3857
3858         # session should still exist
3859         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3860         self.assertEqual(len(sessions) - start_sessnum, 1)
3861
3862         # send FIN+ACK packet out -> in - will cause session to be wiped
3863         # but won't create a new session
3864         p = (
3865             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3866             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3867             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3868         )
3869         self.send_and_assert_no_replies(self.pg1, p)
3870         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3871         self.assertEqual(len(sessions) - start_sessnum, 0)
3872
    def test_tcp_session_close_in(self):
        """NAT44ED Close TCP session from inside network

        Opens a TCP session through a twice-NAT static mapping, closes it
        starting with a FIN from the inside host, and checks the number
        of sessions dumped for the inside host before and after a sleep
        that is longer than the configured transitory timeout.
        """

        in_port = self.tcp_port_in
        out_port = 10505
        ext_port = self.tcp_external_port

        self.nat_add_address(self.nat_addr)
        self.nat_add_inside_interface(self.pg0)
        self.nat_add_outside_interface(self.pg1)
        self.nat_add_static_mapping(
            self.pg0.remote_ip4,
            self.nat_addr,
            in_port,
            out_port,
            proto=IP_PROTOS.tcp,
            flags=self.config_flags.NAT_IS_TWICE_NAT,
        )

        # baseline: number of sessions dumped for the inside host before any
        # traffic; later assertions compare against it
        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
        session_n = len(sessions)

        # tcp_transitory (2) is lower than the virtual_sleep(3) used below
        # NOTE(review): units are presumably seconds - confirm
        self.vapi.nat_set_timeouts(
            udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
        )

        # NOTE(review): presumably performs the TCP handshake between the
        # pg0 and pg1 hosts - confirm against the helper
        self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)

        # FIN packet in -> out
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
            / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
        )
        self.send_and_expect(self.pg0, p, self.pg1)
        # the next two out -> in packets are collected and sent as one stream
        pkts = []

        # ACK packet out -> in
        p = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
            / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
        )
        pkts.append(p)

        # FIN packet out -> in
        p = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
            / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
        )
        pkts.append(p)

        self.send_and_expect(self.pg1, pkts, self.pg0)

        # ACK packet in -> out
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
            / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
        )
        self.send_and_expect(self.pg0, p, self.pg1)

        # the session is still listed right after the FIN/ACK exchange
        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
        self.assertEqual(len(sessions) - session_n, 1)

        # retransmit FIN packet out -> in
        p = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
            / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
        )

        self.send_and_expect(self.pg1, p, self.pg0)

        # retransmit ACK packet in -> out
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
            / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
        )
        self.send_and_expect(self.pg0, p, self.pg1)

        # sleep longer than the tcp_transitory value set above
        self.virtual_sleep(3)
        # retransmit ACK packet in -> out - this will cause session to be wiped
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
            / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
        )
        self.send_and_assert_no_replies(self.pg0, p)
        # back to the baseline: the session is gone
        sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
        self.assertEqual(len(sessions) - session_n, 0)
3966
3967     def test_tcp_session_close_out(self):
3968         """NAT44ED Close TCP session from outside network"""
3969
3970         in_port = self.tcp_port_in
3971         out_port = 10505
3972         ext_port = self.tcp_external_port
3973
3974         self.nat_add_address(self.nat_addr)
3975         self.nat_add_inside_interface(self.pg0)
3976         self.nat_add_outside_interface(self.pg1)
3977         self.nat_add_static_mapping(
3978             self.pg0.remote_ip4,
3979             self.nat_addr,
3980             in_port,
3981             out_port,
3982             proto=IP_PROTOS.tcp,
3983             flags=self.config_flags.NAT_IS_TWICE_NAT,
3984         )
3985
3986         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3987         session_n = len(sessions)
3988
3989         self.vapi.nat_set_timeouts(
3990             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3991         )
3992
3993         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3994
3995         # FIN packet out -> in
3996         p = (
3997             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3998             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3999             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
4000         )
4001         self.pg1.add_stream(p)
4002         self.pg_enable_capture(self.pg_interfaces)
4003         self.pg_start()
4004         self.pg0.get_capture(1)
4005
4006         # FIN+ACK packet in -> out
4007         p = (
4008             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4009             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4010             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
4011         )
4012
4013         self.pg0.add_stream(p)
4014         self.pg_enable_capture(self.pg_interfaces)
4015         self.pg_start()
4016         self.pg1.get_capture(1)
4017
4018         # ACK packet out -> in
4019         p = (
4020             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4021             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4022             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
4023         )
4024         self.pg1.add_stream(p)
4025         self.pg_enable_capture(self.pg_interfaces)
4026         self.pg_start()
4027         self.pg0.get_capture(1)
4028
4029         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4030         self.assertEqual(len(sessions) - session_n, 1)
4031
4032         # retransmit FIN packet out -> in
4033         p = (
4034             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4035             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4036             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4037         )
4038         self.send_and_expect(self.pg1, p, self.pg0)
4039
4040         # retransmit ACK packet in -> out
4041         p = (
4042             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4043             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4044             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4045         )
4046         self.send_and_expect(self.pg0, p, self.pg1)
4047
4048         self.virtual_sleep(3)
4049         # retransmit ACK packet in -> out - this will cause session to be wiped
4050         p = (
4051             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4052             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4053             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4054         )
4055         self.send_and_assert_no_replies(self.pg0, p)
4056         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4057         self.assertEqual(len(sessions) - session_n, 0)
4058
4059     def test_tcp_session_close_simultaneous(self):
4060         """Simultaneous TCP close from both sides"""
4061
4062         in_port = self.tcp_port_in
4063         ext_port = 10505
4064
4065         self.nat_add_address(self.nat_addr)
4066         self.nat_add_inside_interface(self.pg0)
4067         self.nat_add_outside_interface(self.pg1)
4068         self.nat_add_static_mapping(
4069             self.pg0.remote_ip4,
4070             self.nat_addr,
4071             in_port,
4072             ext_port,
4073             proto=IP_PROTOS.tcp,
4074             flags=self.config_flags.NAT_IS_TWICE_NAT,
4075         )
4076
4077         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4078         session_n = len(sessions)
4079
4080         self.vapi.nat_set_timeouts(
4081             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4082         )
4083
4084         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4085
4086         # FIN packet in -> out
4087         p = (
4088             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4089             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4090             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4091         )
4092         self.send_and_expect(self.pg0, p, self.pg1)
4093
4094         # FIN packet out -> in
4095         p = (
4096             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4097             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4098             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4099         )
4100         self.send_and_expect(self.pg1, p, self.pg0)
4101
4102         # ACK packet in -> out
4103         p = (
4104             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4105             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4106             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4107         )
4108         self.send_and_expect(self.pg0, p, self.pg1)
4109
4110         # ACK packet out -> in
4111         p = (
4112             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4113             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4114             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4115         )
4116         self.send_and_expect(self.pg1, p, self.pg0)
4117
4118         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4119         self.assertEqual(len(sessions) - session_n, 1)
4120
4121         # retransmit FIN packet out -> in
4122         p = (
4123             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4124             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4125             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4126         )
4127         self.send_and_expect(self.pg1, p, self.pg0)
4128
4129         # retransmit ACK packet in -> out
4130         p = (
4131             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4132             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4133             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4134         )
4135         self.send_and_expect(self.pg0, p, self.pg1)
4136
4137         self.virtual_sleep(3)
4138         # retransmit ACK packet in -> out - this will cause session to be wiped
4139         p = (
4140             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4141             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4142             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4143         )
4144         self.pg_send(self.pg0, p)
4145         self.send_and_assert_no_replies(self.pg0, p)
4146         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4147         self.assertEqual(len(sessions) - session_n, 0)
4148
4149     def test_tcp_session_half_reopen_inside(self):
4150         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4151         in_port = self.tcp_port_in
4152         ext_port = 10505
4153
4154         self.nat_add_address(self.nat_addr)
4155         self.nat_add_inside_interface(self.pg0)
4156         self.nat_add_outside_interface(self.pg1)
4157         self.nat_add_static_mapping(
4158             self.pg0.remote_ip4,
4159             self.nat_addr,
4160             in_port,
4161             ext_port,
4162             proto=IP_PROTOS.tcp,
4163             flags=self.config_flags.NAT_IS_TWICE_NAT,
4164         )
4165
4166         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4167         session_n = len(sessions)
4168
4169         self.vapi.nat_set_timeouts(
4170             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4171         )
4172
4173         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4174
4175         # FIN packet in -> out
4176         p = (
4177             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4178             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4179             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4180         )
4181         self.send_and_expect(self.pg0, p, self.pg1)
4182
4183         # FIN packet out -> in
4184         p = (
4185             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4186             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4187             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4188         )
4189         self.send_and_expect(self.pg1, p, self.pg0)
4190
4191         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4192         self.assertEqual(len(sessions) - session_n, 1)
4193
4194         # send SYN packet in -> out
4195         p = (
4196             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4197             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4198             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4199         )
4200         self.send_and_expect(self.pg0, p, self.pg1)
4201
4202         self.virtual_sleep(3)
4203         # send ACK packet in -> out - session should be wiped
4204         p = (
4205             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4206             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4207             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4208         )
4209         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4210         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4211         self.assertEqual(len(sessions) - session_n, 0)
4212
4213     def test_tcp_session_half_reopen_outside(self):
4214         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4215         in_port = self.tcp_port_in
4216         ext_port = 10505
4217
4218         self.nat_add_address(self.nat_addr)
4219         self.nat_add_inside_interface(self.pg0)
4220         self.nat_add_outside_interface(self.pg1)
4221         self.nat_add_static_mapping(
4222             self.pg0.remote_ip4,
4223             self.nat_addr,
4224             in_port,
4225             ext_port,
4226             proto=IP_PROTOS.tcp,
4227             flags=self.config_flags.NAT_IS_TWICE_NAT,
4228         )
4229
4230         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4231         session_n = len(sessions)
4232
4233         self.vapi.nat_set_timeouts(
4234             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4235         )
4236
4237         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4238
4239         # FIN packet in -> out
4240         p = (
4241             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4242             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4243             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4244         )
4245         self.send_and_expect(self.pg0, p, self.pg1)
4246
4247         # FIN packet out -> in
4248         p = (
4249             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4250             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4251             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4252         )
4253         self.send_and_expect(self.pg1, p, self.pg0)
4254
4255         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4256         self.assertEqual(len(sessions) - session_n, 1)
4257
4258         # send SYN packet out -> in
4259         p = (
4260             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4261             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4262             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4263         )
4264         self.send_and_expect(self.pg1, p, self.pg0)
4265
4266         self.virtual_sleep(3)
4267         # send ACK packet in -> out - session should be wiped
4268         p = (
4269             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4270             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4271             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4272         )
4273         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4274         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4275         self.assertEqual(len(sessions) - session_n, 0)
4276
4277     def test_tcp_session_reopen(self):
4278         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4279         in_port = self.tcp_port_in
4280         ext_port = 10505
4281
4282         self.nat_add_address(self.nat_addr)
4283         self.nat_add_inside_interface(self.pg0)
4284         self.nat_add_outside_interface(self.pg1)
4285         self.nat_add_static_mapping(
4286             self.pg0.remote_ip4,
4287             self.nat_addr,
4288             in_port,
4289             ext_port,
4290             proto=IP_PROTOS.tcp,
4291             flags=self.config_flags.NAT_IS_TWICE_NAT,
4292         )
4293
4294         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4295         session_n = len(sessions)
4296
4297         self.vapi.nat_set_timeouts(
4298             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4299         )
4300
4301         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4302
4303         # FIN packet in -> out
4304         p = (
4305             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4306             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4307             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4308         )
4309         self.send_and_expect(self.pg0, p, self.pg1)
4310
4311         # FIN packet out -> in
4312         p = (
4313             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4314             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4315             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4316         )
4317         self.send_and_expect(self.pg1, p, self.pg0)
4318
4319         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4320         self.assertEqual(len(sessions) - session_n, 1)
4321
4322         # send SYN packet out -> in
4323         p = (
4324             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4325             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4326             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4327         )
4328         self.send_and_expect(self.pg1, p, self.pg0)
4329
4330         # send SYN packet in -> out
4331         p = (
4332             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4333             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4334             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4335         )
4336         self.send_and_expect(self.pg0, p, self.pg1)
4337
4338         # send ACK packet out -> in
4339         p = (
4340             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4341             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4342             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4343         )
4344         self.send_and_expect(self.pg1, p, self.pg0)
4345
4346         self.virtual_sleep(3)
4347         # send ACK packet in -> out - should be forwarded and session alive
4348         p = (
4349             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4350             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4351             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4352         )
4353         self.send_and_expect(self.pg0, p, self.pg1)
4354         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4355         self.assertEqual(len(sessions) - session_n, 1)
4356
4357     def test_dynamic_vrf(self):
4358         """NAT44ED dynamic translation test: different VRF"""
4359
4360         vrf_id_in = 33
4361         vrf_id_out = 34
4362
4363         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4364
4365         try:
4366             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4367             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4368
4369             self.nat_add_inside_interface(self.pg7)
4370             self.nat_add_outside_interface(self.pg8)
4371
4372             # just basic stuff nothing special
4373             pkts = self.create_stream_in(self.pg7, self.pg8)
4374             self.pg7.add_stream(pkts)
4375             self.pg_enable_capture(self.pg_interfaces)
4376             self.pg_start()
4377             capture = self.pg8.get_capture(len(pkts))
4378             self.verify_capture_out(capture, ignore_port=True)
4379
4380             pkts = self.create_stream_out(self.pg8)
4381             self.pg8.add_stream(pkts)
4382             self.pg_enable_capture(self.pg_interfaces)
4383             self.pg_start()
4384             capture = self.pg7.get_capture(len(pkts))
4385             self.verify_capture_in(capture, self.pg7)
4386
4387         finally:
4388             self.pg7.unconfig()
4389             self.pg8.unconfig()
4390
4391             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4392             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4393
4394     def test_dynamic_output_feature_vrf(self):
4395         """NAT44ED dynamic translation test: output-feature, VRF"""
4396
4397         # other then default (0)
4398         new_vrf_id = 22
4399
4400         self.nat_add_address(self.nat_addr)
4401         self.vapi.nat44_ed_add_del_output_interface(
4402             sw_if_index=self.pg8.sw_if_index, is_add=1
4403         )
4404         try:
4405             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4406             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4407
4408             # in2out
4409             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4410             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4411             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4412             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4413
4414             pkts = self.create_stream_in(self.pg7, self.pg8)
4415             self.pg7.add_stream(pkts)
4416             self.pg_enable_capture(self.pg_interfaces)
4417             self.pg_start()
4418             capture = self.pg8.get_capture(len(pkts))
4419             self.verify_capture_out(capture, ignore_port=True)
4420
4421             if_idx = self.pg8.sw_if_index
4422             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4423             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4424             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4425             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4426             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4427             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4428             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4429             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4430
4431             # out2in
4432             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4433             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4434             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4435             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4436
4437             pkts = self.create_stream_out(self.pg8)
4438             self.pg8.add_stream(pkts)
4439             self.pg_enable_capture(self.pg_interfaces)
4440             self.pg_start()
4441             capture = self.pg7.get_capture(len(pkts))
4442             self.verify_capture_in(capture, self.pg7)
4443
4444             if_idx = self.pg8.sw_if_index
4445             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4446             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4447             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4448             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4449             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4450             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4451             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4452             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4453
4454             sessions = self.statistics["/nat44-ed/total-sessions"]
4455             self.assertEqual(sessions[:, 0].sum(), 3)
4456
4457         finally:
4458             self.pg7.unconfig()
4459             self.pg8.unconfig()
4460
4461             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4462
4463     def test_next_src_nat(self):
4464         """NAT44ED On way back forward packet to nat44-in2out node."""
4465
4466         twice_nat_addr = "10.0.1.3"
4467         external_port = 80
4468         local_port = 8080
4469         post_twice_nat_port = 0
4470
4471         self.vapi.nat44_forwarding_enable_disable(enable=1)
4472         self.nat_add_address(twice_nat_addr, twice_nat=1)
4473         flags = (
4474             self.config_flags.NAT_IS_OUT2IN_ONLY
4475             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4476         )
4477         self.nat_add_static_mapping(
4478             self.pg6.remote_ip4,
4479             self.pg1.remote_ip4,
4480             local_port,
4481             external_port,
4482             proto=IP_PROTOS.tcp,
4483             vrf_id=1,
4484             flags=flags,
4485         )
4486         self.vapi.nat44_interface_add_del_feature(
4487             sw_if_index=self.pg6.sw_if_index, is_add=1
4488         )
4489
4490         p = (
4491             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4492             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4493             / TCP(sport=12345, dport=external_port)
4494         )
4495         self.pg6.add_stream(p)
4496         self.pg_enable_capture(self.pg_interfaces)
4497         self.pg_start()
4498         capture = self.pg6.get_capture(1)
4499         p = capture[0]
4500         try:
4501             ip = p[IP]
4502             tcp = p[TCP]
4503             self.assertEqual(ip.src, twice_nat_addr)
4504             self.assertNotEqual(tcp.sport, 12345)
4505             post_twice_nat_port = tcp.sport
4506             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4507             self.assertEqual(tcp.dport, local_port)
4508             self.assert_packet_checksums_valid(p)
4509         except:
4510             self.logger.error(ppp("Unexpected or invalid packet:", p))
4511             raise
4512
4513         p = (
4514             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4515             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4516             / TCP(sport=local_port, dport=post_twice_nat_port)
4517         )
4518         self.pg6.add_stream(p)
4519         self.pg_enable_capture(self.pg_interfaces)
4520         self.pg_start()
4521         capture = self.pg6.get_capture(1)
4522         p = capture[0]
4523         try:
4524             ip = p[IP]
4525             tcp = p[TCP]
4526             self.assertEqual(ip.src, self.pg1.remote_ip4)
4527             self.assertEqual(tcp.sport, external_port)
4528             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4529             self.assertEqual(tcp.dport, 12345)
4530             self.assert_packet_checksums_valid(p)
4531         except:
4532             self.logger.error(ppp("Unexpected or invalid packet:", p))
4533             raise
4534
4535     def test_one_armed_nat44_static(self):
4536         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4537
4538         remote_host = self.pg4.remote_hosts[0]
4539         local_host = self.pg4.remote_hosts[1]
4540         external_port = 80
4541         local_port = 8080
4542         eh_port_in = 0
4543
4544         self.vapi.nat44_forwarding_enable_disable(enable=1)
4545         self.nat_add_address(self.nat_addr, twice_nat=1)
4546         flags = (
4547             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4548         )
4549         self.nat_add_static_mapping(
4550             local_host.ip4,
4551             self.nat_addr,
4552             local_port,
4553             external_port,
4554             proto=IP_PROTOS.tcp,
4555             flags=flags,
4556         )
4557         flags = self.config_flags.NAT_IS_INSIDE
4558         self.vapi.nat44_interface_add_del_feature(
4559             sw_if_index=self.pg4.sw_if_index, is_add=1
4560         )
4561         self.vapi.nat44_interface_add_del_feature(
4562             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4563         )
4564
4565         # from client to service
4566         p = (
4567             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4568             / IP(src=remote_host.ip4, dst=self.nat_addr)
4569             / TCP(sport=12345, dport=external_port)
4570         )
4571         self.pg4.add_stream(p)
4572         self.pg_enable_capture(self.pg_interfaces)
4573         self.pg_start()
4574         capture = self.pg4.get_capture(1)
4575         p = capture[0]
4576         try:
4577             ip = p[IP]
4578             tcp = p[TCP]
4579             self.assertEqual(ip.dst, local_host.ip4)
4580             self.assertEqual(ip.src, self.nat_addr)
4581             self.assertEqual(tcp.dport, local_port)
4582             self.assertNotEqual(tcp.sport, 12345)
4583             eh_port_in = tcp.sport
4584             self.assert_packet_checksums_valid(p)
4585         except:
4586             self.logger.error(ppp("Unexpected or invalid packet:", p))
4587             raise
4588
4589         # from service back to client
4590         p = (
4591             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4592             / IP(src=local_host.ip4, dst=self.nat_addr)
4593             / TCP(sport=local_port, dport=eh_port_in)
4594         )
4595         self.pg4.add_stream(p)
4596         self.pg_enable_capture(self.pg_interfaces)
4597         self.pg_start()
4598         capture = self.pg4.get_capture(1)
4599         p = capture[0]
4600         try:
4601             ip = p[IP]
4602             tcp = p[TCP]
4603             self.assertEqual(ip.src, self.nat_addr)
4604             self.assertEqual(ip.dst, remote_host.ip4)
4605             self.assertEqual(tcp.sport, external_port)
4606             self.assertEqual(tcp.dport, 12345)
4607             self.assert_packet_checksums_valid(p)
4608         except:
4609             self.logger.error(ppp("Unexpected or invalid packet:", p))
4610             raise
4611
4612     def test_icmp_error_fwd_outbound(self):
4613         """NAT44ED ICMP error outbound with forwarding enabled"""
4614
4615         # Ensure that an outbound ICMP error message is properly associated
4616         # with the inbound forward bypass session it is related to.
4617         payload = "H" * 10
4618
4619         self.nat_add_address(self.nat_addr)
4620         self.nat_add_inside_interface(self.pg0)
4621         self.nat_add_outside_interface(self.pg1)
4622
4623         # enable forwarding and initiate connection out2in
4624         self.vapi.nat44_forwarding_enable_disable(enable=1)
4625         p1 = (
4626             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4627             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4628             / UDP(sport=21, dport=20)
4629             / payload
4630         )
4631
4632         self.pg1.add_stream(p1)
4633         self.pg_enable_capture(self.pg_interfaces)
4634         self.pg_start()
4635         capture = self.pg0.get_capture(1)[0]
4636
4637         self.logger.info(self.vapi.cli("show nat44 sessions"))
4638
4639         # reply with ICMP error message in2out
4640         # We cannot reliably retrieve forward bypass sessions via the API.
4641         # session dumps for a user will only look on the worker that the
4642         # user is supposed to be mapped to in2out. The forward bypass session
4643         # is not necessarily created on that worker.
4644         p2 = (
4645             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4646             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4647             / ICMP(type="dest-unreach", code="port-unreachable")
4648             / capture[IP:]
4649         )
4650
4651         self.pg0.add_stream(p2)
4652         self.pg_enable_capture(self.pg_interfaces)
4653         self.pg_start()
4654         capture = self.pg1.get_capture(1)[0]
4655
4656         self.logger.info(self.vapi.cli("show nat44 sessions"))
4657
4658         self.logger.info(ppp("p1 packet:", p1))
4659         self.logger.info(ppp("p2 packet:", p2))
4660         self.logger.info(ppp("capture packet:", capture))
4661
4662     def test_tcp_session_open_retransmit1(self):
4663         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4664
4665         The client does not receive the [SYN,ACK] or the
4666         ACK from the client is lost. Therefore, the [SYN, ACK]
4667         is retransmitted by the server.
4668         """
4669
4670         in_port = self.tcp_port_in
4671         ext_port = self.tcp_external_port
4672         payload = "H" * 10
4673
4674         self.nat_add_address(self.nat_addr)
4675         self.nat_add_inside_interface(self.pg0)
4676         self.nat_add_outside_interface(self.pg1)
4677
4678         self.vapi.nat_set_timeouts(
4679             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4680         )
4681         # SYN packet in->out
4682         p = (
4683             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4684             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4685             / TCP(sport=in_port, dport=ext_port, flags="S")
4686         )
4687         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4688         out_port = p[TCP].sport
4689
4690         # SYN + ACK packet out->in
4691         p = (
4692             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4693             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4694             / TCP(sport=ext_port, dport=out_port, flags="SA")
4695         )
4696         self.send_and_expect(self.pg1, p, self.pg0)
4697
4698         # ACK in->out does not arrive
4699
4700         # resent SYN + ACK packet out->in
4701         p = (
4702             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4703             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4704             / TCP(sport=ext_port, dport=out_port, flags="SA")
4705         )
4706         self.send_and_expect(self.pg1, p, self.pg0)
4707
4708         # ACK packet in->out
4709         p = (
4710             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4711             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4712             / TCP(sport=in_port, dport=ext_port, flags="A")
4713         )
4714         self.send_and_expect(self.pg0, p, self.pg1)
4715
4716         # Verify that the data can be transmitted after the transitory time
4717         self.virtual_sleep(6)
4718
4719         p = (
4720             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4721             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4722             / TCP(sport=in_port, dport=ext_port, flags="PA")
4723             / Raw(payload)
4724         )
4725         self.send_and_expect(self.pg0, p, self.pg1)
4726
4727     def test_tcp_session_open_retransmit2(self):
4728         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4729
4730         The ACK is lost to the server after the TCP session is opened.
4731         Data is sent by the client, then the [SYN,ACK] is
4732         retransmitted by the server.
4733         """
4734
4735         in_port = self.tcp_port_in
4736         ext_port = self.tcp_external_port
4737         payload = "H" * 10
4738
4739         self.nat_add_address(self.nat_addr)
4740         self.nat_add_inside_interface(self.pg0)
4741         self.nat_add_outside_interface(self.pg1)
4742
4743         self.vapi.nat_set_timeouts(
4744             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4745         )
4746         # SYN packet in->out
4747         p = (
4748             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4749             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4750             / TCP(sport=in_port, dport=ext_port, flags="S")
4751         )
4752         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4753         out_port = p[TCP].sport
4754
4755         # SYN + ACK packet out->in
4756         p = (
4757             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4758             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4759             / TCP(sport=ext_port, dport=out_port, flags="SA")
4760         )
4761         self.send_and_expect(self.pg1, p, self.pg0)
4762
4763         # ACK packet in->out -- not received by the server
4764         p = (
4765             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4766             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4767             / TCP(sport=in_port, dport=ext_port, flags="A")
4768         )
4769         self.send_and_expect(self.pg0, p, self.pg1)
4770
4771         # PUSH + ACK packet in->out
4772         p = (
4773             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4774             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4775             / TCP(sport=in_port, dport=ext_port, flags="PA")
4776             / Raw(payload)
4777         )
4778         self.send_and_expect(self.pg0, p, self.pg1)
4779
4780         # resent SYN + ACK packet out->in
4781         p = (
4782             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4783             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4784             / TCP(sport=ext_port, dport=out_port, flags="SA")
4785         )
4786         self.send_and_expect(self.pg1, p, self.pg0)
4787
4788         # resent ACK packet in->out
4789         p = (
4790             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4791             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4792             / TCP(sport=in_port, dport=ext_port, flags="A")
4793         )
4794         self.send_and_expect(self.pg0, p, self.pg1)
4795
4796         # resent PUSH + ACK packet in->out
4797         p = (
4798             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4799             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4800             / TCP(sport=in_port, dport=ext_port, flags="PA")
4801             / Raw(payload)
4802         )
4803         self.send_and_expect(self.pg0, p, self.pg1)
4804
4805         # ACK packet out->in
4806         p = (
4807             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4808             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4809             / TCP(sport=ext_port, dport=out_port, flags="A")
4810         )
4811         self.send_and_expect(self.pg1, p, self.pg0)
4812
4813         # Verify that the data can be transmitted after the transitory time
4814         self.virtual_sleep(6)
4815
4816         p = (
4817             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4818             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4819             / TCP(sport=in_port, dport=ext_port, flags="PA")
4820             / Raw(payload)
4821         )
4822         self.send_and_expect(self.pg0, p, self.pg1)
4823
4824
# Allow this test module to be run directly as a script: the tests are
# executed through unittest with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)