http_static: misc bug fixes
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import scapy.compat
8 from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
9 from framework import VppTestCase, VppTestRunner
10 from scapy.data import IP_PROTOS
11 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
12 from scapy.layers.inet import IPerror, TCPerror
13 from scapy.layers.l2 import Ether
14 from scapy.packet import Raw
15 from syslog_rfc5424_parser import SyslogMessage, ParseError
16 from syslog_rfc5424_parser.constants import SyslogSeverity
17 from util import ppp, pr, ip4_range
18 from vpp_acl import AclRule, VppAcl, VppAclInterface
19 from vpp_ip_route import VppIpRoute, VppRoutePath
20 from vpp_papi import VppEnum
21 from util import StatsDiff
22
23
24 class TestNAT44ED(VppTestCase):
25     """NAT44ED Test Case"""
26
27     nat_addr = "10.0.10.3"
28
29     tcp_port_in = 6303
30     tcp_port_out = 6303
31
32     udp_port_in = 6304
33     udp_port_out = 6304
34
35     icmp_id_in = 6305
36     icmp_id_out = 6305
37
38     tcp_external_port = 80
39
40     max_sessions = 100
41
42     def setUp(self):
43         super().setUp()
44         self.plugin_enable()
45
46     def tearDown(self):
47         super().tearDown()
48         if not self.vpp_dead:
49             self.plugin_disable()
50
51     def plugin_enable(self):
52         self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
53
54     def plugin_disable(self):
55         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
56
57     @property
58     def config_flags(self):
59         return VppEnum.vl_api_nat_config_flags_t
60
61     @property
62     def nat44_config_flags(self):
63         return VppEnum.vl_api_nat44_config_flags_t
64
65     @property
66     def syslog_severity(self):
67         return VppEnum.vl_api_syslog_severity_t
68
69     @property
70     def server_addr(self):
71         return self.pg1.remote_hosts[0].ip4
72
73     @staticmethod
74     def random_port():
75         return randint(1024, 65535)
76
77     @staticmethod
78     def proto2layer(proto):
79         if proto == IP_PROTOS.tcp:
80             return TCP
81         elif proto == IP_PROTOS.udp:
82             return UDP
83         elif proto == IP_PROTOS.icmp:
84             return ICMP
85         else:
86             raise Exception("Unsupported protocol")
87
88     @classmethod
89     def create_and_add_ip4_table(cls, i, table_id=0):
90         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
91         i.set_table_ip4(table_id)
92
93     @classmethod
94     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
95         if table_id:
96             cls.create_and_add_ip4_table(i, table_id)
97
98         i.admin_up()
99         i.config_ip4()
100         i.resolve_arp()
101
102         if hosts:
103             i.generate_remote_hosts(hosts)
104             i.configure_ipv4_neighbors()
105
106     @classmethod
107     def nat_add_interface_address(cls, i):
108         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
109
110     def nat_add_inside_interface(self, i):
111         self.vapi.nat44_interface_add_del_feature(
112             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
113         )
114
115     def nat_add_outside_interface(self, i):
116         self.vapi.nat44_interface_add_del_feature(
117             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
118         )
119
120     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
121         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
122         self.vapi.nat44_add_del_address_range(
123             first_ip_address=address,
124             last_ip_address=address,
125             vrf_id=vrf_id,
126             is_add=is_add,
127             flags=flags,
128         )
129
130     def nat_add_static_mapping(
131         self,
132         local_ip,
133         external_ip="0.0.0.0",
134         local_port=0,
135         external_port=0,
136         vrf_id=0,
137         is_add=1,
138         external_sw_if_index=0xFFFFFFFF,
139         proto=0,
140         tag="",
141         flags=0,
142     ):
143
144         if not (local_port and external_port):
145             flags |= self.config_flags.NAT_IS_ADDR_ONLY
146
147         self.vapi.nat44_add_del_static_mapping(
148             is_add=is_add,
149             local_ip_address=local_ip,
150             external_ip_address=external_ip,
151             external_sw_if_index=external_sw_if_index,
152             local_port=local_port,
153             external_port=external_port,
154             vrf_id=vrf_id,
155             protocol=proto,
156             flags=flags,
157             tag=tag,
158         )
159
160     @classmethod
161     def setUpClass(cls):
162         super().setUpClass()
163         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
164             return
165
166         cls.create_pg_interfaces(range(12))
167         cls.interfaces = list(cls.pg_interfaces[:4])
168
169         cls.create_and_add_ip4_table(cls.pg2, 10)
170
171         for i in cls.interfaces:
172             cls.configure_ip4_interface(i, hosts=3)
173
174         # test specific (test-multiple-vrf)
175         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
176
177         # test specific (test-one-armed-nat44-static)
178         cls.pg4.generate_remote_hosts(2)
179         cls.pg4.config_ip4()
180         cls.vapi.sw_interface_add_del_address(
181             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
182         )
183         cls.pg4.admin_up()
184         cls.pg4.resolve_arp()
185         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
186         cls.pg4.resolve_arp()
187
188         # test specific interface (pg5)
189         cls.pg5._local_ip4 = "10.1.1.1"
190         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
191         cls.pg5.set_table_ip4(1)
192         cls.pg5.config_ip4()
193         cls.pg5.admin_up()
194         cls.pg5.resolve_arp()
195
196         # test specific interface (pg6)
197         cls.pg6._local_ip4 = "10.1.2.1"
198         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
199         cls.pg6.set_table_ip4(1)
200         cls.pg6.config_ip4()
201         cls.pg6.admin_up()
202         cls.pg6.resolve_arp()
203
204         rl = list()
205
206         rl.append(
207             VppIpRoute(
208                 cls,
209                 "0.0.0.0",
210                 0,
211                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
212                 register=False,
213                 table_id=1,
214             )
215         )
216         rl.append(
217             VppIpRoute(
218                 cls,
219                 "0.0.0.0",
220                 0,
221                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
222                 register=False,
223             )
224         )
225         rl.append(
226             VppIpRoute(
227                 cls,
228                 cls.pg5.remote_ip4,
229                 32,
230                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
231                 register=False,
232                 table_id=1,
233             )
234         )
235         rl.append(
236             VppIpRoute(
237                 cls,
238                 cls.pg6.remote_ip4,
239                 32,
240                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
241                 register=False,
242                 table_id=1,
243             )
244         )
245         rl.append(
246             VppIpRoute(
247                 cls,
248                 cls.pg6.remote_ip4,
249                 16,
250                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
251                 register=False,
252                 table_id=0,
253             )
254         )
255
256         for r in rl:
257             r.add_vpp_config()
258
259         cls.no_diff = StatsDiff(
260             {
261                 pg.sw_if_index: {
262                     "/nat44-ed/in2out/fastpath/tcp": 0,
263                     "/nat44-ed/in2out/fastpath/udp": 0,
264                     "/nat44-ed/in2out/fastpath/icmp": 0,
265                     "/nat44-ed/in2out/fastpath/drops": 0,
266                     "/nat44-ed/in2out/slowpath/tcp": 0,
267                     "/nat44-ed/in2out/slowpath/udp": 0,
268                     "/nat44-ed/in2out/slowpath/icmp": 0,
269                     "/nat44-ed/in2out/slowpath/drops": 0,
270                     "/nat44-ed/in2out/fastpath/tcp": 0,
271                     "/nat44-ed/in2out/fastpath/udp": 0,
272                     "/nat44-ed/in2out/fastpath/icmp": 0,
273                     "/nat44-ed/in2out/fastpath/drops": 0,
274                     "/nat44-ed/in2out/slowpath/tcp": 0,
275                     "/nat44-ed/in2out/slowpath/udp": 0,
276                     "/nat44-ed/in2out/slowpath/icmp": 0,
277                     "/nat44-ed/in2out/slowpath/drops": 0,
278                 }
279                 for pg in cls.pg_interfaces
280             }
281         )
282
283     def get_err_counter(self, path):
284         return self.statistics.get_err_counter(path)
285
286     def reass_hairpinning(
287         self,
288         server_addr,
289         server_in_port,
290         server_out_port,
291         host_in_port,
292         proto=IP_PROTOS.tcp,
293         ignore_port=False,
294     ):
295         layer = self.proto2layer(proto)
296
297         if proto == IP_PROTOS.tcp:
298             data = b"A" * 4 + b"B" * 16 + b"C" * 3
299         else:
300             data = b"A" * 16 + b"B" * 16 + b"C" * 3
301
302         # send packet from host to server
303         pkts = self.create_stream_frag(
304             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
305         )
306         self.pg0.add_stream(pkts)
307         self.pg_enable_capture(self.pg_interfaces)
308         self.pg_start()
309         frags = self.pg0.get_capture(len(pkts))
310         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
311         if proto != IP_PROTOS.icmp:
312             if not ignore_port:
313                 self.assertNotEqual(p[layer].sport, host_in_port)
314             self.assertEqual(p[layer].dport, server_in_port)
315         else:
316             if not ignore_port:
317                 self.assertNotEqual(p[layer].id, host_in_port)
318         self.assertEqual(data, p[Raw].load)
319
320     def frag_out_of_order(
321         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
322     ):
323         layer = self.proto2layer(proto)
324
325         if proto == IP_PROTOS.tcp:
326             data = b"A" * 4 + b"B" * 16 + b"C" * 3
327         else:
328             data = b"A" * 16 + b"B" * 16 + b"C" * 3
329         self.port_in = self.random_port()
330
331         for i in range(2):
332             # in2out
333             pkts = self.create_stream_frag(
334                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
335             )
336             pkts.reverse()
337             self.pg0.add_stream(pkts)
338             self.pg_enable_capture(self.pg_interfaces)
339             self.pg_start()
340             frags = self.pg1.get_capture(len(pkts))
341             if not dont_translate:
342                 p = self.reass_frags_and_verify(
343                     frags, self.nat_addr, self.pg1.remote_ip4
344                 )
345             else:
346                 p = self.reass_frags_and_verify(
347                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
348                 )
349             if proto != IP_PROTOS.icmp:
350                 if not dont_translate:
351                     self.assertEqual(p[layer].dport, 20)
352                     if not ignore_port:
353                         self.assertNotEqual(p[layer].sport, self.port_in)
354                 else:
355                     self.assertEqual(p[layer].sport, self.port_in)
356             else:
357                 if not ignore_port:
358                     if not dont_translate:
359                         self.assertNotEqual(p[layer].id, self.port_in)
360                     else:
361                         self.assertEqual(p[layer].id, self.port_in)
362             self.assertEqual(data, p[Raw].load)
363
364             # out2in
365             if not dont_translate:
366                 dst_addr = self.nat_addr
367             else:
368                 dst_addr = self.pg0.remote_ip4
369             if proto != IP_PROTOS.icmp:
370                 sport = 20
371                 dport = p[layer].sport
372             else:
373                 sport = p[layer].id
374                 dport = 0
375             pkts = self.create_stream_frag(
376                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
377             )
378             pkts.reverse()
379             self.pg1.add_stream(pkts)
380             self.pg_enable_capture(self.pg_interfaces)
381             self.logger.info(self.vapi.cli("show trace"))
382             self.pg_start()
383             frags = self.pg0.get_capture(len(pkts))
384             p = self.reass_frags_and_verify(
385                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
386             )
387             if proto != IP_PROTOS.icmp:
388                 self.assertEqual(p[layer].sport, 20)
389                 self.assertEqual(p[layer].dport, self.port_in)
390             else:
391                 self.assertEqual(p[layer].id, self.port_in)
392             self.assertEqual(data, p[Raw].load)
393
394     def reass_frags_and_verify(self, frags, src, dst):
395         buffer = BytesIO()
396         for p in frags:
397             self.assertEqual(p[IP].src, src)
398             self.assertEqual(p[IP].dst, dst)
399             self.assert_ip_checksum_valid(p)
400             buffer.seek(p[IP].frag * 8)
401             buffer.write(bytes(p[IP].payload))
402         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
403         if ip.proto == IP_PROTOS.tcp:
404             p = ip / TCP(buffer.getvalue())
405             self.logger.debug(ppp("Reassembled:", p))
406             self.assert_tcp_checksum_valid(p)
407         elif ip.proto == IP_PROTOS.udp:
408             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
409         elif ip.proto == IP_PROTOS.icmp:
410             p = ip / ICMP(buffer.getvalue())
411         return p
412
413     def frag_in_order(
414         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
415     ):
416         layer = self.proto2layer(proto)
417
418         if proto == IP_PROTOS.tcp:
419             data = b"A" * 4 + b"B" * 16 + b"C" * 3
420         else:
421             data = b"A" * 16 + b"B" * 16 + b"C" * 3
422         self.port_in = self.random_port()
423
424         # in2out
425         pkts = self.create_stream_frag(
426             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
427         )
428         self.pg0.add_stream(pkts)
429         self.pg_enable_capture(self.pg_interfaces)
430         self.pg_start()
431         frags = self.pg1.get_capture(len(pkts))
432         if not dont_translate:
433             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
434         else:
435             p = self.reass_frags_and_verify(
436                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
437             )
438         if proto != IP_PROTOS.icmp:
439             if not dont_translate:
440                 self.assertEqual(p[layer].dport, 20)
441                 if not ignore_port:
442                     self.assertNotEqual(p[layer].sport, self.port_in)
443             else:
444                 self.assertEqual(p[layer].sport, self.port_in)
445         else:
446             if not ignore_port:
447                 if not dont_translate:
448                     self.assertNotEqual(p[layer].id, self.port_in)
449                 else:
450                     self.assertEqual(p[layer].id, self.port_in)
451         self.assertEqual(data, p[Raw].load)
452
453         # out2in
454         if not dont_translate:
455             dst_addr = self.nat_addr
456         else:
457             dst_addr = self.pg0.remote_ip4
458         if proto != IP_PROTOS.icmp:
459             sport = 20
460             dport = p[layer].sport
461         else:
462             sport = p[layer].id
463             dport = 0
464         pkts = self.create_stream_frag(
465             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
466         )
467         self.pg1.add_stream(pkts)
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470         frags = self.pg0.get_capture(len(pkts))
471         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
472         if proto != IP_PROTOS.icmp:
473             self.assertEqual(p[layer].sport, 20)
474             self.assertEqual(p[layer].dport, self.port_in)
475         else:
476             self.assertEqual(p[layer].id, self.port_in)
477         self.assertEqual(data, p[Raw].load)
478
479     def verify_capture_out(
480         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
481     ):
482         if nat_ip is None:
483             nat_ip = self.nat_addr
484         for packet in capture:
485             try:
486                 self.assert_packet_checksums_valid(packet)
487                 self.assertEqual(packet[IP].src, nat_ip)
488                 if dst_ip is not None:
489                     self.assertEqual(packet[IP].dst, dst_ip)
490                 if packet.haslayer(TCP):
491                     if not ignore_port:
492                         if same_port:
493                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
494                         else:
495                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
496                     self.tcp_port_out = packet[TCP].sport
497                     self.assert_packet_checksums_valid(packet)
498                 elif packet.haslayer(UDP):
499                     if not ignore_port:
500                         if same_port:
501                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
502                         else:
503                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
504                     self.udp_port_out = packet[UDP].sport
505                 else:
506                     if not ignore_port:
507                         if same_port:
508                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
509                         else:
510                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
511                     self.icmp_id_out = packet[ICMP].id
512                     self.assert_packet_checksums_valid(packet)
513             except:
514                 self.logger.error(
515                     ppp("Unexpected or invalid packet (outside network):", packet)
516                 )
517                 raise
518
519     def verify_capture_in(self, capture, in_if):
520         for packet in capture:
521             try:
522                 self.assert_packet_checksums_valid(packet)
523                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
524                 if packet.haslayer(TCP):
525                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
526                 elif packet.haslayer(UDP):
527                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
528                 else:
529                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
530             except:
531                 self.logger.error(
532                     ppp("Unexpected or invalid packet (inside network):", packet)
533                 )
534                 raise
535
536     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
537         if dst_ip is None:
538             dst_ip = out_if.remote_ip4
539
540         pkts = []
541         # TCP
542         p = (
543             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
544             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
545             / TCP(sport=self.tcp_port_in, dport=20)
546         )
547         pkts.extend([p, p])
548
549         # UDP
550         p = (
551             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
552             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
553             / UDP(sport=self.udp_port_in, dport=20)
554         )
555         pkts.append(p)
556
557         # ICMP
558         p = (
559             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
560             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
561             / ICMP(id=self.icmp_id_in, type="echo-request")
562         )
563         pkts.append(p)
564
565         return pkts
566
567     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
568         if dst_ip is None:
569             dst_ip = self.nat_addr
570         if not use_inside_ports:
571             tcp_port = self.tcp_port_out
572             udp_port = self.udp_port_out
573             icmp_id = self.icmp_id_out
574         else:
575             tcp_port = self.tcp_port_in
576             udp_port = self.udp_port_in
577             icmp_id = self.icmp_id_in
578         pkts = []
579         # TCP
580         p = (
581             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
582             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
583             / TCP(dport=tcp_port, sport=20)
584         )
585         pkts.extend([p, p])
586
587         # UDP
588         p = (
589             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
590             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
591             / UDP(dport=udp_port, sport=20)
592         )
593         pkts.append(p)
594
595         # ICMP
596         p = (
597             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
598             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
599             / ICMP(id=icmp_id, type="echo-reply")
600         )
601         pkts.append(p)
602
603         return pkts
604
605     def create_tcp_stream(self, in_if, out_if, count):
606         pkts = []
607         port = 6303
608
609         for i in range(count):
610             p = (
611                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
612                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
613                 / TCP(sport=port + i, dport=20)
614             )
615             pkts.append(p)
616
617         return pkts
618
619     def create_stream_frag(
620         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
621     ):
622         if proto == IP_PROTOS.tcp:
623             p = (
624                 IP(src=src_if.remote_ip4, dst=dst)
625                 / TCP(sport=sport, dport=dport)
626                 / Raw(data)
627             )
628             p = p.__class__(scapy.compat.raw(p))
629             chksum = p[TCP].chksum
630             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
631         elif proto == IP_PROTOS.udp:
632             proto_header = UDP(sport=sport, dport=dport)
633         elif proto == IP_PROTOS.icmp:
634             if not echo_reply:
635                 proto_header = ICMP(id=sport, type="echo-request")
636             else:
637                 proto_header = ICMP(id=sport, type="echo-reply")
638         else:
639             raise Exception("Unsupported protocol")
640         id = self.random_port()
641         pkts = []
642         if proto == IP_PROTOS.tcp:
643             raw = Raw(data[0:4])
644         else:
645             raw = Raw(data[0:16])
646         p = (
647             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
648             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
649             / proto_header
650             / raw
651         )
652         pkts.append(p)
653         if proto == IP_PROTOS.tcp:
654             raw = Raw(data[4:20])
655         else:
656             raw = Raw(data[16:32])
657         p = (
658             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
659             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
660             / raw
661         )
662         pkts.append(p)
663         if proto == IP_PROTOS.tcp:
664             raw = Raw(data[20:])
665         else:
666             raw = Raw(data[32:])
667         p = (
668             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
669             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
670             / raw
671         )
672         pkts.append(p)
673         return pkts
674
675     def frag_in_order_in_plus_out(
676         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
677     ):
678
679         layer = self.proto2layer(proto)
680
681         if proto == IP_PROTOS.tcp:
682             data = b"A" * 4 + b"B" * 16 + b"C" * 3
683         else:
684             data = b"A" * 16 + b"B" * 16 + b"C" * 3
685         port_in = self.random_port()
686
687         for i in range(2):
688             # out2in
689             pkts = self.create_stream_frag(
690                 self.pg0, out_addr, port_in, out_port, data, proto
691             )
692             self.pg0.add_stream(pkts)
693             self.pg_enable_capture(self.pg_interfaces)
694             self.pg_start()
695             frags = self.pg1.get_capture(len(pkts))
696             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
697             if proto != IP_PROTOS.icmp:
698                 self.assertEqual(p[layer].sport, port_in)
699                 self.assertEqual(p[layer].dport, in_port)
700             else:
701                 self.assertEqual(p[layer].id, port_in)
702             self.assertEqual(data, p[Raw].load)
703
704             # in2out
705             if proto != IP_PROTOS.icmp:
706                 pkts = self.create_stream_frag(
707                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
708                 )
709             else:
710                 pkts = self.create_stream_frag(
711                     self.pg1,
712                     self.pg0.remote_ip4,
713                     p[layer].id,
714                     0,
715                     data,
716                     proto,
717                     echo_reply=True,
718                 )
719             self.pg1.add_stream(pkts)
720             self.pg_enable_capture(self.pg_interfaces)
721             self.pg_start()
722             frags = self.pg0.get_capture(len(pkts))
723             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
724             if proto != IP_PROTOS.icmp:
725                 self.assertEqual(p[layer].sport, out_port)
726                 self.assertEqual(p[layer].dport, port_in)
727             else:
728                 self.assertEqual(p[layer].id, port_in)
729             self.assertEqual(data, p[Raw].load)
730
731     def frag_out_of_order_in_plus_out(
732         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
733     ):
734
735         layer = self.proto2layer(proto)
736
737         if proto == IP_PROTOS.tcp:
738             data = b"A" * 4 + b"B" * 16 + b"C" * 3
739         else:
740             data = b"A" * 16 + b"B" * 16 + b"C" * 3
741         port_in = self.random_port()
742
743         for i in range(2):
744             # out2in
745             pkts = self.create_stream_frag(
746                 self.pg0, out_addr, port_in, out_port, data, proto
747             )
748             pkts.reverse()
749             self.pg0.add_stream(pkts)
750             self.pg_enable_capture(self.pg_interfaces)
751             self.pg_start()
752             frags = self.pg1.get_capture(len(pkts))
753             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
754             if proto != IP_PROTOS.icmp:
755                 self.assertEqual(p[layer].dport, in_port)
756                 self.assertEqual(p[layer].sport, port_in)
757                 self.assertEqual(p[layer].dport, in_port)
758             else:
759                 self.assertEqual(p[layer].id, port_in)
760             self.assertEqual(data, p[Raw].load)
761
762             # in2out
763             if proto != IP_PROTOS.icmp:
764                 pkts = self.create_stream_frag(
765                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
766                 )
767             else:
768                 pkts = self.create_stream_frag(
769                     self.pg1,
770                     self.pg0.remote_ip4,
771                     p[layer].id,
772                     0,
773                     data,
774                     proto,
775                     echo_reply=True,
776                 )
777             pkts.reverse()
778             self.pg1.add_stream(pkts)
779             self.pg_enable_capture(self.pg_interfaces)
780             self.pg_start()
781             frags = self.pg0.get_capture(len(pkts))
782             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
783             if proto != IP_PROTOS.icmp:
784                 self.assertEqual(p[layer].sport, out_port)
785                 self.assertEqual(p[layer].dport, port_in)
786             else:
787                 self.assertEqual(p[layer].id, port_in)
788             self.assertEqual(data, p[Raw].load)
789
790     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
791         # SYN packet in->out
792         p = (
793             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
794             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
795             / TCP(sport=in_port, dport=ext_port, flags="S")
796         )
797         in_if.add_stream(p)
798         self.pg_enable_capture(self.pg_interfaces)
799         self.pg_start()
800         capture = out_if.get_capture(1)
801         p = capture[0]
802         out_port = p[TCP].sport
803
804         # SYN + ACK packet out->in
805         p = (
806             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
807             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
808             / TCP(sport=ext_port, dport=out_port, flags="SA")
809         )
810         out_if.add_stream(p)
811         self.pg_enable_capture(self.pg_interfaces)
812         self.pg_start()
813         in_if.get_capture(1)
814
815         # ACK packet in->out
816         p = (
817             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
818             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
819             / TCP(sport=in_port, dport=ext_port, flags="A")
820         )
821         in_if.add_stream(p)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         out_if.get_capture(1)
825
826         return out_port
827
828     def twice_nat_common(
829         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
830     ):
831         twice_nat_addr = "10.0.1.3"
832
833         port_in = 8080
834         if lb:
835             if not same_pg:
836                 port_in1 = port_in
837                 port_in2 = port_in
838             else:
839                 port_in1 = port_in + 1
840                 port_in2 = port_in + 2
841
842         port_out = 80
843         eh_port_out = 4567
844
845         server1 = self.pg0.remote_hosts[0]
846         server2 = self.pg0.remote_hosts[1]
847         if lb and same_pg:
848             server2 = server1
849         if not lb:
850             server = server1
851
852         pg0 = self.pg0
853         if same_pg:
854             pg1 = self.pg0
855         else:
856             pg1 = self.pg1
857
858         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
859
860         self.nat_add_address(self.nat_addr)
861         self.nat_add_address(twice_nat_addr, twice_nat=1)
862
863         flags = 0
864         if self_twice_nat:
865             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
866         else:
867             flags |= self.config_flags.NAT_IS_TWICE_NAT
868
869         if not lb:
870             self.nat_add_static_mapping(
871                 pg0.remote_ip4,
872                 self.nat_addr,
873                 port_in,
874                 port_out,
875                 proto=IP_PROTOS.tcp,
876                 flags=flags,
877             )
878         else:
879             locals = [
880                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
881                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
882             ]
883             out_addr = self.nat_addr
884
885             self.vapi.nat44_add_del_lb_static_mapping(
886                 is_add=1,
887                 flags=flags,
888                 external_addr=out_addr,
889                 external_port=port_out,
890                 protocol=IP_PROTOS.tcp,
891                 local_num=len(locals),
892                 locals=locals,
893             )
894         self.nat_add_inside_interface(pg0)
895         self.nat_add_outside_interface(pg1)
896
897         if same_pg:
898             if not lb:
899                 client = server
900             else:
901                 assert client_id is not None
902                 if client_id == 1:
903                     client = self.pg0.remote_hosts[0]
904                 elif client_id == 2:
905                     client = self.pg0.remote_hosts[1]
906         else:
907             client = pg1.remote_hosts[0]
908         p = (
909             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
910             / IP(src=client.ip4, dst=self.nat_addr)
911             / TCP(sport=eh_port_out, dport=port_out)
912         )
913         pg1.add_stream(p)
914         self.pg_enable_capture(self.pg_interfaces)
915         self.pg_start()
916         capture = pg0.get_capture(1)
917         p = capture[0]
918         try:
919             ip = p[IP]
920             tcp = p[TCP]
921             if lb:
922                 if ip.dst == server1.ip4:
923                     server = server1
924                     port_in = port_in1
925                 else:
926                     server = server2
927                     port_in = port_in2
928             self.assertEqual(ip.dst, server.ip4)
929             if lb and same_pg:
930                 self.assertIn(tcp.dport, [port_in1, port_in2])
931             else:
932                 self.assertEqual(tcp.dport, port_in)
933             if eh_translate:
934                 self.assertEqual(ip.src, twice_nat_addr)
935                 self.assertNotEqual(tcp.sport, eh_port_out)
936             else:
937                 self.assertEqual(ip.src, client.ip4)
938                 self.assertEqual(tcp.sport, eh_port_out)
939             eh_addr_in = ip.src
940             eh_port_in = tcp.sport
941             saved_port_in = tcp.dport
942             self.assert_packet_checksums_valid(p)
943         except:
944             self.logger.error(ppp("Unexpected or invalid packet:", p))
945             raise
946
947         p = (
948             Ether(src=server.mac, dst=pg0.local_mac)
949             / IP(src=server.ip4, dst=eh_addr_in)
950             / TCP(sport=saved_port_in, dport=eh_port_in)
951         )
952         pg0.add_stream(p)
953         self.pg_enable_capture(self.pg_interfaces)
954         self.pg_start()
955         capture = pg1.get_capture(1)
956         p = capture[0]
957         try:
958             ip = p[IP]
959             tcp = p[TCP]
960             self.assertEqual(ip.dst, client.ip4)
961             self.assertEqual(ip.src, self.nat_addr)
962             self.assertEqual(tcp.dport, eh_port_out)
963             self.assertEqual(tcp.sport, port_out)
964             self.assert_packet_checksums_valid(p)
965         except:
966             self.logger.error(ppp("Unexpected or invalid packet:", p))
967             raise
968
969         if eh_translate:
970             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
971             self.assertEqual(len(sessions), 1)
972             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
973             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
974             self.logger.info(self.vapi.cli("show nat44 sessions"))
975             self.vapi.nat44_del_session(
976                 address=sessions[0].inside_ip_address,
977                 port=sessions[0].inside_port,
978                 protocol=sessions[0].protocol,
979                 flags=(
980                     self.config_flags.NAT_IS_INSIDE
981                     | self.config_flags.NAT_IS_EXT_HOST_VALID
982                 ),
983                 ext_host_address=sessions[0].ext_host_nat_address,
984                 ext_host_port=sessions[0].ext_host_nat_port,
985             )
986             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
987             self.assertEqual(len(sessions), 0)
988
989     def verify_syslog_sess(self, data, msgid, is_ip6=False):
990         message = data.decode("utf-8")
991         try:
992             message = SyslogMessage.parse(message)
993         except ParseError as e:
994             self.logger.error(e)
995             raise
996         else:
997             self.assertEqual(message.severity, SyslogSeverity.info)
998             self.assertEqual(message.appname, "NAT")
999             self.assertEqual(message.msgid, msgid)
1000             sd_params = message.sd.get("nsess")
1001             self.assertTrue(sd_params is not None)
1002             if is_ip6:
1003                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1004                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1005             else:
1006                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1007                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1008                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1009             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1010             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1011             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1012             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1013             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1014             self.assertEqual(sd_params.get("SVLAN"), "0")
1015             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1016             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1017
1018     def test_icmp_error(self):
1019         """NAT44ED test ICMP error message with inner header"""
1020
1021         payload = "H" * 10
1022
1023         self.nat_add_address(self.nat_addr)
1024         self.nat_add_inside_interface(self.pg0)
1025         self.nat_add_outside_interface(self.pg1)
1026
1027         # in2out (initiate connection)
1028         p1 = [
1029             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1030             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1031             / UDP(sport=21, dport=20)
1032             / payload,
1033             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1034             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1035             / TCP(sport=21, dport=20, flags="S")
1036             / payload,
1037             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1038             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1039             / ICMP(type="echo-request", id=7777)
1040             / payload,
1041         ]
1042
1043         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1044
1045         # out2in (send error message)
1046         p2 = [
1047             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1048             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1049             / ICMP(type="dest-unreach", code="port-unreachable")
1050             / c[IP:]
1051             for c in capture
1052         ]
1053
1054         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1055
1056         for c in capture:
1057             try:
1058                 assert c[IP].dst == self.pg0.remote_ip4
1059                 assert c[IPerror].src == self.pg0.remote_ip4
1060             except AssertionError as a:
1061                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1062
1063     def test_icmp_echo_reply_trailer(self):
1064         """ICMP echo reply with ethernet trailer"""
1065
1066         self.nat_add_address(self.nat_addr)
1067         self.nat_add_inside_interface(self.pg0)
1068         self.nat_add_outside_interface(self.pg1)
1069
1070         # in2out
1071         p1 = (
1072             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1073             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1074             / ICMP(type=8, id=0xABCD, seq=0)
1075         )
1076
1077         self.pg0.add_stream(p1)
1078         self.pg_enable_capture(self.pg_interfaces)
1079         self.pg_start()
1080         c = self.pg1.get_capture(1)[0]
1081
1082         self.logger.debug(self.vapi.cli("show trace"))
1083
1084         # out2in
1085         p2 = (
1086             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1087             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1088             / ICMP(type=0, id=c[ICMP].id, seq=0)
1089         )
1090
1091         # force checksum calculation
1092         p2 = p2.__class__(bytes(p2))
1093
1094         self.logger.debug(ppp("Packet before modification:", p2))
1095
1096         # hex representation of vss monitoring ethernet trailer
1097         # this seems to be just added to end of packet without modifying
1098         # IP or ICMP lengths / checksums
1099         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1100         # change it so that IP/ICMP is unaffected
1101         p2[IP].len = 28
1102
1103         self.logger.debug(ppp("Packet with added trailer:", p2))
1104
1105         self.pg1.add_stream(p2)
1106         self.pg_enable_capture(self.pg_interfaces)
1107         self.pg_start()
1108
1109         self.pg0.get_capture(1)
1110
1111     def test_users_dump(self):
1112         """NAT44ED API test - nat44_user_dump"""
1113
1114         self.nat_add_address(self.nat_addr)
1115         self.nat_add_inside_interface(self.pg0)
1116         self.nat_add_outside_interface(self.pg1)
1117
1118         self.vapi.nat44_forwarding_enable_disable(enable=1)
1119
1120         local_ip = self.pg0.remote_ip4
1121         external_ip = self.nat_addr
1122         self.nat_add_static_mapping(local_ip, external_ip)
1123
1124         users = self.vapi.nat44_user_dump()
1125         self.assertEqual(len(users), 0)
1126
1127         # in2out - static mapping match
1128
1129         pkts = self.create_stream_out(self.pg1)
1130         self.pg1.add_stream(pkts)
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133         capture = self.pg0.get_capture(len(pkts))
1134         self.verify_capture_in(capture, self.pg0)
1135
1136         pkts = self.create_stream_in(self.pg0, self.pg1)
1137         self.pg0.add_stream(pkts)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg1.get_capture(len(pkts))
1141         self.verify_capture_out(capture, same_port=True)
1142
1143         users = self.vapi.nat44_user_dump()
1144         self.assertEqual(len(users), 1)
1145         static_user = users[0]
1146         self.assertEqual(static_user.nstaticsessions, 3)
1147         self.assertEqual(static_user.nsessions, 0)
1148
1149         # in2out - no static mapping match (forwarding test)
1150
1151         host0 = self.pg0.remote_hosts[0]
1152         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1153         try:
1154             pkts = self.create_stream_out(
1155                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1156             )
1157             self.pg1.add_stream(pkts)
1158             self.pg_enable_capture(self.pg_interfaces)
1159             self.pg_start()
1160             capture = self.pg0.get_capture(len(pkts))
1161             self.verify_capture_in(capture, self.pg0)
1162
1163             pkts = self.create_stream_in(self.pg0, self.pg1)
1164             self.pg0.add_stream(pkts)
1165             self.pg_enable_capture(self.pg_interfaces)
1166             self.pg_start()
1167             capture = self.pg1.get_capture(len(pkts))
1168             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1169         finally:
1170             self.pg0.remote_hosts[0] = host0
1171
1172         users = self.vapi.nat44_user_dump()
1173         self.assertEqual(len(users), 2)
1174         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1175             non_static_user = users[1]
1176             static_user = users[0]
1177         else:
1178             non_static_user = users[0]
1179             static_user = users[1]
1180         self.assertEqual(static_user.nstaticsessions, 3)
1181         self.assertEqual(static_user.nsessions, 0)
1182         self.assertEqual(non_static_user.nstaticsessions, 0)
1183         self.assertEqual(non_static_user.nsessions, 3)
1184
1185         users = self.vapi.nat44_user_dump()
1186         self.assertEqual(len(users), 2)
1187         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1188             non_static_user = users[1]
1189             static_user = users[0]
1190         else:
1191             non_static_user = users[0]
1192             static_user = users[1]
1193         self.assertEqual(static_user.nstaticsessions, 3)
1194         self.assertEqual(static_user.nsessions, 0)
1195         self.assertEqual(non_static_user.nstaticsessions, 0)
1196         self.assertEqual(non_static_user.nsessions, 3)
1197
1198     def test_frag_out_of_order_do_not_translate(self):
1199         """NAT44ED don't translate fragments arriving out of order"""
1200         self.nat_add_inside_interface(self.pg0)
1201         self.nat_add_outside_interface(self.pg1)
1202         self.vapi.nat44_forwarding_enable_disable(enable=True)
1203         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1204
1205     def test_forwarding(self):
1206         """NAT44ED forwarding test"""
1207
1208         self.nat_add_inside_interface(self.pg0)
1209         self.nat_add_outside_interface(self.pg1)
1210         self.vapi.nat44_forwarding_enable_disable(enable=1)
1211
1212         real_ip = self.pg0.remote_ip4
1213         alias_ip = self.nat_addr
1214         flags = self.config_flags.NAT_IS_ADDR_ONLY
1215         self.vapi.nat44_add_del_static_mapping(
1216             is_add=1,
1217             local_ip_address=real_ip,
1218             external_ip_address=alias_ip,
1219             external_sw_if_index=0xFFFFFFFF,
1220             flags=flags,
1221         )
1222
1223         try:
1224             # in2out - static mapping match
1225
1226             pkts = self.create_stream_out(self.pg1)
1227             self.pg1.add_stream(pkts)
1228             self.pg_enable_capture(self.pg_interfaces)
1229             self.pg_start()
1230             capture = self.pg0.get_capture(len(pkts))
1231             self.verify_capture_in(capture, self.pg0)
1232
1233             pkts = self.create_stream_in(self.pg0, self.pg1)
1234             self.pg0.add_stream(pkts)
1235             self.pg_enable_capture(self.pg_interfaces)
1236             self.pg_start()
1237             capture = self.pg1.get_capture(len(pkts))
1238             self.verify_capture_out(capture, same_port=True)
1239
1240             # in2out - no static mapping match
1241
1242             host0 = self.pg0.remote_hosts[0]
1243             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1244             try:
1245                 pkts = self.create_stream_out(
1246                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1247                 )
1248                 self.pg1.add_stream(pkts)
1249                 self.pg_enable_capture(self.pg_interfaces)
1250                 self.pg_start()
1251                 capture = self.pg0.get_capture(len(pkts))
1252                 self.verify_capture_in(capture, self.pg0)
1253
1254                 pkts = self.create_stream_in(self.pg0, self.pg1)
1255                 self.pg0.add_stream(pkts)
1256                 self.pg_enable_capture(self.pg_interfaces)
1257                 self.pg_start()
1258                 capture = self.pg1.get_capture(len(pkts))
1259                 self.verify_capture_out(
1260                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1261                 )
1262             finally:
1263                 self.pg0.remote_hosts[0] = host0
1264
1265             user = self.pg0.remote_hosts[1]
1266             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1267             self.assertEqual(len(sessions), 3)
1268             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1269             self.vapi.nat44_del_session(
1270                 address=sessions[0].inside_ip_address,
1271                 port=sessions[0].inside_port,
1272                 protocol=sessions[0].protocol,
1273                 flags=(
1274                     self.config_flags.NAT_IS_INSIDE
1275                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1276                 ),
1277                 ext_host_address=sessions[0].ext_host_address,
1278                 ext_host_port=sessions[0].ext_host_port,
1279             )
1280             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1281             self.assertEqual(len(sessions), 2)
1282
1283         finally:
1284             self.vapi.nat44_forwarding_enable_disable(enable=0)
1285             flags = self.config_flags.NAT_IS_ADDR_ONLY
1286             self.vapi.nat44_add_del_static_mapping(
1287                 is_add=0,
1288                 local_ip_address=real_ip,
1289                 external_ip_address=alias_ip,
1290                 external_sw_if_index=0xFFFFFFFF,
1291                 flags=flags,
1292             )
1293
1294     def test_output_feature_and_service2(self):
1295         """NAT44ED interface output feature and service host direct access"""
1296         self.vapi.nat44_forwarding_enable_disable(enable=1)
1297         self.nat_add_address(self.nat_addr)
1298
1299         self.vapi.nat44_ed_add_del_output_interface(
1300             sw_if_index=self.pg1.sw_if_index, is_add=1
1301         )
1302
1303         # session initiated from service host - translate
1304         pkts = self.create_stream_in(self.pg0, self.pg1)
1305         self.pg0.add_stream(pkts)
1306         self.pg_enable_capture(self.pg_interfaces)
1307         self.pg_start()
1308         capture = self.pg1.get_capture(len(pkts))
1309         self.verify_capture_out(capture, ignore_port=True)
1310
1311         pkts = self.create_stream_out(self.pg1)
1312         self.pg1.add_stream(pkts)
1313         self.pg_enable_capture(self.pg_interfaces)
1314         self.pg_start()
1315         capture = self.pg0.get_capture(len(pkts))
1316         self.verify_capture_in(capture, self.pg0)
1317
1318         # session initiated from remote host - do not translate
1319         tcp_port_in = self.tcp_port_in
1320         udp_port_in = self.udp_port_in
1321         icmp_id_in = self.icmp_id_in
1322
1323         self.tcp_port_in = 60303
1324         self.udp_port_in = 60304
1325         self.icmp_id_in = 60305
1326
1327         try:
1328             pkts = self.create_stream_out(
1329                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1330             )
1331             self.pg1.add_stream(pkts)
1332             self.pg_enable_capture(self.pg_interfaces)
1333             self.pg_start()
1334             capture = self.pg0.get_capture(len(pkts))
1335             self.verify_capture_in(capture, self.pg0)
1336
1337             pkts = self.create_stream_in(self.pg0, self.pg1)
1338             self.pg0.add_stream(pkts)
1339             self.pg_enable_capture(self.pg_interfaces)
1340             self.pg_start()
1341             capture = self.pg1.get_capture(len(pkts))
1342             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1343         finally:
1344             self.tcp_port_in = tcp_port_in
1345             self.udp_port_in = udp_port_in
1346             self.icmp_id_in = icmp_id_in
1347
1348     def test_twice_nat(self):
1349         """NAT44ED Twice NAT"""
1350         self.twice_nat_common()
1351
1352     def test_self_twice_nat_positive(self):
1353         """NAT44ED Self Twice NAT (positive test)"""
1354         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1355
1356     def test_self_twice_nat_lb_positive(self):
1357         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1358         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1359
1360     def test_twice_nat_lb(self):
1361         """NAT44ED Twice NAT local service load balancing"""
1362         self.twice_nat_common(lb=True)
1363
1364     def test_output_feature(self):
1365         """NAT44ED interface output feature (in2out postrouting)"""
1366         self.vapi.nat44_forwarding_enable_disable(enable=1)
1367         self.nat_add_address(self.nat_addr)
1368
1369         self.nat_add_outside_interface(self.pg0)
1370         self.vapi.nat44_ed_add_del_output_interface(
1371             sw_if_index=self.pg1.sw_if_index, is_add=1
1372         )
1373
1374         # in2out
1375         pkts = self.create_stream_in(self.pg0, self.pg1)
1376         self.pg0.add_stream(pkts)
1377         self.pg_enable_capture(self.pg_interfaces)
1378         self.pg_start()
1379         capture = self.pg1.get_capture(len(pkts))
1380         self.verify_capture_out(capture, ignore_port=True)
1381         self.logger.debug(self.vapi.cli("show trace"))
1382
1383         # out2in
1384         pkts = self.create_stream_out(self.pg1)
1385         self.pg1.add_stream(pkts)
1386         self.pg_enable_capture(self.pg_interfaces)
1387         self.pg_start()
1388         capture = self.pg0.get_capture(len(pkts))
1389         self.verify_capture_in(capture, self.pg0)
1390         self.logger.debug(self.vapi.cli("show trace"))
1391
1392         # in2out
1393         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1394         self.pg0.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg1.get_capture(len(pkts))
1398         self.verify_capture_out(capture, ignore_port=True)
1399         self.logger.debug(self.vapi.cli("show trace"))
1400
1401         # out2in
1402         pkts = self.create_stream_out(self.pg1, ttl=2)
1403         self.pg1.add_stream(pkts)
1404         self.pg_enable_capture(self.pg_interfaces)
1405         self.pg_start()
1406         capture = self.pg0.get_capture(len(pkts))
1407         self.verify_capture_in(capture, self.pg0)
1408         self.logger.debug(self.vapi.cli("show trace"))
1409
1410         # in2out
1411         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1412         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1413         for p in capture:
1414             self.assertIn(ICMP, p)
1415             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1416
1417     def test_static_with_port_out2(self):
1418         """NAT44ED 1:1 NAPT asymmetrical rule"""
1419
1420         external_port = 80
1421         local_port = 8080
1422
1423         self.vapi.nat44_forwarding_enable_disable(enable=1)
1424         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1425         self.nat_add_static_mapping(
1426             self.pg0.remote_ip4,
1427             self.nat_addr,
1428             local_port,
1429             external_port,
1430             proto=IP_PROTOS.tcp,
1431             flags=flags,
1432         )
1433
1434         self.nat_add_inside_interface(self.pg0)
1435         self.nat_add_outside_interface(self.pg1)
1436
1437         # from client to service
1438         p = (
1439             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1440             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1441             / TCP(sport=12345, dport=external_port)
1442         )
1443         self.pg1.add_stream(p)
1444         self.pg_enable_capture(self.pg_interfaces)
1445         self.pg_start()
1446         capture = self.pg0.get_capture(1)
1447         p = capture[0]
1448         try:
1449             ip = p[IP]
1450             tcp = p[TCP]
1451             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1452             self.assertEqual(tcp.dport, local_port)
1453             self.assert_packet_checksums_valid(p)
1454         except:
1455             self.logger.error(ppp("Unexpected or invalid packet:", p))
1456             raise
1457
1458         # ICMP error
1459         p = (
1460             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1461             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1462             / ICMP(type=11)
1463             / capture[0][IP]
1464         )
1465         self.pg0.add_stream(p)
1466         self.pg_enable_capture(self.pg_interfaces)
1467         self.pg_start()
1468         capture = self.pg1.get_capture(1)
1469         p = capture[0]
1470         try:
1471             self.assertEqual(p[IP].src, self.nat_addr)
1472             inner = p[IPerror]
1473             self.assertEqual(inner.dst, self.nat_addr)
1474             self.assertEqual(inner[TCPerror].dport, external_port)
1475         except:
1476             self.logger.error(ppp("Unexpected or invalid packet:", p))
1477             raise
1478
1479         # from service back to client
1480         p = (
1481             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1482             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1483             / TCP(sport=local_port, dport=12345)
1484         )
1485         self.pg0.add_stream(p)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg1.get_capture(1)
1489         p = capture[0]
1490         try:
1491             ip = p[IP]
1492             tcp = p[TCP]
1493             self.assertEqual(ip.src, self.nat_addr)
1494             self.assertEqual(tcp.sport, external_port)
1495             self.assert_packet_checksums_valid(p)
1496         except:
1497             self.logger.error(ppp("Unexpected or invalid packet:", p))
1498             raise
1499
1500         # ICMP error
1501         p = (
1502             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1503             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1504             / ICMP(type=11)
1505             / capture[0][IP]
1506         )
1507         self.pg1.add_stream(p)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510         capture = self.pg0.get_capture(1)
1511         p = capture[0]
1512         try:
1513             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1514             inner = p[IPerror]
1515             self.assertEqual(inner.src, self.pg0.remote_ip4)
1516             self.assertEqual(inner[TCPerror].sport, local_port)
1517         except:
1518             self.logger.error(ppp("Unexpected or invalid packet:", p))
1519             raise
1520
1521         # from client to server (no translation)
1522         p = (
1523             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1524             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1525             / TCP(sport=12346, dport=local_port)
1526         )
1527         self.pg1.add_stream(p)
1528         self.pg_enable_capture(self.pg_interfaces)
1529         self.pg_start()
1530         capture = self.pg0.get_capture(1)
1531         p = capture[0]
1532         try:
1533             ip = p[IP]
1534             tcp = p[TCP]
1535             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1536             self.assertEqual(tcp.dport, local_port)
1537             self.assert_packet_checksums_valid(p)
1538         except:
1539             self.logger.error(ppp("Unexpected or invalid packet:", p))
1540             raise
1541
1542         # from service back to client (no translation)
1543         p = (
1544             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1545             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1546             / TCP(sport=local_port, dport=12346)
1547         )
1548         self.pg0.add_stream(p)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg1.get_capture(1)
1552         p = capture[0]
1553         try:
1554             ip = p[IP]
1555             tcp = p[TCP]
1556             self.assertEqual(ip.src, self.pg0.remote_ip4)
1557             self.assertEqual(tcp.sport, local_port)
1558             self.assert_packet_checksums_valid(p)
1559         except:
1560             self.logger.error(ppp("Unexpected or invalid packet:", p))
1561             raise
1562
1563     def test_static_lb(self):
1564         """NAT44ED local service load balancing"""
1565         external_addr_n = self.nat_addr
1566         external_port = 80
1567         local_port = 8080
1568         server1 = self.pg0.remote_hosts[0]
1569         server2 = self.pg0.remote_hosts[1]
1570
1571         locals = [
1572             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1573             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1574         ]
1575
1576         self.nat_add_address(self.nat_addr)
1577         self.vapi.nat44_add_del_lb_static_mapping(
1578             is_add=1,
1579             external_addr=external_addr_n,
1580             external_port=external_port,
1581             protocol=IP_PROTOS.tcp,
1582             local_num=len(locals),
1583             locals=locals,
1584         )
1585         flags = self.config_flags.NAT_IS_INSIDE
1586         self.vapi.nat44_interface_add_del_feature(
1587             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1588         )
1589         self.vapi.nat44_interface_add_del_feature(
1590             sw_if_index=self.pg1.sw_if_index, is_add=1
1591         )
1592
1593         # from client to service
1594         p = (
1595             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1596             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1597             / TCP(sport=12345, dport=external_port)
1598         )
1599         self.pg1.add_stream(p)
1600         self.pg_enable_capture(self.pg_interfaces)
1601         self.pg_start()
1602         capture = self.pg0.get_capture(1)
1603         p = capture[0]
1604         server = None
1605         try:
1606             ip = p[IP]
1607             tcp = p[TCP]
1608             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1609             if ip.dst == server1.ip4:
1610                 server = server1
1611             else:
1612                 server = server2
1613             self.assertEqual(tcp.dport, local_port)
1614             self.assert_packet_checksums_valid(p)
1615         except:
1616             self.logger.error(ppp("Unexpected or invalid packet:", p))
1617             raise
1618
1619         # from service back to client
1620         p = (
1621             Ether(src=server.mac, dst=self.pg0.local_mac)
1622             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1623             / TCP(sport=local_port, dport=12345)
1624         )
1625         self.pg0.add_stream(p)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg1.get_capture(1)
1629         p = capture[0]
1630         try:
1631             ip = p[IP]
1632             tcp = p[TCP]
1633             self.assertEqual(ip.src, self.nat_addr)
1634             self.assertEqual(tcp.sport, external_port)
1635             self.assert_packet_checksums_valid(p)
1636         except:
1637             self.logger.error(ppp("Unexpected or invalid packet:", p))
1638             raise
1639
1640         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1641         self.assertEqual(len(sessions), 1)
1642         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1643         self.vapi.nat44_del_session(
1644             address=sessions[0].inside_ip_address,
1645             port=sessions[0].inside_port,
1646             protocol=sessions[0].protocol,
1647             flags=(
1648                 self.config_flags.NAT_IS_INSIDE
1649                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1650             ),
1651             ext_host_address=sessions[0].ext_host_address,
1652             ext_host_port=sessions[0].ext_host_port,
1653         )
1654         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1655         self.assertEqual(len(sessions), 0)
1656
1657     def test_static_lb_2(self):
1658         """NAT44ED local service load balancing (asymmetrical rule)"""
1659         external_addr = self.nat_addr
1660         external_port = 80
1661         local_port = 8080
1662         server1 = self.pg0.remote_hosts[0]
1663         server2 = self.pg0.remote_hosts[1]
1664
1665         locals = [
1666             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1667             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1668         ]
1669
1670         self.vapi.nat44_forwarding_enable_disable(enable=1)
1671         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1672         self.vapi.nat44_add_del_lb_static_mapping(
1673             is_add=1,
1674             flags=flags,
1675             external_addr=external_addr,
1676             external_port=external_port,
1677             protocol=IP_PROTOS.tcp,
1678             local_num=len(locals),
1679             locals=locals,
1680         )
1681         flags = self.config_flags.NAT_IS_INSIDE
1682         self.vapi.nat44_interface_add_del_feature(
1683             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1684         )
1685         self.vapi.nat44_interface_add_del_feature(
1686             sw_if_index=self.pg1.sw_if_index, is_add=1
1687         )
1688
1689         # from client to service
1690         p = (
1691             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1692             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1693             / TCP(sport=12345, dport=external_port)
1694         )
1695         self.pg1.add_stream(p)
1696         self.pg_enable_capture(self.pg_interfaces)
1697         self.pg_start()
1698         capture = self.pg0.get_capture(1)
1699         p = capture[0]
1700         server = None
1701         try:
1702             ip = p[IP]
1703             tcp = p[TCP]
1704             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1705             if ip.dst == server1.ip4:
1706                 server = server1
1707             else:
1708                 server = server2
1709             self.assertEqual(tcp.dport, local_port)
1710             self.assert_packet_checksums_valid(p)
1711         except:
1712             self.logger.error(ppp("Unexpected or invalid packet:", p))
1713             raise
1714
1715         # from service back to client
1716         p = (
1717             Ether(src=server.mac, dst=self.pg0.local_mac)
1718             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1719             / TCP(sport=local_port, dport=12345)
1720         )
1721         self.pg0.add_stream(p)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         capture = self.pg1.get_capture(1)
1725         p = capture[0]
1726         try:
1727             ip = p[IP]
1728             tcp = p[TCP]
1729             self.assertEqual(ip.src, self.nat_addr)
1730             self.assertEqual(tcp.sport, external_port)
1731             self.assert_packet_checksums_valid(p)
1732         except:
1733             self.logger.error(ppp("Unexpected or invalid packet:", p))
1734             raise
1735
1736         # from client to server (no translation)
1737         p = (
1738             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1739             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1740             / TCP(sport=12346, dport=local_port)
1741         )
1742         self.pg1.add_stream(p)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg0.get_capture(1)
1746         p = capture[0]
1747         server = None
1748         try:
1749             ip = p[IP]
1750             tcp = p[TCP]
1751             self.assertEqual(ip.dst, server1.ip4)
1752             self.assertEqual(tcp.dport, local_port)
1753             self.assert_packet_checksums_valid(p)
1754         except:
1755             self.logger.error(ppp("Unexpected or invalid packet:", p))
1756             raise
1757
1758         # from service back to client (no translation)
1759         p = (
1760             Ether(src=server1.mac, dst=self.pg0.local_mac)
1761             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1762             / TCP(sport=local_port, dport=12346)
1763         )
1764         self.pg0.add_stream(p)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg1.get_capture(1)
1768         p = capture[0]
1769         try:
1770             ip = p[IP]
1771             tcp = p[TCP]
1772             self.assertEqual(ip.src, server1.ip4)
1773             self.assertEqual(tcp.sport, local_port)
1774             self.assert_packet_checksums_valid(p)
1775         except:
1776             self.logger.error(ppp("Unexpected or invalid packet:", p))
1777             raise
1778
1779     def test_lb_affinity(self):
1780         """NAT44ED local service load balancing affinity"""
1781         external_addr = self.nat_addr
1782         external_port = 80
1783         local_port = 8080
1784         server1 = self.pg0.remote_hosts[0]
1785         server2 = self.pg0.remote_hosts[1]
1786
1787         locals = [
1788             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1789             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1790         ]
1791
1792         self.nat_add_address(self.nat_addr)
1793         self.vapi.nat44_add_del_lb_static_mapping(
1794             is_add=1,
1795             external_addr=external_addr,
1796             external_port=external_port,
1797             protocol=IP_PROTOS.tcp,
1798             affinity=10800,
1799             local_num=len(locals),
1800             locals=locals,
1801         )
1802         flags = self.config_flags.NAT_IS_INSIDE
1803         self.vapi.nat44_interface_add_del_feature(
1804             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1805         )
1806         self.vapi.nat44_interface_add_del_feature(
1807             sw_if_index=self.pg1.sw_if_index, is_add=1
1808         )
1809
1810         p = (
1811             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1812             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1813             / TCP(sport=1025, dport=external_port)
1814         )
1815         self.pg1.add_stream(p)
1816         self.pg_enable_capture(self.pg_interfaces)
1817         self.pg_start()
1818         capture = self.pg0.get_capture(1)
1819         backend = capture[0][IP].dst
1820
1821         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1822         self.assertEqual(len(sessions), 1)
1823         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1824         self.vapi.nat44_del_session(
1825             address=sessions[0].inside_ip_address,
1826             port=sessions[0].inside_port,
1827             protocol=sessions[0].protocol,
1828             flags=(
1829                 self.config_flags.NAT_IS_INSIDE
1830                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1831             ),
1832             ext_host_address=sessions[0].ext_host_address,
1833             ext_host_port=sessions[0].ext_host_port,
1834         )
1835
1836         pkts = []
1837         for port in range(1030, 1100):
1838             p = (
1839                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1840                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1841                 / TCP(sport=port, dport=external_port)
1842             )
1843             pkts.append(p)
1844         self.pg1.add_stream(pkts)
1845         self.pg_enable_capture(self.pg_interfaces)
1846         self.pg_start()
1847         capture = self.pg0.get_capture(len(pkts))
1848         for p in capture:
1849             self.assertEqual(p[IP].dst, backend)
1850
1851     def test_multiple_vrf_1(self):
1852         """Multiple VRF - both client & service in VRF1"""
1853
1854         external_addr = "1.2.3.4"
1855         external_port = 80
1856         local_port = 8080
1857         port = 0
1858
1859         flags = self.config_flags.NAT_IS_INSIDE
1860         self.vapi.nat44_interface_add_del_feature(
1861             sw_if_index=self.pg5.sw_if_index, is_add=1
1862         )
1863         self.vapi.nat44_interface_add_del_feature(
1864             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1865         )
1866         self.vapi.nat44_interface_add_del_feature(
1867             sw_if_index=self.pg6.sw_if_index, is_add=1
1868         )
1869         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1870         self.nat_add_static_mapping(
1871             self.pg5.remote_ip4,
1872             external_addr,
1873             local_port,
1874             external_port,
1875             vrf_id=1,
1876             proto=IP_PROTOS.tcp,
1877             flags=flags,
1878         )
1879
1880         p = (
1881             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1882             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1883             / TCP(sport=12345, dport=external_port)
1884         )
1885         self.pg6.add_stream(p)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg5.get_capture(1)
1889         p = capture[0]
1890         try:
1891             ip = p[IP]
1892             tcp = p[TCP]
1893             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1894             self.assertEqual(tcp.dport, local_port)
1895             self.assert_packet_checksums_valid(p)
1896         except:
1897             self.logger.error(ppp("Unexpected or invalid packet:", p))
1898             raise
1899
1900         p = (
1901             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1902             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1903             / TCP(sport=local_port, dport=12345)
1904         )
1905         self.pg5.add_stream(p)
1906         self.pg_enable_capture(self.pg_interfaces)
1907         self.pg_start()
1908         capture = self.pg6.get_capture(1)
1909         p = capture[0]
1910         try:
1911             ip = p[IP]
1912             tcp = p[TCP]
1913             self.assertEqual(ip.src, external_addr)
1914             self.assertEqual(tcp.sport, external_port)
1915             self.assert_packet_checksums_valid(p)
1916         except:
1917             self.logger.error(ppp("Unexpected or invalid packet:", p))
1918             raise
1919
1920     def test_multiple_vrf_2(self):
1921         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1922
1923         external_addr = "1.2.3.4"
1924         external_port = 80
1925         local_port = 8080
1926         port = 0
1927
1928         self.nat_add_address(self.nat_addr)
1929         flags = self.config_flags.NAT_IS_INSIDE
1930         self.vapi.nat44_ed_add_del_output_interface(
1931             sw_if_index=self.pg1.sw_if_index, is_add=1
1932         )
1933         self.vapi.nat44_interface_add_del_feature(
1934             sw_if_index=self.pg5.sw_if_index, is_add=1
1935         )
1936         self.vapi.nat44_interface_add_del_feature(
1937             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1938         )
1939         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1940         self.nat_add_static_mapping(
1941             self.pg5.remote_ip4,
1942             external_addr,
1943             local_port,
1944             external_port,
1945             vrf_id=1,
1946             proto=IP_PROTOS.tcp,
1947             flags=flags,
1948         )
1949
1950         p = (
1951             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1952             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1953             / TCP(sport=2345, dport=22)
1954         )
1955         self.pg5.add_stream(p)
1956         self.pg_enable_capture(self.pg_interfaces)
1957         self.pg_start()
1958         capture = self.pg1.get_capture(1)
1959         p = capture[0]
1960         try:
1961             ip = p[IP]
1962             tcp = p[TCP]
1963             self.assertEqual(ip.src, self.nat_addr)
1964             self.assert_packet_checksums_valid(p)
1965             port = tcp.sport
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", p))
1968             raise
1969
1970         p = (
1971             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1972             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1973             / TCP(sport=22, dport=port)
1974         )
1975         self.pg1.add_stream(p)
1976         self.pg_enable_capture(self.pg_interfaces)
1977         self.pg_start()
1978         capture = self.pg5.get_capture(1)
1979         p = capture[0]
1980         try:
1981             ip = p[IP]
1982             tcp = p[TCP]
1983             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1984             self.assertEqual(tcp.dport, 2345)
1985             self.assert_packet_checksums_valid(p)
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", p))
1988             raise
1989
1990     def test_multiple_vrf_3(self):
1991         """Multiple VRF - client in VRF1, service in VRF0"""
1992
1993         external_addr = "1.2.3.4"
1994         external_port = 80
1995         local_port = 8080
1996         port = 0
1997
1998         flags = self.config_flags.NAT_IS_INSIDE
1999         self.vapi.nat44_interface_add_del_feature(
2000             sw_if_index=self.pg0.sw_if_index, is_add=1
2001         )
2002         self.vapi.nat44_interface_add_del_feature(
2003             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2004         )
2005         self.vapi.nat44_interface_add_del_feature(
2006             sw_if_index=self.pg6.sw_if_index, is_add=1
2007         )
2008         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2009         self.nat_add_static_mapping(
2010             self.pg0.remote_ip4,
2011             external_sw_if_index=self.pg0.sw_if_index,
2012             local_port=local_port,
2013             vrf_id=0,
2014             external_port=external_port,
2015             proto=IP_PROTOS.tcp,
2016             flags=flags,
2017         )
2018
2019         # from client VRF1 to service VRF0
2020         p = (
2021             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2022             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2023             / TCP(sport=12346, dport=external_port)
2024         )
2025         self.pg6.add_stream(p)
2026         self.pg_enable_capture(self.pg_interfaces)
2027         self.pg_start()
2028         capture = self.pg0.get_capture(1)
2029         p = capture[0]
2030         try:
2031             ip = p[IP]
2032             tcp = p[TCP]
2033             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2034             self.assertEqual(tcp.dport, local_port)
2035             self.assert_packet_checksums_valid(p)
2036         except:
2037             self.logger.error(ppp("Unexpected or invalid packet:", p))
2038             raise
2039
2040         # from service VRF0 back to client VRF1
2041         p = (
2042             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2043             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2044             / TCP(sport=local_port, dport=12346)
2045         )
2046         self.pg0.add_stream(p)
2047         self.pg_enable_capture(self.pg_interfaces)
2048         self.pg_start()
2049         capture = self.pg6.get_capture(1)
2050         p = capture[0]
2051         try:
2052             ip = p[IP]
2053             tcp = p[TCP]
2054             self.assertEqual(ip.src, self.pg0.local_ip4)
2055             self.assertEqual(tcp.sport, external_port)
2056             self.assert_packet_checksums_valid(p)
2057         except:
2058             self.logger.error(ppp("Unexpected or invalid packet:", p))
2059             raise
2060
2061     def test_multiple_vrf_4(self):
2062         """Multiple VRF - client in VRF0, service in VRF1"""
2063
2064         external_addr = "1.2.3.4"
2065         external_port = 80
2066         local_port = 8080
2067         port = 0
2068
2069         flags = self.config_flags.NAT_IS_INSIDE
2070         self.vapi.nat44_interface_add_del_feature(
2071             sw_if_index=self.pg0.sw_if_index, is_add=1
2072         )
2073         self.vapi.nat44_interface_add_del_feature(
2074             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2075         )
2076         self.vapi.nat44_interface_add_del_feature(
2077             sw_if_index=self.pg5.sw_if_index, is_add=1
2078         )
2079         self.vapi.nat44_interface_add_del_feature(
2080             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2081         )
2082         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2083         self.nat_add_static_mapping(
2084             self.pg5.remote_ip4,
2085             external_addr,
2086             local_port,
2087             external_port,
2088             vrf_id=1,
2089             proto=IP_PROTOS.tcp,
2090             flags=flags,
2091         )
2092
2093         # from client VRF0 to service VRF1
2094         p = (
2095             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2096             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2097             / TCP(sport=12347, dport=external_port)
2098         )
2099         self.pg0.add_stream(p)
2100         self.pg_enable_capture(self.pg_interfaces)
2101         self.pg_start()
2102         capture = self.pg5.get_capture(1)
2103         p = capture[0]
2104         try:
2105             ip = p[IP]
2106             tcp = p[TCP]
2107             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2108             self.assertEqual(tcp.dport, local_port)
2109             self.assert_packet_checksums_valid(p)
2110         except:
2111             self.logger.error(ppp("Unexpected or invalid packet:", p))
2112             raise
2113
2114         # from service VRF1 back to client VRF0
2115         p = (
2116             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2117             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2118             / TCP(sport=local_port, dport=12347)
2119         )
2120         self.pg5.add_stream(p)
2121         self.pg_enable_capture(self.pg_interfaces)
2122         self.pg_start()
2123         capture = self.pg0.get_capture(1)
2124         p = capture[0]
2125         try:
2126             ip = p[IP]
2127             tcp = p[TCP]
2128             self.assertEqual(ip.src, external_addr)
2129             self.assertEqual(tcp.sport, external_port)
2130             self.assert_packet_checksums_valid(p)
2131         except:
2132             self.logger.error(ppp("Unexpected or invalid packet:", p))
2133             raise
2134
2135     def test_multiple_vrf_5(self):
2136         """Multiple VRF - forwarding - no translation"""
2137
2138         external_addr = "1.2.3.4"
2139         external_port = 80
2140         local_port = 8080
2141         port = 0
2142
2143         self.vapi.nat44_forwarding_enable_disable(enable=1)
2144         flags = self.config_flags.NAT_IS_INSIDE
2145         self.vapi.nat44_interface_add_del_feature(
2146             sw_if_index=self.pg0.sw_if_index, is_add=1
2147         )
2148         self.vapi.nat44_interface_add_del_feature(
2149             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2150         )
2151         self.vapi.nat44_interface_add_del_feature(
2152             sw_if_index=self.pg5.sw_if_index, is_add=1
2153         )
2154         self.vapi.nat44_interface_add_del_feature(
2155             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2156         )
2157         self.vapi.nat44_interface_add_del_feature(
2158             sw_if_index=self.pg6.sw_if_index, is_add=1
2159         )
2160         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2161         self.nat_add_static_mapping(
2162             self.pg5.remote_ip4,
2163             external_addr,
2164             local_port,
2165             external_port,
2166             vrf_id=1,
2167             proto=IP_PROTOS.tcp,
2168             flags=flags,
2169         )
2170         self.nat_add_static_mapping(
2171             self.pg0.remote_ip4,
2172             external_sw_if_index=self.pg0.sw_if_index,
2173             local_port=local_port,
2174             vrf_id=0,
2175             external_port=external_port,
2176             proto=IP_PROTOS.tcp,
2177             flags=flags,
2178         )
2179
2180         # from client to server (both VRF1, no translation)
2181         p = (
2182             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2183             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2184             / TCP(sport=12348, dport=local_port)
2185         )
2186         self.pg6.add_stream(p)
2187         self.pg_enable_capture(self.pg_interfaces)
2188         self.pg_start()
2189         capture = self.pg5.get_capture(1)
2190         p = capture[0]
2191         try:
2192             ip = p[IP]
2193             tcp = p[TCP]
2194             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2195             self.assertEqual(tcp.dport, local_port)
2196             self.assert_packet_checksums_valid(p)
2197         except:
2198             self.logger.error(ppp("Unexpected or invalid packet:", p))
2199             raise
2200
2201         # from server back to client (both VRF1, no translation)
2202         p = (
2203             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2204             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2205             / TCP(sport=local_port, dport=12348)
2206         )
2207         self.pg5.add_stream(p)
2208         self.pg_enable_capture(self.pg_interfaces)
2209         self.pg_start()
2210         capture = self.pg6.get_capture(1)
2211         p = capture[0]
2212         try:
2213             ip = p[IP]
2214             tcp = p[TCP]
2215             self.assertEqual(ip.src, self.pg5.remote_ip4)
2216             self.assertEqual(tcp.sport, local_port)
2217             self.assert_packet_checksums_valid(p)
2218         except:
2219             self.logger.error(ppp("Unexpected or invalid packet:", p))
2220             raise
2221
2222         # from client VRF1 to server VRF0 (no translation)
2223         p = (
2224             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2225             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2226             / TCP(sport=local_port, dport=12349)
2227         )
2228         self.pg0.add_stream(p)
2229         self.pg_enable_capture(self.pg_interfaces)
2230         self.pg_start()
2231         capture = self.pg6.get_capture(1)
2232         p = capture[0]
2233         try:
2234             ip = p[IP]
2235             tcp = p[TCP]
2236             self.assertEqual(ip.src, self.pg0.remote_ip4)
2237             self.assertEqual(tcp.sport, local_port)
2238             self.assert_packet_checksums_valid(p)
2239         except:
2240             self.logger.error(ppp("Unexpected or invalid packet:", p))
2241             raise
2242
2243         # from server VRF0 back to client VRF1 (no translation)
2244         p = (
2245             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2246             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2247             / TCP(sport=local_port, dport=12349)
2248         )
2249         self.pg0.add_stream(p)
2250         self.pg_enable_capture(self.pg_interfaces)
2251         self.pg_start()
2252         capture = self.pg6.get_capture(1)
2253         p = capture[0]
2254         try:
2255             ip = p[IP]
2256             tcp = p[TCP]
2257             self.assertEqual(ip.src, self.pg0.remote_ip4)
2258             self.assertEqual(tcp.sport, local_port)
2259             self.assert_packet_checksums_valid(p)
2260         except:
2261             self.logger.error(ppp("Unexpected or invalid packet:", p))
2262             raise
2263
2264         # from client VRF0 to server VRF1 (no translation)
2265         p = (
2266             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2267             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2268             / TCP(sport=12344, dport=local_port)
2269         )
2270         self.pg0.add_stream(p)
2271         self.pg_enable_capture(self.pg_interfaces)
2272         self.pg_start()
2273         capture = self.pg5.get_capture(1)
2274         p = capture[0]
2275         try:
2276             ip = p[IP]
2277             tcp = p[TCP]
2278             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2279             self.assertEqual(tcp.dport, local_port)
2280             self.assert_packet_checksums_valid(p)
2281         except:
2282             self.logger.error(ppp("Unexpected or invalid packet:", p))
2283             raise
2284
2285         # from server VRF1 back to client VRF0 (no translation)
2286         p = (
2287             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2288             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2289             / TCP(sport=local_port, dport=12344)
2290         )
2291         self.pg5.add_stream(p)
2292         self.pg_enable_capture(self.pg_interfaces)
2293         self.pg_start()
2294         capture = self.pg0.get_capture(1)
2295         p = capture[0]
2296         try:
2297             ip = p[IP]
2298             tcp = p[TCP]
2299             self.assertEqual(ip.src, self.pg5.remote_ip4)
2300             self.assertEqual(tcp.sport, local_port)
2301             self.assert_packet_checksums_valid(p)
2302         except:
2303             self.logger.error(ppp("Unexpected or invalid packet:", p))
2304             raise
2305
2306     def test_outside_address_distribution(self):
2307         """Outside address distribution based on source address"""
2308
2309         x = 100
2310         nat_addresses = []
2311
2312         for i in range(1, x):
2313             a = "10.0.0.%d" % i
2314             nat_addresses.append(a)
2315
2316         self.nat_add_inside_interface(self.pg0)
2317         self.nat_add_outside_interface(self.pg1)
2318
2319         self.vapi.nat44_add_del_address_range(
2320             first_ip_address=nat_addresses[0],
2321             last_ip_address=nat_addresses[-1],
2322             vrf_id=0xFFFFFFFF,
2323             is_add=1,
2324             flags=0,
2325         )
2326
2327         self.pg0.generate_remote_hosts(x)
2328
2329         pkts = []
2330         for i in range(x):
2331             info = self.create_packet_info(self.pg0, self.pg1)
2332             payload = self.info_to_payload(info)
2333             p = (
2334                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2335                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2336                 / UDP(sport=7000 + i, dport=8000 + i)
2337                 / Raw(payload)
2338             )
2339             info.data = p
2340             pkts.append(p)
2341
2342         self.pg0.add_stream(pkts)
2343         self.pg_enable_capture(self.pg_interfaces)
2344         self.pg_start()
2345         recvd = self.pg1.get_capture(len(pkts))
2346         for p_recvd in recvd:
2347             payload_info = self.payload_to_info(p_recvd[Raw])
2348             packet_index = payload_info.index
2349             info = self._packet_infos[packet_index]
2350             self.assertTrue(info is not None)
2351             self.assertEqual(packet_index, info.index)
2352             p_sent = info.data
2353             packed = socket.inet_aton(p_sent[IP].src)
2354             numeric = struct.unpack("!L", packed)[0]
2355             numeric = socket.htonl(numeric)
2356             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2357             self.assertEqual(
2358                 a,
2359                 p_recvd[IP].src,
2360                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2361                 % (p_sent[IP].src, p_recvd[IP].src, a),
2362             )
2363
2364     def test_dynamic_edge_ports(self):
2365         """NAT44ED dynamic translation test: edge ports"""
2366
2367         worker_count = self.vpp_worker_count or 1
2368         port_offset = 1024
2369         port_per_thread = (65536 - port_offset) // worker_count
2370         port_count = port_per_thread * worker_count
2371
2372         # worker thread edge ports
2373         thread_edge_ports = {0, port_offset - 1, 65535}
2374         for i in range(0, worker_count):
2375             port_thread_offset = (port_per_thread * i) + port_offset
2376             for port_range_offset in [0, port_per_thread - 1]:
2377                 port = port_thread_offset + port_range_offset
2378                 thread_edge_ports.add(port)
2379         thread_drop_ports = set(
2380             filter(
2381                 lambda x: x not in range(port_offset, port_offset + port_count),
2382                 thread_edge_ports,
2383             )
2384         )
2385
2386         in_if = self.pg7
2387         out_if = self.pg8
2388
2389         self.nat_add_address(self.nat_addr)
2390
2391         try:
2392             self.configure_ip4_interface(in_if, hosts=worker_count)
2393             self.configure_ip4_interface(out_if)
2394
2395             self.nat_add_inside_interface(in_if)
2396             self.nat_add_outside_interface(out_if)
2397
2398             # in2out
2399             tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2400             uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2401             ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2402             dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2403
2404             pkt_count = worker_count * len(thread_edge_ports)
2405
2406             i2o_pkts = [[] for x in range(0, worker_count)]
2407             for i in range(0, worker_count):
2408                 remote_host = in_if.remote_hosts[i]
2409                 for port in thread_edge_ports:
2410                     p = (
2411                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2412                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2413                         / TCP(sport=port, dport=port)
2414                     )
2415                     i2o_pkts[i].append(p)
2416
2417                     p = (
2418                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2419                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2420                         / UDP(sport=port, dport=port)
2421                     )
2422                     i2o_pkts[i].append(p)
2423
2424                     p = (
2425                         Ether(dst=in_if.local_mac, src=in_if.remote_mac)
2426                         / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
2427                         / ICMP(id=port, seq=port, type="echo-request")
2428                     )
2429                     i2o_pkts[i].append(p)
2430
2431             for i in range(0, worker_count):
2432                 if len(i2o_pkts[i]) > 0:
2433                     in_if.add_stream(i2o_pkts[i], worker=i)
2434
2435             self.pg_enable_capture(self.pg_interfaces)
2436             self.pg_start()
2437             capture = out_if.get_capture(pkt_count * 3)
2438             for packet in capture:
2439                 self.assert_packet_checksums_valid(packet)
2440                 if packet.haslayer(TCP):
2441                     self.assert_in_range(
2442                         packet[TCP].sport,
2443                         port_offset,
2444                         port_offset + port_count,
2445                         "src TCP port",
2446                     )
2447                 elif packet.haslayer(UDP):
2448                     self.assert_in_range(
2449                         packet[UDP].sport,
2450                         port_offset,
2451                         port_offset + port_count,
2452                         "src UDP port",
2453                     )
2454                 elif packet.haslayer(ICMP):
2455                     self.assert_in_range(
2456                         packet[ICMP].id,
2457                         port_offset,
2458                         port_offset + port_count,
2459                         "ICMP id",
2460                     )
2461                 else:
2462                     self.fail(
2463                         ppp("Unexpected or invalid packet (outside network):", packet)
2464                     )
2465
2466             if_idx = in_if.sw_if_index
2467             tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2468             uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2469             ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2470             dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2471
2472             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2473             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2474             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2475             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2476
2477             # out2in
2478             tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2479             uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2480             ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2481             dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2482             dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2483
2484             # replies to unchanged thread ports should pass on each worker,
2485             # excluding packets outside dynamic port range
2486             drop_count = worker_count * len(thread_drop_ports)
2487             pass_count = worker_count * len(thread_edge_ports) - drop_count
2488
2489             o2i_pkts = [[] for x in range(0, worker_count)]
2490             for i in range(0, worker_count):
2491                 for port in thread_edge_ports:
2492                     p = (
2493                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2494                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2495                         / TCP(sport=port, dport=port)
2496                     )
2497                     o2i_pkts[i].append(p)
2498
2499                     p = (
2500                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2501                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2502                         / UDP(sport=port, dport=port)
2503                     )
2504                     o2i_pkts[i].append(p)
2505
2506                     p = (
2507                         Ether(dst=out_if.local_mac, src=out_if.remote_mac)
2508                         / IP(src=out_if.remote_ip4, dst=self.nat_addr)
2509                         / ICMP(id=port, seq=port, type="echo-reply")
2510                     )
2511                     o2i_pkts[i].append(p)
2512
2513             for i in range(0, worker_count):
2514                 if len(o2i_pkts[i]) > 0:
2515                     out_if.add_stream(o2i_pkts[i], worker=i)
2516
2517             self.pg_enable_capture(self.pg_interfaces)
2518             self.pg_start()
2519             capture = in_if.get_capture(pass_count * 3)
2520             for packet in capture:
2521                 self.assert_packet_checksums_valid(packet)
2522                 if packet.haslayer(TCP):
2523                     self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
2524                     self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
2525                 elif packet.haslayer(UDP):
2526                     self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
2527                     self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
2528                 elif packet.haslayer(ICMP):
2529                     self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
2530                     self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
2531                 else:
2532                     self.fail(
2533                         ppp("Unexpected or invalid packet (inside network):", packet)
2534                     )
2535
2536             if_idx = out_if.sw_if_index
2537             tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2538             uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2539             ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2540             dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2541             dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
2542
2543             self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
2544             self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
2545             self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
2546             self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2547             self.assertEqual(
2548                 dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
2549             )
2550
2551         finally:
2552             in_if.unconfig()
2553             out_if.unconfig()
2554
2555
2556 @tag_fixme_ubuntu2204
2557 class TestNAT44EDMW(TestNAT44ED):
2558     """NAT44ED MW Test Case"""
2559
2560     vpp_worker_count = 4
2561     max_sessions = 5000
2562
2563     def test_dynamic(self):
2564         """NAT44ED dynamic translation test"""
2565         pkt_count = 1500
2566         tcp_port_offset = 20
2567         udp_port_offset = 20
2568         icmp_id_offset = 20
2569
2570         self.nat_add_address(self.nat_addr)
2571         self.nat_add_inside_interface(self.pg0)
2572         self.nat_add_outside_interface(self.pg1)
2573
2574         # in2out
2575         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2576         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2577         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2578         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2579
2580         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2581
2582         for i in range(pkt_count):
2583             p = (
2584                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2585                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2586                 / TCP(sport=tcp_port_offset + i, dport=20)
2587             )
2588             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2589
2590             p = (
2591                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2592                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2593                 / UDP(sport=udp_port_offset + i, dport=20)
2594             )
2595             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2596
2597             p = (
2598                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2599                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2600                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2601             )
2602             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2603
2604         for i in range(0, self.vpp_worker_count):
2605             if len(i2o_pkts[i]) > 0:
2606                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2607
2608         self.pg_enable_capture(self.pg_interfaces)
2609         self.pg_start()
2610         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2611
2612         if_idx = self.pg0.sw_if_index
2613         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2614         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2615         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2616         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2617
2618         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2619         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2620         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2621         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2622
2623         self.logger.info(self.vapi.cli("show trace"))
2624
2625         # out2in
2626         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2627         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2628         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2629         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2630
2631         recvd_tcp_ports = set()
2632         recvd_udp_ports = set()
2633         recvd_icmp_ids = set()
2634
2635         for p in capture:
2636             if TCP in p:
2637                 recvd_tcp_ports.add(p[TCP].sport)
2638             if UDP in p:
2639                 recvd_udp_ports.add(p[UDP].sport)
2640             if ICMP in p:
2641                 recvd_icmp_ids.add(p[ICMP].id)
2642
2643         recvd_tcp_ports = list(recvd_tcp_ports)
2644         recvd_udp_ports = list(recvd_udp_ports)
2645         recvd_icmp_ids = list(recvd_icmp_ids)
2646
2647         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2648         for i in range(pkt_count):
2649             p = (
2650                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2651                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2652                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2653             )
2654             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2655
2656             p = (
2657                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2658                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2659                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2660             )
2661             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2662
2663             p = (
2664                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2665                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2666                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2667             )
2668             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2669
2670         for i in range(0, self.vpp_worker_count):
2671             if len(o2i_pkts[i]) > 0:
2672                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2673
2674         self.pg_enable_capture(self.pg_interfaces)
2675         self.pg_start()
2676         capture = self.pg0.get_capture(pkt_count * 3)
2677         for packet in capture:
2678             try:
2679                 self.assert_packet_checksums_valid(packet)
2680                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2681                 if packet.haslayer(TCP):
2682                     self.assert_in_range(
2683                         packet[TCP].dport,
2684                         tcp_port_offset,
2685                         tcp_port_offset + pkt_count,
2686                         "dst TCP port",
2687                     )
2688                 elif packet.haslayer(UDP):
2689                     self.assert_in_range(
2690                         packet[UDP].dport,
2691                         udp_port_offset,
2692                         udp_port_offset + pkt_count,
2693                         "dst UDP port",
2694                     )
2695                 else:
2696                     self.assert_in_range(
2697                         packet[ICMP].id,
2698                         icmp_id_offset,
2699                         icmp_id_offset + pkt_count,
2700                         "ICMP id",
2701                     )
2702             except:
2703                 self.logger.error(
2704                     ppp("Unexpected or invalid packet (inside network):", packet)
2705                 )
2706                 raise
2707
2708         if_idx = self.pg1.sw_if_index
2709         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2710         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2711         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2712         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2713
2714         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2715         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2716         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2717         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2718
2719         sc = self.statistics["/nat44-ed/total-sessions"]
2720         self.assertEqual(
2721             sc[:, 0].sum(),
2722             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2723         )
2724
2725     def test_frag_in_order(self):
2726         """NAT44ED translate fragments arriving in order"""
2727
2728         self.nat_add_address(self.nat_addr)
2729         self.nat_add_inside_interface(self.pg0)
2730         self.nat_add_outside_interface(self.pg1)
2731
2732         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2733         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2734         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2735
2736     def test_frag_in_order_do_not_translate(self):
2737         """NAT44ED don't translate fragments arriving in order"""
2738
2739         self.nat_add_address(self.nat_addr)
2740         self.nat_add_inside_interface(self.pg0)
2741         self.nat_add_outside_interface(self.pg1)
2742         self.vapi.nat44_forwarding_enable_disable(enable=True)
2743
2744         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2745
2746     def test_frag_out_of_order(self):
2747         """NAT44ED translate fragments arriving out of order"""
2748
2749         self.nat_add_address(self.nat_addr)
2750         self.nat_add_inside_interface(self.pg0)
2751         self.nat_add_outside_interface(self.pg1)
2752
2753         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2754         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2755         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2756
2757     def test_frag_in_order_in_plus_out(self):
2758         """NAT44ED in+out interface fragments in order"""
2759
2760         in_port = self.random_port()
2761         out_port = self.random_port()
2762
2763         self.nat_add_address(self.nat_addr)
2764         self.nat_add_inside_interface(self.pg0)
2765         self.nat_add_outside_interface(self.pg0)
2766         self.nat_add_inside_interface(self.pg1)
2767         self.nat_add_outside_interface(self.pg1)
2768
2769         # add static mappings for server
2770         self.nat_add_static_mapping(
2771             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2772         )
2773         self.nat_add_static_mapping(
2774             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2775         )
2776         self.nat_add_static_mapping(
2777             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2778         )
2779
2780         # run tests for each protocol
2781         self.frag_in_order_in_plus_out(
2782             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2783         )
2784         self.frag_in_order_in_plus_out(
2785             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2786         )
2787         self.frag_in_order_in_plus_out(
2788             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2789         )
2790
2791     def test_frag_out_of_order_in_plus_out(self):
2792         """NAT44ED in+out interface fragments out of order"""
2793
2794         in_port = self.random_port()
2795         out_port = self.random_port()
2796
2797         self.nat_add_address(self.nat_addr)
2798         self.nat_add_inside_interface(self.pg0)
2799         self.nat_add_outside_interface(self.pg0)
2800         self.nat_add_inside_interface(self.pg1)
2801         self.nat_add_outside_interface(self.pg1)
2802
2803         # add static mappings for server
2804         self.nat_add_static_mapping(
2805             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2806         )
2807         self.nat_add_static_mapping(
2808             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2809         )
2810         self.nat_add_static_mapping(
2811             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2812         )
2813
2814         # run tests for each protocol
2815         self.frag_out_of_order_in_plus_out(
2816             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2817         )
2818         self.frag_out_of_order_in_plus_out(
2819             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2820         )
2821         self.frag_out_of_order_in_plus_out(
2822             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2823         )
2824
2825     def test_reass_hairpinning(self):
2826         """NAT44ED fragments hairpinning"""
2827
2828         server_addr = self.pg0.remote_hosts[1].ip4
2829
2830         host_in_port = self.random_port()
2831         server_in_port = self.random_port()
2832         server_out_port = self.random_port()
2833
2834         self.nat_add_address(self.nat_addr)
2835         self.nat_add_inside_interface(self.pg0)
2836         self.nat_add_outside_interface(self.pg1)
2837
2838         # add static mapping for server
2839         self.nat_add_static_mapping(
2840             server_addr,
2841             self.nat_addr,
2842             server_in_port,
2843             server_out_port,
2844             proto=IP_PROTOS.tcp,
2845         )
2846         self.nat_add_static_mapping(
2847             server_addr,
2848             self.nat_addr,
2849             server_in_port,
2850             server_out_port,
2851             proto=IP_PROTOS.udp,
2852         )
2853         self.nat_add_static_mapping(server_addr, self.nat_addr)
2854
2855         self.reass_hairpinning(
2856             server_addr,
2857             server_in_port,
2858             server_out_port,
2859             host_in_port,
2860             proto=IP_PROTOS.tcp,
2861             ignore_port=True,
2862         )
2863         self.reass_hairpinning(
2864             server_addr,
2865             server_in_port,
2866             server_out_port,
2867             host_in_port,
2868             proto=IP_PROTOS.udp,
2869             ignore_port=True,
2870         )
2871         self.reass_hairpinning(
2872             server_addr,
2873             server_in_port,
2874             server_out_port,
2875             host_in_port,
2876             proto=IP_PROTOS.icmp,
2877             ignore_port=True,
2878         )
2879
2880     def test_session_limit_per_vrf(self):
2881         """NAT44ED per vrf session limit"""
2882
2883         inside = self.pg0
2884         inside_vrf10 = self.pg2
2885         outside = self.pg1
2886
2887         limit = 5
2888
2889         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2890         # non existing vrf_id makes process core dump
2891         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2892
2893         self.nat_add_inside_interface(inside)
2894         self.nat_add_inside_interface(inside_vrf10)
2895         self.nat_add_outside_interface(outside)
2896
2897         # vrf independent
2898         self.nat_add_interface_address(outside)
2899
2900         # BUG: causing core dump - when bad vrf_id is specified
2901         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2902
2903         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2904         inside_vrf10.add_stream(stream)
2905
2906         self.pg_enable_capture(self.pg_interfaces)
2907         self.pg_start()
2908
2909         capture = outside.get_capture(limit)
2910
2911         stream = self.create_tcp_stream(inside, outside, limit * 2)
2912         inside.add_stream(stream)
2913
2914         self.pg_enable_capture(self.pg_interfaces)
2915         self.pg_start()
2916
2917         capture = outside.get_capture(len(stream))
2918
2919     def test_show_max_translations(self):
2920         """NAT44ED API test - max translations per thread"""
2921         config = self.vapi.nat44_show_running_config()
2922         self.assertEqual(self.max_sessions, config.sessions)
2923
2924     def test_lru_cleanup(self):
2925         """NAT44ED LRU cleanup algorithm"""
2926
2927         self.nat_add_address(self.nat_addr)
2928         self.nat_add_inside_interface(self.pg0)
2929         self.nat_add_outside_interface(self.pg1)
2930
2931         self.vapi.nat_set_timeouts(
2932             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2933         )
2934
2935         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2936         pkts = []
2937         for i in range(0, self.max_sessions - 1):
2938             p = (
2939                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2940                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2941                 / UDP(sport=7000 + i, dport=80)
2942             )
2943             pkts.append(p)
2944
2945         self.pg0.add_stream(pkts)
2946         self.pg_enable_capture(self.pg_interfaces)
2947         self.pg_start()
2948         self.pg1.get_capture(len(pkts))
2949         self.virtual_sleep(1.5, "wait for timeouts")
2950
2951         pkts = []
2952         for i in range(0, self.max_sessions - 1):
2953             p = (
2954                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2955                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2956                 / ICMP(id=8000 + i, type="echo-request")
2957             )
2958             pkts.append(p)
2959
2960         self.pg0.add_stream(pkts)
2961         self.pg_enable_capture(self.pg_interfaces)
2962         self.pg_start()
2963         self.pg1.get_capture(len(pkts))
2964
2965     def test_session_rst_timeout(self):
2966         """NAT44ED session RST timeouts"""
2967
2968         self.nat_add_address(self.nat_addr)
2969         self.nat_add_inside_interface(self.pg0)
2970         self.nat_add_outside_interface(self.pg1)
2971
2972         self.vapi.nat_set_timeouts(
2973             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
2974         )
2975
2976         self.init_tcp_session(
2977             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
2978         )
2979         p = (
2980             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2981             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2982             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
2983         )
2984         self.send_and_expect(self.pg0, p, self.pg1)
2985
2986         self.virtual_sleep(6)
2987
2988         # The session is already closed
2989         p = (
2990             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2991             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2992             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
2993         )
2994         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
2995
2996         # The session can be re-opened
2997         p = (
2998             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2999             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3000             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
3001         )
3002         self.send_and_expect(self.pg0, p, self.pg1)
3003
3004     def test_session_rst_established_timeout(self):
3005         """NAT44ED session RST timeouts"""
3006
3007         self.nat_add_address(self.nat_addr)
3008         self.nat_add_inside_interface(self.pg0)
3009         self.nat_add_outside_interface(self.pg1)
3010
3011         self.vapi.nat_set_timeouts(
3012             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
3013         )
3014
3015         self.init_tcp_session(
3016             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3017         )
3018
3019         # Wait at least the transitory time, the session is in established
3020         # state anyway. RST followed by a data packet should move it to
3021         # transitory state.
3022         self.virtual_sleep(6)
3023         p = (
3024             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3025             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3026             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
3027         )
3028         self.send_and_expect(self.pg0, p, self.pg1)
3029
3030         p = (
3031             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3032             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3033             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3034         )
3035         self.send_and_expect(self.pg0, p, self.pg1)
3036
3037         # State is transitory, session should be closed after 6 seconds
3038         self.virtual_sleep(6)
3039
3040         p = (
3041             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3042             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3043             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
3044         )
3045         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3046
3047     def test_dynamic_out_of_ports(self):
3048         """NAT44ED dynamic translation test: out of ports"""
3049
3050         self.nat_add_inside_interface(self.pg0)
3051         self.nat_add_outside_interface(self.pg1)
3052
3053         # in2out and no NAT addresses added
3054         pkts = self.create_stream_in(self.pg0, self.pg1)
3055
3056         self.send_and_assert_no_replies(
3057             self.pg0,
3058             pkts,
3059             msg="i2o pkts",
3060             stats_diff=self.no_diff
3061             | {
3062                 "err": {
3063                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
3064                 },
3065                 self.pg0.sw_if_index: {
3066                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
3067                 },
3068             },
3069         )
3070
3071         # in2out after NAT addresses added
3072         self.nat_add_address(self.nat_addr)
3073
3074         tcpn, udpn, icmpn = (
3075             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
3076         )
3077
3078         self.send_and_expect(
3079             self.pg0,
3080             pkts,
3081             self.pg1,
3082             msg="i2o pkts",
3083             stats_diff=self.no_diff
3084             | {
3085                 "err": {
3086                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
3087                 },
3088                 self.pg0.sw_if_index: {
3089                     "/nat44-ed/in2out/slowpath/drops": 0,
3090                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
3091                     "/nat44-ed/in2out/slowpath/udp": udpn,
3092                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
3093                 },
3094             },
3095         )
3096
3097     def test_unknown_proto(self):
3098         """NAT44ED translate packet with unknown protocol"""
3099
3100         self.nat_add_address(self.nat_addr)
3101         self.nat_add_inside_interface(self.pg0)
3102         self.nat_add_outside_interface(self.pg1)
3103
3104         # in2out
3105         p = (
3106             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3107             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3108             / TCP(sport=self.tcp_port_in, dport=20)
3109         )
3110         self.pg0.add_stream(p)
3111         self.pg_enable_capture(self.pg_interfaces)
3112         self.pg_start()
3113         p = self.pg1.get_capture(1)
3114
3115         p = (
3116             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3117             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3118             / GRE()
3119             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3120             / TCP(sport=1234, dport=1234)
3121         )
3122         self.pg0.add_stream(p)
3123         self.pg_enable_capture(self.pg_interfaces)
3124         self.pg_start()
3125         p = self.pg1.get_capture(1)
3126         packet = p[0]
3127         try:
3128             self.assertEqual(packet[IP].src, self.nat_addr)
3129             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3130             self.assertEqual(packet.haslayer(GRE), 1)
3131             self.assert_packet_checksums_valid(packet)
3132         except:
3133             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3134             raise
3135
3136         # out2in
3137         p = (
3138             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3139             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3140             / GRE()
3141             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3142             / TCP(sport=1234, dport=1234)
3143         )
3144         self.pg1.add_stream(p)
3145         self.pg_enable_capture(self.pg_interfaces)
3146         self.pg_start()
3147         p = self.pg0.get_capture(1)
3148         packet = p[0]
3149         try:
3150             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3151             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3152             self.assertEqual(packet.haslayer(GRE), 1)
3153             self.assert_packet_checksums_valid(packet)
3154         except:
3155             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3156             raise
3157
3158     def test_hairpinning_unknown_proto(self):
3159         """NAT44ED translate packet with unknown protocol - hairpinning"""
3160         host = self.pg0.remote_hosts[0]
3161         server = self.pg0.remote_hosts[1]
3162         host_in_port = 1234
3163         server_out_port = 8765
3164         server_nat_ip = "10.0.0.11"
3165
3166         self.nat_add_address(self.nat_addr)
3167         self.nat_add_inside_interface(self.pg0)
3168         self.nat_add_outside_interface(self.pg1)
3169
3170         # add static mapping for server
3171         self.nat_add_static_mapping(server.ip4, server_nat_ip)
3172
3173         # host to server
3174         p = (
3175             Ether(src=host.mac, dst=self.pg0.local_mac)
3176             / IP(src=host.ip4, dst=server_nat_ip)
3177             / TCP(sport=host_in_port, dport=server_out_port)
3178         )
3179         self.pg0.add_stream(p)
3180         self.pg_enable_capture(self.pg_interfaces)
3181         self.pg_start()
3182         self.pg0.get_capture(1)
3183
3184         p = (
3185             Ether(dst=self.pg0.local_mac, src=host.mac)
3186             / IP(src=host.ip4, dst=server_nat_ip)
3187             / GRE()
3188             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3189             / TCP(sport=1234, dport=1234)
3190         )
3191         self.pg0.add_stream(p)
3192         self.pg_enable_capture(self.pg_interfaces)
3193         self.pg_start()
3194         p = self.pg0.get_capture(1)
3195         packet = p[0]
3196         try:
3197             self.assertEqual(packet[IP].src, self.nat_addr)
3198             self.assertEqual(packet[IP].dst, server.ip4)
3199             self.assertEqual(packet.haslayer(GRE), 1)
3200             self.assert_packet_checksums_valid(packet)
3201         except:
3202             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3203             raise
3204
3205         # server to host
3206         p = (
3207             Ether(dst=self.pg0.local_mac, src=server.mac)
3208             / IP(src=server.ip4, dst=self.nat_addr)
3209             / GRE()
3210             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3211             / TCP(sport=1234, dport=1234)
3212         )
3213         self.pg0.add_stream(p)
3214         self.pg_enable_capture(self.pg_interfaces)
3215         self.pg_start()
3216         p = self.pg0.get_capture(1)
3217         packet = p[0]
3218         try:
3219             self.assertEqual(packet[IP].src, server_nat_ip)
3220             self.assertEqual(packet[IP].dst, host.ip4)
3221             self.assertEqual(packet.haslayer(GRE), 1)
3222             self.assert_packet_checksums_valid(packet)
3223         except:
3224             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3225             raise
3226
3227     def test_output_feature_and_service(self):
3228         """NAT44ED interface output feature and services"""
3229         external_addr = "1.2.3.4"
3230         external_port = 80
3231         local_port = 8080
3232
3233         self.vapi.nat44_forwarding_enable_disable(enable=1)
3234         self.nat_add_address(self.nat_addr)
3235         flags = self.config_flags.NAT_IS_ADDR_ONLY
3236         self.vapi.nat44_add_del_identity_mapping(
3237             ip_address=self.pg1.remote_ip4,
3238             sw_if_index=0xFFFFFFFF,
3239             flags=flags,
3240             is_add=1,
3241         )
3242         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3243         self.nat_add_static_mapping(
3244             self.pg0.remote_ip4,
3245             external_addr,
3246             local_port,
3247             external_port,
3248             proto=IP_PROTOS.tcp,
3249             flags=flags,
3250         )
3251
3252         self.nat_add_inside_interface(self.pg0)
3253         self.nat_add_outside_interface(self.pg0)
3254         self.vapi.nat44_ed_add_del_output_interface(
3255             sw_if_index=self.pg1.sw_if_index, is_add=1
3256         )
3257
3258         # from client to service
3259         p = (
3260             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3261             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3262             / TCP(sport=12345, dport=external_port)
3263         )
3264         self.pg1.add_stream(p)
3265         self.pg_enable_capture(self.pg_interfaces)
3266         self.pg_start()
3267         capture = self.pg0.get_capture(1)
3268         p = capture[0]
3269         try:
3270             ip = p[IP]
3271             tcp = p[TCP]
3272             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3273             self.assertEqual(tcp.dport, local_port)
3274             self.assert_packet_checksums_valid(p)
3275         except:
3276             self.logger.error(ppp("Unexpected or invalid packet:", p))
3277             raise
3278
3279         # from service back to client
3280         p = (
3281             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3282             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3283             / TCP(sport=local_port, dport=12345)
3284         )
3285         self.pg0.add_stream(p)
3286         self.pg_enable_capture(self.pg_interfaces)
3287         self.pg_start()
3288         capture = self.pg1.get_capture(1)
3289         p = capture[0]
3290         try:
3291             ip = p[IP]
3292             tcp = p[TCP]
3293             self.assertEqual(ip.src, external_addr)
3294             self.assertEqual(tcp.sport, external_port)
3295             self.assert_packet_checksums_valid(p)
3296         except:
3297             self.logger.error(ppp("Unexpected or invalid packet:", p))
3298             raise
3299
3300         # from local network host to external network
3301         pkts = self.create_stream_in(self.pg0, self.pg1)
3302         self.pg0.add_stream(pkts)
3303         self.pg_enable_capture(self.pg_interfaces)
3304         self.pg_start()
3305         capture = self.pg1.get_capture(len(pkts))
3306         self.verify_capture_out(capture, ignore_port=True)
3307         pkts = self.create_stream_in(self.pg0, self.pg1)
3308         self.pg0.add_stream(pkts)
3309         self.pg_enable_capture(self.pg_interfaces)
3310         self.pg_start()
3311         capture = self.pg1.get_capture(len(pkts))
3312         self.verify_capture_out(capture, ignore_port=True)
3313
3314         # from external network back to local network host
3315         pkts = self.create_stream_out(self.pg1)
3316         self.pg1.add_stream(pkts)
3317         self.pg_enable_capture(self.pg_interfaces)
3318         self.pg_start()
3319         capture = self.pg0.get_capture(len(pkts))
3320         self.verify_capture_in(capture, self.pg0)
3321
3322     def test_output_feature_and_service3(self):
3323         """NAT44ED interface output feature and DST NAT"""
3324         external_addr = "1.2.3.4"
3325         external_port = 80
3326         local_port = 8080
3327
3328         self.vapi.nat44_forwarding_enable_disable(enable=1)
3329         self.nat_add_address(self.nat_addr)
3330         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3331         self.nat_add_static_mapping(
3332             self.pg1.remote_ip4,
3333             external_addr,
3334             local_port,
3335             external_port,
3336             proto=IP_PROTOS.tcp,
3337             flags=flags,
3338         )
3339
3340         self.nat_add_inside_interface(self.pg0)
3341         self.nat_add_outside_interface(self.pg0)
3342         self.vapi.nat44_ed_add_del_output_interface(
3343             sw_if_index=self.pg1.sw_if_index, is_add=1
3344         )
3345
3346         p = (
3347             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3348             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3349             / TCP(sport=12345, dport=external_port)
3350         )
3351         self.pg0.add_stream(p)
3352         self.pg_enable_capture(self.pg_interfaces)
3353         self.pg_start()
3354         capture = self.pg1.get_capture(1)
3355         p = capture[0]
3356         try:
3357             ip = p[IP]
3358             tcp = p[TCP]
3359             self.assertEqual(ip.src, self.pg0.remote_ip4)
3360             self.assertEqual(tcp.sport, 12345)
3361             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3362             self.assertEqual(tcp.dport, local_port)
3363             self.assert_packet_checksums_valid(p)
3364         except:
3365             self.logger.error(ppp("Unexpected or invalid packet:", p))
3366             raise
3367
3368         p = (
3369             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3370             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3371             / TCP(sport=local_port, dport=12345)
3372         )
3373         self.pg1.add_stream(p)
3374         self.pg_enable_capture(self.pg_interfaces)
3375         self.pg_start()
3376         capture = self.pg0.get_capture(1)
3377         p = capture[0]
3378         try:
3379             ip = p[IP]
3380             tcp = p[TCP]
3381             self.assertEqual(ip.src, external_addr)
3382             self.assertEqual(tcp.sport, external_port)
3383             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3384             self.assertEqual(tcp.dport, 12345)
3385             self.assert_packet_checksums_valid(p)
3386         except:
3387             self.logger.error(ppp("Unexpected or invalid packet:", p))
3388             raise
3389
3390     def test_self_twice_nat_lb_negative(self):
3391         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3392         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3393
3394     def test_self_twice_nat_negative(self):
3395         """NAT44ED Self Twice NAT (negative test)"""
3396         self.twice_nat_common(self_twice_nat=True)
3397
3398     def test_static_lb_multi_clients(self):
3399         """NAT44ED local service load balancing - multiple clients"""
3400
3401         external_addr = self.nat_addr
3402         external_port = 80
3403         local_port = 8080
3404         server1 = self.pg0.remote_hosts[0]
3405         server2 = self.pg0.remote_hosts[1]
3406         server3 = self.pg0.remote_hosts[2]
3407
3408         locals = [
3409             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3410             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3411         ]
3412
3413         flags = self.config_flags.NAT_IS_INSIDE
3414         self.vapi.nat44_interface_add_del_feature(
3415             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3416         )
3417         self.vapi.nat44_interface_add_del_feature(
3418             sw_if_index=self.pg1.sw_if_index, is_add=1
3419         )
3420
3421         self.nat_add_address(self.nat_addr)
3422         self.vapi.nat44_add_del_lb_static_mapping(
3423             is_add=1,
3424             external_addr=external_addr,
3425             external_port=external_port,
3426             protocol=IP_PROTOS.tcp,
3427             local_num=len(locals),
3428             locals=locals,
3429         )
3430
3431         server1_n = 0
3432         server2_n = 0
3433         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3434         pkts = []
3435         for client in clients:
3436             p = (
3437                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3438                 / IP(src=client, dst=self.nat_addr)
3439                 / TCP(sport=12345, dport=external_port)
3440             )
3441             pkts.append(p)
3442         self.pg1.add_stream(pkts)
3443         self.pg_enable_capture(self.pg_interfaces)
3444         self.pg_start()
3445         capture = self.pg0.get_capture(len(pkts))
3446         for p in capture:
3447             if p[IP].dst == server1.ip4:
3448                 server1_n += 1
3449             else:
3450                 server2_n += 1
3451         self.assertGreaterEqual(server1_n, server2_n)
3452
3453         local = {
3454             "addr": server3.ip4,
3455             "port": local_port,
3456             "probability": 20,
3457             "vrf_id": 0,
3458         }
3459
3460         # add new back-end
3461         self.vapi.nat44_lb_static_mapping_add_del_local(
3462             is_add=1,
3463             external_addr=external_addr,
3464             external_port=external_port,
3465             local=local,
3466             protocol=IP_PROTOS.tcp,
3467         )
3468         server1_n = 0
3469         server2_n = 0
3470         server3_n = 0
3471         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3472         pkts = []
3473         for client in clients:
3474             p = (
3475                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3476                 / IP(src=client, dst=self.nat_addr)
3477                 / TCP(sport=12346, dport=external_port)
3478             )
3479             pkts.append(p)
3480         self.assertGreater(len(pkts), 0)
3481         self.pg1.add_stream(pkts)
3482         self.pg_enable_capture(self.pg_interfaces)
3483         self.pg_start()
3484         capture = self.pg0.get_capture(len(pkts))
3485         for p in capture:
3486             if p[IP].dst == server1.ip4:
3487                 server1_n += 1
3488             elif p[IP].dst == server2.ip4:
3489                 server2_n += 1
3490             else:
3491                 server3_n += 1
3492         self.assertGreater(server1_n, 0)
3493         self.assertGreater(server2_n, 0)
3494         self.assertGreater(server3_n, 0)
3495
3496         local = {
3497             "addr": server2.ip4,
3498             "port": local_port,
3499             "probability": 10,
3500             "vrf_id": 0,
3501         }
3502
3503         # remove one back-end
3504         self.vapi.nat44_lb_static_mapping_add_del_local(
3505             is_add=0,
3506             external_addr=external_addr,
3507             external_port=external_port,
3508             local=local,
3509             protocol=IP_PROTOS.tcp,
3510         )
3511         server1_n = 0
3512         server2_n = 0
3513         server3_n = 0
3514         self.pg1.add_stream(pkts)
3515         self.pg_enable_capture(self.pg_interfaces)
3516         self.pg_start()
3517         capture = self.pg0.get_capture(len(pkts))
3518         for p in capture:
3519             if p[IP].dst == server1.ip4:
3520                 server1_n += 1
3521             elif p[IP].dst == server2.ip4:
3522                 server2_n += 1
3523             else:
3524                 server3_n += 1
3525         self.assertGreater(server1_n, 0)
3526         self.assertEqual(server2_n, 0)
3527         self.assertGreater(server3_n, 0)
3528
3529     # put zzz in front of syslog test name so that it runs as a last test
3530     # setting syslog sender cannot be undone and if it is set, it messes
3531     # with self.send_and_assert_no_replies functionality
3532     def test_zzz_syslog_sess(self):
3533         """NAT44ED Test syslog session creation and deletion"""
3534         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3535         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3536
3537         self.nat_add_address(self.nat_addr)
3538         self.nat_add_inside_interface(self.pg0)
3539         self.nat_add_outside_interface(self.pg1)
3540
3541         p = (
3542             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3543             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3544             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3545         )
3546         self.pg0.add_stream(p)
3547         self.pg_enable_capture(self.pg_interfaces)
3548         self.pg_start()
3549         capture = self.pg1.get_capture(1)
3550         self.tcp_port_out = capture[0][TCP].sport
3551         capture = self.pg3.get_capture(1)
3552         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3553
3554         self.pg_enable_capture(self.pg_interfaces)
3555         self.pg_start()
3556         self.nat_add_address(self.nat_addr, is_add=0)
3557         capture = self.pg3.get_capture(1)
3558         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3559
3560     # put zzz in front of syslog test name so that it runs as a last test
3561     # setting syslog sender cannot be undone and if it is set, it messes
3562     # with self.send_and_assert_no_replies functionality
3563     def test_zzz_syslog_sess_reopen(self):
3564         """Syslog events for session reopen"""
3565         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3566         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3567
3568         self.nat_add_address(self.nat_addr)
3569         self.nat_add_inside_interface(self.pg0)
3570         self.nat_add_outside_interface(self.pg1)
3571
3572         # SYN in2out
3573         p = (
3574             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3575             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3576             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3577         )
3578         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3579         self.tcp_port_out = capture[0][TCP].sport
3580         capture = self.pg3.get_capture(1)
3581         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3582
3583         # SYN out2in
3584         p = (
3585             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3586             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3587             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3588         )
3589         self.send_and_expect(self.pg1, p, self.pg0)
3590
3591         p = (
3592             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3593             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3594             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3595         )
3596         self.send_and_expect(self.pg0, p, self.pg1)
3597
3598         # FIN in2out
3599         p = (
3600             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3601             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3602             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3603         )
3604         self.send_and_expect(self.pg0, p, self.pg1)
3605
3606         # FIN out2in
3607         p = (
3608             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3609             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3610             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3611         )
3612         self.send_and_expect(self.pg1, p, self.pg0)
3613
3614         self.init_tcp_session(
3615             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3616         )
3617
3618         # 2 records should be produced - first one del & add
3619         capture = self.pg3.get_capture(2)
3620         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3621         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3622
3623     def test_twice_nat_interface_addr(self):
3624         """NAT44ED Acquire twice NAT addresses from interface"""
3625         flags = self.config_flags.NAT_IS_TWICE_NAT
3626         self.vapi.nat44_add_del_interface_addr(
3627             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3628         )
3629
3630         # no address in NAT pool
3631         adresses = self.vapi.nat44_address_dump()
3632         self.assertEqual(0, len(adresses))
3633
3634         # configure interface address and check NAT address pool
3635         self.pg11.config_ip4()
3636         adresses = self.vapi.nat44_address_dump()
3637         self.assertEqual(1, len(adresses))
3638         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3639         self.assertEqual(adresses[0].flags, flags)
3640
3641         # remove interface address and check NAT address pool
3642         self.pg11.unconfig_ip4()
3643         adresses = self.vapi.nat44_address_dump()
3644         self.assertEqual(0, len(adresses))
3645
3646     def test_output_feature_stateful_acl(self):
3647         """NAT44ED output feature works with stateful ACL"""
3648
3649         self.nat_add_address(self.nat_addr)
3650         self.vapi.nat44_ed_add_del_output_interface(
3651             sw_if_index=self.pg1.sw_if_index, is_add=1
3652         )
3653
3654         # First ensure that the NAT is working sans ACL
3655
3656         # send packets out2in, no sessions yet so packets should drop
3657         pkts_out2in = self.create_stream_out(self.pg1)
3658         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3659
3660         # send packets into inside intf, ensure received via outside intf
3661         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3662         capture = self.send_and_expect(
3663             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3664         )
3665         self.verify_capture_out(capture, ignore_port=True)
3666
3667         # send out2in again, with sessions created it should work now
3668         pkts_out2in = self.create_stream_out(self.pg1)
3669         capture = self.send_and_expect(
3670             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3671         )
3672         self.verify_capture_in(capture, self.pg0)
3673
3674         # Create an ACL blocking everything
3675         out2in_deny_rule = AclRule(is_permit=0)
3676         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3677         out2in_acl.add_vpp_config()
3678
3679         # create an ACL to permit/reflect everything
3680         in2out_reflect_rule = AclRule(is_permit=2)
3681         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3682         in2out_acl.add_vpp_config()
3683
3684         # apply as input acl on interface and confirm it blocks everything
3685         acl_if = VppAclInterface(
3686             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3687         )
3688         acl_if.add_vpp_config()
3689         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3690
3691         # apply output acl
3692         acl_if.acls = [out2in_acl, in2out_acl]
3693         acl_if.add_vpp_config()
3694         # send in2out to generate ACL state (NAT state was created earlier)
3695         capture = self.send_and_expect(
3696             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3697         )
3698         self.verify_capture_out(capture, ignore_port=True)
3699
3700         # send out2in again. ACL state exists so it should work now.
3701         # TCP packets with the syn flag set also need the ack flag
3702         for p in pkts_out2in:
3703             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3704                 p[TCP].flags |= 0x10
3705         capture = self.send_and_expect(
3706             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3707         )
3708         self.verify_capture_in(capture, self.pg0)
3709         self.logger.info(self.vapi.cli("show trace"))
3710
3711     def test_tcp_close(self):
3712         """NAT44ED Close TCP session from inside network - output feature"""
3713         config = self.vapi.nat44_show_running_config()
3714         old_timeouts = config.timeouts
3715         new_transitory = 2
3716         self.vapi.nat_set_timeouts(
3717             udp=old_timeouts.udp,
3718             tcp_established=old_timeouts.tcp_established,
3719             icmp=old_timeouts.icmp,
3720             tcp_transitory=new_transitory,
3721         )
3722
3723         self.vapi.nat44_forwarding_enable_disable(enable=1)
3724         self.nat_add_address(self.pg1.local_ip4)
3725         twice_nat_addr = "10.0.1.3"
3726         service_ip = "192.168.16.150"
3727         self.nat_add_address(twice_nat_addr, twice_nat=1)
3728
3729         flags = self.config_flags.NAT_IS_INSIDE
3730         self.vapi.nat44_interface_add_del_feature(
3731             sw_if_index=self.pg0.sw_if_index, is_add=1
3732         )
3733         self.vapi.nat44_interface_add_del_feature(
3734             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3735         )
3736         self.vapi.nat44_ed_add_del_output_interface(
3737             is_add=1, sw_if_index=self.pg1.sw_if_index
3738         )
3739
3740         flags = (
3741             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3742         )
3743         self.nat_add_static_mapping(
3744             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3745         )
3746         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3747         start_sessnum = len(sessions)
3748
3749         # SYN packet out->in
3750         p = (
3751             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3752             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3753             / TCP(sport=33898, dport=80, flags="S")
3754         )
3755         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3756         p = capture[0]
3757         tcp_port = p[TCP].sport
3758
3759         # SYN + ACK packet in->out
3760         p = (
3761             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3762             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3763             / TCP(sport=80, dport=tcp_port, flags="SA")
3764         )
3765         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3766
3767         # ACK packet out->in
3768         p = (
3769             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3770             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3771             / TCP(sport=33898, dport=80, flags="A")
3772         )
3773         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3774
3775         # FIN packet in -> out
3776         p = (
3777             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3778             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3779             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3780         )
3781         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3782
3783         # FIN+ACK packet out -> in
3784         p = (
3785             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3786             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3787             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3788         )
3789         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3790
3791         # ACK packet in -> out
3792         p = (
3793             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3794             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3795             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3796         )
3797         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3798
3799         # session now in transitory timeout, but traffic still flows
3800         # try FIN packet out->in
3801         p = (
3802             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3803             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3804             / TCP(sport=33898, dport=80, flags="F")
3805         )
3806         self.pg1.add_stream(p)
3807         self.pg_enable_capture(self.pg_interfaces)
3808         self.pg_start()
3809
3810         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3811         self.pg0.get_capture(1)
3812
3813         # session should still exist
3814         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3815         self.assertEqual(len(sessions) - start_sessnum, 1)
3816
3817         # send FIN+ACK packet out -> in - will cause session to be wiped
3818         # but won't create a new session
3819         p = (
3820             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3821             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3822             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3823         )
3824         self.send_and_assert_no_replies(self.pg1, p)
3825         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3826         self.assertEqual(len(sessions) - start_sessnum, 0)
3827
3828     def test_tcp_session_close_in(self):
3829         """NAT44ED Close TCP session from inside network"""
3830
3831         in_port = self.tcp_port_in
3832         out_port = 10505
3833         ext_port = self.tcp_external_port
3834
3835         self.nat_add_address(self.nat_addr)
3836         self.nat_add_inside_interface(self.pg0)
3837         self.nat_add_outside_interface(self.pg1)
3838         self.nat_add_static_mapping(
3839             self.pg0.remote_ip4,
3840             self.nat_addr,
3841             in_port,
3842             out_port,
3843             proto=IP_PROTOS.tcp,
3844             flags=self.config_flags.NAT_IS_TWICE_NAT,
3845         )
3846
3847         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3848         session_n = len(sessions)
3849
3850         self.vapi.nat_set_timeouts(
3851             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3852         )
3853
3854         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3855
3856         # FIN packet in -> out
3857         p = (
3858             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3859             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3860             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3861         )
3862         self.send_and_expect(self.pg0, p, self.pg1)
3863         pkts = []
3864
3865         # ACK packet out -> in
3866         p = (
3867             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3868             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3869             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3870         )
3871         pkts.append(p)
3872
3873         # FIN packet out -> in
3874         p = (
3875             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3876             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3877             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3878         )
3879         pkts.append(p)
3880
3881         self.send_and_expect(self.pg1, pkts, self.pg0)
3882
3883         # ACK packet in -> out
3884         p = (
3885             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3886             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3887             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3888         )
3889         self.send_and_expect(self.pg0, p, self.pg1)
3890
3891         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3892         self.assertEqual(len(sessions) - session_n, 1)
3893
3894         # retransmit FIN packet out -> in
3895         p = (
3896             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3897             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3898             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3899         )
3900
3901         self.send_and_expect(self.pg1, p, self.pg0)
3902
3903         # retransmit ACK packet in -> out
3904         p = (
3905             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3906             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3907             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3908         )
3909         self.send_and_expect(self.pg0, p, self.pg1)
3910
3911         self.virtual_sleep(3)
3912         # retransmit ACK packet in -> out - this will cause session to be wiped
3913         p = (
3914             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3915             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3916             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3917         )
3918         self.send_and_assert_no_replies(self.pg0, p)
3919         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3920         self.assertEqual(len(sessions) - session_n, 0)
3921
3922     def test_tcp_session_close_out(self):
3923         """NAT44ED Close TCP session from outside network"""
3924
3925         in_port = self.tcp_port_in
3926         out_port = 10505
3927         ext_port = self.tcp_external_port
3928
3929         self.nat_add_address(self.nat_addr)
3930         self.nat_add_inside_interface(self.pg0)
3931         self.nat_add_outside_interface(self.pg1)
3932         self.nat_add_static_mapping(
3933             self.pg0.remote_ip4,
3934             self.nat_addr,
3935             in_port,
3936             out_port,
3937             proto=IP_PROTOS.tcp,
3938             flags=self.config_flags.NAT_IS_TWICE_NAT,
3939         )
3940
3941         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3942         session_n = len(sessions)
3943
3944         self.vapi.nat_set_timeouts(
3945             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3946         )
3947
3948         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3949
3950         # FIN packet out -> in
3951         p = (
3952             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3953             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3954             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
3955         )
3956         self.pg1.add_stream(p)
3957         self.pg_enable_capture(self.pg_interfaces)
3958         self.pg_start()
3959         self.pg0.get_capture(1)
3960
3961         # FIN+ACK packet in -> out
3962         p = (
3963             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3964             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3965             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
3966         )
3967
3968         self.pg0.add_stream(p)
3969         self.pg_enable_capture(self.pg_interfaces)
3970         self.pg_start()
3971         self.pg1.get_capture(1)
3972
3973         # ACK packet out -> in
3974         p = (
3975             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3976             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3977             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
3978         )
3979         self.pg1.add_stream(p)
3980         self.pg_enable_capture(self.pg_interfaces)
3981         self.pg_start()
3982         self.pg0.get_capture(1)
3983
3984         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3985         self.assertEqual(len(sessions) - session_n, 1)
3986
3987         # retransmit FIN packet out -> in
3988         p = (
3989             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3990             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3991             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3992         )
3993         self.send_and_expect(self.pg1, p, self.pg0)
3994
3995         # retransmit ACK packet in -> out
3996         p = (
3997             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3998             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3999             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4000         )
4001         self.send_and_expect(self.pg0, p, self.pg1)
4002
4003         self.virtual_sleep(3)
4004         # retransmit ACK packet in -> out - this will cause session to be wiped
4005         p = (
4006             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4007             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4008             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4009         )
4010         self.send_and_assert_no_replies(self.pg0, p)
4011         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4012         self.assertEqual(len(sessions) - session_n, 0)
4013
4014     def test_tcp_session_close_simultaneous(self):
4015         """Simultaneous TCP close from both sides"""
4016
4017         in_port = self.tcp_port_in
4018         ext_port = 10505
4019
4020         self.nat_add_address(self.nat_addr)
4021         self.nat_add_inside_interface(self.pg0)
4022         self.nat_add_outside_interface(self.pg1)
4023         self.nat_add_static_mapping(
4024             self.pg0.remote_ip4,
4025             self.nat_addr,
4026             in_port,
4027             ext_port,
4028             proto=IP_PROTOS.tcp,
4029             flags=self.config_flags.NAT_IS_TWICE_NAT,
4030         )
4031
4032         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4033         session_n = len(sessions)
4034
4035         self.vapi.nat_set_timeouts(
4036             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4037         )
4038
4039         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4040
4041         # FIN packet in -> out
4042         p = (
4043             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4044             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4045             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4046         )
4047         self.send_and_expect(self.pg0, p, self.pg1)
4048
4049         # FIN packet out -> in
4050         p = (
4051             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4052             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4053             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4054         )
4055         self.send_and_expect(self.pg1, p, self.pg0)
4056
4057         # ACK packet in -> out
4058         p = (
4059             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4060             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4061             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4062         )
4063         self.send_and_expect(self.pg0, p, self.pg1)
4064
4065         # ACK packet out -> in
4066         p = (
4067             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4068             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4069             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
4070         )
4071         self.send_and_expect(self.pg1, p, self.pg0)
4072
4073         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4074         self.assertEqual(len(sessions) - session_n, 1)
4075
4076         # retransmit FIN packet out -> in
4077         p = (
4078             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4079             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4080             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
4081         )
4082         self.send_and_expect(self.pg1, p, self.pg0)
4083
4084         # retransmit ACK packet in -> out
4085         p = (
4086             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4087             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4088             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4089         )
4090         self.send_and_expect(self.pg0, p, self.pg1)
4091
4092         self.virtual_sleep(3)
4093         # retransmit ACK packet in -> out - this will cause session to be wiped
4094         p = (
4095             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4096             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4097             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4098         )
4099         self.pg_send(self.pg0, p)
4100         self.send_and_assert_no_replies(self.pg0, p)
4101         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4102         self.assertEqual(len(sessions) - session_n, 0)
4103
4104     def test_tcp_session_half_reopen_inside(self):
4105         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
4106         in_port = self.tcp_port_in
4107         ext_port = 10505
4108
4109         self.nat_add_address(self.nat_addr)
4110         self.nat_add_inside_interface(self.pg0)
4111         self.nat_add_outside_interface(self.pg1)
4112         self.nat_add_static_mapping(
4113             self.pg0.remote_ip4,
4114             self.nat_addr,
4115             in_port,
4116             ext_port,
4117             proto=IP_PROTOS.tcp,
4118             flags=self.config_flags.NAT_IS_TWICE_NAT,
4119         )
4120
4121         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4122         session_n = len(sessions)
4123
4124         self.vapi.nat_set_timeouts(
4125             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4126         )
4127
4128         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4129
4130         # FIN packet in -> out
4131         p = (
4132             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4133             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4134             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4135         )
4136         self.send_and_expect(self.pg0, p, self.pg1)
4137
4138         # FIN packet out -> in
4139         p = (
4140             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4141             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4142             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4143         )
4144         self.send_and_expect(self.pg1, p, self.pg0)
4145
4146         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4147         self.assertEqual(len(sessions) - session_n, 1)
4148
4149         # send SYN packet in -> out
4150         p = (
4151             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4152             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4153             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
4154         )
4155         self.send_and_expect(self.pg0, p, self.pg1)
4156
4157         self.virtual_sleep(3)
4158         # send ACK packet in -> out - session should be wiped
4159         p = (
4160             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4161             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4162             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4163         )
4164         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4165         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4166         self.assertEqual(len(sessions) - session_n, 0)
4167
4168     def test_tcp_session_half_reopen_outside(self):
4169         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
4170         in_port = self.tcp_port_in
4171         ext_port = 10505
4172
4173         self.nat_add_address(self.nat_addr)
4174         self.nat_add_inside_interface(self.pg0)
4175         self.nat_add_outside_interface(self.pg1)
4176         self.nat_add_static_mapping(
4177             self.pg0.remote_ip4,
4178             self.nat_addr,
4179             in_port,
4180             ext_port,
4181             proto=IP_PROTOS.tcp,
4182             flags=self.config_flags.NAT_IS_TWICE_NAT,
4183         )
4184
4185         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4186         session_n = len(sessions)
4187
4188         self.vapi.nat_set_timeouts(
4189             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4190         )
4191
4192         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4193
4194         # FIN packet in -> out
4195         p = (
4196             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4197             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4198             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4199         )
4200         self.send_and_expect(self.pg0, p, self.pg1)
4201
4202         # FIN packet out -> in
4203         p = (
4204             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4205             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4206             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4207         )
4208         self.send_and_expect(self.pg1, p, self.pg0)
4209
4210         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4211         self.assertEqual(len(sessions) - session_n, 1)
4212
4213         # send SYN packet out -> in
4214         p = (
4215             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4216             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4217             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4218         )
4219         self.send_and_expect(self.pg1, p, self.pg0)
4220
4221         self.virtual_sleep(3)
4222         # send ACK packet in -> out - session should be wiped
4223         p = (
4224             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4225             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4226             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4227         )
4228         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4229         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4230         self.assertEqual(len(sessions) - session_n, 0)
4231
4232     def test_tcp_session_reopen(self):
4233         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4234         in_port = self.tcp_port_in
4235         ext_port = 10505
4236
4237         self.nat_add_address(self.nat_addr)
4238         self.nat_add_inside_interface(self.pg0)
4239         self.nat_add_outside_interface(self.pg1)
4240         self.nat_add_static_mapping(
4241             self.pg0.remote_ip4,
4242             self.nat_addr,
4243             in_port,
4244             ext_port,
4245             proto=IP_PROTOS.tcp,
4246             flags=self.config_flags.NAT_IS_TWICE_NAT,
4247         )
4248
4249         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4250         session_n = len(sessions)
4251
4252         self.vapi.nat_set_timeouts(
4253             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4254         )
4255
4256         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4257
4258         # FIN packet in -> out
4259         p = (
4260             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4261             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4262             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4263         )
4264         self.send_and_expect(self.pg0, p, self.pg1)
4265
4266         # FIN packet out -> in
4267         p = (
4268             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4269             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4270             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4271         )
4272         self.send_and_expect(self.pg1, p, self.pg0)
4273
4274         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4275         self.assertEqual(len(sessions) - session_n, 1)
4276
4277         # send SYN packet out -> in
4278         p = (
4279             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4280             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4281             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4282         )
4283         self.send_and_expect(self.pg1, p, self.pg0)
4284
4285         # send SYN packet in -> out
4286         p = (
4287             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4288             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4289             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4290         )
4291         self.send_and_expect(self.pg0, p, self.pg1)
4292
4293         # send ACK packet out -> in
4294         p = (
4295             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4296             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4297             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4298         )
4299         self.send_and_expect(self.pg1, p, self.pg0)
4300
4301         self.virtual_sleep(3)
4302         # send ACK packet in -> out - should be forwarded and session alive
4303         p = (
4304             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4305             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4306             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4307         )
4308         self.send_and_expect(self.pg0, p, self.pg1)
4309         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4310         self.assertEqual(len(sessions) - session_n, 1)
4311
4312     def test_dynamic_vrf(self):
4313         """NAT44ED dynamic translation test: different VRF"""
4314
4315         vrf_id_in = 33
4316         vrf_id_out = 34
4317
4318         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4319
4320         try:
4321             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4322             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4323
4324             self.nat_add_inside_interface(self.pg7)
4325             self.nat_add_outside_interface(self.pg8)
4326
4327             # just basic stuff nothing special
4328             pkts = self.create_stream_in(self.pg7, self.pg8)
4329             self.pg7.add_stream(pkts)
4330             self.pg_enable_capture(self.pg_interfaces)
4331             self.pg_start()
4332             capture = self.pg8.get_capture(len(pkts))
4333             self.verify_capture_out(capture, ignore_port=True)
4334
4335             pkts = self.create_stream_out(self.pg8)
4336             self.pg8.add_stream(pkts)
4337             self.pg_enable_capture(self.pg_interfaces)
4338             self.pg_start()
4339             capture = self.pg7.get_capture(len(pkts))
4340             self.verify_capture_in(capture, self.pg7)
4341
4342         finally:
4343             self.pg7.unconfig()
4344             self.pg8.unconfig()
4345
4346             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4347             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4348
4349     def test_dynamic_output_feature_vrf(self):
4350         """NAT44ED dynamic translation test: output-feature, VRF"""
4351
4352         # other then default (0)
4353         new_vrf_id = 22
4354
4355         self.nat_add_address(self.nat_addr)
4356         self.vapi.nat44_ed_add_del_output_interface(
4357             sw_if_index=self.pg8.sw_if_index, is_add=1
4358         )
4359         try:
4360             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4361             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4362
4363             # in2out
4364             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4365             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4366             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4367             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4368
4369             pkts = self.create_stream_in(self.pg7, self.pg8)
4370             self.pg7.add_stream(pkts)
4371             self.pg_enable_capture(self.pg_interfaces)
4372             self.pg_start()
4373             capture = self.pg8.get_capture(len(pkts))
4374             self.verify_capture_out(capture, ignore_port=True)
4375
4376             if_idx = self.pg8.sw_if_index
4377             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4378             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4379             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4380             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4381             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4382             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4383             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4384             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4385
4386             # out2in
4387             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4388             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4389             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4390             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4391
4392             pkts = self.create_stream_out(self.pg8)
4393             self.pg8.add_stream(pkts)
4394             self.pg_enable_capture(self.pg_interfaces)
4395             self.pg_start()
4396             capture = self.pg7.get_capture(len(pkts))
4397             self.verify_capture_in(capture, self.pg7)
4398
4399             if_idx = self.pg8.sw_if_index
4400             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4401             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4402             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4403             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4404             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4405             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4406             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4407             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4408
4409             sessions = self.statistics["/nat44-ed/total-sessions"]
4410             self.assertEqual(sessions[:, 0].sum(), 3)
4411
4412         finally:
4413             self.pg7.unconfig()
4414             self.pg8.unconfig()
4415
4416             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4417
4418     def test_next_src_nat(self):
4419         """NAT44ED On way back forward packet to nat44-in2out node."""
4420
4421         twice_nat_addr = "10.0.1.3"
4422         external_port = 80
4423         local_port = 8080
4424         post_twice_nat_port = 0
4425
4426         self.vapi.nat44_forwarding_enable_disable(enable=1)
4427         self.nat_add_address(twice_nat_addr, twice_nat=1)
4428         flags = (
4429             self.config_flags.NAT_IS_OUT2IN_ONLY
4430             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4431         )
4432         self.nat_add_static_mapping(
4433             self.pg6.remote_ip4,
4434             self.pg1.remote_ip4,
4435             local_port,
4436             external_port,
4437             proto=IP_PROTOS.tcp,
4438             vrf_id=1,
4439             flags=flags,
4440         )
4441         self.vapi.nat44_interface_add_del_feature(
4442             sw_if_index=self.pg6.sw_if_index, is_add=1
4443         )
4444
4445         p = (
4446             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4447             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4448             / TCP(sport=12345, dport=external_port)
4449         )
4450         self.pg6.add_stream(p)
4451         self.pg_enable_capture(self.pg_interfaces)
4452         self.pg_start()
4453         capture = self.pg6.get_capture(1)
4454         p = capture[0]
4455         try:
4456             ip = p[IP]
4457             tcp = p[TCP]
4458             self.assertEqual(ip.src, twice_nat_addr)
4459             self.assertNotEqual(tcp.sport, 12345)
4460             post_twice_nat_port = tcp.sport
4461             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4462             self.assertEqual(tcp.dport, local_port)
4463             self.assert_packet_checksums_valid(p)
4464         except:
4465             self.logger.error(ppp("Unexpected or invalid packet:", p))
4466             raise
4467
4468         p = (
4469             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4470             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4471             / TCP(sport=local_port, dport=post_twice_nat_port)
4472         )
4473         self.pg6.add_stream(p)
4474         self.pg_enable_capture(self.pg_interfaces)
4475         self.pg_start()
4476         capture = self.pg6.get_capture(1)
4477         p = capture[0]
4478         try:
4479             ip = p[IP]
4480             tcp = p[TCP]
4481             self.assertEqual(ip.src, self.pg1.remote_ip4)
4482             self.assertEqual(tcp.sport, external_port)
4483             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4484             self.assertEqual(tcp.dport, 12345)
4485             self.assert_packet_checksums_valid(p)
4486         except:
4487             self.logger.error(ppp("Unexpected or invalid packet:", p))
4488             raise
4489
4490     def test_one_armed_nat44_static(self):
4491         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4492
4493         remote_host = self.pg4.remote_hosts[0]
4494         local_host = self.pg4.remote_hosts[1]
4495         external_port = 80
4496         local_port = 8080
4497         eh_port_in = 0
4498
4499         self.vapi.nat44_forwarding_enable_disable(enable=1)
4500         self.nat_add_address(self.nat_addr, twice_nat=1)
4501         flags = (
4502             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4503         )
4504         self.nat_add_static_mapping(
4505             local_host.ip4,
4506             self.nat_addr,
4507             local_port,
4508             external_port,
4509             proto=IP_PROTOS.tcp,
4510             flags=flags,
4511         )
4512         flags = self.config_flags.NAT_IS_INSIDE
4513         self.vapi.nat44_interface_add_del_feature(
4514             sw_if_index=self.pg4.sw_if_index, is_add=1
4515         )
4516         self.vapi.nat44_interface_add_del_feature(
4517             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4518         )
4519
4520         # from client to service
4521         p = (
4522             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4523             / IP(src=remote_host.ip4, dst=self.nat_addr)
4524             / TCP(sport=12345, dport=external_port)
4525         )
4526         self.pg4.add_stream(p)
4527         self.pg_enable_capture(self.pg_interfaces)
4528         self.pg_start()
4529         capture = self.pg4.get_capture(1)
4530         p = capture[0]
4531         try:
4532             ip = p[IP]
4533             tcp = p[TCP]
4534             self.assertEqual(ip.dst, local_host.ip4)
4535             self.assertEqual(ip.src, self.nat_addr)
4536             self.assertEqual(tcp.dport, local_port)
4537             self.assertNotEqual(tcp.sport, 12345)
4538             eh_port_in = tcp.sport
4539             self.assert_packet_checksums_valid(p)
4540         except:
4541             self.logger.error(ppp("Unexpected or invalid packet:", p))
4542             raise
4543
4544         # from service back to client
4545         p = (
4546             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4547             / IP(src=local_host.ip4, dst=self.nat_addr)
4548             / TCP(sport=local_port, dport=eh_port_in)
4549         )
4550         self.pg4.add_stream(p)
4551         self.pg_enable_capture(self.pg_interfaces)
4552         self.pg_start()
4553         capture = self.pg4.get_capture(1)
4554         p = capture[0]
4555         try:
4556             ip = p[IP]
4557             tcp = p[TCP]
4558             self.assertEqual(ip.src, self.nat_addr)
4559             self.assertEqual(ip.dst, remote_host.ip4)
4560             self.assertEqual(tcp.sport, external_port)
4561             self.assertEqual(tcp.dport, 12345)
4562             self.assert_packet_checksums_valid(p)
4563         except:
4564             self.logger.error(ppp("Unexpected or invalid packet:", p))
4565             raise
4566
4567     def test_icmp_error_fwd_outbound(self):
4568         """NAT44ED ICMP error outbound with forwarding enabled"""
4569
4570         # Ensure that an outbound ICMP error message is properly associated
4571         # with the inbound forward bypass session it is related to.
4572         payload = "H" * 10
4573
4574         self.nat_add_address(self.nat_addr)
4575         self.nat_add_inside_interface(self.pg0)
4576         self.nat_add_outside_interface(self.pg1)
4577
4578         # enable forwarding and initiate connection out2in
4579         self.vapi.nat44_forwarding_enable_disable(enable=1)
4580         p1 = (
4581             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4582             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4583             / UDP(sport=21, dport=20)
4584             / payload
4585         )
4586
4587         self.pg1.add_stream(p1)
4588         self.pg_enable_capture(self.pg_interfaces)
4589         self.pg_start()
4590         capture = self.pg0.get_capture(1)[0]
4591
4592         self.logger.info(self.vapi.cli("show nat44 sessions"))
4593
4594         # reply with ICMP error message in2out
4595         # We cannot reliably retrieve forward bypass sessions via the API.
4596         # session dumps for a user will only look on the worker that the
4597         # user is supposed to be mapped to in2out. The forward bypass session
4598         # is not necessarily created on that worker.
4599         p2 = (
4600             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4601             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4602             / ICMP(type="dest-unreach", code="port-unreachable")
4603             / capture[IP:]
4604         )
4605
4606         self.pg0.add_stream(p2)
4607         self.pg_enable_capture(self.pg_interfaces)
4608         self.pg_start()
4609         capture = self.pg1.get_capture(1)[0]
4610
4611         self.logger.info(self.vapi.cli("show nat44 sessions"))
4612
4613         self.logger.info(ppp("p1 packet:", p1))
4614         self.logger.info(ppp("p2 packet:", p2))
4615         self.logger.info(ppp("capture packet:", capture))
4616
4617     def test_tcp_session_open_retransmit1(self):
4618         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4619
4620         The client does not receive the [SYN,ACK] or the
4621         ACK from the client is lost. Therefore, the [SYN, ACK]
4622         is retransmitted by the server.
4623         """
4624
4625         in_port = self.tcp_port_in
4626         ext_port = self.tcp_external_port
4627         payload = "H" * 10
4628
4629         self.nat_add_address(self.nat_addr)
4630         self.nat_add_inside_interface(self.pg0)
4631         self.nat_add_outside_interface(self.pg1)
4632
4633         self.vapi.nat_set_timeouts(
4634             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4635         )
4636         # SYN packet in->out
4637         p = (
4638             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4639             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4640             / TCP(sport=in_port, dport=ext_port, flags="S")
4641         )
4642         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4643         out_port = p[TCP].sport
4644
4645         # SYN + ACK packet out->in
4646         p = (
4647             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4648             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4649             / TCP(sport=ext_port, dport=out_port, flags="SA")
4650         )
4651         self.send_and_expect(self.pg1, p, self.pg0)
4652
4653         # ACK in->out does not arrive
4654
4655         # resent SYN + ACK packet out->in
4656         p = (
4657             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4658             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4659             / TCP(sport=ext_port, dport=out_port, flags="SA")
4660         )
4661         self.send_and_expect(self.pg1, p, self.pg0)
4662
4663         # ACK packet in->out
4664         p = (
4665             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4666             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4667             / TCP(sport=in_port, dport=ext_port, flags="A")
4668         )
4669         self.send_and_expect(self.pg0, p, self.pg1)
4670
4671         # Verify that the data can be transmitted after the transitory time
4672         self.virtual_sleep(6)
4673
4674         p = (
4675             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4676             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4677             / TCP(sport=in_port, dport=ext_port, flags="PA")
4678             / Raw(payload)
4679         )
4680         self.send_and_expect(self.pg0, p, self.pg1)
4681
4682     def test_tcp_session_open_retransmit2(self):
4683         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4684
4685         The ACK is lost to the server after the TCP session is opened.
4686         Data is sent by the client, then the [SYN,ACK] is
4687         retransmitted by the server.
4688         """
4689
4690         in_port = self.tcp_port_in
4691         ext_port = self.tcp_external_port
4692         payload = "H" * 10
4693
4694         self.nat_add_address(self.nat_addr)
4695         self.nat_add_inside_interface(self.pg0)
4696         self.nat_add_outside_interface(self.pg1)
4697
4698         self.vapi.nat_set_timeouts(
4699             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4700         )
4701         # SYN packet in->out
4702         p = (
4703             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4704             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4705             / TCP(sport=in_port, dport=ext_port, flags="S")
4706         )
4707         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4708         out_port = p[TCP].sport
4709
4710         # SYN + ACK packet out->in
4711         p = (
4712             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4713             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4714             / TCP(sport=ext_port, dport=out_port, flags="SA")
4715         )
4716         self.send_and_expect(self.pg1, p, self.pg0)
4717
4718         # ACK packet in->out -- not received by the server
4719         p = (
4720             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4721             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4722             / TCP(sport=in_port, dport=ext_port, flags="A")
4723         )
4724         self.send_and_expect(self.pg0, p, self.pg1)
4725
4726         # PUSH + ACK packet in->out
4727         p = (
4728             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4729             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4730             / TCP(sport=in_port, dport=ext_port, flags="PA")
4731             / Raw(payload)
4732         )
4733         self.send_and_expect(self.pg0, p, self.pg1)
4734
4735         # resent SYN + ACK packet out->in
4736         p = (
4737             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4738             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4739             / TCP(sport=ext_port, dport=out_port, flags="SA")
4740         )
4741         self.send_and_expect(self.pg1, p, self.pg0)
4742
4743         # resent ACK packet in->out
4744         p = (
4745             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4746             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4747             / TCP(sport=in_port, dport=ext_port, flags="A")
4748         )
4749         self.send_and_expect(self.pg0, p, self.pg1)
4750
4751         # resent PUSH + ACK packet in->out
4752         p = (
4753             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4754             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4755             / TCP(sport=in_port, dport=ext_port, flags="PA")
4756             / Raw(payload)
4757         )
4758         self.send_and_expect(self.pg0, p, self.pg1)
4759
4760         # ACK packet out->in
4761         p = (
4762             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4763             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4764             / TCP(sport=ext_port, dport=out_port, flags="A")
4765         )
4766         self.send_and_expect(self.pg1, p, self.pg0)
4767
4768         # Verify that the data can be transmitted after the transitory time
4769         self.virtual_sleep(6)
4770
4771         p = (
4772             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4773             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4774             / TCP(sport=in_port, dport=ext_port, flags="PA")
4775             / Raw(payload)
4776         )
4777         self.send_and_expect(self.pg0, p, self.pg1)
4778
4779
4780 if __name__ == "__main__":
4781     unittest.main(testRunner=VppTestRunner)