tests: replace pycodestyle with black
[vpp.git] / test / test_nat44_ed.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from io import BytesIO
5 from random import randint, choice
6
7 import scapy.compat
8 from framework import VppTestCase, VppTestRunner
9 from scapy.data import IP_PROTOS
10 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
11 from scapy.layers.inet import IPerror, TCPerror
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from syslog_rfc5424_parser import SyslogMessage, ParseError
15 from syslog_rfc5424_parser.constants import SyslogSeverity
16 from util import ppp, pr, ip4_range
17 from vpp_acl import AclRule, VppAcl, VppAclInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_papi import VppEnum
20 from util import StatsDiff
21
22
23 class TestNAT44ED(VppTestCase):
24     """NAT44ED Test Case"""
25
26     nat_addr = "10.0.10.3"
27
28     tcp_port_in = 6303
29     tcp_port_out = 6303
30
31     udp_port_in = 6304
32     udp_port_out = 6304
33
34     icmp_id_in = 6305
35     icmp_id_out = 6305
36
37     tcp_external_port = 80
38
39     max_sessions = 100
40
41     def setUp(self):
42         super().setUp()
43         self.plugin_enable()
44
45     def tearDown(self):
46         super().tearDown()
47         if not self.vpp_dead:
48             self.plugin_disable()
49
50     def plugin_enable(self):
51         self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
52
53     def plugin_disable(self):
54         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
55
56     @property
57     def config_flags(self):
58         return VppEnum.vl_api_nat_config_flags_t
59
60     @property
61     def nat44_config_flags(self):
62         return VppEnum.vl_api_nat44_config_flags_t
63
64     @property
65     def syslog_severity(self):
66         return VppEnum.vl_api_syslog_severity_t
67
68     @property
69     def server_addr(self):
70         return self.pg1.remote_hosts[0].ip4
71
72     @staticmethod
73     def random_port():
74         return randint(1025, 65535)
75
76     @staticmethod
77     def proto2layer(proto):
78         if proto == IP_PROTOS.tcp:
79             return TCP
80         elif proto == IP_PROTOS.udp:
81             return UDP
82         elif proto == IP_PROTOS.icmp:
83             return ICMP
84         else:
85             raise Exception("Unsupported protocol")
86
87     @classmethod
88     def create_and_add_ip4_table(cls, i, table_id=0):
89         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id})
90         i.set_table_ip4(table_id)
91
92     @classmethod
93     def configure_ip4_interface(cls, i, hosts=0, table_id=None):
94         if table_id:
95             cls.create_and_add_ip4_table(i, table_id)
96
97         i.admin_up()
98         i.config_ip4()
99         i.resolve_arp()
100
101         if hosts:
102             i.generate_remote_hosts(hosts)
103             i.configure_ipv4_neighbors()
104
105     @classmethod
106     def nat_add_interface_address(cls, i):
107         cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1)
108
109     def nat_add_inside_interface(self, i):
110         self.vapi.nat44_interface_add_del_feature(
111             flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1
112         )
113
114     def nat_add_outside_interface(self, i):
115         self.vapi.nat44_interface_add_del_feature(
116             flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1
117         )
118
119     def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1):
120         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
121         self.vapi.nat44_add_del_address_range(
122             first_ip_address=address,
123             last_ip_address=address,
124             vrf_id=vrf_id,
125             is_add=is_add,
126             flags=flags,
127         )
128
129     def nat_add_static_mapping(
130         self,
131         local_ip,
132         external_ip="0.0.0.0",
133         local_port=0,
134         external_port=0,
135         vrf_id=0,
136         is_add=1,
137         external_sw_if_index=0xFFFFFFFF,
138         proto=0,
139         tag="",
140         flags=0,
141     ):
142
143         if not (local_port and external_port):
144             flags |= self.config_flags.NAT_IS_ADDR_ONLY
145
146         self.vapi.nat44_add_del_static_mapping(
147             is_add=is_add,
148             local_ip_address=local_ip,
149             external_ip_address=external_ip,
150             external_sw_if_index=external_sw_if_index,
151             local_port=local_port,
152             external_port=external_port,
153             vrf_id=vrf_id,
154             protocol=proto,
155             flags=flags,
156             tag=tag,
157         )
158
159     @classmethod
160     def setUpClass(cls):
161         super().setUpClass()
162
163         cls.create_pg_interfaces(range(12))
164         cls.interfaces = list(cls.pg_interfaces[:4])
165
166         cls.create_and_add_ip4_table(cls.pg2, 10)
167
168         for i in cls.interfaces:
169             cls.configure_ip4_interface(i, hosts=3)
170
171         # test specific (test-multiple-vrf)
172         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1})
173
174         # test specific (test-one-armed-nat44-static)
175         cls.pg4.generate_remote_hosts(2)
176         cls.pg4.config_ip4()
177         cls.vapi.sw_interface_add_del_address(
178             sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24"
179         )
180         cls.pg4.admin_up()
181         cls.pg4.resolve_arp()
182         cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
183         cls.pg4.resolve_arp()
184
185         # test specific interface (pg5)
186         cls.pg5._local_ip4 = "10.1.1.1"
187         cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
188         cls.pg5.set_table_ip4(1)
189         cls.pg5.config_ip4()
190         cls.pg5.admin_up()
191         cls.pg5.resolve_arp()
192
193         # test specific interface (pg6)
194         cls.pg6._local_ip4 = "10.1.2.1"
195         cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
196         cls.pg6.set_table_ip4(1)
197         cls.pg6.config_ip4()
198         cls.pg6.admin_up()
199         cls.pg6.resolve_arp()
200
201         rl = list()
202
203         rl.append(
204             VppIpRoute(
205                 cls,
206                 "0.0.0.0",
207                 0,
208                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
209                 register=False,
210                 table_id=1,
211             )
212         )
213         rl.append(
214             VppIpRoute(
215                 cls,
216                 "0.0.0.0",
217                 0,
218                 [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)],
219                 register=False,
220             )
221         )
222         rl.append(
223             VppIpRoute(
224                 cls,
225                 cls.pg5.remote_ip4,
226                 32,
227                 [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)],
228                 register=False,
229                 table_id=1,
230             )
231         )
232         rl.append(
233             VppIpRoute(
234                 cls,
235                 cls.pg6.remote_ip4,
236                 32,
237                 [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)],
238                 register=False,
239                 table_id=1,
240             )
241         )
242         rl.append(
243             VppIpRoute(
244                 cls,
245                 cls.pg6.remote_ip4,
246                 16,
247                 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
248                 register=False,
249                 table_id=0,
250             )
251         )
252
253         for r in rl:
254             r.add_vpp_config()
255
256         cls.no_diff = StatsDiff(
257             {
258                 pg.sw_if_index: {
259                     "/nat44-ed/in2out/fastpath/tcp": 0,
260                     "/nat44-ed/in2out/fastpath/udp": 0,
261                     "/nat44-ed/in2out/fastpath/icmp": 0,
262                     "/nat44-ed/in2out/fastpath/drops": 0,
263                     "/nat44-ed/in2out/slowpath/tcp": 0,
264                     "/nat44-ed/in2out/slowpath/udp": 0,
265                     "/nat44-ed/in2out/slowpath/icmp": 0,
266                     "/nat44-ed/in2out/slowpath/drops": 0,
267                     "/nat44-ed/in2out/fastpath/tcp": 0,
268                     "/nat44-ed/in2out/fastpath/udp": 0,
269                     "/nat44-ed/in2out/fastpath/icmp": 0,
270                     "/nat44-ed/in2out/fastpath/drops": 0,
271                     "/nat44-ed/in2out/slowpath/tcp": 0,
272                     "/nat44-ed/in2out/slowpath/udp": 0,
273                     "/nat44-ed/in2out/slowpath/icmp": 0,
274                     "/nat44-ed/in2out/slowpath/drops": 0,
275                 }
276                 for pg in cls.pg_interfaces
277             }
278         )
279
280     def get_err_counter(self, path):
281         return self.statistics.get_err_counter(path)
282
283     def reass_hairpinning(
284         self,
285         server_addr,
286         server_in_port,
287         server_out_port,
288         host_in_port,
289         proto=IP_PROTOS.tcp,
290         ignore_port=False,
291     ):
292         layer = self.proto2layer(proto)
293
294         if proto == IP_PROTOS.tcp:
295             data = b"A" * 4 + b"B" * 16 + b"C" * 3
296         else:
297             data = b"A" * 16 + b"B" * 16 + b"C" * 3
298
299         # send packet from host to server
300         pkts = self.create_stream_frag(
301             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
302         )
303         self.pg0.add_stream(pkts)
304         self.pg_enable_capture(self.pg_interfaces)
305         self.pg_start()
306         frags = self.pg0.get_capture(len(pkts))
307         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
308         if proto != IP_PROTOS.icmp:
309             if not ignore_port:
310                 self.assertNotEqual(p[layer].sport, host_in_port)
311             self.assertEqual(p[layer].dport, server_in_port)
312         else:
313             if not ignore_port:
314                 self.assertNotEqual(p[layer].id, host_in_port)
315         self.assertEqual(data, p[Raw].load)
316
317     def frag_out_of_order(
318         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
319     ):
320         layer = self.proto2layer(proto)
321
322         if proto == IP_PROTOS.tcp:
323             data = b"A" * 4 + b"B" * 16 + b"C" * 3
324         else:
325             data = b"A" * 16 + b"B" * 16 + b"C" * 3
326         self.port_in = self.random_port()
327
328         for i in range(2):
329             # in2out
330             pkts = self.create_stream_frag(
331                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
332             )
333             pkts.reverse()
334             self.pg0.add_stream(pkts)
335             self.pg_enable_capture(self.pg_interfaces)
336             self.pg_start()
337             frags = self.pg1.get_capture(len(pkts))
338             if not dont_translate:
339                 p = self.reass_frags_and_verify(
340                     frags, self.nat_addr, self.pg1.remote_ip4
341                 )
342             else:
343                 p = self.reass_frags_and_verify(
344                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
345                 )
346             if proto != IP_PROTOS.icmp:
347                 if not dont_translate:
348                     self.assertEqual(p[layer].dport, 20)
349                     if not ignore_port:
350                         self.assertNotEqual(p[layer].sport, self.port_in)
351                 else:
352                     self.assertEqual(p[layer].sport, self.port_in)
353             else:
354                 if not ignore_port:
355                     if not dont_translate:
356                         self.assertNotEqual(p[layer].id, self.port_in)
357                     else:
358                         self.assertEqual(p[layer].id, self.port_in)
359             self.assertEqual(data, p[Raw].load)
360
361             # out2in
362             if not dont_translate:
363                 dst_addr = self.nat_addr
364             else:
365                 dst_addr = self.pg0.remote_ip4
366             if proto != IP_PROTOS.icmp:
367                 sport = 20
368                 dport = p[layer].sport
369             else:
370                 sport = p[layer].id
371                 dport = 0
372             pkts = self.create_stream_frag(
373                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
374             )
375             pkts.reverse()
376             self.pg1.add_stream(pkts)
377             self.pg_enable_capture(self.pg_interfaces)
378             self.logger.info(self.vapi.cli("show trace"))
379             self.pg_start()
380             frags = self.pg0.get_capture(len(pkts))
381             p = self.reass_frags_and_verify(
382                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
383             )
384             if proto != IP_PROTOS.icmp:
385                 self.assertEqual(p[layer].sport, 20)
386                 self.assertEqual(p[layer].dport, self.port_in)
387             else:
388                 self.assertEqual(p[layer].id, self.port_in)
389             self.assertEqual(data, p[Raw].load)
390
391     def reass_frags_and_verify(self, frags, src, dst):
392         buffer = BytesIO()
393         for p in frags:
394             self.assertEqual(p[IP].src, src)
395             self.assertEqual(p[IP].dst, dst)
396             self.assert_ip_checksum_valid(p)
397             buffer.seek(p[IP].frag * 8)
398             buffer.write(bytes(p[IP].payload))
399         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
400         if ip.proto == IP_PROTOS.tcp:
401             p = ip / TCP(buffer.getvalue())
402             self.logger.debug(ppp("Reassembled:", p))
403             self.assert_tcp_checksum_valid(p)
404         elif ip.proto == IP_PROTOS.udp:
405             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
406         elif ip.proto == IP_PROTOS.icmp:
407             p = ip / ICMP(buffer.getvalue())
408         return p
409
410     def frag_in_order(
411         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
412     ):
413         layer = self.proto2layer(proto)
414
415         if proto == IP_PROTOS.tcp:
416             data = b"A" * 4 + b"B" * 16 + b"C" * 3
417         else:
418             data = b"A" * 16 + b"B" * 16 + b"C" * 3
419         self.port_in = self.random_port()
420
421         # in2out
422         pkts = self.create_stream_frag(
423             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
424         )
425         self.pg0.add_stream(pkts)
426         self.pg_enable_capture(self.pg_interfaces)
427         self.pg_start()
428         frags = self.pg1.get_capture(len(pkts))
429         if not dont_translate:
430             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
431         else:
432             p = self.reass_frags_and_verify(
433                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
434             )
435         if proto != IP_PROTOS.icmp:
436             if not dont_translate:
437                 self.assertEqual(p[layer].dport, 20)
438                 if not ignore_port:
439                     self.assertNotEqual(p[layer].sport, self.port_in)
440             else:
441                 self.assertEqual(p[layer].sport, self.port_in)
442         else:
443             if not ignore_port:
444                 if not dont_translate:
445                     self.assertNotEqual(p[layer].id, self.port_in)
446                 else:
447                     self.assertEqual(p[layer].id, self.port_in)
448         self.assertEqual(data, p[Raw].load)
449
450         # out2in
451         if not dont_translate:
452             dst_addr = self.nat_addr
453         else:
454             dst_addr = self.pg0.remote_ip4
455         if proto != IP_PROTOS.icmp:
456             sport = 20
457             dport = p[layer].sport
458         else:
459             sport = p[layer].id
460             dport = 0
461         pkts = self.create_stream_frag(
462             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
463         )
464         self.pg1.add_stream(pkts)
465         self.pg_enable_capture(self.pg_interfaces)
466         self.pg_start()
467         frags = self.pg0.get_capture(len(pkts))
468         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
469         if proto != IP_PROTOS.icmp:
470             self.assertEqual(p[layer].sport, 20)
471             self.assertEqual(p[layer].dport, self.port_in)
472         else:
473             self.assertEqual(p[layer].id, self.port_in)
474         self.assertEqual(data, p[Raw].load)
475
476     def verify_capture_out(
477         self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False
478     ):
479         if nat_ip is None:
480             nat_ip = self.nat_addr
481         for packet in capture:
482             try:
483                 self.assert_packet_checksums_valid(packet)
484                 self.assertEqual(packet[IP].src, nat_ip)
485                 if dst_ip is not None:
486                     self.assertEqual(packet[IP].dst, dst_ip)
487                 if packet.haslayer(TCP):
488                     if not ignore_port:
489                         if same_port:
490                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
491                         else:
492                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
493                     self.tcp_port_out = packet[TCP].sport
494                     self.assert_packet_checksums_valid(packet)
495                 elif packet.haslayer(UDP):
496                     if not ignore_port:
497                         if same_port:
498                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
499                         else:
500                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
501                     self.udp_port_out = packet[UDP].sport
502                 else:
503                     if not ignore_port:
504                         if same_port:
505                             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
506                         else:
507                             self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
508                     self.icmp_id_out = packet[ICMP].id
509                     self.assert_packet_checksums_valid(packet)
510             except:
511                 self.logger.error(
512                     ppp("Unexpected or invalid packet (outside network):", packet)
513                 )
514                 raise
515
516     def verify_capture_in(self, capture, in_if):
517         for packet in capture:
518             try:
519                 self.assert_packet_checksums_valid(packet)
520                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
521                 if packet.haslayer(TCP):
522                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
523                 elif packet.haslayer(UDP):
524                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
525                 else:
526                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
527             except:
528                 self.logger.error(
529                     ppp("Unexpected or invalid packet (inside network):", packet)
530                 )
531                 raise
532
533     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
534         if dst_ip is None:
535             dst_ip = out_if.remote_ip4
536
537         pkts = []
538         # TCP
539         p = (
540             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
541             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
542             / TCP(sport=self.tcp_port_in, dport=20)
543         )
544         pkts.extend([p, p])
545
546         # UDP
547         p = (
548             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
549             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
550             / UDP(sport=self.udp_port_in, dport=20)
551         )
552         pkts.append(p)
553
554         # ICMP
555         p = (
556             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
557             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
558             / ICMP(id=self.icmp_id_in, type="echo-request")
559         )
560         pkts.append(p)
561
562         return pkts
563
564     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
565         if dst_ip is None:
566             dst_ip = self.nat_addr
567         if not use_inside_ports:
568             tcp_port = self.tcp_port_out
569             udp_port = self.udp_port_out
570             icmp_id = self.icmp_id_out
571         else:
572             tcp_port = self.tcp_port_in
573             udp_port = self.udp_port_in
574             icmp_id = self.icmp_id_in
575         pkts = []
576         # TCP
577         p = (
578             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
579             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
580             / TCP(dport=tcp_port, sport=20)
581         )
582         pkts.extend([p, p])
583
584         # UDP
585         p = (
586             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
587             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
588             / UDP(dport=udp_port, sport=20)
589         )
590         pkts.append(p)
591
592         # ICMP
593         p = (
594             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
595             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
596             / ICMP(id=icmp_id, type="echo-reply")
597         )
598         pkts.append(p)
599
600         return pkts
601
602     def create_tcp_stream(self, in_if, out_if, count):
603         pkts = []
604         port = 6303
605
606         for i in range(count):
607             p = (
608                 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
609                 / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
610                 / TCP(sport=port + i, dport=20)
611             )
612             pkts.append(p)
613
614         return pkts
615
616     def create_stream_frag(
617         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
618     ):
619         if proto == IP_PROTOS.tcp:
620             p = (
621                 IP(src=src_if.remote_ip4, dst=dst)
622                 / TCP(sport=sport, dport=dport)
623                 / Raw(data)
624             )
625             p = p.__class__(scapy.compat.raw(p))
626             chksum = p[TCP].chksum
627             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
628         elif proto == IP_PROTOS.udp:
629             proto_header = UDP(sport=sport, dport=dport)
630         elif proto == IP_PROTOS.icmp:
631             if not echo_reply:
632                 proto_header = ICMP(id=sport, type="echo-request")
633             else:
634                 proto_header = ICMP(id=sport, type="echo-reply")
635         else:
636             raise Exception("Unsupported protocol")
637         id = self.random_port()
638         pkts = []
639         if proto == IP_PROTOS.tcp:
640             raw = Raw(data[0:4])
641         else:
642             raw = Raw(data[0:16])
643         p = (
644             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
645             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
646             / proto_header
647             / raw
648         )
649         pkts.append(p)
650         if proto == IP_PROTOS.tcp:
651             raw = Raw(data[4:20])
652         else:
653             raw = Raw(data[16:32])
654         p = (
655             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
656             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
657             / raw
658         )
659         pkts.append(p)
660         if proto == IP_PROTOS.tcp:
661             raw = Raw(data[20:])
662         else:
663             raw = Raw(data[32:])
664         p = (
665             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
666             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
667             / raw
668         )
669         pkts.append(p)
670         return pkts
671
672     def frag_in_order_in_plus_out(
673         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
674     ):
675
676         layer = self.proto2layer(proto)
677
678         if proto == IP_PROTOS.tcp:
679             data = b"A" * 4 + b"B" * 16 + b"C" * 3
680         else:
681             data = b"A" * 16 + b"B" * 16 + b"C" * 3
682         port_in = self.random_port()
683
684         for i in range(2):
685             # out2in
686             pkts = self.create_stream_frag(
687                 self.pg0, out_addr, port_in, out_port, data, proto
688             )
689             self.pg0.add_stream(pkts)
690             self.pg_enable_capture(self.pg_interfaces)
691             self.pg_start()
692             frags = self.pg1.get_capture(len(pkts))
693             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
694             if proto != IP_PROTOS.icmp:
695                 self.assertEqual(p[layer].sport, port_in)
696                 self.assertEqual(p[layer].dport, in_port)
697             else:
698                 self.assertEqual(p[layer].id, port_in)
699             self.assertEqual(data, p[Raw].load)
700
701             # in2out
702             if proto != IP_PROTOS.icmp:
703                 pkts = self.create_stream_frag(
704                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
705                 )
706             else:
707                 pkts = self.create_stream_frag(
708                     self.pg1,
709                     self.pg0.remote_ip4,
710                     p[layer].id,
711                     0,
712                     data,
713                     proto,
714                     echo_reply=True,
715                 )
716             self.pg1.add_stream(pkts)
717             self.pg_enable_capture(self.pg_interfaces)
718             self.pg_start()
719             frags = self.pg0.get_capture(len(pkts))
720             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
721             if proto != IP_PROTOS.icmp:
722                 self.assertEqual(p[layer].sport, out_port)
723                 self.assertEqual(p[layer].dport, port_in)
724             else:
725                 self.assertEqual(p[layer].id, port_in)
726             self.assertEqual(data, p[Raw].load)
727
728     def frag_out_of_order_in_plus_out(
729         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
730     ):
731
732         layer = self.proto2layer(proto)
733
734         if proto == IP_PROTOS.tcp:
735             data = b"A" * 4 + b"B" * 16 + b"C" * 3
736         else:
737             data = b"A" * 16 + b"B" * 16 + b"C" * 3
738         port_in = self.random_port()
739
740         for i in range(2):
741             # out2in
742             pkts = self.create_stream_frag(
743                 self.pg0, out_addr, port_in, out_port, data, proto
744             )
745             pkts.reverse()
746             self.pg0.add_stream(pkts)
747             self.pg_enable_capture(self.pg_interfaces)
748             self.pg_start()
749             frags = self.pg1.get_capture(len(pkts))
750             p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr)
751             if proto != IP_PROTOS.icmp:
752                 self.assertEqual(p[layer].dport, in_port)
753                 self.assertEqual(p[layer].sport, port_in)
754                 self.assertEqual(p[layer].dport, in_port)
755             else:
756                 self.assertEqual(p[layer].id, port_in)
757             self.assertEqual(data, p[Raw].load)
758
759             # in2out
760             if proto != IP_PROTOS.icmp:
761                 pkts = self.create_stream_frag(
762                     self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto
763                 )
764             else:
765                 pkts = self.create_stream_frag(
766                     self.pg1,
767                     self.pg0.remote_ip4,
768                     p[layer].id,
769                     0,
770                     data,
771                     proto,
772                     echo_reply=True,
773                 )
774             pkts.reverse()
775             self.pg1.add_stream(pkts)
776             self.pg_enable_capture(self.pg_interfaces)
777             self.pg_start()
778             frags = self.pg0.get_capture(len(pkts))
779             p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4)
780             if proto != IP_PROTOS.icmp:
781                 self.assertEqual(p[layer].sport, out_port)
782                 self.assertEqual(p[layer].dport, port_in)
783             else:
784                 self.assertEqual(p[layer].id, port_in)
785             self.assertEqual(data, p[Raw].load)
786
787     def init_tcp_session(self, in_if, out_if, in_port, ext_port):
788         # SYN packet in->out
789         p = (
790             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
791             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
792             / TCP(sport=in_port, dport=ext_port, flags="S")
793         )
794         in_if.add_stream(p)
795         self.pg_enable_capture(self.pg_interfaces)
796         self.pg_start()
797         capture = out_if.get_capture(1)
798         p = capture[0]
799         out_port = p[TCP].sport
800
801         # SYN + ACK packet out->in
802         p = (
803             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
804             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
805             / TCP(sport=ext_port, dport=out_port, flags="SA")
806         )
807         out_if.add_stream(p)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         in_if.get_capture(1)
811
812         # ACK packet in->out
813         p = (
814             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
815             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
816             / TCP(sport=in_port, dport=ext_port, flags="A")
817         )
818         in_if.add_stream(p)
819         self.pg_enable_capture(self.pg_interfaces)
820         self.pg_start()
821         out_if.get_capture(1)
822
823         return out_port
824
825     def twice_nat_common(
826         self, self_twice_nat=False, same_pg=False, lb=False, client_id=None
827     ):
828         twice_nat_addr = "10.0.1.3"
829
830         port_in = 8080
831         if lb:
832             if not same_pg:
833                 port_in1 = port_in
834                 port_in2 = port_in
835             else:
836                 port_in1 = port_in + 1
837                 port_in2 = port_in + 2
838
839         port_out = 80
840         eh_port_out = 4567
841
842         server1 = self.pg0.remote_hosts[0]
843         server2 = self.pg0.remote_hosts[1]
844         if lb and same_pg:
845             server2 = server1
846         if not lb:
847             server = server1
848
849         pg0 = self.pg0
850         if same_pg:
851             pg1 = self.pg0
852         else:
853             pg1 = self.pg1
854
855         eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1
856
857         self.nat_add_address(self.nat_addr)
858         self.nat_add_address(twice_nat_addr, twice_nat=1)
859
860         flags = 0
861         if self_twice_nat:
862             flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
863         else:
864             flags |= self.config_flags.NAT_IS_TWICE_NAT
865
866         if not lb:
867             self.nat_add_static_mapping(
868                 pg0.remote_ip4,
869                 self.nat_addr,
870                 port_in,
871                 port_out,
872                 proto=IP_PROTOS.tcp,
873                 flags=flags,
874             )
875         else:
876             locals = [
877                 {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0},
878                 {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0},
879             ]
880             out_addr = self.nat_addr
881
882             self.vapi.nat44_add_del_lb_static_mapping(
883                 is_add=1,
884                 flags=flags,
885                 external_addr=out_addr,
886                 external_port=port_out,
887                 protocol=IP_PROTOS.tcp,
888                 local_num=len(locals),
889                 locals=locals,
890             )
891         self.nat_add_inside_interface(pg0)
892         self.nat_add_outside_interface(pg1)
893
894         if same_pg:
895             if not lb:
896                 client = server
897             else:
898                 assert client_id is not None
899                 if client_id == 1:
900                     client = self.pg0.remote_hosts[0]
901                 elif client_id == 2:
902                     client = self.pg0.remote_hosts[1]
903         else:
904             client = pg1.remote_hosts[0]
905         p = (
906             Ether(src=pg1.remote_mac, dst=pg1.local_mac)
907             / IP(src=client.ip4, dst=self.nat_addr)
908             / TCP(sport=eh_port_out, dport=port_out)
909         )
910         pg1.add_stream(p)
911         self.pg_enable_capture(self.pg_interfaces)
912         self.pg_start()
913         capture = pg0.get_capture(1)
914         p = capture[0]
915         try:
916             ip = p[IP]
917             tcp = p[TCP]
918             if lb:
919                 if ip.dst == server1.ip4:
920                     server = server1
921                     port_in = port_in1
922                 else:
923                     server = server2
924                     port_in = port_in2
925             self.assertEqual(ip.dst, server.ip4)
926             if lb and same_pg:
927                 self.assertIn(tcp.dport, [port_in1, port_in2])
928             else:
929                 self.assertEqual(tcp.dport, port_in)
930             if eh_translate:
931                 self.assertEqual(ip.src, twice_nat_addr)
932                 self.assertNotEqual(tcp.sport, eh_port_out)
933             else:
934                 self.assertEqual(ip.src, client.ip4)
935                 self.assertEqual(tcp.sport, eh_port_out)
936             eh_addr_in = ip.src
937             eh_port_in = tcp.sport
938             saved_port_in = tcp.dport
939             self.assert_packet_checksums_valid(p)
940         except:
941             self.logger.error(ppp("Unexpected or invalid packet:", p))
942             raise
943
944         p = (
945             Ether(src=server.mac, dst=pg0.local_mac)
946             / IP(src=server.ip4, dst=eh_addr_in)
947             / TCP(sport=saved_port_in, dport=eh_port_in)
948         )
949         pg0.add_stream(p)
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952         capture = pg1.get_capture(1)
953         p = capture[0]
954         try:
955             ip = p[IP]
956             tcp = p[TCP]
957             self.assertEqual(ip.dst, client.ip4)
958             self.assertEqual(ip.src, self.nat_addr)
959             self.assertEqual(tcp.dport, eh_port_out)
960             self.assertEqual(tcp.sport, port_out)
961             self.assert_packet_checksums_valid(p)
962         except:
963             self.logger.error(ppp("Unexpected or invalid packet:", p))
964             raise
965
966         if eh_translate:
967             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
968             self.assertEqual(len(sessions), 1)
969             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
970             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT)
971             self.logger.info(self.vapi.cli("show nat44 sessions"))
972             self.vapi.nat44_del_session(
973                 address=sessions[0].inside_ip_address,
974                 port=sessions[0].inside_port,
975                 protocol=sessions[0].protocol,
976                 flags=(
977                     self.config_flags.NAT_IS_INSIDE
978                     | self.config_flags.NAT_IS_EXT_HOST_VALID
979                 ),
980                 ext_host_address=sessions[0].ext_host_nat_address,
981                 ext_host_port=sessions[0].ext_host_nat_port,
982             )
983             sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
984             self.assertEqual(len(sessions), 0)
985
986     def verify_syslog_sess(self, data, msgid, is_ip6=False):
987         message = data.decode("utf-8")
988         try:
989             message = SyslogMessage.parse(message)
990         except ParseError as e:
991             self.logger.error(e)
992             raise
993         else:
994             self.assertEqual(message.severity, SyslogSeverity.info)
995             self.assertEqual(message.appname, "NAT")
996             self.assertEqual(message.msgid, msgid)
997             sd_params = message.sd.get("nsess")
998             self.assertTrue(sd_params is not None)
999             if is_ip6:
1000                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
1001                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
1002             else:
1003                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
1004                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
1005                 self.assertTrue(sd_params.get("SSUBIX") is not None)
1006             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
1007             self.assertEqual(sd_params.get("XATYP"), "IPv4")
1008             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
1009             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
1010             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
1011             self.assertEqual(sd_params.get("SVLAN"), "0")
1012             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
1013             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
1014
1015     def test_icmp_error(self):
1016         """NAT44ED test ICMP error message with inner header"""
1017
1018         payload = "H" * 10
1019
1020         self.nat_add_address(self.nat_addr)
1021         self.nat_add_inside_interface(self.pg0)
1022         self.nat_add_outside_interface(self.pg1)
1023
1024         # in2out (initiate connection)
1025         p1 = [
1026             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1027             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1028             / UDP(sport=21, dport=20)
1029             / payload,
1030             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1031             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1032             / TCP(sport=21, dport=20, flags="S")
1033             / payload,
1034             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1035             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1036             / ICMP(type="echo-request", id=7777)
1037             / payload,
1038         ]
1039
1040         capture = self.send_and_expect(self.pg0, p1, self.pg1)
1041
1042         # out2in (send error message)
1043         p2 = [
1044             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1045             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1046             / ICMP(type="dest-unreach", code="port-unreachable")
1047             / c[IP:]
1048             for c in capture
1049         ]
1050
1051         capture = self.send_and_expect(self.pg1, p2, self.pg0)
1052
1053         for c in capture:
1054             try:
1055                 assert c[IP].dst == self.pg0.remote_ip4
1056                 assert c[IPerror].src == self.pg0.remote_ip4
1057             except AssertionError as a:
1058                 raise AssertionError(f"Packet {pr(c)} not translated properly") from a
1059
1060     def test_icmp_echo_reply_trailer(self):
1061         """ICMP echo reply with ethernet trailer"""
1062
1063         self.nat_add_address(self.nat_addr)
1064         self.nat_add_inside_interface(self.pg0)
1065         self.nat_add_outside_interface(self.pg1)
1066
1067         # in2out
1068         p1 = (
1069             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1070             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1071             / ICMP(type=8, id=0xABCD, seq=0)
1072         )
1073
1074         self.pg0.add_stream(p1)
1075         self.pg_enable_capture(self.pg_interfaces)
1076         self.pg_start()
1077         c = self.pg1.get_capture(1)[0]
1078
1079         self.logger.debug(self.vapi.cli("show trace"))
1080
1081         # out2in
1082         p2 = (
1083             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1084             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59)
1085             / ICMP(type=0, id=c[ICMP].id, seq=0)
1086         )
1087
1088         # force checksum calculation
1089         p2 = p2.__class__(bytes(p2))
1090
1091         self.logger.debug(ppp("Packet before modification:", p2))
1092
1093         # hex representation of vss monitoring ethernet trailer
1094         # this seems to be just added to end of packet without modifying
1095         # IP or ICMP lengths / checksums
1096         p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
1097         # change it so that IP/ICMP is unaffected
1098         p2[IP].len = 28
1099
1100         self.logger.debug(ppp("Packet with added trailer:", p2))
1101
1102         self.pg1.add_stream(p2)
1103         self.pg_enable_capture(self.pg_interfaces)
1104         self.pg_start()
1105
1106         self.pg0.get_capture(1)
1107
1108     def test_users_dump(self):
1109         """NAT44ED API test - nat44_user_dump"""
1110
1111         self.nat_add_address(self.nat_addr)
1112         self.nat_add_inside_interface(self.pg0)
1113         self.nat_add_outside_interface(self.pg1)
1114
1115         self.vapi.nat44_forwarding_enable_disable(enable=1)
1116
1117         local_ip = self.pg0.remote_ip4
1118         external_ip = self.nat_addr
1119         self.nat_add_static_mapping(local_ip, external_ip)
1120
1121         users = self.vapi.nat44_user_dump()
1122         self.assertEqual(len(users), 0)
1123
1124         # in2out - static mapping match
1125
1126         pkts = self.create_stream_out(self.pg1)
1127         self.pg1.add_stream(pkts)
1128         self.pg_enable_capture(self.pg_interfaces)
1129         self.pg_start()
1130         capture = self.pg0.get_capture(len(pkts))
1131         self.verify_capture_in(capture, self.pg0)
1132
1133         pkts = self.create_stream_in(self.pg0, self.pg1)
1134         self.pg0.add_stream(pkts)
1135         self.pg_enable_capture(self.pg_interfaces)
1136         self.pg_start()
1137         capture = self.pg1.get_capture(len(pkts))
1138         self.verify_capture_out(capture, same_port=True)
1139
1140         users = self.vapi.nat44_user_dump()
1141         self.assertEqual(len(users), 1)
1142         static_user = users[0]
1143         self.assertEqual(static_user.nstaticsessions, 3)
1144         self.assertEqual(static_user.nsessions, 0)
1145
1146         # in2out - no static mapping match (forwarding test)
1147
1148         host0 = self.pg0.remote_hosts[0]
1149         self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1150         try:
1151             pkts = self.create_stream_out(
1152                 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1153             )
1154             self.pg1.add_stream(pkts)
1155             self.pg_enable_capture(self.pg_interfaces)
1156             self.pg_start()
1157             capture = self.pg0.get_capture(len(pkts))
1158             self.verify_capture_in(capture, self.pg0)
1159
1160             pkts = self.create_stream_in(self.pg0, self.pg1)
1161             self.pg0.add_stream(pkts)
1162             self.pg_enable_capture(self.pg_interfaces)
1163             self.pg_start()
1164             capture = self.pg1.get_capture(len(pkts))
1165             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1166         finally:
1167             self.pg0.remote_hosts[0] = host0
1168
1169         users = self.vapi.nat44_user_dump()
1170         self.assertEqual(len(users), 2)
1171         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1172             non_static_user = users[1]
1173             static_user = users[0]
1174         else:
1175             non_static_user = users[0]
1176             static_user = users[1]
1177         self.assertEqual(static_user.nstaticsessions, 3)
1178         self.assertEqual(static_user.nsessions, 0)
1179         self.assertEqual(non_static_user.nstaticsessions, 0)
1180         self.assertEqual(non_static_user.nsessions, 3)
1181
1182         users = self.vapi.nat44_user_dump()
1183         self.assertEqual(len(users), 2)
1184         if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
1185             non_static_user = users[1]
1186             static_user = users[0]
1187         else:
1188             non_static_user = users[0]
1189             static_user = users[1]
1190         self.assertEqual(static_user.nstaticsessions, 3)
1191         self.assertEqual(static_user.nsessions, 0)
1192         self.assertEqual(non_static_user.nstaticsessions, 0)
1193         self.assertEqual(non_static_user.nsessions, 3)
1194
1195     def test_frag_out_of_order_do_not_translate(self):
1196         """NAT44ED don't translate fragments arriving out of order"""
1197         self.nat_add_inside_interface(self.pg0)
1198         self.nat_add_outside_interface(self.pg1)
1199         self.vapi.nat44_forwarding_enable_disable(enable=True)
1200         self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
1201
1202     def test_forwarding(self):
1203         """NAT44ED forwarding test"""
1204
1205         self.nat_add_inside_interface(self.pg0)
1206         self.nat_add_outside_interface(self.pg1)
1207         self.vapi.nat44_forwarding_enable_disable(enable=1)
1208
1209         real_ip = self.pg0.remote_ip4
1210         alias_ip = self.nat_addr
1211         flags = self.config_flags.NAT_IS_ADDR_ONLY
1212         self.vapi.nat44_add_del_static_mapping(
1213             is_add=1,
1214             local_ip_address=real_ip,
1215             external_ip_address=alias_ip,
1216             external_sw_if_index=0xFFFFFFFF,
1217             flags=flags,
1218         )
1219
1220         try:
1221             # in2out - static mapping match
1222
1223             pkts = self.create_stream_out(self.pg1)
1224             self.pg1.add_stream(pkts)
1225             self.pg_enable_capture(self.pg_interfaces)
1226             self.pg_start()
1227             capture = self.pg0.get_capture(len(pkts))
1228             self.verify_capture_in(capture, self.pg0)
1229
1230             pkts = self.create_stream_in(self.pg0, self.pg1)
1231             self.pg0.add_stream(pkts)
1232             self.pg_enable_capture(self.pg_interfaces)
1233             self.pg_start()
1234             capture = self.pg1.get_capture(len(pkts))
1235             self.verify_capture_out(capture, same_port=True)
1236
1237             # in2out - no static mapping match
1238
1239             host0 = self.pg0.remote_hosts[0]
1240             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1241             try:
1242                 pkts = self.create_stream_out(
1243                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1244                 )
1245                 self.pg1.add_stream(pkts)
1246                 self.pg_enable_capture(self.pg_interfaces)
1247                 self.pg_start()
1248                 capture = self.pg0.get_capture(len(pkts))
1249                 self.verify_capture_in(capture, self.pg0)
1250
1251                 pkts = self.create_stream_in(self.pg0, self.pg1)
1252                 self.pg0.add_stream(pkts)
1253                 self.pg_enable_capture(self.pg_interfaces)
1254                 self.pg_start()
1255                 capture = self.pg1.get_capture(len(pkts))
1256                 self.verify_capture_out(
1257                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1258                 )
1259             finally:
1260                 self.pg0.remote_hosts[0] = host0
1261
1262             user = self.pg0.remote_hosts[1]
1263             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1264             self.assertEqual(len(sessions), 3)
1265             self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1266             self.vapi.nat44_del_session(
1267                 address=sessions[0].inside_ip_address,
1268                 port=sessions[0].inside_port,
1269                 protocol=sessions[0].protocol,
1270                 flags=(
1271                     self.config_flags.NAT_IS_INSIDE
1272                     | self.config_flags.NAT_IS_EXT_HOST_VALID
1273                 ),
1274                 ext_host_address=sessions[0].ext_host_address,
1275                 ext_host_port=sessions[0].ext_host_port,
1276             )
1277             sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
1278             self.assertEqual(len(sessions), 2)
1279
1280         finally:
1281             self.vapi.nat44_forwarding_enable_disable(enable=0)
1282             flags = self.config_flags.NAT_IS_ADDR_ONLY
1283             self.vapi.nat44_add_del_static_mapping(
1284                 is_add=0,
1285                 local_ip_address=real_ip,
1286                 external_ip_address=alias_ip,
1287                 external_sw_if_index=0xFFFFFFFF,
1288                 flags=flags,
1289             )
1290
1291     def test_output_feature_and_service2(self):
1292         """NAT44ED interface output feature and service host direct access"""
1293         self.vapi.nat44_forwarding_enable_disable(enable=1)
1294         self.nat_add_address(self.nat_addr)
1295
1296         self.vapi.nat44_ed_add_del_output_interface(
1297             sw_if_index=self.pg1.sw_if_index, is_add=1
1298         )
1299
1300         # session initiated from service host - translate
1301         pkts = self.create_stream_in(self.pg0, self.pg1)
1302         self.pg0.add_stream(pkts)
1303         self.pg_enable_capture(self.pg_interfaces)
1304         self.pg_start()
1305         capture = self.pg1.get_capture(len(pkts))
1306         self.verify_capture_out(capture, ignore_port=True)
1307
1308         pkts = self.create_stream_out(self.pg1)
1309         self.pg1.add_stream(pkts)
1310         self.pg_enable_capture(self.pg_interfaces)
1311         self.pg_start()
1312         capture = self.pg0.get_capture(len(pkts))
1313         self.verify_capture_in(capture, self.pg0)
1314
1315         # session initiated from remote host - do not translate
1316         tcp_port_in = self.tcp_port_in
1317         udp_port_in = self.udp_port_in
1318         icmp_id_in = self.icmp_id_in
1319
1320         self.tcp_port_in = 60303
1321         self.udp_port_in = 60304
1322         self.icmp_id_in = 60305
1323
1324         try:
1325             pkts = self.create_stream_out(
1326                 self.pg1, self.pg0.remote_ip4, use_inside_ports=True
1327             )
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True)
1340         finally:
1341             self.tcp_port_in = tcp_port_in
1342             self.udp_port_in = udp_port_in
1343             self.icmp_id_in = icmp_id_in
1344
1345     def test_twice_nat(self):
1346         """NAT44ED Twice NAT"""
1347         self.twice_nat_common()
1348
1349     def test_self_twice_nat_positive(self):
1350         """NAT44ED Self Twice NAT (positive test)"""
1351         self.twice_nat_common(self_twice_nat=True, same_pg=True)
1352
1353     def test_self_twice_nat_lb_positive(self):
1354         """NAT44ED Self Twice NAT local service load balancing (positive test)"""
1355         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1)
1356
1357     def test_twice_nat_lb(self):
1358         """NAT44ED Twice NAT local service load balancing"""
1359         self.twice_nat_common(lb=True)
1360
1361     def test_output_feature(self):
1362         """NAT44ED interface output feature (in2out postrouting)"""
1363         self.vapi.nat44_forwarding_enable_disable(enable=1)
1364         self.nat_add_address(self.nat_addr)
1365
1366         self.nat_add_outside_interface(self.pg0)
1367         self.vapi.nat44_ed_add_del_output_interface(
1368             sw_if_index=self.pg1.sw_if_index, is_add=1
1369         )
1370
1371         # in2out
1372         pkts = self.create_stream_in(self.pg0, self.pg1)
1373         self.pg0.add_stream(pkts)
1374         self.pg_enable_capture(self.pg_interfaces)
1375         self.pg_start()
1376         capture = self.pg1.get_capture(len(pkts))
1377         self.verify_capture_out(capture, ignore_port=True)
1378         self.logger.debug(self.vapi.cli("show trace"))
1379
1380         # out2in
1381         pkts = self.create_stream_out(self.pg1)
1382         self.pg1.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg0.get_capture(len(pkts))
1386         self.verify_capture_in(capture, self.pg0)
1387         self.logger.debug(self.vapi.cli("show trace"))
1388
1389         # in2out
1390         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1391         self.pg0.add_stream(pkts)
1392         self.pg_enable_capture(self.pg_interfaces)
1393         self.pg_start()
1394         capture = self.pg1.get_capture(len(pkts))
1395         self.verify_capture_out(capture, ignore_port=True)
1396         self.logger.debug(self.vapi.cli("show trace"))
1397
1398         # out2in
1399         pkts = self.create_stream_out(self.pg1, ttl=2)
1400         self.pg1.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg0.get_capture(len(pkts))
1404         self.verify_capture_in(capture, self.pg0)
1405         self.logger.debug(self.vapi.cli("show trace"))
1406
1407         # in2out
1408         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1409         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1410         for p in capture:
1411             self.assertIn(ICMP, p)
1412             self.assertEqual(p[ICMP].type, 11)  # 11 == time-exceeded
1413
1414     def test_static_with_port_out2(self):
1415         """NAT44ED 1:1 NAPT asymmetrical rule"""
1416
1417         external_port = 80
1418         local_port = 8080
1419
1420         self.vapi.nat44_forwarding_enable_disable(enable=1)
1421         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1422         self.nat_add_static_mapping(
1423             self.pg0.remote_ip4,
1424             self.nat_addr,
1425             local_port,
1426             external_port,
1427             proto=IP_PROTOS.tcp,
1428             flags=flags,
1429         )
1430
1431         self.nat_add_inside_interface(self.pg0)
1432         self.nat_add_outside_interface(self.pg1)
1433
1434         # from client to service
1435         p = (
1436             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1437             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1438             / TCP(sport=12345, dport=external_port)
1439         )
1440         self.pg1.add_stream(p)
1441         self.pg_enable_capture(self.pg_interfaces)
1442         self.pg_start()
1443         capture = self.pg0.get_capture(1)
1444         p = capture[0]
1445         try:
1446             ip = p[IP]
1447             tcp = p[TCP]
1448             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1449             self.assertEqual(tcp.dport, local_port)
1450             self.assert_packet_checksums_valid(p)
1451         except:
1452             self.logger.error(ppp("Unexpected or invalid packet:", p))
1453             raise
1454
1455         # ICMP error
1456         p = (
1457             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1458             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1459             / ICMP(type=11)
1460             / capture[0][IP]
1461         )
1462         self.pg0.add_stream(p)
1463         self.pg_enable_capture(self.pg_interfaces)
1464         self.pg_start()
1465         capture = self.pg1.get_capture(1)
1466         p = capture[0]
1467         try:
1468             self.assertEqual(p[IP].src, self.nat_addr)
1469             inner = p[IPerror]
1470             self.assertEqual(inner.dst, self.nat_addr)
1471             self.assertEqual(inner[TCPerror].dport, external_port)
1472         except:
1473             self.logger.error(ppp("Unexpected or invalid packet:", p))
1474             raise
1475
1476         # from service back to client
1477         p = (
1478             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1479             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1480             / TCP(sport=local_port, dport=12345)
1481         )
1482         self.pg0.add_stream(p)
1483         self.pg_enable_capture(self.pg_interfaces)
1484         self.pg_start()
1485         capture = self.pg1.get_capture(1)
1486         p = capture[0]
1487         try:
1488             ip = p[IP]
1489             tcp = p[TCP]
1490             self.assertEqual(ip.src, self.nat_addr)
1491             self.assertEqual(tcp.sport, external_port)
1492             self.assert_packet_checksums_valid(p)
1493         except:
1494             self.logger.error(ppp("Unexpected or invalid packet:", p))
1495             raise
1496
1497         # ICMP error
1498         p = (
1499             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1500             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1501             / ICMP(type=11)
1502             / capture[0][IP]
1503         )
1504         self.pg1.add_stream(p)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         capture = self.pg0.get_capture(1)
1508         p = capture[0]
1509         try:
1510             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1511             inner = p[IPerror]
1512             self.assertEqual(inner.src, self.pg0.remote_ip4)
1513             self.assertEqual(inner[TCPerror].sport, local_port)
1514         except:
1515             self.logger.error(ppp("Unexpected or invalid packet:", p))
1516             raise
1517
1518         # from client to server (no translation)
1519         p = (
1520             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1521             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1522             / TCP(sport=12346, dport=local_port)
1523         )
1524         self.pg1.add_stream(p)
1525         self.pg_enable_capture(self.pg_interfaces)
1526         self.pg_start()
1527         capture = self.pg0.get_capture(1)
1528         p = capture[0]
1529         try:
1530             ip = p[IP]
1531             tcp = p[TCP]
1532             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1533             self.assertEqual(tcp.dport, local_port)
1534             self.assert_packet_checksums_valid(p)
1535         except:
1536             self.logger.error(ppp("Unexpected or invalid packet:", p))
1537             raise
1538
1539         # from service back to client (no translation)
1540         p = (
1541             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1542             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1543             / TCP(sport=local_port, dport=12346)
1544         )
1545         self.pg0.add_stream(p)
1546         self.pg_enable_capture(self.pg_interfaces)
1547         self.pg_start()
1548         capture = self.pg1.get_capture(1)
1549         p = capture[0]
1550         try:
1551             ip = p[IP]
1552             tcp = p[TCP]
1553             self.assertEqual(ip.src, self.pg0.remote_ip4)
1554             self.assertEqual(tcp.sport, local_port)
1555             self.assert_packet_checksums_valid(p)
1556         except:
1557             self.logger.error(ppp("Unexpected or invalid packet:", p))
1558             raise
1559
1560     def test_static_lb(self):
1561         """NAT44ED local service load balancing"""
1562         external_addr_n = self.nat_addr
1563         external_port = 80
1564         local_port = 8080
1565         server1 = self.pg0.remote_hosts[0]
1566         server2 = self.pg0.remote_hosts[1]
1567
1568         locals = [
1569             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1570             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1571         ]
1572
1573         self.nat_add_address(self.nat_addr)
1574         self.vapi.nat44_add_del_lb_static_mapping(
1575             is_add=1,
1576             external_addr=external_addr_n,
1577             external_port=external_port,
1578             protocol=IP_PROTOS.tcp,
1579             local_num=len(locals),
1580             locals=locals,
1581         )
1582         flags = self.config_flags.NAT_IS_INSIDE
1583         self.vapi.nat44_interface_add_del_feature(
1584             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1585         )
1586         self.vapi.nat44_interface_add_del_feature(
1587             sw_if_index=self.pg1.sw_if_index, is_add=1
1588         )
1589
1590         # from client to service
1591         p = (
1592             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1593             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1594             / TCP(sport=12345, dport=external_port)
1595         )
1596         self.pg1.add_stream(p)
1597         self.pg_enable_capture(self.pg_interfaces)
1598         self.pg_start()
1599         capture = self.pg0.get_capture(1)
1600         p = capture[0]
1601         server = None
1602         try:
1603             ip = p[IP]
1604             tcp = p[TCP]
1605             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1606             if ip.dst == server1.ip4:
1607                 server = server1
1608             else:
1609                 server = server2
1610             self.assertEqual(tcp.dport, local_port)
1611             self.assert_packet_checksums_valid(p)
1612         except:
1613             self.logger.error(ppp("Unexpected or invalid packet:", p))
1614             raise
1615
1616         # from service back to client
1617         p = (
1618             Ether(src=server.mac, dst=self.pg0.local_mac)
1619             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1620             / TCP(sport=local_port, dport=12345)
1621         )
1622         self.pg0.add_stream(p)
1623         self.pg_enable_capture(self.pg_interfaces)
1624         self.pg_start()
1625         capture = self.pg1.get_capture(1)
1626         p = capture[0]
1627         try:
1628             ip = p[IP]
1629             tcp = p[TCP]
1630             self.assertEqual(ip.src, self.nat_addr)
1631             self.assertEqual(tcp.sport, external_port)
1632             self.assert_packet_checksums_valid(p)
1633         except:
1634             self.logger.error(ppp("Unexpected or invalid packet:", p))
1635             raise
1636
1637         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1638         self.assertEqual(len(sessions), 1)
1639         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1640         self.vapi.nat44_del_session(
1641             address=sessions[0].inside_ip_address,
1642             port=sessions[0].inside_port,
1643             protocol=sessions[0].protocol,
1644             flags=(
1645                 self.config_flags.NAT_IS_INSIDE
1646                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1647             ),
1648             ext_host_address=sessions[0].ext_host_address,
1649             ext_host_port=sessions[0].ext_host_port,
1650         )
1651         sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
1652         self.assertEqual(len(sessions), 0)
1653
1654     def test_static_lb_2(self):
1655         """NAT44ED local service load balancing (asymmetrical rule)"""
1656         external_addr = self.nat_addr
1657         external_port = 80
1658         local_port = 8080
1659         server1 = self.pg0.remote_hosts[0]
1660         server2 = self.pg0.remote_hosts[1]
1661
1662         locals = [
1663             {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0},
1664             {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0},
1665         ]
1666
1667         self.vapi.nat44_forwarding_enable_disable(enable=1)
1668         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1669         self.vapi.nat44_add_del_lb_static_mapping(
1670             is_add=1,
1671             flags=flags,
1672             external_addr=external_addr,
1673             external_port=external_port,
1674             protocol=IP_PROTOS.tcp,
1675             local_num=len(locals),
1676             locals=locals,
1677         )
1678         flags = self.config_flags.NAT_IS_INSIDE
1679         self.vapi.nat44_interface_add_del_feature(
1680             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1681         )
1682         self.vapi.nat44_interface_add_del_feature(
1683             sw_if_index=self.pg1.sw_if_index, is_add=1
1684         )
1685
1686         # from client to service
1687         p = (
1688             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1689             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1690             / TCP(sport=12345, dport=external_port)
1691         )
1692         self.pg1.add_stream(p)
1693         self.pg_enable_capture(self.pg_interfaces)
1694         self.pg_start()
1695         capture = self.pg0.get_capture(1)
1696         p = capture[0]
1697         server = None
1698         try:
1699             ip = p[IP]
1700             tcp = p[TCP]
1701             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1702             if ip.dst == server1.ip4:
1703                 server = server1
1704             else:
1705                 server = server2
1706             self.assertEqual(tcp.dport, local_port)
1707             self.assert_packet_checksums_valid(p)
1708         except:
1709             self.logger.error(ppp("Unexpected or invalid packet:", p))
1710             raise
1711
1712         # from service back to client
1713         p = (
1714             Ether(src=server.mac, dst=self.pg0.local_mac)
1715             / IP(src=server.ip4, dst=self.pg1.remote_ip4)
1716             / TCP(sport=local_port, dport=12345)
1717         )
1718         self.pg0.add_stream(p)
1719         self.pg_enable_capture(self.pg_interfaces)
1720         self.pg_start()
1721         capture = self.pg1.get_capture(1)
1722         p = capture[0]
1723         try:
1724             ip = p[IP]
1725             tcp = p[TCP]
1726             self.assertEqual(ip.src, self.nat_addr)
1727             self.assertEqual(tcp.sport, external_port)
1728             self.assert_packet_checksums_valid(p)
1729         except:
1730             self.logger.error(ppp("Unexpected or invalid packet:", p))
1731             raise
1732
1733         # from client to server (no translation)
1734         p = (
1735             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1736             / IP(src=self.pg1.remote_ip4, dst=server1.ip4)
1737             / TCP(sport=12346, dport=local_port)
1738         )
1739         self.pg1.add_stream(p)
1740         self.pg_enable_capture(self.pg_interfaces)
1741         self.pg_start()
1742         capture = self.pg0.get_capture(1)
1743         p = capture[0]
1744         server = None
1745         try:
1746             ip = p[IP]
1747             tcp = p[TCP]
1748             self.assertEqual(ip.dst, server1.ip4)
1749             self.assertEqual(tcp.dport, local_port)
1750             self.assert_packet_checksums_valid(p)
1751         except:
1752             self.logger.error(ppp("Unexpected or invalid packet:", p))
1753             raise
1754
1755         # from service back to client (no translation)
1756         p = (
1757             Ether(src=server1.mac, dst=self.pg0.local_mac)
1758             / IP(src=server1.ip4, dst=self.pg1.remote_ip4)
1759             / TCP(sport=local_port, dport=12346)
1760         )
1761         self.pg0.add_stream(p)
1762         self.pg_enable_capture(self.pg_interfaces)
1763         self.pg_start()
1764         capture = self.pg1.get_capture(1)
1765         p = capture[0]
1766         try:
1767             ip = p[IP]
1768             tcp = p[TCP]
1769             self.assertEqual(ip.src, server1.ip4)
1770             self.assertEqual(tcp.sport, local_port)
1771             self.assert_packet_checksums_valid(p)
1772         except:
1773             self.logger.error(ppp("Unexpected or invalid packet:", p))
1774             raise
1775
1776     def test_lb_affinity(self):
1777         """NAT44ED local service load balancing affinity"""
1778         external_addr = self.nat_addr
1779         external_port = 80
1780         local_port = 8080
1781         server1 = self.pg0.remote_hosts[0]
1782         server2 = self.pg0.remote_hosts[1]
1783
1784         locals = [
1785             {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1786             {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0},
1787         ]
1788
1789         self.nat_add_address(self.nat_addr)
1790         self.vapi.nat44_add_del_lb_static_mapping(
1791             is_add=1,
1792             external_addr=external_addr,
1793             external_port=external_port,
1794             protocol=IP_PROTOS.tcp,
1795             affinity=10800,
1796             local_num=len(locals),
1797             locals=locals,
1798         )
1799         flags = self.config_flags.NAT_IS_INSIDE
1800         self.vapi.nat44_interface_add_del_feature(
1801             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1802         )
1803         self.vapi.nat44_interface_add_del_feature(
1804             sw_if_index=self.pg1.sw_if_index, is_add=1
1805         )
1806
1807         p = (
1808             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1809             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1810             / TCP(sport=1025, dport=external_port)
1811         )
1812         self.pg1.add_stream(p)
1813         self.pg_enable_capture(self.pg_interfaces)
1814         self.pg_start()
1815         capture = self.pg0.get_capture(1)
1816         backend = capture[0][IP].dst
1817
1818         sessions = self.vapi.nat44_user_session_dump(backend, 0)
1819         self.assertEqual(len(sessions), 1)
1820         self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID)
1821         self.vapi.nat44_del_session(
1822             address=sessions[0].inside_ip_address,
1823             port=sessions[0].inside_port,
1824             protocol=sessions[0].protocol,
1825             flags=(
1826                 self.config_flags.NAT_IS_INSIDE
1827                 | self.config_flags.NAT_IS_EXT_HOST_VALID
1828             ),
1829             ext_host_address=sessions[0].ext_host_address,
1830             ext_host_port=sessions[0].ext_host_port,
1831         )
1832
1833         pkts = []
1834         for port in range(1030, 1100):
1835             p = (
1836                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1837                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1838                 / TCP(sport=port, dport=external_port)
1839             )
1840             pkts.append(p)
1841         self.pg1.add_stream(pkts)
1842         self.pg_enable_capture(self.pg_interfaces)
1843         self.pg_start()
1844         capture = self.pg0.get_capture(len(pkts))
1845         for p in capture:
1846             self.assertEqual(p[IP].dst, backend)
1847
1848     def test_multiple_vrf_1(self):
1849         """Multiple VRF - both client & service in VRF1"""
1850
1851         external_addr = "1.2.3.4"
1852         external_port = 80
1853         local_port = 8080
1854         port = 0
1855
1856         flags = self.config_flags.NAT_IS_INSIDE
1857         self.vapi.nat44_interface_add_del_feature(
1858             sw_if_index=self.pg5.sw_if_index, is_add=1
1859         )
1860         self.vapi.nat44_interface_add_del_feature(
1861             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1862         )
1863         self.vapi.nat44_interface_add_del_feature(
1864             sw_if_index=self.pg6.sw_if_index, is_add=1
1865         )
1866         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1867         self.nat_add_static_mapping(
1868             self.pg5.remote_ip4,
1869             external_addr,
1870             local_port,
1871             external_port,
1872             vrf_id=1,
1873             proto=IP_PROTOS.tcp,
1874             flags=flags,
1875         )
1876
1877         p = (
1878             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
1879             / IP(src=self.pg6.remote_ip4, dst=external_addr)
1880             / TCP(sport=12345, dport=external_port)
1881         )
1882         self.pg6.add_stream(p)
1883         self.pg_enable_capture(self.pg_interfaces)
1884         self.pg_start()
1885         capture = self.pg5.get_capture(1)
1886         p = capture[0]
1887         try:
1888             ip = p[IP]
1889             tcp = p[TCP]
1890             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1891             self.assertEqual(tcp.dport, local_port)
1892             self.assert_packet_checksums_valid(p)
1893         except:
1894             self.logger.error(ppp("Unexpected or invalid packet:", p))
1895             raise
1896
1897         p = (
1898             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1899             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
1900             / TCP(sport=local_port, dport=12345)
1901         )
1902         self.pg5.add_stream(p)
1903         self.pg_enable_capture(self.pg_interfaces)
1904         self.pg_start()
1905         capture = self.pg6.get_capture(1)
1906         p = capture[0]
1907         try:
1908             ip = p[IP]
1909             tcp = p[TCP]
1910             self.assertEqual(ip.src, external_addr)
1911             self.assertEqual(tcp.sport, external_port)
1912             self.assert_packet_checksums_valid(p)
1913         except:
1914             self.logger.error(ppp("Unexpected or invalid packet:", p))
1915             raise
1916
1917     def test_multiple_vrf_2(self):
1918         """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)"""
1919
1920         external_addr = "1.2.3.4"
1921         external_port = 80
1922         local_port = 8080
1923         port = 0
1924
1925         self.nat_add_address(self.nat_addr)
1926         flags = self.config_flags.NAT_IS_INSIDE
1927         self.vapi.nat44_ed_add_del_output_interface(
1928             sw_if_index=self.pg1.sw_if_index, is_add=1
1929         )
1930         self.vapi.nat44_interface_add_del_feature(
1931             sw_if_index=self.pg5.sw_if_index, is_add=1
1932         )
1933         self.vapi.nat44_interface_add_del_feature(
1934             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
1935         )
1936         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
1937         self.nat_add_static_mapping(
1938             self.pg5.remote_ip4,
1939             external_addr,
1940             local_port,
1941             external_port,
1942             vrf_id=1,
1943             proto=IP_PROTOS.tcp,
1944             flags=flags,
1945         )
1946
1947         p = (
1948             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
1949             / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4)
1950             / TCP(sport=2345, dport=22)
1951         )
1952         self.pg5.add_stream(p)
1953         self.pg_enable_capture(self.pg_interfaces)
1954         self.pg_start()
1955         capture = self.pg1.get_capture(1)
1956         p = capture[0]
1957         try:
1958             ip = p[IP]
1959             tcp = p[TCP]
1960             self.assertEqual(ip.src, self.nat_addr)
1961             self.assert_packet_checksums_valid(p)
1962             port = tcp.sport
1963         except:
1964             self.logger.error(ppp("Unexpected or invalid packet:", p))
1965             raise
1966
1967         p = (
1968             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1969             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1970             / TCP(sport=22, dport=port)
1971         )
1972         self.pg1.add_stream(p)
1973         self.pg_enable_capture(self.pg_interfaces)
1974         self.pg_start()
1975         capture = self.pg5.get_capture(1)
1976         p = capture[0]
1977         try:
1978             ip = p[IP]
1979             tcp = p[TCP]
1980             self.assertEqual(ip.dst, self.pg5.remote_ip4)
1981             self.assertEqual(tcp.dport, 2345)
1982             self.assert_packet_checksums_valid(p)
1983         except:
1984             self.logger.error(ppp("Unexpected or invalid packet:", p))
1985             raise
1986
1987     def test_multiple_vrf_3(self):
1988         """Multiple VRF - client in VRF1, service in VRF0"""
1989
1990         external_addr = "1.2.3.4"
1991         external_port = 80
1992         local_port = 8080
1993         port = 0
1994
1995         flags = self.config_flags.NAT_IS_INSIDE
1996         self.vapi.nat44_interface_add_del_feature(
1997             sw_if_index=self.pg0.sw_if_index, is_add=1
1998         )
1999         self.vapi.nat44_interface_add_del_feature(
2000             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2001         )
2002         self.vapi.nat44_interface_add_del_feature(
2003             sw_if_index=self.pg6.sw_if_index, is_add=1
2004         )
2005         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2006         self.nat_add_static_mapping(
2007             self.pg0.remote_ip4,
2008             external_sw_if_index=self.pg0.sw_if_index,
2009             local_port=local_port,
2010             vrf_id=0,
2011             external_port=external_port,
2012             proto=IP_PROTOS.tcp,
2013             flags=flags,
2014         )
2015
2016         # from client VRF1 to service VRF0
2017         p = (
2018             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2019             / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4)
2020             / TCP(sport=12346, dport=external_port)
2021         )
2022         self.pg6.add_stream(p)
2023         self.pg_enable_capture(self.pg_interfaces)
2024         self.pg_start()
2025         capture = self.pg0.get_capture(1)
2026         p = capture[0]
2027         try:
2028             ip = p[IP]
2029             tcp = p[TCP]
2030             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2031             self.assertEqual(tcp.dport, local_port)
2032             self.assert_packet_checksums_valid(p)
2033         except:
2034             self.logger.error(ppp("Unexpected or invalid packet:", p))
2035             raise
2036
2037         # from service VRF0 back to client VRF1
2038         p = (
2039             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2040             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2041             / TCP(sport=local_port, dport=12346)
2042         )
2043         self.pg0.add_stream(p)
2044         self.pg_enable_capture(self.pg_interfaces)
2045         self.pg_start()
2046         capture = self.pg6.get_capture(1)
2047         p = capture[0]
2048         try:
2049             ip = p[IP]
2050             tcp = p[TCP]
2051             self.assertEqual(ip.src, self.pg0.local_ip4)
2052             self.assertEqual(tcp.sport, external_port)
2053             self.assert_packet_checksums_valid(p)
2054         except:
2055             self.logger.error(ppp("Unexpected or invalid packet:", p))
2056             raise
2057
2058     def test_multiple_vrf_4(self):
2059         """Multiple VRF - client in VRF0, service in VRF1"""
2060
2061         external_addr = "1.2.3.4"
2062         external_port = 80
2063         local_port = 8080
2064         port = 0
2065
2066         flags = self.config_flags.NAT_IS_INSIDE
2067         self.vapi.nat44_interface_add_del_feature(
2068             sw_if_index=self.pg0.sw_if_index, is_add=1
2069         )
2070         self.vapi.nat44_interface_add_del_feature(
2071             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2072         )
2073         self.vapi.nat44_interface_add_del_feature(
2074             sw_if_index=self.pg5.sw_if_index, is_add=1
2075         )
2076         self.vapi.nat44_interface_add_del_feature(
2077             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2078         )
2079         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2080         self.nat_add_static_mapping(
2081             self.pg5.remote_ip4,
2082             external_addr,
2083             local_port,
2084             external_port,
2085             vrf_id=1,
2086             proto=IP_PROTOS.tcp,
2087             flags=flags,
2088         )
2089
2090         # from client VRF0 to service VRF1
2091         p = (
2092             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2093             / IP(src=self.pg0.remote_ip4, dst=external_addr)
2094             / TCP(sport=12347, dport=external_port)
2095         )
2096         self.pg0.add_stream(p)
2097         self.pg_enable_capture(self.pg_interfaces)
2098         self.pg_start()
2099         capture = self.pg5.get_capture(1)
2100         p = capture[0]
2101         try:
2102             ip = p[IP]
2103             tcp = p[TCP]
2104             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2105             self.assertEqual(tcp.dport, local_port)
2106             self.assert_packet_checksums_valid(p)
2107         except:
2108             self.logger.error(ppp("Unexpected or invalid packet:", p))
2109             raise
2110
2111         # from service VRF1 back to client VRF0
2112         p = (
2113             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2114             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2115             / TCP(sport=local_port, dport=12347)
2116         )
2117         self.pg5.add_stream(p)
2118         self.pg_enable_capture(self.pg_interfaces)
2119         self.pg_start()
2120         capture = self.pg0.get_capture(1)
2121         p = capture[0]
2122         try:
2123             ip = p[IP]
2124             tcp = p[TCP]
2125             self.assertEqual(ip.src, external_addr)
2126             self.assertEqual(tcp.sport, external_port)
2127             self.assert_packet_checksums_valid(p)
2128         except:
2129             self.logger.error(ppp("Unexpected or invalid packet:", p))
2130             raise
2131
2132     def test_multiple_vrf_5(self):
2133         """Multiple VRF - forwarding - no translation"""
2134
2135         external_addr = "1.2.3.4"
2136         external_port = 80
2137         local_port = 8080
2138         port = 0
2139
2140         self.vapi.nat44_forwarding_enable_disable(enable=1)
2141         flags = self.config_flags.NAT_IS_INSIDE
2142         self.vapi.nat44_interface_add_del_feature(
2143             sw_if_index=self.pg0.sw_if_index, is_add=1
2144         )
2145         self.vapi.nat44_interface_add_del_feature(
2146             sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags
2147         )
2148         self.vapi.nat44_interface_add_del_feature(
2149             sw_if_index=self.pg5.sw_if_index, is_add=1
2150         )
2151         self.vapi.nat44_interface_add_del_feature(
2152             sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags
2153         )
2154         self.vapi.nat44_interface_add_del_feature(
2155             sw_if_index=self.pg6.sw_if_index, is_add=1
2156         )
2157         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
2158         self.nat_add_static_mapping(
2159             self.pg5.remote_ip4,
2160             external_addr,
2161             local_port,
2162             external_port,
2163             vrf_id=1,
2164             proto=IP_PROTOS.tcp,
2165             flags=flags,
2166         )
2167         self.nat_add_static_mapping(
2168             self.pg0.remote_ip4,
2169             external_sw_if_index=self.pg0.sw_if_index,
2170             local_port=local_port,
2171             vrf_id=0,
2172             external_port=external_port,
2173             proto=IP_PROTOS.tcp,
2174             flags=flags,
2175         )
2176
2177         # from client to server (both VRF1, no translation)
2178         p = (
2179             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
2180             / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4)
2181             / TCP(sport=12348, dport=local_port)
2182         )
2183         self.pg6.add_stream(p)
2184         self.pg_enable_capture(self.pg_interfaces)
2185         self.pg_start()
2186         capture = self.pg5.get_capture(1)
2187         p = capture[0]
2188         try:
2189             ip = p[IP]
2190             tcp = p[TCP]
2191             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2192             self.assertEqual(tcp.dport, local_port)
2193             self.assert_packet_checksums_valid(p)
2194         except:
2195             self.logger.error(ppp("Unexpected or invalid packet:", p))
2196             raise
2197
2198         # from server back to client (both VRF1, no translation)
2199         p = (
2200             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2201             / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4)
2202             / TCP(sport=local_port, dport=12348)
2203         )
2204         self.pg5.add_stream(p)
2205         self.pg_enable_capture(self.pg_interfaces)
2206         self.pg_start()
2207         capture = self.pg6.get_capture(1)
2208         p = capture[0]
2209         try:
2210             ip = p[IP]
2211             tcp = p[TCP]
2212             self.assertEqual(ip.src, self.pg5.remote_ip4)
2213             self.assertEqual(tcp.sport, local_port)
2214             self.assert_packet_checksums_valid(p)
2215         except:
2216             self.logger.error(ppp("Unexpected or invalid packet:", p))
2217             raise
2218
2219         # from client VRF1 to server VRF0 (no translation)
2220         p = (
2221             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2222             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2223             / TCP(sport=local_port, dport=12349)
2224         )
2225         self.pg0.add_stream(p)
2226         self.pg_enable_capture(self.pg_interfaces)
2227         self.pg_start()
2228         capture = self.pg6.get_capture(1)
2229         p = capture[0]
2230         try:
2231             ip = p[IP]
2232             tcp = p[TCP]
2233             self.assertEqual(ip.src, self.pg0.remote_ip4)
2234             self.assertEqual(tcp.sport, local_port)
2235             self.assert_packet_checksums_valid(p)
2236         except:
2237             self.logger.error(ppp("Unexpected or invalid packet:", p))
2238             raise
2239
2240         # from server VRF0 back to client VRF1 (no translation)
2241         p = (
2242             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2243             / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4)
2244             / TCP(sport=local_port, dport=12349)
2245         )
2246         self.pg0.add_stream(p)
2247         self.pg_enable_capture(self.pg_interfaces)
2248         self.pg_start()
2249         capture = self.pg6.get_capture(1)
2250         p = capture[0]
2251         try:
2252             ip = p[IP]
2253             tcp = p[TCP]
2254             self.assertEqual(ip.src, self.pg0.remote_ip4)
2255             self.assertEqual(tcp.sport, local_port)
2256             self.assert_packet_checksums_valid(p)
2257         except:
2258             self.logger.error(ppp("Unexpected or invalid packet:", p))
2259             raise
2260
2261         # from client VRF0 to server VRF1 (no translation)
2262         p = (
2263             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2264             / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4)
2265             / TCP(sport=12344, dport=local_port)
2266         )
2267         self.pg0.add_stream(p)
2268         self.pg_enable_capture(self.pg_interfaces)
2269         self.pg_start()
2270         capture = self.pg5.get_capture(1)
2271         p = capture[0]
2272         try:
2273             ip = p[IP]
2274             tcp = p[TCP]
2275             self.assertEqual(ip.dst, self.pg5.remote_ip4)
2276             self.assertEqual(tcp.dport, local_port)
2277             self.assert_packet_checksums_valid(p)
2278         except:
2279             self.logger.error(ppp("Unexpected or invalid packet:", p))
2280             raise
2281
2282         # from server VRF1 back to client VRF0 (no translation)
2283         p = (
2284             Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac)
2285             / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4)
2286             / TCP(sport=local_port, dport=12344)
2287         )
2288         self.pg5.add_stream(p)
2289         self.pg_enable_capture(self.pg_interfaces)
2290         self.pg_start()
2291         capture = self.pg0.get_capture(1)
2292         p = capture[0]
2293         try:
2294             ip = p[IP]
2295             tcp = p[TCP]
2296             self.assertEqual(ip.src, self.pg5.remote_ip4)
2297             self.assertEqual(tcp.sport, local_port)
2298             self.assert_packet_checksums_valid(p)
2299         except:
2300             self.logger.error(ppp("Unexpected or invalid packet:", p))
2301             raise
2302
2303     def test_outside_address_distribution(self):
2304         """Outside address distribution based on source address"""
2305
2306         x = 100
2307         nat_addresses = []
2308
2309         for i in range(1, x):
2310             a = "10.0.0.%d" % i
2311             nat_addresses.append(a)
2312
2313         self.nat_add_inside_interface(self.pg0)
2314         self.nat_add_outside_interface(self.pg1)
2315
2316         self.vapi.nat44_add_del_address_range(
2317             first_ip_address=nat_addresses[0],
2318             last_ip_address=nat_addresses[-1],
2319             vrf_id=0xFFFFFFFF,
2320             is_add=1,
2321             flags=0,
2322         )
2323
2324         self.pg0.generate_remote_hosts(x)
2325
2326         pkts = []
2327         for i in range(x):
2328             info = self.create_packet_info(self.pg0, self.pg1)
2329             payload = self.info_to_payload(info)
2330             p = (
2331                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2332                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
2333                 / UDP(sport=7000 + i, dport=8000 + i)
2334                 / Raw(payload)
2335             )
2336             info.data = p
2337             pkts.append(p)
2338
2339         self.pg0.add_stream(pkts)
2340         self.pg_enable_capture(self.pg_interfaces)
2341         self.pg_start()
2342         recvd = self.pg1.get_capture(len(pkts))
2343         for p_recvd in recvd:
2344             payload_info = self.payload_to_info(p_recvd[Raw])
2345             packet_index = payload_info.index
2346             info = self._packet_infos[packet_index]
2347             self.assertTrue(info is not None)
2348             self.assertEqual(packet_index, info.index)
2349             p_sent = info.data
2350             packed = socket.inet_aton(p_sent[IP].src)
2351             numeric = struct.unpack("!L", packed)[0]
2352             numeric = socket.htonl(numeric)
2353             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
2354             self.assertEqual(
2355                 a,
2356                 p_recvd[IP].src,
2357                 "Invalid packet (src IP %s translated to %s, but expected %s)"
2358                 % (p_sent[IP].src, p_recvd[IP].src, a),
2359             )
2360
2361
2362 class TestNAT44EDMW(TestNAT44ED):
2363     """NAT44ED MW Test Case"""
2364
2365     vpp_worker_count = 4
2366     max_sessions = 5000
2367
2368     def test_dynamic(self):
2369         """NAT44ED dynamic translation test"""
2370         pkt_count = 1500
2371         tcp_port_offset = 20
2372         udp_port_offset = 20
2373         icmp_id_offset = 20
2374
2375         self.nat_add_address(self.nat_addr)
2376         self.nat_add_inside_interface(self.pg0)
2377         self.nat_add_outside_interface(self.pg1)
2378
2379         # in2out
2380         tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2381         uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2382         ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2383         dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2384
2385         i2o_pkts = [[] for x in range(0, self.vpp_worker_count)]
2386
2387         for i in range(pkt_count):
2388             p = (
2389                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2390                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2391                 / TCP(sport=tcp_port_offset + i, dport=20)
2392             )
2393             i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p)
2394
2395             p = (
2396                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2397                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2398                 / UDP(sport=udp_port_offset + i, dport=20)
2399             )
2400             i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p)
2401
2402             p = (
2403                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2404                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2405                 / ICMP(id=icmp_id_offset + i, type="echo-request")
2406             )
2407             i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2408
2409         for i in range(0, self.vpp_worker_count):
2410             if len(i2o_pkts[i]) > 0:
2411                 self.pg0.add_stream(i2o_pkts[i], worker=i)
2412
2413         self.pg_enable_capture(self.pg_interfaces)
2414         self.pg_start()
2415         capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
2416
2417         if_idx = self.pg0.sw_if_index
2418         tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
2419         uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
2420         ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
2421         dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
2422
2423         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2424         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2425         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2426         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2427
2428         self.logger.info(self.vapi.cli("show trace"))
2429
2430         # out2in
2431         tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2432         uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2433         ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2434         dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2435
2436         recvd_tcp_ports = set()
2437         recvd_udp_ports = set()
2438         recvd_icmp_ids = set()
2439
2440         for p in capture:
2441             if TCP in p:
2442                 recvd_tcp_ports.add(p[TCP].sport)
2443             if UDP in p:
2444                 recvd_udp_ports.add(p[UDP].sport)
2445             if ICMP in p:
2446                 recvd_icmp_ids.add(p[ICMP].id)
2447
2448         recvd_tcp_ports = list(recvd_tcp_ports)
2449         recvd_udp_ports = list(recvd_udp_ports)
2450         recvd_icmp_ids = list(recvd_icmp_ids)
2451
2452         o2i_pkts = [[] for x in range(0, self.vpp_worker_count)]
2453         for i in range(pkt_count):
2454             p = (
2455                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2456                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2457                 / TCP(dport=choice(recvd_tcp_ports), sport=20)
2458             )
2459             o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p)
2460
2461             p = (
2462                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2463                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2464                 / UDP(dport=choice(recvd_udp_ports), sport=20)
2465             )
2466             o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p)
2467
2468             p = (
2469                 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2470                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2471                 / ICMP(id=choice(recvd_icmp_ids), type="echo-reply")
2472             )
2473             o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p)
2474
2475         for i in range(0, self.vpp_worker_count):
2476             if len(o2i_pkts[i]) > 0:
2477                 self.pg1.add_stream(o2i_pkts[i], worker=i)
2478
2479         self.pg_enable_capture(self.pg_interfaces)
2480         self.pg_start()
2481         capture = self.pg0.get_capture(pkt_count * 3)
2482         for packet in capture:
2483             try:
2484                 self.assert_packet_checksums_valid(packet)
2485                 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2486                 if packet.haslayer(TCP):
2487                     self.assert_in_range(
2488                         packet[TCP].dport,
2489                         tcp_port_offset,
2490                         tcp_port_offset + pkt_count,
2491                         "dst TCP port",
2492                     )
2493                 elif packet.haslayer(UDP):
2494                     self.assert_in_range(
2495                         packet[UDP].dport,
2496                         udp_port_offset,
2497                         udp_port_offset + pkt_count,
2498                         "dst UDP port",
2499                     )
2500                 else:
2501                     self.assert_in_range(
2502                         packet[ICMP].id,
2503                         icmp_id_offset,
2504                         icmp_id_offset + pkt_count,
2505                         "ICMP id",
2506                     )
2507             except:
2508                 self.logger.error(
2509                     ppp("Unexpected or invalid packet (inside network):", packet)
2510                 )
2511                 raise
2512
2513         if_idx = self.pg1.sw_if_index
2514         tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
2515         uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
2516         ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
2517         dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
2518
2519         self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
2520         self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
2521         self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
2522         self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
2523
2524         sc = self.statistics["/nat44-ed/total-sessions"]
2525         self.assertEqual(
2526             sc[:, 0].sum(),
2527             len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids),
2528         )
2529
2530     def test_frag_in_order(self):
2531         """NAT44ED translate fragments arriving in order"""
2532
2533         self.nat_add_address(self.nat_addr)
2534         self.nat_add_inside_interface(self.pg0)
2535         self.nat_add_outside_interface(self.pg1)
2536
2537         self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
2538         self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
2539         self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
2540
2541     def test_frag_in_order_do_not_translate(self):
2542         """NAT44ED don't translate fragments arriving in order"""
2543
2544         self.nat_add_address(self.nat_addr)
2545         self.nat_add_inside_interface(self.pg0)
2546         self.nat_add_outside_interface(self.pg1)
2547         self.vapi.nat44_forwarding_enable_disable(enable=True)
2548
2549         self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
2550
2551     def test_frag_out_of_order(self):
2552         """NAT44ED translate fragments arriving out of order"""
2553
2554         self.nat_add_address(self.nat_addr)
2555         self.nat_add_inside_interface(self.pg0)
2556         self.nat_add_outside_interface(self.pg1)
2557
2558         self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
2559         self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
2560         self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
2561
2562     def test_frag_in_order_in_plus_out(self):
2563         """NAT44ED in+out interface fragments in order"""
2564
2565         in_port = self.random_port()
2566         out_port = self.random_port()
2567
2568         self.nat_add_address(self.nat_addr)
2569         self.nat_add_inside_interface(self.pg0)
2570         self.nat_add_outside_interface(self.pg0)
2571         self.nat_add_inside_interface(self.pg1)
2572         self.nat_add_outside_interface(self.pg1)
2573
2574         # add static mappings for server
2575         self.nat_add_static_mapping(
2576             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2577         )
2578         self.nat_add_static_mapping(
2579             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2580         )
2581         self.nat_add_static_mapping(
2582             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2583         )
2584
2585         # run tests for each protocol
2586         self.frag_in_order_in_plus_out(
2587             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2588         )
2589         self.frag_in_order_in_plus_out(
2590             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2591         )
2592         self.frag_in_order_in_plus_out(
2593             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2594         )
2595
2596     def test_frag_out_of_order_in_plus_out(self):
2597         """NAT44ED in+out interface fragments out of order"""
2598
2599         in_port = self.random_port()
2600         out_port = self.random_port()
2601
2602         self.nat_add_address(self.nat_addr)
2603         self.nat_add_inside_interface(self.pg0)
2604         self.nat_add_outside_interface(self.pg0)
2605         self.nat_add_inside_interface(self.pg1)
2606         self.nat_add_outside_interface(self.pg1)
2607
2608         # add static mappings for server
2609         self.nat_add_static_mapping(
2610             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp
2611         )
2612         self.nat_add_static_mapping(
2613             self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp
2614         )
2615         self.nat_add_static_mapping(
2616             self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp
2617         )
2618
2619         # run tests for each protocol
2620         self.frag_out_of_order_in_plus_out(
2621             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp
2622         )
2623         self.frag_out_of_order_in_plus_out(
2624             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp
2625         )
2626         self.frag_out_of_order_in_plus_out(
2627             self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp
2628         )
2629
2630     def test_reass_hairpinning(self):
2631         """NAT44ED fragments hairpinning"""
2632
2633         server_addr = self.pg0.remote_hosts[1].ip4
2634
2635         host_in_port = self.random_port()
2636         server_in_port = self.random_port()
2637         server_out_port = self.random_port()
2638
2639         self.nat_add_address(self.nat_addr)
2640         self.nat_add_inside_interface(self.pg0)
2641         self.nat_add_outside_interface(self.pg1)
2642
2643         # add static mapping for server
2644         self.nat_add_static_mapping(
2645             server_addr,
2646             self.nat_addr,
2647             server_in_port,
2648             server_out_port,
2649             proto=IP_PROTOS.tcp,
2650         )
2651         self.nat_add_static_mapping(
2652             server_addr,
2653             self.nat_addr,
2654             server_in_port,
2655             server_out_port,
2656             proto=IP_PROTOS.udp,
2657         )
2658         self.nat_add_static_mapping(server_addr, self.nat_addr)
2659
2660         self.reass_hairpinning(
2661             server_addr,
2662             server_in_port,
2663             server_out_port,
2664             host_in_port,
2665             proto=IP_PROTOS.tcp,
2666             ignore_port=True,
2667         )
2668         self.reass_hairpinning(
2669             server_addr,
2670             server_in_port,
2671             server_out_port,
2672             host_in_port,
2673             proto=IP_PROTOS.udp,
2674             ignore_port=True,
2675         )
2676         self.reass_hairpinning(
2677             server_addr,
2678             server_in_port,
2679             server_out_port,
2680             host_in_port,
2681             proto=IP_PROTOS.icmp,
2682             ignore_port=True,
2683         )
2684
2685     def test_session_limit_per_vrf(self):
2686         """NAT44ED per vrf session limit"""
2687
2688         inside = self.pg0
2689         inside_vrf10 = self.pg2
2690         outside = self.pg1
2691
2692         limit = 5
2693
2694         # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
2695         # non existing vrf_id makes process core dump
2696         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
2697
2698         self.nat_add_inside_interface(inside)
2699         self.nat_add_inside_interface(inside_vrf10)
2700         self.nat_add_outside_interface(outside)
2701
2702         # vrf independent
2703         self.nat_add_interface_address(outside)
2704
2705         # BUG: causing core dump - when bad vrf_id is specified
2706         # self.nat_add_address(outside.local_ip4, vrf_id=20)
2707
2708         stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
2709         inside_vrf10.add_stream(stream)
2710
2711         self.pg_enable_capture(self.pg_interfaces)
2712         self.pg_start()
2713
2714         capture = outside.get_capture(limit)
2715
2716         stream = self.create_tcp_stream(inside, outside, limit * 2)
2717         inside.add_stream(stream)
2718
2719         self.pg_enable_capture(self.pg_interfaces)
2720         self.pg_start()
2721
2722         capture = outside.get_capture(len(stream))
2723
2724     def test_show_max_translations(self):
2725         """NAT44ED API test - max translations per thread"""
2726         config = self.vapi.nat44_show_running_config()
2727         self.assertEqual(self.max_sessions, config.sessions)
2728
2729     def test_lru_cleanup(self):
2730         """NAT44ED LRU cleanup algorithm"""
2731
2732         self.nat_add_address(self.nat_addr)
2733         self.nat_add_inside_interface(self.pg0)
2734         self.nat_add_outside_interface(self.pg1)
2735
2736         self.vapi.nat_set_timeouts(
2737             udp=1, tcp_established=7440, tcp_transitory=30, icmp=1
2738         )
2739
2740         tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
2741         pkts = []
2742         for i in range(0, self.max_sessions - 1):
2743             p = (
2744                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2745                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2746                 / UDP(sport=7000 + i, dport=80)
2747             )
2748             pkts.append(p)
2749
2750         self.pg0.add_stream(pkts)
2751         self.pg_enable_capture(self.pg_interfaces)
2752         self.pg_start()
2753         self.pg1.get_capture(len(pkts))
2754         self.virtual_sleep(1.5, "wait for timeouts")
2755
2756         pkts = []
2757         for i in range(0, self.max_sessions - 1):
2758             p = (
2759                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2760                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
2761                 / ICMP(id=8000 + i, type="echo-request")
2762             )
2763             pkts.append(p)
2764
2765         self.pg0.add_stream(pkts)
2766         self.pg_enable_capture(self.pg_interfaces)
2767         self.pg_start()
2768         self.pg1.get_capture(len(pkts))
2769
2770     def test_session_rst_timeout(self):
2771         """NAT44ED session RST timeouts"""
2772
2773         self.nat_add_address(self.nat_addr)
2774         self.nat_add_inside_interface(self.pg0)
2775         self.nat_add_outside_interface(self.pg1)
2776
2777         self.vapi.nat_set_timeouts(
2778             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
2779         )
2780
2781         self.init_tcp_session(
2782             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
2783         )
2784         p = (
2785             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2786             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2787             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
2788         )
2789         self.send_and_expect(self.pg0, p, self.pg1)
2790
2791         self.virtual_sleep(6)
2792
2793         # The session is already closed
2794         p = (
2795             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2796             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2797             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
2798         )
2799         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
2800
2801         # The session can be re-opened
2802         p = (
2803             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2804             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2805             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
2806         )
2807         self.send_and_expect(self.pg0, p, self.pg1)
2808
2809     def test_session_rst_established_timeout(self):
2810         """NAT44ED session RST timeouts"""
2811
2812         self.nat_add_address(self.nat_addr)
2813         self.nat_add_inside_interface(self.pg0)
2814         self.nat_add_outside_interface(self.pg1)
2815
2816         self.vapi.nat_set_timeouts(
2817             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
2818         )
2819
2820         self.init_tcp_session(
2821             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
2822         )
2823
2824         # Wait at least the transitory time, the session is in established
2825         # state anyway. RST followed by a data packet should move it to
2826         # transitory state.
2827         self.virtual_sleep(6)
2828         p = (
2829             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2830             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2831             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R")
2832         )
2833         self.send_and_expect(self.pg0, p, self.pg1)
2834
2835         p = (
2836             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2837             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2838             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
2839         )
2840         self.send_and_expect(self.pg0, p, self.pg1)
2841
2842         # State is transitory, session should be closed after 6 seconds
2843         self.virtual_sleep(6)
2844
2845         p = (
2846             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2847             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2848             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P")
2849         )
2850         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
2851
2852     def test_dynamic_out_of_ports(self):
2853         """NAT44ED dynamic translation test: out of ports"""
2854
2855         self.nat_add_inside_interface(self.pg0)
2856         self.nat_add_outside_interface(self.pg1)
2857
2858         # in2out and no NAT addresses added
2859         pkts = self.create_stream_in(self.pg0, self.pg1)
2860
2861         self.send_and_assert_no_replies(
2862             self.pg0,
2863             pkts,
2864             msg="i2o pkts",
2865             stats_diff=self.no_diff
2866             | {
2867                 "err": {
2868                     "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts),
2869                 },
2870                 self.pg0.sw_if_index: {
2871                     "/nat44-ed/in2out/slowpath/drops": len(pkts),
2872                 },
2873             },
2874         )
2875
2876         # in2out after NAT addresses added
2877         self.nat_add_address(self.nat_addr)
2878
2879         tcpn, udpn, icmpn = (
2880             sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts))
2881         )
2882
2883         self.send_and_expect(
2884             self.pg0,
2885             pkts,
2886             self.pg1,
2887             msg="i2o pkts",
2888             stats_diff=self.no_diff
2889             | {
2890                 "err": {
2891                     "/err/nat44-ed-in2out-slowpath/out of ports": 0,
2892                 },
2893                 self.pg0.sw_if_index: {
2894                     "/nat44-ed/in2out/slowpath/drops": 0,
2895                     "/nat44-ed/in2out/slowpath/tcp": tcpn,
2896                     "/nat44-ed/in2out/slowpath/udp": udpn,
2897                     "/nat44-ed/in2out/slowpath/icmp": icmpn,
2898                 },
2899             },
2900         )
2901
2902     def test_unknown_proto(self):
2903         """NAT44ED translate packet with unknown protocol"""
2904
2905         self.nat_add_address(self.nat_addr)
2906         self.nat_add_inside_interface(self.pg0)
2907         self.nat_add_outside_interface(self.pg1)
2908
2909         # in2out
2910         p = (
2911             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2912             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2913             / TCP(sport=self.tcp_port_in, dport=20)
2914         )
2915         self.pg0.add_stream(p)
2916         self.pg_enable_capture(self.pg_interfaces)
2917         self.pg_start()
2918         p = self.pg1.get_capture(1)
2919
2920         p = (
2921             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2922             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2923             / GRE()
2924             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
2925             / TCP(sport=1234, dport=1234)
2926         )
2927         self.pg0.add_stream(p)
2928         self.pg_enable_capture(self.pg_interfaces)
2929         self.pg_start()
2930         p = self.pg1.get_capture(1)
2931         packet = p[0]
2932         try:
2933             self.assertEqual(packet[IP].src, self.nat_addr)
2934             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2935             self.assertEqual(packet.haslayer(GRE), 1)
2936             self.assert_packet_checksums_valid(packet)
2937         except:
2938             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2939             raise
2940
2941         # out2in
2942         p = (
2943             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2944             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
2945             / GRE()
2946             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
2947             / TCP(sport=1234, dport=1234)
2948         )
2949         self.pg1.add_stream(p)
2950         self.pg_enable_capture(self.pg_interfaces)
2951         self.pg_start()
2952         p = self.pg0.get_capture(1)
2953         packet = p[0]
2954         try:
2955             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2956             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2957             self.assertEqual(packet.haslayer(GRE), 1)
2958             self.assert_packet_checksums_valid(packet)
2959         except:
2960             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2961             raise
2962
2963     def test_hairpinning_unknown_proto(self):
2964         """NAT44ED translate packet with unknown protocol - hairpinning"""
2965         host = self.pg0.remote_hosts[0]
2966         server = self.pg0.remote_hosts[1]
2967         host_in_port = 1234
2968         server_out_port = 8765
2969         server_nat_ip = "10.0.0.11"
2970
2971         self.nat_add_address(self.nat_addr)
2972         self.nat_add_inside_interface(self.pg0)
2973         self.nat_add_outside_interface(self.pg1)
2974
2975         # add static mapping for server
2976         self.nat_add_static_mapping(server.ip4, server_nat_ip)
2977
2978         # host to server
2979         p = (
2980             Ether(src=host.mac, dst=self.pg0.local_mac)
2981             / IP(src=host.ip4, dst=server_nat_ip)
2982             / TCP(sport=host_in_port, dport=server_out_port)
2983         )
2984         self.pg0.add_stream(p)
2985         self.pg_enable_capture(self.pg_interfaces)
2986         self.pg_start()
2987         self.pg0.get_capture(1)
2988
2989         p = (
2990             Ether(dst=self.pg0.local_mac, src=host.mac)
2991             / IP(src=host.ip4, dst=server_nat_ip)
2992             / GRE()
2993             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
2994             / TCP(sport=1234, dport=1234)
2995         )
2996         self.pg0.add_stream(p)
2997         self.pg_enable_capture(self.pg_interfaces)
2998         self.pg_start()
2999         p = self.pg0.get_capture(1)
3000         packet = p[0]
3001         try:
3002             self.assertEqual(packet[IP].src, self.nat_addr)
3003             self.assertEqual(packet[IP].dst, server.ip4)
3004             self.assertEqual(packet.haslayer(GRE), 1)
3005             self.assert_packet_checksums_valid(packet)
3006         except:
3007             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3008             raise
3009
3010         # server to host
3011         p = (
3012             Ether(dst=self.pg0.local_mac, src=server.mac)
3013             / IP(src=server.ip4, dst=self.nat_addr)
3014             / GRE()
3015             / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4)
3016             / TCP(sport=1234, dport=1234)
3017         )
3018         self.pg0.add_stream(p)
3019         self.pg_enable_capture(self.pg_interfaces)
3020         self.pg_start()
3021         p = self.pg0.get_capture(1)
3022         packet = p[0]
3023         try:
3024             self.assertEqual(packet[IP].src, server_nat_ip)
3025             self.assertEqual(packet[IP].dst, host.ip4)
3026             self.assertEqual(packet.haslayer(GRE), 1)
3027             self.assert_packet_checksums_valid(packet)
3028         except:
3029             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3030             raise
3031
3032     def test_output_feature_and_service(self):
3033         """NAT44ED interface output feature and services"""
3034         external_addr = "1.2.3.4"
3035         external_port = 80
3036         local_port = 8080
3037
3038         self.vapi.nat44_forwarding_enable_disable(enable=1)
3039         self.nat_add_address(self.nat_addr)
3040         flags = self.config_flags.NAT_IS_ADDR_ONLY
3041         self.vapi.nat44_add_del_identity_mapping(
3042             ip_address=self.pg1.remote_ip4,
3043             sw_if_index=0xFFFFFFFF,
3044             flags=flags,
3045             is_add=1,
3046         )
3047         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3048         self.nat_add_static_mapping(
3049             self.pg0.remote_ip4,
3050             external_addr,
3051             local_port,
3052             external_port,
3053             proto=IP_PROTOS.tcp,
3054             flags=flags,
3055         )
3056
3057         self.nat_add_inside_interface(self.pg0)
3058         self.nat_add_outside_interface(self.pg0)
3059         self.vapi.nat44_ed_add_del_output_interface(
3060             sw_if_index=self.pg1.sw_if_index, is_add=1
3061         )
3062
3063         # from client to service
3064         p = (
3065             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3066             / IP(src=self.pg1.remote_ip4, dst=external_addr)
3067             / TCP(sport=12345, dport=external_port)
3068         )
3069         self.pg1.add_stream(p)
3070         self.pg_enable_capture(self.pg_interfaces)
3071         self.pg_start()
3072         capture = self.pg0.get_capture(1)
3073         p = capture[0]
3074         try:
3075             ip = p[IP]
3076             tcp = p[TCP]
3077             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3078             self.assertEqual(tcp.dport, local_port)
3079             self.assert_packet_checksums_valid(p)
3080         except:
3081             self.logger.error(ppp("Unexpected or invalid packet:", p))
3082             raise
3083
3084         # from service back to client
3085         p = (
3086             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3087             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3088             / TCP(sport=local_port, dport=12345)
3089         )
3090         self.pg0.add_stream(p)
3091         self.pg_enable_capture(self.pg_interfaces)
3092         self.pg_start()
3093         capture = self.pg1.get_capture(1)
3094         p = capture[0]
3095         try:
3096             ip = p[IP]
3097             tcp = p[TCP]
3098             self.assertEqual(ip.src, external_addr)
3099             self.assertEqual(tcp.sport, external_port)
3100             self.assert_packet_checksums_valid(p)
3101         except:
3102             self.logger.error(ppp("Unexpected or invalid packet:", p))
3103             raise
3104
3105         # from local network host to external network
3106         pkts = self.create_stream_in(self.pg0, self.pg1)
3107         self.pg0.add_stream(pkts)
3108         self.pg_enable_capture(self.pg_interfaces)
3109         self.pg_start()
3110         capture = self.pg1.get_capture(len(pkts))
3111         self.verify_capture_out(capture, ignore_port=True)
3112         pkts = self.create_stream_in(self.pg0, self.pg1)
3113         self.pg0.add_stream(pkts)
3114         self.pg_enable_capture(self.pg_interfaces)
3115         self.pg_start()
3116         capture = self.pg1.get_capture(len(pkts))
3117         self.verify_capture_out(capture, ignore_port=True)
3118
3119         # from external network back to local network host
3120         pkts = self.create_stream_out(self.pg1)
3121         self.pg1.add_stream(pkts)
3122         self.pg_enable_capture(self.pg_interfaces)
3123         self.pg_start()
3124         capture = self.pg0.get_capture(len(pkts))
3125         self.verify_capture_in(capture, self.pg0)
3126
3127     def test_output_feature_and_service3(self):
3128         """NAT44ED interface output feature and DST NAT"""
3129         external_addr = "1.2.3.4"
3130         external_port = 80
3131         local_port = 8080
3132
3133         self.vapi.nat44_forwarding_enable_disable(enable=1)
3134         self.nat_add_address(self.nat_addr)
3135         flags = self.config_flags.NAT_IS_OUT2IN_ONLY
3136         self.nat_add_static_mapping(
3137             self.pg1.remote_ip4,
3138             external_addr,
3139             local_port,
3140             external_port,
3141             proto=IP_PROTOS.tcp,
3142             flags=flags,
3143         )
3144
3145         self.nat_add_inside_interface(self.pg0)
3146         self.nat_add_outside_interface(self.pg0)
3147         self.vapi.nat44_ed_add_del_output_interface(
3148             sw_if_index=self.pg1.sw_if_index, is_add=1
3149         )
3150
3151         p = (
3152             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3153             / IP(src=self.pg0.remote_ip4, dst=external_addr)
3154             / TCP(sport=12345, dport=external_port)
3155         )
3156         self.pg0.add_stream(p)
3157         self.pg_enable_capture(self.pg_interfaces)
3158         self.pg_start()
3159         capture = self.pg1.get_capture(1)
3160         p = capture[0]
3161         try:
3162             ip = p[IP]
3163             tcp = p[TCP]
3164             self.assertEqual(ip.src, self.pg0.remote_ip4)
3165             self.assertEqual(tcp.sport, 12345)
3166             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3167             self.assertEqual(tcp.dport, local_port)
3168             self.assert_packet_checksums_valid(p)
3169         except:
3170             self.logger.error(ppp("Unexpected or invalid packet:", p))
3171             raise
3172
3173         p = (
3174             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3175             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
3176             / TCP(sport=local_port, dport=12345)
3177         )
3178         self.pg1.add_stream(p)
3179         self.pg_enable_capture(self.pg_interfaces)
3180         self.pg_start()
3181         capture = self.pg0.get_capture(1)
3182         p = capture[0]
3183         try:
3184             ip = p[IP]
3185             tcp = p[TCP]
3186             self.assertEqual(ip.src, external_addr)
3187             self.assertEqual(tcp.sport, external_port)
3188             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3189             self.assertEqual(tcp.dport, 12345)
3190             self.assert_packet_checksums_valid(p)
3191         except:
3192             self.logger.error(ppp("Unexpected or invalid packet:", p))
3193             raise
3194
3195     def test_self_twice_nat_lb_negative(self):
3196         """NAT44ED Self Twice NAT local service load balancing (negative test)"""
3197         self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2)
3198
3199     def test_self_twice_nat_negative(self):
3200         """NAT44ED Self Twice NAT (negative test)"""
3201         self.twice_nat_common(self_twice_nat=True)
3202
3203     def test_static_lb_multi_clients(self):
3204         """NAT44ED local service load balancing - multiple clients"""
3205
3206         external_addr = self.nat_addr
3207         external_port = 80
3208         local_port = 8080
3209         server1 = self.pg0.remote_hosts[0]
3210         server2 = self.pg0.remote_hosts[1]
3211         server3 = self.pg0.remote_hosts[2]
3212
3213         locals = [
3214             {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0},
3215             {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0},
3216         ]
3217
3218         flags = self.config_flags.NAT_IS_INSIDE
3219         self.vapi.nat44_interface_add_del_feature(
3220             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3221         )
3222         self.vapi.nat44_interface_add_del_feature(
3223             sw_if_index=self.pg1.sw_if_index, is_add=1
3224         )
3225
3226         self.nat_add_address(self.nat_addr)
3227         self.vapi.nat44_add_del_lb_static_mapping(
3228             is_add=1,
3229             external_addr=external_addr,
3230             external_port=external_port,
3231             protocol=IP_PROTOS.tcp,
3232             local_num=len(locals),
3233             locals=locals,
3234         )
3235
3236         server1_n = 0
3237         server2_n = 0
3238         clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3239         pkts = []
3240         for client in clients:
3241             p = (
3242                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3243                 / IP(src=client, dst=self.nat_addr)
3244                 / TCP(sport=12345, dport=external_port)
3245             )
3246             pkts.append(p)
3247         self.pg1.add_stream(pkts)
3248         self.pg_enable_capture(self.pg_interfaces)
3249         self.pg_start()
3250         capture = self.pg0.get_capture(len(pkts))
3251         for p in capture:
3252             if p[IP].dst == server1.ip4:
3253                 server1_n += 1
3254             else:
3255                 server2_n += 1
3256         self.assertGreaterEqual(server1_n, server2_n)
3257
3258         local = {
3259             "addr": server3.ip4,
3260             "port": local_port,
3261             "probability": 20,
3262             "vrf_id": 0,
3263         }
3264
3265         # add new back-end
3266         self.vapi.nat44_lb_static_mapping_add_del_local(
3267             is_add=1,
3268             external_addr=external_addr,
3269             external_port=external_port,
3270             local=local,
3271             protocol=IP_PROTOS.tcp,
3272         )
3273         server1_n = 0
3274         server2_n = 0
3275         server3_n = 0
3276         clients = ip4_range(self.pg1.remote_ip4, 60, 110)
3277         pkts = []
3278         for client in clients:
3279             p = (
3280                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3281                 / IP(src=client, dst=self.nat_addr)
3282                 / TCP(sport=12346, dport=external_port)
3283             )
3284             pkts.append(p)
3285         self.assertGreater(len(pkts), 0)
3286         self.pg1.add_stream(pkts)
3287         self.pg_enable_capture(self.pg_interfaces)
3288         self.pg_start()
3289         capture = self.pg0.get_capture(len(pkts))
3290         for p in capture:
3291             if p[IP].dst == server1.ip4:
3292                 server1_n += 1
3293             elif p[IP].dst == server2.ip4:
3294                 server2_n += 1
3295             else:
3296                 server3_n += 1
3297         self.assertGreater(server1_n, 0)
3298         self.assertGreater(server2_n, 0)
3299         self.assertGreater(server3_n, 0)
3300
3301         local = {
3302             "addr": server2.ip4,
3303             "port": local_port,
3304             "probability": 10,
3305             "vrf_id": 0,
3306         }
3307
3308         # remove one back-end
3309         self.vapi.nat44_lb_static_mapping_add_del_local(
3310             is_add=0,
3311             external_addr=external_addr,
3312             external_port=external_port,
3313             local=local,
3314             protocol=IP_PROTOS.tcp,
3315         )
3316         server1_n = 0
3317         server2_n = 0
3318         server3_n = 0
3319         self.pg1.add_stream(pkts)
3320         self.pg_enable_capture(self.pg_interfaces)
3321         self.pg_start()
3322         capture = self.pg0.get_capture(len(pkts))
3323         for p in capture:
3324             if p[IP].dst == server1.ip4:
3325                 server1_n += 1
3326             elif p[IP].dst == server2.ip4:
3327                 server2_n += 1
3328             else:
3329                 server3_n += 1
3330         self.assertGreater(server1_n, 0)
3331         self.assertEqual(server2_n, 0)
3332         self.assertGreater(server3_n, 0)
3333
3334     # put zzz in front of syslog test name so that it runs as a last test
3335     # setting syslog sender cannot be undone and if it is set, it messes
3336     # with self.send_and_assert_no_replies functionality
3337     def test_zzz_syslog_sess(self):
3338         """NAT44ED Test syslog session creation and deletion"""
3339         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3340         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3341
3342         self.nat_add_address(self.nat_addr)
3343         self.nat_add_inside_interface(self.pg0)
3344         self.nat_add_outside_interface(self.pg1)
3345
3346         p = (
3347             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3348             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3349             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3350         )
3351         self.pg0.add_stream(p)
3352         self.pg_enable_capture(self.pg_interfaces)
3353         self.pg_start()
3354         capture = self.pg1.get_capture(1)
3355         self.tcp_port_out = capture[0][TCP].sport
3356         capture = self.pg3.get_capture(1)
3357         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3358
3359         self.pg_enable_capture(self.pg_interfaces)
3360         self.pg_start()
3361         self.nat_add_address(self.nat_addr, is_add=0)
3362         capture = self.pg3.get_capture(1)
3363         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3364
3365     # put zzz in front of syslog test name so that it runs as a last test
3366     # setting syslog sender cannot be undone and if it is set, it messes
3367     # with self.send_and_assert_no_replies functionality
3368     def test_zzz_syslog_sess_reopen(self):
3369         """Syslog events for session reopen"""
3370         self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
3371         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
3372
3373         self.nat_add_address(self.nat_addr)
3374         self.nat_add_inside_interface(self.pg0)
3375         self.nat_add_outside_interface(self.pg1)
3376
3377         # SYN in2out
3378         p = (
3379             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3380             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3381             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
3382         )
3383         capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
3384         self.tcp_port_out = capture[0][TCP].sport
3385         capture = self.pg3.get_capture(1)
3386         self.verify_syslog_sess(capture[0][Raw].load, "SADD")
3387
3388         # SYN out2in
3389         p = (
3390             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3391             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3392             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
3393         )
3394         self.send_and_expect(self.pg1, p, self.pg0)
3395
3396         p = (
3397             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3398             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3399             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
3400         )
3401         self.send_and_expect(self.pg0, p, self.pg1)
3402
3403         # FIN in2out
3404         p = (
3405             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3406             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3407             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
3408         )
3409         self.send_and_expect(self.pg0, p, self.pg1)
3410
3411         # FIN out2in
3412         p = (
3413             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3414             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3415             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
3416         )
3417         self.send_and_expect(self.pg1, p, self.pg0)
3418
3419         self.init_tcp_session(
3420             self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port
3421         )
3422
3423         # 2 records should be produced - first one del & add
3424         capture = self.pg3.get_capture(2)
3425         self.verify_syslog_sess(capture[0][Raw].load, "SDEL")
3426         self.verify_syslog_sess(capture[1][Raw].load, "SADD")
3427
3428     def test_twice_nat_interface_addr(self):
3429         """NAT44ED Acquire twice NAT addresses from interface"""
3430         flags = self.config_flags.NAT_IS_TWICE_NAT
3431         self.vapi.nat44_add_del_interface_addr(
3432             sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1
3433         )
3434
3435         # no address in NAT pool
3436         adresses = self.vapi.nat44_address_dump()
3437         self.assertEqual(0, len(adresses))
3438
3439         # configure interface address and check NAT address pool
3440         self.pg11.config_ip4()
3441         adresses = self.vapi.nat44_address_dump()
3442         self.assertEqual(1, len(adresses))
3443         self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4)
3444         self.assertEqual(adresses[0].flags, flags)
3445
3446         # remove interface address and check NAT address pool
3447         self.pg11.unconfig_ip4()
3448         adresses = self.vapi.nat44_address_dump()
3449         self.assertEqual(0, len(adresses))
3450
3451     def test_output_feature_stateful_acl(self):
3452         """NAT44ED output feature works with stateful ACL"""
3453
3454         self.nat_add_address(self.nat_addr)
3455         self.vapi.nat44_ed_add_del_output_interface(
3456             sw_if_index=self.pg1.sw_if_index, is_add=1
3457         )
3458
3459         # First ensure that the NAT is working sans ACL
3460
3461         # send packets out2in, no sessions yet so packets should drop
3462         pkts_out2in = self.create_stream_out(self.pg1)
3463         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3464
3465         # send packets into inside intf, ensure received via outside intf
3466         pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
3467         capture = self.send_and_expect(
3468             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3469         )
3470         self.verify_capture_out(capture, ignore_port=True)
3471
3472         # send out2in again, with sessions created it should work now
3473         pkts_out2in = self.create_stream_out(self.pg1)
3474         capture = self.send_and_expect(
3475             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3476         )
3477         self.verify_capture_in(capture, self.pg0)
3478
3479         # Create an ACL blocking everything
3480         out2in_deny_rule = AclRule(is_permit=0)
3481         out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
3482         out2in_acl.add_vpp_config()
3483
3484         # create an ACL to permit/reflect everything
3485         in2out_reflect_rule = AclRule(is_permit=2)
3486         in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
3487         in2out_acl.add_vpp_config()
3488
3489         # apply as input acl on interface and confirm it blocks everything
3490         acl_if = VppAclInterface(
3491             self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl]
3492         )
3493         acl_if.add_vpp_config()
3494         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
3495
3496         # apply output acl
3497         acl_if.acls = [out2in_acl, in2out_acl]
3498         acl_if.add_vpp_config()
3499         # send in2out to generate ACL state (NAT state was created earlier)
3500         capture = self.send_and_expect(
3501             self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)
3502         )
3503         self.verify_capture_out(capture, ignore_port=True)
3504
3505         # send out2in again. ACL state exists so it should work now.
3506         # TCP packets with the syn flag set also need the ack flag
3507         for p in pkts_out2in:
3508             if p.haslayer(TCP) and p[TCP].flags & 0x02:
3509                 p[TCP].flags |= 0x10
3510         capture = self.send_and_expect(
3511             self.pg1, pkts_out2in, self.pg0, len(pkts_out2in)
3512         )
3513         self.verify_capture_in(capture, self.pg0)
3514         self.logger.info(self.vapi.cli("show trace"))
3515
3516     def test_tcp_close(self):
3517         """NAT44ED Close TCP session from inside network - output feature"""
3518         config = self.vapi.nat44_show_running_config()
3519         old_timeouts = config.timeouts
3520         new_transitory = 2
3521         self.vapi.nat_set_timeouts(
3522             udp=old_timeouts.udp,
3523             tcp_established=old_timeouts.tcp_established,
3524             icmp=old_timeouts.icmp,
3525             tcp_transitory=new_transitory,
3526         )
3527
3528         self.vapi.nat44_forwarding_enable_disable(enable=1)
3529         self.nat_add_address(self.pg1.local_ip4)
3530         twice_nat_addr = "10.0.1.3"
3531         service_ip = "192.168.16.150"
3532         self.nat_add_address(twice_nat_addr, twice_nat=1)
3533
3534         flags = self.config_flags.NAT_IS_INSIDE
3535         self.vapi.nat44_interface_add_del_feature(
3536             sw_if_index=self.pg0.sw_if_index, is_add=1
3537         )
3538         self.vapi.nat44_interface_add_del_feature(
3539             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3540         )
3541         self.vapi.nat44_ed_add_del_output_interface(
3542             is_add=1, sw_if_index=self.pg1.sw_if_index
3543         )
3544
3545         flags = (
3546             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
3547         )
3548         self.nat_add_static_mapping(
3549             self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags
3550         )
3551         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3552         start_sessnum = len(sessions)
3553
3554         # SYN packet out->in
3555         p = (
3556             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3557             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3558             / TCP(sport=33898, dport=80, flags="S")
3559         )
3560         capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3561         p = capture[0]
3562         tcp_port = p[TCP].sport
3563
3564         # SYN + ACK packet in->out
3565         p = (
3566             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3567             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3568             / TCP(sport=80, dport=tcp_port, flags="SA")
3569         )
3570         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3571
3572         # ACK packet out->in
3573         p = (
3574             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3575             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3576             / TCP(sport=33898, dport=80, flags="A")
3577         )
3578         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3579
3580         # FIN packet in -> out
3581         p = (
3582             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3583             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3584             / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)
3585         )
3586         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3587
3588         # FIN+ACK packet out -> in
3589         p = (
3590             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3591             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3592             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3593         )
3594         self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
3595
3596         # ACK packet in -> out
3597         p = (
3598             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3599             / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr)
3600             / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)
3601         )
3602         self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
3603
3604         # session now in transitory timeout, but traffic still flows
3605         # try FIN packet out->in
3606         p = (
3607             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3608             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3609             / TCP(sport=33898, dport=80, flags="F")
3610         )
3611         self.pg1.add_stream(p)
3612         self.pg_enable_capture(self.pg_interfaces)
3613         self.pg_start()
3614
3615         self.virtual_sleep(new_transitory, "wait for transitory timeout")
3616         self.pg0.get_capture(1)
3617
3618         # session should still exist
3619         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3620         self.assertEqual(len(sessions) - start_sessnum, 1)
3621
3622         # send FIN+ACK packet out -> in - will cause session to be wiped
3623         # but won't create a new session
3624         p = (
3625             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3626             / IP(src=self.pg1.remote_ip4, dst=service_ip)
3627             / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)
3628         )
3629         self.send_and_assert_no_replies(self.pg1, p)
3630         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3631         self.assertEqual(len(sessions) - start_sessnum, 0)
3632
3633     def test_tcp_session_close_in(self):
3634         """NAT44ED Close TCP session from inside network"""
3635
3636         in_port = self.tcp_port_in
3637         out_port = 10505
3638         ext_port = self.tcp_external_port
3639
3640         self.nat_add_address(self.nat_addr)
3641         self.nat_add_inside_interface(self.pg0)
3642         self.nat_add_outside_interface(self.pg1)
3643         self.nat_add_static_mapping(
3644             self.pg0.remote_ip4,
3645             self.nat_addr,
3646             in_port,
3647             out_port,
3648             proto=IP_PROTOS.tcp,
3649             flags=self.config_flags.NAT_IS_TWICE_NAT,
3650         )
3651
3652         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3653         session_n = len(sessions)
3654
3655         self.vapi.nat_set_timeouts(
3656             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3657         )
3658
3659         self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3660
3661         # FIN packet in -> out
3662         p = (
3663             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3664             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3665             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3666         )
3667         self.send_and_expect(self.pg0, p, self.pg1)
3668         pkts = []
3669
3670         # ACK packet out -> in
3671         p = (
3672             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3673             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3674             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
3675         )
3676         pkts.append(p)
3677
3678         # FIN packet out -> in
3679         p = (
3680             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3681             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3682             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3683         )
3684         pkts.append(p)
3685
3686         self.send_and_expect(self.pg1, pkts, self.pg0)
3687
3688         # ACK packet in -> out
3689         p = (
3690             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3691             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3692             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3693         )
3694         self.send_and_expect(self.pg0, p, self.pg1)
3695
3696         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3697         self.assertEqual(len(sessions) - session_n, 1)
3698
3699         # retransmit FIN packet out -> in
3700         p = (
3701             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3702             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3703             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3704         )
3705
3706         self.send_and_expect(self.pg1, p, self.pg0)
3707
3708         # retransmit ACK packet in -> out
3709         p = (
3710             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3711             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3712             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3713         )
3714         self.send_and_expect(self.pg0, p, self.pg1)
3715
3716         self.virtual_sleep(3)
3717         # retransmit ACK packet in -> out - this will cause session to be wiped
3718         p = (
3719             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3720             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3721             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3722         )
3723         self.send_and_assert_no_replies(self.pg0, p)
3724         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3725         self.assertEqual(len(sessions) - session_n, 0)
3726
3727     def test_tcp_session_close_out(self):
3728         """NAT44ED Close TCP session from outside network"""
3729
3730         in_port = self.tcp_port_in
3731         out_port = 10505
3732         ext_port = self.tcp_external_port
3733
3734         self.nat_add_address(self.nat_addr)
3735         self.nat_add_inside_interface(self.pg0)
3736         self.nat_add_outside_interface(self.pg1)
3737         self.nat_add_static_mapping(
3738             self.pg0.remote_ip4,
3739             self.nat_addr,
3740             in_port,
3741             out_port,
3742             proto=IP_PROTOS.tcp,
3743             flags=self.config_flags.NAT_IS_TWICE_NAT,
3744         )
3745
3746         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3747         session_n = len(sessions)
3748
3749         self.vapi.nat_set_timeouts(
3750             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3751         )
3752
3753         _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3754
3755         # FIN packet out -> in
3756         p = (
3757             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3758             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3759             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300)
3760         )
3761         self.pg1.add_stream(p)
3762         self.pg_enable_capture(self.pg_interfaces)
3763         self.pg_start()
3764         self.pg0.get_capture(1)
3765
3766         # FIN+ACK packet in -> out
3767         p = (
3768             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3769             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3770             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101)
3771         )
3772
3773         self.pg0.add_stream(p)
3774         self.pg_enable_capture(self.pg_interfaces)
3775         self.pg_start()
3776         self.pg1.get_capture(1)
3777
3778         # ACK packet out -> in
3779         p = (
3780             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3781             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3782             / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301)
3783         )
3784         self.pg1.add_stream(p)
3785         self.pg_enable_capture(self.pg_interfaces)
3786         self.pg_start()
3787         self.pg0.get_capture(1)
3788
3789         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3790         self.assertEqual(len(sessions) - session_n, 1)
3791
3792         # retransmit FIN packet out -> in
3793         p = (
3794             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3795             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3796             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3797         )
3798         self.send_and_expect(self.pg1, p, self.pg0)
3799
3800         # retransmit ACK packet in -> out
3801         p = (
3802             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3803             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3804             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3805         )
3806         self.send_and_expect(self.pg0, p, self.pg1)
3807
3808         self.virtual_sleep(3)
3809         # retransmit ACK packet in -> out - this will cause session to be wiped
3810         p = (
3811             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3812             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3813             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3814         )
3815         self.send_and_assert_no_replies(self.pg0, p)
3816         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3817         self.assertEqual(len(sessions) - session_n, 0)
3818
3819     def test_tcp_session_close_simultaneous(self):
3820         """Simultaneous TCP close from both sides"""
3821
3822         in_port = self.tcp_port_in
3823         ext_port = 10505
3824
3825         self.nat_add_address(self.nat_addr)
3826         self.nat_add_inside_interface(self.pg0)
3827         self.nat_add_outside_interface(self.pg1)
3828         self.nat_add_static_mapping(
3829             self.pg0.remote_ip4,
3830             self.nat_addr,
3831             in_port,
3832             ext_port,
3833             proto=IP_PROTOS.tcp,
3834             flags=self.config_flags.NAT_IS_TWICE_NAT,
3835         )
3836
3837         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3838         session_n = len(sessions)
3839
3840         self.vapi.nat_set_timeouts(
3841             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3842         )
3843
3844         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3845
3846         # FIN packet in -> out
3847         p = (
3848             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3849             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3850             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3851         )
3852         self.send_and_expect(self.pg0, p, self.pg1)
3853
3854         # FIN packet out -> in
3855         p = (
3856             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3857             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3858             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
3859         )
3860         self.send_and_expect(self.pg1, p, self.pg0)
3861
3862         # ACK packet in -> out
3863         p = (
3864             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3865             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3866             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3867         )
3868         self.send_and_expect(self.pg0, p, self.pg1)
3869
3870         # ACK packet out -> in
3871         p = (
3872             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3873             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3874             / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101)
3875         )
3876         self.send_and_expect(self.pg1, p, self.pg0)
3877
3878         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3879         self.assertEqual(len(sessions) - session_n, 1)
3880
3881         # retransmit FIN packet out -> in
3882         p = (
3883             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3884             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3885             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101)
3886         )
3887         self.send_and_expect(self.pg1, p, self.pg0)
3888
3889         # retransmit ACK packet in -> out
3890         p = (
3891             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3892             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3893             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3894         )
3895         self.send_and_expect(self.pg0, p, self.pg1)
3896
3897         self.virtual_sleep(3)
3898         # retransmit ACK packet in -> out - this will cause session to be wiped
3899         p = (
3900             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3901             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3902             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3903         )
3904         self.pg_send(self.pg0, p)
3905         self.send_and_assert_no_replies(self.pg0, p)
3906         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3907         self.assertEqual(len(sessions) - session_n, 0)
3908
3909     def test_tcp_session_half_reopen_inside(self):
3910         """TCP session in FIN/FIN state not reopened by in2out SYN only"""
3911         in_port = self.tcp_port_in
3912         ext_port = 10505
3913
3914         self.nat_add_address(self.nat_addr)
3915         self.nat_add_inside_interface(self.pg0)
3916         self.nat_add_outside_interface(self.pg1)
3917         self.nat_add_static_mapping(
3918             self.pg0.remote_ip4,
3919             self.nat_addr,
3920             in_port,
3921             ext_port,
3922             proto=IP_PROTOS.tcp,
3923             flags=self.config_flags.NAT_IS_TWICE_NAT,
3924         )
3925
3926         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3927         session_n = len(sessions)
3928
3929         self.vapi.nat_set_timeouts(
3930             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3931         )
3932
3933         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3934
3935         # FIN packet in -> out
3936         p = (
3937             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3938             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3939             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
3940         )
3941         self.send_and_expect(self.pg0, p, self.pg1)
3942
3943         # FIN packet out -> in
3944         p = (
3945             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
3946             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3947             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
3948         )
3949         self.send_and_expect(self.pg1, p, self.pg0)
3950
3951         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3952         self.assertEqual(len(sessions) - session_n, 1)
3953
3954         # send SYN packet in -> out
3955         p = (
3956             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3957             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3958             / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301)
3959         )
3960         self.send_and_expect(self.pg0, p, self.pg1)
3961
3962         self.virtual_sleep(3)
3963         # send ACK packet in -> out - session should be wiped
3964         p = (
3965             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3966             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3967             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
3968         )
3969         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
3970         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3971         self.assertEqual(len(sessions) - session_n, 0)
3972
3973     def test_tcp_session_half_reopen_outside(self):
3974         """TCP session in FIN/FIN state not reopened by out2in SYN only"""
3975         in_port = self.tcp_port_in
3976         ext_port = 10505
3977
3978         self.nat_add_address(self.nat_addr)
3979         self.nat_add_inside_interface(self.pg0)
3980         self.nat_add_outside_interface(self.pg1)
3981         self.nat_add_static_mapping(
3982             self.pg0.remote_ip4,
3983             self.nat_addr,
3984             in_port,
3985             ext_port,
3986             proto=IP_PROTOS.tcp,
3987             flags=self.config_flags.NAT_IS_TWICE_NAT,
3988         )
3989
3990         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3991         session_n = len(sessions)
3992
3993         self.vapi.nat_set_timeouts(
3994             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
3995         )
3996
3997         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
3998
3999         # FIN packet in -> out
4000         p = (
4001             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4002             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4003             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4004         )
4005         self.send_and_expect(self.pg0, p, self.pg1)
4006
4007         # FIN packet out -> in
4008         p = (
4009             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4010             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4011             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4012         )
4013         self.send_and_expect(self.pg1, p, self.pg0)
4014
4015         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4016         self.assertEqual(len(sessions) - session_n, 1)
4017
4018         # send SYN packet out -> in
4019         p = (
4020             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4021             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4022             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4023         )
4024         self.send_and_expect(self.pg1, p, self.pg0)
4025
4026         self.virtual_sleep(3)
4027         # send ACK packet in -> out - session should be wiped
4028         p = (
4029             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4030             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4031             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4032         )
4033         self.send_and_assert_no_replies(self.pg0, p, self.pg1)
4034         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4035         self.assertEqual(len(sessions) - session_n, 0)
4036
4037     def test_tcp_session_reopen(self):
4038         """TCP session in FIN/FIN state reopened by SYN from both sides"""
4039         in_port = self.tcp_port_in
4040         ext_port = 10505
4041
4042         self.nat_add_address(self.nat_addr)
4043         self.nat_add_inside_interface(self.pg0)
4044         self.nat_add_outside_interface(self.pg1)
4045         self.nat_add_static_mapping(
4046             self.pg0.remote_ip4,
4047             self.nat_addr,
4048             in_port,
4049             ext_port,
4050             proto=IP_PROTOS.tcp,
4051             flags=self.config_flags.NAT_IS_TWICE_NAT,
4052         )
4053
4054         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4055         session_n = len(sessions)
4056
4057         self.vapi.nat_set_timeouts(
4058             udp=300, tcp_established=7440, tcp_transitory=2, icmp=5
4059         )
4060
4061         out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
4062
4063         # FIN packet in -> out
4064         p = (
4065             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4066             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4067             / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300)
4068         )
4069         self.send_and_expect(self.pg0, p, self.pg1)
4070
4071         # FIN packet out -> in
4072         p = (
4073             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4074             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4075             / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100)
4076         )
4077         self.send_and_expect(self.pg1, p, self.pg0)
4078
4079         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4080         self.assertEqual(len(sessions) - session_n, 1)
4081
4082         # send SYN packet out -> in
4083         p = (
4084             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4085             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4086             / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101)
4087         )
4088         self.send_and_expect(self.pg1, p, self.pg0)
4089
4090         # send SYN packet in -> out
4091         p = (
4092             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4093             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4094             / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301)
4095         )
4096         self.send_and_expect(self.pg0, p, self.pg1)
4097
4098         # send ACK packet out -> in
4099         p = (
4100             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4101             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4102             / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101)
4103         )
4104         self.send_and_expect(self.pg1, p, self.pg0)
4105
4106         self.virtual_sleep(3)
4107         # send ACK packet in -> out - should be forwarded and session alive
4108         p = (
4109             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4110             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4111             / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301)
4112         )
4113         self.send_and_expect(self.pg0, p, self.pg1)
4114         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
4115         self.assertEqual(len(sessions) - session_n, 1)
4116
4117     def test_dynamic_vrf(self):
4118         """NAT44ED dynamic translation test: different VRF"""
4119
4120         vrf_id_in = 33
4121         vrf_id_out = 34
4122
4123         self.nat_add_address(self.nat_addr, vrf_id=vrf_id_in)
4124
4125         try:
4126             self.configure_ip4_interface(self.pg7, table_id=vrf_id_in)
4127             self.configure_ip4_interface(self.pg8, table_id=vrf_id_out)
4128
4129             self.nat_add_inside_interface(self.pg7)
4130             self.nat_add_outside_interface(self.pg8)
4131
4132             # just basic stuff nothing special
4133             pkts = self.create_stream_in(self.pg7, self.pg8)
4134             self.pg7.add_stream(pkts)
4135             self.pg_enable_capture(self.pg_interfaces)
4136             self.pg_start()
4137             capture = self.pg8.get_capture(len(pkts))
4138             self.verify_capture_out(capture, ignore_port=True)
4139
4140             pkts = self.create_stream_out(self.pg8)
4141             self.pg8.add_stream(pkts)
4142             self.pg_enable_capture(self.pg_interfaces)
4143             self.pg_start()
4144             capture = self.pg7.get_capture(len(pkts))
4145             self.verify_capture_in(capture, self.pg7)
4146
4147         finally:
4148             self.pg7.unconfig()
4149             self.pg8.unconfig()
4150
4151             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in})
4152             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out})
4153
4154     def test_dynamic_output_feature_vrf(self):
4155         """NAT44ED dynamic translation test: output-feature, VRF"""
4156
4157         # other then default (0)
4158         new_vrf_id = 22
4159
4160         self.nat_add_address(self.nat_addr)
4161         self.vapi.nat44_ed_add_del_output_interface(
4162             sw_if_index=self.pg8.sw_if_index, is_add=1
4163         )
4164         try:
4165             self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
4166             self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)
4167
4168             # in2out
4169             tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4170             udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4171             icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4172             drops = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4173
4174             pkts = self.create_stream_in(self.pg7, self.pg8)
4175             self.pg7.add_stream(pkts)
4176             self.pg_enable_capture(self.pg_interfaces)
4177             self.pg_start()
4178             capture = self.pg8.get_capture(len(pkts))
4179             self.verify_capture_out(capture, ignore_port=True)
4180
4181             if_idx = self.pg8.sw_if_index
4182             cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
4183             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4184             cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"]
4185             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4186             cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
4187             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4188             cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"]
4189             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4190
4191             # out2in
4192             tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4193             udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4194             icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4195             drops = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4196
4197             pkts = self.create_stream_out(self.pg8)
4198             self.pg8.add_stream(pkts)
4199             self.pg_enable_capture(self.pg_interfaces)
4200             self.pg_start()
4201             capture = self.pg7.get_capture(len(pkts))
4202             self.verify_capture_in(capture, self.pg7)
4203
4204             if_idx = self.pg8.sw_if_index
4205             cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
4206             self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
4207             cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"]
4208             self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
4209             cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
4210             self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
4211             cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"]
4212             self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
4213
4214             sessions = self.statistics["/nat44-ed/total-sessions"]
4215             self.assertEqual(sessions[:, 0].sum(), 3)
4216
4217         finally:
4218             self.pg7.unconfig()
4219             self.pg8.unconfig()
4220
4221             self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id})
4222
4223     def test_next_src_nat(self):
4224         """NAT44ED On way back forward packet to nat44-in2out node."""
4225
4226         twice_nat_addr = "10.0.1.3"
4227         external_port = 80
4228         local_port = 8080
4229         post_twice_nat_port = 0
4230
4231         self.vapi.nat44_forwarding_enable_disable(enable=1)
4232         self.nat_add_address(twice_nat_addr, twice_nat=1)
4233         flags = (
4234             self.config_flags.NAT_IS_OUT2IN_ONLY
4235             | self.config_flags.NAT_IS_SELF_TWICE_NAT
4236         )
4237         self.nat_add_static_mapping(
4238             self.pg6.remote_ip4,
4239             self.pg1.remote_ip4,
4240             local_port,
4241             external_port,
4242             proto=IP_PROTOS.tcp,
4243             vrf_id=1,
4244             flags=flags,
4245         )
4246         self.vapi.nat44_interface_add_del_feature(
4247             sw_if_index=self.pg6.sw_if_index, is_add=1
4248         )
4249
4250         p = (
4251             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4252             / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4)
4253             / TCP(sport=12345, dport=external_port)
4254         )
4255         self.pg6.add_stream(p)
4256         self.pg_enable_capture(self.pg_interfaces)
4257         self.pg_start()
4258         capture = self.pg6.get_capture(1)
4259         p = capture[0]
4260         try:
4261             ip = p[IP]
4262             tcp = p[TCP]
4263             self.assertEqual(ip.src, twice_nat_addr)
4264             self.assertNotEqual(tcp.sport, 12345)
4265             post_twice_nat_port = tcp.sport
4266             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4267             self.assertEqual(tcp.dport, local_port)
4268             self.assert_packet_checksums_valid(p)
4269         except:
4270             self.logger.error(ppp("Unexpected or invalid packet:", p))
4271             raise
4272
4273         p = (
4274             Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac)
4275             / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr)
4276             / TCP(sport=local_port, dport=post_twice_nat_port)
4277         )
4278         self.pg6.add_stream(p)
4279         self.pg_enable_capture(self.pg_interfaces)
4280         self.pg_start()
4281         capture = self.pg6.get_capture(1)
4282         p = capture[0]
4283         try:
4284             ip = p[IP]
4285             tcp = p[TCP]
4286             self.assertEqual(ip.src, self.pg1.remote_ip4)
4287             self.assertEqual(tcp.sport, external_port)
4288             self.assertEqual(ip.dst, self.pg6.remote_ip4)
4289             self.assertEqual(tcp.dport, 12345)
4290             self.assert_packet_checksums_valid(p)
4291         except:
4292             self.logger.error(ppp("Unexpected or invalid packet:", p))
4293             raise
4294
4295     def test_one_armed_nat44_static(self):
4296         """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule"""
4297
4298         remote_host = self.pg4.remote_hosts[0]
4299         local_host = self.pg4.remote_hosts[1]
4300         external_port = 80
4301         local_port = 8080
4302         eh_port_in = 0
4303
4304         self.vapi.nat44_forwarding_enable_disable(enable=1)
4305         self.nat_add_address(self.nat_addr, twice_nat=1)
4306         flags = (
4307             self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT
4308         )
4309         self.nat_add_static_mapping(
4310             local_host.ip4,
4311             self.nat_addr,
4312             local_port,
4313             external_port,
4314             proto=IP_PROTOS.tcp,
4315             flags=flags,
4316         )
4317         flags = self.config_flags.NAT_IS_INSIDE
4318         self.vapi.nat44_interface_add_del_feature(
4319             sw_if_index=self.pg4.sw_if_index, is_add=1
4320         )
4321         self.vapi.nat44_interface_add_del_feature(
4322             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
4323         )
4324
4325         # from client to service
4326         p = (
4327             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4328             / IP(src=remote_host.ip4, dst=self.nat_addr)
4329             / TCP(sport=12345, dport=external_port)
4330         )
4331         self.pg4.add_stream(p)
4332         self.pg_enable_capture(self.pg_interfaces)
4333         self.pg_start()
4334         capture = self.pg4.get_capture(1)
4335         p = capture[0]
4336         try:
4337             ip = p[IP]
4338             tcp = p[TCP]
4339             self.assertEqual(ip.dst, local_host.ip4)
4340             self.assertEqual(ip.src, self.nat_addr)
4341             self.assertEqual(tcp.dport, local_port)
4342             self.assertNotEqual(tcp.sport, 12345)
4343             eh_port_in = tcp.sport
4344             self.assert_packet_checksums_valid(p)
4345         except:
4346             self.logger.error(ppp("Unexpected or invalid packet:", p))
4347             raise
4348
4349         # from service back to client
4350         p = (
4351             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
4352             / IP(src=local_host.ip4, dst=self.nat_addr)
4353             / TCP(sport=local_port, dport=eh_port_in)
4354         )
4355         self.pg4.add_stream(p)
4356         self.pg_enable_capture(self.pg_interfaces)
4357         self.pg_start()
4358         capture = self.pg4.get_capture(1)
4359         p = capture[0]
4360         try:
4361             ip = p[IP]
4362             tcp = p[TCP]
4363             self.assertEqual(ip.src, self.nat_addr)
4364             self.assertEqual(ip.dst, remote_host.ip4)
4365             self.assertEqual(tcp.sport, external_port)
4366             self.assertEqual(tcp.dport, 12345)
4367             self.assert_packet_checksums_valid(p)
4368         except:
4369             self.logger.error(ppp("Unexpected or invalid packet:", p))
4370             raise
4371
4372     def test_icmp_error_fwd_outbound(self):
4373         """NAT44ED ICMP error outbound with forwarding enabled"""
4374
4375         # Ensure that an outbound ICMP error message is properly associated
4376         # with the inbound forward bypass session it is related to.
4377         payload = "H" * 10
4378
4379         self.nat_add_address(self.nat_addr)
4380         self.nat_add_inside_interface(self.pg0)
4381         self.nat_add_outside_interface(self.pg1)
4382
4383         # enable forwarding and initiate connection out2in
4384         self.vapi.nat44_forwarding_enable_disable(enable=1)
4385         p1 = (
4386             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4387             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
4388             / UDP(sport=21, dport=20)
4389             / payload
4390         )
4391
4392         self.pg1.add_stream(p1)
4393         self.pg_enable_capture(self.pg_interfaces)
4394         self.pg_start()
4395         capture = self.pg0.get_capture(1)[0]
4396
4397         self.logger.info(self.vapi.cli("show nat44 sessions"))
4398
4399         # reply with ICMP error message in2out
4400         # We cannot reliably retrieve forward bypass sessions via the API.
4401         # session dumps for a user will only look on the worker that the
4402         # user is supposed to be mapped to in2out. The forward bypass session
4403         # is not necessarily created on that worker.
4404         p2 = (
4405             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4406             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4407             / ICMP(type="dest-unreach", code="port-unreachable")
4408             / capture[IP:]
4409         )
4410
4411         self.pg0.add_stream(p2)
4412         self.pg_enable_capture(self.pg_interfaces)
4413         self.pg_start()
4414         capture = self.pg1.get_capture(1)[0]
4415
4416         self.logger.info(self.vapi.cli("show nat44 sessions"))
4417
4418         self.logger.info(ppp("p1 packet:", p1))
4419         self.logger.info(ppp("p2 packet:", p2))
4420         self.logger.info(ppp("capture packet:", capture))
4421
4422     def test_tcp_session_open_retransmit1(self):
4423         """NAT44ED Open TCP session with SYN,ACK retransmit 1
4424
4425         The client does not receive the [SYN,ACK] or the
4426         ACK from the client is lost. Therefore, the [SYN, ACK]
4427         is retransmitted by the server.
4428         """
4429
4430         in_port = self.tcp_port_in
4431         ext_port = self.tcp_external_port
4432         payload = "H" * 10
4433
4434         self.nat_add_address(self.nat_addr)
4435         self.nat_add_inside_interface(self.pg0)
4436         self.nat_add_outside_interface(self.pg1)
4437
4438         self.vapi.nat_set_timeouts(
4439             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4440         )
4441         # SYN packet in->out
4442         p = (
4443             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4444             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4445             / TCP(sport=in_port, dport=ext_port, flags="S")
4446         )
4447         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4448         out_port = p[TCP].sport
4449
4450         # SYN + ACK packet out->in
4451         p = (
4452             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4453             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4454             / TCP(sport=ext_port, dport=out_port, flags="SA")
4455         )
4456         self.send_and_expect(self.pg1, p, self.pg0)
4457
4458         # ACK in->out does not arrive
4459
4460         # resent SYN + ACK packet out->in
4461         p = (
4462             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4463             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4464             / TCP(sport=ext_port, dport=out_port, flags="SA")
4465         )
4466         self.send_and_expect(self.pg1, p, self.pg0)
4467
4468         # ACK packet in->out
4469         p = (
4470             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4471             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4472             / TCP(sport=in_port, dport=ext_port, flags="A")
4473         )
4474         self.send_and_expect(self.pg0, p, self.pg1)
4475
4476         # Verify that the data can be transmitted after the transitory time
4477         self.virtual_sleep(6)
4478
4479         p = (
4480             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4481             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4482             / TCP(sport=in_port, dport=ext_port, flags="PA")
4483             / Raw(payload)
4484         )
4485         self.send_and_expect(self.pg0, p, self.pg1)
4486
4487     def test_tcp_session_open_retransmit2(self):
4488         """NAT44ED Open TCP session with SYN,ACK retransmit 2
4489
4490         The ACK is lost to the server after the TCP session is opened.
4491         Data is sent by the client, then the [SYN,ACK] is
4492         retransmitted by the server.
4493         """
4494
4495         in_port = self.tcp_port_in
4496         ext_port = self.tcp_external_port
4497         payload = "H" * 10
4498
4499         self.nat_add_address(self.nat_addr)
4500         self.nat_add_inside_interface(self.pg0)
4501         self.nat_add_outside_interface(self.pg1)
4502
4503         self.vapi.nat_set_timeouts(
4504             udp=300, tcp_established=7440, tcp_transitory=5, icmp=60
4505         )
4506         # SYN packet in->out
4507         p = (
4508             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4509             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4510             / TCP(sport=in_port, dport=ext_port, flags="S")
4511         )
4512         p = self.send_and_expect(self.pg0, p, self.pg1)[0]
4513         out_port = p[TCP].sport
4514
4515         # SYN + ACK packet out->in
4516         p = (
4517             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4518             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4519             / TCP(sport=ext_port, dport=out_port, flags="SA")
4520         )
4521         self.send_and_expect(self.pg1, p, self.pg0)
4522
4523         # ACK packet in->out -- not received by the server
4524         p = (
4525             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4526             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4527             / TCP(sport=in_port, dport=ext_port, flags="A")
4528         )
4529         self.send_and_expect(self.pg0, p, self.pg1)
4530
4531         # PUSH + ACK packet in->out
4532         p = (
4533             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4534             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4535             / TCP(sport=in_port, dport=ext_port, flags="PA")
4536             / Raw(payload)
4537         )
4538         self.send_and_expect(self.pg0, p, self.pg1)
4539
4540         # resent SYN + ACK packet out->in
4541         p = (
4542             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4543             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4544             / TCP(sport=ext_port, dport=out_port, flags="SA")
4545         )
4546         self.send_and_expect(self.pg1, p, self.pg0)
4547
4548         # resent ACK packet in->out
4549         p = (
4550             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4551             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4552             / TCP(sport=in_port, dport=ext_port, flags="A")
4553         )
4554         self.send_and_expect(self.pg0, p, self.pg1)
4555
4556         # resent PUSH + ACK packet in->out
4557         p = (
4558             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4559             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4560             / TCP(sport=in_port, dport=ext_port, flags="PA")
4561             / Raw(payload)
4562         )
4563         self.send_and_expect(self.pg0, p, self.pg1)
4564
4565         # ACK packet out->in
4566         p = (
4567             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
4568             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
4569             / TCP(sport=ext_port, dport=out_port, flags="A")
4570         )
4571         self.send_and_expect(self.pg1, p, self.pg0)
4572
4573         # Verify that the data can be transmitted after the transitory time
4574         self.virtual_sleep(6)
4575
4576         p = (
4577             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
4578             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
4579             / TCP(sport=in_port, dport=ext_port, flags="PA")
4580             / Raw(payload)
4581         )
4582         self.send_and_expect(self.pg0, p, self.pg1)
4583
4584
4585 if __name__ == "__main__":
4586     unittest.main(testRunner=VppTestRunner)