tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import VppTestCase, VppLoInterface
12 from asfframework import VppTestRunner, tag_fixme_debian11, is_distro_debian11
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
15     bind_layers,
16     Packet,
17     ByteEnumField,
18     ShortField,
19     IPField,
20     IntField,
21     LongField,
22     XByteField,
23     FlagsField,
24     FieldLenField,
25     PacketListField,
26 )
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
35 from util import ppp
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
39
40
41 # NAT HA protocol event data
42 class Event(Packet):
43     name = "Event"
44     fields_desc = [
45         ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
46         ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
47         ShortField("flags", 0),
48         IPField("in_addr", None),
49         IPField("out_addr", None),
50         ShortField("in_port", None),
51         ShortField("out_port", None),
52         IPField("eh_addr", None),
53         IPField("ehn_addr", None),
54         ShortField("eh_port", None),
55         ShortField("ehn_port", None),
56         IntField("fib_index", None),
57         IntField("total_pkts", 0),
58         LongField("total_bytes", 0),
59     ]
60
61     def extract_padding(self, s):
62         return "", s
63
64
65 # NAT HA protocol header
66 class HANATStateSync(Packet):
67     name = "HA NAT state sync"
68     fields_desc = [
69         XByteField("version", 1),
70         FlagsField("flags", 0, 8, ["ACK"]),
71         FieldLenField("count", None, count_of="events"),
72         IntField("sequence_number", 1),
73         IntField("thread_index", 0),
74         PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
75     ]
76
77
78 class MethodHolder(VppTestCase):
79     """NAT create capture and verify method holder"""
80
81     @property
82     def config_flags(self):
83         return VppEnum.vl_api_nat44_ei_config_flags_t
84
85     @property
86     def SYSLOG_SEVERITY(self):
87         return VppEnum.vl_api_syslog_severity_t
88
89     def nat44_add_static_mapping(
90         self,
91         local_ip,
92         external_ip="0.0.0.0",
93         local_port=0,
94         external_port=0,
95         vrf_id=0,
96         is_add=1,
97         external_sw_if_index=0xFFFFFFFF,
98         proto=0,
99         tag="",
100         flags=0,
101     ):
102         """
103         Add/delete NAT44EI static mapping
104
105         :param local_ip: Local IP address
106         :param external_ip: External IP address
107         :param local_port: Local port number (Optional)
108         :param external_port: External port number (Optional)
109         :param vrf_id: VRF ID (Default 0)
110         :param is_add: 1 if add, 0 if delete (Default add)
111         :param external_sw_if_index: External interface instead of IP address
112         :param proto: IP protocol (Mandatory if port specified)
113         :param tag: Opaque string tag
114         :param flags: NAT configuration flags
115         """
116
117         if not (local_port and external_port):
118             flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
119
120         self.vapi.nat44_ei_add_del_static_mapping(
121             is_add=is_add,
122             local_ip_address=local_ip,
123             external_ip_address=external_ip,
124             external_sw_if_index=external_sw_if_index,
125             local_port=local_port,
126             external_port=external_port,
127             vrf_id=vrf_id,
128             protocol=proto,
129             flags=flags,
130             tag=tag,
131         )
132
133     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
134         """
135         Add/delete NAT44EI address
136
137         :param ip: IP address
138         :param is_add: 1 if add, 0 if delete (Default add)
139         """
140         self.vapi.nat44_ei_add_del_address_range(
141             first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
142         )
143
144     def create_routes_and_neigbors(self):
145         r1 = VppIpRoute(
146             self,
147             self.pg7.remote_ip4,
148             32,
149             [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
150         )
151         r2 = VppIpRoute(
152             self,
153             self.pg8.remote_ip4,
154             32,
155             [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
156         )
157         r1.add_vpp_config()
158         r2.add_vpp_config()
159
160         n1 = VppNeighbor(
161             self,
162             self.pg7.sw_if_index,
163             self.pg7.remote_mac,
164             self.pg7.remote_ip4,
165             is_static=1,
166         )
167         n2 = VppNeighbor(
168             self,
169             self.pg8.sw_if_index,
170             self.pg8.remote_mac,
171             self.pg8.remote_ip4,
172             is_static=1,
173         )
174         n1.add_vpp_config()
175         n2.add_vpp_config()
176
177     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
178         """
179         Create packet stream for inside network
180
181         :param in_if: Inside interface
182         :param out_if: Outside interface
183         :param dst_ip: Destination address
184         :param ttl: TTL of generated packets
185         """
186         if dst_ip is None:
187             dst_ip = out_if.remote_ip4
188
189         pkts = []
190         # TCP
191         p = (
192             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
193             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
194             / TCP(sport=self.tcp_port_in, dport=20)
195         )
196         pkts.extend([p, p])
197
198         # UDP
199         p = (
200             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
201             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
202             / UDP(sport=self.udp_port_in, dport=20)
203         )
204         pkts.append(p)
205
206         # ICMP
207         p = (
208             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
209             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
210             / ICMP(id=self.icmp_id_in, type="echo-request")
211         )
212         pkts.append(p)
213
214         return pkts
215
216     def compose_ip6(self, ip4, pref, plen):
217         """
218         Compose IPv4-embedded IPv6 addresses
219
220         :param ip4: IPv4 address
221         :param pref: IPv6 prefix
222         :param plen: IPv6 prefix length
223         :returns: IPv4-embedded IPv6 addresses
224         """
225         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
226         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
227         if plen == 32:
228             pref_n[4] = ip4_n[0]
229             pref_n[5] = ip4_n[1]
230             pref_n[6] = ip4_n[2]
231             pref_n[7] = ip4_n[3]
232         elif plen == 40:
233             pref_n[5] = ip4_n[0]
234             pref_n[6] = ip4_n[1]
235             pref_n[7] = ip4_n[2]
236             pref_n[9] = ip4_n[3]
237         elif plen == 48:
238             pref_n[6] = ip4_n[0]
239             pref_n[7] = ip4_n[1]
240             pref_n[9] = ip4_n[2]
241             pref_n[10] = ip4_n[3]
242         elif plen == 56:
243             pref_n[7] = ip4_n[0]
244             pref_n[9] = ip4_n[1]
245             pref_n[10] = ip4_n[2]
246             pref_n[11] = ip4_n[3]
247         elif plen == 64:
248             pref_n[9] = ip4_n[0]
249             pref_n[10] = ip4_n[1]
250             pref_n[11] = ip4_n[2]
251             pref_n[12] = ip4_n[3]
252         elif plen == 96:
253             pref_n[12] = ip4_n[0]
254             pref_n[13] = ip4_n[1]
255             pref_n[14] = ip4_n[2]
256             pref_n[15] = ip4_n[3]
257         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
258         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
259
260     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
261         """
262         Create packet stream for outside network
263
264         :param out_if: Outside interface
265         :param dst_ip: Destination IP address (Default use global NAT address)
266         :param ttl: TTL of generated packets
267         :param use_inside_ports: Use inside NAT ports as destination ports
268                instead of outside ports
269         """
270         if dst_ip is None:
271             dst_ip = self.nat_addr
272         if not use_inside_ports:
273             tcp_port = self.tcp_port_out
274             udp_port = self.udp_port_out
275             icmp_id = self.icmp_id_out
276         else:
277             tcp_port = self.tcp_port_in
278             udp_port = self.udp_port_in
279             icmp_id = self.icmp_id_in
280         pkts = []
281         # TCP
282         p = (
283             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
284             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
285             / TCP(dport=tcp_port, sport=20)
286         )
287         pkts.extend([p, p])
288
289         # UDP
290         p = (
291             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
292             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
293             / UDP(dport=udp_port, sport=20)
294         )
295         pkts.append(p)
296
297         # ICMP
298         p = (
299             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
300             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
301             / ICMP(id=icmp_id, type="echo-reply")
302         )
303         pkts.append(p)
304
305         return pkts
306
307     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
308         """
309         Create packet stream for outside network
310
311         :param out_if: Outside interface
312         :param dst_ip: Destination IP address (Default use global NAT address)
313         :param hl: HL of generated packets
314         """
315         pkts = []
316         # TCP
317         p = (
318             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320             / TCP(dport=self.tcp_port_out, sport=20)
321         )
322         pkts.append(p)
323
324         # UDP
325         p = (
326             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328             / UDP(dport=self.udp_port_out, sport=20)
329         )
330         pkts.append(p)
331
332         # ICMP
333         p = (
334             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336             / ICMPv6EchoReply(id=self.icmp_id_out)
337         )
338         pkts.append(p)
339
340         return pkts
341
342     def verify_capture_out(
343         self,
344         capture,
345         nat_ip=None,
346         same_port=False,
347         dst_ip=None,
348         is_ip6=False,
349         ignore_port=False,
350     ):
351         """
352         Verify captured packets on outside network
353
354         :param capture: Captured packets
355         :param nat_ip: Translated IP address (Default use global NAT address)
356         :param same_port: Source port number is not translated (Default False)
357         :param dst_ip: Destination IP address (Default do not verify)
358         :param is_ip6: If L3 protocol is IPv6 (Default False)
359         """
360         if is_ip6:
361             IP46 = IPv6
362             ICMP46 = ICMPv6EchoRequest
363         else:
364             IP46 = IP
365             ICMP46 = ICMP
366         if nat_ip is None:
367             nat_ip = self.nat_addr
368         for packet in capture:
369             try:
370                 if not is_ip6:
371                     self.assert_packet_checksums_valid(packet)
372                 self.assertEqual(packet[IP46].src, nat_ip)
373                 if dst_ip is not None:
374                     self.assertEqual(packet[IP46].dst, dst_ip)
375                 if packet.haslayer(TCP):
376                     if not ignore_port:
377                         if same_port:
378                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
379                         else:
380                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
381                     self.tcp_port_out = packet[TCP].sport
382                     self.assert_packet_checksums_valid(packet)
383                 elif packet.haslayer(UDP):
384                     if not ignore_port:
385                         if same_port:
386                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
387                         else:
388                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
389                     self.udp_port_out = packet[UDP].sport
390                 else:
391                     if not ignore_port:
392                         if same_port:
393                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
394                         else:
395                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
396                     self.icmp_id_out = packet[ICMP46].id
397                     self.assert_packet_checksums_valid(packet)
398             except:
399                 self.logger.error(
400                     ppp("Unexpected or invalid packet (outside network):", packet)
401                 )
402                 raise
403
404     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
405         """
406         Verify captured packets on outside network
407
408         :param capture: Captured packets
409         :param nat_ip: Translated IP address
410         :param same_port: Source port number is not translated (Default False)
411         :param dst_ip: Destination IP address (Default do not verify)
412         """
413         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
414
415     def verify_capture_in(self, capture, in_if):
416         """
417         Verify captured packets on inside network
418
419         :param capture: Captured packets
420         :param in_if: Inside interface
421         """
422         for packet in capture:
423             try:
424                 self.assert_packet_checksums_valid(packet)
425                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
426                 if packet.haslayer(TCP):
427                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
428                 elif packet.haslayer(UDP):
429                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
430                 else:
431                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
432             except:
433                 self.logger.error(
434                     ppp("Unexpected or invalid packet (inside network):", packet)
435                 )
436                 raise
437
438     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
439         """
440         Verify captured packet that don't have to be translated
441
442         :param capture: Captured packets
443         :param ingress_if: Ingress interface
444         :param egress_if: Egress interface
445         """
446         for packet in capture:
447             try:
448                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
449                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
450                 if packet.haslayer(TCP):
451                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
452                 elif packet.haslayer(UDP):
453                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
454                 else:
455                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
456             except:
457                 self.logger.error(
458                     ppp("Unexpected or invalid packet (inside network):", packet)
459                 )
460                 raise
461
462     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
463         """
464         Verify captured packets with ICMP errors on outside network
465
466         :param capture: Captured packets
467         :param src_ip: Translated IP address or IP address of VPP
468                        (Default use global NAT address)
469         :param icmp_type: Type of error ICMP packet
470                           we are expecting (Default 11)
471         """
472         if src_ip is None:
473             src_ip = self.nat_addr
474         for packet in capture:
475             try:
476                 self.assertEqual(packet[IP].src, src_ip)
477                 self.assertEqual(packet.haslayer(ICMP), 1)
478                 icmp = packet[ICMP]
479                 self.assertEqual(icmp.type, icmp_type)
480                 self.assertTrue(icmp.haslayer(IPerror))
481                 inner_ip = icmp[IPerror]
482                 if inner_ip.haslayer(TCPerror):
483                     self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
484                 elif inner_ip.haslayer(UDPerror):
485                     self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
486                 else:
487                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
488             except:
489                 self.logger.error(
490                     ppp("Unexpected or invalid packet (outside network):", packet)
491                 )
492                 raise
493
494     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
495         """
496         Verify captured packets with ICMP errors on inside network
497
498         :param capture: Captured packets
499         :param in_if: Inside interface
500         :param icmp_type: Type of error ICMP packet
501                           we are expecting (Default 11)
502         """
503         for packet in capture:
504             try:
505                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506                 self.assertEqual(packet.haslayer(ICMP), 1)
507                 icmp = packet[ICMP]
508                 self.assertEqual(icmp.type, icmp_type)
509                 self.assertTrue(icmp.haslayer(IPerror))
510                 inner_ip = icmp[IPerror]
511                 if inner_ip.haslayer(TCPerror):
512                     self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
513                 elif inner_ip.haslayer(UDPerror):
514                     self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
515                 else:
516                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
517             except:
518                 self.logger.error(
519                     ppp("Unexpected or invalid packet (inside network):", packet)
520                 )
521                 raise
522
523     def create_stream_frag(
524         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
525     ):
526         """
527         Create fragmented packet stream
528
529         :param src_if: Source interface
530         :param dst: Destination IPv4 address
531         :param sport: Source port
532         :param dport: Destination port
533         :param data: Payload data
534         :param proto: protocol (TCP, UDP, ICMP)
535         :param echo_reply: use echo_reply if protocol is ICMP
536         :returns: Fragments
537         """
538         if proto == IP_PROTOS.tcp:
539             p = (
540                 IP(src=src_if.remote_ip4, dst=dst)
541                 / TCP(sport=sport, dport=dport)
542                 / Raw(data)
543             )
544             p = p.__class__(scapy.compat.raw(p))
545             chksum = p[TCP].chksum
546             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
547         elif proto == IP_PROTOS.udp:
548             proto_header = UDP(sport=sport, dport=dport)
549         elif proto == IP_PROTOS.icmp:
550             if not echo_reply:
551                 proto_header = ICMP(id=sport, type="echo-request")
552             else:
553                 proto_header = ICMP(id=sport, type="echo-reply")
554         else:
555             raise Exception("Unsupported protocol")
556         id = random.randint(0, 65535)
557         pkts = []
558         if proto == IP_PROTOS.tcp:
559             raw = Raw(data[0:4])
560         else:
561             raw = Raw(data[0:16])
562         p = (
563             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
564             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
565             / proto_header
566             / raw
567         )
568         pkts.append(p)
569         if proto == IP_PROTOS.tcp:
570             raw = Raw(data[4:20])
571         else:
572             raw = Raw(data[16:32])
573         p = (
574             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
575             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
576             / raw
577         )
578         pkts.append(p)
579         if proto == IP_PROTOS.tcp:
580             raw = Raw(data[20:])
581         else:
582             raw = Raw(data[32:])
583         p = (
584             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
585             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
586             / raw
587         )
588         pkts.append(p)
589         return pkts
590
591     def reass_frags_and_verify(self, frags, src, dst):
592         """
593         Reassemble and verify fragmented packet
594
595         :param frags: Captured fragments
596         :param src: Source IPv4 address to verify
597         :param dst: Destination IPv4 address to verify
598
599         :returns: Reassembled IPv4 packet
600         """
601         buffer = BytesIO()
602         for p in frags:
603             self.assertEqual(p[IP].src, src)
604             self.assertEqual(p[IP].dst, dst)
605             self.assert_ip_checksum_valid(p)
606             buffer.seek(p[IP].frag * 8)
607             buffer.write(bytes(p[IP].payload))
608         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
609         if ip.proto == IP_PROTOS.tcp:
610             p = ip / TCP(buffer.getvalue())
611             self.logger.debug(ppp("Reassembled:", p))
612             self.assert_tcp_checksum_valid(p)
613         elif ip.proto == IP_PROTOS.udp:
614             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
615         elif ip.proto == IP_PROTOS.icmp:
616             p = ip / ICMP(buffer.getvalue())
617         return p
618
619     def verify_ipfix_nat44_ses(self, data):
620         """
621         Verify IPFIX NAT44EI session create/delete event
622
623         :param data: Decoded IPFIX data records
624         """
625         nat44_ses_create_num = 0
626         nat44_ses_delete_num = 0
627         self.assertEqual(6, len(data))
628         for record in data:
629             # natEvent
630             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
631             if scapy.compat.orb(record[230]) == 4:
632                 nat44_ses_create_num += 1
633             else:
634                 nat44_ses_delete_num += 1
635             # sourceIPv4Address
636             self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
637             # postNATSourceIPv4Address
638             self.assertEqual(
639                 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
640             )
641             # ingressVRFID
642             self.assertEqual(struct.pack("!I", 0), record[234])
643             # protocolIdentifier/sourceTransportPort
644             # /postNAPTSourceTransportPort
645             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
646                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
647                 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
648             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
649                 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
650                 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
651             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
652                 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
653                 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
654             else:
655                 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
656         self.assertEqual(3, nat44_ses_create_num)
657         self.assertEqual(3, nat44_ses_delete_num)
658
659     def verify_ipfix_addr_exhausted(self, data):
660         self.assertEqual(1, len(data))
661         record = data[0]
662         # natEvent
663         self.assertEqual(scapy.compat.orb(record[230]), 3)
664         # natPoolID
665         self.assertEqual(struct.pack("!I", 0), record[283])
666         return len(data)
667
668     def verify_ipfix_max_sessions(self, data, limit):
669         self.assertEqual(1, len(data))
670         record = data[0]
671         # natEvent
672         self.assertEqual(scapy.compat.orb(record[230]), 13)
673         # natQuotaExceededEvent
674         self.assertEqual(struct.pack("!I", 1), record[466])
675         # maxSessionEntries
676         self.assertEqual(struct.pack("!I", limit), record[471])
677         return len(data)
678
679     def verify_no_nat44_user(self):
680         """Verify that there is no NAT44EI user"""
681         users = self.vapi.nat44_ei_user_dump()
682         self.assertEqual(len(users), 0)
683         users = self.statistics["/nat44-ei/total-users"]
684         self.assertEqual(users[0][0], 0)
685         sessions = self.statistics["/nat44-ei/total-sessions"]
686         self.assertEqual(sessions[0][0], 0)
687
688     def verify_syslog_apmap(self, data, is_add=True):
689         message = data.decode("utf-8")
690         try:
691             message = SyslogMessage.parse(message)
692         except ParseError as e:
693             self.logger.error(e)
694             raise
695         else:
696             self.assertEqual(message.severity, SyslogSeverity.info)
697             self.assertEqual(message.appname, "NAT")
698             self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
699             sd_params = message.sd.get("napmap")
700             self.assertTrue(sd_params is not None)
701             self.assertEqual(sd_params.get("IATYP"), "IPv4")
702             self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
703             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
704             self.assertEqual(sd_params.get("XATYP"), "IPv4")
705             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
706             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
707             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
708             self.assertTrue(sd_params.get("SSUBIX") is not None)
709             self.assertEqual(sd_params.get("SVLAN"), "0")
710
711     def verify_mss_value(self, pkt, mss):
712         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
713             raise TypeError("Not a TCP/IP packet")
714
715         for option in pkt[TCP].options:
716             if option[0] == "MSS":
717                 self.assertEqual(option[1], mss)
718                 self.assert_tcp_checksum_valid(pkt)
719
720     @staticmethod
721     def proto2layer(proto):
722         if proto == IP_PROTOS.tcp:
723             return TCP
724         elif proto == IP_PROTOS.udp:
725             return UDP
726         elif proto == IP_PROTOS.icmp:
727             return ICMP
728         else:
729             raise Exception("Unsupported protocol")
730
731     def frag_in_order(
732         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
733     ):
734         layer = self.proto2layer(proto)
735
736         if proto == IP_PROTOS.tcp:
737             data = b"A" * 4 + b"B" * 16 + b"C" * 3
738         else:
739             data = b"A" * 16 + b"B" * 16 + b"C" * 3
740         self.port_in = random.randint(1025, 65535)
741
742         # in2out
743         pkts = self.create_stream_frag(
744             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
745         )
746         self.pg0.add_stream(pkts)
747         self.pg_enable_capture(self.pg_interfaces)
748         self.pg_start()
749         frags = self.pg1.get_capture(len(pkts))
750         if not dont_translate:
751             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
752         else:
753             p = self.reass_frags_and_verify(
754                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
755             )
756         if proto != IP_PROTOS.icmp:
757             if not dont_translate:
758                 self.assertEqual(p[layer].dport, 20)
759                 if not ignore_port:
760                     self.assertNotEqual(p[layer].sport, self.port_in)
761             else:
762                 self.assertEqual(p[layer].sport, self.port_in)
763         else:
764             if not ignore_port:
765                 if not dont_translate:
766                     self.assertNotEqual(p[layer].id, self.port_in)
767                 else:
768                     self.assertEqual(p[layer].id, self.port_in)
769         self.assertEqual(data, p[Raw].load)
770
771         # out2in
772         if not dont_translate:
773             dst_addr = self.nat_addr
774         else:
775             dst_addr = self.pg0.remote_ip4
776         if proto != IP_PROTOS.icmp:
777             sport = 20
778             dport = p[layer].sport
779         else:
780             sport = p[layer].id
781             dport = 0
782         pkts = self.create_stream_frag(
783             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
784         )
785         self.pg1.add_stream(pkts)
786         self.pg_enable_capture(self.pg_interfaces)
787         self.pg_start()
788         frags = self.pg0.get_capture(len(pkts))
789         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
790         if proto != IP_PROTOS.icmp:
791             self.assertEqual(p[layer].sport, 20)
792             self.assertEqual(p[layer].dport, self.port_in)
793         else:
794             self.assertEqual(p[layer].id, self.port_in)
795         self.assertEqual(data, p[Raw].load)
796
797     def reass_hairpinning(
798         self,
799         server_addr,
800         server_in_port,
801         server_out_port,
802         host_in_port,
803         proto=IP_PROTOS.tcp,
804         ignore_port=False,
805     ):
806         layer = self.proto2layer(proto)
807
808         if proto == IP_PROTOS.tcp:
809             data = b"A" * 4 + b"B" * 16 + b"C" * 3
810         else:
811             data = b"A" * 16 + b"B" * 16 + b"C" * 3
812
813         # send packet from host to server
814         pkts = self.create_stream_frag(
815             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
816         )
817         self.pg0.add_stream(pkts)
818         self.pg_enable_capture(self.pg_interfaces)
819         self.pg_start()
820         frags = self.pg0.get_capture(len(pkts))
821         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
822         if proto != IP_PROTOS.icmp:
823             if not ignore_port:
824                 self.assertNotEqual(p[layer].sport, host_in_port)
825             self.assertEqual(p[layer].dport, server_in_port)
826         else:
827             if not ignore_port:
828                 self.assertNotEqual(p[layer].id, host_in_port)
829         self.assertEqual(data, p[Raw].load)
830
831     def frag_out_of_order(
832         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
833     ):
834         layer = self.proto2layer(proto)
835
836         if proto == IP_PROTOS.tcp:
837             data = b"A" * 4 + b"B" * 16 + b"C" * 3
838         else:
839             data = b"A" * 16 + b"B" * 16 + b"C" * 3
840         self.port_in = random.randint(1025, 65535)
841
842         for i in range(2):
843             # in2out
844             pkts = self.create_stream_frag(
845                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
846             )
847             pkts.reverse()
848             self.pg0.add_stream(pkts)
849             self.pg_enable_capture(self.pg_interfaces)
850             self.pg_start()
851             frags = self.pg1.get_capture(len(pkts))
852             if not dont_translate:
853                 p = self.reass_frags_and_verify(
854                     frags, self.nat_addr, self.pg1.remote_ip4
855                 )
856             else:
857                 p = self.reass_frags_and_verify(
858                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
859                 )
860             if proto != IP_PROTOS.icmp:
861                 if not dont_translate:
862                     self.assertEqual(p[layer].dport, 20)
863                     if not ignore_port:
864                         self.assertNotEqual(p[layer].sport, self.port_in)
865                 else:
866                     self.assertEqual(p[layer].sport, self.port_in)
867             else:
868                 if not ignore_port:
869                     if not dont_translate:
870                         self.assertNotEqual(p[layer].id, self.port_in)
871                     else:
872                         self.assertEqual(p[layer].id, self.port_in)
873             self.assertEqual(data, p[Raw].load)
874
875             # out2in
876             if not dont_translate:
877                 dst_addr = self.nat_addr
878             else:
879                 dst_addr = self.pg0.remote_ip4
880             if proto != IP_PROTOS.icmp:
881                 sport = 20
882                 dport = p[layer].sport
883             else:
884                 sport = p[layer].id
885                 dport = 0
886             pkts = self.create_stream_frag(
887                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
888             )
889             pkts.reverse()
890             self.pg1.add_stream(pkts)
891             self.pg_enable_capture(self.pg_interfaces)
892             self.pg_start()
893             frags = self.pg0.get_capture(len(pkts))
894             p = self.reass_frags_and_verify(
895                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
896             )
897             if proto != IP_PROTOS.icmp:
898                 self.assertEqual(p[layer].sport, 20)
899                 self.assertEqual(p[layer].dport, self.port_in)
900             else:
901                 self.assertEqual(p[layer].id, self.port_in)
902             self.assertEqual(data, p[Raw].load)
903
904
905 def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
906     if 0 == vpp_worker_count:
907         return 0
908     numeric = socket.inet_aton(ip)
909     numeric = struct.unpack("!L", numeric)[0]
910     numeric = socket.htonl(numeric)
911     h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
912     return 1 + h % vpp_worker_count
913
914
915 @tag_fixme_debian11
916 class TestNAT44EI(MethodHolder):
917     """NAT44EI Test Cases"""
918
919     max_translations = 10240
920     max_users = 10240
921
922     @classmethod
923     def setUpClass(cls):
924         super(TestNAT44EI, cls).setUpClass()
925         if is_distro_debian11 == True and not hasattr(cls, "vpp"):
926             return
927         cls.vapi.cli("set log class nat44-ei level debug")
928
929         cls.tcp_port_in = 6303
930         cls.tcp_port_out = 6303
931         cls.udp_port_in = 6304
932         cls.udp_port_out = 6304
933         cls.icmp_id_in = 6305
934         cls.icmp_id_out = 6305
935         cls.nat_addr = "10.0.0.3"
936         cls.ipfix_src_port = 4739
937         cls.ipfix_domain_id = 1
938         cls.tcp_external_port = 80
939         cls.udp_external_port = 69
940
941         cls.create_pg_interfaces(range(10))
942         cls.interfaces = list(cls.pg_interfaces[0:4])
943
944         for i in cls.interfaces:
945             i.admin_up()
946             i.config_ip4()
947             i.resolve_arp()
948
949         cls.pg0.generate_remote_hosts(3)
950         cls.pg0.configure_ipv4_neighbors()
951
952         cls.pg1.generate_remote_hosts(1)
953         cls.pg1.configure_ipv4_neighbors()
954
955         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
956         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
957         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
958
959         cls.pg4._local_ip4 = "172.16.255.1"
960         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
961         cls.pg4.set_table_ip4(10)
962         cls.pg5._local_ip4 = "172.17.255.3"
963         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
964         cls.pg5.set_table_ip4(10)
965         cls.pg6._local_ip4 = "172.16.255.1"
966         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
967         cls.pg6.set_table_ip4(20)
968         for i in cls.overlapping_interfaces:
969             i.config_ip4()
970             i.admin_up()
971             i.resolve_arp()
972
973         cls.pg7.admin_up()
974         cls.pg8.admin_up()
975
976         cls.pg9.generate_remote_hosts(2)
977         cls.pg9.config_ip4()
978         cls.vapi.sw_interface_add_del_address(
979             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
980         )
981
982         cls.pg9.admin_up()
983         cls.pg9.resolve_arp()
984         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
985         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
986         cls.pg9.resolve_arp()
987
988     def plugin_enable(self):
989         self.vapi.nat44_ei_plugin_enable_disable(
990             sessions=self.max_translations, users=self.max_users, enable=1
991         )
992
993     def setUp(self):
994         super(TestNAT44EI, self).setUp()
995         self.plugin_enable()
996
997     def tearDown(self):
998         super(TestNAT44EI, self).tearDown()
999         if not self.vpp_dead:
1000             self.vapi.nat44_ei_ipfix_enable_disable(
1001                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1002             )
1003             self.ipfix_src_port = 4739
1004             self.ipfix_domain_id = 1
1005
1006             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1007             self.vapi.cli("clear logging")
1008
1009     def test_clear_sessions(self):
1010         """NAT44EI session clearing test"""
1011
1012         self.nat44_add_address(self.nat_addr)
1013         flags = self.config_flags.NAT44_EI_IF_INSIDE
1014         self.vapi.nat44_ei_interface_add_del_feature(
1015             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1016         )
1017         self.vapi.nat44_ei_interface_add_del_feature(
1018             sw_if_index=self.pg1.sw_if_index, is_add=1
1019         )
1020
1021         pkts = self.create_stream_in(self.pg0, self.pg1)
1022         self.pg0.add_stream(pkts)
1023         self.pg_enable_capture(self.pg_interfaces)
1024         self.pg_start()
1025         capture = self.pg1.get_capture(len(pkts))
1026         self.verify_capture_out(capture)
1027
1028         sessions = self.statistics["/nat44-ei/total-sessions"]
1029         self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1030         self.logger.info("sessions before clearing: %s" % sessions[0][0])
1031
1032         self.vapi.cli("clear nat44 ei sessions")
1033
1034         sessions = self.statistics["/nat44-ei/total-sessions"]
1035         self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1036         self.logger.info("sessions after clearing: %s" % sessions[0][0])
1037
1038     def test_dynamic(self):
1039         """NAT44EI dynamic translation test"""
1040         self.nat44_add_address(self.nat_addr)
1041         flags = self.config_flags.NAT44_EI_IF_INSIDE
1042         self.vapi.nat44_ei_interface_add_del_feature(
1043             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1044         )
1045         self.vapi.nat44_ei_interface_add_del_feature(
1046             sw_if_index=self.pg1.sw_if_index, is_add=1
1047         )
1048
1049         # in2out
1050         tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1051         udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1052         icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1053         drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1054
1055         pkts = self.create_stream_in(self.pg0, self.pg1)
1056         self.pg0.add_stream(pkts)
1057         self.pg_enable_capture(self.pg_interfaces)
1058         self.pg_start()
1059         capture = self.pg1.get_capture(len(pkts))
1060         self.verify_capture_out(capture)
1061
1062         if_idx = self.pg0.sw_if_index
1063         cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1064         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1065         cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1066         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1067         cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1068         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1069         cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1070         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1071
1072         # out2in
1073         tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1074         udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1075         icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1076         drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1077
1078         pkts = self.create_stream_out(self.pg1)
1079         self.pg1.add_stream(pkts)
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pg_start()
1082         capture = self.pg0.get_capture(len(pkts))
1083         self.verify_capture_in(capture, self.pg0)
1084
1085         if_idx = self.pg1.sw_if_index
1086         cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1087         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1088         cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1089         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1090         cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1091         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1092         cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1093         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1094
1095         users = self.statistics["/nat44-ei/total-users"]
1096         self.assertEqual(users[:, 0].sum(), 1)
1097         sessions = self.statistics["/nat44-ei/total-sessions"]
1098         self.assertEqual(sessions[:, 0].sum(), 3)
1099
1100     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1101         """NAT44EI handling of client packets with TTL=1"""
1102
1103         self.nat44_add_address(self.nat_addr)
1104         flags = self.config_flags.NAT44_EI_IF_INSIDE
1105         self.vapi.nat44_ei_interface_add_del_feature(
1106             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1107         )
1108         self.vapi.nat44_ei_interface_add_del_feature(
1109             sw_if_index=self.pg1.sw_if_index, is_add=1
1110         )
1111
1112         # Client side - generate traffic
1113         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1114         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1115
1116         # Client side - verify ICMP type 11 packets
1117         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1118
1119     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1120         """NAT44EI handling of server packets with TTL=1"""
1121
1122         self.nat44_add_address(self.nat_addr)
1123         flags = self.config_flags.NAT44_EI_IF_INSIDE
1124         self.vapi.nat44_ei_interface_add_del_feature(
1125             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1126         )
1127         self.vapi.nat44_ei_interface_add_del_feature(
1128             sw_if_index=self.pg1.sw_if_index, is_add=1
1129         )
1130
1131         # Client side - create sessions
1132         pkts = self.create_stream_in(self.pg0, self.pg1)
1133         self.pg0.add_stream(pkts)
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136
1137         # Server side - generate traffic
1138         capture = self.pg1.get_capture(len(pkts))
1139         self.verify_capture_out(capture)
1140         pkts = self.create_stream_out(self.pg1, ttl=1)
1141         capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1142
1143         # Server side - verify ICMP type 11 packets
1144         self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1145
1146     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1147         """NAT44EI handling of error responses to client packets with TTL=2"""
1148
1149         self.nat44_add_address(self.nat_addr)
1150         flags = self.config_flags.NAT44_EI_IF_INSIDE
1151         self.vapi.nat44_ei_interface_add_del_feature(
1152             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1153         )
1154         self.vapi.nat44_ei_interface_add_del_feature(
1155             sw_if_index=self.pg1.sw_if_index, is_add=1
1156         )
1157
1158         # Client side - generate traffic
1159         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1160         self.pg0.add_stream(pkts)
1161         self.pg_enable_capture(self.pg_interfaces)
1162         self.pg_start()
1163
1164         # Server side - simulate ICMP type 11 response
1165         capture = self.pg1.get_capture(len(pkts))
1166         pkts = [
1167             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1168             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1169             / ICMP(type=11)
1170             / packet[IP]
1171             for packet in capture
1172         ]
1173         self.pg1.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176
1177         # Client side - verify ICMP type 11 packets
1178         capture = self.pg0.get_capture(len(pkts))
1179         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1180
1181     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1182         """NAT44EI handling of error responses to server packets with TTL=2"""
1183
1184         self.nat44_add_address(self.nat_addr)
1185         flags = self.config_flags.NAT44_EI_IF_INSIDE
1186         self.vapi.nat44_ei_interface_add_del_feature(
1187             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1188         )
1189         self.vapi.nat44_ei_interface_add_del_feature(
1190             sw_if_index=self.pg1.sw_if_index, is_add=1
1191         )
1192
1193         # Client side - create sessions
1194         pkts = self.create_stream_in(self.pg0, self.pg1)
1195         self.pg0.add_stream(pkts)
1196         self.pg_enable_capture(self.pg_interfaces)
1197         self.pg_start()
1198
1199         # Server side - generate traffic
1200         capture = self.pg1.get_capture(len(pkts))
1201         self.verify_capture_out(capture)
1202         pkts = self.create_stream_out(self.pg1, ttl=2)
1203         self.pg1.add_stream(pkts)
1204         self.pg_enable_capture(self.pg_interfaces)
1205         self.pg_start()
1206
1207         # Client side - simulate ICMP type 11 response
1208         capture = self.pg0.get_capture(len(pkts))
1209         pkts = [
1210             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1211             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1212             / ICMP(type=11)
1213             / packet[IP]
1214             for packet in capture
1215         ]
1216         self.pg0.add_stream(pkts)
1217         self.pg_enable_capture(self.pg_interfaces)
1218         self.pg_start()
1219
1220         # Server side - verify ICMP type 11 packets
1221         capture = self.pg1.get_capture(len(pkts))
1222         self.verify_capture_out_with_icmp_errors(capture)
1223
1224     def test_ping_out_interface_from_outside(self):
1225         """NAT44EI ping out interface from outside network"""
1226
1227         self.nat44_add_address(self.nat_addr)
1228         flags = self.config_flags.NAT44_EI_IF_INSIDE
1229         self.vapi.nat44_ei_interface_add_del_feature(
1230             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1231         )
1232         self.vapi.nat44_ei_interface_add_del_feature(
1233             sw_if_index=self.pg1.sw_if_index, is_add=1
1234         )
1235
1236         p = (
1237             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1238             / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1239             / ICMP(id=self.icmp_id_out, type="echo-request")
1240         )
1241         pkts = [p]
1242         self.pg1.add_stream(pkts)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245         capture = self.pg1.get_capture(len(pkts))
1246         packet = capture[0]
1247         try:
1248             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1249             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1250             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1251             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1252         except:
1253             self.logger.error(
1254                 ppp("Unexpected or invalid packet (outside network):", packet)
1255             )
1256             raise
1257
1258     def test_ping_internal_host_from_outside(self):
1259         """NAT44EI ping internal host from outside network"""
1260
1261         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1262         flags = self.config_flags.NAT44_EI_IF_INSIDE
1263         self.vapi.nat44_ei_interface_add_del_feature(
1264             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1265         )
1266         self.vapi.nat44_ei_interface_add_del_feature(
1267             sw_if_index=self.pg1.sw_if_index, is_add=1
1268         )
1269
1270         # out2in
1271         pkt = (
1272             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1273             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1274             / ICMP(id=self.icmp_id_out, type="echo-request")
1275         )
1276         self.pg1.add_stream(pkt)
1277         self.pg_enable_capture(self.pg_interfaces)
1278         self.pg_start()
1279         capture = self.pg0.get_capture(1)
1280         self.verify_capture_in(capture, self.pg0)
1281         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1282
1283         # in2out
1284         pkt = (
1285             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1286             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1287             / ICMP(id=self.icmp_id_in, type="echo-reply")
1288         )
1289         self.pg0.add_stream(pkt)
1290         self.pg_enable_capture(self.pg_interfaces)
1291         self.pg_start()
1292         capture = self.pg1.get_capture(1)
1293         self.verify_capture_out(capture, same_port=True)
1294         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1295
1296     def test_forwarding(self):
1297         """NAT44EI forwarding test"""
1298
1299         flags = self.config_flags.NAT44_EI_IF_INSIDE
1300         self.vapi.nat44_ei_interface_add_del_feature(
1301             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1302         )
1303         self.vapi.nat44_ei_interface_add_del_feature(
1304             sw_if_index=self.pg1.sw_if_index, is_add=1
1305         )
1306         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1307
1308         real_ip = self.pg0.remote_ip4
1309         alias_ip = self.nat_addr
1310         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1311         self.vapi.nat44_ei_add_del_static_mapping(
1312             is_add=1,
1313             local_ip_address=real_ip,
1314             external_ip_address=alias_ip,
1315             external_sw_if_index=0xFFFFFFFF,
1316             flags=flags,
1317         )
1318
1319         try:
1320             # static mapping match
1321
1322             pkts = self.create_stream_out(self.pg1)
1323             self.pg1.add_stream(pkts)
1324             self.pg_enable_capture(self.pg_interfaces)
1325             self.pg_start()
1326             capture = self.pg0.get_capture(len(pkts))
1327             self.verify_capture_in(capture, self.pg0)
1328
1329             pkts = self.create_stream_in(self.pg0, self.pg1)
1330             self.pg0.add_stream(pkts)
1331             self.pg_enable_capture(self.pg_interfaces)
1332             self.pg_start()
1333             capture = self.pg1.get_capture(len(pkts))
1334             self.verify_capture_out(capture, same_port=True)
1335
1336             # no static mapping match
1337
1338             host0 = self.pg0.remote_hosts[0]
1339             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1340             try:
1341                 pkts = self.create_stream_out(
1342                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1343                 )
1344                 self.pg1.add_stream(pkts)
1345                 self.pg_enable_capture(self.pg_interfaces)
1346                 self.pg_start()
1347                 capture = self.pg0.get_capture(len(pkts))
1348                 self.verify_capture_in(capture, self.pg0)
1349
1350                 pkts = self.create_stream_in(self.pg0, self.pg1)
1351                 self.pg0.add_stream(pkts)
1352                 self.pg_enable_capture(self.pg_interfaces)
1353                 self.pg_start()
1354                 capture = self.pg1.get_capture(len(pkts))
1355                 self.verify_capture_out(
1356                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1357                 )
1358             finally:
1359                 self.pg0.remote_hosts[0] = host0
1360
1361         finally:
1362             self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1363             flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1364             self.vapi.nat44_ei_add_del_static_mapping(
1365                 is_add=0,
1366                 local_ip_address=real_ip,
1367                 external_ip_address=alias_ip,
1368                 external_sw_if_index=0xFFFFFFFF,
1369                 flags=flags,
1370             )
1371
1372     def test_static_in(self):
1373         """NAT44EI 1:1 NAT initialized from inside network"""
1374
1375         nat_ip = "10.0.0.10"
1376         self.tcp_port_out = 6303
1377         self.udp_port_out = 6304
1378         self.icmp_id_out = 6305
1379
1380         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1381         flags = self.config_flags.NAT44_EI_IF_INSIDE
1382         self.vapi.nat44_ei_interface_add_del_feature(
1383             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1384         )
1385         self.vapi.nat44_ei_interface_add_del_feature(
1386             sw_if_index=self.pg1.sw_if_index, is_add=1
1387         )
1388         sm = self.vapi.nat44_ei_static_mapping_dump()
1389         self.assertEqual(len(sm), 1)
1390         self.assertEqual(sm[0].tag, "")
1391         self.assertEqual(sm[0].protocol, 0)
1392         self.assertEqual(sm[0].local_port, 0)
1393         self.assertEqual(sm[0].external_port, 0)
1394
1395         # in2out
1396         pkts = self.create_stream_in(self.pg0, self.pg1)
1397         self.pg0.add_stream(pkts)
1398         self.pg_enable_capture(self.pg_interfaces)
1399         self.pg_start()
1400         capture = self.pg1.get_capture(len(pkts))
1401         self.verify_capture_out(capture, nat_ip, True)
1402
1403         # out2in
1404         pkts = self.create_stream_out(self.pg1, nat_ip)
1405         self.pg1.add_stream(pkts)
1406         self.pg_enable_capture(self.pg_interfaces)
1407         self.pg_start()
1408         capture = self.pg0.get_capture(len(pkts))
1409         self.verify_capture_in(capture, self.pg0)
1410
1411     def test_static_out(self):
1412         """NAT44EI 1:1 NAT initialized from outside network"""
1413
1414         nat_ip = "10.0.0.20"
1415         self.tcp_port_out = 6303
1416         self.udp_port_out = 6304
1417         self.icmp_id_out = 6305
1418         tag = "testTAG"
1419
1420         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1421         flags = self.config_flags.NAT44_EI_IF_INSIDE
1422         self.vapi.nat44_ei_interface_add_del_feature(
1423             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1424         )
1425         self.vapi.nat44_ei_interface_add_del_feature(
1426             sw_if_index=self.pg1.sw_if_index, is_add=1
1427         )
1428         sm = self.vapi.nat44_ei_static_mapping_dump()
1429         self.assertEqual(len(sm), 1)
1430         self.assertEqual(sm[0].tag, tag)
1431
1432         # out2in
1433         pkts = self.create_stream_out(self.pg1, nat_ip)
1434         self.pg1.add_stream(pkts)
1435         self.pg_enable_capture(self.pg_interfaces)
1436         self.pg_start()
1437         capture = self.pg0.get_capture(len(pkts))
1438         self.verify_capture_in(capture, self.pg0)
1439
1440         # in2out
1441         pkts = self.create_stream_in(self.pg0, self.pg1)
1442         self.pg0.add_stream(pkts)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445         capture = self.pg1.get_capture(len(pkts))
1446         self.verify_capture_out(capture, nat_ip, True)
1447
1448     def test_static_with_port_in(self):
1449         """NAT44EI 1:1 NAPT initialized from inside network"""
1450
1451         self.tcp_port_out = 3606
1452         self.udp_port_out = 3607
1453         self.icmp_id_out = 3608
1454
1455         self.nat44_add_address(self.nat_addr)
1456         self.nat44_add_static_mapping(
1457             self.pg0.remote_ip4,
1458             self.nat_addr,
1459             self.tcp_port_in,
1460             self.tcp_port_out,
1461             proto=IP_PROTOS.tcp,
1462         )
1463         self.nat44_add_static_mapping(
1464             self.pg0.remote_ip4,
1465             self.nat_addr,
1466             self.udp_port_in,
1467             self.udp_port_out,
1468             proto=IP_PROTOS.udp,
1469         )
1470         self.nat44_add_static_mapping(
1471             self.pg0.remote_ip4,
1472             self.nat_addr,
1473             self.icmp_id_in,
1474             self.icmp_id_out,
1475             proto=IP_PROTOS.icmp,
1476         )
1477         flags = self.config_flags.NAT44_EI_IF_INSIDE
1478         self.vapi.nat44_ei_interface_add_del_feature(
1479             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1480         )
1481         self.vapi.nat44_ei_interface_add_del_feature(
1482             sw_if_index=self.pg1.sw_if_index, is_add=1
1483         )
1484
1485         # in2out
1486         pkts = self.create_stream_in(self.pg0, self.pg1)
1487         self.pg0.add_stream(pkts)
1488         self.pg_enable_capture(self.pg_interfaces)
1489         self.pg_start()
1490         capture = self.pg1.get_capture(len(pkts))
1491         self.verify_capture_out(capture)
1492
1493         # out2in
1494         pkts = self.create_stream_out(self.pg1)
1495         self.pg1.add_stream(pkts)
1496         self.pg_enable_capture(self.pg_interfaces)
1497         self.pg_start()
1498         capture = self.pg0.get_capture(len(pkts))
1499         self.verify_capture_in(capture, self.pg0)
1500
1501     def test_static_with_port_out(self):
1502         """NAT44EI 1:1 NAPT initialized from outside network"""
1503
1504         self.tcp_port_out = 30606
1505         self.udp_port_out = 30607
1506         self.icmp_id_out = 30608
1507
1508         self.nat44_add_address(self.nat_addr)
1509         self.nat44_add_static_mapping(
1510             self.pg0.remote_ip4,
1511             self.nat_addr,
1512             self.tcp_port_in,
1513             self.tcp_port_out,
1514             proto=IP_PROTOS.tcp,
1515         )
1516         self.nat44_add_static_mapping(
1517             self.pg0.remote_ip4,
1518             self.nat_addr,
1519             self.udp_port_in,
1520             self.udp_port_out,
1521             proto=IP_PROTOS.udp,
1522         )
1523         self.nat44_add_static_mapping(
1524             self.pg0.remote_ip4,
1525             self.nat_addr,
1526             self.icmp_id_in,
1527             self.icmp_id_out,
1528             proto=IP_PROTOS.icmp,
1529         )
1530         flags = self.config_flags.NAT44_EI_IF_INSIDE
1531         self.vapi.nat44_ei_interface_add_del_feature(
1532             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1533         )
1534         self.vapi.nat44_ei_interface_add_del_feature(
1535             sw_if_index=self.pg1.sw_if_index, is_add=1
1536         )
1537
1538         # out2in
1539         pkts = self.create_stream_out(self.pg1)
1540         self.pg1.add_stream(pkts)
1541         self.pg_enable_capture(self.pg_interfaces)
1542         self.pg_start()
1543         capture = self.pg0.get_capture(len(pkts))
1544         self.verify_capture_in(capture, self.pg0)
1545
1546         # in2out
1547         pkts = self.create_stream_in(self.pg0, self.pg1)
1548         self.pg0.add_stream(pkts)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg1.get_capture(len(pkts))
1552         self.verify_capture_out(capture)
1553
1554     def test_static_vrf_aware(self):
1555         """NAT44EI 1:1 NAT VRF awareness"""
1556
1557         nat_ip1 = "10.0.0.30"
1558         nat_ip2 = "10.0.0.40"
1559         self.tcp_port_out = 6303
1560         self.udp_port_out = 6304
1561         self.icmp_id_out = 6305
1562
1563         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1564         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1565         flags = self.config_flags.NAT44_EI_IF_INSIDE
1566         self.vapi.nat44_ei_interface_add_del_feature(
1567             sw_if_index=self.pg3.sw_if_index, is_add=1
1568         )
1569         self.vapi.nat44_ei_interface_add_del_feature(
1570             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1571         )
1572         self.vapi.nat44_ei_interface_add_del_feature(
1573             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1574         )
1575
1576         # inside interface VRF match NAT44EI static mapping VRF
1577         pkts = self.create_stream_in(self.pg4, self.pg3)
1578         self.pg4.add_stream(pkts)
1579         self.pg_enable_capture(self.pg_interfaces)
1580         self.pg_start()
1581         capture = self.pg3.get_capture(len(pkts))
1582         self.verify_capture_out(capture, nat_ip1, True)
1583
1584         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1585         # are dropped)
1586         pkts = self.create_stream_in(self.pg0, self.pg3)
1587         self.pg0.add_stream(pkts)
1588         self.pg_enable_capture(self.pg_interfaces)
1589         self.pg_start()
1590         self.pg3.assert_nothing_captured()
1591
1592     def test_dynamic_to_static(self):
1593         """NAT44EI Switch from dynamic translation to 1:1NAT"""
1594         nat_ip = "10.0.0.10"
1595         self.tcp_port_out = 6303
1596         self.udp_port_out = 6304
1597         self.icmp_id_out = 6305
1598
1599         self.nat44_add_address(self.nat_addr)
1600         flags = self.config_flags.NAT44_EI_IF_INSIDE
1601         self.vapi.nat44_ei_interface_add_del_feature(
1602             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1603         )
1604         self.vapi.nat44_ei_interface_add_del_feature(
1605             sw_if_index=self.pg1.sw_if_index, is_add=1
1606         )
1607
1608         # dynamic
1609         pkts = self.create_stream_in(self.pg0, self.pg1)
1610         self.pg0.add_stream(pkts)
1611         self.pg_enable_capture(self.pg_interfaces)
1612         self.pg_start()
1613         capture = self.pg1.get_capture(len(pkts))
1614         self.verify_capture_out(capture)
1615
1616         # 1:1NAT
1617         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1618         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1619         self.assertEqual(len(sessions), 0)
1620         pkts = self.create_stream_in(self.pg0, self.pg1)
1621         self.pg0.add_stream(pkts)
1622         self.pg_enable_capture(self.pg_interfaces)
1623         self.pg_start()
1624         capture = self.pg1.get_capture(len(pkts))
1625         self.verify_capture_out(capture, nat_ip, True)
1626
1627     def test_identity_nat(self):
1628         """NAT44EI Identity NAT"""
1629         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1630         self.vapi.nat44_ei_add_del_identity_mapping(
1631             ip_address=self.pg0.remote_ip4,
1632             sw_if_index=0xFFFFFFFF,
1633             flags=flags,
1634             is_add=1,
1635         )
1636         flags = self.config_flags.NAT44_EI_IF_INSIDE
1637         self.vapi.nat44_ei_interface_add_del_feature(
1638             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1639         )
1640         self.vapi.nat44_ei_interface_add_del_feature(
1641             sw_if_index=self.pg1.sw_if_index, is_add=1
1642         )
1643
1644         p = (
1645             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1646             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1647             / TCP(sport=12345, dport=56789)
1648         )
1649         self.pg1.add_stream(p)
1650         self.pg_enable_capture(self.pg_interfaces)
1651         self.pg_start()
1652         capture = self.pg0.get_capture(1)
1653         p = capture[0]
1654         try:
1655             ip = p[IP]
1656             tcp = p[TCP]
1657             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1658             self.assertEqual(ip.src, self.pg1.remote_ip4)
1659             self.assertEqual(tcp.dport, 56789)
1660             self.assertEqual(tcp.sport, 12345)
1661             self.assert_packet_checksums_valid(p)
1662         except:
1663             self.logger.error(ppp("Unexpected or invalid packet:", p))
1664             raise
1665
1666         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1667         self.assertEqual(len(sessions), 0)
1668         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1669         self.vapi.nat44_ei_add_del_identity_mapping(
1670             ip_address=self.pg0.remote_ip4,
1671             sw_if_index=0xFFFFFFFF,
1672             flags=flags,
1673             vrf_id=1,
1674             is_add=1,
1675         )
1676         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1677         self.assertEqual(len(identity_mappings), 2)
1678
1679     def test_multiple_inside_interfaces(self):
1680         """NAT44EI multiple non-overlapping address space inside interfaces"""
1681
1682         self.nat44_add_address(self.nat_addr)
1683         flags = self.config_flags.NAT44_EI_IF_INSIDE
1684         self.vapi.nat44_ei_interface_add_del_feature(
1685             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1686         )
1687         self.vapi.nat44_ei_interface_add_del_feature(
1688             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1689         )
1690         self.vapi.nat44_ei_interface_add_del_feature(
1691             sw_if_index=self.pg3.sw_if_index, is_add=1
1692         )
1693
1694         # between two NAT44EI inside interfaces (no translation)
1695         pkts = self.create_stream_in(self.pg0, self.pg1)
1696         self.pg0.add_stream(pkts)
1697         self.pg_enable_capture(self.pg_interfaces)
1698         self.pg_start()
1699         capture = self.pg1.get_capture(len(pkts))
1700         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1701
1702         # from inside to interface without translation
1703         pkts = self.create_stream_in(self.pg0, self.pg2)
1704         self.pg0.add_stream(pkts)
1705         self.pg_enable_capture(self.pg_interfaces)
1706         self.pg_start()
1707         capture = self.pg2.get_capture(len(pkts))
1708         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1709
1710         # in2out 1st interface
1711         pkts = self.create_stream_in(self.pg0, self.pg3)
1712         self.pg0.add_stream(pkts)
1713         self.pg_enable_capture(self.pg_interfaces)
1714         self.pg_start()
1715         capture = self.pg3.get_capture(len(pkts))
1716         self.verify_capture_out(capture)
1717
1718         # out2in 1st interface
1719         pkts = self.create_stream_out(self.pg3)
1720         self.pg3.add_stream(pkts)
1721         self.pg_enable_capture(self.pg_interfaces)
1722         self.pg_start()
1723         capture = self.pg0.get_capture(len(pkts))
1724         self.verify_capture_in(capture, self.pg0)
1725
1726         # in2out 2nd interface
1727         pkts = self.create_stream_in(self.pg1, self.pg3)
1728         self.pg1.add_stream(pkts)
1729         self.pg_enable_capture(self.pg_interfaces)
1730         self.pg_start()
1731         capture = self.pg3.get_capture(len(pkts))
1732         self.verify_capture_out(capture)
1733
1734         # out2in 2nd interface
1735         pkts = self.create_stream_out(self.pg3)
1736         self.pg3.add_stream(pkts)
1737         self.pg_enable_capture(self.pg_interfaces)
1738         self.pg_start()
1739         capture = self.pg1.get_capture(len(pkts))
1740         self.verify_capture_in(capture, self.pg1)
1741
1742     def test_inside_overlapping_interfaces(self):
1743         """NAT44EI multiple inside interfaces with overlapping address space"""
1744
1745         static_nat_ip = "10.0.0.10"
1746         self.nat44_add_address(self.nat_addr)
1747         flags = self.config_flags.NAT44_EI_IF_INSIDE
1748         self.vapi.nat44_ei_interface_add_del_feature(
1749             sw_if_index=self.pg3.sw_if_index, is_add=1
1750         )
1751         self.vapi.nat44_ei_interface_add_del_feature(
1752             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1753         )
1754         self.vapi.nat44_ei_interface_add_del_feature(
1755             sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1756         )
1757         self.vapi.nat44_ei_interface_add_del_feature(
1758             sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1759         )
1760         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1761
1762         # between NAT44EI inside interfaces with same VRF (no translation)
1763         pkts = self.create_stream_in(self.pg4, self.pg5)
1764         self.pg4.add_stream(pkts)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg5.get_capture(len(pkts))
1768         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1769
1770         # between NAT44EI inside interfaces with different VRF (hairpinning)
1771         p = (
1772             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1773             / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1774             / TCP(sport=1234, dport=5678)
1775         )
1776         self.pg4.add_stream(p)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         capture = self.pg6.get_capture(1)
1780         p = capture[0]
1781         try:
1782             ip = p[IP]
1783             tcp = p[TCP]
1784             self.assertEqual(ip.src, self.nat_addr)
1785             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1786             self.assertNotEqual(tcp.sport, 1234)
1787             self.assertEqual(tcp.dport, 5678)
1788         except:
1789             self.logger.error(ppp("Unexpected or invalid packet:", p))
1790             raise
1791
1792         # in2out 1st interface
1793         pkts = self.create_stream_in(self.pg4, self.pg3)
1794         self.pg4.add_stream(pkts)
1795         self.pg_enable_capture(self.pg_interfaces)
1796         self.pg_start()
1797         capture = self.pg3.get_capture(len(pkts))
1798         self.verify_capture_out(capture)
1799
1800         # out2in 1st interface
1801         pkts = self.create_stream_out(self.pg3)
1802         self.pg3.add_stream(pkts)
1803         self.pg_enable_capture(self.pg_interfaces)
1804         self.pg_start()
1805         capture = self.pg4.get_capture(len(pkts))
1806         self.verify_capture_in(capture, self.pg4)
1807
1808         # in2out 2nd interface
1809         pkts = self.create_stream_in(self.pg5, self.pg3)
1810         self.pg5.add_stream(pkts)
1811         self.pg_enable_capture(self.pg_interfaces)
1812         self.pg_start()
1813         capture = self.pg3.get_capture(len(pkts))
1814         self.verify_capture_out(capture)
1815
1816         # out2in 2nd interface
1817         pkts = self.create_stream_out(self.pg3)
1818         self.pg3.add_stream(pkts)
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.pg_start()
1821         capture = self.pg5.get_capture(len(pkts))
1822         self.verify_capture_in(capture, self.pg5)
1823
1824         # pg5 session dump
1825         addresses = self.vapi.nat44_ei_address_dump()
1826         self.assertEqual(len(addresses), 1)
1827         sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1828         self.assertEqual(len(sessions), 3)
1829         for session in sessions:
1830             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1831             self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1832             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1833         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1834         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1835         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1836         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1837         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1838         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1839         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1840         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1841         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1842
1843         # in2out 3rd interface
1844         pkts = self.create_stream_in(self.pg6, self.pg3)
1845         self.pg6.add_stream(pkts)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg3.get_capture(len(pkts))
1849         self.verify_capture_out(capture, static_nat_ip, True)
1850
1851         # out2in 3rd interface
1852         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1853         self.pg3.add_stream(pkts)
1854         self.pg_enable_capture(self.pg_interfaces)
1855         self.pg_start()
1856         capture = self.pg6.get_capture(len(pkts))
1857         self.verify_capture_in(capture, self.pg6)
1858
1859         # general user and session dump verifications
1860         users = self.vapi.nat44_ei_user_dump()
1861         self.assertGreaterEqual(len(users), 3)
1862         addresses = self.vapi.nat44_ei_address_dump()
1863         self.assertEqual(len(addresses), 1)
1864         for user in users:
1865             sessions = self.vapi.nat44_ei_user_session_dump(
1866                 user.ip_address, user.vrf_id
1867             )
1868             for session in sessions:
1869                 self.assertEqual(user.ip_address, session.inside_ip_address)
1870                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1871                 self.assertTrue(
1872                     session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
1873                 )
1874
1875         # pg4 session dump
1876         sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1877         self.assertGreaterEqual(len(sessions), 4)
1878         for session in sessions:
1879             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1880             self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1881             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1882
1883         # pg6 session dump
1884         sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1885         self.assertGreaterEqual(len(sessions), 3)
1886         for session in sessions:
1887             self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1888             self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1889             self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1890             self.assertTrue(
1891                 session.inside_port
1892                 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1893             )
1894
1895     def test_hairpinning(self):
1896         """NAT44EI hairpinning - 1:1 NAPT"""
1897
1898         host = self.pg0.remote_hosts[0]
1899         server = self.pg0.remote_hosts[1]
1900         host_in_port = 1234
1901         host_out_port = 0
1902         server_in_port = 5678
1903         server_out_port = 8765
1904
1905         self.nat44_add_address(self.nat_addr)
1906         flags = self.config_flags.NAT44_EI_IF_INSIDE
1907         self.vapi.nat44_ei_interface_add_del_feature(
1908             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1909         )
1910         self.vapi.nat44_ei_interface_add_del_feature(
1911             sw_if_index=self.pg1.sw_if_index, is_add=1
1912         )
1913
1914         # add static mapping for server
1915         self.nat44_add_static_mapping(
1916             server.ip4,
1917             self.nat_addr,
1918             server_in_port,
1919             server_out_port,
1920             proto=IP_PROTOS.tcp,
1921         )
1922
1923         cnt = self.statistics["/nat44-ei/hairpinning"]
1924         # send packet from host to server
1925         p = (
1926             Ether(src=host.mac, dst=self.pg0.local_mac)
1927             / IP(src=host.ip4, dst=self.nat_addr)
1928             / TCP(sport=host_in_port, dport=server_out_port)
1929         )
1930         self.pg0.add_stream(p)
1931         self.pg_enable_capture(self.pg_interfaces)
1932         self.pg_start()
1933         capture = self.pg0.get_capture(1)
1934         p = capture[0]
1935         try:
1936             ip = p[IP]
1937             tcp = p[TCP]
1938             self.assertEqual(ip.src, self.nat_addr)
1939             self.assertEqual(ip.dst, server.ip4)
1940             self.assertNotEqual(tcp.sport, host_in_port)
1941             self.assertEqual(tcp.dport, server_in_port)
1942             self.assert_packet_checksums_valid(p)
1943             host_out_port = tcp.sport
1944         except:
1945             self.logger.error(ppp("Unexpected or invalid packet:", p))
1946             raise
1947
1948         after = self.statistics["/nat44-ei/hairpinning"]
1949         if_idx = self.pg0.sw_if_index
1950         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1951
1952         # send reply from server to host
1953         p = (
1954             Ether(src=server.mac, dst=self.pg0.local_mac)
1955             / IP(src=server.ip4, dst=self.nat_addr)
1956             / TCP(sport=server_in_port, dport=host_out_port)
1957         )
1958         self.pg0.add_stream(p)
1959         self.pg_enable_capture(self.pg_interfaces)
1960         self.pg_start()
1961         capture = self.pg0.get_capture(1)
1962         p = capture[0]
1963         try:
1964             ip = p[IP]
1965             tcp = p[TCP]
1966             self.assertEqual(ip.src, self.nat_addr)
1967             self.assertEqual(ip.dst, host.ip4)
1968             self.assertEqual(tcp.sport, server_out_port)
1969             self.assertEqual(tcp.dport, host_in_port)
1970             self.assert_packet_checksums_valid(p)
1971         except:
1972             self.logger.error(ppp("Unexpected or invalid packet:", p))
1973             raise
1974
1975         after = self.statistics["/nat44-ei/hairpinning"]
1976         if_idx = self.pg0.sw_if_index
1977         self.assertEqual(
1978             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1979             2 + (1 if self.vpp_worker_count > 0 else 0),
1980         )
1981
1982     def test_hairpinning2(self):
1983         """NAT44EI hairpinning - 1:1 NAT"""
1984
1985         server1_nat_ip = "10.0.0.10"
1986         server2_nat_ip = "10.0.0.11"
1987         host = self.pg0.remote_hosts[0]
1988         server1 = self.pg0.remote_hosts[1]
1989         server2 = self.pg0.remote_hosts[2]
1990         server_tcp_port = 22
1991         server_udp_port = 20
1992
1993         self.nat44_add_address(self.nat_addr)
1994         flags = self.config_flags.NAT44_EI_IF_INSIDE
1995         self.vapi.nat44_ei_interface_add_del_feature(
1996             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1997         )
1998         self.vapi.nat44_ei_interface_add_del_feature(
1999             sw_if_index=self.pg1.sw_if_index, is_add=1
2000         )
2001
2002         # add static mapping for servers
2003         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2004         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2005
2006         # host to server1
2007         pkts = []
2008         p = (
2009             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2010             / IP(src=host.ip4, dst=server1_nat_ip)
2011             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2012         )
2013         pkts.append(p)
2014         p = (
2015             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2016             / IP(src=host.ip4, dst=server1_nat_ip)
2017             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2018         )
2019         pkts.append(p)
2020         p = (
2021             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2022             / IP(src=host.ip4, dst=server1_nat_ip)
2023             / ICMP(id=self.icmp_id_in, type="echo-request")
2024         )
2025         pkts.append(p)
2026         self.pg0.add_stream(pkts)
2027         self.pg_enable_capture(self.pg_interfaces)
2028         self.pg_start()
2029         capture = self.pg0.get_capture(len(pkts))
2030         for packet in capture:
2031             try:
2032                 self.assertEqual(packet[IP].src, self.nat_addr)
2033                 self.assertEqual(packet[IP].dst, server1.ip4)
2034                 if packet.haslayer(TCP):
2035                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2036                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2037                     self.tcp_port_out = packet[TCP].sport
2038                     self.assert_packet_checksums_valid(packet)
2039                 elif packet.haslayer(UDP):
2040                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2041                     self.assertEqual(packet[UDP].dport, server_udp_port)
2042                     self.udp_port_out = packet[UDP].sport
2043                 else:
2044                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2045                     self.icmp_id_out = packet[ICMP].id
2046             except:
2047                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2048                 raise
2049
2050         # server1 to host
2051         pkts = []
2052         p = (
2053             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2054             / IP(src=server1.ip4, dst=self.nat_addr)
2055             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2056         )
2057         pkts.append(p)
2058         p = (
2059             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2060             / IP(src=server1.ip4, dst=self.nat_addr)
2061             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2062         )
2063         pkts.append(p)
2064         p = (
2065             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2066             / IP(src=server1.ip4, dst=self.nat_addr)
2067             / ICMP(id=self.icmp_id_out, type="echo-reply")
2068         )
2069         pkts.append(p)
2070         self.pg0.add_stream(pkts)
2071         self.pg_enable_capture(self.pg_interfaces)
2072         self.pg_start()
2073         capture = self.pg0.get_capture(len(pkts))
2074         for packet in capture:
2075             try:
2076                 self.assertEqual(packet[IP].src, server1_nat_ip)
2077                 self.assertEqual(packet[IP].dst, host.ip4)
2078                 if packet.haslayer(TCP):
2079                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2080                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2081                     self.assert_packet_checksums_valid(packet)
2082                 elif packet.haslayer(UDP):
2083                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2084                     self.assertEqual(packet[UDP].sport, server_udp_port)
2085                 else:
2086                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2087             except:
2088                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2089                 raise
2090
2091         # server2 to server1
2092         pkts = []
2093         p = (
2094             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2095             / IP(src=server2.ip4, dst=server1_nat_ip)
2096             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2097         )
2098         pkts.append(p)
2099         p = (
2100             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2101             / IP(src=server2.ip4, dst=server1_nat_ip)
2102             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2103         )
2104         pkts.append(p)
2105         p = (
2106             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2107             / IP(src=server2.ip4, dst=server1_nat_ip)
2108             / ICMP(id=self.icmp_id_in, type="echo-request")
2109         )
2110         pkts.append(p)
2111         self.pg0.add_stream(pkts)
2112         self.pg_enable_capture(self.pg_interfaces)
2113         self.pg_start()
2114         capture = self.pg0.get_capture(len(pkts))
2115         for packet in capture:
2116             try:
2117                 self.assertEqual(packet[IP].src, server2_nat_ip)
2118                 self.assertEqual(packet[IP].dst, server1.ip4)
2119                 if packet.haslayer(TCP):
2120                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2121                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2122                     self.tcp_port_out = packet[TCP].sport
2123                     self.assert_packet_checksums_valid(packet)
2124                 elif packet.haslayer(UDP):
2125                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2126                     self.assertEqual(packet[UDP].dport, server_udp_port)
2127                     self.udp_port_out = packet[UDP].sport
2128                 else:
2129                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2130                     self.icmp_id_out = packet[ICMP].id
2131             except:
2132                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2133                 raise
2134
2135         # server1 to server2
2136         pkts = []
2137         p = (
2138             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2139             / IP(src=server1.ip4, dst=server2_nat_ip)
2140             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2141         )
2142         pkts.append(p)
2143         p = (
2144             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2145             / IP(src=server1.ip4, dst=server2_nat_ip)
2146             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2147         )
2148         pkts.append(p)
2149         p = (
2150             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2151             / IP(src=server1.ip4, dst=server2_nat_ip)
2152             / ICMP(id=self.icmp_id_out, type="echo-reply")
2153         )
2154         pkts.append(p)
2155         self.pg0.add_stream(pkts)
2156         self.pg_enable_capture(self.pg_interfaces)
2157         self.pg_start()
2158         capture = self.pg0.get_capture(len(pkts))
2159         for packet in capture:
2160             try:
2161                 self.assertEqual(packet[IP].src, server1_nat_ip)
2162                 self.assertEqual(packet[IP].dst, server2.ip4)
2163                 if packet.haslayer(TCP):
2164                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2165                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2166                     self.assert_packet_checksums_valid(packet)
2167                 elif packet.haslayer(UDP):
2168                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2169                     self.assertEqual(packet[UDP].sport, server_udp_port)
2170                 else:
2171                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2172             except:
2173                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2174                 raise
2175
2176     def test_hairpinning_avoid_inf_loop(self):
2177         """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
2178
2179         host = self.pg0.remote_hosts[0]
2180         server = self.pg0.remote_hosts[1]
2181         host_in_port = 1234
2182         host_out_port = 0
2183         server_in_port = 5678
2184         server_out_port = 8765
2185
2186         self.nat44_add_address(self.nat_addr)
2187         flags = self.config_flags.NAT44_EI_IF_INSIDE
2188         self.vapi.nat44_ei_interface_add_del_feature(
2189             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2190         )
2191         self.vapi.nat44_ei_interface_add_del_feature(
2192             sw_if_index=self.pg1.sw_if_index, is_add=1
2193         )
2194
2195         # add static mapping for server
2196         self.nat44_add_static_mapping(
2197             server.ip4,
2198             self.nat_addr,
2199             server_in_port,
2200             server_out_port,
2201             proto=IP_PROTOS.tcp,
2202         )
2203
2204         # add another static mapping that maps pg0.local_ip4 address to itself
2205         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2206
2207         # send packet from host to VPP (the packet should get dropped)
2208         p = (
2209             Ether(src=host.mac, dst=self.pg0.local_mac)
2210             / IP(src=host.ip4, dst=self.pg0.local_ip4)
2211             / TCP(sport=host_in_port, dport=server_out_port)
2212         )
2213         self.pg0.add_stream(p)
2214         self.pg_enable_capture(self.pg_interfaces)
2215         self.pg_start()
2216         # Here VPP used to crash due to an infinite loop
2217
2218         cnt = self.statistics["/nat44-ei/hairpinning"]
2219         # send packet from host to server
2220         p = (
2221             Ether(src=host.mac, dst=self.pg0.local_mac)
2222             / IP(src=host.ip4, dst=self.nat_addr)
2223             / TCP(sport=host_in_port, dport=server_out_port)
2224         )
2225         self.pg0.add_stream(p)
2226         self.pg_enable_capture(self.pg_interfaces)
2227         self.pg_start()
2228         capture = self.pg0.get_capture(1)
2229         p = capture[0]
2230         try:
2231             ip = p[IP]
2232             tcp = p[TCP]
2233             self.assertEqual(ip.src, self.nat_addr)
2234             self.assertEqual(ip.dst, server.ip4)
2235             self.assertNotEqual(tcp.sport, host_in_port)
2236             self.assertEqual(tcp.dport, server_in_port)
2237             self.assert_packet_checksums_valid(p)
2238             host_out_port = tcp.sport
2239         except:
2240             self.logger.error(ppp("Unexpected or invalid packet:", p))
2241             raise
2242
2243         after = self.statistics["/nat44-ei/hairpinning"]
2244         if_idx = self.pg0.sw_if_index
2245         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2246
2247         # send reply from server to host
2248         p = (
2249             Ether(src=server.mac, dst=self.pg0.local_mac)
2250             / IP(src=server.ip4, dst=self.nat_addr)
2251             / TCP(sport=server_in_port, dport=host_out_port)
2252         )
2253         self.pg0.add_stream(p)
2254         self.pg_enable_capture(self.pg_interfaces)
2255         self.pg_start()
2256         capture = self.pg0.get_capture(1)
2257         p = capture[0]
2258         try:
2259             ip = p[IP]
2260             tcp = p[TCP]
2261             self.assertEqual(ip.src, self.nat_addr)
2262             self.assertEqual(ip.dst, host.ip4)
2263             self.assertEqual(tcp.sport, server_out_port)
2264             self.assertEqual(tcp.dport, host_in_port)
2265             self.assert_packet_checksums_valid(p)
2266         except:
2267             self.logger.error(ppp("Unexpected or invalid packet:", p))
2268             raise
2269
2270         after = self.statistics["/nat44-ei/hairpinning"]
2271         if_idx = self.pg0.sw_if_index
2272         self.assertEqual(
2273             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2274             2 + (1 if self.vpp_worker_count > 0 else 0),
2275         )
2276
2277     def test_interface_addr(self):
2278         """NAT44EI acquire addresses from interface"""
2279         self.vapi.nat44_ei_add_del_interface_addr(
2280             is_add=1, sw_if_index=self.pg7.sw_if_index
2281         )
2282
2283         # no address in NAT pool
2284         addresses = self.vapi.nat44_ei_address_dump()
2285         self.assertEqual(0, len(addresses))
2286
2287         # configure interface address and check NAT address pool
2288         self.pg7.config_ip4()
2289         addresses = self.vapi.nat44_ei_address_dump()
2290         self.assertEqual(1, len(addresses))
2291         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2292
2293         # remove interface address and check NAT address pool
2294         self.pg7.unconfig_ip4()
2295         addresses = self.vapi.nat44_ei_address_dump()
2296         self.assertEqual(0, len(addresses))
2297
2298     def test_interface_addr_static_mapping(self):
2299         """NAT44EI Static mapping with addresses from interface"""
2300         tag = "testTAG"
2301
2302         self.vapi.nat44_ei_add_del_interface_addr(
2303             is_add=1, sw_if_index=self.pg7.sw_if_index
2304         )
2305         self.nat44_add_static_mapping(
2306             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2307         )
2308
2309         # static mappings with external interface
2310         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2311         self.assertEqual(1, len(static_mappings))
2312         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2313         self.assertEqual(static_mappings[0].tag, tag)
2314
2315         # configure interface address and check static mappings
2316         self.pg7.config_ip4()
2317         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2318         self.assertEqual(2, len(static_mappings))
2319         resolved = False
2320         for sm in static_mappings:
2321             if sm.external_sw_if_index == 0xFFFFFFFF:
2322                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2323                 self.assertEqual(sm.tag, tag)
2324                 resolved = True
2325         self.assertTrue(resolved)
2326
2327         # remove interface address and check static mappings
2328         self.pg7.unconfig_ip4()
2329         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2330         self.assertEqual(1, len(static_mappings))
2331         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2332         self.assertEqual(static_mappings[0].tag, tag)
2333
2334         # configure interface address again and check static mappings
2335         self.pg7.config_ip4()
2336         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2337         self.assertEqual(2, len(static_mappings))
2338         resolved = False
2339         for sm in static_mappings:
2340             if sm.external_sw_if_index == 0xFFFFFFFF:
2341                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2342                 self.assertEqual(sm.tag, tag)
2343                 resolved = True
2344         self.assertTrue(resolved)
2345
2346         # remove static mapping
2347         self.nat44_add_static_mapping(
2348             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2349         )
2350         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2351         self.assertEqual(0, len(static_mappings))
2352
2353     def test_interface_addr_identity_nat(self):
2354         """NAT44EI Identity NAT with addresses from interface"""
2355
2356         port = 53053
2357         self.vapi.nat44_ei_add_del_interface_addr(
2358             is_add=1, sw_if_index=self.pg7.sw_if_index
2359         )
2360         self.vapi.nat44_ei_add_del_identity_mapping(
2361             ip_address=b"0",
2362             sw_if_index=self.pg7.sw_if_index,
2363             port=port,
2364             protocol=IP_PROTOS.tcp,
2365             is_add=1,
2366         )
2367
2368         # identity mappings with external interface
2369         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2370         self.assertEqual(1, len(identity_mappings))
2371         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2372
2373         # configure interface address and check identity mappings
2374         self.pg7.config_ip4()
2375         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2376         resolved = False
2377         self.assertEqual(2, len(identity_mappings))
2378         for sm in identity_mappings:
2379             if sm.sw_if_index == 0xFFFFFFFF:
2380                 self.assertEqual(
2381                     str(identity_mappings[0].ip_address), self.pg7.local_ip4
2382                 )
2383                 self.assertEqual(port, identity_mappings[0].port)
2384                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2385                 resolved = True
2386         self.assertTrue(resolved)
2387
2388         # remove interface address and check identity mappings
2389         self.pg7.unconfig_ip4()
2390         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2391         self.assertEqual(1, len(identity_mappings))
2392         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2393
2394     def test_ipfix_nat44_sess(self):
2395         """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2396         self.ipfix_domain_id = 10
2397         self.ipfix_src_port = 20202
2398         collector_port = 30303
2399         bind_layers(UDP, IPFIX, dport=30303)
2400         self.nat44_add_address(self.nat_addr)
2401         flags = self.config_flags.NAT44_EI_IF_INSIDE
2402         self.vapi.nat44_ei_interface_add_del_feature(
2403             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2404         )
2405         self.vapi.nat44_ei_interface_add_del_feature(
2406             sw_if_index=self.pg1.sw_if_index, is_add=1
2407         )
2408         self.vapi.set_ipfix_exporter(
2409             collector_address=self.pg3.remote_ip4,
2410             src_address=self.pg3.local_ip4,
2411             path_mtu=512,
2412             template_interval=10,
2413             collector_port=collector_port,
2414         )
2415         self.vapi.nat44_ei_ipfix_enable_disable(
2416             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2417         )
2418
2419         pkts = self.create_stream_in(self.pg0, self.pg1)
2420         self.pg0.add_stream(pkts)
2421         self.pg_enable_capture(self.pg_interfaces)
2422         self.pg_start()
2423         capture = self.pg1.get_capture(len(pkts))
2424         self.verify_capture_out(capture)
2425         self.nat44_add_address(self.nat_addr, is_add=0)
2426         self.vapi.ipfix_flush()
2427         capture = self.pg3.get_capture(7)
2428         ipfix = IPFIXDecoder()
2429         # first load template
2430         for p in capture:
2431             self.assertTrue(p.haslayer(IPFIX))
2432             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2433             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2434             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2435             self.assertEqual(p[UDP].dport, collector_port)
2436             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2437             if p.haslayer(Template):
2438                 ipfix.add_template(p.getlayer(Template))
2439         # verify events in data set
2440         for p in capture:
2441             if p.haslayer(Data):
2442                 data = ipfix.decode_data_set(p.getlayer(Set))
2443                 self.verify_ipfix_nat44_ses(data)
2444
2445     def test_ipfix_addr_exhausted(self):
2446         """NAT44EI IPFIX logging NAT addresses exhausted"""
2447         flags = self.config_flags.NAT44_EI_IF_INSIDE
2448         self.vapi.nat44_ei_interface_add_del_feature(
2449             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2450         )
2451         self.vapi.nat44_ei_interface_add_del_feature(
2452             sw_if_index=self.pg1.sw_if_index, is_add=1
2453         )
2454         self.vapi.set_ipfix_exporter(
2455             collector_address=self.pg3.remote_ip4,
2456             src_address=self.pg3.local_ip4,
2457             path_mtu=512,
2458             template_interval=10,
2459         )
2460         self.vapi.nat44_ei_ipfix_enable_disable(
2461             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2462         )
2463
2464         p = (
2465             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2466             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2467             / TCP(sport=3025)
2468         ) * 3
2469         self.pg0.add_stream(p)
2470         self.pg_enable_capture(self.pg_interfaces)
2471         self.pg_start()
2472         self.pg1.assert_nothing_captured()
2473         self.vapi.ipfix_flush()
2474         capture = self.pg3.get_capture(7)
2475         ipfix = IPFIXDecoder()
2476         # first load template
2477         for p in capture:
2478             self.assertTrue(p.haslayer(IPFIX))
2479             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2480             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2481             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2482             self.assertEqual(p[UDP].dport, 4739)
2483             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2484             if p.haslayer(Template):
2485                 ipfix.add_template(p.getlayer(Template))
2486         # verify events in data set
2487         event_count = 0
2488         for p in capture:
2489             if p.haslayer(Data):
2490                 data = ipfix.decode_data_set(p.getlayer(Set))
2491                 event_count += self.verify_ipfix_addr_exhausted(data)
2492         self.assertEqual(event_count, 1)
2493
2494     def test_ipfix_max_sessions(self):
2495         """NAT44EI IPFIX logging maximum session entries exceeded"""
2496         self.nat44_add_address(self.nat_addr)
2497         flags = self.config_flags.NAT44_EI_IF_INSIDE
2498         self.vapi.nat44_ei_interface_add_del_feature(
2499             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2500         )
2501         self.vapi.nat44_ei_interface_add_del_feature(
2502             sw_if_index=self.pg1.sw_if_index, is_add=1
2503         )
2504
2505         max_sessions_per_thread = self.max_translations
2506         max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2507
2508         pkts = []
2509         for i in range(0, max_sessions):
2510             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2511             p = (
2512                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2513                 / IP(src=src, dst=self.pg1.remote_ip4)
2514                 / TCP(sport=1025)
2515             )
2516             pkts.append(p)
2517         self.pg0.add_stream(pkts)
2518         self.pg_enable_capture(self.pg_interfaces)
2519         self.pg_start()
2520
2521         self.pg1.get_capture(max_sessions)
2522         self.vapi.set_ipfix_exporter(
2523             collector_address=self.pg3.remote_ip4,
2524             src_address=self.pg3.local_ip4,
2525             path_mtu=512,
2526             template_interval=10,
2527         )
2528         self.vapi.nat44_ei_ipfix_enable_disable(
2529             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2530         )
2531
2532         p = (
2533             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2534             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2535             / TCP(sport=1025)
2536         ) * 3
2537         self.pg0.add_stream(p)
2538         self.pg_enable_capture(self.pg_interfaces)
2539         self.pg_start()
2540         self.pg1.assert_nothing_captured()
2541         self.vapi.ipfix_flush()
2542         capture = self.pg3.get_capture(7)
2543         ipfix = IPFIXDecoder()
2544         # first load template
2545         for p in capture:
2546             self.assertTrue(p.haslayer(IPFIX))
2547             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2548             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2549             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2550             self.assertEqual(p[UDP].dport, 4739)
2551             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2552             if p.haslayer(Template):
2553                 ipfix.add_template(p.getlayer(Template))
2554         # verify events in data set
2555         event_count = 0
2556         for p in capture:
2557             if p.haslayer(Data):
2558                 data = ipfix.decode_data_set(p.getlayer(Set))
2559                 event_count += self.verify_ipfix_max_sessions(
2560                     data, max_sessions_per_thread
2561                 )
2562         self.assertEqual(event_count, 1)
2563
2564     def test_syslog_apmap(self):
2565         """NAT44EI syslog address and port mapping creation and deletion"""
2566         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2567         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2568         self.nat44_add_address(self.nat_addr)
2569         flags = self.config_flags.NAT44_EI_IF_INSIDE
2570         self.vapi.nat44_ei_interface_add_del_feature(
2571             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2572         )
2573         self.vapi.nat44_ei_interface_add_del_feature(
2574             sw_if_index=self.pg1.sw_if_index, is_add=1
2575         )
2576
2577         p = (
2578             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2579             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2580             / TCP(sport=self.tcp_port_in, dport=20)
2581         )
2582         self.pg0.add_stream(p)
2583         self.pg_enable_capture(self.pg_interfaces)
2584         self.pg_start()
2585         capture = self.pg1.get_capture(1)
2586         self.tcp_port_out = capture[0][TCP].sport
2587         capture = self.pg3.get_capture(1)
2588         self.verify_syslog_apmap(capture[0][Raw].load)
2589
2590         self.pg_enable_capture(self.pg_interfaces)
2591         self.pg_start()
2592         self.nat44_add_address(self.nat_addr, is_add=0)
2593         capture = self.pg3.get_capture(1)
2594         self.verify_syslog_apmap(capture[0][Raw].load, False)
2595
2596     def test_pool_addr_fib(self):
2597         """NAT44EI add pool addresses to FIB"""
2598         static_addr = "10.0.0.10"
2599         self.nat44_add_address(self.nat_addr)
2600         flags = self.config_flags.NAT44_EI_IF_INSIDE
2601         self.vapi.nat44_ei_interface_add_del_feature(
2602             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2603         )
2604         self.vapi.nat44_ei_interface_add_del_feature(
2605             sw_if_index=self.pg1.sw_if_index, is_add=1
2606         )
2607         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2608
2609         # NAT44EI address
2610         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2611             op=ARP.who_has,
2612             pdst=self.nat_addr,
2613             psrc=self.pg1.remote_ip4,
2614             hwsrc=self.pg1.remote_mac,
2615         )
2616         self.pg1.add_stream(p)
2617         self.pg_enable_capture(self.pg_interfaces)
2618         self.pg_start()
2619         capture = self.pg1.get_capture(1)
2620         self.assertTrue(capture[0].haslayer(ARP))
2621         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2622
2623         # 1:1 NAT address
2624         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2625             op=ARP.who_has,
2626             pdst=static_addr,
2627             psrc=self.pg1.remote_ip4,
2628             hwsrc=self.pg1.remote_mac,
2629         )
2630         self.pg1.add_stream(p)
2631         self.pg_enable_capture(self.pg_interfaces)
2632         self.pg_start()
2633         capture = self.pg1.get_capture(1)
2634         self.assertTrue(capture[0].haslayer(ARP))
2635         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2636
2637         # send ARP to non-NAT44EI interface
2638         p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2639             op=ARP.who_has,
2640             pdst=self.nat_addr,
2641             psrc=self.pg2.remote_ip4,
2642             hwsrc=self.pg2.remote_mac,
2643         )
2644         self.pg2.add_stream(p)
2645         self.pg_enable_capture(self.pg_interfaces)
2646         self.pg_start()
2647         self.pg1.assert_nothing_captured()
2648
2649         # remove addresses and verify
2650         self.nat44_add_address(self.nat_addr, is_add=0)
2651         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2652
2653         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2654             op=ARP.who_has,
2655             pdst=self.nat_addr,
2656             psrc=self.pg1.remote_ip4,
2657             hwsrc=self.pg1.remote_mac,
2658         )
2659         self.pg1.add_stream(p)
2660         self.pg_enable_capture(self.pg_interfaces)
2661         self.pg_start()
2662         self.pg1.assert_nothing_captured()
2663
2664         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2665             op=ARP.who_has,
2666             pdst=static_addr,
2667             psrc=self.pg1.remote_ip4,
2668             hwsrc=self.pg1.remote_mac,
2669         )
2670         self.pg1.add_stream(p)
2671         self.pg_enable_capture(self.pg_interfaces)
2672         self.pg_start()
2673         self.pg1.assert_nothing_captured()
2674
2675     def test_vrf_mode(self):
2676         """NAT44EI tenant VRF aware address pool mode"""
2677
2678         vrf_id1 = 1
2679         vrf_id2 = 2
2680         nat_ip1 = "10.0.0.10"
2681         nat_ip2 = "10.0.0.11"
2682
2683         self.pg0.unconfig_ip4()
2684         self.pg1.unconfig_ip4()
2685         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2686         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2687         self.pg0.set_table_ip4(vrf_id1)
2688         self.pg1.set_table_ip4(vrf_id2)
2689         self.pg0.config_ip4()
2690         self.pg1.config_ip4()
2691         self.pg0.resolve_arp()
2692         self.pg1.resolve_arp()
2693
2694         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2695         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2696         flags = self.config_flags.NAT44_EI_IF_INSIDE
2697         self.vapi.nat44_ei_interface_add_del_feature(
2698             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2699         )
2700         self.vapi.nat44_ei_interface_add_del_feature(
2701             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2702         )
2703         self.vapi.nat44_ei_interface_add_del_feature(
2704             sw_if_index=self.pg2.sw_if_index, is_add=1
2705         )
2706
2707         try:
2708             # first VRF
2709             pkts = self.create_stream_in(self.pg0, self.pg2)
2710             self.pg0.add_stream(pkts)
2711             self.pg_enable_capture(self.pg_interfaces)
2712             self.pg_start()
2713             capture = self.pg2.get_capture(len(pkts))
2714             self.verify_capture_out(capture, nat_ip1)
2715
2716             # second VRF
2717             pkts = self.create_stream_in(self.pg1, self.pg2)
2718             self.pg1.add_stream(pkts)
2719             self.pg_enable_capture(self.pg_interfaces)
2720             self.pg_start()
2721             capture = self.pg2.get_capture(len(pkts))
2722             self.verify_capture_out(capture, nat_ip2)
2723
2724         finally:
2725             self.pg0.unconfig_ip4()
2726             self.pg1.unconfig_ip4()
2727             self.pg0.set_table_ip4(0)
2728             self.pg1.set_table_ip4(0)
2729             self.pg0.config_ip4()
2730             self.pg1.config_ip4()
2731             self.pg0.resolve_arp()
2732             self.pg1.resolve_arp()
2733             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2734             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2735
2736     def test_vrf_feature_independent(self):
2737         """NAT44EI tenant VRF independent address pool mode"""
2738
2739         nat_ip1 = "10.0.0.10"
2740         nat_ip2 = "10.0.0.11"
2741
2742         self.nat44_add_address(nat_ip1)
2743         self.nat44_add_address(nat_ip2, vrf_id=99)
2744         flags = self.config_flags.NAT44_EI_IF_INSIDE
2745         self.vapi.nat44_ei_interface_add_del_feature(
2746             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2747         )
2748         self.vapi.nat44_ei_interface_add_del_feature(
2749             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2750         )
2751         self.vapi.nat44_ei_interface_add_del_feature(
2752             sw_if_index=self.pg2.sw_if_index, is_add=1
2753         )
2754
2755         # first VRF
2756         pkts = self.create_stream_in(self.pg0, self.pg2)
2757         self.pg0.add_stream(pkts)
2758         self.pg_enable_capture(self.pg_interfaces)
2759         self.pg_start()
2760         capture = self.pg2.get_capture(len(pkts))
2761         self.verify_capture_out(capture, nat_ip1)
2762
2763         # second VRF
2764         pkts = self.create_stream_in(self.pg1, self.pg2)
2765         self.pg1.add_stream(pkts)
2766         self.pg_enable_capture(self.pg_interfaces)
2767         self.pg_start()
2768         capture = self.pg2.get_capture(len(pkts))
2769         self.verify_capture_out(capture, nat_ip1)
2770
2771     def test_dynamic_ipless_interfaces(self):
2772         """NAT44EI interfaces without configured IP address"""
2773         self.create_routes_and_neigbors()
2774         self.nat44_add_address(self.nat_addr)
2775         flags = self.config_flags.NAT44_EI_IF_INSIDE
2776         self.vapi.nat44_ei_interface_add_del_feature(
2777             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2778         )
2779         self.vapi.nat44_ei_interface_add_del_feature(
2780             sw_if_index=self.pg8.sw_if_index, is_add=1
2781         )
2782
2783         # in2out
2784         pkts = self.create_stream_in(self.pg7, self.pg8)
2785         self.pg7.add_stream(pkts)
2786         self.pg_enable_capture(self.pg_interfaces)
2787         self.pg_start()
2788         capture = self.pg8.get_capture(len(pkts))
2789         self.verify_capture_out(capture)
2790
2791         # out2in
2792         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2793         self.pg8.add_stream(pkts)
2794         self.pg_enable_capture(self.pg_interfaces)
2795         self.pg_start()
2796         capture = self.pg7.get_capture(len(pkts))
2797         self.verify_capture_in(capture, self.pg7)
2798
2799     def test_static_ipless_interfaces(self):
2800         """NAT44EI interfaces without configured IP address - 1:1 NAT"""
2801
2802         self.create_routes_and_neigbors()
2803         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2804         flags = self.config_flags.NAT44_EI_IF_INSIDE
2805         self.vapi.nat44_ei_interface_add_del_feature(
2806             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2807         )
2808         self.vapi.nat44_ei_interface_add_del_feature(
2809             sw_if_index=self.pg8.sw_if_index, is_add=1
2810         )
2811
2812         # out2in
2813         pkts = self.create_stream_out(self.pg8)
2814         self.pg8.add_stream(pkts)
2815         self.pg_enable_capture(self.pg_interfaces)
2816         self.pg_start()
2817         capture = self.pg7.get_capture(len(pkts))
2818         self.verify_capture_in(capture, self.pg7)
2819
2820         # in2out
2821         pkts = self.create_stream_in(self.pg7, self.pg8)
2822         self.pg7.add_stream(pkts)
2823         self.pg_enable_capture(self.pg_interfaces)
2824         self.pg_start()
2825         capture = self.pg8.get_capture(len(pkts))
2826         self.verify_capture_out(capture, self.nat_addr, True)
2827
2828     def test_static_with_port_ipless_interfaces(self):
2829         """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
2830
2831         self.tcp_port_out = 30606
2832         self.udp_port_out = 30607
2833         self.icmp_id_out = 30608
2834
2835         self.create_routes_and_neigbors()
2836         self.nat44_add_address(self.nat_addr)
2837         self.nat44_add_static_mapping(
2838             self.pg7.remote_ip4,
2839             self.nat_addr,
2840             self.tcp_port_in,
2841             self.tcp_port_out,
2842             proto=IP_PROTOS.tcp,
2843         )
2844         self.nat44_add_static_mapping(
2845             self.pg7.remote_ip4,
2846             self.nat_addr,
2847             self.udp_port_in,
2848             self.udp_port_out,
2849             proto=IP_PROTOS.udp,
2850         )
2851         self.nat44_add_static_mapping(
2852             self.pg7.remote_ip4,
2853             self.nat_addr,
2854             self.icmp_id_in,
2855             self.icmp_id_out,
2856             proto=IP_PROTOS.icmp,
2857         )
2858         flags = self.config_flags.NAT44_EI_IF_INSIDE
2859         self.vapi.nat44_ei_interface_add_del_feature(
2860             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2861         )
2862         self.vapi.nat44_ei_interface_add_del_feature(
2863             sw_if_index=self.pg8.sw_if_index, is_add=1
2864         )
2865
2866         # out2in
2867         pkts = self.create_stream_out(self.pg8)
2868         self.pg8.add_stream(pkts)
2869         self.pg_enable_capture(self.pg_interfaces)
2870         self.pg_start()
2871         capture = self.pg7.get_capture(len(pkts))
2872         self.verify_capture_in(capture, self.pg7)
2873
2874         # in2out
2875         pkts = self.create_stream_in(self.pg7, self.pg8)
2876         self.pg7.add_stream(pkts)
2877         self.pg_enable_capture(self.pg_interfaces)
2878         self.pg_start()
2879         capture = self.pg8.get_capture(len(pkts))
2880         self.verify_capture_out(capture)
2881
2882     def test_static_unknown_proto(self):
2883         """NAT44EI 1:1 translate packet with unknown protocol"""
2884         nat_ip = "10.0.0.10"
2885         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2886         flags = self.config_flags.NAT44_EI_IF_INSIDE
2887         self.vapi.nat44_ei_interface_add_del_feature(
2888             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2889         )
2890         self.vapi.nat44_ei_interface_add_del_feature(
2891             sw_if_index=self.pg1.sw_if_index, is_add=1
2892         )
2893
2894         # in2out
2895         p = (
2896             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2897             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2898             / GRE()
2899             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2900             / TCP(sport=1234, dport=1234)
2901         )
2902         self.pg0.add_stream(p)
2903         self.pg_enable_capture(self.pg_interfaces)
2904         self.pg_start()
2905         p = self.pg1.get_capture(1)
2906         packet = p[0]
2907         try:
2908             self.assertEqual(packet[IP].src, nat_ip)
2909             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2910             self.assertEqual(packet.haslayer(GRE), 1)
2911             self.assert_packet_checksums_valid(packet)
2912         except:
2913             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2914             raise
2915
2916         # out2in
2917         p = (
2918             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2919             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2920             / GRE()
2921             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2922             / TCP(sport=1234, dport=1234)
2923         )
2924         self.pg1.add_stream(p)
2925         self.pg_enable_capture(self.pg_interfaces)
2926         self.pg_start()
2927         p = self.pg0.get_capture(1)
2928         packet = p[0]
2929         try:
2930             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2931             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2932             self.assertEqual(packet.haslayer(GRE), 1)
2933             self.assert_packet_checksums_valid(packet)
2934         except:
2935             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2936             raise
2937
2938     def test_hairpinning_static_unknown_proto(self):
2939         """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
2940
2941         host = self.pg0.remote_hosts[0]
2942         server = self.pg0.remote_hosts[1]
2943
2944         host_nat_ip = "10.0.0.10"
2945         server_nat_ip = "10.0.0.11"
2946
2947         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2948         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2949         flags = self.config_flags.NAT44_EI_IF_INSIDE
2950         self.vapi.nat44_ei_interface_add_del_feature(
2951             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2952         )
2953         self.vapi.nat44_ei_interface_add_del_feature(
2954             sw_if_index=self.pg1.sw_if_index, is_add=1
2955         )
2956
2957         # host to server
2958         p = (
2959             Ether(dst=self.pg0.local_mac, src=host.mac)
2960             / IP(src=host.ip4, dst=server_nat_ip)
2961             / GRE()
2962             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2963             / TCP(sport=1234, dport=1234)
2964         )
2965         self.pg0.add_stream(p)
2966         self.pg_enable_capture(self.pg_interfaces)
2967         self.pg_start()
2968         p = self.pg0.get_capture(1)
2969         packet = p[0]
2970         try:
2971             self.assertEqual(packet[IP].src, host_nat_ip)
2972             self.assertEqual(packet[IP].dst, server.ip4)
2973             self.assertEqual(packet.haslayer(GRE), 1)
2974             self.assert_packet_checksums_valid(packet)
2975         except:
2976             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2977             raise
2978
2979         # server to host
2980         p = (
2981             Ether(dst=self.pg0.local_mac, src=server.mac)
2982             / IP(src=server.ip4, dst=host_nat_ip)
2983             / GRE()
2984             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2985             / TCP(sport=1234, dport=1234)
2986         )
2987         self.pg0.add_stream(p)
2988         self.pg_enable_capture(self.pg_interfaces)
2989         self.pg_start()
2990         p = self.pg0.get_capture(1)
2991         packet = p[0]
2992         try:
2993             self.assertEqual(packet[IP].src, server_nat_ip)
2994             self.assertEqual(packet[IP].dst, host.ip4)
2995             self.assertEqual(packet.haslayer(GRE), 1)
2996             self.assert_packet_checksums_valid(packet)
2997         except:
2998             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2999             raise
3000
3001     def test_output_feature(self):
3002         """NAT44EI output feature (in2out postrouting)"""
3003         self.nat44_add_address(self.nat_addr)
3004         self.vapi.nat44_ei_add_del_output_interface(
3005             sw_if_index=self.pg3.sw_if_index, is_add=1
3006         )
3007
3008         # in2out
3009         pkts = self.create_stream_in(self.pg0, self.pg3)
3010         self.pg0.add_stream(pkts)
3011         self.pg_enable_capture(self.pg_interfaces)
3012         self.pg_start()
3013         capture = self.pg3.get_capture(len(pkts))
3014         self.verify_capture_out(capture)
3015
3016         # out2in
3017         pkts = self.create_stream_out(self.pg3)
3018         self.pg3.add_stream(pkts)
3019         self.pg_enable_capture(self.pg_interfaces)
3020         self.pg_start()
3021         capture = self.pg0.get_capture(len(pkts))
3022         self.verify_capture_in(capture, self.pg0)
3023
3024         # from non-NAT interface to NAT inside interface
3025         pkts = self.create_stream_in(self.pg2, self.pg0)
3026         self.pg2.add_stream(pkts)
3027         self.pg_enable_capture(self.pg_interfaces)
3028         self.pg_start()
3029         capture = self.pg0.get_capture(len(pkts))
3030         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3031
3032     def test_output_feature_vrf_aware(self):
3033         """NAT44EI output feature VRF aware (in2out postrouting)"""
3034         nat_ip_vrf10 = "10.0.0.10"
3035         nat_ip_vrf20 = "10.0.0.20"
3036
3037         r1 = VppIpRoute(
3038             self,
3039             self.pg3.remote_ip4,
3040             32,
3041             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3042             table_id=10,
3043         )
3044         r2 = VppIpRoute(
3045             self,
3046             self.pg3.remote_ip4,
3047             32,
3048             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3049             table_id=20,
3050         )
3051         r1.add_vpp_config()
3052         r2.add_vpp_config()
3053
3054         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3055         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3056         self.vapi.nat44_ei_add_del_output_interface(
3057             sw_if_index=self.pg3.sw_if_index, is_add=1
3058         )
3059
3060         # in2out VRF 10
3061         pkts = self.create_stream_in(self.pg4, self.pg3)
3062         self.pg4.add_stream(pkts)
3063         self.pg_enable_capture(self.pg_interfaces)
3064         self.pg_start()
3065         capture = self.pg3.get_capture(len(pkts))
3066         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3067
3068         # out2in VRF 10
3069         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3070         self.pg3.add_stream(pkts)
3071         self.pg_enable_capture(self.pg_interfaces)
3072         self.pg_start()
3073         capture = self.pg4.get_capture(len(pkts))
3074         self.verify_capture_in(capture, self.pg4)
3075
3076         # in2out VRF 20
3077         pkts = self.create_stream_in(self.pg6, self.pg3)
3078         self.pg6.add_stream(pkts)
3079         self.pg_enable_capture(self.pg_interfaces)
3080         self.pg_start()
3081         capture = self.pg3.get_capture(len(pkts))
3082         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3083
3084         # out2in VRF 20
3085         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3086         self.pg3.add_stream(pkts)
3087         self.pg_enable_capture(self.pg_interfaces)
3088         self.pg_start()
3089         capture = self.pg6.get_capture(len(pkts))
3090         self.verify_capture_in(capture, self.pg6)
3091
3092     def test_output_feature_hairpinning(self):
3093         """NAT44EI output feature hairpinning (in2out postrouting)"""
3094         host = self.pg0.remote_hosts[0]
3095         server = self.pg0.remote_hosts[1]
3096         host_in_port = 1234
3097         host_out_port = 0
3098         server_in_port = 5678
3099         server_out_port = 8765
3100
3101         self.nat44_add_address(self.nat_addr)
3102         self.vapi.nat44_ei_add_del_output_interface(
3103             sw_if_index=self.pg0.sw_if_index, is_add=1
3104         )
3105         self.vapi.nat44_ei_add_del_output_interface(
3106             sw_if_index=self.pg1.sw_if_index, is_add=1
3107         )
3108
3109         # add static mapping for server
3110         self.nat44_add_static_mapping(
3111             server.ip4,
3112             self.nat_addr,
3113             server_in_port,
3114             server_out_port,
3115             proto=IP_PROTOS.tcp,
3116         )
3117
3118         # send packet from host to server
3119         p = (
3120             Ether(src=host.mac, dst=self.pg0.local_mac)
3121             / IP(src=host.ip4, dst=self.nat_addr)
3122             / TCP(sport=host_in_port, dport=server_out_port)
3123         )
3124         self.pg0.add_stream(p)
3125         self.pg_enable_capture(self.pg_interfaces)
3126         self.pg_start()
3127         capture = self.pg0.get_capture(1)
3128         p = capture[0]
3129         try:
3130             ip = p[IP]
3131             tcp = p[TCP]
3132             self.assertEqual(ip.src, self.nat_addr)
3133             self.assertEqual(ip.dst, server.ip4)
3134             self.assertNotEqual(tcp.sport, host_in_port)
3135             self.assertEqual(tcp.dport, server_in_port)
3136             self.assert_packet_checksums_valid(p)
3137             host_out_port = tcp.sport
3138         except:
3139             self.logger.error(ppp("Unexpected or invalid packet:", p))
3140             raise
3141
3142         # send reply from server to host
3143         p = (
3144             Ether(src=server.mac, dst=self.pg0.local_mac)
3145             / IP(src=server.ip4, dst=self.nat_addr)
3146             / TCP(sport=server_in_port, dport=host_out_port)
3147         )
3148         self.pg0.add_stream(p)
3149         self.pg_enable_capture(self.pg_interfaces)
3150         self.pg_start()
3151         capture = self.pg0.get_capture(1)
3152         p = capture[0]
3153         try:
3154             ip = p[IP]
3155             tcp = p[TCP]
3156             self.assertEqual(ip.src, self.nat_addr)
3157             self.assertEqual(ip.dst, host.ip4)
3158             self.assertEqual(tcp.sport, server_out_port)
3159             self.assertEqual(tcp.dport, host_in_port)
3160             self.assert_packet_checksums_valid(p)
3161         except:
3162             self.logger.error(ppp("Unexpected or invalid packet:", p))
3163             raise
3164
3165     def test_one_armed_nat44(self):
3166         """NAT44EI One armed NAT"""
3167         remote_host = self.pg9.remote_hosts[0]
3168         local_host = self.pg9.remote_hosts[1]
3169         external_port = 0
3170
3171         self.nat44_add_address(self.nat_addr)
3172         flags = self.config_flags.NAT44_EI_IF_INSIDE
3173         self.vapi.nat44_ei_interface_add_del_feature(
3174             sw_if_index=self.pg9.sw_if_index, is_add=1
3175         )
3176         self.vapi.nat44_ei_interface_add_del_feature(
3177             sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3178         )
3179
3180         # in2out
3181         p = (
3182             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3183             / IP(src=local_host.ip4, dst=remote_host.ip4)
3184             / TCP(sport=12345, dport=80)
3185         )
3186         self.pg9.add_stream(p)
3187         self.pg_enable_capture(self.pg_interfaces)
3188         self.pg_start()
3189         capture = self.pg9.get_capture(1)
3190         p = capture[0]
3191         try:
3192             ip = p[IP]
3193             tcp = p[TCP]
3194             self.assertEqual(ip.src, self.nat_addr)
3195             self.assertEqual(ip.dst, remote_host.ip4)
3196             self.assertNotEqual(tcp.sport, 12345)
3197             external_port = tcp.sport
3198             self.assertEqual(tcp.dport, 80)
3199             self.assert_packet_checksums_valid(p)
3200         except:
3201             self.logger.error(ppp("Unexpected or invalid packet:", p))
3202             raise
3203
3204         # out2in
3205         p = (
3206             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3207             / IP(src=remote_host.ip4, dst=self.nat_addr)
3208             / TCP(sport=80, dport=external_port)
3209         )
3210         self.pg9.add_stream(p)
3211         self.pg_enable_capture(self.pg_interfaces)
3212         self.pg_start()
3213         capture = self.pg9.get_capture(1)
3214         p = capture[0]
3215         try:
3216             ip = p[IP]
3217             tcp = p[TCP]
3218             self.assertEqual(ip.src, remote_host.ip4)
3219             self.assertEqual(ip.dst, local_host.ip4)
3220             self.assertEqual(tcp.sport, 80)
3221             self.assertEqual(tcp.dport, 12345)
3222             self.assert_packet_checksums_valid(p)
3223         except:
3224             self.logger.error(ppp("Unexpected or invalid packet:", p))
3225             raise
3226
3227         if self.vpp_worker_count > 1:
3228             node = "nat44-ei-handoff-classify"
3229         else:
3230             node = "nat44-ei-classify"
3231
3232         err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3233         self.assertEqual(err, 1)
3234         err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3235         self.assertEqual(err, 1)
3236
3237     def test_del_session(self):
3238         """NAT44EI delete session"""
3239         self.nat44_add_address(self.nat_addr)
3240         flags = self.config_flags.NAT44_EI_IF_INSIDE
3241         self.vapi.nat44_ei_interface_add_del_feature(
3242             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3243         )
3244         self.vapi.nat44_ei_interface_add_del_feature(
3245             sw_if_index=self.pg1.sw_if_index, is_add=1
3246         )
3247
3248         pkts = self.create_stream_in(self.pg0, self.pg1)
3249         self.pg0.add_stream(pkts)
3250         self.pg_enable_capture(self.pg_interfaces)
3251         self.pg_start()
3252         self.pg1.get_capture(len(pkts))
3253
3254         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3255         nsessions = len(sessions)
3256
3257         self.vapi.nat44_ei_del_session(
3258             address=sessions[0].inside_ip_address,
3259             port=sessions[0].inside_port,
3260             protocol=sessions[0].protocol,
3261             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3262         )
3263
3264         self.vapi.nat44_ei_del_session(
3265             address=sessions[1].outside_ip_address,
3266             port=sessions[1].outside_port,
3267             protocol=sessions[1].protocol,
3268         )
3269
3270         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3271         self.assertEqual(nsessions - len(sessions), 2)
3272
3273         self.vapi.nat44_ei_del_session(
3274             address=sessions[0].inside_ip_address,
3275             port=sessions[0].inside_port,
3276             protocol=sessions[0].protocol,
3277             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3278         )
3279
3280         self.verify_no_nat44_user()
3281
3282     def test_frag_in_order(self):
3283         """NAT44EI translate fragments arriving in order"""
3284
3285         self.nat44_add_address(self.nat_addr)
3286         flags = self.config_flags.NAT44_EI_IF_INSIDE
3287         self.vapi.nat44_ei_interface_add_del_feature(
3288             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3289         )
3290         self.vapi.nat44_ei_interface_add_del_feature(
3291             sw_if_index=self.pg1.sw_if_index, is_add=1
3292         )
3293
3294         self.frag_in_order(proto=IP_PROTOS.tcp)
3295         self.frag_in_order(proto=IP_PROTOS.udp)
3296         self.frag_in_order(proto=IP_PROTOS.icmp)
3297
3298     def test_frag_forwarding(self):
3299         """NAT44EI forwarding fragment test"""
3300         self.vapi.nat44_ei_add_del_interface_addr(
3301             is_add=1, sw_if_index=self.pg1.sw_if_index
3302         )
3303         flags = self.config_flags.NAT44_EI_IF_INSIDE
3304         self.vapi.nat44_ei_interface_add_del_feature(
3305             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3306         )
3307         self.vapi.nat44_ei_interface_add_del_feature(
3308             sw_if_index=self.pg1.sw_if_index, is_add=1
3309         )
3310         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3311
3312         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3313         pkts = self.create_stream_frag(
3314             self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3315         )
3316         self.pg1.add_stream(pkts)
3317         self.pg_enable_capture(self.pg_interfaces)
3318         self.pg_start()
3319         frags = self.pg0.get_capture(len(pkts))
3320         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3321         self.assertEqual(p[UDP].sport, 4789)
3322         self.assertEqual(p[UDP].dport, 4789)
3323         self.assertEqual(data, p[Raw].load)
3324
3325     def test_reass_hairpinning(self):
3326         """NAT44EI fragments hairpinning"""
3327
3328         server_addr = self.pg0.remote_hosts[1].ip4
3329         host_in_port = random.randint(1025, 65535)
3330         server_in_port = random.randint(1025, 65535)
3331         server_out_port = random.randint(1025, 65535)
3332
3333         self.nat44_add_address(self.nat_addr)
3334         flags = self.config_flags.NAT44_EI_IF_INSIDE
3335         self.vapi.nat44_ei_interface_add_del_feature(
3336             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3337         )
3338         self.vapi.nat44_ei_interface_add_del_feature(
3339             sw_if_index=self.pg1.sw_if_index, is_add=1
3340         )
3341         # add static mapping for server
3342         self.nat44_add_static_mapping(
3343             server_addr,
3344             self.nat_addr,
3345             server_in_port,
3346             server_out_port,
3347             proto=IP_PROTOS.tcp,
3348         )
3349         self.nat44_add_static_mapping(
3350             server_addr,
3351             self.nat_addr,
3352             server_in_port,
3353             server_out_port,
3354             proto=IP_PROTOS.udp,
3355         )
3356         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3357
3358         self.reass_hairpinning(
3359             server_addr,
3360             server_in_port,
3361             server_out_port,
3362             host_in_port,
3363             proto=IP_PROTOS.tcp,
3364         )
3365         self.reass_hairpinning(
3366             server_addr,
3367             server_in_port,
3368             server_out_port,
3369             host_in_port,
3370             proto=IP_PROTOS.udp,
3371         )
3372         self.reass_hairpinning(
3373             server_addr,
3374             server_in_port,
3375             server_out_port,
3376             host_in_port,
3377             proto=IP_PROTOS.icmp,
3378         )
3379
3380     def test_frag_out_of_order(self):
3381         """NAT44EI translate fragments arriving out of order"""
3382
3383         self.nat44_add_address(self.nat_addr)
3384         flags = self.config_flags.NAT44_EI_IF_INSIDE
3385         self.vapi.nat44_ei_interface_add_del_feature(
3386             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3387         )
3388         self.vapi.nat44_ei_interface_add_del_feature(
3389             sw_if_index=self.pg1.sw_if_index, is_add=1
3390         )
3391
3392         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3393         self.frag_out_of_order(proto=IP_PROTOS.udp)
3394         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3395
3396     def test_port_restricted(self):
3397         """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
3398         self.nat44_add_address(self.nat_addr)
3399         flags = self.config_flags.NAT44_EI_IF_INSIDE
3400         self.vapi.nat44_ei_interface_add_del_feature(
3401             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3402         )
3403         self.vapi.nat44_ei_interface_add_del_feature(
3404             sw_if_index=self.pg1.sw_if_index, is_add=1
3405         )
3406         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3407             alg=1, psid_offset=6, psid_length=6, psid=10
3408         )
3409
3410         p = (
3411             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3412             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3413             / TCP(sport=4567, dport=22)
3414         )
3415         self.pg0.add_stream(p)
3416         self.pg_enable_capture(self.pg_interfaces)
3417         self.pg_start()
3418         capture = self.pg1.get_capture(1)
3419         p = capture[0]
3420         try:
3421             ip = p[IP]
3422             tcp = p[TCP]
3423             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3424             self.assertEqual(ip.src, self.nat_addr)
3425             self.assertEqual(tcp.dport, 22)
3426             self.assertNotEqual(tcp.sport, 4567)
3427             self.assertEqual((tcp.sport >> 6) & 63, 10)
3428             self.assert_packet_checksums_valid(p)
3429         except:
3430             self.logger.error(ppp("Unexpected or invalid packet:", p))
3431             raise
3432
3433     def test_port_range(self):
3434         """NAT44EI External address port range"""
3435         self.nat44_add_address(self.nat_addr)
3436         flags = self.config_flags.NAT44_EI_IF_INSIDE
3437         self.vapi.nat44_ei_interface_add_del_feature(
3438             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3439         )
3440         self.vapi.nat44_ei_interface_add_del_feature(
3441             sw_if_index=self.pg1.sw_if_index, is_add=1
3442         )
3443         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3444             alg=2, start_port=1025, end_port=1027
3445         )
3446
3447         pkts = []
3448         for port in range(0, 5):
3449             p = (
3450                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3451                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3452                 / TCP(sport=1125 + port)
3453             )
3454             pkts.append(p)
3455         self.pg0.add_stream(pkts)
3456         self.pg_enable_capture(self.pg_interfaces)
3457         self.pg_start()
3458         capture = self.pg1.get_capture(3)
3459         for p in capture:
3460             tcp = p[TCP]
3461             self.assertGreaterEqual(tcp.sport, 1025)
3462             self.assertLessEqual(tcp.sport, 1027)
3463
3464     def test_multiple_outside_vrf(self):
3465         """NAT44EI Multiple outside VRF"""
3466         vrf_id1 = 1
3467         vrf_id2 = 2
3468
3469         self.pg1.unconfig_ip4()
3470         self.pg2.unconfig_ip4()
3471         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3472         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3473         self.pg1.set_table_ip4(vrf_id1)
3474         self.pg2.set_table_ip4(vrf_id2)
3475         self.pg1.config_ip4()
3476         self.pg2.config_ip4()
3477         self.pg1.resolve_arp()
3478         self.pg2.resolve_arp()
3479
3480         self.nat44_add_address(self.nat_addr)
3481         flags = self.config_flags.NAT44_EI_IF_INSIDE
3482         self.vapi.nat44_ei_interface_add_del_feature(
3483             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3484         )
3485         self.vapi.nat44_ei_interface_add_del_feature(
3486             sw_if_index=self.pg1.sw_if_index, is_add=1
3487         )
3488         self.vapi.nat44_ei_interface_add_del_feature(
3489             sw_if_index=self.pg2.sw_if_index, is_add=1
3490         )
3491
3492         try:
3493             # first VRF
3494             pkts = self.create_stream_in(self.pg0, self.pg1)
3495             self.pg0.add_stream(pkts)
3496             self.pg_enable_capture(self.pg_interfaces)
3497             self.pg_start()
3498             capture = self.pg1.get_capture(len(pkts))
3499             self.verify_capture_out(capture, self.nat_addr)
3500
3501             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3502             self.pg1.add_stream(pkts)
3503             self.pg_enable_capture(self.pg_interfaces)
3504             self.pg_start()
3505             capture = self.pg0.get_capture(len(pkts))
3506             self.verify_capture_in(capture, self.pg0)
3507
3508             self.tcp_port_in = 60303
3509             self.udp_port_in = 60304
3510             self.icmp_id_in = 60305
3511
3512             # second VRF
3513             pkts = self.create_stream_in(self.pg0, self.pg2)
3514             self.pg0.add_stream(pkts)
3515             self.pg_enable_capture(self.pg_interfaces)
3516             self.pg_start()
3517             capture = self.pg2.get_capture(len(pkts))
3518             self.verify_capture_out(capture, self.nat_addr)
3519
3520             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3521             self.pg2.add_stream(pkts)
3522             self.pg_enable_capture(self.pg_interfaces)
3523             self.pg_start()
3524             capture = self.pg0.get_capture(len(pkts))
3525             self.verify_capture_in(capture, self.pg0)
3526
3527         finally:
3528             self.nat44_add_address(self.nat_addr, is_add=0)
3529             self.pg1.unconfig_ip4()
3530             self.pg2.unconfig_ip4()
3531             self.pg1.set_table_ip4(0)
3532             self.pg2.set_table_ip4(0)
3533             self.pg1.config_ip4()
3534             self.pg2.config_ip4()
3535             self.pg1.resolve_arp()
3536             self.pg2.resolve_arp()
3537
3538     def test_mss_clamping(self):
3539         """NAT44EI TCP MSS clamping"""
3540         self.nat44_add_address(self.nat_addr)
3541         flags = self.config_flags.NAT44_EI_IF_INSIDE
3542         self.vapi.nat44_ei_interface_add_del_feature(
3543             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3544         )
3545         self.vapi.nat44_ei_interface_add_del_feature(
3546             sw_if_index=self.pg1.sw_if_index, is_add=1
3547         )
3548
3549         p = (
3550             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3551             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3552             / TCP(
3553                 sport=self.tcp_port_in,
3554                 dport=self.tcp_external_port,
3555                 flags="S",
3556                 options=[("MSS", 1400)],
3557             )
3558         )
3559
3560         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3561         self.pg0.add_stream(p)
3562         self.pg_enable_capture(self.pg_interfaces)
3563         self.pg_start()
3564         capture = self.pg1.get_capture(1)
3565         # Negotiated MSS value greater than configured - changed
3566         self.verify_mss_value(capture[0], 1000)
3567
3568         self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3569         self.pg0.add_stream(p)
3570         self.pg_enable_capture(self.pg_interfaces)
3571         self.pg_start()
3572         capture = self.pg1.get_capture(1)
3573         # MSS clamping disabled - negotiated MSS unchanged
3574         self.verify_mss_value(capture[0], 1400)
3575
3576         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3577         self.pg0.add_stream(p)
3578         self.pg_enable_capture(self.pg_interfaces)
3579         self.pg_start()
3580         capture = self.pg1.get_capture(1)
3581         # Negotiated MSS value smaller than configured - unchanged
3582         self.verify_mss_value(capture[0], 1400)
3583
3584     def test_ha_send(self):
3585         """NAT44EI Send HA session synchronization events (active)"""
3586         flags = self.config_flags.NAT44_EI_IF_INSIDE
3587         self.vapi.nat44_ei_interface_add_del_feature(
3588             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3589         )
3590         self.vapi.nat44_ei_interface_add_del_feature(
3591             sw_if_index=self.pg1.sw_if_index, is_add=1
3592         )
3593         self.nat44_add_address(self.nat_addr)
3594
3595         self.vapi.nat44_ei_ha_set_listener(
3596             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3597         )
3598         self.vapi.nat44_ei_ha_set_failover(
3599             ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
3600         )
3601         bind_layers(UDP, HANATStateSync, sport=12345)
3602
3603         # create sessions
3604         pkts = self.create_stream_in(self.pg0, self.pg1)
3605         self.pg0.add_stream(pkts)
3606         self.pg_enable_capture(self.pg_interfaces)
3607         self.pg_start()
3608         capture = self.pg1.get_capture(len(pkts))
3609         self.verify_capture_out(capture)
3610         # active send HA events
3611         self.vapi.nat44_ei_ha_flush()
3612         stats = self.statistics["/nat44-ei/ha/add-event-send"]
3613         self.assertEqual(stats[:, 0].sum(), 3)
3614         capture = self.pg3.get_capture(1)
3615         p = capture[0]
3616         self.assert_packet_checksums_valid(p)
3617         try:
3618             ip = p[IP]
3619             udp = p[UDP]
3620             hanat = p[HANATStateSync]
3621         except IndexError:
3622             self.logger.error(ppp("Invalid packet:", p))
3623             raise
3624         else:
3625             self.assertEqual(ip.src, self.pg3.local_ip4)
3626             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3627             self.assertEqual(udp.sport, 12345)
3628             self.assertEqual(udp.dport, 12346)
3629             self.assertEqual(hanat.version, 1)
3630             # self.assertEqual(hanat.thread_index, 0)
3631             self.assertEqual(hanat.count, 3)
3632             seq = hanat.sequence_number
3633             for event in hanat.events:
3634                 self.assertEqual(event.event_type, 1)
3635                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3636                 self.assertEqual(event.out_addr, self.nat_addr)
3637                 self.assertEqual(event.fib_index, 0)
3638
3639         # ACK received events
3640         ack = (
3641             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3642             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3643             / UDP(sport=12346, dport=12345)
3644             / HANATStateSync(
3645                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3646             )
3647         )
3648         self.pg3.add_stream(ack)
3649         self.pg_start()
3650         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3651         self.assertEqual(stats[:, 0].sum(), 1)
3652
3653         # delete one session
3654         self.pg_enable_capture(self.pg_interfaces)
3655         self.vapi.nat44_ei_del_session(
3656             address=self.pg0.remote_ip4,
3657             port=self.tcp_port_in,
3658             protocol=IP_PROTOS.tcp,
3659             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3660         )
3661         self.vapi.nat44_ei_ha_flush()
3662         stats = self.statistics["/nat44-ei/ha/del-event-send"]
3663         self.assertEqual(stats[:, 0].sum(), 1)
3664         capture = self.pg3.get_capture(1)
3665         p = capture[0]
3666         try:
3667             hanat = p[HANATStateSync]
3668         except IndexError:
3669             self.logger.error(ppp("Invalid packet:", p))
3670             raise
3671         else:
3672             self.assertGreater(hanat.sequence_number, seq)
3673
3674         # do not send ACK, active retry send HA event again
3675         self.pg_enable_capture(self.pg_interfaces)
3676         self.virtual_sleep(12)
3677         stats = self.statistics["/nat44-ei/ha/retry-count"]
3678         self.assertEqual(stats[:, 0].sum(), 3)
3679         stats = self.statistics["/nat44-ei/ha/missed-count"]
3680         self.assertEqual(stats[:, 0].sum(), 1)
3681         capture = self.pg3.get_capture(3)
3682         for packet in capture:
3683             self.assertEqual(packet, p)
3684
3685         # session counters refresh
3686         pkts = self.create_stream_out(self.pg1)
3687         self.pg1.add_stream(pkts)
3688         self.pg_enable_capture(self.pg_interfaces)
3689         self.pg_start()
3690         self.pg0.get_capture(2)
3691         self.vapi.nat44_ei_ha_flush()
3692         stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3693         self.assertEqual(stats[:, 0].sum(), 2)
3694         capture = self.pg3.get_capture(1)
3695         p = capture[0]
3696         self.assert_packet_checksums_valid(p)
3697         try:
3698             ip = p[IP]
3699             udp = p[UDP]
3700             hanat = p[HANATStateSync]
3701         except IndexError:
3702             self.logger.error(ppp("Invalid packet:", p))
3703             raise
3704         else:
3705             self.assertEqual(ip.src, self.pg3.local_ip4)
3706             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3707             self.assertEqual(udp.sport, 12345)
3708             self.assertEqual(udp.dport, 12346)
3709             self.assertEqual(hanat.version, 1)
3710             self.assertEqual(hanat.count, 2)
3711             seq = hanat.sequence_number
3712             for event in hanat.events:
3713                 self.assertEqual(event.event_type, 3)
3714                 self.assertEqual(event.out_addr, self.nat_addr)
3715                 self.assertEqual(event.fib_index, 0)
3716                 self.assertEqual(event.total_pkts, 2)
3717                 self.assertGreater(event.total_bytes, 0)
3718
3719         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3720         ack = (
3721             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3722             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3723             / UDP(sport=12346, dport=12345)
3724             / HANATStateSync(
3725                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3726             )
3727         )
3728         self.pg3.add_stream(ack)
3729         self.pg_start()
3730         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3731         self.assertEqual(stats[:, 0].sum(), 2)
3732
3733     def test_ha_recv(self):
3734         """NAT44EI Receive HA session synchronization events (passive)"""
3735         self.nat44_add_address(self.nat_addr)
3736         flags = self.config_flags.NAT44_EI_IF_INSIDE
3737         self.vapi.nat44_ei_interface_add_del_feature(
3738             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3739         )
3740         self.vapi.nat44_ei_interface_add_del_feature(
3741             sw_if_index=self.pg1.sw_if_index, is_add=1
3742         )
3743         self.vapi.nat44_ei_ha_set_listener(
3744             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3745         )
3746         bind_layers(UDP, HANATStateSync, sport=12345)
3747
3748         # this is a bit tricky - HA dictates thread index due to how it's
3749         # designed, but once we use HA to create a session, we also want
3750         # to pass a packet through said session. so the session must end
3751         # up on the correct thread from both directions - in2out (based on
3752         # IP address) and out2in (based on outside port)
3753
3754         # first choose a thread index which is correct for IP
3755         thread_index = get_nat44_ei_in2out_worker_index(
3756             self.pg0.remote_ip4, self.vpp_worker_count
3757         )
3758
3759         # now pick a port which is correct for given thread
3760         port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3761         self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3762         self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3763         if self.vpp_worker_count > 0:
3764             self.tcp_port_out += port_per_thread * (thread_index - 1)
3765             self.udp_port_out += port_per_thread * (thread_index - 1)
3766
3767         # send HA session add events to failover/passive
3768         p = (
3769             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3770             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3771             / UDP(sport=12346, dport=12345)
3772             / HANATStateSync(
3773                 sequence_number=1,
3774                 events=[
3775                     Event(
3776                         event_type="add",
3777                         protocol="tcp",
3778                         in_addr=self.pg0.remote_ip4,
3779                         out_addr=self.nat_addr,
3780                         in_port=self.tcp_port_in,
3781                         out_port=self.tcp_port_out,
3782                         eh_addr=self.pg1.remote_ip4,
3783                         ehn_addr=self.pg1.remote_ip4,
3784                         eh_port=self.tcp_external_port,
3785                         ehn_port=self.tcp_external_port,
3786                         fib_index=0,
3787                     ),
3788                     Event(
3789                         event_type="add",
3790                         protocol="udp",
3791                         in_addr=self.pg0.remote_ip4,
3792                         out_addr=self.nat_addr,
3793                         in_port=self.udp_port_in,
3794                         out_port=self.udp_port_out,
3795                         eh_addr=self.pg1.remote_ip4,
3796                         ehn_addr=self.pg1.remote_ip4,
3797                         eh_port=self.udp_external_port,
3798                         ehn_port=self.udp_external_port,
3799                         fib_index=0,
3800                     ),
3801                 ],
3802                 thread_index=thread_index,
3803             )
3804         )
3805
3806         self.pg3.add_stream(p)
3807         self.pg_enable_capture(self.pg_interfaces)
3808         self.pg_start()
3809         # receive ACK
3810         capture = self.pg3.get_capture(1)
3811         p = capture[0]
3812         try:
3813             hanat = p[HANATStateSync]
3814         except IndexError:
3815             self.logger.error(ppp("Invalid packet:", p))
3816             raise
3817         else:
3818             self.assertEqual(hanat.sequence_number, 1)
3819             self.assertEqual(hanat.flags, "ACK")
3820             self.assertEqual(hanat.version, 1)
3821             self.assertEqual(hanat.thread_index, thread_index)
3822         stats = self.statistics["/nat44-ei/ha/ack-send"]
3823         self.assertEqual(stats[:, 0].sum(), 1)
3824         stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3825         self.assertEqual(stats[:, 0].sum(), 2)
3826         users = self.statistics["/nat44-ei/total-users"]
3827         self.assertEqual(users[:, 0].sum(), 1)
3828         sessions = self.statistics["/nat44-ei/total-sessions"]
3829         self.assertEqual(sessions[:, 0].sum(), 2)
3830         users = self.vapi.nat44_ei_user_dump()
3831         self.assertEqual(len(users), 1)
3832         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3833         # there should be 2 sessions created by HA
3834         sessions = self.vapi.nat44_ei_user_session_dump(
3835             users[0].ip_address, users[0].vrf_id
3836         )
3837         self.assertEqual(len(sessions), 2)
3838         for session in sessions:
3839             self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3840             self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3841             self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3842             self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3843             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3844
3845         # send HA session delete event to failover/passive
3846         p = (
3847             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3848             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3849             / UDP(sport=12346, dport=12345)
3850             / HANATStateSync(
3851                 sequence_number=2,
3852                 events=[
3853                     Event(
3854                         event_type="del",
3855                         protocol="udp",
3856                         in_addr=self.pg0.remote_ip4,
3857                         out_addr=self.nat_addr,
3858                         in_port=self.udp_port_in,
3859                         out_port=self.udp_port_out,
3860                         eh_addr=self.pg1.remote_ip4,
3861                         ehn_addr=self.pg1.remote_ip4,
3862                         eh_port=self.udp_external_port,
3863                         ehn_port=self.udp_external_port,
3864                         fib_index=0,
3865                     )
3866                 ],
3867                 thread_index=thread_index,
3868             )
3869         )
3870
3871         self.pg3.add_stream(p)
3872         self.pg_enable_capture(self.pg_interfaces)
3873         self.pg_start()
3874         # receive ACK
3875         capture = self.pg3.get_capture(1)
3876         p = capture[0]
3877         try:
3878             hanat = p[HANATStateSync]
3879         except IndexError:
3880             self.logger.error(ppp("Invalid packet:", p))
3881             raise
3882         else:
3883             self.assertEqual(hanat.sequence_number, 2)
3884             self.assertEqual(hanat.flags, "ACK")
3885             self.assertEqual(hanat.version, 1)
3886         users = self.vapi.nat44_ei_user_dump()
3887         self.assertEqual(len(users), 1)
3888         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3889         # now we should have only 1 session, 1 deleted by HA
3890         sessions = self.vapi.nat44_ei_user_session_dump(
3891             users[0].ip_address, users[0].vrf_id
3892         )
3893         self.assertEqual(len(sessions), 1)
3894         stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3895         self.assertEqual(stats[:, 0].sum(), 1)
3896
3897         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3898         self.assertEqual(stats, 2)
3899
3900         # send HA session refresh event to failover/passive
3901         p = (
3902             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3903             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3904             / UDP(sport=12346, dport=12345)
3905             / HANATStateSync(
3906                 sequence_number=3,
3907                 events=[
3908                     Event(
3909                         event_type="refresh",
3910                         protocol="tcp",
3911                         in_addr=self.pg0.remote_ip4,
3912                         out_addr=self.nat_addr,
3913                         in_port=self.tcp_port_in,
3914                         out_port=self.tcp_port_out,
3915                         eh_addr=self.pg1.remote_ip4,
3916                         ehn_addr=self.pg1.remote_ip4,
3917                         eh_port=self.tcp_external_port,
3918                         ehn_port=self.tcp_external_port,
3919                         fib_index=0,
3920                         total_bytes=1024,
3921                         total_pkts=2,
3922                     )
3923                 ],
3924                 thread_index=thread_index,
3925             )
3926         )
3927         self.pg3.add_stream(p)
3928         self.pg_enable_capture(self.pg_interfaces)
3929         self.pg_start()
3930         # receive ACK
3931         capture = self.pg3.get_capture(1)
3932         p = capture[0]
3933         try:
3934             hanat = p[HANATStateSync]
3935         except IndexError:
3936             self.logger.error(ppp("Invalid packet:", p))
3937             raise
3938         else:
3939             self.assertEqual(hanat.sequence_number, 3)
3940             self.assertEqual(hanat.flags, "ACK")
3941             self.assertEqual(hanat.version, 1)
3942         users = self.vapi.nat44_ei_user_dump()
3943         self.assertEqual(len(users), 1)
3944         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3945         sessions = self.vapi.nat44_ei_user_session_dump(
3946             users[0].ip_address, users[0].vrf_id
3947         )
3948         self.assertEqual(len(sessions), 1)
3949         session = sessions[0]
3950         self.assertEqual(session.total_bytes, 1024)
3951         self.assertEqual(session.total_pkts, 2)
3952         stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3953         self.assertEqual(stats[:, 0].sum(), 1)
3954
3955         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3956         self.assertEqual(stats, 3)
3957
3958         # send packet to test session created by HA
3959         p = (
3960             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3961             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3962             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3963         )
3964         self.pg1.add_stream(p)
3965         self.pg_enable_capture(self.pg_interfaces)
3966         self.pg_start()
3967         capture = self.pg0.get_capture(1)
3968         p = capture[0]
3969         try:
3970             ip = p[IP]
3971             tcp = p[TCP]
3972         except IndexError:
3973             self.logger.error(ppp("Invalid packet:", p))
3974             raise
3975         else:
3976             self.assertEqual(ip.src, self.pg1.remote_ip4)
3977             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3978             self.assertEqual(tcp.sport, self.tcp_external_port)
3979             self.assertEqual(tcp.dport, self.tcp_port_in)
3980
3981     def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3982         self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3983         self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3984         # keep plugin configuration persistent
3985         self.plugin_enable()
3986         return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3987
3988     def test_set_frame_queue_nelts(self):
3989         """NAT44EI API test - worker handoff frame queue elements"""
3990         self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3991
3992     def show_commands_at_teardown(self):
3993         self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3994         self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3995         self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3996         self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3997         self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3998         self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3999         self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
4000         self.logger.info(self.vapi.cli("show nat44 ei ha"))
4001         self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
4002
4003     def test_outside_address_distribution(self):
4004         """Outside address distribution based on source address"""
4005
4006         x = 100
4007         nat_addresses = []
4008
4009         for i in range(1, x):
4010             a = "10.0.0.%d" % i
4011             nat_addresses.append(a)
4012
4013         flags = self.config_flags.NAT44_EI_IF_INSIDE
4014         self.vapi.nat44_ei_interface_add_del_feature(
4015             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4016         )
4017         self.vapi.nat44_ei_interface_add_del_feature(
4018             sw_if_index=self.pg1.sw_if_index, is_add=1
4019         )
4020
4021         self.vapi.nat44_ei_add_del_address_range(
4022             first_ip_address=nat_addresses[0],
4023             last_ip_address=nat_addresses[-1],
4024             vrf_id=0xFFFFFFFF,
4025             is_add=1,
4026         )
4027
4028         self.pg0.generate_remote_hosts(x)
4029
4030         pkts = []
4031         for i in range(x):
4032             info = self.create_packet_info(self.pg0, self.pg1)
4033             payload = self.info_to_payload(info)
4034             p = (
4035                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4036                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4037                 / UDP(sport=7000 + i, dport=8000 + i)
4038                 / Raw(payload)
4039             )
4040             info.data = p
4041             pkts.append(p)
4042
4043         self.pg0.add_stream(pkts)
4044         self.pg_enable_capture(self.pg_interfaces)
4045         self.pg_start()
4046         recvd = self.pg1.get_capture(len(pkts))
4047         for p_recvd in recvd:
4048             payload_info = self.payload_to_info(p_recvd[Raw])
4049             packet_index = payload_info.index
4050             info = self._packet_infos[packet_index]
4051             self.assertTrue(info is not None)
4052             self.assertEqual(packet_index, info.index)
4053             p_sent = info.data
4054             packed = socket.inet_aton(p_sent[IP].src)
4055             numeric = struct.unpack("!L", packed)[0]
4056             numeric = socket.htonl(numeric)
4057             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4058             self.assertEqual(
4059                 a,
4060                 p_recvd[IP].src,
4061                 "Invalid packet (src IP %s translated to %s, but expected %s)"
4062                 % (p_sent[IP].src, p_recvd[IP].src, a),
4063             )
4064
4065     def test_default_user_sessions(self):
4066         """NAT44EI default per-user session limit is used and reported"""
4067         nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4068         # a nonzero default should be reported for user_sessions
4069         self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4070
4071     def test_delete_interface(self):
4072         """NAT44EI delete nat interface"""
4073
4074         self.nat44_add_address(self.nat_addr)
4075
4076         interfaces = self.create_loopback_interfaces(4)
4077
4078         self.vapi.nat44_ei_interface_add_del_feature(
4079             sw_if_index=interfaces[0].sw_if_index, is_add=1
4080         )
4081         flags = self.config_flags.NAT44_EI_IF_INSIDE
4082         self.vapi.nat44_ei_interface_add_del_feature(
4083             sw_if_index=interfaces[1].sw_if_index, flags=flags, is_add=1
4084         )
4085         flags |= self.config_flags.NAT44_EI_IF_OUTSIDE
4086         self.vapi.nat44_ei_interface_add_del_feature(
4087             sw_if_index=interfaces[2].sw_if_index, flags=flags, is_add=1
4088         )
4089         self.vapi.nat44_ei_add_del_output_interface(
4090             sw_if_index=interfaces[3].sw_if_index, is_add=1
4091         )
4092
4093         nat_sw_if_indices = [
4094             i.sw_if_index
4095             for i in self.vapi.nat44_ei_interface_dump()
4096             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get))
4097         ]
4098         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
4099
4100         loopbacks = []
4101         for i in interfaces:
4102             # delete nat-enabled interface
4103             self.assertIn(i.sw_if_index, nat_sw_if_indices)
4104             i.remove_vpp_config()
4105
4106             # create interface with the same index
4107             lo = VppLoInterface(self)
4108             loopbacks.append(lo)
4109             self.assertEqual(lo.sw_if_index, i.sw_if_index)
4110
4111             # check interface is not nat-enabled
4112             nat_sw_if_indices = [
4113                 i.sw_if_index
4114                 for i in self.vapi.nat44_ei_interface_dump()
4115                 + list(
4116                     self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get)
4117                 )
4118             ]
4119             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
4120
4121         for i in loopbacks:
4122             i.remove_vpp_config()
4123
4124
4125 class TestNAT44Out2InDPO(MethodHolder):
4126     """NAT44EI Test Cases using out2in DPO"""
4127
4128     @classmethod
4129     def setUpClass(cls):
4130         super(TestNAT44Out2InDPO, cls).setUpClass()
4131         cls.vapi.cli("set log class nat44-ei level debug")
4132
4133         cls.tcp_port_in = 6303
4134         cls.tcp_port_out = 6303
4135         cls.udp_port_in = 6304
4136         cls.udp_port_out = 6304
4137         cls.icmp_id_in = 6305
4138         cls.icmp_id_out = 6305
4139         cls.nat_addr = "10.0.0.3"
4140         cls.dst_ip4 = "192.168.70.1"
4141
4142         cls.create_pg_interfaces(range(2))
4143
4144         cls.pg0.admin_up()
4145         cls.pg0.config_ip4()
4146         cls.pg0.resolve_arp()
4147
4148         cls.pg1.admin_up()
4149         cls.pg1.config_ip6()
4150         cls.pg1.resolve_ndp()
4151
4152         r1 = VppIpRoute(
4153             cls,
4154             "::",
4155             0,
4156             [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
4157             register=False,
4158         )
4159         r1.add_vpp_config()
4160
4161     def setUp(self):
4162         super(TestNAT44Out2InDPO, self).setUp()
4163         flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4164         self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
4165
4166     def tearDown(self):
4167         super(TestNAT44Out2InDPO, self).tearDown()
4168         if not self.vpp_dead:
4169             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4170             self.vapi.cli("clear logging")
4171
4172     def configure_xlat(self):
4173         self.dst_ip6_pfx = "1:2:3::"
4174         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4175         self.dst_ip6_pfx_len = 96
4176         self.src_ip6_pfx = "4:5:6::"
4177         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4178         self.src_ip6_pfx_len = 96
4179         self.vapi.map_add_domain(
4180             self.dst_ip6_pfx_n,
4181             self.dst_ip6_pfx_len,
4182             self.src_ip6_pfx_n,
4183             self.src_ip6_pfx_len,
4184             "\x00\x00\x00\x00",
4185             0,
4186         )
4187
4188     @unittest.skip("Temporary disabled")
4189     def test_464xlat_ce(self):
4190         """Test 464XLAT CE with NAT44EI"""
4191
4192         self.configure_xlat()
4193
4194         flags = self.config_flags.NAT44_EI_IF_INSIDE
4195         self.vapi.nat44_ei_interface_add_del_feature(
4196             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4197         )
4198         self.vapi.nat44_ei_add_del_address_range(
4199             first_ip_address=self.nat_addr_n,
4200             last_ip_address=self.nat_addr_n,
4201             vrf_id=0xFFFFFFFF,
4202             is_add=1,
4203         )
4204
4205         out_src_ip6 = self.compose_ip6(
4206             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4207         )
4208         out_dst_ip6 = self.compose_ip6(
4209             self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
4210         )
4211
4212         try:
4213             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4214             self.pg0.add_stream(pkts)
4215             self.pg_enable_capture(self.pg_interfaces)
4216             self.pg_start()
4217             capture = self.pg1.get_capture(len(pkts))
4218             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4219
4220             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4221             self.pg1.add_stream(pkts)
4222             self.pg_enable_capture(self.pg_interfaces)
4223             self.pg_start()
4224             capture = self.pg0.get_capture(len(pkts))
4225             self.verify_capture_in(capture, self.pg0)
4226         finally:
4227             self.vapi.nat44_ei_interface_add_del_feature(
4228                 sw_if_index=self.pg0.sw_if_index, flags=flags
4229             )
4230             self.vapi.nat44_ei_add_del_address_range(
4231                 first_ip_address=self.nat_addr_n,
4232                 last_ip_address=self.nat_addr_n,
4233                 vrf_id=0xFFFFFFFF,
4234             )
4235
4236     @unittest.skip("Temporary disabled")
4237     def test_464xlat_ce_no_nat(self):
4238         """Test 464XLAT CE without NAT44EI"""
4239
4240         self.configure_xlat()
4241
4242         out_src_ip6 = self.compose_ip6(
4243             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4244         )
4245         out_dst_ip6 = self.compose_ip6(
4246             self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4247         )
4248
4249         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4250         self.pg0.add_stream(pkts)
4251         self.pg_enable_capture(self.pg_interfaces)
4252         self.pg_start()
4253         capture = self.pg1.get_capture(len(pkts))
4254         self.verify_capture_out_ip6(
4255             capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4256         )
4257
4258         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4259         self.pg1.add_stream(pkts)
4260         self.pg_enable_capture(self.pg_interfaces)
4261         self.pg_start()
4262         capture = self.pg0.get_capture(len(pkts))
4263         self.verify_capture_in(capture, self.pg0)
4264
4265
4266 class TestNAT44EIMW(MethodHolder):
4267     """NAT44EI Test Cases (multiple workers)"""
4268
4269     vpp_worker_count = 2
4270     max_translations = 10240
4271     max_users = 10240
4272
4273     @classmethod
4274     def setUpClass(cls):
4275         super(TestNAT44EIMW, cls).setUpClass()
4276         cls.vapi.cli("set log class nat level debug")
4277
4278         cls.tcp_port_in = 6303
4279         cls.tcp_port_out = 6303
4280         cls.udp_port_in = 6304
4281         cls.udp_port_out = 6304
4282         cls.icmp_id_in = 6305
4283         cls.icmp_id_out = 6305
4284         cls.nat_addr = "10.0.0.3"
4285         cls.ipfix_src_port = 4739
4286         cls.ipfix_domain_id = 1
4287         cls.tcp_external_port = 80
4288         cls.udp_external_port = 69
4289
4290         cls.create_pg_interfaces(range(10))
4291         cls.interfaces = list(cls.pg_interfaces[0:4])
4292
4293         for i in cls.interfaces:
4294             i.admin_up()
4295             i.config_ip4()
4296             i.resolve_arp()
4297
4298         cls.pg0.generate_remote_hosts(3)
4299         cls.pg0.configure_ipv4_neighbors()
4300
4301         cls.pg1.generate_remote_hosts(1)
4302         cls.pg1.configure_ipv4_neighbors()
4303
4304         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4305         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4306         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
4307
4308         cls.pg4._local_ip4 = "172.16.255.1"
4309         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4310         cls.pg4.set_table_ip4(10)
4311         cls.pg5._local_ip4 = "172.17.255.3"
4312         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4313         cls.pg5.set_table_ip4(10)
4314         cls.pg6._local_ip4 = "172.16.255.1"
4315         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4316         cls.pg6.set_table_ip4(20)
4317         for i in cls.overlapping_interfaces:
4318             i.config_ip4()
4319             i.admin_up()
4320             i.resolve_arp()
4321
4322         cls.pg7.admin_up()
4323         cls.pg8.admin_up()
4324
4325         cls.pg9.generate_remote_hosts(2)
4326         cls.pg9.config_ip4()
4327         cls.vapi.sw_interface_add_del_address(
4328             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4329         )
4330
4331         cls.pg9.admin_up()
4332         cls.pg9.resolve_arp()
4333         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4334         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4335         cls.pg9.resolve_arp()
4336
4337     def setUp(self):
4338         super(TestNAT44EIMW, self).setUp()
4339         self.vapi.nat44_ei_plugin_enable_disable(
4340             sessions=self.max_translations, users=self.max_users, enable=1
4341         )
4342
4343     def tearDown(self):
4344         super(TestNAT44EIMW, self).tearDown()
4345         if not self.vpp_dead:
4346             self.vapi.nat44_ei_ipfix_enable_disable(
4347                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
4348             )
4349             self.ipfix_src_port = 4739
4350             self.ipfix_domain_id = 1
4351
4352             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4353             self.vapi.cli("clear logging")
4354
4355     def test_hairpinning(self):
4356         """NAT44EI hairpinning - 1:1 NAPT"""
4357
4358         host = self.pg0.remote_hosts[0]
4359         server = self.pg0.remote_hosts[1]
4360         host_in_port = 1234
4361         host_out_port = 0
4362         server_in_port = 5678
4363         server_out_port = 8765
4364         worker_1 = 1
4365         worker_2 = 2
4366
4367         self.nat44_add_address(self.nat_addr)
4368         flags = self.config_flags.NAT44_EI_IF_INSIDE
4369         self.vapi.nat44_ei_interface_add_del_feature(
4370             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4371         )
4372         self.vapi.nat44_ei_interface_add_del_feature(
4373             sw_if_index=self.pg1.sw_if_index, is_add=1
4374         )
4375
4376         # add static mapping for server
4377         self.nat44_add_static_mapping(
4378             server.ip4,
4379             self.nat_addr,
4380             server_in_port,
4381             server_out_port,
4382             proto=IP_PROTOS.tcp,
4383         )
4384
4385         cnt = self.statistics["/nat44-ei/hairpinning"]
4386         # send packet from host to server
4387         p = (
4388             Ether(src=host.mac, dst=self.pg0.local_mac)
4389             / IP(src=host.ip4, dst=self.nat_addr)
4390             / TCP(sport=host_in_port, dport=server_out_port)
4391         )
4392         self.pg0.add_stream(p)
4393         self.pg_enable_capture(self.pg_interfaces)
4394         self.pg_start()
4395         capture = self.pg0.get_capture(1)
4396         p = capture[0]
4397         try:
4398             ip = p[IP]
4399             tcp = p[TCP]
4400             self.assertEqual(ip.src, self.nat_addr)
4401             self.assertEqual(ip.dst, server.ip4)
4402             self.assertNotEqual(tcp.sport, host_in_port)
4403             self.assertEqual(tcp.dport, server_in_port)
4404             self.assert_packet_checksums_valid(p)
4405             host_out_port = tcp.sport
4406         except:
4407             self.logger.error(ppp("Unexpected or invalid packet:", p))
4408             raise
4409
4410         after = self.statistics["/nat44-ei/hairpinning"]
4411
4412         if_idx = self.pg0.sw_if_index
4413         self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4414
4415         # send reply from server to host
4416         p = (
4417             Ether(src=server.mac, dst=self.pg0.local_mac)
4418             / IP(src=server.ip4, dst=self.nat_addr)
4419             / TCP(sport=server_in_port, dport=host_out_port)
4420         )
4421         self.pg0.add_stream(p)
4422         self.pg_enable_capture(self.pg_interfaces)
4423         self.pg_start()
4424         capture = self.pg0.get_capture(1)
4425         p = capture[0]
4426         try:
4427             ip = p[IP]
4428             tcp = p[TCP]
4429             self.assertEqual(ip.src, self.nat_addr)
4430             self.assertEqual(ip.dst, host.ip4)
4431             self.assertEqual(tcp.sport, server_out_port)
4432             self.assertEqual(tcp.dport, host_in_port)
4433             self.assert_packet_checksums_valid(p)
4434         except:
4435             self.logger.error(ppp("Unexpected or invalid packet:", p))
4436             raise
4437
4438         after = self.statistics["/nat44-ei/hairpinning"]
4439         if_idx = self.pg0.sw_if_index
4440         self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4441         self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4442
4443     def test_hairpinning2(self):
4444         """NAT44EI hairpinning - 1:1 NAT"""
4445
4446         server1_nat_ip = "10.0.0.10"
4447         server2_nat_ip = "10.0.0.11"
4448         host = self.pg0.remote_hosts[0]
4449         server1 = self.pg0.remote_hosts[1]
4450         server2 = self.pg0.remote_hosts[2]
4451         server_tcp_port = 22
4452         server_udp_port = 20
4453
4454         self.nat44_add_address(self.nat_addr)
4455         flags = self.config_flags.NAT44_EI_IF_INSIDE
4456         self.vapi.nat44_ei_interface_add_del_feature(
4457             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4458         )
4459         self.vapi.nat44_ei_interface_add_del_feature(
4460             sw_if_index=self.pg1.sw_if_index, is_add=1
4461         )
4462
4463         # add static mapping for servers
4464         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4465         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4466
4467         # host to server1
4468         pkts = []
4469         p = (
4470             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4471             / IP(src=host.ip4, dst=server1_nat_ip)
4472             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4473         )
4474         pkts.append(p)
4475         p = (
4476             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4477             / IP(src=host.ip4, dst=server1_nat_ip)
4478             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4479         )
4480         pkts.append(p)
4481         p = (
4482             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4483             / IP(src=host.ip4, dst=server1_nat_ip)
4484             / ICMP(id=self.icmp_id_in, type="echo-request")
4485         )
4486         pkts.append(p)
4487         self.pg0.add_stream(pkts)
4488         self.pg_enable_capture(self.pg_interfaces)
4489         self.pg_start()
4490         capture = self.pg0.get_capture(len(pkts))
4491         for packet in capture:
4492             try:
4493                 self.assertEqual(packet[IP].src, self.nat_addr)
4494                 self.assertEqual(packet[IP].dst, server1.ip4)
4495                 if packet.haslayer(TCP):
4496                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4497                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4498                     self.tcp_port_out = packet[TCP].sport
4499                     self.assert_packet_checksums_valid(packet)
4500                 elif packet.haslayer(UDP):
4501                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4502                     self.assertEqual(packet[UDP].dport, server_udp_port)
4503                     self.udp_port_out = packet[UDP].sport
4504                 else:
4505                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4506                     self.icmp_id_out = packet[ICMP].id
4507             except:
4508                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4509                 raise
4510
4511         # server1 to host
4512         pkts = []
4513         p = (
4514             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4515             / IP(src=server1.ip4, dst=self.nat_addr)
4516             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4517         )
4518         pkts.append(p)
4519         p = (
4520             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4521             / IP(src=server1.ip4, dst=self.nat_addr)
4522             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4523         )
4524         pkts.append(p)
4525         p = (
4526             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4527             / IP(src=server1.ip4, dst=self.nat_addr)
4528             / ICMP(id=self.icmp_id_out, type="echo-reply")
4529         )
4530         pkts.append(p)
4531         self.pg0.add_stream(pkts)
4532         self.pg_enable_capture(self.pg_interfaces)
4533         self.pg_start()
4534         capture = self.pg0.get_capture(len(pkts))
4535         for packet in capture:
4536             try:
4537                 self.assertEqual(packet[IP].src, server1_nat_ip)
4538                 self.assertEqual(packet[IP].dst, host.ip4)
4539                 if packet.haslayer(TCP):
4540                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4541                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4542                     self.assert_packet_checksums_valid(packet)
4543                 elif packet.haslayer(UDP):
4544                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4545                     self.assertEqual(packet[UDP].sport, server_udp_port)
4546                 else:
4547                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4548             except:
4549                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4550                 raise
4551
4552         # server2 to server1
4553         pkts = []
4554         p = (
4555             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4556             / IP(src=server2.ip4, dst=server1_nat_ip)
4557             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4558         )
4559         pkts.append(p)
4560         p = (
4561             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4562             / IP(src=server2.ip4, dst=server1_nat_ip)
4563             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4564         )
4565         pkts.append(p)
4566         p = (
4567             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4568             / IP(src=server2.ip4, dst=server1_nat_ip)
4569             / ICMP(id=self.icmp_id_in, type="echo-request")
4570         )
4571         pkts.append(p)
4572         self.pg0.add_stream(pkts)
4573         self.pg_enable_capture(self.pg_interfaces)
4574         self.pg_start()
4575         capture = self.pg0.get_capture(len(pkts))
4576         for packet in capture:
4577             try:
4578                 self.assertEqual(packet[IP].src, server2_nat_ip)
4579                 self.assertEqual(packet[IP].dst, server1.ip4)
4580                 if packet.haslayer(TCP):
4581                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4582                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4583                     self.tcp_port_out = packet[TCP].sport
4584                     self.assert_packet_checksums_valid(packet)
4585                 elif packet.haslayer(UDP):
4586                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
4587                     self.assertEqual(packet[UDP].dport, server_udp_port)
4588                     self.udp_port_out = packet[UDP].sport
4589                 else:
4590                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4591                     self.icmp_id_out = packet[ICMP].id
4592             except:
4593                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4594                 raise
4595
4596         # server1 to server2
4597         pkts = []
4598         p = (
4599             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4600             / IP(src=server1.ip4, dst=server2_nat_ip)
4601             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4602         )
4603         pkts.append(p)
4604         p = (
4605             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4606             / IP(src=server1.ip4, dst=server2_nat_ip)
4607             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4608         )
4609         pkts.append(p)
4610         p = (
4611             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4612             / IP(src=server1.ip4, dst=server2_nat_ip)
4613             / ICMP(id=self.icmp_id_out, type="echo-reply")
4614         )
4615         pkts.append(p)
4616         self.pg0.add_stream(pkts)
4617         self.pg_enable_capture(self.pg_interfaces)
4618         self.pg_start()
4619         capture = self.pg0.get_capture(len(pkts))
4620         for packet in capture:
4621             try:
4622                 self.assertEqual(packet[IP].src, server1_nat_ip)
4623                 self.assertEqual(packet[IP].dst, server2.ip4)
4624                 if packet.haslayer(TCP):
4625                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4626                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4627                     self.assert_packet_checksums_valid(packet)
4628                 elif packet.haslayer(UDP):
4629                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4630                     self.assertEqual(packet[UDP].sport, server_udp_port)
4631                 else:
4632                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4633             except:
4634                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4635                 raise
4636
4637
4638 if __name__ == "__main__":
4639     unittest.main(testRunner=VppTestRunner)