tests: disable failing tests on ubuntu-22.04 debian-11
[vpp.git] / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import tag_fixme_debian11, is_distro_debian11
12 from framework import VppTestCase, VppTestRunner
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
15     bind_layers,
16     Packet,
17     ByteEnumField,
18     ShortField,
19     IPField,
20     IntField,
21     LongField,
22     XByteField,
23     FlagsField,
24     FieldLenField,
25     PacketListField,
26 )
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
35 from util import ppp
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
39
40
41 # NAT HA protocol event data
class Event(Packet):
    """Scapy layer for one event record of the NAT HA protocol.

    ``HANATStateSync`` parses its ``events`` list as instances of this
    class.
    """

    name = "Event"
    # Field order below is the on-wire order; do not reorder.
    fields_desc = [
        # Kind of change being reported.
        ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
        # Protocol selector; note these are protocol-specific codes, not
        # IP protocol numbers.
        ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
        ShortField("flags", 0),
        # NOTE(review): presumably inside / outside (translated) address
        # and port of the entry -- confirm against the HA wire format.
        IPField("in_addr", None),
        IPField("out_addr", None),
        ShortField("in_port", None),
        ShortField("out_port", None),
        # NOTE(review): "eh"/"ehn" look like external-host fields (plain
        # and NATed) -- confirm.
        IPField("eh_addr", None),
        IPField("ehn_addr", None),
        ShortField("eh_port", None),
        ShortField("ehn_port", None),
        IntField("fib_index", None),
        # Counters default to zero when not given.
        IntField("total_pkts", 0),
        LongField("total_bytes", 0),
    ]

    def extract_padding(self, s):
        # Scapy hook returning (payload, padding): declare that an Event
        # has no payload and hand all remaining bytes back as padding, so
        # the bytes that follow are not dissected as part of this record.
        return "", s
63
64
65 # NAT HA protocol header
class HANATStateSync(Packet):
    """Scapy layer for the NAT HA state-sync header and its event list."""

    name = "HA NAT state sync"
    # Field order below is the on-wire order; do not reorder.
    fields_desc = [
        # Protocol version, shown in hex; defaults to 1.
        XByteField("version", 1),
        # 8-bit flag field; only the lowest bit is named ("ACK").
        FlagsField("flags", 0, 8, ["ACK"]),
        # Left as None so scapy fills it in with the number of entries in
        # "events" when the packet is built.
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # When dissecting, exactly "count" Event records are parsed.
        PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
    ]
76
77
78 class MethodHolder(VppTestCase):
79     """NAT create capture and verify method holder"""
80
81     @property
82     def config_flags(self):
83         return VppEnum.vl_api_nat44_ei_config_flags_t
84
85     @property
86     def SYSLOG_SEVERITY(self):
87         return VppEnum.vl_api_syslog_severity_t
88
89     def nat44_add_static_mapping(
90         self,
91         local_ip,
92         external_ip="0.0.0.0",
93         local_port=0,
94         external_port=0,
95         vrf_id=0,
96         is_add=1,
97         external_sw_if_index=0xFFFFFFFF,
98         proto=0,
99         tag="",
100         flags=0,
101     ):
102         """
103         Add/delete NAT44EI static mapping
104
105         :param local_ip: Local IP address
106         :param external_ip: External IP address
107         :param local_port: Local port number (Optional)
108         :param external_port: External port number (Optional)
109         :param vrf_id: VRF ID (Default 0)
110         :param is_add: 1 if add, 0 if delete (Default add)
111         :param external_sw_if_index: External interface instead of IP address
112         :param proto: IP protocol (Mandatory if port specified)
113         :param tag: Opaque string tag
114         :param flags: NAT configuration flags
115         """
116
117         if not (local_port and external_port):
118             flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
119
120         self.vapi.nat44_ei_add_del_static_mapping(
121             is_add=is_add,
122             local_ip_address=local_ip,
123             external_ip_address=external_ip,
124             external_sw_if_index=external_sw_if_index,
125             local_port=local_port,
126             external_port=external_port,
127             vrf_id=vrf_id,
128             protocol=proto,
129             flags=flags,
130             tag=tag,
131         )
132
133     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
134         """
135         Add/delete NAT44EI address
136
137         :param ip: IP address
138         :param is_add: 1 if add, 0 if delete (Default add)
139         """
140         self.vapi.nat44_ei_add_del_address_range(
141             first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
142         )
143
144     def create_routes_and_neigbors(self):
145         r1 = VppIpRoute(
146             self,
147             self.pg7.remote_ip4,
148             32,
149             [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
150         )
151         r2 = VppIpRoute(
152             self,
153             self.pg8.remote_ip4,
154             32,
155             [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
156         )
157         r1.add_vpp_config()
158         r2.add_vpp_config()
159
160         n1 = VppNeighbor(
161             self,
162             self.pg7.sw_if_index,
163             self.pg7.remote_mac,
164             self.pg7.remote_ip4,
165             is_static=1,
166         )
167         n2 = VppNeighbor(
168             self,
169             self.pg8.sw_if_index,
170             self.pg8.remote_mac,
171             self.pg8.remote_ip4,
172             is_static=1,
173         )
174         n1.add_vpp_config()
175         n2.add_vpp_config()
176
177     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
178         """
179         Create packet stream for inside network
180
181         :param in_if: Inside interface
182         :param out_if: Outside interface
183         :param dst_ip: Destination address
184         :param ttl: TTL of generated packets
185         """
186         if dst_ip is None:
187             dst_ip = out_if.remote_ip4
188
189         pkts = []
190         # TCP
191         p = (
192             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
193             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
194             / TCP(sport=self.tcp_port_in, dport=20)
195         )
196         pkts.extend([p, p])
197
198         # UDP
199         p = (
200             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
201             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
202             / UDP(sport=self.udp_port_in, dport=20)
203         )
204         pkts.append(p)
205
206         # ICMP
207         p = (
208             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
209             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
210             / ICMP(id=self.icmp_id_in, type="echo-request")
211         )
212         pkts.append(p)
213
214         return pkts
215
216     def compose_ip6(self, ip4, pref, plen):
217         """
218         Compose IPv4-embedded IPv6 addresses
219
220         :param ip4: IPv4 address
221         :param pref: IPv6 prefix
222         :param plen: IPv6 prefix length
223         :returns: IPv4-embedded IPv6 addresses
224         """
225         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
226         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
227         if plen == 32:
228             pref_n[4] = ip4_n[0]
229             pref_n[5] = ip4_n[1]
230             pref_n[6] = ip4_n[2]
231             pref_n[7] = ip4_n[3]
232         elif plen == 40:
233             pref_n[5] = ip4_n[0]
234             pref_n[6] = ip4_n[1]
235             pref_n[7] = ip4_n[2]
236             pref_n[9] = ip4_n[3]
237         elif plen == 48:
238             pref_n[6] = ip4_n[0]
239             pref_n[7] = ip4_n[1]
240             pref_n[9] = ip4_n[2]
241             pref_n[10] = ip4_n[3]
242         elif plen == 56:
243             pref_n[7] = ip4_n[0]
244             pref_n[9] = ip4_n[1]
245             pref_n[10] = ip4_n[2]
246             pref_n[11] = ip4_n[3]
247         elif plen == 64:
248             pref_n[9] = ip4_n[0]
249             pref_n[10] = ip4_n[1]
250             pref_n[11] = ip4_n[2]
251             pref_n[12] = ip4_n[3]
252         elif plen == 96:
253             pref_n[12] = ip4_n[0]
254             pref_n[13] = ip4_n[1]
255             pref_n[14] = ip4_n[2]
256             pref_n[15] = ip4_n[3]
257         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
258         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
259
260     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
261         """
262         Create packet stream for outside network
263
264         :param out_if: Outside interface
265         :param dst_ip: Destination IP address (Default use global NAT address)
266         :param ttl: TTL of generated packets
267         :param use_inside_ports: Use inside NAT ports as destination ports
268                instead of outside ports
269         """
270         if dst_ip is None:
271             dst_ip = self.nat_addr
272         if not use_inside_ports:
273             tcp_port = self.tcp_port_out
274             udp_port = self.udp_port_out
275             icmp_id = self.icmp_id_out
276         else:
277             tcp_port = self.tcp_port_in
278             udp_port = self.udp_port_in
279             icmp_id = self.icmp_id_in
280         pkts = []
281         # TCP
282         p = (
283             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
284             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
285             / TCP(dport=tcp_port, sport=20)
286         )
287         pkts.extend([p, p])
288
289         # UDP
290         p = (
291             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
292             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
293             / UDP(dport=udp_port, sport=20)
294         )
295         pkts.append(p)
296
297         # ICMP
298         p = (
299             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
300             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
301             / ICMP(id=icmp_id, type="echo-reply")
302         )
303         pkts.append(p)
304
305         return pkts
306
307     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
308         """
309         Create packet stream for outside network
310
311         :param out_if: Outside interface
312         :param dst_ip: Destination IP address (Default use global NAT address)
313         :param hl: HL of generated packets
314         """
315         pkts = []
316         # TCP
317         p = (
318             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320             / TCP(dport=self.tcp_port_out, sport=20)
321         )
322         pkts.append(p)
323
324         # UDP
325         p = (
326             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328             / UDP(dport=self.udp_port_out, sport=20)
329         )
330         pkts.append(p)
331
332         # ICMP
333         p = (
334             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336             / ICMPv6EchoReply(id=self.icmp_id_out)
337         )
338         pkts.append(p)
339
340         return pkts
341
342     def verify_capture_out(
343         self,
344         capture,
345         nat_ip=None,
346         same_port=False,
347         dst_ip=None,
348         is_ip6=False,
349         ignore_port=False,
350     ):
351         """
352         Verify captured packets on outside network
353
354         :param capture: Captured packets
355         :param nat_ip: Translated IP address (Default use global NAT address)
356         :param same_port: Source port number is not translated (Default False)
357         :param dst_ip: Destination IP address (Default do not verify)
358         :param is_ip6: If L3 protocol is IPv6 (Default False)
359         """
360         if is_ip6:
361             IP46 = IPv6
362             ICMP46 = ICMPv6EchoRequest
363         else:
364             IP46 = IP
365             ICMP46 = ICMP
366         if nat_ip is None:
367             nat_ip = self.nat_addr
368         for packet in capture:
369             try:
370                 if not is_ip6:
371                     self.assert_packet_checksums_valid(packet)
372                 self.assertEqual(packet[IP46].src, nat_ip)
373                 if dst_ip is not None:
374                     self.assertEqual(packet[IP46].dst, dst_ip)
375                 if packet.haslayer(TCP):
376                     if not ignore_port:
377                         if same_port:
378                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
379                         else:
380                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
381                     self.tcp_port_out = packet[TCP].sport
382                     self.assert_packet_checksums_valid(packet)
383                 elif packet.haslayer(UDP):
384                     if not ignore_port:
385                         if same_port:
386                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
387                         else:
388                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
389                     self.udp_port_out = packet[UDP].sport
390                 else:
391                     if not ignore_port:
392                         if same_port:
393                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
394                         else:
395                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
396                     self.icmp_id_out = packet[ICMP46].id
397                     self.assert_packet_checksums_valid(packet)
398             except:
399                 self.logger.error(
400                     ppp("Unexpected or invalid packet (outside network):", packet)
401                 )
402                 raise
403
404     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
405         """
406         Verify captured packets on outside network
407
408         :param capture: Captured packets
409         :param nat_ip: Translated IP address
410         :param same_port: Source port number is not translated (Default False)
411         :param dst_ip: Destination IP address (Default do not verify)
412         """
413         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
414
415     def verify_capture_in(self, capture, in_if):
416         """
417         Verify captured packets on inside network
418
419         :param capture: Captured packets
420         :param in_if: Inside interface
421         """
422         for packet in capture:
423             try:
424                 self.assert_packet_checksums_valid(packet)
425                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
426                 if packet.haslayer(TCP):
427                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
428                 elif packet.haslayer(UDP):
429                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
430                 else:
431                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
432             except:
433                 self.logger.error(
434                     ppp("Unexpected or invalid packet (inside network):", packet)
435                 )
436                 raise
437
438     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
439         """
440         Verify captured packet that don't have to be translated
441
442         :param capture: Captured packets
443         :param ingress_if: Ingress interface
444         :param egress_if: Egress interface
445         """
446         for packet in capture:
447             try:
448                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
449                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
450                 if packet.haslayer(TCP):
451                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
452                 elif packet.haslayer(UDP):
453                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
454                 else:
455                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
456             except:
457                 self.logger.error(
458                     ppp("Unexpected or invalid packet (inside network):", packet)
459                 )
460                 raise
461
462     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
463         """
464         Verify captured packets with ICMP errors on outside network
465
466         :param capture: Captured packets
467         :param src_ip: Translated IP address or IP address of VPP
468                        (Default use global NAT address)
469         :param icmp_type: Type of error ICMP packet
470                           we are expecting (Default 11)
471         """
472         if src_ip is None:
473             src_ip = self.nat_addr
474         for packet in capture:
475             try:
476                 self.assertEqual(packet[IP].src, src_ip)
477                 self.assertEqual(packet.haslayer(ICMP), 1)
478                 icmp = packet[ICMP]
479                 self.assertEqual(icmp.type, icmp_type)
480                 self.assertTrue(icmp.haslayer(IPerror))
481                 inner_ip = icmp[IPerror]
482                 if inner_ip.haslayer(TCPerror):
483                     self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
484                 elif inner_ip.haslayer(UDPerror):
485                     self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
486                 else:
487                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
488             except:
489                 self.logger.error(
490                     ppp("Unexpected or invalid packet (outside network):", packet)
491                 )
492                 raise
493
494     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
495         """
496         Verify captured packets with ICMP errors on inside network
497
498         :param capture: Captured packets
499         :param in_if: Inside interface
500         :param icmp_type: Type of error ICMP packet
501                           we are expecting (Default 11)
502         """
503         for packet in capture:
504             try:
505                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506                 self.assertEqual(packet.haslayer(ICMP), 1)
507                 icmp = packet[ICMP]
508                 self.assertEqual(icmp.type, icmp_type)
509                 self.assertTrue(icmp.haslayer(IPerror))
510                 inner_ip = icmp[IPerror]
511                 if inner_ip.haslayer(TCPerror):
512                     self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
513                 elif inner_ip.haslayer(UDPerror):
514                     self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
515                 else:
516                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
517             except:
518                 self.logger.error(
519                     ppp("Unexpected or invalid packet (inside network):", packet)
520                 )
521                 raise
522
523     def create_stream_frag(
524         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
525     ):
526         """
527         Create fragmented packet stream
528
529         :param src_if: Source interface
530         :param dst: Destination IPv4 address
531         :param sport: Source port
532         :param dport: Destination port
533         :param data: Payload data
534         :param proto: protocol (TCP, UDP, ICMP)
535         :param echo_reply: use echo_reply if protocol is ICMP
536         :returns: Fragments
537         """
538         if proto == IP_PROTOS.tcp:
539             p = (
540                 IP(src=src_if.remote_ip4, dst=dst)
541                 / TCP(sport=sport, dport=dport)
542                 / Raw(data)
543             )
544             p = p.__class__(scapy.compat.raw(p))
545             chksum = p[TCP].chksum
546             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
547         elif proto == IP_PROTOS.udp:
548             proto_header = UDP(sport=sport, dport=dport)
549         elif proto == IP_PROTOS.icmp:
550             if not echo_reply:
551                 proto_header = ICMP(id=sport, type="echo-request")
552             else:
553                 proto_header = ICMP(id=sport, type="echo-reply")
554         else:
555             raise Exception("Unsupported protocol")
556         id = random.randint(0, 65535)
557         pkts = []
558         if proto == IP_PROTOS.tcp:
559             raw = Raw(data[0:4])
560         else:
561             raw = Raw(data[0:16])
562         p = (
563             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
564             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
565             / proto_header
566             / raw
567         )
568         pkts.append(p)
569         if proto == IP_PROTOS.tcp:
570             raw = Raw(data[4:20])
571         else:
572             raw = Raw(data[16:32])
573         p = (
574             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
575             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
576             / raw
577         )
578         pkts.append(p)
579         if proto == IP_PROTOS.tcp:
580             raw = Raw(data[20:])
581         else:
582             raw = Raw(data[32:])
583         p = (
584             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
585             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
586             / raw
587         )
588         pkts.append(p)
589         return pkts
590
591     def reass_frags_and_verify(self, frags, src, dst):
592         """
593         Reassemble and verify fragmented packet
594
595         :param frags: Captured fragments
596         :param src: Source IPv4 address to verify
597         :param dst: Destination IPv4 address to verify
598
599         :returns: Reassembled IPv4 packet
600         """
601         buffer = BytesIO()
602         for p in frags:
603             self.assertEqual(p[IP].src, src)
604             self.assertEqual(p[IP].dst, dst)
605             self.assert_ip_checksum_valid(p)
606             buffer.seek(p[IP].frag * 8)
607             buffer.write(bytes(p[IP].payload))
608         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
609         if ip.proto == IP_PROTOS.tcp:
610             p = ip / TCP(buffer.getvalue())
611             self.logger.debug(ppp("Reassembled:", p))
612             self.assert_tcp_checksum_valid(p)
613         elif ip.proto == IP_PROTOS.udp:
614             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
615         elif ip.proto == IP_PROTOS.icmp:
616             p = ip / ICMP(buffer.getvalue())
617         return p
618
619     def verify_ipfix_nat44_ses(self, data):
620         """
621         Verify IPFIX NAT44EI session create/delete event
622
623         :param data: Decoded IPFIX data records
624         """
625         nat44_ses_create_num = 0
626         nat44_ses_delete_num = 0
627         self.assertEqual(6, len(data))
628         for record in data:
629             # natEvent
630             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
631             if scapy.compat.orb(record[230]) == 4:
632                 nat44_ses_create_num += 1
633             else:
634                 nat44_ses_delete_num += 1
635             # sourceIPv4Address
636             self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
637             # postNATSourceIPv4Address
638             self.assertEqual(
639                 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
640             )
641             # ingressVRFID
642             self.assertEqual(struct.pack("!I", 0), record[234])
643             # protocolIdentifier/sourceTransportPort
644             # /postNAPTSourceTransportPort
645             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
646                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
647                 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
648             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
649                 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
650                 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
651             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
652                 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
653                 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
654             else:
655                 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
656         self.assertEqual(3, nat44_ses_create_num)
657         self.assertEqual(3, nat44_ses_delete_num)
658
659     def verify_ipfix_addr_exhausted(self, data):
660         self.assertEqual(1, len(data))
661         record = data[0]
662         # natEvent
663         self.assertEqual(scapy.compat.orb(record[230]), 3)
664         # natPoolID
665         self.assertEqual(struct.pack("!I", 0), record[283])
666
667     def verify_ipfix_max_sessions(self, data, limit):
668         self.assertEqual(1, len(data))
669         record = data[0]
670         # natEvent
671         self.assertEqual(scapy.compat.orb(record[230]), 13)
672         # natQuotaExceededEvent
673         self.assertEqual(struct.pack("!I", 1), record[466])
674         # maxSessionEntries
675         self.assertEqual(struct.pack("!I", limit), record[471])
676
677     def verify_no_nat44_user(self):
678         """Verify that there is no NAT44EI user"""
679         users = self.vapi.nat44_ei_user_dump()
680         self.assertEqual(len(users), 0)
681         users = self.statistics["/nat44-ei/total-users"]
682         self.assertEqual(users[0][0], 0)
683         sessions = self.statistics["/nat44-ei/total-sessions"]
684         self.assertEqual(sessions[0][0], 0)
685
686     def verify_syslog_apmap(self, data, is_add=True):
687         message = data.decode("utf-8")
688         try:
689             message = SyslogMessage.parse(message)
690         except ParseError as e:
691             self.logger.error(e)
692             raise
693         else:
694             self.assertEqual(message.severity, SyslogSeverity.info)
695             self.assertEqual(message.appname, "NAT")
696             self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
697             sd_params = message.sd.get("napmap")
698             self.assertTrue(sd_params is not None)
699             self.assertEqual(sd_params.get("IATYP"), "IPv4")
700             self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
701             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
702             self.assertEqual(sd_params.get("XATYP"), "IPv4")
703             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
704             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
705             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
706             self.assertTrue(sd_params.get("SSUBIX") is not None)
707             self.assertEqual(sd_params.get("SVLAN"), "0")
708
709     def verify_mss_value(self, pkt, mss):
710         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
711             raise TypeError("Not a TCP/IP packet")
712
713         for option in pkt[TCP].options:
714             if option[0] == "MSS":
715                 self.assertEqual(option[1], mss)
716                 self.assert_tcp_checksum_valid(pkt)
717
718     @staticmethod
719     def proto2layer(proto):
720         if proto == IP_PROTOS.tcp:
721             return TCP
722         elif proto == IP_PROTOS.udp:
723             return UDP
724         elif proto == IP_PROTOS.icmp:
725             return ICMP
726         else:
727             raise Exception("Unsupported protocol")
728
    def frag_in_order(
        self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
    ):
        """
        Send a fragmented packet in order in both directions and verify
        the reassembled result

        First pg0 -> pg1 (in2out), then a fragmented reply pg1 -> pg0
        (out2in) addressed to whatever source port / ICMP id was seen on
        the outside. Stores the randomly chosen inside port in
        ``self.port_in``.

        :param proto: protocol (TCP, UDP, ICMP)
        :param dont_translate: Expect the in2out packet to keep its
                               original source address and port/id
        :param ignore_port: Skip the check that the source port / ICMP id
                            was changed by translation
        """
        layer = self.proto2layer(proto)

        # Payload sized so that create_stream_frag splits it as
        # head / 16 bytes / 3 bytes (head is 4 bytes for TCP, 16 otherwise).
        if proto == IP_PROTOS.tcp:
            data = b"A" * 4 + b"B" * 16 + b"C" * 3
        else:
            data = b"A" * 16 + b"B" * 16 + b"C" * 3
        self.port_in = random.randint(1025, 65535)

        # in2out
        pkts = self.create_stream_frag(
            self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
        )
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        frags = self.pg1.get_capture(len(pkts))
        # Expected source address depends on whether translation applies.
        if not dont_translate:
            p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
        else:
            p = self.reass_frags_and_verify(
                frags, self.pg0.remote_ip4, self.pg1.remote_ip4
            )
        if proto != IP_PROTOS.icmp:
            if not dont_translate:
                self.assertEqual(p[layer].dport, 20)
                if not ignore_port:
                    self.assertNotEqual(p[layer].sport, self.port_in)
            else:
                # Untranslated: only the source port is checked here.
                self.assertEqual(p[layer].sport, self.port_in)
        else:
            # For ICMP the id plays the role of the port.
            if not ignore_port:
                if not dont_translate:
                    self.assertNotEqual(p[layer].id, self.port_in)
                else:
                    self.assertEqual(p[layer].id, self.port_in)
        self.assertEqual(data, p[Raw].load)

        # out2in
        if not dont_translate:
            dst_addr = self.nat_addr
        else:
            dst_addr = self.pg0.remote_ip4
        # Reply to the source port / ICMP id observed on the outside.
        if proto != IP_PROTOS.icmp:
            sport = 20
            dport = p[layer].sport
        else:
            # create_stream_frag uses "sport" as the ICMP id.
            sport = p[layer].id
            dport = 0
        pkts = self.create_stream_frag(
            self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
        )
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        frags = self.pg0.get_capture(len(pkts))
        p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
        # The inside host must see its original port / id again.
        if proto != IP_PROTOS.icmp:
            self.assertEqual(p[layer].sport, 20)
            self.assertEqual(p[layer].dport, self.port_in)
        else:
            self.assertEqual(p[layer].id, self.port_in)
        self.assertEqual(data, p[Raw].load)
794
795     def reass_hairpinning(
796         self,
797         server_addr,
798         server_in_port,
799         server_out_port,
800         host_in_port,
801         proto=IP_PROTOS.tcp,
802         ignore_port=False,
803     ):
804
805         layer = self.proto2layer(proto)
806
807         if proto == IP_PROTOS.tcp:
808             data = b"A" * 4 + b"B" * 16 + b"C" * 3
809         else:
810             data = b"A" * 16 + b"B" * 16 + b"C" * 3
811
812         # send packet from host to server
813         pkts = self.create_stream_frag(
814             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
815         )
816         self.pg0.add_stream(pkts)
817         self.pg_enable_capture(self.pg_interfaces)
818         self.pg_start()
819         frags = self.pg0.get_capture(len(pkts))
820         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
821         if proto != IP_PROTOS.icmp:
822             if not ignore_port:
823                 self.assertNotEqual(p[layer].sport, host_in_port)
824             self.assertEqual(p[layer].dport, server_in_port)
825         else:
826             if not ignore_port:
827                 self.assertNotEqual(p[layer].id, host_in_port)
828         self.assertEqual(data, p[Raw].load)
829
830     def frag_out_of_order(
831         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
832     ):
833         layer = self.proto2layer(proto)
834
835         if proto == IP_PROTOS.tcp:
836             data = b"A" * 4 + b"B" * 16 + b"C" * 3
837         else:
838             data = b"A" * 16 + b"B" * 16 + b"C" * 3
839         self.port_in = random.randint(1025, 65535)
840
841         for i in range(2):
842             # in2out
843             pkts = self.create_stream_frag(
844                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
845             )
846             pkts.reverse()
847             self.pg0.add_stream(pkts)
848             self.pg_enable_capture(self.pg_interfaces)
849             self.pg_start()
850             frags = self.pg1.get_capture(len(pkts))
851             if not dont_translate:
852                 p = self.reass_frags_and_verify(
853                     frags, self.nat_addr, self.pg1.remote_ip4
854                 )
855             else:
856                 p = self.reass_frags_and_verify(
857                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
858                 )
859             if proto != IP_PROTOS.icmp:
860                 if not dont_translate:
861                     self.assertEqual(p[layer].dport, 20)
862                     if not ignore_port:
863                         self.assertNotEqual(p[layer].sport, self.port_in)
864                 else:
865                     self.assertEqual(p[layer].sport, self.port_in)
866             else:
867                 if not ignore_port:
868                     if not dont_translate:
869                         self.assertNotEqual(p[layer].id, self.port_in)
870                     else:
871                         self.assertEqual(p[layer].id, self.port_in)
872             self.assertEqual(data, p[Raw].load)
873
874             # out2in
875             if not dont_translate:
876                 dst_addr = self.nat_addr
877             else:
878                 dst_addr = self.pg0.remote_ip4
879             if proto != IP_PROTOS.icmp:
880                 sport = 20
881                 dport = p[layer].sport
882             else:
883                 sport = p[layer].id
884                 dport = 0
885             pkts = self.create_stream_frag(
886                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
887             )
888             pkts.reverse()
889             self.pg1.add_stream(pkts)
890             self.pg_enable_capture(self.pg_interfaces)
891             self.pg_start()
892             frags = self.pg0.get_capture(len(pkts))
893             p = self.reass_frags_and_verify(
894                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
895             )
896             if proto != IP_PROTOS.icmp:
897                 self.assertEqual(p[layer].sport, 20)
898                 self.assertEqual(p[layer].dport, self.port_in)
899             else:
900                 self.assertEqual(p[layer].id, self.port_in)
901             self.assertEqual(data, p[Raw].load)
902
903
def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
    """Compute the worker index for an IPv4 address.

    The dotted-quad string is unpacked as a network-order 32-bit integer and
    passed through ``socket.htonl``. That value is then added to itself
    shifted right by 8, 16 and 24 bits, and the sum is reduced modulo the
    worker count.

    :param ip: IPv4 address as a dotted-quad string
    :param vpp_worker_count: number of VPP worker threads
    :returns: 0 when there are no workers, otherwise a value in
        ``1..vpp_worker_count``
    """
    if vpp_worker_count == 0:
        return 0
    (address,) = struct.unpack("!L", socket.inet_aton(ip))
    address = socket.htonl(address)
    folded = sum(address >> shift for shift in (0, 8, 16, 24))
    return 1 + folded % vpp_worker_count
912
913
914 @tag_fixme_debian11
915 class TestNAT44EI(MethodHolder):
916     """NAT44EI Test Cases"""
917
918     max_translations = 10240
919     max_users = 10240
920
921     @classmethod
922     def setUpClass(cls):
923         super(TestNAT44EI, cls).setUpClass()
924         if is_distro_debian11 == True and not hasattr(cls, "vpp"):
925             return
926         cls.vapi.cli("set log class nat44-ei level debug")
927
928         cls.tcp_port_in = 6303
929         cls.tcp_port_out = 6303
930         cls.udp_port_in = 6304
931         cls.udp_port_out = 6304
932         cls.icmp_id_in = 6305
933         cls.icmp_id_out = 6305
934         cls.nat_addr = "10.0.0.3"
935         cls.ipfix_src_port = 4739
936         cls.ipfix_domain_id = 1
937         cls.tcp_external_port = 80
938         cls.udp_external_port = 69
939
940         cls.create_pg_interfaces(range(10))
941         cls.interfaces = list(cls.pg_interfaces[0:4])
942
943         for i in cls.interfaces:
944             i.admin_up()
945             i.config_ip4()
946             i.resolve_arp()
947
948         cls.pg0.generate_remote_hosts(3)
949         cls.pg0.configure_ipv4_neighbors()
950
951         cls.pg1.generate_remote_hosts(1)
952         cls.pg1.configure_ipv4_neighbors()
953
954         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
955         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
956         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
957
958         cls.pg4._local_ip4 = "172.16.255.1"
959         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
960         cls.pg4.set_table_ip4(10)
961         cls.pg5._local_ip4 = "172.17.255.3"
962         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
963         cls.pg5.set_table_ip4(10)
964         cls.pg6._local_ip4 = "172.16.255.1"
965         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
966         cls.pg6.set_table_ip4(20)
967         for i in cls.overlapping_interfaces:
968             i.config_ip4()
969             i.admin_up()
970             i.resolve_arp()
971
972         cls.pg7.admin_up()
973         cls.pg8.admin_up()
974
975         cls.pg9.generate_remote_hosts(2)
976         cls.pg9.config_ip4()
977         cls.vapi.sw_interface_add_del_address(
978             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
979         )
980
981         cls.pg9.admin_up()
982         cls.pg9.resolve_arp()
983         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
984         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
985         cls.pg9.resolve_arp()
986
987     def plugin_enable(self):
988         self.vapi.nat44_ei_plugin_enable_disable(
989             sessions=self.max_translations, users=self.max_users, enable=1
990         )
991
    def setUp(self):
        """Run the base-class setUp, then enable the NAT44-EI plugin."""
        super(TestNAT44EI, self).setUp()
        self.plugin_enable()
995
996     def tearDown(self):
997         super(TestNAT44EI, self).tearDown()
998         if not self.vpp_dead:
999             self.vapi.nat44_ei_ipfix_enable_disable(
1000                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1001             )
1002             self.ipfix_src_port = 4739
1003             self.ipfix_domain_id = 1
1004
1005             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1006             self.vapi.cli("clear logging")
1007
1008     def test_clear_sessions(self):
1009         """NAT44EI session clearing test"""
1010
1011         self.nat44_add_address(self.nat_addr)
1012         flags = self.config_flags.NAT44_EI_IF_INSIDE
1013         self.vapi.nat44_ei_interface_add_del_feature(
1014             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1015         )
1016         self.vapi.nat44_ei_interface_add_del_feature(
1017             sw_if_index=self.pg1.sw_if_index, is_add=1
1018         )
1019
1020         pkts = self.create_stream_in(self.pg0, self.pg1)
1021         self.pg0.add_stream(pkts)
1022         self.pg_enable_capture(self.pg_interfaces)
1023         self.pg_start()
1024         capture = self.pg1.get_capture(len(pkts))
1025         self.verify_capture_out(capture)
1026
1027         sessions = self.statistics["/nat44-ei/total-sessions"]
1028         self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1029         self.logger.info("sessions before clearing: %s" % sessions[0][0])
1030
1031         self.vapi.cli("clear nat44 ei sessions")
1032
1033         sessions = self.statistics["/nat44-ei/total-sessions"]
1034         self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1035         self.logger.info("sessions after clearing: %s" % sessions[0][0])
1036
1037     def test_dynamic(self):
1038         """NAT44EI dynamic translation test"""
1039         self.nat44_add_address(self.nat_addr)
1040         flags = self.config_flags.NAT44_EI_IF_INSIDE
1041         self.vapi.nat44_ei_interface_add_del_feature(
1042             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1043         )
1044         self.vapi.nat44_ei_interface_add_del_feature(
1045             sw_if_index=self.pg1.sw_if_index, is_add=1
1046         )
1047
1048         # in2out
1049         tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1050         udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1051         icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1052         drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1053
1054         pkts = self.create_stream_in(self.pg0, self.pg1)
1055         self.pg0.add_stream(pkts)
1056         self.pg_enable_capture(self.pg_interfaces)
1057         self.pg_start()
1058         capture = self.pg1.get_capture(len(pkts))
1059         self.verify_capture_out(capture)
1060
1061         if_idx = self.pg0.sw_if_index
1062         cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1063         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1064         cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1065         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1066         cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1067         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1068         cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1069         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1070
1071         # out2in
1072         tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1073         udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1074         icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1075         drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1076
1077         pkts = self.create_stream_out(self.pg1)
1078         self.pg1.add_stream(pkts)
1079         self.pg_enable_capture(self.pg_interfaces)
1080         self.pg_start()
1081         capture = self.pg0.get_capture(len(pkts))
1082         self.verify_capture_in(capture, self.pg0)
1083
1084         if_idx = self.pg1.sw_if_index
1085         cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1086         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1087         cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1088         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1089         cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1090         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1091         cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1092         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1093
1094         users = self.statistics["/nat44-ei/total-users"]
1095         self.assertEqual(users[:, 0].sum(), 1)
1096         sessions = self.statistics["/nat44-ei/total-sessions"]
1097         self.assertEqual(sessions[:, 0].sum(), 3)
1098
1099     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1100         """NAT44EI handling of client packets with TTL=1"""
1101
1102         self.nat44_add_address(self.nat_addr)
1103         flags = self.config_flags.NAT44_EI_IF_INSIDE
1104         self.vapi.nat44_ei_interface_add_del_feature(
1105             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1106         )
1107         self.vapi.nat44_ei_interface_add_del_feature(
1108             sw_if_index=self.pg1.sw_if_index, is_add=1
1109         )
1110
1111         # Client side - generate traffic
1112         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1113         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1114
1115         # Client side - verify ICMP type 11 packets
1116         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1117
1118     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1119         """NAT44EI handling of server packets with TTL=1"""
1120
1121         self.nat44_add_address(self.nat_addr)
1122         flags = self.config_flags.NAT44_EI_IF_INSIDE
1123         self.vapi.nat44_ei_interface_add_del_feature(
1124             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1125         )
1126         self.vapi.nat44_ei_interface_add_del_feature(
1127             sw_if_index=self.pg1.sw_if_index, is_add=1
1128         )
1129
1130         # Client side - create sessions
1131         pkts = self.create_stream_in(self.pg0, self.pg1)
1132         self.pg0.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135
1136         # Server side - generate traffic
1137         capture = self.pg1.get_capture(len(pkts))
1138         self.verify_capture_out(capture)
1139         pkts = self.create_stream_out(self.pg1, ttl=1)
1140         capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1141
1142         # Server side - verify ICMP type 11 packets
1143         self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1144
1145     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1146         """NAT44EI handling of error responses to client packets with TTL=2"""
1147
1148         self.nat44_add_address(self.nat_addr)
1149         flags = self.config_flags.NAT44_EI_IF_INSIDE
1150         self.vapi.nat44_ei_interface_add_del_feature(
1151             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1152         )
1153         self.vapi.nat44_ei_interface_add_del_feature(
1154             sw_if_index=self.pg1.sw_if_index, is_add=1
1155         )
1156
1157         # Client side - generate traffic
1158         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1159         self.pg0.add_stream(pkts)
1160         self.pg_enable_capture(self.pg_interfaces)
1161         self.pg_start()
1162
1163         # Server side - simulate ICMP type 11 response
1164         capture = self.pg1.get_capture(len(pkts))
1165         pkts = [
1166             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1167             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1168             / ICMP(type=11)
1169             / packet[IP]
1170             for packet in capture
1171         ]
1172         self.pg1.add_stream(pkts)
1173         self.pg_enable_capture(self.pg_interfaces)
1174         self.pg_start()
1175
1176         # Client side - verify ICMP type 11 packets
1177         capture = self.pg0.get_capture(len(pkts))
1178         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1179
1180     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1181         """NAT44EI handling of error responses to server packets with TTL=2"""
1182
1183         self.nat44_add_address(self.nat_addr)
1184         flags = self.config_flags.NAT44_EI_IF_INSIDE
1185         self.vapi.nat44_ei_interface_add_del_feature(
1186             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1187         )
1188         self.vapi.nat44_ei_interface_add_del_feature(
1189             sw_if_index=self.pg1.sw_if_index, is_add=1
1190         )
1191
1192         # Client side - create sessions
1193         pkts = self.create_stream_in(self.pg0, self.pg1)
1194         self.pg0.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197
1198         # Server side - generate traffic
1199         capture = self.pg1.get_capture(len(pkts))
1200         self.verify_capture_out(capture)
1201         pkts = self.create_stream_out(self.pg1, ttl=2)
1202         self.pg1.add_stream(pkts)
1203         self.pg_enable_capture(self.pg_interfaces)
1204         self.pg_start()
1205
1206         # Client side - simulate ICMP type 11 response
1207         capture = self.pg0.get_capture(len(pkts))
1208         pkts = [
1209             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1210             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1211             / ICMP(type=11)
1212             / packet[IP]
1213             for packet in capture
1214         ]
1215         self.pg0.add_stream(pkts)
1216         self.pg_enable_capture(self.pg_interfaces)
1217         self.pg_start()
1218
1219         # Server side - verify ICMP type 11 packets
1220         capture = self.pg1.get_capture(len(pkts))
1221         self.verify_capture_out_with_icmp_errors(capture)
1222
1223     def test_ping_out_interface_from_outside(self):
1224         """NAT44EI ping out interface from outside network"""
1225
1226         self.nat44_add_address(self.nat_addr)
1227         flags = self.config_flags.NAT44_EI_IF_INSIDE
1228         self.vapi.nat44_ei_interface_add_del_feature(
1229             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1230         )
1231         self.vapi.nat44_ei_interface_add_del_feature(
1232             sw_if_index=self.pg1.sw_if_index, is_add=1
1233         )
1234
1235         p = (
1236             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1237             / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1238             / ICMP(id=self.icmp_id_out, type="echo-request")
1239         )
1240         pkts = [p]
1241         self.pg1.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244         capture = self.pg1.get_capture(len(pkts))
1245         packet = capture[0]
1246         try:
1247             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1248             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1249             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1250             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1251         except:
1252             self.logger.error(
1253                 ppp("Unexpected or invalid packet (outside network):", packet)
1254             )
1255             raise
1256
1257     def test_ping_internal_host_from_outside(self):
1258         """NAT44EI ping internal host from outside network"""
1259
1260         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1261         flags = self.config_flags.NAT44_EI_IF_INSIDE
1262         self.vapi.nat44_ei_interface_add_del_feature(
1263             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1264         )
1265         self.vapi.nat44_ei_interface_add_del_feature(
1266             sw_if_index=self.pg1.sw_if_index, is_add=1
1267         )
1268
1269         # out2in
1270         pkt = (
1271             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1272             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1273             / ICMP(id=self.icmp_id_out, type="echo-request")
1274         )
1275         self.pg1.add_stream(pkt)
1276         self.pg_enable_capture(self.pg_interfaces)
1277         self.pg_start()
1278         capture = self.pg0.get_capture(1)
1279         self.verify_capture_in(capture, self.pg0)
1280         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1281
1282         # in2out
1283         pkt = (
1284             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1285             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1286             / ICMP(id=self.icmp_id_in, type="echo-reply")
1287         )
1288         self.pg0.add_stream(pkt)
1289         self.pg_enable_capture(self.pg_interfaces)
1290         self.pg_start()
1291         capture = self.pg1.get_capture(1)
1292         self.verify_capture_out(capture, same_port=True)
1293         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1294
1295     def test_forwarding(self):
1296         """NAT44EI forwarding test"""
1297
1298         flags = self.config_flags.NAT44_EI_IF_INSIDE
1299         self.vapi.nat44_ei_interface_add_del_feature(
1300             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1301         )
1302         self.vapi.nat44_ei_interface_add_del_feature(
1303             sw_if_index=self.pg1.sw_if_index, is_add=1
1304         )
1305         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1306
1307         real_ip = self.pg0.remote_ip4
1308         alias_ip = self.nat_addr
1309         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1310         self.vapi.nat44_ei_add_del_static_mapping(
1311             is_add=1,
1312             local_ip_address=real_ip,
1313             external_ip_address=alias_ip,
1314             external_sw_if_index=0xFFFFFFFF,
1315             flags=flags,
1316         )
1317
1318         try:
1319             # static mapping match
1320
1321             pkts = self.create_stream_out(self.pg1)
1322             self.pg1.add_stream(pkts)
1323             self.pg_enable_capture(self.pg_interfaces)
1324             self.pg_start()
1325             capture = self.pg0.get_capture(len(pkts))
1326             self.verify_capture_in(capture, self.pg0)
1327
1328             pkts = self.create_stream_in(self.pg0, self.pg1)
1329             self.pg0.add_stream(pkts)
1330             self.pg_enable_capture(self.pg_interfaces)
1331             self.pg_start()
1332             capture = self.pg1.get_capture(len(pkts))
1333             self.verify_capture_out(capture, same_port=True)
1334
1335             # no static mapping match
1336
1337             host0 = self.pg0.remote_hosts[0]
1338             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1339             try:
1340                 pkts = self.create_stream_out(
1341                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1342                 )
1343                 self.pg1.add_stream(pkts)
1344                 self.pg_enable_capture(self.pg_interfaces)
1345                 self.pg_start()
1346                 capture = self.pg0.get_capture(len(pkts))
1347                 self.verify_capture_in(capture, self.pg0)
1348
1349                 pkts = self.create_stream_in(self.pg0, self.pg1)
1350                 self.pg0.add_stream(pkts)
1351                 self.pg_enable_capture(self.pg_interfaces)
1352                 self.pg_start()
1353                 capture = self.pg1.get_capture(len(pkts))
1354                 self.verify_capture_out(
1355                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1356                 )
1357             finally:
1358                 self.pg0.remote_hosts[0] = host0
1359
1360         finally:
1361             self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1362             flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1363             self.vapi.nat44_ei_add_del_static_mapping(
1364                 is_add=0,
1365                 local_ip_address=real_ip,
1366                 external_ip_address=alias_ip,
1367                 external_sw_if_index=0xFFFFFFFF,
1368                 flags=flags,
1369             )
1370
1371     def test_static_in(self):
1372         """NAT44EI 1:1 NAT initialized from inside network"""
1373
1374         nat_ip = "10.0.0.10"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378
1379         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380         flags = self.config_flags.NAT44_EI_IF_INSIDE
1381         self.vapi.nat44_ei_interface_add_del_feature(
1382             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1383         )
1384         self.vapi.nat44_ei_interface_add_del_feature(
1385             sw_if_index=self.pg1.sw_if_index, is_add=1
1386         )
1387         sm = self.vapi.nat44_ei_static_mapping_dump()
1388         self.assertEqual(len(sm), 1)
1389         self.assertEqual(sm[0].tag, "")
1390         self.assertEqual(sm[0].protocol, 0)
1391         self.assertEqual(sm[0].local_port, 0)
1392         self.assertEqual(sm[0].external_port, 0)
1393
1394         # in2out
1395         pkts = self.create_stream_in(self.pg0, self.pg1)
1396         self.pg0.add_stream(pkts)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg1.get_capture(len(pkts))
1400         self.verify_capture_out(capture, nat_ip, True)
1401
1402         # out2in
1403         pkts = self.create_stream_out(self.pg1, nat_ip)
1404         self.pg1.add_stream(pkts)
1405         self.pg_enable_capture(self.pg_interfaces)
1406         self.pg_start()
1407         capture = self.pg0.get_capture(len(pkts))
1408         self.verify_capture_in(capture, self.pg0)
1409
1410     def test_static_out(self):
1411         """NAT44EI 1:1 NAT initialized from outside network"""
1412
1413         nat_ip = "10.0.0.20"
1414         self.tcp_port_out = 6303
1415         self.udp_port_out = 6304
1416         self.icmp_id_out = 6305
1417         tag = "testTAG"
1418
1419         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1420         flags = self.config_flags.NAT44_EI_IF_INSIDE
1421         self.vapi.nat44_ei_interface_add_del_feature(
1422             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1423         )
1424         self.vapi.nat44_ei_interface_add_del_feature(
1425             sw_if_index=self.pg1.sw_if_index, is_add=1
1426         )
1427         sm = self.vapi.nat44_ei_static_mapping_dump()
1428         self.assertEqual(len(sm), 1)
1429         self.assertEqual(sm[0].tag, tag)
1430
1431         # out2in
1432         pkts = self.create_stream_out(self.pg1, nat_ip)
1433         self.pg1.add_stream(pkts)
1434         self.pg_enable_capture(self.pg_interfaces)
1435         self.pg_start()
1436         capture = self.pg0.get_capture(len(pkts))
1437         self.verify_capture_in(capture, self.pg0)
1438
1439         # in2out
1440         pkts = self.create_stream_in(self.pg0, self.pg1)
1441         self.pg0.add_stream(pkts)
1442         self.pg_enable_capture(self.pg_interfaces)
1443         self.pg_start()
1444         capture = self.pg1.get_capture(len(pkts))
1445         self.verify_capture_out(capture, nat_ip, True)
1446
1447     def test_static_with_port_in(self):
1448         """NAT44EI 1:1 NAPT initialized from inside network"""
1449
1450         self.tcp_port_out = 3606
1451         self.udp_port_out = 3607
1452         self.icmp_id_out = 3608
1453
1454         self.nat44_add_address(self.nat_addr)
1455         self.nat44_add_static_mapping(
1456             self.pg0.remote_ip4,
1457             self.nat_addr,
1458             self.tcp_port_in,
1459             self.tcp_port_out,
1460             proto=IP_PROTOS.tcp,
1461         )
1462         self.nat44_add_static_mapping(
1463             self.pg0.remote_ip4,
1464             self.nat_addr,
1465             self.udp_port_in,
1466             self.udp_port_out,
1467             proto=IP_PROTOS.udp,
1468         )
1469         self.nat44_add_static_mapping(
1470             self.pg0.remote_ip4,
1471             self.nat_addr,
1472             self.icmp_id_in,
1473             self.icmp_id_out,
1474             proto=IP_PROTOS.icmp,
1475         )
1476         flags = self.config_flags.NAT44_EI_IF_INSIDE
1477         self.vapi.nat44_ei_interface_add_del_feature(
1478             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1479         )
1480         self.vapi.nat44_ei_interface_add_del_feature(
1481             sw_if_index=self.pg1.sw_if_index, is_add=1
1482         )
1483
1484         # in2out
1485         pkts = self.create_stream_in(self.pg0, self.pg1)
1486         self.pg0.add_stream(pkts)
1487         self.pg_enable_capture(self.pg_interfaces)
1488         self.pg_start()
1489         capture = self.pg1.get_capture(len(pkts))
1490         self.verify_capture_out(capture)
1491
1492         # out2in
1493         pkts = self.create_stream_out(self.pg1)
1494         self.pg1.add_stream(pkts)
1495         self.pg_enable_capture(self.pg_interfaces)
1496         self.pg_start()
1497         capture = self.pg0.get_capture(len(pkts))
1498         self.verify_capture_in(capture, self.pg0)
1499
1500     def test_static_with_port_out(self):
1501         """NAT44EI 1:1 NAPT initialized from outside network"""
1502
1503         self.tcp_port_out = 30606
1504         self.udp_port_out = 30607
1505         self.icmp_id_out = 30608
1506
1507         self.nat44_add_address(self.nat_addr)
1508         self.nat44_add_static_mapping(
1509             self.pg0.remote_ip4,
1510             self.nat_addr,
1511             self.tcp_port_in,
1512             self.tcp_port_out,
1513             proto=IP_PROTOS.tcp,
1514         )
1515         self.nat44_add_static_mapping(
1516             self.pg0.remote_ip4,
1517             self.nat_addr,
1518             self.udp_port_in,
1519             self.udp_port_out,
1520             proto=IP_PROTOS.udp,
1521         )
1522         self.nat44_add_static_mapping(
1523             self.pg0.remote_ip4,
1524             self.nat_addr,
1525             self.icmp_id_in,
1526             self.icmp_id_out,
1527             proto=IP_PROTOS.icmp,
1528         )
1529         flags = self.config_flags.NAT44_EI_IF_INSIDE
1530         self.vapi.nat44_ei_interface_add_del_feature(
1531             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1532         )
1533         self.vapi.nat44_ei_interface_add_del_feature(
1534             sw_if_index=self.pg1.sw_if_index, is_add=1
1535         )
1536
1537         # out2in
1538         pkts = self.create_stream_out(self.pg1)
1539         self.pg1.add_stream(pkts)
1540         self.pg_enable_capture(self.pg_interfaces)
1541         self.pg_start()
1542         capture = self.pg0.get_capture(len(pkts))
1543         self.verify_capture_in(capture, self.pg0)
1544
1545         # in2out
1546         pkts = self.create_stream_in(self.pg0, self.pg1)
1547         self.pg0.add_stream(pkts)
1548         self.pg_enable_capture(self.pg_interfaces)
1549         self.pg_start()
1550         capture = self.pg1.get_capture(len(pkts))
1551         self.verify_capture_out(capture)
1552
1553     def test_static_vrf_aware(self):
1554         """NAT44EI 1:1 NAT VRF awareness"""
1555
1556         nat_ip1 = "10.0.0.30"
1557         nat_ip2 = "10.0.0.40"
1558         self.tcp_port_out = 6303
1559         self.udp_port_out = 6304
1560         self.icmp_id_out = 6305
1561
1562         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1563         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1564         flags = self.config_flags.NAT44_EI_IF_INSIDE
1565         self.vapi.nat44_ei_interface_add_del_feature(
1566             sw_if_index=self.pg3.sw_if_index, is_add=1
1567         )
1568         self.vapi.nat44_ei_interface_add_del_feature(
1569             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1570         )
1571         self.vapi.nat44_ei_interface_add_del_feature(
1572             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1573         )
1574
1575         # inside interface VRF match NAT44EI static mapping VRF
1576         pkts = self.create_stream_in(self.pg4, self.pg3)
1577         self.pg4.add_stream(pkts)
1578         self.pg_enable_capture(self.pg_interfaces)
1579         self.pg_start()
1580         capture = self.pg3.get_capture(len(pkts))
1581         self.verify_capture_out(capture, nat_ip1, True)
1582
1583         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1584         # are dropped)
1585         pkts = self.create_stream_in(self.pg0, self.pg3)
1586         self.pg0.add_stream(pkts)
1587         self.pg_enable_capture(self.pg_interfaces)
1588         self.pg_start()
1589         self.pg3.assert_nothing_captured()
1590
1591     def test_dynamic_to_static(self):
1592         """NAT44EI Switch from dynamic translation to 1:1NAT"""
1593         nat_ip = "10.0.0.10"
1594         self.tcp_port_out = 6303
1595         self.udp_port_out = 6304
1596         self.icmp_id_out = 6305
1597
1598         self.nat44_add_address(self.nat_addr)
1599         flags = self.config_flags.NAT44_EI_IF_INSIDE
1600         self.vapi.nat44_ei_interface_add_del_feature(
1601             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1602         )
1603         self.vapi.nat44_ei_interface_add_del_feature(
1604             sw_if_index=self.pg1.sw_if_index, is_add=1
1605         )
1606
1607         # dynamic
1608         pkts = self.create_stream_in(self.pg0, self.pg1)
1609         self.pg0.add_stream(pkts)
1610         self.pg_enable_capture(self.pg_interfaces)
1611         self.pg_start()
1612         capture = self.pg1.get_capture(len(pkts))
1613         self.verify_capture_out(capture)
1614
1615         # 1:1NAT
1616         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1617         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1618         self.assertEqual(len(sessions), 0)
1619         pkts = self.create_stream_in(self.pg0, self.pg1)
1620         self.pg0.add_stream(pkts)
1621         self.pg_enable_capture(self.pg_interfaces)
1622         self.pg_start()
1623         capture = self.pg1.get_capture(len(pkts))
1624         self.verify_capture_out(capture, nat_ip, True)
1625
1626     def test_identity_nat(self):
1627         """NAT44EI Identity NAT"""
1628         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1629         self.vapi.nat44_ei_add_del_identity_mapping(
1630             ip_address=self.pg0.remote_ip4,
1631             sw_if_index=0xFFFFFFFF,
1632             flags=flags,
1633             is_add=1,
1634         )
1635         flags = self.config_flags.NAT44_EI_IF_INSIDE
1636         self.vapi.nat44_ei_interface_add_del_feature(
1637             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1638         )
1639         self.vapi.nat44_ei_interface_add_del_feature(
1640             sw_if_index=self.pg1.sw_if_index, is_add=1
1641         )
1642
1643         p = (
1644             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1645             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1646             / TCP(sport=12345, dport=56789)
1647         )
1648         self.pg1.add_stream(p)
1649         self.pg_enable_capture(self.pg_interfaces)
1650         self.pg_start()
1651         capture = self.pg0.get_capture(1)
1652         p = capture[0]
1653         try:
1654             ip = p[IP]
1655             tcp = p[TCP]
1656             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1657             self.assertEqual(ip.src, self.pg1.remote_ip4)
1658             self.assertEqual(tcp.dport, 56789)
1659             self.assertEqual(tcp.sport, 12345)
1660             self.assert_packet_checksums_valid(p)
1661         except:
1662             self.logger.error(ppp("Unexpected or invalid packet:", p))
1663             raise
1664
1665         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1666         self.assertEqual(len(sessions), 0)
1667         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1668         self.vapi.nat44_ei_add_del_identity_mapping(
1669             ip_address=self.pg0.remote_ip4,
1670             sw_if_index=0xFFFFFFFF,
1671             flags=flags,
1672             vrf_id=1,
1673             is_add=1,
1674         )
1675         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1676         self.assertEqual(len(identity_mappings), 2)
1677
1678     def test_multiple_inside_interfaces(self):
1679         """NAT44EI multiple non-overlapping address space inside interfaces"""
1680
1681         self.nat44_add_address(self.nat_addr)
1682         flags = self.config_flags.NAT44_EI_IF_INSIDE
1683         self.vapi.nat44_ei_interface_add_del_feature(
1684             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1685         )
1686         self.vapi.nat44_ei_interface_add_del_feature(
1687             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1688         )
1689         self.vapi.nat44_ei_interface_add_del_feature(
1690             sw_if_index=self.pg3.sw_if_index, is_add=1
1691         )
1692
1693         # between two NAT44EI inside interfaces (no translation)
1694         pkts = self.create_stream_in(self.pg0, self.pg1)
1695         self.pg0.add_stream(pkts)
1696         self.pg_enable_capture(self.pg_interfaces)
1697         self.pg_start()
1698         capture = self.pg1.get_capture(len(pkts))
1699         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1700
1701         # from inside to interface without translation
1702         pkts = self.create_stream_in(self.pg0, self.pg2)
1703         self.pg0.add_stream(pkts)
1704         self.pg_enable_capture(self.pg_interfaces)
1705         self.pg_start()
1706         capture = self.pg2.get_capture(len(pkts))
1707         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1708
1709         # in2out 1st interface
1710         pkts = self.create_stream_in(self.pg0, self.pg3)
1711         self.pg0.add_stream(pkts)
1712         self.pg_enable_capture(self.pg_interfaces)
1713         self.pg_start()
1714         capture = self.pg3.get_capture(len(pkts))
1715         self.verify_capture_out(capture)
1716
1717         # out2in 1st interface
1718         pkts = self.create_stream_out(self.pg3)
1719         self.pg3.add_stream(pkts)
1720         self.pg_enable_capture(self.pg_interfaces)
1721         self.pg_start()
1722         capture = self.pg0.get_capture(len(pkts))
1723         self.verify_capture_in(capture, self.pg0)
1724
1725         # in2out 2nd interface
1726         pkts = self.create_stream_in(self.pg1, self.pg3)
1727         self.pg1.add_stream(pkts)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg3.get_capture(len(pkts))
1731         self.verify_capture_out(capture)
1732
1733         # out2in 2nd interface
1734         pkts = self.create_stream_out(self.pg3)
1735         self.pg3.add_stream(pkts)
1736         self.pg_enable_capture(self.pg_interfaces)
1737         self.pg_start()
1738         capture = self.pg1.get_capture(len(pkts))
1739         self.verify_capture_in(capture, self.pg1)
1740
1741     def test_inside_overlapping_interfaces(self):
1742         """NAT44EI multiple inside interfaces with overlapping address space"""
1743
1744         static_nat_ip = "10.0.0.10"
1745         self.nat44_add_address(self.nat_addr)
1746         flags = self.config_flags.NAT44_EI_IF_INSIDE
1747         self.vapi.nat44_ei_interface_add_del_feature(
1748             sw_if_index=self.pg3.sw_if_index, is_add=1
1749         )
1750         self.vapi.nat44_ei_interface_add_del_feature(
1751             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1752         )
1753         self.vapi.nat44_ei_interface_add_del_feature(
1754             sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1755         )
1756         self.vapi.nat44_ei_interface_add_del_feature(
1757             sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1758         )
1759         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1760
1761         # between NAT44EI inside interfaces with same VRF (no translation)
1762         pkts = self.create_stream_in(self.pg4, self.pg5)
1763         self.pg4.add_stream(pkts)
1764         self.pg_enable_capture(self.pg_interfaces)
1765         self.pg_start()
1766         capture = self.pg5.get_capture(len(pkts))
1767         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1768
1769         # between NAT44EI inside interfaces with different VRF (hairpinning)
1770         p = (
1771             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1772             / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1773             / TCP(sport=1234, dport=5678)
1774         )
1775         self.pg4.add_stream(p)
1776         self.pg_enable_capture(self.pg_interfaces)
1777         self.pg_start()
1778         capture = self.pg6.get_capture(1)
1779         p = capture[0]
1780         try:
1781             ip = p[IP]
1782             tcp = p[TCP]
1783             self.assertEqual(ip.src, self.nat_addr)
1784             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1785             self.assertNotEqual(tcp.sport, 1234)
1786             self.assertEqual(tcp.dport, 5678)
1787         except:
1788             self.logger.error(ppp("Unexpected or invalid packet:", p))
1789             raise
1790
1791         # in2out 1st interface
1792         pkts = self.create_stream_in(self.pg4, self.pg3)
1793         self.pg4.add_stream(pkts)
1794         self.pg_enable_capture(self.pg_interfaces)
1795         self.pg_start()
1796         capture = self.pg3.get_capture(len(pkts))
1797         self.verify_capture_out(capture)
1798
1799         # out2in 1st interface
1800         pkts = self.create_stream_out(self.pg3)
1801         self.pg3.add_stream(pkts)
1802         self.pg_enable_capture(self.pg_interfaces)
1803         self.pg_start()
1804         capture = self.pg4.get_capture(len(pkts))
1805         self.verify_capture_in(capture, self.pg4)
1806
1807         # in2out 2nd interface
1808         pkts = self.create_stream_in(self.pg5, self.pg3)
1809         self.pg5.add_stream(pkts)
1810         self.pg_enable_capture(self.pg_interfaces)
1811         self.pg_start()
1812         capture = self.pg3.get_capture(len(pkts))
1813         self.verify_capture_out(capture)
1814
1815         # out2in 2nd interface
1816         pkts = self.create_stream_out(self.pg3)
1817         self.pg3.add_stream(pkts)
1818         self.pg_enable_capture(self.pg_interfaces)
1819         self.pg_start()
1820         capture = self.pg5.get_capture(len(pkts))
1821         self.verify_capture_in(capture, self.pg5)
1822
1823         # pg5 session dump
1824         addresses = self.vapi.nat44_ei_address_dump()
1825         self.assertEqual(len(addresses), 1)
1826         sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1827         self.assertEqual(len(sessions), 3)
1828         for session in sessions:
1829             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1830             self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1831             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1832         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1833         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1834         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1835         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1836         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1837         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1838         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1839         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1840         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1841
1842         # in2out 3rd interface
1843         pkts = self.create_stream_in(self.pg6, self.pg3)
1844         self.pg6.add_stream(pkts)
1845         self.pg_enable_capture(self.pg_interfaces)
1846         self.pg_start()
1847         capture = self.pg3.get_capture(len(pkts))
1848         self.verify_capture_out(capture, static_nat_ip, True)
1849
1850         # out2in 3rd interface
1851         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1852         self.pg3.add_stream(pkts)
1853         self.pg_enable_capture(self.pg_interfaces)
1854         self.pg_start()
1855         capture = self.pg6.get_capture(len(pkts))
1856         self.verify_capture_in(capture, self.pg6)
1857
1858         # general user and session dump verifications
1859         users = self.vapi.nat44_ei_user_dump()
1860         self.assertGreaterEqual(len(users), 3)
1861         addresses = self.vapi.nat44_ei_address_dump()
1862         self.assertEqual(len(addresses), 1)
1863         for user in users:
1864             sessions = self.vapi.nat44_ei_user_session_dump(
1865                 user.ip_address, user.vrf_id
1866             )
1867             for session in sessions:
1868                 self.assertEqual(user.ip_address, session.inside_ip_address)
1869                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1870                 self.assertTrue(
1871                     session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
1872                 )
1873
1874         # pg4 session dump
1875         sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1876         self.assertGreaterEqual(len(sessions), 4)
1877         for session in sessions:
1878             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1879             self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1880             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1881
1882         # pg6 session dump
1883         sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1884         self.assertGreaterEqual(len(sessions), 3)
1885         for session in sessions:
1886             self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1887             self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1888             self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1889             self.assertTrue(
1890                 session.inside_port
1891                 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1892             )
1893
1894     def test_hairpinning(self):
1895         """NAT44EI hairpinning - 1:1 NAPT"""
1896
1897         host = self.pg0.remote_hosts[0]
1898         server = self.pg0.remote_hosts[1]
1899         host_in_port = 1234
1900         host_out_port = 0
1901         server_in_port = 5678
1902         server_out_port = 8765
1903
1904         self.nat44_add_address(self.nat_addr)
1905         flags = self.config_flags.NAT44_EI_IF_INSIDE
1906         self.vapi.nat44_ei_interface_add_del_feature(
1907             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1908         )
1909         self.vapi.nat44_ei_interface_add_del_feature(
1910             sw_if_index=self.pg1.sw_if_index, is_add=1
1911         )
1912
1913         # add static mapping for server
1914         self.nat44_add_static_mapping(
1915             server.ip4,
1916             self.nat_addr,
1917             server_in_port,
1918             server_out_port,
1919             proto=IP_PROTOS.tcp,
1920         )
1921
1922         cnt = self.statistics["/nat44-ei/hairpinning"]
1923         # send packet from host to server
1924         p = (
1925             Ether(src=host.mac, dst=self.pg0.local_mac)
1926             / IP(src=host.ip4, dst=self.nat_addr)
1927             / TCP(sport=host_in_port, dport=server_out_port)
1928         )
1929         self.pg0.add_stream(p)
1930         self.pg_enable_capture(self.pg_interfaces)
1931         self.pg_start()
1932         capture = self.pg0.get_capture(1)
1933         p = capture[0]
1934         try:
1935             ip = p[IP]
1936             tcp = p[TCP]
1937             self.assertEqual(ip.src, self.nat_addr)
1938             self.assertEqual(ip.dst, server.ip4)
1939             self.assertNotEqual(tcp.sport, host_in_port)
1940             self.assertEqual(tcp.dport, server_in_port)
1941             self.assert_packet_checksums_valid(p)
1942             host_out_port = tcp.sport
1943         except:
1944             self.logger.error(ppp("Unexpected or invalid packet:", p))
1945             raise
1946
1947         after = self.statistics["/nat44-ei/hairpinning"]
1948         if_idx = self.pg0.sw_if_index
1949         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1950
1951         # send reply from server to host
1952         p = (
1953             Ether(src=server.mac, dst=self.pg0.local_mac)
1954             / IP(src=server.ip4, dst=self.nat_addr)
1955             / TCP(sport=server_in_port, dport=host_out_port)
1956         )
1957         self.pg0.add_stream(p)
1958         self.pg_enable_capture(self.pg_interfaces)
1959         self.pg_start()
1960         capture = self.pg0.get_capture(1)
1961         p = capture[0]
1962         try:
1963             ip = p[IP]
1964             tcp = p[TCP]
1965             self.assertEqual(ip.src, self.nat_addr)
1966             self.assertEqual(ip.dst, host.ip4)
1967             self.assertEqual(tcp.sport, server_out_port)
1968             self.assertEqual(tcp.dport, host_in_port)
1969             self.assert_packet_checksums_valid(p)
1970         except:
1971             self.logger.error(ppp("Unexpected or invalid packet:", p))
1972             raise
1973
1974         after = self.statistics["/nat44-ei/hairpinning"]
1975         if_idx = self.pg0.sw_if_index
1976         self.assertEqual(
1977             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1978             2 + (1 if self.vpp_worker_count > 0 else 0),
1979         )
1980
1981     def test_hairpinning2(self):
1982         """NAT44EI hairpinning - 1:1 NAT"""
1983
1984         server1_nat_ip = "10.0.0.10"
1985         server2_nat_ip = "10.0.0.11"
1986         host = self.pg0.remote_hosts[0]
1987         server1 = self.pg0.remote_hosts[1]
1988         server2 = self.pg0.remote_hosts[2]
1989         server_tcp_port = 22
1990         server_udp_port = 20
1991
1992         self.nat44_add_address(self.nat_addr)
1993         flags = self.config_flags.NAT44_EI_IF_INSIDE
1994         self.vapi.nat44_ei_interface_add_del_feature(
1995             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1996         )
1997         self.vapi.nat44_ei_interface_add_del_feature(
1998             sw_if_index=self.pg1.sw_if_index, is_add=1
1999         )
2000
2001         # add static mapping for servers
2002         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2003         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2004
2005         # host to server1
2006         pkts = []
2007         p = (
2008             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2009             / IP(src=host.ip4, dst=server1_nat_ip)
2010             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2011         )
2012         pkts.append(p)
2013         p = (
2014             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2015             / IP(src=host.ip4, dst=server1_nat_ip)
2016             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2017         )
2018         pkts.append(p)
2019         p = (
2020             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2021             / IP(src=host.ip4, dst=server1_nat_ip)
2022             / ICMP(id=self.icmp_id_in, type="echo-request")
2023         )
2024         pkts.append(p)
2025         self.pg0.add_stream(pkts)
2026         self.pg_enable_capture(self.pg_interfaces)
2027         self.pg_start()
2028         capture = self.pg0.get_capture(len(pkts))
2029         for packet in capture:
2030             try:
2031                 self.assertEqual(packet[IP].src, self.nat_addr)
2032                 self.assertEqual(packet[IP].dst, server1.ip4)
2033                 if packet.haslayer(TCP):
2034                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2035                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2036                     self.tcp_port_out = packet[TCP].sport
2037                     self.assert_packet_checksums_valid(packet)
2038                 elif packet.haslayer(UDP):
2039                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2040                     self.assertEqual(packet[UDP].dport, server_udp_port)
2041                     self.udp_port_out = packet[UDP].sport
2042                 else:
2043                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2044                     self.icmp_id_out = packet[ICMP].id
2045             except:
2046                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2047                 raise
2048
2049         # server1 to host
2050         pkts = []
2051         p = (
2052             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2053             / IP(src=server1.ip4, dst=self.nat_addr)
2054             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2055         )
2056         pkts.append(p)
2057         p = (
2058             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2059             / IP(src=server1.ip4, dst=self.nat_addr)
2060             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2061         )
2062         pkts.append(p)
2063         p = (
2064             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2065             / IP(src=server1.ip4, dst=self.nat_addr)
2066             / ICMP(id=self.icmp_id_out, type="echo-reply")
2067         )
2068         pkts.append(p)
2069         self.pg0.add_stream(pkts)
2070         self.pg_enable_capture(self.pg_interfaces)
2071         self.pg_start()
2072         capture = self.pg0.get_capture(len(pkts))
2073         for packet in capture:
2074             try:
2075                 self.assertEqual(packet[IP].src, server1_nat_ip)
2076                 self.assertEqual(packet[IP].dst, host.ip4)
2077                 if packet.haslayer(TCP):
2078                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2079                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2080                     self.assert_packet_checksums_valid(packet)
2081                 elif packet.haslayer(UDP):
2082                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2083                     self.assertEqual(packet[UDP].sport, server_udp_port)
2084                 else:
2085                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2086             except:
2087                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2088                 raise
2089
2090         # server2 to server1
2091         pkts = []
2092         p = (
2093             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2094             / IP(src=server2.ip4, dst=server1_nat_ip)
2095             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2096         )
2097         pkts.append(p)
2098         p = (
2099             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2100             / IP(src=server2.ip4, dst=server1_nat_ip)
2101             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2102         )
2103         pkts.append(p)
2104         p = (
2105             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2106             / IP(src=server2.ip4, dst=server1_nat_ip)
2107             / ICMP(id=self.icmp_id_in, type="echo-request")
2108         )
2109         pkts.append(p)
2110         self.pg0.add_stream(pkts)
2111         self.pg_enable_capture(self.pg_interfaces)
2112         self.pg_start()
2113         capture = self.pg0.get_capture(len(pkts))
2114         for packet in capture:
2115             try:
2116                 self.assertEqual(packet[IP].src, server2_nat_ip)
2117                 self.assertEqual(packet[IP].dst, server1.ip4)
2118                 if packet.haslayer(TCP):
2119                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2120                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2121                     self.tcp_port_out = packet[TCP].sport
2122                     self.assert_packet_checksums_valid(packet)
2123                 elif packet.haslayer(UDP):
2124                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2125                     self.assertEqual(packet[UDP].dport, server_udp_port)
2126                     self.udp_port_out = packet[UDP].sport
2127                 else:
2128                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2129                     self.icmp_id_out = packet[ICMP].id
2130             except:
2131                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2132                 raise
2133
2134         # server1 to server2
2135         pkts = []
2136         p = (
2137             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2138             / IP(src=server1.ip4, dst=server2_nat_ip)
2139             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2140         )
2141         pkts.append(p)
2142         p = (
2143             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2144             / IP(src=server1.ip4, dst=server2_nat_ip)
2145             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2146         )
2147         pkts.append(p)
2148         p = (
2149             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2150             / IP(src=server1.ip4, dst=server2_nat_ip)
2151             / ICMP(id=self.icmp_id_out, type="echo-reply")
2152         )
2153         pkts.append(p)
2154         self.pg0.add_stream(pkts)
2155         self.pg_enable_capture(self.pg_interfaces)
2156         self.pg_start()
2157         capture = self.pg0.get_capture(len(pkts))
2158         for packet in capture:
2159             try:
2160                 self.assertEqual(packet[IP].src, server1_nat_ip)
2161                 self.assertEqual(packet[IP].dst, server2.ip4)
2162                 if packet.haslayer(TCP):
2163                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2164                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2165                     self.assert_packet_checksums_valid(packet)
2166                 elif packet.haslayer(UDP):
2167                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2168                     self.assertEqual(packet[UDP].sport, server_udp_port)
2169                 else:
2170                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2171             except:
2172                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2173                 raise
2174
2175     def test_hairpinning_avoid_inf_loop(self):
2176         """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
2177
2178         host = self.pg0.remote_hosts[0]
2179         server = self.pg0.remote_hosts[1]
2180         host_in_port = 1234
2181         host_out_port = 0
2182         server_in_port = 5678
2183         server_out_port = 8765
2184
2185         self.nat44_add_address(self.nat_addr)
2186         flags = self.config_flags.NAT44_EI_IF_INSIDE
2187         self.vapi.nat44_ei_interface_add_del_feature(
2188             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2189         )
2190         self.vapi.nat44_ei_interface_add_del_feature(
2191             sw_if_index=self.pg1.sw_if_index, is_add=1
2192         )
2193
2194         # add static mapping for server
2195         self.nat44_add_static_mapping(
2196             server.ip4,
2197             self.nat_addr,
2198             server_in_port,
2199             server_out_port,
2200             proto=IP_PROTOS.tcp,
2201         )
2202
2203         # add another static mapping that maps pg0.local_ip4 address to itself
2204         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2205
2206         # send packet from host to VPP (the packet should get dropped)
2207         p = (
2208             Ether(src=host.mac, dst=self.pg0.local_mac)
2209             / IP(src=host.ip4, dst=self.pg0.local_ip4)
2210             / TCP(sport=host_in_port, dport=server_out_port)
2211         )
2212         self.pg0.add_stream(p)
2213         self.pg_enable_capture(self.pg_interfaces)
2214         self.pg_start()
2215         # Here VPP used to crash due to an infinite loop
2216
2217         cnt = self.statistics["/nat44-ei/hairpinning"]
2218         # send packet from host to server
2219         p = (
2220             Ether(src=host.mac, dst=self.pg0.local_mac)
2221             / IP(src=host.ip4, dst=self.nat_addr)
2222             / TCP(sport=host_in_port, dport=server_out_port)
2223         )
2224         self.pg0.add_stream(p)
2225         self.pg_enable_capture(self.pg_interfaces)
2226         self.pg_start()
2227         capture = self.pg0.get_capture(1)
2228         p = capture[0]
2229         try:
2230             ip = p[IP]
2231             tcp = p[TCP]
2232             self.assertEqual(ip.src, self.nat_addr)
2233             self.assertEqual(ip.dst, server.ip4)
2234             self.assertNotEqual(tcp.sport, host_in_port)
2235             self.assertEqual(tcp.dport, server_in_port)
2236             self.assert_packet_checksums_valid(p)
2237             host_out_port = tcp.sport
2238         except:
2239             self.logger.error(ppp("Unexpected or invalid packet:", p))
2240             raise
2241
2242         after = self.statistics["/nat44-ei/hairpinning"]
2243         if_idx = self.pg0.sw_if_index
2244         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2245
2246         # send reply from server to host
2247         p = (
2248             Ether(src=server.mac, dst=self.pg0.local_mac)
2249             / IP(src=server.ip4, dst=self.nat_addr)
2250             / TCP(sport=server_in_port, dport=host_out_port)
2251         )
2252         self.pg0.add_stream(p)
2253         self.pg_enable_capture(self.pg_interfaces)
2254         self.pg_start()
2255         capture = self.pg0.get_capture(1)
2256         p = capture[0]
2257         try:
2258             ip = p[IP]
2259             tcp = p[TCP]
2260             self.assertEqual(ip.src, self.nat_addr)
2261             self.assertEqual(ip.dst, host.ip4)
2262             self.assertEqual(tcp.sport, server_out_port)
2263             self.assertEqual(tcp.dport, host_in_port)
2264             self.assert_packet_checksums_valid(p)
2265         except:
2266             self.logger.error(ppp("Unexpected or invalid packet:", p))
2267             raise
2268
2269         after = self.statistics["/nat44-ei/hairpinning"]
2270         if_idx = self.pg0.sw_if_index
2271         self.assertEqual(
2272             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2273             2 + (1 if self.vpp_worker_count > 0 else 0),
2274         )
2275
2276     def test_interface_addr(self):
2277         """NAT44EI acquire addresses from interface"""
2278         self.vapi.nat44_ei_add_del_interface_addr(
2279             is_add=1, sw_if_index=self.pg7.sw_if_index
2280         )
2281
2282         # no address in NAT pool
2283         addresses = self.vapi.nat44_ei_address_dump()
2284         self.assertEqual(0, len(addresses))
2285
2286         # configure interface address and check NAT address pool
2287         self.pg7.config_ip4()
2288         addresses = self.vapi.nat44_ei_address_dump()
2289         self.assertEqual(1, len(addresses))
2290         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2291
2292         # remove interface address and check NAT address pool
2293         self.pg7.unconfig_ip4()
2294         addresses = self.vapi.nat44_ei_address_dump()
2295         self.assertEqual(0, len(addresses))
2296
2297     def test_interface_addr_static_mapping(self):
2298         """NAT44EI Static mapping with addresses from interface"""
2299         tag = "testTAG"
2300
2301         self.vapi.nat44_ei_add_del_interface_addr(
2302             is_add=1, sw_if_index=self.pg7.sw_if_index
2303         )
2304         self.nat44_add_static_mapping(
2305             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2306         )
2307
2308         # static mappings with external interface
2309         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2310         self.assertEqual(1, len(static_mappings))
2311         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2312         self.assertEqual(static_mappings[0].tag, tag)
2313
2314         # configure interface address and check static mappings
2315         self.pg7.config_ip4()
2316         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2317         self.assertEqual(2, len(static_mappings))
2318         resolved = False
2319         for sm in static_mappings:
2320             if sm.external_sw_if_index == 0xFFFFFFFF:
2321                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2322                 self.assertEqual(sm.tag, tag)
2323                 resolved = True
2324         self.assertTrue(resolved)
2325
2326         # remove interface address and check static mappings
2327         self.pg7.unconfig_ip4()
2328         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2329         self.assertEqual(1, len(static_mappings))
2330         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2331         self.assertEqual(static_mappings[0].tag, tag)
2332
2333         # configure interface address again and check static mappings
2334         self.pg7.config_ip4()
2335         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2336         self.assertEqual(2, len(static_mappings))
2337         resolved = False
2338         for sm in static_mappings:
2339             if sm.external_sw_if_index == 0xFFFFFFFF:
2340                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2341                 self.assertEqual(sm.tag, tag)
2342                 resolved = True
2343         self.assertTrue(resolved)
2344
2345         # remove static mapping
2346         self.nat44_add_static_mapping(
2347             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2348         )
2349         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2350         self.assertEqual(0, len(static_mappings))
2351
2352     def test_interface_addr_identity_nat(self):
2353         """NAT44EI Identity NAT with addresses from interface"""
2354
2355         port = 53053
2356         self.vapi.nat44_ei_add_del_interface_addr(
2357             is_add=1, sw_if_index=self.pg7.sw_if_index
2358         )
2359         self.vapi.nat44_ei_add_del_identity_mapping(
2360             ip_address=b"0",
2361             sw_if_index=self.pg7.sw_if_index,
2362             port=port,
2363             protocol=IP_PROTOS.tcp,
2364             is_add=1,
2365         )
2366
2367         # identity mappings with external interface
2368         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2369         self.assertEqual(1, len(identity_mappings))
2370         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2371
2372         # configure interface address and check identity mappings
2373         self.pg7.config_ip4()
2374         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2375         resolved = False
2376         self.assertEqual(2, len(identity_mappings))
2377         for sm in identity_mappings:
2378             if sm.sw_if_index == 0xFFFFFFFF:
2379                 self.assertEqual(
2380                     str(identity_mappings[0].ip_address), self.pg7.local_ip4
2381                 )
2382                 self.assertEqual(port, identity_mappings[0].port)
2383                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2384                 resolved = True
2385         self.assertTrue(resolved)
2386
2387         # remove interface address and check identity mappings
2388         self.pg7.unconfig_ip4()
2389         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2390         self.assertEqual(1, len(identity_mappings))
2391         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2392
2393     def test_ipfix_nat44_sess(self):
2394         """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2395         self.ipfix_domain_id = 10
2396         self.ipfix_src_port = 20202
2397         collector_port = 30303
2398         bind_layers(UDP, IPFIX, dport=30303)
2399         self.nat44_add_address(self.nat_addr)
2400         flags = self.config_flags.NAT44_EI_IF_INSIDE
2401         self.vapi.nat44_ei_interface_add_del_feature(
2402             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2403         )
2404         self.vapi.nat44_ei_interface_add_del_feature(
2405             sw_if_index=self.pg1.sw_if_index, is_add=1
2406         )
2407         self.vapi.set_ipfix_exporter(
2408             collector_address=self.pg3.remote_ip4,
2409             src_address=self.pg3.local_ip4,
2410             path_mtu=512,
2411             template_interval=10,
2412             collector_port=collector_port,
2413         )
2414         self.vapi.nat44_ei_ipfix_enable_disable(
2415             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2416         )
2417
2418         pkts = self.create_stream_in(self.pg0, self.pg1)
2419         self.pg0.add_stream(pkts)
2420         self.pg_enable_capture(self.pg_interfaces)
2421         self.pg_start()
2422         capture = self.pg1.get_capture(len(pkts))
2423         self.verify_capture_out(capture)
2424         self.nat44_add_address(self.nat_addr, is_add=0)
2425         self.vapi.ipfix_flush()
2426         capture = self.pg3.get_capture(7)
2427         ipfix = IPFIXDecoder()
2428         # first load template
2429         for p in capture:
2430             self.assertTrue(p.haslayer(IPFIX))
2431             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2432             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2433             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2434             self.assertEqual(p[UDP].dport, collector_port)
2435             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2436             if p.haslayer(Template):
2437                 ipfix.add_template(p.getlayer(Template))
2438         # verify events in data set
2439         for p in capture:
2440             if p.haslayer(Data):
2441                 data = ipfix.decode_data_set(p.getlayer(Set))
2442                 self.verify_ipfix_nat44_ses(data)
2443
2444     def test_ipfix_addr_exhausted(self):
2445         """NAT44EI IPFIX logging NAT addresses exhausted"""
2446         flags = self.config_flags.NAT44_EI_IF_INSIDE
2447         self.vapi.nat44_ei_interface_add_del_feature(
2448             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2449         )
2450         self.vapi.nat44_ei_interface_add_del_feature(
2451             sw_if_index=self.pg1.sw_if_index, is_add=1
2452         )
2453         self.vapi.set_ipfix_exporter(
2454             collector_address=self.pg3.remote_ip4,
2455             src_address=self.pg3.local_ip4,
2456             path_mtu=512,
2457             template_interval=10,
2458         )
2459         self.vapi.nat44_ei_ipfix_enable_disable(
2460             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2461         )
2462
2463         p = (
2464             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2465             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2466             / TCP(sport=3025)
2467         )
2468         self.pg0.add_stream(p)
2469         self.pg_enable_capture(self.pg_interfaces)
2470         self.pg_start()
2471         self.pg1.assert_nothing_captured()
2472         self.vapi.ipfix_flush()
2473         capture = self.pg3.get_capture(7)
2474         ipfix = IPFIXDecoder()
2475         # first load template
2476         for p in capture:
2477             self.assertTrue(p.haslayer(IPFIX))
2478             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2479             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2480             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2481             self.assertEqual(p[UDP].dport, 4739)
2482             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2483             if p.haslayer(Template):
2484                 ipfix.add_template(p.getlayer(Template))
2485         # verify events in data set
2486         for p in capture:
2487             if p.haslayer(Data):
2488                 data = ipfix.decode_data_set(p.getlayer(Set))
2489                 self.verify_ipfix_addr_exhausted(data)
2490
2491     def test_ipfix_max_sessions(self):
2492         """NAT44EI IPFIX logging maximum session entries exceeded"""
2493         self.nat44_add_address(self.nat_addr)
2494         flags = self.config_flags.NAT44_EI_IF_INSIDE
2495         self.vapi.nat44_ei_interface_add_del_feature(
2496             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2497         )
2498         self.vapi.nat44_ei_interface_add_del_feature(
2499             sw_if_index=self.pg1.sw_if_index, is_add=1
2500         )
2501
2502         max_sessions_per_thread = self.max_translations
2503         max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2504
2505         pkts = []
2506         for i in range(0, max_sessions):
2507             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2508             p = (
2509                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2510                 / IP(src=src, dst=self.pg1.remote_ip4)
2511                 / TCP(sport=1025)
2512             )
2513             pkts.append(p)
2514         self.pg0.add_stream(pkts)
2515         self.pg_enable_capture(self.pg_interfaces)
2516         self.pg_start()
2517
2518         self.pg1.get_capture(max_sessions)
2519         self.vapi.set_ipfix_exporter(
2520             collector_address=self.pg3.remote_ip4,
2521             src_address=self.pg3.local_ip4,
2522             path_mtu=512,
2523             template_interval=10,
2524         )
2525         self.vapi.nat44_ei_ipfix_enable_disable(
2526             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2527         )
2528
2529         p = (
2530             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2531             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2532             / TCP(sport=1025)
2533         )
2534         self.pg0.add_stream(p)
2535         self.pg_enable_capture(self.pg_interfaces)
2536         self.pg_start()
2537         self.pg1.assert_nothing_captured()
2538         self.vapi.ipfix_flush()
2539         capture = self.pg3.get_capture(7)
2540         ipfix = IPFIXDecoder()
2541         # first load template
2542         for p in capture:
2543             self.assertTrue(p.haslayer(IPFIX))
2544             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2545             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2546             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2547             self.assertEqual(p[UDP].dport, 4739)
2548             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2549             if p.haslayer(Template):
2550                 ipfix.add_template(p.getlayer(Template))
2551         # verify events in data set
2552         for p in capture:
2553             if p.haslayer(Data):
2554                 data = ipfix.decode_data_set(p.getlayer(Set))
2555                 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2556
2557     def test_syslog_apmap(self):
2558         """NAT44EI syslog address and port mapping creation and deletion"""
2559         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2560         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2561         self.nat44_add_address(self.nat_addr)
2562         flags = self.config_flags.NAT44_EI_IF_INSIDE
2563         self.vapi.nat44_ei_interface_add_del_feature(
2564             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2565         )
2566         self.vapi.nat44_ei_interface_add_del_feature(
2567             sw_if_index=self.pg1.sw_if_index, is_add=1
2568         )
2569
2570         p = (
2571             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2572             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2573             / TCP(sport=self.tcp_port_in, dport=20)
2574         )
2575         self.pg0.add_stream(p)
2576         self.pg_enable_capture(self.pg_interfaces)
2577         self.pg_start()
2578         capture = self.pg1.get_capture(1)
2579         self.tcp_port_out = capture[0][TCP].sport
2580         capture = self.pg3.get_capture(1)
2581         self.verify_syslog_apmap(capture[0][Raw].load)
2582
2583         self.pg_enable_capture(self.pg_interfaces)
2584         self.pg_start()
2585         self.nat44_add_address(self.nat_addr, is_add=0)
2586         capture = self.pg3.get_capture(1)
2587         self.verify_syslog_apmap(capture[0][Raw].load, False)
2588
2589     def test_pool_addr_fib(self):
2590         """NAT44EI add pool addresses to FIB"""
2591         static_addr = "10.0.0.10"
2592         self.nat44_add_address(self.nat_addr)
2593         flags = self.config_flags.NAT44_EI_IF_INSIDE
2594         self.vapi.nat44_ei_interface_add_del_feature(
2595             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2596         )
2597         self.vapi.nat44_ei_interface_add_del_feature(
2598             sw_if_index=self.pg1.sw_if_index, is_add=1
2599         )
2600         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2601
2602         # NAT44EI address
2603         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2604             op=ARP.who_has,
2605             pdst=self.nat_addr,
2606             psrc=self.pg1.remote_ip4,
2607             hwsrc=self.pg1.remote_mac,
2608         )
2609         self.pg1.add_stream(p)
2610         self.pg_enable_capture(self.pg_interfaces)
2611         self.pg_start()
2612         capture = self.pg1.get_capture(1)
2613         self.assertTrue(capture[0].haslayer(ARP))
2614         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2615
2616         # 1:1 NAT address
2617         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2618             op=ARP.who_has,
2619             pdst=static_addr,
2620             psrc=self.pg1.remote_ip4,
2621             hwsrc=self.pg1.remote_mac,
2622         )
2623         self.pg1.add_stream(p)
2624         self.pg_enable_capture(self.pg_interfaces)
2625         self.pg_start()
2626         capture = self.pg1.get_capture(1)
2627         self.assertTrue(capture[0].haslayer(ARP))
2628         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2629
2630         # send ARP to non-NAT44EI interface
2631         p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2632             op=ARP.who_has,
2633             pdst=self.nat_addr,
2634             psrc=self.pg2.remote_ip4,
2635             hwsrc=self.pg2.remote_mac,
2636         )
2637         self.pg2.add_stream(p)
2638         self.pg_enable_capture(self.pg_interfaces)
2639         self.pg_start()
2640         self.pg1.assert_nothing_captured()
2641
2642         # remove addresses and verify
2643         self.nat44_add_address(self.nat_addr, is_add=0)
2644         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2645
2646         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2647             op=ARP.who_has,
2648             pdst=self.nat_addr,
2649             psrc=self.pg1.remote_ip4,
2650             hwsrc=self.pg1.remote_mac,
2651         )
2652         self.pg1.add_stream(p)
2653         self.pg_enable_capture(self.pg_interfaces)
2654         self.pg_start()
2655         self.pg1.assert_nothing_captured()
2656
2657         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2658             op=ARP.who_has,
2659             pdst=static_addr,
2660             psrc=self.pg1.remote_ip4,
2661             hwsrc=self.pg1.remote_mac,
2662         )
2663         self.pg1.add_stream(p)
2664         self.pg_enable_capture(self.pg_interfaces)
2665         self.pg_start()
2666         self.pg1.assert_nothing_captured()
2667
2668     def test_vrf_mode(self):
2669         """NAT44EI tenant VRF aware address pool mode"""
2670
2671         vrf_id1 = 1
2672         vrf_id2 = 2
2673         nat_ip1 = "10.0.0.10"
2674         nat_ip2 = "10.0.0.11"
2675
2676         self.pg0.unconfig_ip4()
2677         self.pg1.unconfig_ip4()
2678         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2679         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2680         self.pg0.set_table_ip4(vrf_id1)
2681         self.pg1.set_table_ip4(vrf_id2)
2682         self.pg0.config_ip4()
2683         self.pg1.config_ip4()
2684         self.pg0.resolve_arp()
2685         self.pg1.resolve_arp()
2686
2687         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2688         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2689         flags = self.config_flags.NAT44_EI_IF_INSIDE
2690         self.vapi.nat44_ei_interface_add_del_feature(
2691             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2692         )
2693         self.vapi.nat44_ei_interface_add_del_feature(
2694             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2695         )
2696         self.vapi.nat44_ei_interface_add_del_feature(
2697             sw_if_index=self.pg2.sw_if_index, is_add=1
2698         )
2699
2700         try:
2701             # first VRF
2702             pkts = self.create_stream_in(self.pg0, self.pg2)
2703             self.pg0.add_stream(pkts)
2704             self.pg_enable_capture(self.pg_interfaces)
2705             self.pg_start()
2706             capture = self.pg2.get_capture(len(pkts))
2707             self.verify_capture_out(capture, nat_ip1)
2708
2709             # second VRF
2710             pkts = self.create_stream_in(self.pg1, self.pg2)
2711             self.pg1.add_stream(pkts)
2712             self.pg_enable_capture(self.pg_interfaces)
2713             self.pg_start()
2714             capture = self.pg2.get_capture(len(pkts))
2715             self.verify_capture_out(capture, nat_ip2)
2716
2717         finally:
2718             self.pg0.unconfig_ip4()
2719             self.pg1.unconfig_ip4()
2720             self.pg0.set_table_ip4(0)
2721             self.pg1.set_table_ip4(0)
2722             self.pg0.config_ip4()
2723             self.pg1.config_ip4()
2724             self.pg0.resolve_arp()
2725             self.pg1.resolve_arp()
2726             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2727             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2728
2729     def test_vrf_feature_independent(self):
2730         """NAT44EI tenant VRF independent address pool mode"""
2731
2732         nat_ip1 = "10.0.0.10"
2733         nat_ip2 = "10.0.0.11"
2734
2735         self.nat44_add_address(nat_ip1)
2736         self.nat44_add_address(nat_ip2, vrf_id=99)
2737         flags = self.config_flags.NAT44_EI_IF_INSIDE
2738         self.vapi.nat44_ei_interface_add_del_feature(
2739             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2740         )
2741         self.vapi.nat44_ei_interface_add_del_feature(
2742             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2743         )
2744         self.vapi.nat44_ei_interface_add_del_feature(
2745             sw_if_index=self.pg2.sw_if_index, is_add=1
2746         )
2747
2748         # first VRF
2749         pkts = self.create_stream_in(self.pg0, self.pg2)
2750         self.pg0.add_stream(pkts)
2751         self.pg_enable_capture(self.pg_interfaces)
2752         self.pg_start()
2753         capture = self.pg2.get_capture(len(pkts))
2754         self.verify_capture_out(capture, nat_ip1)
2755
2756         # second VRF
2757         pkts = self.create_stream_in(self.pg1, self.pg2)
2758         self.pg1.add_stream(pkts)
2759         self.pg_enable_capture(self.pg_interfaces)
2760         self.pg_start()
2761         capture = self.pg2.get_capture(len(pkts))
2762         self.verify_capture_out(capture, nat_ip1)
2763
2764     def test_dynamic_ipless_interfaces(self):
2765         """NAT44EI interfaces without configured IP address"""
2766         self.create_routes_and_neigbors()
2767         self.nat44_add_address(self.nat_addr)
2768         flags = self.config_flags.NAT44_EI_IF_INSIDE
2769         self.vapi.nat44_ei_interface_add_del_feature(
2770             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2771         )
2772         self.vapi.nat44_ei_interface_add_del_feature(
2773             sw_if_index=self.pg8.sw_if_index, is_add=1
2774         )
2775
2776         # in2out
2777         pkts = self.create_stream_in(self.pg7, self.pg8)
2778         self.pg7.add_stream(pkts)
2779         self.pg_enable_capture(self.pg_interfaces)
2780         self.pg_start()
2781         capture = self.pg8.get_capture(len(pkts))
2782         self.verify_capture_out(capture)
2783
2784         # out2in
2785         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2786         self.pg8.add_stream(pkts)
2787         self.pg_enable_capture(self.pg_interfaces)
2788         self.pg_start()
2789         capture = self.pg7.get_capture(len(pkts))
2790         self.verify_capture_in(capture, self.pg7)
2791
2792     def test_static_ipless_interfaces(self):
2793         """NAT44EI interfaces without configured IP address - 1:1 NAT"""
2794
2795         self.create_routes_and_neigbors()
2796         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2797         flags = self.config_flags.NAT44_EI_IF_INSIDE
2798         self.vapi.nat44_ei_interface_add_del_feature(
2799             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2800         )
2801         self.vapi.nat44_ei_interface_add_del_feature(
2802             sw_if_index=self.pg8.sw_if_index, is_add=1
2803         )
2804
2805         # out2in
2806         pkts = self.create_stream_out(self.pg8)
2807         self.pg8.add_stream(pkts)
2808         self.pg_enable_capture(self.pg_interfaces)
2809         self.pg_start()
2810         capture = self.pg7.get_capture(len(pkts))
2811         self.verify_capture_in(capture, self.pg7)
2812
2813         # in2out
2814         pkts = self.create_stream_in(self.pg7, self.pg8)
2815         self.pg7.add_stream(pkts)
2816         self.pg_enable_capture(self.pg_interfaces)
2817         self.pg_start()
2818         capture = self.pg8.get_capture(len(pkts))
2819         self.verify_capture_out(capture, self.nat_addr, True)
2820
2821     def test_static_with_port_ipless_interfaces(self):
2822         """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
2823
2824         self.tcp_port_out = 30606
2825         self.udp_port_out = 30607
2826         self.icmp_id_out = 30608
2827
2828         self.create_routes_and_neigbors()
2829         self.nat44_add_address(self.nat_addr)
2830         self.nat44_add_static_mapping(
2831             self.pg7.remote_ip4,
2832             self.nat_addr,
2833             self.tcp_port_in,
2834             self.tcp_port_out,
2835             proto=IP_PROTOS.tcp,
2836         )
2837         self.nat44_add_static_mapping(
2838             self.pg7.remote_ip4,
2839             self.nat_addr,
2840             self.udp_port_in,
2841             self.udp_port_out,
2842             proto=IP_PROTOS.udp,
2843         )
2844         self.nat44_add_static_mapping(
2845             self.pg7.remote_ip4,
2846             self.nat_addr,
2847             self.icmp_id_in,
2848             self.icmp_id_out,
2849             proto=IP_PROTOS.icmp,
2850         )
2851         flags = self.config_flags.NAT44_EI_IF_INSIDE
2852         self.vapi.nat44_ei_interface_add_del_feature(
2853             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2854         )
2855         self.vapi.nat44_ei_interface_add_del_feature(
2856             sw_if_index=self.pg8.sw_if_index, is_add=1
2857         )
2858
2859         # out2in
2860         pkts = self.create_stream_out(self.pg8)
2861         self.pg8.add_stream(pkts)
2862         self.pg_enable_capture(self.pg_interfaces)
2863         self.pg_start()
2864         capture = self.pg7.get_capture(len(pkts))
2865         self.verify_capture_in(capture, self.pg7)
2866
2867         # in2out
2868         pkts = self.create_stream_in(self.pg7, self.pg8)
2869         self.pg7.add_stream(pkts)
2870         self.pg_enable_capture(self.pg_interfaces)
2871         self.pg_start()
2872         capture = self.pg8.get_capture(len(pkts))
2873         self.verify_capture_out(capture)
2874
2875     def test_static_unknown_proto(self):
2876         """NAT44EI 1:1 translate packet with unknown protocol"""
2877         nat_ip = "10.0.0.10"
2878         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2879         flags = self.config_flags.NAT44_EI_IF_INSIDE
2880         self.vapi.nat44_ei_interface_add_del_feature(
2881             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2882         )
2883         self.vapi.nat44_ei_interface_add_del_feature(
2884             sw_if_index=self.pg1.sw_if_index, is_add=1
2885         )
2886
2887         # in2out
2888         p = (
2889             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2890             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2891             / GRE()
2892             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2893             / TCP(sport=1234, dport=1234)
2894         )
2895         self.pg0.add_stream(p)
2896         self.pg_enable_capture(self.pg_interfaces)
2897         self.pg_start()
2898         p = self.pg1.get_capture(1)
2899         packet = p[0]
2900         try:
2901             self.assertEqual(packet[IP].src, nat_ip)
2902             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2903             self.assertEqual(packet.haslayer(GRE), 1)
2904             self.assert_packet_checksums_valid(packet)
2905         except:
2906             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2907             raise
2908
2909         # out2in
2910         p = (
2911             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2912             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2913             / GRE()
2914             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2915             / TCP(sport=1234, dport=1234)
2916         )
2917         self.pg1.add_stream(p)
2918         self.pg_enable_capture(self.pg_interfaces)
2919         self.pg_start()
2920         p = self.pg0.get_capture(1)
2921         packet = p[0]
2922         try:
2923             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2924             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2925             self.assertEqual(packet.haslayer(GRE), 1)
2926             self.assert_packet_checksums_valid(packet)
2927         except:
2928             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2929             raise
2930
2931     def test_hairpinning_static_unknown_proto(self):
2932         """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
2933
2934         host = self.pg0.remote_hosts[0]
2935         server = self.pg0.remote_hosts[1]
2936
2937         host_nat_ip = "10.0.0.10"
2938         server_nat_ip = "10.0.0.11"
2939
2940         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2941         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2942         flags = self.config_flags.NAT44_EI_IF_INSIDE
2943         self.vapi.nat44_ei_interface_add_del_feature(
2944             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2945         )
2946         self.vapi.nat44_ei_interface_add_del_feature(
2947             sw_if_index=self.pg1.sw_if_index, is_add=1
2948         )
2949
2950         # host to server
2951         p = (
2952             Ether(dst=self.pg0.local_mac, src=host.mac)
2953             / IP(src=host.ip4, dst=server_nat_ip)
2954             / GRE()
2955             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2956             / TCP(sport=1234, dport=1234)
2957         )
2958         self.pg0.add_stream(p)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         p = self.pg0.get_capture(1)
2962         packet = p[0]
2963         try:
2964             self.assertEqual(packet[IP].src, host_nat_ip)
2965             self.assertEqual(packet[IP].dst, server.ip4)
2966             self.assertEqual(packet.haslayer(GRE), 1)
2967             self.assert_packet_checksums_valid(packet)
2968         except:
2969             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970             raise
2971
2972         # server to host
2973         p = (
2974             Ether(dst=self.pg0.local_mac, src=server.mac)
2975             / IP(src=server.ip4, dst=host_nat_ip)
2976             / GRE()
2977             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2978             / TCP(sport=1234, dport=1234)
2979         )
2980         self.pg0.add_stream(p)
2981         self.pg_enable_capture(self.pg_interfaces)
2982         self.pg_start()
2983         p = self.pg0.get_capture(1)
2984         packet = p[0]
2985         try:
2986             self.assertEqual(packet[IP].src, server_nat_ip)
2987             self.assertEqual(packet[IP].dst, host.ip4)
2988             self.assertEqual(packet.haslayer(GRE), 1)
2989             self.assert_packet_checksums_valid(packet)
2990         except:
2991             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2992             raise
2993
2994     def test_output_feature(self):
2995         """NAT44EI output feature (in2out postrouting)"""
2996         self.nat44_add_address(self.nat_addr)
2997         self.vapi.nat44_ei_add_del_output_interface(
2998             sw_if_index=self.pg3.sw_if_index, is_add=1
2999         )
3000
3001         # in2out
3002         pkts = self.create_stream_in(self.pg0, self.pg3)
3003         self.pg0.add_stream(pkts)
3004         self.pg_enable_capture(self.pg_interfaces)
3005         self.pg_start()
3006         capture = self.pg3.get_capture(len(pkts))
3007         self.verify_capture_out(capture)
3008
3009         # out2in
3010         pkts = self.create_stream_out(self.pg3)
3011         self.pg3.add_stream(pkts)
3012         self.pg_enable_capture(self.pg_interfaces)
3013         self.pg_start()
3014         capture = self.pg0.get_capture(len(pkts))
3015         self.verify_capture_in(capture, self.pg0)
3016
3017         # from non-NAT interface to NAT inside interface
3018         pkts = self.create_stream_in(self.pg2, self.pg0)
3019         self.pg2.add_stream(pkts)
3020         self.pg_enable_capture(self.pg_interfaces)
3021         self.pg_start()
3022         capture = self.pg0.get_capture(len(pkts))
3023         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3024
3025     def test_output_feature_vrf_aware(self):
3026         """NAT44EI output feature VRF aware (in2out postrouting)"""
3027         nat_ip_vrf10 = "10.0.0.10"
3028         nat_ip_vrf20 = "10.0.0.20"
3029
3030         r1 = VppIpRoute(
3031             self,
3032             self.pg3.remote_ip4,
3033             32,
3034             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3035             table_id=10,
3036         )
3037         r2 = VppIpRoute(
3038             self,
3039             self.pg3.remote_ip4,
3040             32,
3041             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3042             table_id=20,
3043         )
3044         r1.add_vpp_config()
3045         r2.add_vpp_config()
3046
3047         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3048         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3049         self.vapi.nat44_ei_add_del_output_interface(
3050             sw_if_index=self.pg3.sw_if_index, is_add=1
3051         )
3052
3053         # in2out VRF 10
3054         pkts = self.create_stream_in(self.pg4, self.pg3)
3055         self.pg4.add_stream(pkts)
3056         self.pg_enable_capture(self.pg_interfaces)
3057         self.pg_start()
3058         capture = self.pg3.get_capture(len(pkts))
3059         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3060
3061         # out2in VRF 10
3062         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3063         self.pg3.add_stream(pkts)
3064         self.pg_enable_capture(self.pg_interfaces)
3065         self.pg_start()
3066         capture = self.pg4.get_capture(len(pkts))
3067         self.verify_capture_in(capture, self.pg4)
3068
3069         # in2out VRF 20
3070         pkts = self.create_stream_in(self.pg6, self.pg3)
3071         self.pg6.add_stream(pkts)
3072         self.pg_enable_capture(self.pg_interfaces)
3073         self.pg_start()
3074         capture = self.pg3.get_capture(len(pkts))
3075         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3076
3077         # out2in VRF 20
3078         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3079         self.pg3.add_stream(pkts)
3080         self.pg_enable_capture(self.pg_interfaces)
3081         self.pg_start()
3082         capture = self.pg6.get_capture(len(pkts))
3083         self.verify_capture_in(capture, self.pg6)
3084
3085     def test_output_feature_hairpinning(self):
3086         """NAT44EI output feature hairpinning (in2out postrouting)"""
3087         host = self.pg0.remote_hosts[0]
3088         server = self.pg0.remote_hosts[1]
3089         host_in_port = 1234
3090         host_out_port = 0
3091         server_in_port = 5678
3092         server_out_port = 8765
3093
3094         self.nat44_add_address(self.nat_addr)
3095         self.vapi.nat44_ei_add_del_output_interface(
3096             sw_if_index=self.pg0.sw_if_index, is_add=1
3097         )
3098         self.vapi.nat44_ei_add_del_output_interface(
3099             sw_if_index=self.pg1.sw_if_index, is_add=1
3100         )
3101
3102         # add static mapping for server
3103         self.nat44_add_static_mapping(
3104             server.ip4,
3105             self.nat_addr,
3106             server_in_port,
3107             server_out_port,
3108             proto=IP_PROTOS.tcp,
3109         )
3110
3111         # send packet from host to server
3112         p = (
3113             Ether(src=host.mac, dst=self.pg0.local_mac)
3114             / IP(src=host.ip4, dst=self.nat_addr)
3115             / TCP(sport=host_in_port, dport=server_out_port)
3116         )
3117         self.pg0.add_stream(p)
3118         self.pg_enable_capture(self.pg_interfaces)
3119         self.pg_start()
3120         capture = self.pg0.get_capture(1)
3121         p = capture[0]
3122         try:
3123             ip = p[IP]
3124             tcp = p[TCP]
3125             self.assertEqual(ip.src, self.nat_addr)
3126             self.assertEqual(ip.dst, server.ip4)
3127             self.assertNotEqual(tcp.sport, host_in_port)
3128             self.assertEqual(tcp.dport, server_in_port)
3129             self.assert_packet_checksums_valid(p)
3130             host_out_port = tcp.sport
3131         except:
3132             self.logger.error(ppp("Unexpected or invalid packet:", p))
3133             raise
3134
3135         # send reply from server to host
3136         p = (
3137             Ether(src=server.mac, dst=self.pg0.local_mac)
3138             / IP(src=server.ip4, dst=self.nat_addr)
3139             / TCP(sport=server_in_port, dport=host_out_port)
3140         )
3141         self.pg0.add_stream(p)
3142         self.pg_enable_capture(self.pg_interfaces)
3143         self.pg_start()
3144         capture = self.pg0.get_capture(1)
3145         p = capture[0]
3146         try:
3147             ip = p[IP]
3148             tcp = p[TCP]
3149             self.assertEqual(ip.src, self.nat_addr)
3150             self.assertEqual(ip.dst, host.ip4)
3151             self.assertEqual(tcp.sport, server_out_port)
3152             self.assertEqual(tcp.dport, host_in_port)
3153             self.assert_packet_checksums_valid(p)
3154         except:
3155             self.logger.error(ppp("Unexpected or invalid packet:", p))
3156             raise
3157
3158     def test_one_armed_nat44(self):
3159         """NAT44EI One armed NAT"""
3160         remote_host = self.pg9.remote_hosts[0]
3161         local_host = self.pg9.remote_hosts[1]
3162         external_port = 0
3163
3164         self.nat44_add_address(self.nat_addr)
3165         flags = self.config_flags.NAT44_EI_IF_INSIDE
3166         self.vapi.nat44_ei_interface_add_del_feature(
3167             sw_if_index=self.pg9.sw_if_index, is_add=1
3168         )
3169         self.vapi.nat44_ei_interface_add_del_feature(
3170             sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3171         )
3172
3173         # in2out
3174         p = (
3175             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3176             / IP(src=local_host.ip4, dst=remote_host.ip4)
3177             / TCP(sport=12345, dport=80)
3178         )
3179         self.pg9.add_stream(p)
3180         self.pg_enable_capture(self.pg_interfaces)
3181         self.pg_start()
3182         capture = self.pg9.get_capture(1)
3183         p = capture[0]
3184         try:
3185             ip = p[IP]
3186             tcp = p[TCP]
3187             self.assertEqual(ip.src, self.nat_addr)
3188             self.assertEqual(ip.dst, remote_host.ip4)
3189             self.assertNotEqual(tcp.sport, 12345)
3190             external_port = tcp.sport
3191             self.assertEqual(tcp.dport, 80)
3192             self.assert_packet_checksums_valid(p)
3193         except:
3194             self.logger.error(ppp("Unexpected or invalid packet:", p))
3195             raise
3196
3197         # out2in
3198         p = (
3199             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3200             / IP(src=remote_host.ip4, dst=self.nat_addr)
3201             / TCP(sport=80, dport=external_port)
3202         )
3203         self.pg9.add_stream(p)
3204         self.pg_enable_capture(self.pg_interfaces)
3205         self.pg_start()
3206         capture = self.pg9.get_capture(1)
3207         p = capture[0]
3208         try:
3209             ip = p[IP]
3210             tcp = p[TCP]
3211             self.assertEqual(ip.src, remote_host.ip4)
3212             self.assertEqual(ip.dst, local_host.ip4)
3213             self.assertEqual(tcp.sport, 80)
3214             self.assertEqual(tcp.dport, 12345)
3215             self.assert_packet_checksums_valid(p)
3216         except:
3217             self.logger.error(ppp("Unexpected or invalid packet:", p))
3218             raise
3219
3220         if self.vpp_worker_count > 1:
3221             node = "nat44-ei-handoff-classify"
3222         else:
3223             node = "nat44-ei-classify"
3224
3225         err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3226         self.assertEqual(err, 1)
3227         err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3228         self.assertEqual(err, 1)
3229
3230     def test_del_session(self):
3231         """NAT44EI delete session"""
3232         self.nat44_add_address(self.nat_addr)
3233         flags = self.config_flags.NAT44_EI_IF_INSIDE
3234         self.vapi.nat44_ei_interface_add_del_feature(
3235             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3236         )
3237         self.vapi.nat44_ei_interface_add_del_feature(
3238             sw_if_index=self.pg1.sw_if_index, is_add=1
3239         )
3240
3241         pkts = self.create_stream_in(self.pg0, self.pg1)
3242         self.pg0.add_stream(pkts)
3243         self.pg_enable_capture(self.pg_interfaces)
3244         self.pg_start()
3245         self.pg1.get_capture(len(pkts))
3246
3247         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3248         nsessions = len(sessions)
3249
3250         self.vapi.nat44_ei_del_session(
3251             address=sessions[0].inside_ip_address,
3252             port=sessions[0].inside_port,
3253             protocol=sessions[0].protocol,
3254             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3255         )
3256
3257         self.vapi.nat44_ei_del_session(
3258             address=sessions[1].outside_ip_address,
3259             port=sessions[1].outside_port,
3260             protocol=sessions[1].protocol,
3261         )
3262
3263         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3264         self.assertEqual(nsessions - len(sessions), 2)
3265
3266         self.vapi.nat44_ei_del_session(
3267             address=sessions[0].inside_ip_address,
3268             port=sessions[0].inside_port,
3269             protocol=sessions[0].protocol,
3270             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3271         )
3272
3273         self.verify_no_nat44_user()
3274
3275     def test_frag_in_order(self):
3276         """NAT44EI translate fragments arriving in order"""
3277
3278         self.nat44_add_address(self.nat_addr)
3279         flags = self.config_flags.NAT44_EI_IF_INSIDE
3280         self.vapi.nat44_ei_interface_add_del_feature(
3281             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3282         )
3283         self.vapi.nat44_ei_interface_add_del_feature(
3284             sw_if_index=self.pg1.sw_if_index, is_add=1
3285         )
3286
3287         self.frag_in_order(proto=IP_PROTOS.tcp)
3288         self.frag_in_order(proto=IP_PROTOS.udp)
3289         self.frag_in_order(proto=IP_PROTOS.icmp)
3290
3291     def test_frag_forwarding(self):
3292         """NAT44EI forwarding fragment test"""
3293         self.vapi.nat44_ei_add_del_interface_addr(
3294             is_add=1, sw_if_index=self.pg1.sw_if_index
3295         )
3296         flags = self.config_flags.NAT44_EI_IF_INSIDE
3297         self.vapi.nat44_ei_interface_add_del_feature(
3298             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3299         )
3300         self.vapi.nat44_ei_interface_add_del_feature(
3301             sw_if_index=self.pg1.sw_if_index, is_add=1
3302         )
3303         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3304
3305         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3306         pkts = self.create_stream_frag(
3307             self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3308         )
3309         self.pg1.add_stream(pkts)
3310         self.pg_enable_capture(self.pg_interfaces)
3311         self.pg_start()
3312         frags = self.pg0.get_capture(len(pkts))
3313         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3314         self.assertEqual(p[UDP].sport, 4789)
3315         self.assertEqual(p[UDP].dport, 4789)
3316         self.assertEqual(data, p[Raw].load)
3317
3318     def test_reass_hairpinning(self):
3319         """NAT44EI fragments hairpinning"""
3320
3321         server_addr = self.pg0.remote_hosts[1].ip4
3322         host_in_port = random.randint(1025, 65535)
3323         server_in_port = random.randint(1025, 65535)
3324         server_out_port = random.randint(1025, 65535)
3325
3326         self.nat44_add_address(self.nat_addr)
3327         flags = self.config_flags.NAT44_EI_IF_INSIDE
3328         self.vapi.nat44_ei_interface_add_del_feature(
3329             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3330         )
3331         self.vapi.nat44_ei_interface_add_del_feature(
3332             sw_if_index=self.pg1.sw_if_index, is_add=1
3333         )
3334         # add static mapping for server
3335         self.nat44_add_static_mapping(
3336             server_addr,
3337             self.nat_addr,
3338             server_in_port,
3339             server_out_port,
3340             proto=IP_PROTOS.tcp,
3341         )
3342         self.nat44_add_static_mapping(
3343             server_addr,
3344             self.nat_addr,
3345             server_in_port,
3346             server_out_port,
3347             proto=IP_PROTOS.udp,
3348         )
3349         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3350
3351         self.reass_hairpinning(
3352             server_addr,
3353             server_in_port,
3354             server_out_port,
3355             host_in_port,
3356             proto=IP_PROTOS.tcp,
3357         )
3358         self.reass_hairpinning(
3359             server_addr,
3360             server_in_port,
3361             server_out_port,
3362             host_in_port,
3363             proto=IP_PROTOS.udp,
3364         )
3365         self.reass_hairpinning(
3366             server_addr,
3367             server_in_port,
3368             server_out_port,
3369             host_in_port,
3370             proto=IP_PROTOS.icmp,
3371         )
3372
3373     def test_frag_out_of_order(self):
3374         """NAT44EI translate fragments arriving out of order"""
3375
3376         self.nat44_add_address(self.nat_addr)
3377         flags = self.config_flags.NAT44_EI_IF_INSIDE
3378         self.vapi.nat44_ei_interface_add_del_feature(
3379             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3380         )
3381         self.vapi.nat44_ei_interface_add_del_feature(
3382             sw_if_index=self.pg1.sw_if_index, is_add=1
3383         )
3384
3385         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3386         self.frag_out_of_order(proto=IP_PROTOS.udp)
3387         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3388
3389     def test_port_restricted(self):
3390         """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
3391         self.nat44_add_address(self.nat_addr)
3392         flags = self.config_flags.NAT44_EI_IF_INSIDE
3393         self.vapi.nat44_ei_interface_add_del_feature(
3394             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3395         )
3396         self.vapi.nat44_ei_interface_add_del_feature(
3397             sw_if_index=self.pg1.sw_if_index, is_add=1
3398         )
3399         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3400             alg=1, psid_offset=6, psid_length=6, psid=10
3401         )
3402
3403         p = (
3404             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3405             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3406             / TCP(sport=4567, dport=22)
3407         )
3408         self.pg0.add_stream(p)
3409         self.pg_enable_capture(self.pg_interfaces)
3410         self.pg_start()
3411         capture = self.pg1.get_capture(1)
3412         p = capture[0]
3413         try:
3414             ip = p[IP]
3415             tcp = p[TCP]
3416             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3417             self.assertEqual(ip.src, self.nat_addr)
3418             self.assertEqual(tcp.dport, 22)
3419             self.assertNotEqual(tcp.sport, 4567)
3420             self.assertEqual((tcp.sport >> 6) & 63, 10)
3421             self.assert_packet_checksums_valid(p)
3422         except:
3423             self.logger.error(ppp("Unexpected or invalid packet:", p))
3424             raise
3425
3426     def test_port_range(self):
3427         """NAT44EI External address port range"""
3428         self.nat44_add_address(self.nat_addr)
3429         flags = self.config_flags.NAT44_EI_IF_INSIDE
3430         self.vapi.nat44_ei_interface_add_del_feature(
3431             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3432         )
3433         self.vapi.nat44_ei_interface_add_del_feature(
3434             sw_if_index=self.pg1.sw_if_index, is_add=1
3435         )
3436         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3437             alg=2, start_port=1025, end_port=1027
3438         )
3439
3440         pkts = []
3441         for port in range(0, 5):
3442             p = (
3443                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3444                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3445                 / TCP(sport=1125 + port)
3446             )
3447             pkts.append(p)
3448         self.pg0.add_stream(pkts)
3449         self.pg_enable_capture(self.pg_interfaces)
3450         self.pg_start()
3451         capture = self.pg1.get_capture(3)
3452         for p in capture:
3453             tcp = p[TCP]
3454             self.assertGreaterEqual(tcp.sport, 1025)
3455             self.assertLessEqual(tcp.sport, 1027)
3456
3457     def test_multiple_outside_vrf(self):
3458         """NAT44EI Multiple outside VRF"""
3459         vrf_id1 = 1
3460         vrf_id2 = 2
3461
3462         self.pg1.unconfig_ip4()
3463         self.pg2.unconfig_ip4()
3464         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3465         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3466         self.pg1.set_table_ip4(vrf_id1)
3467         self.pg2.set_table_ip4(vrf_id2)
3468         self.pg1.config_ip4()
3469         self.pg2.config_ip4()
3470         self.pg1.resolve_arp()
3471         self.pg2.resolve_arp()
3472
3473         self.nat44_add_address(self.nat_addr)
3474         flags = self.config_flags.NAT44_EI_IF_INSIDE
3475         self.vapi.nat44_ei_interface_add_del_feature(
3476             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3477         )
3478         self.vapi.nat44_ei_interface_add_del_feature(
3479             sw_if_index=self.pg1.sw_if_index, is_add=1
3480         )
3481         self.vapi.nat44_ei_interface_add_del_feature(
3482             sw_if_index=self.pg2.sw_if_index, is_add=1
3483         )
3484
3485         try:
3486             # first VRF
3487             pkts = self.create_stream_in(self.pg0, self.pg1)
3488             self.pg0.add_stream(pkts)
3489             self.pg_enable_capture(self.pg_interfaces)
3490             self.pg_start()
3491             capture = self.pg1.get_capture(len(pkts))
3492             self.verify_capture_out(capture, self.nat_addr)
3493
3494             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3495             self.pg1.add_stream(pkts)
3496             self.pg_enable_capture(self.pg_interfaces)
3497             self.pg_start()
3498             capture = self.pg0.get_capture(len(pkts))
3499             self.verify_capture_in(capture, self.pg0)
3500
3501             self.tcp_port_in = 60303
3502             self.udp_port_in = 60304
3503             self.icmp_id_in = 60305
3504
3505             # second VRF
3506             pkts = self.create_stream_in(self.pg0, self.pg2)
3507             self.pg0.add_stream(pkts)
3508             self.pg_enable_capture(self.pg_interfaces)
3509             self.pg_start()
3510             capture = self.pg2.get_capture(len(pkts))
3511             self.verify_capture_out(capture, self.nat_addr)
3512
3513             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3514             self.pg2.add_stream(pkts)
3515             self.pg_enable_capture(self.pg_interfaces)
3516             self.pg_start()
3517             capture = self.pg0.get_capture(len(pkts))
3518             self.verify_capture_in(capture, self.pg0)
3519
3520         finally:
3521             self.nat44_add_address(self.nat_addr, is_add=0)
3522             self.pg1.unconfig_ip4()
3523             self.pg2.unconfig_ip4()
3524             self.pg1.set_table_ip4(0)
3525             self.pg2.set_table_ip4(0)
3526             self.pg1.config_ip4()
3527             self.pg2.config_ip4()
3528             self.pg1.resolve_arp()
3529             self.pg2.resolve_arp()
3530
3531     def test_mss_clamping(self):
3532         """NAT44EI TCP MSS clamping"""
3533         self.nat44_add_address(self.nat_addr)
3534         flags = self.config_flags.NAT44_EI_IF_INSIDE
3535         self.vapi.nat44_ei_interface_add_del_feature(
3536             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3537         )
3538         self.vapi.nat44_ei_interface_add_del_feature(
3539             sw_if_index=self.pg1.sw_if_index, is_add=1
3540         )
3541
3542         p = (
3543             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3544             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3545             / TCP(
3546                 sport=self.tcp_port_in,
3547                 dport=self.tcp_external_port,
3548                 flags="S",
3549                 options=[("MSS", 1400)],
3550             )
3551         )
3552
3553         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3554         self.pg0.add_stream(p)
3555         self.pg_enable_capture(self.pg_interfaces)
3556         self.pg_start()
3557         capture = self.pg1.get_capture(1)
3558         # Negotiated MSS value greater than configured - changed
3559         self.verify_mss_value(capture[0], 1000)
3560
3561         self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3562         self.pg0.add_stream(p)
3563         self.pg_enable_capture(self.pg_interfaces)
3564         self.pg_start()
3565         capture = self.pg1.get_capture(1)
3566         # MSS clamping disabled - negotiated MSS unchanged
3567         self.verify_mss_value(capture[0], 1400)
3568
3569         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3570         self.pg0.add_stream(p)
3571         self.pg_enable_capture(self.pg_interfaces)
3572         self.pg_start()
3573         capture = self.pg1.get_capture(1)
3574         # Negotiated MSS value smaller than configured - unchanged
3575         self.verify_mss_value(capture[0], 1400)
3576
3577     def test_ha_send(self):
3578         """NAT44EI Send HA session synchronization events (active)"""
3579         flags = self.config_flags.NAT44_EI_IF_INSIDE
3580         self.vapi.nat44_ei_interface_add_del_feature(
3581             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3582         )
3583         self.vapi.nat44_ei_interface_add_del_feature(
3584             sw_if_index=self.pg1.sw_if_index, is_add=1
3585         )
3586         self.nat44_add_address(self.nat_addr)
3587
3588         self.vapi.nat44_ei_ha_set_listener(
3589             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3590         )
3591         self.vapi.nat44_ei_ha_set_failover(
3592             ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
3593         )
3594         bind_layers(UDP, HANATStateSync, sport=12345)
3595
3596         # create sessions
3597         pkts = self.create_stream_in(self.pg0, self.pg1)
3598         self.pg0.add_stream(pkts)
3599         self.pg_enable_capture(self.pg_interfaces)
3600         self.pg_start()
3601         capture = self.pg1.get_capture(len(pkts))
3602         self.verify_capture_out(capture)
3603         # active send HA events
3604         self.vapi.nat44_ei_ha_flush()
3605         stats = self.statistics["/nat44-ei/ha/add-event-send"]
3606         self.assertEqual(stats[:, 0].sum(), 3)
3607         capture = self.pg3.get_capture(1)
3608         p = capture[0]
3609         self.assert_packet_checksums_valid(p)
3610         try:
3611             ip = p[IP]
3612             udp = p[UDP]
3613             hanat = p[HANATStateSync]
3614         except IndexError:
3615             self.logger.error(ppp("Invalid packet:", p))
3616             raise
3617         else:
3618             self.assertEqual(ip.src, self.pg3.local_ip4)
3619             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3620             self.assertEqual(udp.sport, 12345)
3621             self.assertEqual(udp.dport, 12346)
3622             self.assertEqual(hanat.version, 1)
3623             # self.assertEqual(hanat.thread_index, 0)
3624             self.assertEqual(hanat.count, 3)
3625             seq = hanat.sequence_number
3626             for event in hanat.events:
3627                 self.assertEqual(event.event_type, 1)
3628                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3629                 self.assertEqual(event.out_addr, self.nat_addr)
3630                 self.assertEqual(event.fib_index, 0)
3631
3632         # ACK received events
3633         ack = (
3634             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3635             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3636             / UDP(sport=12346, dport=12345)
3637             / HANATStateSync(
3638                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3639             )
3640         )
3641         self.pg3.add_stream(ack)
3642         self.pg_start()
3643         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3644         self.assertEqual(stats[:, 0].sum(), 1)
3645
3646         # delete one session
3647         self.pg_enable_capture(self.pg_interfaces)
3648         self.vapi.nat44_ei_del_session(
3649             address=self.pg0.remote_ip4,
3650             port=self.tcp_port_in,
3651             protocol=IP_PROTOS.tcp,
3652             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3653         )
3654         self.vapi.nat44_ei_ha_flush()
3655         stats = self.statistics["/nat44-ei/ha/del-event-send"]
3656         self.assertEqual(stats[:, 0].sum(), 1)
3657         capture = self.pg3.get_capture(1)
3658         p = capture[0]
3659         try:
3660             hanat = p[HANATStateSync]
3661         except IndexError:
3662             self.logger.error(ppp("Invalid packet:", p))
3663             raise
3664         else:
3665             self.assertGreater(hanat.sequence_number, seq)
3666
3667         # do not send ACK, active retry send HA event again
3668         self.pg_enable_capture(self.pg_interfaces)
3669         self.virtual_sleep(12)
3670         stats = self.statistics["/nat44-ei/ha/retry-count"]
3671         self.assertEqual(stats[:, 0].sum(), 3)
3672         stats = self.statistics["/nat44-ei/ha/missed-count"]
3673         self.assertEqual(stats[:, 0].sum(), 1)
3674         capture = self.pg3.get_capture(3)
3675         for packet in capture:
3676             self.assertEqual(packet, p)
3677
3678         # session counters refresh
3679         pkts = self.create_stream_out(self.pg1)
3680         self.pg1.add_stream(pkts)
3681         self.pg_enable_capture(self.pg_interfaces)
3682         self.pg_start()
3683         self.pg0.get_capture(2)
3684         self.vapi.nat44_ei_ha_flush()
3685         stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3686         self.assertEqual(stats[:, 0].sum(), 2)
3687         capture = self.pg3.get_capture(1)
3688         p = capture[0]
3689         self.assert_packet_checksums_valid(p)
3690         try:
3691             ip = p[IP]
3692             udp = p[UDP]
3693             hanat = p[HANATStateSync]
3694         except IndexError:
3695             self.logger.error(ppp("Invalid packet:", p))
3696             raise
3697         else:
3698             self.assertEqual(ip.src, self.pg3.local_ip4)
3699             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3700             self.assertEqual(udp.sport, 12345)
3701             self.assertEqual(udp.dport, 12346)
3702             self.assertEqual(hanat.version, 1)
3703             self.assertEqual(hanat.count, 2)
3704             seq = hanat.sequence_number
3705             for event in hanat.events:
3706                 self.assertEqual(event.event_type, 3)
3707                 self.assertEqual(event.out_addr, self.nat_addr)
3708                 self.assertEqual(event.fib_index, 0)
3709                 self.assertEqual(event.total_pkts, 2)
3710                 self.assertGreater(event.total_bytes, 0)
3711
3712         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3713         ack = (
3714             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3715             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3716             / UDP(sport=12346, dport=12345)
3717             / HANATStateSync(
3718                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3719             )
3720         )
3721         self.pg3.add_stream(ack)
3722         self.pg_start()
3723         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3724         self.assertEqual(stats[:, 0].sum(), 2)
3725
3726     def test_ha_recv(self):
3727         """NAT44EI Receive HA session synchronization events (passive)"""
3728         self.nat44_add_address(self.nat_addr)
3729         flags = self.config_flags.NAT44_EI_IF_INSIDE
3730         self.vapi.nat44_ei_interface_add_del_feature(
3731             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3732         )
3733         self.vapi.nat44_ei_interface_add_del_feature(
3734             sw_if_index=self.pg1.sw_if_index, is_add=1
3735         )
3736         self.vapi.nat44_ei_ha_set_listener(
3737             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3738         )
3739         bind_layers(UDP, HANATStateSync, sport=12345)
3740
3741         # this is a bit tricky - HA dictates thread index due to how it's
3742         # designed, but once we use HA to create a session, we also want
3743         # to pass a packet through said session. so the session must end
3744         # up on the correct thread from both directions - in2out (based on
3745         # IP address) and out2in (based on outside port)
3746
3747         # first choose a thread index which is correct for IP
3748         thread_index = get_nat44_ei_in2out_worker_index(
3749             self.pg0.remote_ip4, self.vpp_worker_count
3750         )
3751
3752         # now pick a port which is correct for given thread
3753         port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3754         self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3755         self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3756         if self.vpp_worker_count > 0:
3757             self.tcp_port_out += port_per_thread * (thread_index - 1)
3758             self.udp_port_out += port_per_thread * (thread_index - 1)
3759
3760         # send HA session add events to failover/passive
3761         p = (
3762             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3763             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3764             / UDP(sport=12346, dport=12345)
3765             / HANATStateSync(
3766                 sequence_number=1,
3767                 events=[
3768                     Event(
3769                         event_type="add",
3770                         protocol="tcp",
3771                         in_addr=self.pg0.remote_ip4,
3772                         out_addr=self.nat_addr,
3773                         in_port=self.tcp_port_in,
3774                         out_port=self.tcp_port_out,
3775                         eh_addr=self.pg1.remote_ip4,
3776                         ehn_addr=self.pg1.remote_ip4,
3777                         eh_port=self.tcp_external_port,
3778                         ehn_port=self.tcp_external_port,
3779                         fib_index=0,
3780                     ),
3781                     Event(
3782                         event_type="add",
3783                         protocol="udp",
3784                         in_addr=self.pg0.remote_ip4,
3785                         out_addr=self.nat_addr,
3786                         in_port=self.udp_port_in,
3787                         out_port=self.udp_port_out,
3788                         eh_addr=self.pg1.remote_ip4,
3789                         ehn_addr=self.pg1.remote_ip4,
3790                         eh_port=self.udp_external_port,
3791                         ehn_port=self.udp_external_port,
3792                         fib_index=0,
3793                     ),
3794                 ],
3795                 thread_index=thread_index,
3796             )
3797         )
3798
3799         self.pg3.add_stream(p)
3800         self.pg_enable_capture(self.pg_interfaces)
3801         self.pg_start()
3802         # receive ACK
3803         capture = self.pg3.get_capture(1)
3804         p = capture[0]
3805         try:
3806             hanat = p[HANATStateSync]
3807         except IndexError:
3808             self.logger.error(ppp("Invalid packet:", p))
3809             raise
3810         else:
3811             self.assertEqual(hanat.sequence_number, 1)
3812             self.assertEqual(hanat.flags, "ACK")
3813             self.assertEqual(hanat.version, 1)
3814             self.assertEqual(hanat.thread_index, thread_index)
3815         stats = self.statistics["/nat44-ei/ha/ack-send"]
3816         self.assertEqual(stats[:, 0].sum(), 1)
3817         stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3818         self.assertEqual(stats[:, 0].sum(), 2)
3819         users = self.statistics["/nat44-ei/total-users"]
3820         self.assertEqual(users[:, 0].sum(), 1)
3821         sessions = self.statistics["/nat44-ei/total-sessions"]
3822         self.assertEqual(sessions[:, 0].sum(), 2)
3823         users = self.vapi.nat44_ei_user_dump()
3824         self.assertEqual(len(users), 1)
3825         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3826         # there should be 2 sessions created by HA
3827         sessions = self.vapi.nat44_ei_user_session_dump(
3828             users[0].ip_address, users[0].vrf_id
3829         )
3830         self.assertEqual(len(sessions), 2)
3831         for session in sessions:
3832             self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3833             self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3834             self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3835             self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3836             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3837
3838         # send HA session delete event to failover/passive
3839         p = (
3840             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3841             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3842             / UDP(sport=12346, dport=12345)
3843             / HANATStateSync(
3844                 sequence_number=2,
3845                 events=[
3846                     Event(
3847                         event_type="del",
3848                         protocol="udp",
3849                         in_addr=self.pg0.remote_ip4,
3850                         out_addr=self.nat_addr,
3851                         in_port=self.udp_port_in,
3852                         out_port=self.udp_port_out,
3853                         eh_addr=self.pg1.remote_ip4,
3854                         ehn_addr=self.pg1.remote_ip4,
3855                         eh_port=self.udp_external_port,
3856                         ehn_port=self.udp_external_port,
3857                         fib_index=0,
3858                     )
3859                 ],
3860                 thread_index=thread_index,
3861             )
3862         )
3863
3864         self.pg3.add_stream(p)
3865         self.pg_enable_capture(self.pg_interfaces)
3866         self.pg_start()
3867         # receive ACK
3868         capture = self.pg3.get_capture(1)
3869         p = capture[0]
3870         try:
3871             hanat = p[HANATStateSync]
3872         except IndexError:
3873             self.logger.error(ppp("Invalid packet:", p))
3874             raise
3875         else:
3876             self.assertEqual(hanat.sequence_number, 2)
3877             self.assertEqual(hanat.flags, "ACK")
3878             self.assertEqual(hanat.version, 1)
3879         users = self.vapi.nat44_ei_user_dump()
3880         self.assertEqual(len(users), 1)
3881         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3882         # now we should have only 1 session, 1 deleted by HA
3883         sessions = self.vapi.nat44_ei_user_session_dump(
3884             users[0].ip_address, users[0].vrf_id
3885         )
3886         self.assertEqual(len(sessions), 1)
3887         stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3888         self.assertEqual(stats[:, 0].sum(), 1)
3889
3890         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3891         self.assertEqual(stats, 2)
3892
3893         # send HA session refresh event to failover/passive
3894         p = (
3895             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3896             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3897             / UDP(sport=12346, dport=12345)
3898             / HANATStateSync(
3899                 sequence_number=3,
3900                 events=[
3901                     Event(
3902                         event_type="refresh",
3903                         protocol="tcp",
3904                         in_addr=self.pg0.remote_ip4,
3905                         out_addr=self.nat_addr,
3906                         in_port=self.tcp_port_in,
3907                         out_port=self.tcp_port_out,
3908                         eh_addr=self.pg1.remote_ip4,
3909                         ehn_addr=self.pg1.remote_ip4,
3910                         eh_port=self.tcp_external_port,
3911                         ehn_port=self.tcp_external_port,
3912                         fib_index=0,
3913                         total_bytes=1024,
3914                         total_pkts=2,
3915                     )
3916                 ],
3917                 thread_index=thread_index,
3918             )
3919         )
3920         self.pg3.add_stream(p)
3921         self.pg_enable_capture(self.pg_interfaces)
3922         self.pg_start()
3923         # receive ACK
3924         capture = self.pg3.get_capture(1)
3925         p = capture[0]
3926         try:
3927             hanat = p[HANATStateSync]
3928         except IndexError:
3929             self.logger.error(ppp("Invalid packet:", p))
3930             raise
3931         else:
3932             self.assertEqual(hanat.sequence_number, 3)
3933             self.assertEqual(hanat.flags, "ACK")
3934             self.assertEqual(hanat.version, 1)
3935         users = self.vapi.nat44_ei_user_dump()
3936         self.assertEqual(len(users), 1)
3937         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3938         sessions = self.vapi.nat44_ei_user_session_dump(
3939             users[0].ip_address, users[0].vrf_id
3940         )
3941         self.assertEqual(len(sessions), 1)
3942         session = sessions[0]
3943         self.assertEqual(session.total_bytes, 1024)
3944         self.assertEqual(session.total_pkts, 2)
3945         stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3946         self.assertEqual(stats[:, 0].sum(), 1)
3947
3948         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3949         self.assertEqual(stats, 3)
3950
3951         # send packet to test session created by HA
3952         p = (
3953             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3954             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3955             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3956         )
3957         self.pg1.add_stream(p)
3958         self.pg_enable_capture(self.pg_interfaces)
3959         self.pg_start()
3960         capture = self.pg0.get_capture(1)
3961         p = capture[0]
3962         try:
3963             ip = p[IP]
3964             tcp = p[TCP]
3965         except IndexError:
3966             self.logger.error(ppp("Invalid packet:", p))
3967             raise
3968         else:
3969             self.assertEqual(ip.src, self.pg1.remote_ip4)
3970             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3971             self.assertEqual(tcp.sport, self.tcp_external_port)
3972             self.assertEqual(tcp.dport, self.tcp_port_in)
3973
3974     def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3975         self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3976         self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3977         # keep plugin configuration persistent
3978         self.plugin_enable()
3979         return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3980
3981     def test_set_frame_queue_nelts(self):
3982         """NAT44EI API test - worker handoff frame queue elements"""
3983         self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3984
3985     def show_commands_at_teardown(self):
3986         self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3987         self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3988         self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3989         self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3990         self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3991         self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3992         self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3993         self.logger.info(self.vapi.cli("show nat44 ei ha"))
3994         self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3995
3996     def test_outside_address_distribution(self):
3997         """Outside address distribution based on source address"""
3998
3999         x = 100
4000         nat_addresses = []
4001
4002         for i in range(1, x):
4003             a = "10.0.0.%d" % i
4004             nat_addresses.append(a)
4005
4006         flags = self.config_flags.NAT44_EI_IF_INSIDE
4007         self.vapi.nat44_ei_interface_add_del_feature(
4008             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4009         )
4010         self.vapi.nat44_ei_interface_add_del_feature(
4011             sw_if_index=self.pg1.sw_if_index, is_add=1
4012         )
4013
4014         self.vapi.nat44_ei_add_del_address_range(
4015             first_ip_address=nat_addresses[0],
4016             last_ip_address=nat_addresses[-1],
4017             vrf_id=0xFFFFFFFF,
4018             is_add=1,
4019         )
4020
4021         self.pg0.generate_remote_hosts(x)
4022
4023         pkts = []
4024         for i in range(x):
4025             info = self.create_packet_info(self.pg0, self.pg1)
4026             payload = self.info_to_payload(info)
4027             p = (
4028                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4029                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4030                 / UDP(sport=7000 + i, dport=8000 + i)
4031                 / Raw(payload)
4032             )
4033             info.data = p
4034             pkts.append(p)
4035
4036         self.pg0.add_stream(pkts)
4037         self.pg_enable_capture(self.pg_interfaces)
4038         self.pg_start()
4039         recvd = self.pg1.get_capture(len(pkts))
4040         for p_recvd in recvd:
4041             payload_info = self.payload_to_info(p_recvd[Raw])
4042             packet_index = payload_info.index
4043             info = self._packet_infos[packet_index]
4044             self.assertTrue(info is not None)
4045             self.assertEqual(packet_index, info.index)
4046             p_sent = info.data
4047             packed = socket.inet_aton(p_sent[IP].src)
4048             numeric = struct.unpack("!L", packed)[0]
4049             numeric = socket.htonl(numeric)
4050             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4051             self.assertEqual(
4052                 a,
4053                 p_recvd[IP].src,
4054                 "Invalid packet (src IP %s translated to %s, but expected %s)"
4055                 % (p_sent[IP].src, p_recvd[IP].src, a),
4056             )
4057
4058     def test_default_user_sessions(self):
4059         """NAT44EI default per-user session limit is used and reported"""
4060         nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4061         # a nonzero default should be reported for user_sessions
4062         self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4063
4064
class TestNAT44Out2InDPO(MethodHolder):
    """NAT44EI Test Cases using out2in DPO"""

    @classmethod
    def setUpClass(cls):
        """Create pg0 (IPv4) and pg1 (IPv6) and add an IPv6 default route via pg1."""
        super(TestNAT44Out2InDPO, cls).setUpClass()
        cls.vapi.cli("set log class nat44-ei level debug")

        # ports / ICMP id and addresses used by the test cases of this class
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = "10.0.0.3"
        cls.dst_ip4 = "192.168.70.1"

        cls.create_pg_interfaces(range(2))

        # pg0 carries IPv4 traffic
        cls.pg0.admin_up()
        cls.pg0.config_ip4()
        cls.pg0.resolve_arp()

        # pg1 carries IPv6 traffic
        cls.pg1.admin_up()
        cls.pg1.config_ip6()
        cls.pg1.resolve_ndp()

        # IPv6 default route ("::"/0) through the pg1 remote host
        r1 = VppIpRoute(
            cls,
            "::",
            0,
            [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
            register=False,
        )
        r1.add_vpp_config()

    def setUp(self):
        """Enable the NAT44-EI plugin with the out2in DPO flag before each test."""
        super(TestNAT44Out2InDPO, self).setUp()
        flags = self.config_flags.NAT44_EI_OUT2IN_DPO
        self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)

    def tearDown(self):
        """Disable the plugin and clear logging, unless VPP has died."""
        super(TestNAT44Out2InDPO, self).tearDown()
        if not self.vpp_dead:
            self.vapi.nat44_ei_plugin_enable_disable(enable=0)
            self.vapi.cli("clear logging")

    def configure_xlat(self):
        """Store the two /96 IPv6 prefixes on ``self`` and add a MAP domain for them."""
        self.dst_ip6_pfx = "1:2:3::"
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96
        self.src_ip6_pfx = "4:5:6::"
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96
        self.vapi.map_add_domain(
            self.dst_ip6_pfx_n,
            self.dst_ip6_pfx_len,
            self.src_ip6_pfx_n,
            self.src_ip6_pfx_len,
            "\x00\x00\x00\x00",
            0,
        )

    @unittest.skip("Temporary disabled")
    def test_464xlat_ce(self):
        """Test 464XLAT CE with NAT44EI"""

        self.configure_xlat()

        flags = self.config_flags.NAT44_EI_IF_INSIDE
        self.vapi.nat44_ei_interface_add_del_feature(
            sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
        )
        # setUpClass() only defines nat_addr (textual form), so that is what
        # is handed to the address range API
        self.vapi.nat44_ei_add_del_address_range(
            first_ip_address=self.nat_addr,
            last_ip_address=self.nat_addr,
            vrf_id=0xFFFFFFFF,
            is_add=1,
        )

        # IPv6 addresses composed from the IPv4 addresses and the prefixes
        # stored by configure_xlat()
        out_src_ip6 = self.compose_ip6(
            self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
        )
        out_dst_ip6 = self.compose_ip6(
            self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
        )

        try:
            # IPv4 in on pg0 -> IPv6 out on pg1
            pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)

            # IPv6 in on pg1 -> IPv4 out on pg0
            pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)
        finally:
            # undo the interface feature and the address added above
            self.vapi.nat44_ei_interface_add_del_feature(
                sw_if_index=self.pg0.sw_if_index, flags=flags
            )
            self.vapi.nat44_ei_add_del_address_range(
                first_ip_address=self.nat_addr,
                last_ip_address=self.nat_addr,
                vrf_id=0xFFFFFFFF,
            )

    @unittest.skip("Temporary disabled")
    def test_464xlat_ce_no_nat(self):
        """Test 464XLAT CE without NAT44EI"""

        self.configure_xlat()

        out_src_ip6 = self.compose_ip6(
            self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
        )
        # no NAT address here: the address is composed from pg0's remote IPv4
        out_dst_ip6 = self.compose_ip6(
            self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
        )

        # IPv4 in on pg0 -> IPv6 out on pg1
        pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out_ip6(
            capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
        )

        # IPv6 in on pg1 -> IPv4 out on pg0
        pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)
4204
4205
class TestNAT44EIMW(MethodHolder):
    """NAT44EI Test Cases (multiple workers)"""

    # NOTE(review): presumably read by the test framework to start VPP with
    # this many worker threads - confirm against framework.py
    vpp_worker_count = 2
    # session / user limits handed to nat44_ei_plugin_enable_disable() in setUp()
    max_translations = 10240
    max_users = 10240

    @classmethod
    def setUpClass(cls):
        """Create ten pg interfaces and configure their addressing.

        pg0-pg3 get plain IPv4 configuration, pg4-pg6 are put into IP
        tables 10/20 with hand-picked addresses (pg4 and pg6 use the same
        ones), pg7/pg8 are only brought up and pg9 additionally gets
        10.0.0.1/24.
        """
        super(TestNAT44EIMW, cls).setUpClass()
        cls.vapi.cli("set log class nat level debug")

        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = "10.0.0.3"
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1
        cls.tcp_external_port = 80
        cls.udp_external_port = 69

        cls.create_pg_interfaces(range(10))
        cls.interfaces = list(cls.pg_interfaces[0:4])

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(3)
        cls.pg0.configure_ipv4_neighbors()

        cls.pg1.generate_remote_hosts(1)
        cls.pg1.configure_ipv4_neighbors()

        cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])
        cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
        cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})

        # pg4 (table 10) and pg6 (table 20) deliberately share addresses
        cls.pg4._local_ip4 = "172.16.255.1"
        cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
        cls.pg4.set_table_ip4(10)
        cls.pg5._local_ip4 = "172.17.255.3"
        cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
        cls.pg5.set_table_ip4(10)
        cls.pg6._local_ip4 = "172.16.255.1"
        cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
        cls.pg6.set_table_ip4(20)
        for i in cls.overlapping_interfaces:
            i.config_ip4()
            i.admin_up()
            i.resolve_arp()

        cls.pg7.admin_up()
        cls.pg8.admin_up()

        cls.pg9.generate_remote_hosts(2)
        cls.pg9.config_ip4()
        cls.vapi.sw_interface_add_del_address(
            sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
        )

        cls.pg9.admin_up()
        cls.pg9.resolve_arp()
        # both pg9 remote hosts end up with 10.0.0.2, which is also recorded
        # as pg4's remote address; ARP is resolved again afterwards
        cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
        cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
        cls.pg9.resolve_arp()

    def setUp(self):
        """Enable the NAT44-EI plugin with the class-level session/user limits."""
        super(TestNAT44EIMW, self).setUp()
        self.vapi.nat44_ei_plugin_enable_disable(
            sessions=self.max_translations, users=self.max_users, enable=1
        )

    def tearDown(self):
        """Disable IPFIX and the plugin and clear logging, unless VPP has died."""
        super(TestNAT44EIMW, self).tearDown()
        if not self.vpp_dead:
            self.vapi.nat44_ei_ipfix_enable_disable(
                domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
            )
            # restore the values assigned in setUpClass()
            self.ipfix_src_port = 4739
            self.ipfix_domain_id = 1

            self.vapi.nat44_ei_plugin_enable_disable(enable=0)
            self.vapi.cli("clear logging")

    def test_hairpinning(self):
        """NAT44EI hairpinning - 1:1 NAPT"""

        host = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        host_in_port = 1234
        host_out_port = 0
        server_in_port = 5678
        server_out_port = 8765
        # first-axis indexes into the "/nat44-ei/hairpinning" counter
        worker_1 = 1
        worker_2 = 2

        self.nat44_add_address(self.nat_addr)
        flags = self.config_flags.NAT44_EI_IF_INSIDE
        self.vapi.nat44_ei_interface_add_del_feature(
            sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
        )
        self.vapi.nat44_ei_interface_add_del_feature(
            sw_if_index=self.pg1.sw_if_index, is_add=1
        )

        # add static mapping for server
        self.nat44_add_static_mapping(
            server.ip4,
            self.nat_addr,
            server_in_port,
            server_out_port,
            proto=IP_PROTOS.tcp,
        )

        # baseline of the hairpinning counter, taken before any traffic
        cnt = self.statistics["/nat44-ei/hairpinning"]
        if_idx = self.pg0.sw_if_index

        # send packet from host to server
        p = (
            Ether(src=host.mac, dst=self.pg0.local_mac)
            / IP(src=host.ip4, dst=self.nat_addr)
            / TCP(sport=host_in_port, dport=server_out_port)
        )
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.nat_addr)
            self.assertEqual(ip.dst, server.ip4)
            self.assertNotEqual(tcp.sport, host_in_port)
            self.assertEqual(tcp.dport, server_in_port)
            self.assert_packet_checksums_valid(p)
            # remember the translated source port for the reply below
            host_out_port = tcp.sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        after = self.statistics["/nat44-ei/hairpinning"]

        # compare worker 2's counter against worker 2's own baseline
        self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 1)

        # send reply from server to host
        p = (
            Ether(src=server.mac, dst=self.pg0.local_mac)
            / IP(src=server.ip4, dst=self.nat_addr)
            / TCP(sport=server_in_port, dport=host_out_port)
        )
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.nat_addr)
            self.assertEqual(ip.dst, host.ip4)
            self.assertEqual(tcp.sport, server_out_port)
            self.assertEqual(tcp.dport, host_in_port)
            self.assert_packet_checksums_valid(p)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        after = self.statistics["/nat44-ei/hairpinning"]
        self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
        self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)

    def test_hairpinning2(self):
        """NAT44EI hairpinning - 1:1 NAT"""

        server1_nat_ip = "10.0.0.10"
        server2_nat_ip = "10.0.0.11"
        host = self.pg0.remote_hosts[0]
        server1 = self.pg0.remote_hosts[1]
        server2 = self.pg0.remote_hosts[2]
        server_tcp_port = 22
        server_udp_port = 20

        self.nat44_add_address(self.nat_addr)
        flags = self.config_flags.NAT44_EI_IF_INSIDE
        self.vapi.nat44_ei_interface_add_del_feature(
            sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
        )
        self.vapi.nat44_ei_interface_add_del_feature(
            sw_if_index=self.pg1.sw_if_index, is_add=1
        )

        # add static mapping for servers
        self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
        self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

        # host to server1
        pkts = []
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=host.ip4, dst=server1_nat_ip)
            / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=host.ip4, dst=server1_nat_ip)
            / UDP(sport=self.udp_port_in, dport=server_udp_port)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=host.ip4, dst=server1_nat_ip)
            / ICMP(id=self.icmp_id_in, type="echo-request")
        )
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, self.nat_addr)
                self.assertEqual(packet[IP].dst, server1.ip4)
                # the host has no static mapping: its port / ICMP id must be
                # rewritten; the new values are kept for the reply direction
                if packet.haslayer(TCP):
                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                    self.assert_packet_checksums_valid(packet)
                elif packet.haslayer(UDP):
                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to host
        pkts = []
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=self.nat_addr)
            / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=self.nat_addr)
            / UDP(sport=server_udp_port, dport=self.udp_port_out)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=self.nat_addr)
            / ICMP(id=self.icmp_id_out, type="echo-reply")
        )
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, host.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.assert_packet_checksums_valid(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server2 to server1
        pkts = []
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server2.ip4, dst=server1_nat_ip)
            / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server2.ip4, dst=server1_nat_ip)
            / UDP(sport=self.udp_port_in, dport=server_udp_port)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server2.ip4, dst=server1_nat_ip)
            / ICMP(id=self.icmp_id_in, type="echo-request")
        )
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server2_nat_ip)
                self.assertEqual(packet[IP].dst, server1.ip4)
                # server2 has a static mapping: ports / ICMP id are expected
                # to stay unchanged in this direction
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                    self.assert_packet_checksums_valid(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to server2
        pkts = []
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=server2_nat_ip)
            / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=server2_nat_ip)
            / UDP(sport=server_udp_port, dport=self.udp_port_out)
        )
        pkts.append(p)
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=server1.ip4, dst=server2_nat_ip)
            / ICMP(id=self.icmp_id_out, type="echo-reply")
        )
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, server2.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.assert_packet_checksums_valid(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
4576
4577
# When this module is executed directly, run its test cases through
# unittest using the framework's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)