tests docs: update python3 venv packages
[vpp.git] / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import tag_fixme_debian11, is_distro_debian11
12 from framework import VppTestCase, VppTestRunner, VppLoInterface
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
15     bind_layers,
16     Packet,
17     ByteEnumField,
18     ShortField,
19     IPField,
20     IntField,
21     LongField,
22     XByteField,
23     FlagsField,
24     FieldLenField,
25     PacketListField,
26 )
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
35 from util import ppp
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
39
40
41 # NAT HA protocol event data
42 class Event(Packet):
43     name = "Event"
44     fields_desc = [
45         ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
46         ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
47         ShortField("flags", 0),
48         IPField("in_addr", None),
49         IPField("out_addr", None),
50         ShortField("in_port", None),
51         ShortField("out_port", None),
52         IPField("eh_addr", None),
53         IPField("ehn_addr", None),
54         ShortField("eh_port", None),
55         ShortField("ehn_port", None),
56         IntField("fib_index", None),
57         IntField("total_pkts", 0),
58         LongField("total_bytes", 0),
59     ]
60
61     def extract_padding(self, s):
62         return "", s
63
64
65 # NAT HA protocol header
66 class HANATStateSync(Packet):
67     name = "HA NAT state sync"
68     fields_desc = [
69         XByteField("version", 1),
70         FlagsField("flags", 0, 8, ["ACK"]),
71         FieldLenField("count", None, count_of="events"),
72         IntField("sequence_number", 1),
73         IntField("thread_index", 0),
74         PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
75     ]
76
77
78 class MethodHolder(VppTestCase):
79     """NAT create capture and verify method holder"""
80
81     @property
82     def config_flags(self):
83         return VppEnum.vl_api_nat44_ei_config_flags_t
84
85     @property
86     def SYSLOG_SEVERITY(self):
87         return VppEnum.vl_api_syslog_severity_t
88
89     def nat44_add_static_mapping(
90         self,
91         local_ip,
92         external_ip="0.0.0.0",
93         local_port=0,
94         external_port=0,
95         vrf_id=0,
96         is_add=1,
97         external_sw_if_index=0xFFFFFFFF,
98         proto=0,
99         tag="",
100         flags=0,
101     ):
102         """
103         Add/delete NAT44EI static mapping
104
105         :param local_ip: Local IP address
106         :param external_ip: External IP address
107         :param local_port: Local port number (Optional)
108         :param external_port: External port number (Optional)
109         :param vrf_id: VRF ID (Default 0)
110         :param is_add: 1 if add, 0 if delete (Default add)
111         :param external_sw_if_index: External interface instead of IP address
112         :param proto: IP protocol (Mandatory if port specified)
113         :param tag: Opaque string tag
114         :param flags: NAT configuration flags
115         """
116
117         if not (local_port and external_port):
118             flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
119
120         self.vapi.nat44_ei_add_del_static_mapping(
121             is_add=is_add,
122             local_ip_address=local_ip,
123             external_ip_address=external_ip,
124             external_sw_if_index=external_sw_if_index,
125             local_port=local_port,
126             external_port=external_port,
127             vrf_id=vrf_id,
128             protocol=proto,
129             flags=flags,
130             tag=tag,
131         )
132
133     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
134         """
135         Add/delete NAT44EI address
136
137         :param ip: IP address
138         :param is_add: 1 if add, 0 if delete (Default add)
139         """
140         self.vapi.nat44_ei_add_del_address_range(
141             first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
142         )
143
144     def create_routes_and_neigbors(self):
145         r1 = VppIpRoute(
146             self,
147             self.pg7.remote_ip4,
148             32,
149             [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
150         )
151         r2 = VppIpRoute(
152             self,
153             self.pg8.remote_ip4,
154             32,
155             [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
156         )
157         r1.add_vpp_config()
158         r2.add_vpp_config()
159
160         n1 = VppNeighbor(
161             self,
162             self.pg7.sw_if_index,
163             self.pg7.remote_mac,
164             self.pg7.remote_ip4,
165             is_static=1,
166         )
167         n2 = VppNeighbor(
168             self,
169             self.pg8.sw_if_index,
170             self.pg8.remote_mac,
171             self.pg8.remote_ip4,
172             is_static=1,
173         )
174         n1.add_vpp_config()
175         n2.add_vpp_config()
176
177     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
178         """
179         Create packet stream for inside network
180
181         :param in_if: Inside interface
182         :param out_if: Outside interface
183         :param dst_ip: Destination address
184         :param ttl: TTL of generated packets
185         """
186         if dst_ip is None:
187             dst_ip = out_if.remote_ip4
188
189         pkts = []
190         # TCP
191         p = (
192             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
193             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
194             / TCP(sport=self.tcp_port_in, dport=20)
195         )
196         pkts.extend([p, p])
197
198         # UDP
199         p = (
200             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
201             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
202             / UDP(sport=self.udp_port_in, dport=20)
203         )
204         pkts.append(p)
205
206         # ICMP
207         p = (
208             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
209             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
210             / ICMP(id=self.icmp_id_in, type="echo-request")
211         )
212         pkts.append(p)
213
214         return pkts
215
216     def compose_ip6(self, ip4, pref, plen):
217         """
218         Compose IPv4-embedded IPv6 addresses
219
220         :param ip4: IPv4 address
221         :param pref: IPv6 prefix
222         :param plen: IPv6 prefix length
223         :returns: IPv4-embedded IPv6 addresses
224         """
225         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
226         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
227         if plen == 32:
228             pref_n[4] = ip4_n[0]
229             pref_n[5] = ip4_n[1]
230             pref_n[6] = ip4_n[2]
231             pref_n[7] = ip4_n[3]
232         elif plen == 40:
233             pref_n[5] = ip4_n[0]
234             pref_n[6] = ip4_n[1]
235             pref_n[7] = ip4_n[2]
236             pref_n[9] = ip4_n[3]
237         elif plen == 48:
238             pref_n[6] = ip4_n[0]
239             pref_n[7] = ip4_n[1]
240             pref_n[9] = ip4_n[2]
241             pref_n[10] = ip4_n[3]
242         elif plen == 56:
243             pref_n[7] = ip4_n[0]
244             pref_n[9] = ip4_n[1]
245             pref_n[10] = ip4_n[2]
246             pref_n[11] = ip4_n[3]
247         elif plen == 64:
248             pref_n[9] = ip4_n[0]
249             pref_n[10] = ip4_n[1]
250             pref_n[11] = ip4_n[2]
251             pref_n[12] = ip4_n[3]
252         elif plen == 96:
253             pref_n[12] = ip4_n[0]
254             pref_n[13] = ip4_n[1]
255             pref_n[14] = ip4_n[2]
256             pref_n[15] = ip4_n[3]
257         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
258         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
259
260     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
261         """
262         Create packet stream for outside network
263
264         :param out_if: Outside interface
265         :param dst_ip: Destination IP address (Default use global NAT address)
266         :param ttl: TTL of generated packets
267         :param use_inside_ports: Use inside NAT ports as destination ports
268                instead of outside ports
269         """
270         if dst_ip is None:
271             dst_ip = self.nat_addr
272         if not use_inside_ports:
273             tcp_port = self.tcp_port_out
274             udp_port = self.udp_port_out
275             icmp_id = self.icmp_id_out
276         else:
277             tcp_port = self.tcp_port_in
278             udp_port = self.udp_port_in
279             icmp_id = self.icmp_id_in
280         pkts = []
281         # TCP
282         p = (
283             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
284             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
285             / TCP(dport=tcp_port, sport=20)
286         )
287         pkts.extend([p, p])
288
289         # UDP
290         p = (
291             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
292             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
293             / UDP(dport=udp_port, sport=20)
294         )
295         pkts.append(p)
296
297         # ICMP
298         p = (
299             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
300             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
301             / ICMP(id=icmp_id, type="echo-reply")
302         )
303         pkts.append(p)
304
305         return pkts
306
307     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
308         """
309         Create packet stream for outside network
310
311         :param out_if: Outside interface
312         :param dst_ip: Destination IP address (Default use global NAT address)
313         :param hl: HL of generated packets
314         """
315         pkts = []
316         # TCP
317         p = (
318             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320             / TCP(dport=self.tcp_port_out, sport=20)
321         )
322         pkts.append(p)
323
324         # UDP
325         p = (
326             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328             / UDP(dport=self.udp_port_out, sport=20)
329         )
330         pkts.append(p)
331
332         # ICMP
333         p = (
334             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336             / ICMPv6EchoReply(id=self.icmp_id_out)
337         )
338         pkts.append(p)
339
340         return pkts
341
342     def verify_capture_out(
343         self,
344         capture,
345         nat_ip=None,
346         same_port=False,
347         dst_ip=None,
348         is_ip6=False,
349         ignore_port=False,
350     ):
351         """
352         Verify captured packets on outside network
353
354         :param capture: Captured packets
355         :param nat_ip: Translated IP address (Default use global NAT address)
356         :param same_port: Source port number is not translated (Default False)
357         :param dst_ip: Destination IP address (Default do not verify)
358         :param is_ip6: If L3 protocol is IPv6 (Default False)
359         """
360         if is_ip6:
361             IP46 = IPv6
362             ICMP46 = ICMPv6EchoRequest
363         else:
364             IP46 = IP
365             ICMP46 = ICMP
366         if nat_ip is None:
367             nat_ip = self.nat_addr
368         for packet in capture:
369             try:
370                 if not is_ip6:
371                     self.assert_packet_checksums_valid(packet)
372                 self.assertEqual(packet[IP46].src, nat_ip)
373                 if dst_ip is not None:
374                     self.assertEqual(packet[IP46].dst, dst_ip)
375                 if packet.haslayer(TCP):
376                     if not ignore_port:
377                         if same_port:
378                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
379                         else:
380                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
381                     self.tcp_port_out = packet[TCP].sport
382                     self.assert_packet_checksums_valid(packet)
383                 elif packet.haslayer(UDP):
384                     if not ignore_port:
385                         if same_port:
386                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
387                         else:
388                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
389                     self.udp_port_out = packet[UDP].sport
390                 else:
391                     if not ignore_port:
392                         if same_port:
393                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
394                         else:
395                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
396                     self.icmp_id_out = packet[ICMP46].id
397                     self.assert_packet_checksums_valid(packet)
398             except:
399                 self.logger.error(
400                     ppp("Unexpected or invalid packet (outside network):", packet)
401                 )
402                 raise
403
404     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
405         """
406         Verify captured packets on outside network
407
408         :param capture: Captured packets
409         :param nat_ip: Translated IP address
410         :param same_port: Source port number is not translated (Default False)
411         :param dst_ip: Destination IP address (Default do not verify)
412         """
413         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
414
415     def verify_capture_in(self, capture, in_if):
416         """
417         Verify captured packets on inside network
418
419         :param capture: Captured packets
420         :param in_if: Inside interface
421         """
422         for packet in capture:
423             try:
424                 self.assert_packet_checksums_valid(packet)
425                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
426                 if packet.haslayer(TCP):
427                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
428                 elif packet.haslayer(UDP):
429                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
430                 else:
431                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
432             except:
433                 self.logger.error(
434                     ppp("Unexpected or invalid packet (inside network):", packet)
435                 )
436                 raise
437
438     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
439         """
440         Verify captured packet that don't have to be translated
441
442         :param capture: Captured packets
443         :param ingress_if: Ingress interface
444         :param egress_if: Egress interface
445         """
446         for packet in capture:
447             try:
448                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
449                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
450                 if packet.haslayer(TCP):
451                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
452                 elif packet.haslayer(UDP):
453                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
454                 else:
455                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
456             except:
457                 self.logger.error(
458                     ppp("Unexpected or invalid packet (inside network):", packet)
459                 )
460                 raise
461
462     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
463         """
464         Verify captured packets with ICMP errors on outside network
465
466         :param capture: Captured packets
467         :param src_ip: Translated IP address or IP address of VPP
468                        (Default use global NAT address)
469         :param icmp_type: Type of error ICMP packet
470                           we are expecting (Default 11)
471         """
472         if src_ip is None:
473             src_ip = self.nat_addr
474         for packet in capture:
475             try:
476                 self.assertEqual(packet[IP].src, src_ip)
477                 self.assertEqual(packet.haslayer(ICMP), 1)
478                 icmp = packet[ICMP]
479                 self.assertEqual(icmp.type, icmp_type)
480                 self.assertTrue(icmp.haslayer(IPerror))
481                 inner_ip = icmp[IPerror]
482                 if inner_ip.haslayer(TCPerror):
483                     self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
484                 elif inner_ip.haslayer(UDPerror):
485                     self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
486                 else:
487                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
488             except:
489                 self.logger.error(
490                     ppp("Unexpected or invalid packet (outside network):", packet)
491                 )
492                 raise
493
494     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
495         """
496         Verify captured packets with ICMP errors on inside network
497
498         :param capture: Captured packets
499         :param in_if: Inside interface
500         :param icmp_type: Type of error ICMP packet
501                           we are expecting (Default 11)
502         """
503         for packet in capture:
504             try:
505                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506                 self.assertEqual(packet.haslayer(ICMP), 1)
507                 icmp = packet[ICMP]
508                 self.assertEqual(icmp.type, icmp_type)
509                 self.assertTrue(icmp.haslayer(IPerror))
510                 inner_ip = icmp[IPerror]
511                 if inner_ip.haslayer(TCPerror):
512                     self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
513                 elif inner_ip.haslayer(UDPerror):
514                     self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
515                 else:
516                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
517             except:
518                 self.logger.error(
519                     ppp("Unexpected or invalid packet (inside network):", packet)
520                 )
521                 raise
522
523     def create_stream_frag(
524         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
525     ):
526         """
527         Create fragmented packet stream
528
529         :param src_if: Source interface
530         :param dst: Destination IPv4 address
531         :param sport: Source port
532         :param dport: Destination port
533         :param data: Payload data
534         :param proto: protocol (TCP, UDP, ICMP)
535         :param echo_reply: use echo_reply if protocol is ICMP
536         :returns: Fragments
537         """
538         if proto == IP_PROTOS.tcp:
539             p = (
540                 IP(src=src_if.remote_ip4, dst=dst)
541                 / TCP(sport=sport, dport=dport)
542                 / Raw(data)
543             )
544             p = p.__class__(scapy.compat.raw(p))
545             chksum = p[TCP].chksum
546             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
547         elif proto == IP_PROTOS.udp:
548             proto_header = UDP(sport=sport, dport=dport)
549         elif proto == IP_PROTOS.icmp:
550             if not echo_reply:
551                 proto_header = ICMP(id=sport, type="echo-request")
552             else:
553                 proto_header = ICMP(id=sport, type="echo-reply")
554         else:
555             raise Exception("Unsupported protocol")
556         id = random.randint(0, 65535)
557         pkts = []
558         if proto == IP_PROTOS.tcp:
559             raw = Raw(data[0:4])
560         else:
561             raw = Raw(data[0:16])
562         p = (
563             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
564             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
565             / proto_header
566             / raw
567         )
568         pkts.append(p)
569         if proto == IP_PROTOS.tcp:
570             raw = Raw(data[4:20])
571         else:
572             raw = Raw(data[16:32])
573         p = (
574             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
575             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
576             / raw
577         )
578         pkts.append(p)
579         if proto == IP_PROTOS.tcp:
580             raw = Raw(data[20:])
581         else:
582             raw = Raw(data[32:])
583         p = (
584             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
585             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
586             / raw
587         )
588         pkts.append(p)
589         return pkts
590
591     def reass_frags_and_verify(self, frags, src, dst):
592         """
593         Reassemble and verify fragmented packet
594
595         :param frags: Captured fragments
596         :param src: Source IPv4 address to verify
597         :param dst: Destination IPv4 address to verify
598
599         :returns: Reassembled IPv4 packet
600         """
601         buffer = BytesIO()
602         for p in frags:
603             self.assertEqual(p[IP].src, src)
604             self.assertEqual(p[IP].dst, dst)
605             self.assert_ip_checksum_valid(p)
606             buffer.seek(p[IP].frag * 8)
607             buffer.write(bytes(p[IP].payload))
608         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
609         if ip.proto == IP_PROTOS.tcp:
610             p = ip / TCP(buffer.getvalue())
611             self.logger.debug(ppp("Reassembled:", p))
612             self.assert_tcp_checksum_valid(p)
613         elif ip.proto == IP_PROTOS.udp:
614             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
615         elif ip.proto == IP_PROTOS.icmp:
616             p = ip / ICMP(buffer.getvalue())
617         return p
618
619     def verify_ipfix_nat44_ses(self, data):
620         """
621         Verify IPFIX NAT44EI session create/delete event
622
623         :param data: Decoded IPFIX data records
624         """
625         nat44_ses_create_num = 0
626         nat44_ses_delete_num = 0
627         self.assertEqual(6, len(data))
628         for record in data:
629             # natEvent
630             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
631             if scapy.compat.orb(record[230]) == 4:
632                 nat44_ses_create_num += 1
633             else:
634                 nat44_ses_delete_num += 1
635             # sourceIPv4Address
636             self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
637             # postNATSourceIPv4Address
638             self.assertEqual(
639                 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
640             )
641             # ingressVRFID
642             self.assertEqual(struct.pack("!I", 0), record[234])
643             # protocolIdentifier/sourceTransportPort
644             # /postNAPTSourceTransportPort
645             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
646                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
647                 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
648             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
649                 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
650                 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
651             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
652                 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
653                 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
654             else:
655                 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
656         self.assertEqual(3, nat44_ses_create_num)
657         self.assertEqual(3, nat44_ses_delete_num)
658
659     def verify_ipfix_addr_exhausted(self, data):
660         self.assertEqual(1, len(data))
661         record = data[0]
662         # natEvent
663         self.assertEqual(scapy.compat.orb(record[230]), 3)
664         # natPoolID
665         self.assertEqual(struct.pack("!I", 0), record[283])
666
667     def verify_ipfix_max_sessions(self, data, limit):
668         self.assertEqual(1, len(data))
669         record = data[0]
670         # natEvent
671         self.assertEqual(scapy.compat.orb(record[230]), 13)
672         # natQuotaExceededEvent
673         self.assertEqual(struct.pack("!I", 1), record[466])
674         # maxSessionEntries
675         self.assertEqual(struct.pack("!I", limit), record[471])
676
677     def verify_no_nat44_user(self):
678         """Verify that there is no NAT44EI user"""
679         users = self.vapi.nat44_ei_user_dump()
680         self.assertEqual(len(users), 0)
681         users = self.statistics["/nat44-ei/total-users"]
682         self.assertEqual(users[0][0], 0)
683         sessions = self.statistics["/nat44-ei/total-sessions"]
684         self.assertEqual(sessions[0][0], 0)
685
686     def verify_syslog_apmap(self, data, is_add=True):
687         message = data.decode("utf-8")
688         try:
689             message = SyslogMessage.parse(message)
690         except ParseError as e:
691             self.logger.error(e)
692             raise
693         else:
694             self.assertEqual(message.severity, SyslogSeverity.info)
695             self.assertEqual(message.appname, "NAT")
696             self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
697             sd_params = message.sd.get("napmap")
698             self.assertTrue(sd_params is not None)
699             self.assertEqual(sd_params.get("IATYP"), "IPv4")
700             self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
701             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
702             self.assertEqual(sd_params.get("XATYP"), "IPv4")
703             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
704             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
705             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
706             self.assertTrue(sd_params.get("SSUBIX") is not None)
707             self.assertEqual(sd_params.get("SVLAN"), "0")
708
709     def verify_mss_value(self, pkt, mss):
710         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
711             raise TypeError("Not a TCP/IP packet")
712
713         for option in pkt[TCP].options:
714             if option[0] == "MSS":
715                 self.assertEqual(option[1], mss)
716                 self.assert_tcp_checksum_valid(pkt)
717
718     @staticmethod
719     def proto2layer(proto):
720         if proto == IP_PROTOS.tcp:
721             return TCP
722         elif proto == IP_PROTOS.udp:
723             return UDP
724         elif proto == IP_PROTOS.icmp:
725             return ICMP
726         else:
727             raise Exception("Unsupported protocol")
728
729     def frag_in_order(
730         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
731     ):
732         layer = self.proto2layer(proto)
733
734         if proto == IP_PROTOS.tcp:
735             data = b"A" * 4 + b"B" * 16 + b"C" * 3
736         else:
737             data = b"A" * 16 + b"B" * 16 + b"C" * 3
738         self.port_in = random.randint(1025, 65535)
739
740         # in2out
741         pkts = self.create_stream_frag(
742             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
743         )
744         self.pg0.add_stream(pkts)
745         self.pg_enable_capture(self.pg_interfaces)
746         self.pg_start()
747         frags = self.pg1.get_capture(len(pkts))
748         if not dont_translate:
749             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
750         else:
751             p = self.reass_frags_and_verify(
752                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
753             )
754         if proto != IP_PROTOS.icmp:
755             if not dont_translate:
756                 self.assertEqual(p[layer].dport, 20)
757                 if not ignore_port:
758                     self.assertNotEqual(p[layer].sport, self.port_in)
759             else:
760                 self.assertEqual(p[layer].sport, self.port_in)
761         else:
762             if not ignore_port:
763                 if not dont_translate:
764                     self.assertNotEqual(p[layer].id, self.port_in)
765                 else:
766                     self.assertEqual(p[layer].id, self.port_in)
767         self.assertEqual(data, p[Raw].load)
768
769         # out2in
770         if not dont_translate:
771             dst_addr = self.nat_addr
772         else:
773             dst_addr = self.pg0.remote_ip4
774         if proto != IP_PROTOS.icmp:
775             sport = 20
776             dport = p[layer].sport
777         else:
778             sport = p[layer].id
779             dport = 0
780         pkts = self.create_stream_frag(
781             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
782         )
783         self.pg1.add_stream(pkts)
784         self.pg_enable_capture(self.pg_interfaces)
785         self.pg_start()
786         frags = self.pg0.get_capture(len(pkts))
787         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
788         if proto != IP_PROTOS.icmp:
789             self.assertEqual(p[layer].sport, 20)
790             self.assertEqual(p[layer].dport, self.port_in)
791         else:
792             self.assertEqual(p[layer].id, self.port_in)
793         self.assertEqual(data, p[Raw].load)
794
795     def reass_hairpinning(
796         self,
797         server_addr,
798         server_in_port,
799         server_out_port,
800         host_in_port,
801         proto=IP_PROTOS.tcp,
802         ignore_port=False,
803     ):
804         layer = self.proto2layer(proto)
805
806         if proto == IP_PROTOS.tcp:
807             data = b"A" * 4 + b"B" * 16 + b"C" * 3
808         else:
809             data = b"A" * 16 + b"B" * 16 + b"C" * 3
810
811         # send packet from host to server
812         pkts = self.create_stream_frag(
813             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
814         )
815         self.pg0.add_stream(pkts)
816         self.pg_enable_capture(self.pg_interfaces)
817         self.pg_start()
818         frags = self.pg0.get_capture(len(pkts))
819         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
820         if proto != IP_PROTOS.icmp:
821             if not ignore_port:
822                 self.assertNotEqual(p[layer].sport, host_in_port)
823             self.assertEqual(p[layer].dport, server_in_port)
824         else:
825             if not ignore_port:
826                 self.assertNotEqual(p[layer].id, host_in_port)
827         self.assertEqual(data, p[Raw].load)
828
829     def frag_out_of_order(
830         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
831     ):
832         layer = self.proto2layer(proto)
833
834         if proto == IP_PROTOS.tcp:
835             data = b"A" * 4 + b"B" * 16 + b"C" * 3
836         else:
837             data = b"A" * 16 + b"B" * 16 + b"C" * 3
838         self.port_in = random.randint(1025, 65535)
839
840         for i in range(2):
841             # in2out
842             pkts = self.create_stream_frag(
843                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
844             )
845             pkts.reverse()
846             self.pg0.add_stream(pkts)
847             self.pg_enable_capture(self.pg_interfaces)
848             self.pg_start()
849             frags = self.pg1.get_capture(len(pkts))
850             if not dont_translate:
851                 p = self.reass_frags_and_verify(
852                     frags, self.nat_addr, self.pg1.remote_ip4
853                 )
854             else:
855                 p = self.reass_frags_and_verify(
856                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
857                 )
858             if proto != IP_PROTOS.icmp:
859                 if not dont_translate:
860                     self.assertEqual(p[layer].dport, 20)
861                     if not ignore_port:
862                         self.assertNotEqual(p[layer].sport, self.port_in)
863                 else:
864                     self.assertEqual(p[layer].sport, self.port_in)
865             else:
866                 if not ignore_port:
867                     if not dont_translate:
868                         self.assertNotEqual(p[layer].id, self.port_in)
869                     else:
870                         self.assertEqual(p[layer].id, self.port_in)
871             self.assertEqual(data, p[Raw].load)
872
873             # out2in
874             if not dont_translate:
875                 dst_addr = self.nat_addr
876             else:
877                 dst_addr = self.pg0.remote_ip4
878             if proto != IP_PROTOS.icmp:
879                 sport = 20
880                 dport = p[layer].sport
881             else:
882                 sport = p[layer].id
883                 dport = 0
884             pkts = self.create_stream_frag(
885                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
886             )
887             pkts.reverse()
888             self.pg1.add_stream(pkts)
889             self.pg_enable_capture(self.pg_interfaces)
890             self.pg_start()
891             frags = self.pg0.get_capture(len(pkts))
892             p = self.reass_frags_and_verify(
893                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
894             )
895             if proto != IP_PROTOS.icmp:
896                 self.assertEqual(p[layer].sport, 20)
897                 self.assertEqual(p[layer].dport, self.port_in)
898             else:
899                 self.assertEqual(p[layer].id, self.port_in)
900             self.assertEqual(data, p[Raw].load)
901
902
903 def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
904     if 0 == vpp_worker_count:
905         return 0
906     numeric = socket.inet_aton(ip)
907     numeric = struct.unpack("!L", numeric)[0]
908     numeric = socket.htonl(numeric)
909     h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
910     return 1 + h % vpp_worker_count
911
912
913 @tag_fixme_debian11
914 class TestNAT44EI(MethodHolder):
915     """NAT44EI Test Cases"""
916
917     max_translations = 10240
918     max_users = 10240
919
920     @classmethod
921     def setUpClass(cls):
922         super(TestNAT44EI, cls).setUpClass()
923         if is_distro_debian11 == True and not hasattr(cls, "vpp"):
924             return
925         cls.vapi.cli("set log class nat44-ei level debug")
926
927         cls.tcp_port_in = 6303
928         cls.tcp_port_out = 6303
929         cls.udp_port_in = 6304
930         cls.udp_port_out = 6304
931         cls.icmp_id_in = 6305
932         cls.icmp_id_out = 6305
933         cls.nat_addr = "10.0.0.3"
934         cls.ipfix_src_port = 4739
935         cls.ipfix_domain_id = 1
936         cls.tcp_external_port = 80
937         cls.udp_external_port = 69
938
939         cls.create_pg_interfaces(range(10))
940         cls.interfaces = list(cls.pg_interfaces[0:4])
941
942         for i in cls.interfaces:
943             i.admin_up()
944             i.config_ip4()
945             i.resolve_arp()
946
947         cls.pg0.generate_remote_hosts(3)
948         cls.pg0.configure_ipv4_neighbors()
949
950         cls.pg1.generate_remote_hosts(1)
951         cls.pg1.configure_ipv4_neighbors()
952
953         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
954         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
955         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
956
957         cls.pg4._local_ip4 = "172.16.255.1"
958         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
959         cls.pg4.set_table_ip4(10)
960         cls.pg5._local_ip4 = "172.17.255.3"
961         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
962         cls.pg5.set_table_ip4(10)
963         cls.pg6._local_ip4 = "172.16.255.1"
964         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
965         cls.pg6.set_table_ip4(20)
966         for i in cls.overlapping_interfaces:
967             i.config_ip4()
968             i.admin_up()
969             i.resolve_arp()
970
971         cls.pg7.admin_up()
972         cls.pg8.admin_up()
973
974         cls.pg9.generate_remote_hosts(2)
975         cls.pg9.config_ip4()
976         cls.vapi.sw_interface_add_del_address(
977             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
978         )
979
980         cls.pg9.admin_up()
981         cls.pg9.resolve_arp()
982         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
983         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
984         cls.pg9.resolve_arp()
985
986     def plugin_enable(self):
987         self.vapi.nat44_ei_plugin_enable_disable(
988             sessions=self.max_translations, users=self.max_users, enable=1
989         )
990
991     def setUp(self):
992         super(TestNAT44EI, self).setUp()
993         self.plugin_enable()
994
995     def tearDown(self):
996         super(TestNAT44EI, self).tearDown()
997         if not self.vpp_dead:
998             self.vapi.nat44_ei_ipfix_enable_disable(
999                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1000             )
1001             self.ipfix_src_port = 4739
1002             self.ipfix_domain_id = 1
1003
1004             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1005             self.vapi.cli("clear logging")
1006
1007     def test_clear_sessions(self):
1008         """NAT44EI session clearing test"""
1009
1010         self.nat44_add_address(self.nat_addr)
1011         flags = self.config_flags.NAT44_EI_IF_INSIDE
1012         self.vapi.nat44_ei_interface_add_del_feature(
1013             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1014         )
1015         self.vapi.nat44_ei_interface_add_del_feature(
1016             sw_if_index=self.pg1.sw_if_index, is_add=1
1017         )
1018
1019         pkts = self.create_stream_in(self.pg0, self.pg1)
1020         self.pg0.add_stream(pkts)
1021         self.pg_enable_capture(self.pg_interfaces)
1022         self.pg_start()
1023         capture = self.pg1.get_capture(len(pkts))
1024         self.verify_capture_out(capture)
1025
1026         sessions = self.statistics["/nat44-ei/total-sessions"]
1027         self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1028         self.logger.info("sessions before clearing: %s" % sessions[0][0])
1029
1030         self.vapi.cli("clear nat44 ei sessions")
1031
1032         sessions = self.statistics["/nat44-ei/total-sessions"]
1033         self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1034         self.logger.info("sessions after clearing: %s" % sessions[0][0])
1035
1036     def test_dynamic(self):
1037         """NAT44EI dynamic translation test"""
1038         self.nat44_add_address(self.nat_addr)
1039         flags = self.config_flags.NAT44_EI_IF_INSIDE
1040         self.vapi.nat44_ei_interface_add_del_feature(
1041             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1042         )
1043         self.vapi.nat44_ei_interface_add_del_feature(
1044             sw_if_index=self.pg1.sw_if_index, is_add=1
1045         )
1046
1047         # in2out
1048         tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1049         udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1050         icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1051         drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1052
1053         pkts = self.create_stream_in(self.pg0, self.pg1)
1054         self.pg0.add_stream(pkts)
1055         self.pg_enable_capture(self.pg_interfaces)
1056         self.pg_start()
1057         capture = self.pg1.get_capture(len(pkts))
1058         self.verify_capture_out(capture)
1059
1060         if_idx = self.pg0.sw_if_index
1061         cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1062         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1063         cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1064         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1065         cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1066         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1067         cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1068         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1069
1070         # out2in
1071         tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1072         udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1073         icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1074         drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1075
1076         pkts = self.create_stream_out(self.pg1)
1077         self.pg1.add_stream(pkts)
1078         self.pg_enable_capture(self.pg_interfaces)
1079         self.pg_start()
1080         capture = self.pg0.get_capture(len(pkts))
1081         self.verify_capture_in(capture, self.pg0)
1082
1083         if_idx = self.pg1.sw_if_index
1084         cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1085         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1086         cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1087         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1088         cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1089         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1090         cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1091         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1092
1093         users = self.statistics["/nat44-ei/total-users"]
1094         self.assertEqual(users[:, 0].sum(), 1)
1095         sessions = self.statistics["/nat44-ei/total-sessions"]
1096         self.assertEqual(sessions[:, 0].sum(), 3)
1097
1098     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1099         """NAT44EI handling of client packets with TTL=1"""
1100
1101         self.nat44_add_address(self.nat_addr)
1102         flags = self.config_flags.NAT44_EI_IF_INSIDE
1103         self.vapi.nat44_ei_interface_add_del_feature(
1104             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1105         )
1106         self.vapi.nat44_ei_interface_add_del_feature(
1107             sw_if_index=self.pg1.sw_if_index, is_add=1
1108         )
1109
1110         # Client side - generate traffic
1111         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1112         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1113
1114         # Client side - verify ICMP type 11 packets
1115         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1116
1117     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1118         """NAT44EI handling of server packets with TTL=1"""
1119
1120         self.nat44_add_address(self.nat_addr)
1121         flags = self.config_flags.NAT44_EI_IF_INSIDE
1122         self.vapi.nat44_ei_interface_add_del_feature(
1123             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1124         )
1125         self.vapi.nat44_ei_interface_add_del_feature(
1126             sw_if_index=self.pg1.sw_if_index, is_add=1
1127         )
1128
1129         # Client side - create sessions
1130         pkts = self.create_stream_in(self.pg0, self.pg1)
1131         self.pg0.add_stream(pkts)
1132         self.pg_enable_capture(self.pg_interfaces)
1133         self.pg_start()
1134
1135         # Server side - generate traffic
1136         capture = self.pg1.get_capture(len(pkts))
1137         self.verify_capture_out(capture)
1138         pkts = self.create_stream_out(self.pg1, ttl=1)
1139         capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1140
1141         # Server side - verify ICMP type 11 packets
1142         self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1143
1144     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1145         """NAT44EI handling of error responses to client packets with TTL=2"""
1146
1147         self.nat44_add_address(self.nat_addr)
1148         flags = self.config_flags.NAT44_EI_IF_INSIDE
1149         self.vapi.nat44_ei_interface_add_del_feature(
1150             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1151         )
1152         self.vapi.nat44_ei_interface_add_del_feature(
1153             sw_if_index=self.pg1.sw_if_index, is_add=1
1154         )
1155
1156         # Client side - generate traffic
1157         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1158         self.pg0.add_stream(pkts)
1159         self.pg_enable_capture(self.pg_interfaces)
1160         self.pg_start()
1161
1162         # Server side - simulate ICMP type 11 response
1163         capture = self.pg1.get_capture(len(pkts))
1164         pkts = [
1165             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1166             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1167             / ICMP(type=11)
1168             / packet[IP]
1169             for packet in capture
1170         ]
1171         self.pg1.add_stream(pkts)
1172         self.pg_enable_capture(self.pg_interfaces)
1173         self.pg_start()
1174
1175         # Client side - verify ICMP type 11 packets
1176         capture = self.pg0.get_capture(len(pkts))
1177         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1178
1179     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1180         """NAT44EI handling of error responses to server packets with TTL=2"""
1181
1182         self.nat44_add_address(self.nat_addr)
1183         flags = self.config_flags.NAT44_EI_IF_INSIDE
1184         self.vapi.nat44_ei_interface_add_del_feature(
1185             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1186         )
1187         self.vapi.nat44_ei_interface_add_del_feature(
1188             sw_if_index=self.pg1.sw_if_index, is_add=1
1189         )
1190
1191         # Client side - create sessions
1192         pkts = self.create_stream_in(self.pg0, self.pg1)
1193         self.pg0.add_stream(pkts)
1194         self.pg_enable_capture(self.pg_interfaces)
1195         self.pg_start()
1196
1197         # Server side - generate traffic
1198         capture = self.pg1.get_capture(len(pkts))
1199         self.verify_capture_out(capture)
1200         pkts = self.create_stream_out(self.pg1, ttl=2)
1201         self.pg1.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204
1205         # Client side - simulate ICMP type 11 response
1206         capture = self.pg0.get_capture(len(pkts))
1207         pkts = [
1208             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1209             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1210             / ICMP(type=11)
1211             / packet[IP]
1212             for packet in capture
1213         ]
1214         self.pg0.add_stream(pkts)
1215         self.pg_enable_capture(self.pg_interfaces)
1216         self.pg_start()
1217
1218         # Server side - verify ICMP type 11 packets
1219         capture = self.pg1.get_capture(len(pkts))
1220         self.verify_capture_out_with_icmp_errors(capture)
1221
1222     def test_ping_out_interface_from_outside(self):
1223         """NAT44EI ping out interface from outside network"""
1224
1225         self.nat44_add_address(self.nat_addr)
1226         flags = self.config_flags.NAT44_EI_IF_INSIDE
1227         self.vapi.nat44_ei_interface_add_del_feature(
1228             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1229         )
1230         self.vapi.nat44_ei_interface_add_del_feature(
1231             sw_if_index=self.pg1.sw_if_index, is_add=1
1232         )
1233
1234         p = (
1235             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1236             / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1237             / ICMP(id=self.icmp_id_out, type="echo-request")
1238         )
1239         pkts = [p]
1240         self.pg1.add_stream(pkts)
1241         self.pg_enable_capture(self.pg_interfaces)
1242         self.pg_start()
1243         capture = self.pg1.get_capture(len(pkts))
1244         packet = capture[0]
1245         try:
1246             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1247             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1248             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1249             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1250         except:
1251             self.logger.error(
1252                 ppp("Unexpected or invalid packet (outside network):", packet)
1253             )
1254             raise
1255
1256     def test_ping_internal_host_from_outside(self):
1257         """NAT44EI ping internal host from outside network"""
1258
1259         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1260         flags = self.config_flags.NAT44_EI_IF_INSIDE
1261         self.vapi.nat44_ei_interface_add_del_feature(
1262             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1263         )
1264         self.vapi.nat44_ei_interface_add_del_feature(
1265             sw_if_index=self.pg1.sw_if_index, is_add=1
1266         )
1267
1268         # out2in
1269         pkt = (
1270             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1271             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1272             / ICMP(id=self.icmp_id_out, type="echo-request")
1273         )
1274         self.pg1.add_stream(pkt)
1275         self.pg_enable_capture(self.pg_interfaces)
1276         self.pg_start()
1277         capture = self.pg0.get_capture(1)
1278         self.verify_capture_in(capture, self.pg0)
1279         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1280
1281         # in2out
1282         pkt = (
1283             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1284             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1285             / ICMP(id=self.icmp_id_in, type="echo-reply")
1286         )
1287         self.pg0.add_stream(pkt)
1288         self.pg_enable_capture(self.pg_interfaces)
1289         self.pg_start()
1290         capture = self.pg1.get_capture(1)
1291         self.verify_capture_out(capture, same_port=True)
1292         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1293
1294     def test_forwarding(self):
1295         """NAT44EI forwarding test"""
1296
1297         flags = self.config_flags.NAT44_EI_IF_INSIDE
1298         self.vapi.nat44_ei_interface_add_del_feature(
1299             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1300         )
1301         self.vapi.nat44_ei_interface_add_del_feature(
1302             sw_if_index=self.pg1.sw_if_index, is_add=1
1303         )
1304         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1305
1306         real_ip = self.pg0.remote_ip4
1307         alias_ip = self.nat_addr
1308         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1309         self.vapi.nat44_ei_add_del_static_mapping(
1310             is_add=1,
1311             local_ip_address=real_ip,
1312             external_ip_address=alias_ip,
1313             external_sw_if_index=0xFFFFFFFF,
1314             flags=flags,
1315         )
1316
1317         try:
1318             # static mapping match
1319
1320             pkts = self.create_stream_out(self.pg1)
1321             self.pg1.add_stream(pkts)
1322             self.pg_enable_capture(self.pg_interfaces)
1323             self.pg_start()
1324             capture = self.pg0.get_capture(len(pkts))
1325             self.verify_capture_in(capture, self.pg0)
1326
1327             pkts = self.create_stream_in(self.pg0, self.pg1)
1328             self.pg0.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg1.get_capture(len(pkts))
1332             self.verify_capture_out(capture, same_port=True)
1333
1334             # no static mapping match
1335
1336             host0 = self.pg0.remote_hosts[0]
1337             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1338             try:
1339                 pkts = self.create_stream_out(
1340                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1341                 )
1342                 self.pg1.add_stream(pkts)
1343                 self.pg_enable_capture(self.pg_interfaces)
1344                 self.pg_start()
1345                 capture = self.pg0.get_capture(len(pkts))
1346                 self.verify_capture_in(capture, self.pg0)
1347
1348                 pkts = self.create_stream_in(self.pg0, self.pg1)
1349                 self.pg0.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg1.get_capture(len(pkts))
1353                 self.verify_capture_out(
1354                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1355                 )
1356             finally:
1357                 self.pg0.remote_hosts[0] = host0
1358
1359         finally:
1360             self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1361             flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1362             self.vapi.nat44_ei_add_del_static_mapping(
1363                 is_add=0,
1364                 local_ip_address=real_ip,
1365                 external_ip_address=alias_ip,
1366                 external_sw_if_index=0xFFFFFFFF,
1367                 flags=flags,
1368             )
1369
1370     def test_static_in(self):
1371         """NAT44EI 1:1 NAT initialized from inside network"""
1372
1373         nat_ip = "10.0.0.10"
1374         self.tcp_port_out = 6303
1375         self.udp_port_out = 6304
1376         self.icmp_id_out = 6305
1377
1378         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1379         flags = self.config_flags.NAT44_EI_IF_INSIDE
1380         self.vapi.nat44_ei_interface_add_del_feature(
1381             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1382         )
1383         self.vapi.nat44_ei_interface_add_del_feature(
1384             sw_if_index=self.pg1.sw_if_index, is_add=1
1385         )
1386         sm = self.vapi.nat44_ei_static_mapping_dump()
1387         self.assertEqual(len(sm), 1)
1388         self.assertEqual(sm[0].tag, "")
1389         self.assertEqual(sm[0].protocol, 0)
1390         self.assertEqual(sm[0].local_port, 0)
1391         self.assertEqual(sm[0].external_port, 0)
1392
1393         # in2out
1394         pkts = self.create_stream_in(self.pg0, self.pg1)
1395         self.pg0.add_stream(pkts)
1396         self.pg_enable_capture(self.pg_interfaces)
1397         self.pg_start()
1398         capture = self.pg1.get_capture(len(pkts))
1399         self.verify_capture_out(capture, nat_ip, True)
1400
1401         # out2in
1402         pkts = self.create_stream_out(self.pg1, nat_ip)
1403         self.pg1.add_stream(pkts)
1404         self.pg_enable_capture(self.pg_interfaces)
1405         self.pg_start()
1406         capture = self.pg0.get_capture(len(pkts))
1407         self.verify_capture_in(capture, self.pg0)
1408
1409     def test_static_out(self):
1410         """NAT44EI 1:1 NAT initialized from outside network"""
1411
1412         nat_ip = "10.0.0.20"
1413         self.tcp_port_out = 6303
1414         self.udp_port_out = 6304
1415         self.icmp_id_out = 6305
1416         tag = "testTAG"
1417
1418         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1419         flags = self.config_flags.NAT44_EI_IF_INSIDE
1420         self.vapi.nat44_ei_interface_add_del_feature(
1421             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1422         )
1423         self.vapi.nat44_ei_interface_add_del_feature(
1424             sw_if_index=self.pg1.sw_if_index, is_add=1
1425         )
1426         sm = self.vapi.nat44_ei_static_mapping_dump()
1427         self.assertEqual(len(sm), 1)
1428         self.assertEqual(sm[0].tag, tag)
1429
1430         # out2in
1431         pkts = self.create_stream_out(self.pg1, nat_ip)
1432         self.pg1.add_stream(pkts)
1433         self.pg_enable_capture(self.pg_interfaces)
1434         self.pg_start()
1435         capture = self.pg0.get_capture(len(pkts))
1436         self.verify_capture_in(capture, self.pg0)
1437
1438         # in2out
1439         pkts = self.create_stream_in(self.pg0, self.pg1)
1440         self.pg0.add_stream(pkts)
1441         self.pg_enable_capture(self.pg_interfaces)
1442         self.pg_start()
1443         capture = self.pg1.get_capture(len(pkts))
1444         self.verify_capture_out(capture, nat_ip, True)
1445
1446     def test_static_with_port_in(self):
1447         """NAT44EI 1:1 NAPT initialized from inside network"""
1448
1449         self.tcp_port_out = 3606
1450         self.udp_port_out = 3607
1451         self.icmp_id_out = 3608
1452
1453         self.nat44_add_address(self.nat_addr)
1454         self.nat44_add_static_mapping(
1455             self.pg0.remote_ip4,
1456             self.nat_addr,
1457             self.tcp_port_in,
1458             self.tcp_port_out,
1459             proto=IP_PROTOS.tcp,
1460         )
1461         self.nat44_add_static_mapping(
1462             self.pg0.remote_ip4,
1463             self.nat_addr,
1464             self.udp_port_in,
1465             self.udp_port_out,
1466             proto=IP_PROTOS.udp,
1467         )
1468         self.nat44_add_static_mapping(
1469             self.pg0.remote_ip4,
1470             self.nat_addr,
1471             self.icmp_id_in,
1472             self.icmp_id_out,
1473             proto=IP_PROTOS.icmp,
1474         )
1475         flags = self.config_flags.NAT44_EI_IF_INSIDE
1476         self.vapi.nat44_ei_interface_add_del_feature(
1477             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1478         )
1479         self.vapi.nat44_ei_interface_add_del_feature(
1480             sw_if_index=self.pg1.sw_if_index, is_add=1
1481         )
1482
1483         # in2out
1484         pkts = self.create_stream_in(self.pg0, self.pg1)
1485         self.pg0.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg1.get_capture(len(pkts))
1489         self.verify_capture_out(capture)
1490
1491         # out2in
1492         pkts = self.create_stream_out(self.pg1)
1493         self.pg1.add_stream(pkts)
1494         self.pg_enable_capture(self.pg_interfaces)
1495         self.pg_start()
1496         capture = self.pg0.get_capture(len(pkts))
1497         self.verify_capture_in(capture, self.pg0)
1498
1499     def test_static_with_port_out(self):
1500         """NAT44EI 1:1 NAPT initialized from outside network"""
1501
1502         self.tcp_port_out = 30606
1503         self.udp_port_out = 30607
1504         self.icmp_id_out = 30608
1505
1506         self.nat44_add_address(self.nat_addr)
1507         self.nat44_add_static_mapping(
1508             self.pg0.remote_ip4,
1509             self.nat_addr,
1510             self.tcp_port_in,
1511             self.tcp_port_out,
1512             proto=IP_PROTOS.tcp,
1513         )
1514         self.nat44_add_static_mapping(
1515             self.pg0.remote_ip4,
1516             self.nat_addr,
1517             self.udp_port_in,
1518             self.udp_port_out,
1519             proto=IP_PROTOS.udp,
1520         )
1521         self.nat44_add_static_mapping(
1522             self.pg0.remote_ip4,
1523             self.nat_addr,
1524             self.icmp_id_in,
1525             self.icmp_id_out,
1526             proto=IP_PROTOS.icmp,
1527         )
1528         flags = self.config_flags.NAT44_EI_IF_INSIDE
1529         self.vapi.nat44_ei_interface_add_del_feature(
1530             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1531         )
1532         self.vapi.nat44_ei_interface_add_del_feature(
1533             sw_if_index=self.pg1.sw_if_index, is_add=1
1534         )
1535
1536         # out2in
1537         pkts = self.create_stream_out(self.pg1)
1538         self.pg1.add_stream(pkts)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         capture = self.pg0.get_capture(len(pkts))
1542         self.verify_capture_in(capture, self.pg0)
1543
1544         # in2out
1545         pkts = self.create_stream_in(self.pg0, self.pg1)
1546         self.pg0.add_stream(pkts)
1547         self.pg_enable_capture(self.pg_interfaces)
1548         self.pg_start()
1549         capture = self.pg1.get_capture(len(pkts))
1550         self.verify_capture_out(capture)
1551
1552     def test_static_vrf_aware(self):
1553         """NAT44EI 1:1 NAT VRF awareness"""
1554
1555         nat_ip1 = "10.0.0.30"
1556         nat_ip2 = "10.0.0.40"
1557         self.tcp_port_out = 6303
1558         self.udp_port_out = 6304
1559         self.icmp_id_out = 6305
1560
1561         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1562         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1563         flags = self.config_flags.NAT44_EI_IF_INSIDE
1564         self.vapi.nat44_ei_interface_add_del_feature(
1565             sw_if_index=self.pg3.sw_if_index, is_add=1
1566         )
1567         self.vapi.nat44_ei_interface_add_del_feature(
1568             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1569         )
1570         self.vapi.nat44_ei_interface_add_del_feature(
1571             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1572         )
1573
1574         # inside interface VRF match NAT44EI static mapping VRF
1575         pkts = self.create_stream_in(self.pg4, self.pg3)
1576         self.pg4.add_stream(pkts)
1577         self.pg_enable_capture(self.pg_interfaces)
1578         self.pg_start()
1579         capture = self.pg3.get_capture(len(pkts))
1580         self.verify_capture_out(capture, nat_ip1, True)
1581
1582         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1583         # are dropped)
1584         pkts = self.create_stream_in(self.pg0, self.pg3)
1585         self.pg0.add_stream(pkts)
1586         self.pg_enable_capture(self.pg_interfaces)
1587         self.pg_start()
1588         self.pg3.assert_nothing_captured()
1589
1590     def test_dynamic_to_static(self):
1591         """NAT44EI Switch from dynamic translation to 1:1NAT"""
1592         nat_ip = "10.0.0.10"
1593         self.tcp_port_out = 6303
1594         self.udp_port_out = 6304
1595         self.icmp_id_out = 6305
1596
1597         self.nat44_add_address(self.nat_addr)
1598         flags = self.config_flags.NAT44_EI_IF_INSIDE
1599         self.vapi.nat44_ei_interface_add_del_feature(
1600             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1601         )
1602         self.vapi.nat44_ei_interface_add_del_feature(
1603             sw_if_index=self.pg1.sw_if_index, is_add=1
1604         )
1605
1606         # dynamic
1607         pkts = self.create_stream_in(self.pg0, self.pg1)
1608         self.pg0.add_stream(pkts)
1609         self.pg_enable_capture(self.pg_interfaces)
1610         self.pg_start()
1611         capture = self.pg1.get_capture(len(pkts))
1612         self.verify_capture_out(capture)
1613
1614         # 1:1NAT
1615         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1616         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1617         self.assertEqual(len(sessions), 0)
1618         pkts = self.create_stream_in(self.pg0, self.pg1)
1619         self.pg0.add_stream(pkts)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg1.get_capture(len(pkts))
1623         self.verify_capture_out(capture, nat_ip, True)
1624
1625     def test_identity_nat(self):
1626         """NAT44EI Identity NAT"""
1627         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1628         self.vapi.nat44_ei_add_del_identity_mapping(
1629             ip_address=self.pg0.remote_ip4,
1630             sw_if_index=0xFFFFFFFF,
1631             flags=flags,
1632             is_add=1,
1633         )
1634         flags = self.config_flags.NAT44_EI_IF_INSIDE
1635         self.vapi.nat44_ei_interface_add_del_feature(
1636             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1637         )
1638         self.vapi.nat44_ei_interface_add_del_feature(
1639             sw_if_index=self.pg1.sw_if_index, is_add=1
1640         )
1641
1642         p = (
1643             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1644             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1645             / TCP(sport=12345, dport=56789)
1646         )
1647         self.pg1.add_stream(p)
1648         self.pg_enable_capture(self.pg_interfaces)
1649         self.pg_start()
1650         capture = self.pg0.get_capture(1)
1651         p = capture[0]
1652         try:
1653             ip = p[IP]
1654             tcp = p[TCP]
1655             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1656             self.assertEqual(ip.src, self.pg1.remote_ip4)
1657             self.assertEqual(tcp.dport, 56789)
1658             self.assertEqual(tcp.sport, 12345)
1659             self.assert_packet_checksums_valid(p)
1660         except:
1661             self.logger.error(ppp("Unexpected or invalid packet:", p))
1662             raise
1663
1664         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1665         self.assertEqual(len(sessions), 0)
1666         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1667         self.vapi.nat44_ei_add_del_identity_mapping(
1668             ip_address=self.pg0.remote_ip4,
1669             sw_if_index=0xFFFFFFFF,
1670             flags=flags,
1671             vrf_id=1,
1672             is_add=1,
1673         )
1674         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1675         self.assertEqual(len(identity_mappings), 2)
1676
1677     def test_multiple_inside_interfaces(self):
1678         """NAT44EI multiple non-overlapping address space inside interfaces"""
1679
1680         self.nat44_add_address(self.nat_addr)
1681         flags = self.config_flags.NAT44_EI_IF_INSIDE
1682         self.vapi.nat44_ei_interface_add_del_feature(
1683             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1684         )
1685         self.vapi.nat44_ei_interface_add_del_feature(
1686             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1687         )
1688         self.vapi.nat44_ei_interface_add_del_feature(
1689             sw_if_index=self.pg3.sw_if_index, is_add=1
1690         )
1691
1692         # between two NAT44EI inside interfaces (no translation)
1693         pkts = self.create_stream_in(self.pg0, self.pg1)
1694         self.pg0.add_stream(pkts)
1695         self.pg_enable_capture(self.pg_interfaces)
1696         self.pg_start()
1697         capture = self.pg1.get_capture(len(pkts))
1698         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1699
1700         # from inside to interface without translation
1701         pkts = self.create_stream_in(self.pg0, self.pg2)
1702         self.pg0.add_stream(pkts)
1703         self.pg_enable_capture(self.pg_interfaces)
1704         self.pg_start()
1705         capture = self.pg2.get_capture(len(pkts))
1706         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1707
1708         # in2out 1st interface
1709         pkts = self.create_stream_in(self.pg0, self.pg3)
1710         self.pg0.add_stream(pkts)
1711         self.pg_enable_capture(self.pg_interfaces)
1712         self.pg_start()
1713         capture = self.pg3.get_capture(len(pkts))
1714         self.verify_capture_out(capture)
1715
1716         # out2in 1st interface
1717         pkts = self.create_stream_out(self.pg3)
1718         self.pg3.add_stream(pkts)
1719         self.pg_enable_capture(self.pg_interfaces)
1720         self.pg_start()
1721         capture = self.pg0.get_capture(len(pkts))
1722         self.verify_capture_in(capture, self.pg0)
1723
1724         # in2out 2nd interface
1725         pkts = self.create_stream_in(self.pg1, self.pg3)
1726         self.pg1.add_stream(pkts)
1727         self.pg_enable_capture(self.pg_interfaces)
1728         self.pg_start()
1729         capture = self.pg3.get_capture(len(pkts))
1730         self.verify_capture_out(capture)
1731
1732         # out2in 2nd interface
1733         pkts = self.create_stream_out(self.pg3)
1734         self.pg3.add_stream(pkts)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         capture = self.pg1.get_capture(len(pkts))
1738         self.verify_capture_in(capture, self.pg1)
1739
1740     def test_inside_overlapping_interfaces(self):
1741         """NAT44EI multiple inside interfaces with overlapping address space"""
1742
1743         static_nat_ip = "10.0.0.10"
1744         self.nat44_add_address(self.nat_addr)
1745         flags = self.config_flags.NAT44_EI_IF_INSIDE
1746         self.vapi.nat44_ei_interface_add_del_feature(
1747             sw_if_index=self.pg3.sw_if_index, is_add=1
1748         )
1749         self.vapi.nat44_ei_interface_add_del_feature(
1750             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1751         )
1752         self.vapi.nat44_ei_interface_add_del_feature(
1753             sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1754         )
1755         self.vapi.nat44_ei_interface_add_del_feature(
1756             sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1757         )
1758         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1759
1760         # between NAT44EI inside interfaces with same VRF (no translation)
1761         pkts = self.create_stream_in(self.pg4, self.pg5)
1762         self.pg4.add_stream(pkts)
1763         self.pg_enable_capture(self.pg_interfaces)
1764         self.pg_start()
1765         capture = self.pg5.get_capture(len(pkts))
1766         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1767
1768         # between NAT44EI inside interfaces with different VRF (hairpinning)
1769         p = (
1770             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1771             / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1772             / TCP(sport=1234, dport=5678)
1773         )
1774         self.pg4.add_stream(p)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         capture = self.pg6.get_capture(1)
1778         p = capture[0]
1779         try:
1780             ip = p[IP]
1781             tcp = p[TCP]
1782             self.assertEqual(ip.src, self.nat_addr)
1783             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1784             self.assertNotEqual(tcp.sport, 1234)
1785             self.assertEqual(tcp.dport, 5678)
1786         except:
1787             self.logger.error(ppp("Unexpected or invalid packet:", p))
1788             raise
1789
1790         # in2out 1st interface
1791         pkts = self.create_stream_in(self.pg4, self.pg3)
1792         self.pg4.add_stream(pkts)
1793         self.pg_enable_capture(self.pg_interfaces)
1794         self.pg_start()
1795         capture = self.pg3.get_capture(len(pkts))
1796         self.verify_capture_out(capture)
1797
1798         # out2in 1st interface
1799         pkts = self.create_stream_out(self.pg3)
1800         self.pg3.add_stream(pkts)
1801         self.pg_enable_capture(self.pg_interfaces)
1802         self.pg_start()
1803         capture = self.pg4.get_capture(len(pkts))
1804         self.verify_capture_in(capture, self.pg4)
1805
1806         # in2out 2nd interface
1807         pkts = self.create_stream_in(self.pg5, self.pg3)
1808         self.pg5.add_stream(pkts)
1809         self.pg_enable_capture(self.pg_interfaces)
1810         self.pg_start()
1811         capture = self.pg3.get_capture(len(pkts))
1812         self.verify_capture_out(capture)
1813
1814         # out2in 2nd interface
1815         pkts = self.create_stream_out(self.pg3)
1816         self.pg3.add_stream(pkts)
1817         self.pg_enable_capture(self.pg_interfaces)
1818         self.pg_start()
1819         capture = self.pg5.get_capture(len(pkts))
1820         self.verify_capture_in(capture, self.pg5)
1821
1822         # pg5 session dump
1823         addresses = self.vapi.nat44_ei_address_dump()
1824         self.assertEqual(len(addresses), 1)
1825         sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1826         self.assertEqual(len(sessions), 3)
1827         for session in sessions:
1828             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1829             self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1830             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1831         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1832         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1833         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1834         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1835         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1836         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1837         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1838         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1839         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1840
1841         # in2out 3rd interface
1842         pkts = self.create_stream_in(self.pg6, self.pg3)
1843         self.pg6.add_stream(pkts)
1844         self.pg_enable_capture(self.pg_interfaces)
1845         self.pg_start()
1846         capture = self.pg3.get_capture(len(pkts))
1847         self.verify_capture_out(capture, static_nat_ip, True)
1848
1849         # out2in 3rd interface
1850         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1851         self.pg3.add_stream(pkts)
1852         self.pg_enable_capture(self.pg_interfaces)
1853         self.pg_start()
1854         capture = self.pg6.get_capture(len(pkts))
1855         self.verify_capture_in(capture, self.pg6)
1856
1857         # general user and session dump verifications
1858         users = self.vapi.nat44_ei_user_dump()
1859         self.assertGreaterEqual(len(users), 3)
1860         addresses = self.vapi.nat44_ei_address_dump()
1861         self.assertEqual(len(addresses), 1)
1862         for user in users:
1863             sessions = self.vapi.nat44_ei_user_session_dump(
1864                 user.ip_address, user.vrf_id
1865             )
1866             for session in sessions:
1867                 self.assertEqual(user.ip_address, session.inside_ip_address)
1868                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1869                 self.assertTrue(
1870                     session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
1871                 )
1872
1873         # pg4 session dump
1874         sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1875         self.assertGreaterEqual(len(sessions), 4)
1876         for session in sessions:
1877             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1878             self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1879             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1880
1881         # pg6 session dump
1882         sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1883         self.assertGreaterEqual(len(sessions), 3)
1884         for session in sessions:
1885             self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1886             self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1887             self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1888             self.assertTrue(
1889                 session.inside_port
1890                 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1891             )
1892
1893     def test_hairpinning(self):
1894         """NAT44EI hairpinning - 1:1 NAPT"""
1895
1896         host = self.pg0.remote_hosts[0]
1897         server = self.pg0.remote_hosts[1]
1898         host_in_port = 1234
1899         host_out_port = 0
1900         server_in_port = 5678
1901         server_out_port = 8765
1902
1903         self.nat44_add_address(self.nat_addr)
1904         flags = self.config_flags.NAT44_EI_IF_INSIDE
1905         self.vapi.nat44_ei_interface_add_del_feature(
1906             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1907         )
1908         self.vapi.nat44_ei_interface_add_del_feature(
1909             sw_if_index=self.pg1.sw_if_index, is_add=1
1910         )
1911
1912         # add static mapping for server
1913         self.nat44_add_static_mapping(
1914             server.ip4,
1915             self.nat_addr,
1916             server_in_port,
1917             server_out_port,
1918             proto=IP_PROTOS.tcp,
1919         )
1920
1921         cnt = self.statistics["/nat44-ei/hairpinning"]
1922         # send packet from host to server
1923         p = (
1924             Ether(src=host.mac, dst=self.pg0.local_mac)
1925             / IP(src=host.ip4, dst=self.nat_addr)
1926             / TCP(sport=host_in_port, dport=server_out_port)
1927         )
1928         self.pg0.add_stream(p)
1929         self.pg_enable_capture(self.pg_interfaces)
1930         self.pg_start()
1931         capture = self.pg0.get_capture(1)
1932         p = capture[0]
1933         try:
1934             ip = p[IP]
1935             tcp = p[TCP]
1936             self.assertEqual(ip.src, self.nat_addr)
1937             self.assertEqual(ip.dst, server.ip4)
1938             self.assertNotEqual(tcp.sport, host_in_port)
1939             self.assertEqual(tcp.dport, server_in_port)
1940             self.assert_packet_checksums_valid(p)
1941             host_out_port = tcp.sport
1942         except:
1943             self.logger.error(ppp("Unexpected or invalid packet:", p))
1944             raise
1945
1946         after = self.statistics["/nat44-ei/hairpinning"]
1947         if_idx = self.pg0.sw_if_index
1948         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1949
1950         # send reply from server to host
1951         p = (
1952             Ether(src=server.mac, dst=self.pg0.local_mac)
1953             / IP(src=server.ip4, dst=self.nat_addr)
1954             / TCP(sport=server_in_port, dport=host_out_port)
1955         )
1956         self.pg0.add_stream(p)
1957         self.pg_enable_capture(self.pg_interfaces)
1958         self.pg_start()
1959         capture = self.pg0.get_capture(1)
1960         p = capture[0]
1961         try:
1962             ip = p[IP]
1963             tcp = p[TCP]
1964             self.assertEqual(ip.src, self.nat_addr)
1965             self.assertEqual(ip.dst, host.ip4)
1966             self.assertEqual(tcp.sport, server_out_port)
1967             self.assertEqual(tcp.dport, host_in_port)
1968             self.assert_packet_checksums_valid(p)
1969         except:
1970             self.logger.error(ppp("Unexpected or invalid packet:", p))
1971             raise
1972
1973         after = self.statistics["/nat44-ei/hairpinning"]
1974         if_idx = self.pg0.sw_if_index
1975         self.assertEqual(
1976             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1977             2 + (1 if self.vpp_worker_count > 0 else 0),
1978         )
1979
1980     def test_hairpinning2(self):
1981         """NAT44EI hairpinning - 1:1 NAT"""
1982
1983         server1_nat_ip = "10.0.0.10"
1984         server2_nat_ip = "10.0.0.11"
1985         host = self.pg0.remote_hosts[0]
1986         server1 = self.pg0.remote_hosts[1]
1987         server2 = self.pg0.remote_hosts[2]
1988         server_tcp_port = 22
1989         server_udp_port = 20
1990
1991         self.nat44_add_address(self.nat_addr)
1992         flags = self.config_flags.NAT44_EI_IF_INSIDE
1993         self.vapi.nat44_ei_interface_add_del_feature(
1994             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1995         )
1996         self.vapi.nat44_ei_interface_add_del_feature(
1997             sw_if_index=self.pg1.sw_if_index, is_add=1
1998         )
1999
2000         # add static mapping for servers
2001         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2002         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2003
2004         # host to server1
2005         pkts = []
2006         p = (
2007             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2008             / IP(src=host.ip4, dst=server1_nat_ip)
2009             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2010         )
2011         pkts.append(p)
2012         p = (
2013             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2014             / IP(src=host.ip4, dst=server1_nat_ip)
2015             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2016         )
2017         pkts.append(p)
2018         p = (
2019             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2020             / IP(src=host.ip4, dst=server1_nat_ip)
2021             / ICMP(id=self.icmp_id_in, type="echo-request")
2022         )
2023         pkts.append(p)
2024         self.pg0.add_stream(pkts)
2025         self.pg_enable_capture(self.pg_interfaces)
2026         self.pg_start()
2027         capture = self.pg0.get_capture(len(pkts))
2028         for packet in capture:
2029             try:
2030                 self.assertEqual(packet[IP].src, self.nat_addr)
2031                 self.assertEqual(packet[IP].dst, server1.ip4)
2032                 if packet.haslayer(TCP):
2033                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2034                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2035                     self.tcp_port_out = packet[TCP].sport
2036                     self.assert_packet_checksums_valid(packet)
2037                 elif packet.haslayer(UDP):
2038                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2039                     self.assertEqual(packet[UDP].dport, server_udp_port)
2040                     self.udp_port_out = packet[UDP].sport
2041                 else:
2042                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2043                     self.icmp_id_out = packet[ICMP].id
2044             except:
2045                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2046                 raise
2047
2048         # server1 to host
2049         pkts = []
2050         p = (
2051             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2052             / IP(src=server1.ip4, dst=self.nat_addr)
2053             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2054         )
2055         pkts.append(p)
2056         p = (
2057             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2058             / IP(src=server1.ip4, dst=self.nat_addr)
2059             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2060         )
2061         pkts.append(p)
2062         p = (
2063             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2064             / IP(src=server1.ip4, dst=self.nat_addr)
2065             / ICMP(id=self.icmp_id_out, type="echo-reply")
2066         )
2067         pkts.append(p)
2068         self.pg0.add_stream(pkts)
2069         self.pg_enable_capture(self.pg_interfaces)
2070         self.pg_start()
2071         capture = self.pg0.get_capture(len(pkts))
2072         for packet in capture:
2073             try:
2074                 self.assertEqual(packet[IP].src, server1_nat_ip)
2075                 self.assertEqual(packet[IP].dst, host.ip4)
2076                 if packet.haslayer(TCP):
2077                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2078                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2079                     self.assert_packet_checksums_valid(packet)
2080                 elif packet.haslayer(UDP):
2081                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2082                     self.assertEqual(packet[UDP].sport, server_udp_port)
2083                 else:
2084                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2085             except:
2086                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2087                 raise
2088
2089         # server2 to server1
2090         pkts = []
2091         p = (
2092             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2093             / IP(src=server2.ip4, dst=server1_nat_ip)
2094             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2095         )
2096         pkts.append(p)
2097         p = (
2098             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2099             / IP(src=server2.ip4, dst=server1_nat_ip)
2100             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2101         )
2102         pkts.append(p)
2103         p = (
2104             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2105             / IP(src=server2.ip4, dst=server1_nat_ip)
2106             / ICMP(id=self.icmp_id_in, type="echo-request")
2107         )
2108         pkts.append(p)
2109         self.pg0.add_stream(pkts)
2110         self.pg_enable_capture(self.pg_interfaces)
2111         self.pg_start()
2112         capture = self.pg0.get_capture(len(pkts))
2113         for packet in capture:
2114             try:
2115                 self.assertEqual(packet[IP].src, server2_nat_ip)
2116                 self.assertEqual(packet[IP].dst, server1.ip4)
2117                 if packet.haslayer(TCP):
2118                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2119                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2120                     self.tcp_port_out = packet[TCP].sport
2121                     self.assert_packet_checksums_valid(packet)
2122                 elif packet.haslayer(UDP):
2123                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2124                     self.assertEqual(packet[UDP].dport, server_udp_port)
2125                     self.udp_port_out = packet[UDP].sport
2126                 else:
2127                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2128                     self.icmp_id_out = packet[ICMP].id
2129             except:
2130                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2131                 raise
2132
2133         # server1 to server2
2134         pkts = []
2135         p = (
2136             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2137             / IP(src=server1.ip4, dst=server2_nat_ip)
2138             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2139         )
2140         pkts.append(p)
2141         p = (
2142             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2143             / IP(src=server1.ip4, dst=server2_nat_ip)
2144             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2145         )
2146         pkts.append(p)
2147         p = (
2148             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2149             / IP(src=server1.ip4, dst=server2_nat_ip)
2150             / ICMP(id=self.icmp_id_out, type="echo-reply")
2151         )
2152         pkts.append(p)
2153         self.pg0.add_stream(pkts)
2154         self.pg_enable_capture(self.pg_interfaces)
2155         self.pg_start()
2156         capture = self.pg0.get_capture(len(pkts))
2157         for packet in capture:
2158             try:
2159                 self.assertEqual(packet[IP].src, server1_nat_ip)
2160                 self.assertEqual(packet[IP].dst, server2.ip4)
2161                 if packet.haslayer(TCP):
2162                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2163                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2164                     self.assert_packet_checksums_valid(packet)
2165                 elif packet.haslayer(UDP):
2166                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2167                     self.assertEqual(packet[UDP].sport, server_udp_port)
2168                 else:
2169                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2170             except:
2171                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2172                 raise
2173
2174     def test_hairpinning_avoid_inf_loop(self):
2175         """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
2176
2177         host = self.pg0.remote_hosts[0]
2178         server = self.pg0.remote_hosts[1]
2179         host_in_port = 1234
2180         host_out_port = 0
2181         server_in_port = 5678
2182         server_out_port = 8765
2183
2184         self.nat44_add_address(self.nat_addr)
2185         flags = self.config_flags.NAT44_EI_IF_INSIDE
2186         self.vapi.nat44_ei_interface_add_del_feature(
2187             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2188         )
2189         self.vapi.nat44_ei_interface_add_del_feature(
2190             sw_if_index=self.pg1.sw_if_index, is_add=1
2191         )
2192
2193         # add static mapping for server
2194         self.nat44_add_static_mapping(
2195             server.ip4,
2196             self.nat_addr,
2197             server_in_port,
2198             server_out_port,
2199             proto=IP_PROTOS.tcp,
2200         )
2201
2202         # add another static mapping that maps pg0.local_ip4 address to itself
2203         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2204
2205         # send packet from host to VPP (the packet should get dropped)
2206         p = (
2207             Ether(src=host.mac, dst=self.pg0.local_mac)
2208             / IP(src=host.ip4, dst=self.pg0.local_ip4)
2209             / TCP(sport=host_in_port, dport=server_out_port)
2210         )
2211         self.pg0.add_stream(p)
2212         self.pg_enable_capture(self.pg_interfaces)
2213         self.pg_start()
2214         # Here VPP used to crash due to an infinite loop
2215
2216         cnt = self.statistics["/nat44-ei/hairpinning"]
2217         # send packet from host to server
2218         p = (
2219             Ether(src=host.mac, dst=self.pg0.local_mac)
2220             / IP(src=host.ip4, dst=self.nat_addr)
2221             / TCP(sport=host_in_port, dport=server_out_port)
2222         )
2223         self.pg0.add_stream(p)
2224         self.pg_enable_capture(self.pg_interfaces)
2225         self.pg_start()
2226         capture = self.pg0.get_capture(1)
2227         p = capture[0]
2228         try:
2229             ip = p[IP]
2230             tcp = p[TCP]
2231             self.assertEqual(ip.src, self.nat_addr)
2232             self.assertEqual(ip.dst, server.ip4)
2233             self.assertNotEqual(tcp.sport, host_in_port)
2234             self.assertEqual(tcp.dport, server_in_port)
2235             self.assert_packet_checksums_valid(p)
2236             host_out_port = tcp.sport
2237         except:
2238             self.logger.error(ppp("Unexpected or invalid packet:", p))
2239             raise
2240
2241         after = self.statistics["/nat44-ei/hairpinning"]
2242         if_idx = self.pg0.sw_if_index
2243         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2244
2245         # send reply from server to host
2246         p = (
2247             Ether(src=server.mac, dst=self.pg0.local_mac)
2248             / IP(src=server.ip4, dst=self.nat_addr)
2249             / TCP(sport=server_in_port, dport=host_out_port)
2250         )
2251         self.pg0.add_stream(p)
2252         self.pg_enable_capture(self.pg_interfaces)
2253         self.pg_start()
2254         capture = self.pg0.get_capture(1)
2255         p = capture[0]
2256         try:
2257             ip = p[IP]
2258             tcp = p[TCP]
2259             self.assertEqual(ip.src, self.nat_addr)
2260             self.assertEqual(ip.dst, host.ip4)
2261             self.assertEqual(tcp.sport, server_out_port)
2262             self.assertEqual(tcp.dport, host_in_port)
2263             self.assert_packet_checksums_valid(p)
2264         except:
2265             self.logger.error(ppp("Unexpected or invalid packet:", p))
2266             raise
2267
2268         after = self.statistics["/nat44-ei/hairpinning"]
2269         if_idx = self.pg0.sw_if_index
2270         self.assertEqual(
2271             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2272             2 + (1 if self.vpp_worker_count > 0 else 0),
2273         )
2274
2275     def test_interface_addr(self):
2276         """NAT44EI acquire addresses from interface"""
2277         self.vapi.nat44_ei_add_del_interface_addr(
2278             is_add=1, sw_if_index=self.pg7.sw_if_index
2279         )
2280
2281         # no address in NAT pool
2282         addresses = self.vapi.nat44_ei_address_dump()
2283         self.assertEqual(0, len(addresses))
2284
2285         # configure interface address and check NAT address pool
2286         self.pg7.config_ip4()
2287         addresses = self.vapi.nat44_ei_address_dump()
2288         self.assertEqual(1, len(addresses))
2289         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2290
2291         # remove interface address and check NAT address pool
2292         self.pg7.unconfig_ip4()
2293         addresses = self.vapi.nat44_ei_address_dump()
2294         self.assertEqual(0, len(addresses))
2295
2296     def test_interface_addr_static_mapping(self):
2297         """NAT44EI Static mapping with addresses from interface"""
2298         tag = "testTAG"
2299
2300         self.vapi.nat44_ei_add_del_interface_addr(
2301             is_add=1, sw_if_index=self.pg7.sw_if_index
2302         )
2303         self.nat44_add_static_mapping(
2304             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2305         )
2306
2307         # static mappings with external interface
2308         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2309         self.assertEqual(1, len(static_mappings))
2310         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2311         self.assertEqual(static_mappings[0].tag, tag)
2312
2313         # configure interface address and check static mappings
2314         self.pg7.config_ip4()
2315         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2316         self.assertEqual(2, len(static_mappings))
2317         resolved = False
2318         for sm in static_mappings:
2319             if sm.external_sw_if_index == 0xFFFFFFFF:
2320                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2321                 self.assertEqual(sm.tag, tag)
2322                 resolved = True
2323         self.assertTrue(resolved)
2324
2325         # remove interface address and check static mappings
2326         self.pg7.unconfig_ip4()
2327         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2328         self.assertEqual(1, len(static_mappings))
2329         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2330         self.assertEqual(static_mappings[0].tag, tag)
2331
2332         # configure interface address again and check static mappings
2333         self.pg7.config_ip4()
2334         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2335         self.assertEqual(2, len(static_mappings))
2336         resolved = False
2337         for sm in static_mappings:
2338             if sm.external_sw_if_index == 0xFFFFFFFF:
2339                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2340                 self.assertEqual(sm.tag, tag)
2341                 resolved = True
2342         self.assertTrue(resolved)
2343
2344         # remove static mapping
2345         self.nat44_add_static_mapping(
2346             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2347         )
2348         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2349         self.assertEqual(0, len(static_mappings))
2350
2351     def test_interface_addr_identity_nat(self):
2352         """NAT44EI Identity NAT with addresses from interface"""
2353
2354         port = 53053
2355         self.vapi.nat44_ei_add_del_interface_addr(
2356             is_add=1, sw_if_index=self.pg7.sw_if_index
2357         )
2358         self.vapi.nat44_ei_add_del_identity_mapping(
2359             ip_address=b"0",
2360             sw_if_index=self.pg7.sw_if_index,
2361             port=port,
2362             protocol=IP_PROTOS.tcp,
2363             is_add=1,
2364         )
2365
2366         # identity mappings with external interface
2367         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2368         self.assertEqual(1, len(identity_mappings))
2369         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2370
2371         # configure interface address and check identity mappings
2372         self.pg7.config_ip4()
2373         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2374         resolved = False
2375         self.assertEqual(2, len(identity_mappings))
2376         for sm in identity_mappings:
2377             if sm.sw_if_index == 0xFFFFFFFF:
2378                 self.assertEqual(
2379                     str(identity_mappings[0].ip_address), self.pg7.local_ip4
2380                 )
2381                 self.assertEqual(port, identity_mappings[0].port)
2382                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2383                 resolved = True
2384         self.assertTrue(resolved)
2385
2386         # remove interface address and check identity mappings
2387         self.pg7.unconfig_ip4()
2388         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2389         self.assertEqual(1, len(identity_mappings))
2390         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2391
2392     def test_ipfix_nat44_sess(self):
2393         """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2394         self.ipfix_domain_id = 10
2395         self.ipfix_src_port = 20202
2396         collector_port = 30303
2397         bind_layers(UDP, IPFIX, dport=30303)
2398         self.nat44_add_address(self.nat_addr)
2399         flags = self.config_flags.NAT44_EI_IF_INSIDE
2400         self.vapi.nat44_ei_interface_add_del_feature(
2401             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2402         )
2403         self.vapi.nat44_ei_interface_add_del_feature(
2404             sw_if_index=self.pg1.sw_if_index, is_add=1
2405         )
2406         self.vapi.set_ipfix_exporter(
2407             collector_address=self.pg3.remote_ip4,
2408             src_address=self.pg3.local_ip4,
2409             path_mtu=512,
2410             template_interval=10,
2411             collector_port=collector_port,
2412         )
2413         self.vapi.nat44_ei_ipfix_enable_disable(
2414             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2415         )
2416
2417         pkts = self.create_stream_in(self.pg0, self.pg1)
2418         self.pg0.add_stream(pkts)
2419         self.pg_enable_capture(self.pg_interfaces)
2420         self.pg_start()
2421         capture = self.pg1.get_capture(len(pkts))
2422         self.verify_capture_out(capture)
2423         self.nat44_add_address(self.nat_addr, is_add=0)
2424         self.vapi.ipfix_flush()
2425         capture = self.pg3.get_capture(7)
2426         ipfix = IPFIXDecoder()
2427         # first load template
2428         for p in capture:
2429             self.assertTrue(p.haslayer(IPFIX))
2430             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2431             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2432             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2433             self.assertEqual(p[UDP].dport, collector_port)
2434             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2435             if p.haslayer(Template):
2436                 ipfix.add_template(p.getlayer(Template))
2437         # verify events in data set
2438         for p in capture:
2439             if p.haslayer(Data):
2440                 data = ipfix.decode_data_set(p.getlayer(Set))
2441                 self.verify_ipfix_nat44_ses(data)
2442
2443     def test_ipfix_addr_exhausted(self):
2444         """NAT44EI IPFIX logging NAT addresses exhausted"""
2445         flags = self.config_flags.NAT44_EI_IF_INSIDE
2446         self.vapi.nat44_ei_interface_add_del_feature(
2447             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2448         )
2449         self.vapi.nat44_ei_interface_add_del_feature(
2450             sw_if_index=self.pg1.sw_if_index, is_add=1
2451         )
2452         self.vapi.set_ipfix_exporter(
2453             collector_address=self.pg3.remote_ip4,
2454             src_address=self.pg3.local_ip4,
2455             path_mtu=512,
2456             template_interval=10,
2457         )
2458         self.vapi.nat44_ei_ipfix_enable_disable(
2459             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2460         )
2461
2462         p = (
2463             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2464             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2465             / TCP(sport=3025)
2466         )
2467         self.pg0.add_stream(p)
2468         self.pg_enable_capture(self.pg_interfaces)
2469         self.pg_start()
2470         self.pg1.assert_nothing_captured()
2471         self.vapi.ipfix_flush()
2472         capture = self.pg3.get_capture(7)
2473         ipfix = IPFIXDecoder()
2474         # first load template
2475         for p in capture:
2476             self.assertTrue(p.haslayer(IPFIX))
2477             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2478             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2479             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2480             self.assertEqual(p[UDP].dport, 4739)
2481             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2482             if p.haslayer(Template):
2483                 ipfix.add_template(p.getlayer(Template))
2484         # verify events in data set
2485         for p in capture:
2486             if p.haslayer(Data):
2487                 data = ipfix.decode_data_set(p.getlayer(Set))
2488                 self.verify_ipfix_addr_exhausted(data)
2489
2490     def test_ipfix_max_sessions(self):
2491         """NAT44EI IPFIX logging maximum session entries exceeded"""
2492         self.nat44_add_address(self.nat_addr)
2493         flags = self.config_flags.NAT44_EI_IF_INSIDE
2494         self.vapi.nat44_ei_interface_add_del_feature(
2495             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2496         )
2497         self.vapi.nat44_ei_interface_add_del_feature(
2498             sw_if_index=self.pg1.sw_if_index, is_add=1
2499         )
2500
2501         max_sessions_per_thread = self.max_translations
2502         max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2503
2504         pkts = []
2505         for i in range(0, max_sessions):
2506             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2507             p = (
2508                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2509                 / IP(src=src, dst=self.pg1.remote_ip4)
2510                 / TCP(sport=1025)
2511             )
2512             pkts.append(p)
2513         self.pg0.add_stream(pkts)
2514         self.pg_enable_capture(self.pg_interfaces)
2515         self.pg_start()
2516
2517         self.pg1.get_capture(max_sessions)
2518         self.vapi.set_ipfix_exporter(
2519             collector_address=self.pg3.remote_ip4,
2520             src_address=self.pg3.local_ip4,
2521             path_mtu=512,
2522             template_interval=10,
2523         )
2524         self.vapi.nat44_ei_ipfix_enable_disable(
2525             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2526         )
2527
2528         p = (
2529             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2530             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2531             / TCP(sport=1025)
2532         )
2533         self.pg0.add_stream(p)
2534         self.pg_enable_capture(self.pg_interfaces)
2535         self.pg_start()
2536         self.pg1.assert_nothing_captured()
2537         self.vapi.ipfix_flush()
2538         capture = self.pg3.get_capture(7)
2539         ipfix = IPFIXDecoder()
2540         # first load template
2541         for p in capture:
2542             self.assertTrue(p.haslayer(IPFIX))
2543             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2544             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2545             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2546             self.assertEqual(p[UDP].dport, 4739)
2547             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2548             if p.haslayer(Template):
2549                 ipfix.add_template(p.getlayer(Template))
2550         # verify events in data set
2551         for p in capture:
2552             if p.haslayer(Data):
2553                 data = ipfix.decode_data_set(p.getlayer(Set))
2554                 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2555
2556     def test_syslog_apmap(self):
2557         """NAT44EI syslog address and port mapping creation and deletion"""
2558         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2559         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2560         self.nat44_add_address(self.nat_addr)
2561         flags = self.config_flags.NAT44_EI_IF_INSIDE
2562         self.vapi.nat44_ei_interface_add_del_feature(
2563             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2564         )
2565         self.vapi.nat44_ei_interface_add_del_feature(
2566             sw_if_index=self.pg1.sw_if_index, is_add=1
2567         )
2568
2569         p = (
2570             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2571             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2572             / TCP(sport=self.tcp_port_in, dport=20)
2573         )
2574         self.pg0.add_stream(p)
2575         self.pg_enable_capture(self.pg_interfaces)
2576         self.pg_start()
2577         capture = self.pg1.get_capture(1)
2578         self.tcp_port_out = capture[0][TCP].sport
2579         capture = self.pg3.get_capture(1)
2580         self.verify_syslog_apmap(capture[0][Raw].load)
2581
2582         self.pg_enable_capture(self.pg_interfaces)
2583         self.pg_start()
2584         self.nat44_add_address(self.nat_addr, is_add=0)
2585         capture = self.pg3.get_capture(1)
2586         self.verify_syslog_apmap(capture[0][Raw].load, False)
2587
2588     def test_pool_addr_fib(self):
2589         """NAT44EI add pool addresses to FIB"""
2590         static_addr = "10.0.0.10"
2591         self.nat44_add_address(self.nat_addr)
2592         flags = self.config_flags.NAT44_EI_IF_INSIDE
2593         self.vapi.nat44_ei_interface_add_del_feature(
2594             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2595         )
2596         self.vapi.nat44_ei_interface_add_del_feature(
2597             sw_if_index=self.pg1.sw_if_index, is_add=1
2598         )
2599         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2600
2601         # NAT44EI address
2602         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2603             op=ARP.who_has,
2604             pdst=self.nat_addr,
2605             psrc=self.pg1.remote_ip4,
2606             hwsrc=self.pg1.remote_mac,
2607         )
2608         self.pg1.add_stream(p)
2609         self.pg_enable_capture(self.pg_interfaces)
2610         self.pg_start()
2611         capture = self.pg1.get_capture(1)
2612         self.assertTrue(capture[0].haslayer(ARP))
2613         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2614
2615         # 1:1 NAT address
2616         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2617             op=ARP.who_has,
2618             pdst=static_addr,
2619             psrc=self.pg1.remote_ip4,
2620             hwsrc=self.pg1.remote_mac,
2621         )
2622         self.pg1.add_stream(p)
2623         self.pg_enable_capture(self.pg_interfaces)
2624         self.pg_start()
2625         capture = self.pg1.get_capture(1)
2626         self.assertTrue(capture[0].haslayer(ARP))
2627         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2628
2629         # send ARP to non-NAT44EI interface
2630         p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2631             op=ARP.who_has,
2632             pdst=self.nat_addr,
2633             psrc=self.pg2.remote_ip4,
2634             hwsrc=self.pg2.remote_mac,
2635         )
2636         self.pg2.add_stream(p)
2637         self.pg_enable_capture(self.pg_interfaces)
2638         self.pg_start()
2639         self.pg1.assert_nothing_captured()
2640
2641         # remove addresses and verify
2642         self.nat44_add_address(self.nat_addr, is_add=0)
2643         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2644
2645         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2646             op=ARP.who_has,
2647             pdst=self.nat_addr,
2648             psrc=self.pg1.remote_ip4,
2649             hwsrc=self.pg1.remote_mac,
2650         )
2651         self.pg1.add_stream(p)
2652         self.pg_enable_capture(self.pg_interfaces)
2653         self.pg_start()
2654         self.pg1.assert_nothing_captured()
2655
2656         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2657             op=ARP.who_has,
2658             pdst=static_addr,
2659             psrc=self.pg1.remote_ip4,
2660             hwsrc=self.pg1.remote_mac,
2661         )
2662         self.pg1.add_stream(p)
2663         self.pg_enable_capture(self.pg_interfaces)
2664         self.pg_start()
2665         self.pg1.assert_nothing_captured()
2666
2667     def test_vrf_mode(self):
2668         """NAT44EI tenant VRF aware address pool mode"""
2669
2670         vrf_id1 = 1
2671         vrf_id2 = 2
2672         nat_ip1 = "10.0.0.10"
2673         nat_ip2 = "10.0.0.11"
2674
2675         self.pg0.unconfig_ip4()
2676         self.pg1.unconfig_ip4()
2677         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2678         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2679         self.pg0.set_table_ip4(vrf_id1)
2680         self.pg1.set_table_ip4(vrf_id2)
2681         self.pg0.config_ip4()
2682         self.pg1.config_ip4()
2683         self.pg0.resolve_arp()
2684         self.pg1.resolve_arp()
2685
2686         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2687         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2688         flags = self.config_flags.NAT44_EI_IF_INSIDE
2689         self.vapi.nat44_ei_interface_add_del_feature(
2690             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2691         )
2692         self.vapi.nat44_ei_interface_add_del_feature(
2693             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2694         )
2695         self.vapi.nat44_ei_interface_add_del_feature(
2696             sw_if_index=self.pg2.sw_if_index, is_add=1
2697         )
2698
2699         try:
2700             # first VRF
2701             pkts = self.create_stream_in(self.pg0, self.pg2)
2702             self.pg0.add_stream(pkts)
2703             self.pg_enable_capture(self.pg_interfaces)
2704             self.pg_start()
2705             capture = self.pg2.get_capture(len(pkts))
2706             self.verify_capture_out(capture, nat_ip1)
2707
2708             # second VRF
2709             pkts = self.create_stream_in(self.pg1, self.pg2)
2710             self.pg1.add_stream(pkts)
2711             self.pg_enable_capture(self.pg_interfaces)
2712             self.pg_start()
2713             capture = self.pg2.get_capture(len(pkts))
2714             self.verify_capture_out(capture, nat_ip2)
2715
2716         finally:
2717             self.pg0.unconfig_ip4()
2718             self.pg1.unconfig_ip4()
2719             self.pg0.set_table_ip4(0)
2720             self.pg1.set_table_ip4(0)
2721             self.pg0.config_ip4()
2722             self.pg1.config_ip4()
2723             self.pg0.resolve_arp()
2724             self.pg1.resolve_arp()
2725             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2726             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2727
2728     def test_vrf_feature_independent(self):
2729         """NAT44EI tenant VRF independent address pool mode"""
2730
2731         nat_ip1 = "10.0.0.10"
2732         nat_ip2 = "10.0.0.11"
2733
2734         self.nat44_add_address(nat_ip1)
2735         self.nat44_add_address(nat_ip2, vrf_id=99)
2736         flags = self.config_flags.NAT44_EI_IF_INSIDE
2737         self.vapi.nat44_ei_interface_add_del_feature(
2738             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2739         )
2740         self.vapi.nat44_ei_interface_add_del_feature(
2741             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2742         )
2743         self.vapi.nat44_ei_interface_add_del_feature(
2744             sw_if_index=self.pg2.sw_if_index, is_add=1
2745         )
2746
2747         # first VRF
2748         pkts = self.create_stream_in(self.pg0, self.pg2)
2749         self.pg0.add_stream(pkts)
2750         self.pg_enable_capture(self.pg_interfaces)
2751         self.pg_start()
2752         capture = self.pg2.get_capture(len(pkts))
2753         self.verify_capture_out(capture, nat_ip1)
2754
2755         # second VRF
2756         pkts = self.create_stream_in(self.pg1, self.pg2)
2757         self.pg1.add_stream(pkts)
2758         self.pg_enable_capture(self.pg_interfaces)
2759         self.pg_start()
2760         capture = self.pg2.get_capture(len(pkts))
2761         self.verify_capture_out(capture, nat_ip1)
2762
2763     def test_dynamic_ipless_interfaces(self):
2764         """NAT44EI interfaces without configured IP address"""
2765         self.create_routes_and_neigbors()
2766         self.nat44_add_address(self.nat_addr)
2767         flags = self.config_flags.NAT44_EI_IF_INSIDE
2768         self.vapi.nat44_ei_interface_add_del_feature(
2769             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2770         )
2771         self.vapi.nat44_ei_interface_add_del_feature(
2772             sw_if_index=self.pg8.sw_if_index, is_add=1
2773         )
2774
2775         # in2out
2776         pkts = self.create_stream_in(self.pg7, self.pg8)
2777         self.pg7.add_stream(pkts)
2778         self.pg_enable_capture(self.pg_interfaces)
2779         self.pg_start()
2780         capture = self.pg8.get_capture(len(pkts))
2781         self.verify_capture_out(capture)
2782
2783         # out2in
2784         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2785         self.pg8.add_stream(pkts)
2786         self.pg_enable_capture(self.pg_interfaces)
2787         self.pg_start()
2788         capture = self.pg7.get_capture(len(pkts))
2789         self.verify_capture_in(capture, self.pg7)
2790
2791     def test_static_ipless_interfaces(self):
2792         """NAT44EI interfaces without configured IP address - 1:1 NAT"""
2793
2794         self.create_routes_and_neigbors()
2795         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2796         flags = self.config_flags.NAT44_EI_IF_INSIDE
2797         self.vapi.nat44_ei_interface_add_del_feature(
2798             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2799         )
2800         self.vapi.nat44_ei_interface_add_del_feature(
2801             sw_if_index=self.pg8.sw_if_index, is_add=1
2802         )
2803
2804         # out2in
2805         pkts = self.create_stream_out(self.pg8)
2806         self.pg8.add_stream(pkts)
2807         self.pg_enable_capture(self.pg_interfaces)
2808         self.pg_start()
2809         capture = self.pg7.get_capture(len(pkts))
2810         self.verify_capture_in(capture, self.pg7)
2811
2812         # in2out
2813         pkts = self.create_stream_in(self.pg7, self.pg8)
2814         self.pg7.add_stream(pkts)
2815         self.pg_enable_capture(self.pg_interfaces)
2816         self.pg_start()
2817         capture = self.pg8.get_capture(len(pkts))
2818         self.verify_capture_out(capture, self.nat_addr, True)
2819
2820     def test_static_with_port_ipless_interfaces(self):
2821         """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
2822
2823         self.tcp_port_out = 30606
2824         self.udp_port_out = 30607
2825         self.icmp_id_out = 30608
2826
2827         self.create_routes_and_neigbors()
2828         self.nat44_add_address(self.nat_addr)
2829         self.nat44_add_static_mapping(
2830             self.pg7.remote_ip4,
2831             self.nat_addr,
2832             self.tcp_port_in,
2833             self.tcp_port_out,
2834             proto=IP_PROTOS.tcp,
2835         )
2836         self.nat44_add_static_mapping(
2837             self.pg7.remote_ip4,
2838             self.nat_addr,
2839             self.udp_port_in,
2840             self.udp_port_out,
2841             proto=IP_PROTOS.udp,
2842         )
2843         self.nat44_add_static_mapping(
2844             self.pg7.remote_ip4,
2845             self.nat_addr,
2846             self.icmp_id_in,
2847             self.icmp_id_out,
2848             proto=IP_PROTOS.icmp,
2849         )
2850         flags = self.config_flags.NAT44_EI_IF_INSIDE
2851         self.vapi.nat44_ei_interface_add_del_feature(
2852             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2853         )
2854         self.vapi.nat44_ei_interface_add_del_feature(
2855             sw_if_index=self.pg8.sw_if_index, is_add=1
2856         )
2857
2858         # out2in
2859         pkts = self.create_stream_out(self.pg8)
2860         self.pg8.add_stream(pkts)
2861         self.pg_enable_capture(self.pg_interfaces)
2862         self.pg_start()
2863         capture = self.pg7.get_capture(len(pkts))
2864         self.verify_capture_in(capture, self.pg7)
2865
2866         # in2out
2867         pkts = self.create_stream_in(self.pg7, self.pg8)
2868         self.pg7.add_stream(pkts)
2869         self.pg_enable_capture(self.pg_interfaces)
2870         self.pg_start()
2871         capture = self.pg8.get_capture(len(pkts))
2872         self.verify_capture_out(capture)
2873
2874     def test_static_unknown_proto(self):
2875         """NAT44EI 1:1 translate packet with unknown protocol"""
2876         nat_ip = "10.0.0.10"
2877         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2878         flags = self.config_flags.NAT44_EI_IF_INSIDE
2879         self.vapi.nat44_ei_interface_add_del_feature(
2880             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2881         )
2882         self.vapi.nat44_ei_interface_add_del_feature(
2883             sw_if_index=self.pg1.sw_if_index, is_add=1
2884         )
2885
2886         # in2out
2887         p = (
2888             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2889             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2890             / GRE()
2891             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2892             / TCP(sport=1234, dport=1234)
2893         )
2894         self.pg0.add_stream(p)
2895         self.pg_enable_capture(self.pg_interfaces)
2896         self.pg_start()
2897         p = self.pg1.get_capture(1)
2898         packet = p[0]
2899         try:
2900             self.assertEqual(packet[IP].src, nat_ip)
2901             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2902             self.assertEqual(packet.haslayer(GRE), 1)
2903             self.assert_packet_checksums_valid(packet)
2904         except:
2905             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2906             raise
2907
2908         # out2in
2909         p = (
2910             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2911             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2912             / GRE()
2913             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2914             / TCP(sport=1234, dport=1234)
2915         )
2916         self.pg1.add_stream(p)
2917         self.pg_enable_capture(self.pg_interfaces)
2918         self.pg_start()
2919         p = self.pg0.get_capture(1)
2920         packet = p[0]
2921         try:
2922             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2923             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2924             self.assertEqual(packet.haslayer(GRE), 1)
2925             self.assert_packet_checksums_valid(packet)
2926         except:
2927             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2928             raise
2929
2930     def test_hairpinning_static_unknown_proto(self):
2931         """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
2932
2933         host = self.pg0.remote_hosts[0]
2934         server = self.pg0.remote_hosts[1]
2935
2936         host_nat_ip = "10.0.0.10"
2937         server_nat_ip = "10.0.0.11"
2938
2939         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2940         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2941         flags = self.config_flags.NAT44_EI_IF_INSIDE
2942         self.vapi.nat44_ei_interface_add_del_feature(
2943             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2944         )
2945         self.vapi.nat44_ei_interface_add_del_feature(
2946             sw_if_index=self.pg1.sw_if_index, is_add=1
2947         )
2948
2949         # host to server
2950         p = (
2951             Ether(dst=self.pg0.local_mac, src=host.mac)
2952             / IP(src=host.ip4, dst=server_nat_ip)
2953             / GRE()
2954             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2955             / TCP(sport=1234, dport=1234)
2956         )
2957         self.pg0.add_stream(p)
2958         self.pg_enable_capture(self.pg_interfaces)
2959         self.pg_start()
2960         p = self.pg0.get_capture(1)
2961         packet = p[0]
2962         try:
2963             self.assertEqual(packet[IP].src, host_nat_ip)
2964             self.assertEqual(packet[IP].dst, server.ip4)
2965             self.assertEqual(packet.haslayer(GRE), 1)
2966             self.assert_packet_checksums_valid(packet)
2967         except:
2968             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2969             raise
2970
2971         # server to host
2972         p = (
2973             Ether(dst=self.pg0.local_mac, src=server.mac)
2974             / IP(src=server.ip4, dst=host_nat_ip)
2975             / GRE()
2976             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2977             / TCP(sport=1234, dport=1234)
2978         )
2979         self.pg0.add_stream(p)
2980         self.pg_enable_capture(self.pg_interfaces)
2981         self.pg_start()
2982         p = self.pg0.get_capture(1)
2983         packet = p[0]
2984         try:
2985             self.assertEqual(packet[IP].src, server_nat_ip)
2986             self.assertEqual(packet[IP].dst, host.ip4)
2987             self.assertEqual(packet.haslayer(GRE), 1)
2988             self.assert_packet_checksums_valid(packet)
2989         except:
2990             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2991             raise
2992
2993     def test_output_feature(self):
2994         """NAT44EI output feature (in2out postrouting)"""
2995         self.nat44_add_address(self.nat_addr)
2996         self.vapi.nat44_ei_add_del_output_interface(
2997             sw_if_index=self.pg3.sw_if_index, is_add=1
2998         )
2999
3000         # in2out
3001         pkts = self.create_stream_in(self.pg0, self.pg3)
3002         self.pg0.add_stream(pkts)
3003         self.pg_enable_capture(self.pg_interfaces)
3004         self.pg_start()
3005         capture = self.pg3.get_capture(len(pkts))
3006         self.verify_capture_out(capture)
3007
3008         # out2in
3009         pkts = self.create_stream_out(self.pg3)
3010         self.pg3.add_stream(pkts)
3011         self.pg_enable_capture(self.pg_interfaces)
3012         self.pg_start()
3013         capture = self.pg0.get_capture(len(pkts))
3014         self.verify_capture_in(capture, self.pg0)
3015
3016         # from non-NAT interface to NAT inside interface
3017         pkts = self.create_stream_in(self.pg2, self.pg0)
3018         self.pg2.add_stream(pkts)
3019         self.pg_enable_capture(self.pg_interfaces)
3020         self.pg_start()
3021         capture = self.pg0.get_capture(len(pkts))
3022         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3023
3024     def test_output_feature_vrf_aware(self):
3025         """NAT44EI output feature VRF aware (in2out postrouting)"""
3026         nat_ip_vrf10 = "10.0.0.10"
3027         nat_ip_vrf20 = "10.0.0.20"
3028
3029         r1 = VppIpRoute(
3030             self,
3031             self.pg3.remote_ip4,
3032             32,
3033             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3034             table_id=10,
3035         )
3036         r2 = VppIpRoute(
3037             self,
3038             self.pg3.remote_ip4,
3039             32,
3040             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3041             table_id=20,
3042         )
3043         r1.add_vpp_config()
3044         r2.add_vpp_config()
3045
3046         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3047         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3048         self.vapi.nat44_ei_add_del_output_interface(
3049             sw_if_index=self.pg3.sw_if_index, is_add=1
3050         )
3051
3052         # in2out VRF 10
3053         pkts = self.create_stream_in(self.pg4, self.pg3)
3054         self.pg4.add_stream(pkts)
3055         self.pg_enable_capture(self.pg_interfaces)
3056         self.pg_start()
3057         capture = self.pg3.get_capture(len(pkts))
3058         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3059
3060         # out2in VRF 10
3061         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3062         self.pg3.add_stream(pkts)
3063         self.pg_enable_capture(self.pg_interfaces)
3064         self.pg_start()
3065         capture = self.pg4.get_capture(len(pkts))
3066         self.verify_capture_in(capture, self.pg4)
3067
3068         # in2out VRF 20
3069         pkts = self.create_stream_in(self.pg6, self.pg3)
3070         self.pg6.add_stream(pkts)
3071         self.pg_enable_capture(self.pg_interfaces)
3072         self.pg_start()
3073         capture = self.pg3.get_capture(len(pkts))
3074         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3075
3076         # out2in VRF 20
3077         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3078         self.pg3.add_stream(pkts)
3079         self.pg_enable_capture(self.pg_interfaces)
3080         self.pg_start()
3081         capture = self.pg6.get_capture(len(pkts))
3082         self.verify_capture_in(capture, self.pg6)
3083
3084     def test_output_feature_hairpinning(self):
3085         """NAT44EI output feature hairpinning (in2out postrouting)"""
3086         host = self.pg0.remote_hosts[0]
3087         server = self.pg0.remote_hosts[1]
3088         host_in_port = 1234
3089         host_out_port = 0
3090         server_in_port = 5678
3091         server_out_port = 8765
3092
3093         self.nat44_add_address(self.nat_addr)
3094         self.vapi.nat44_ei_add_del_output_interface(
3095             sw_if_index=self.pg0.sw_if_index, is_add=1
3096         )
3097         self.vapi.nat44_ei_add_del_output_interface(
3098             sw_if_index=self.pg1.sw_if_index, is_add=1
3099         )
3100
3101         # add static mapping for server
3102         self.nat44_add_static_mapping(
3103             server.ip4,
3104             self.nat_addr,
3105             server_in_port,
3106             server_out_port,
3107             proto=IP_PROTOS.tcp,
3108         )
3109
3110         # send packet from host to server
3111         p = (
3112             Ether(src=host.mac, dst=self.pg0.local_mac)
3113             / IP(src=host.ip4, dst=self.nat_addr)
3114             / TCP(sport=host_in_port, dport=server_out_port)
3115         )
3116         self.pg0.add_stream(p)
3117         self.pg_enable_capture(self.pg_interfaces)
3118         self.pg_start()
3119         capture = self.pg0.get_capture(1)
3120         p = capture[0]
3121         try:
3122             ip = p[IP]
3123             tcp = p[TCP]
3124             self.assertEqual(ip.src, self.nat_addr)
3125             self.assertEqual(ip.dst, server.ip4)
3126             self.assertNotEqual(tcp.sport, host_in_port)
3127             self.assertEqual(tcp.dport, server_in_port)
3128             self.assert_packet_checksums_valid(p)
3129             host_out_port = tcp.sport
3130         except:
3131             self.logger.error(ppp("Unexpected or invalid packet:", p))
3132             raise
3133
3134         # send reply from server to host
3135         p = (
3136             Ether(src=server.mac, dst=self.pg0.local_mac)
3137             / IP(src=server.ip4, dst=self.nat_addr)
3138             / TCP(sport=server_in_port, dport=host_out_port)
3139         )
3140         self.pg0.add_stream(p)
3141         self.pg_enable_capture(self.pg_interfaces)
3142         self.pg_start()
3143         capture = self.pg0.get_capture(1)
3144         p = capture[0]
3145         try:
3146             ip = p[IP]
3147             tcp = p[TCP]
3148             self.assertEqual(ip.src, self.nat_addr)
3149             self.assertEqual(ip.dst, host.ip4)
3150             self.assertEqual(tcp.sport, server_out_port)
3151             self.assertEqual(tcp.dport, host_in_port)
3152             self.assert_packet_checksums_valid(p)
3153         except:
3154             self.logger.error(ppp("Unexpected or invalid packet:", p))
3155             raise
3156
3157     def test_one_armed_nat44(self):
3158         """NAT44EI One armed NAT"""
3159         remote_host = self.pg9.remote_hosts[0]
3160         local_host = self.pg9.remote_hosts[1]
3161         external_port = 0
3162
3163         self.nat44_add_address(self.nat_addr)
3164         flags = self.config_flags.NAT44_EI_IF_INSIDE
3165         self.vapi.nat44_ei_interface_add_del_feature(
3166             sw_if_index=self.pg9.sw_if_index, is_add=1
3167         )
3168         self.vapi.nat44_ei_interface_add_del_feature(
3169             sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3170         )
3171
3172         # in2out
3173         p = (
3174             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3175             / IP(src=local_host.ip4, dst=remote_host.ip4)
3176             / TCP(sport=12345, dport=80)
3177         )
3178         self.pg9.add_stream(p)
3179         self.pg_enable_capture(self.pg_interfaces)
3180         self.pg_start()
3181         capture = self.pg9.get_capture(1)
3182         p = capture[0]
3183         try:
3184             ip = p[IP]
3185             tcp = p[TCP]
3186             self.assertEqual(ip.src, self.nat_addr)
3187             self.assertEqual(ip.dst, remote_host.ip4)
3188             self.assertNotEqual(tcp.sport, 12345)
3189             external_port = tcp.sport
3190             self.assertEqual(tcp.dport, 80)
3191             self.assert_packet_checksums_valid(p)
3192         except:
3193             self.logger.error(ppp("Unexpected or invalid packet:", p))
3194             raise
3195
3196         # out2in
3197         p = (
3198             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3199             / IP(src=remote_host.ip4, dst=self.nat_addr)
3200             / TCP(sport=80, dport=external_port)
3201         )
3202         self.pg9.add_stream(p)
3203         self.pg_enable_capture(self.pg_interfaces)
3204         self.pg_start()
3205         capture = self.pg9.get_capture(1)
3206         p = capture[0]
3207         try:
3208             ip = p[IP]
3209             tcp = p[TCP]
3210             self.assertEqual(ip.src, remote_host.ip4)
3211             self.assertEqual(ip.dst, local_host.ip4)
3212             self.assertEqual(tcp.sport, 80)
3213             self.assertEqual(tcp.dport, 12345)
3214             self.assert_packet_checksums_valid(p)
3215         except:
3216             self.logger.error(ppp("Unexpected or invalid packet:", p))
3217             raise
3218
3219         if self.vpp_worker_count > 1:
3220             node = "nat44-ei-handoff-classify"
3221         else:
3222             node = "nat44-ei-classify"
3223
3224         err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3225         self.assertEqual(err, 1)
3226         err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3227         self.assertEqual(err, 1)
3228
3229     def test_del_session(self):
3230         """NAT44EI delete session"""
3231         self.nat44_add_address(self.nat_addr)
3232         flags = self.config_flags.NAT44_EI_IF_INSIDE
3233         self.vapi.nat44_ei_interface_add_del_feature(
3234             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3235         )
3236         self.vapi.nat44_ei_interface_add_del_feature(
3237             sw_if_index=self.pg1.sw_if_index, is_add=1
3238         )
3239
3240         pkts = self.create_stream_in(self.pg0, self.pg1)
3241         self.pg0.add_stream(pkts)
3242         self.pg_enable_capture(self.pg_interfaces)
3243         self.pg_start()
3244         self.pg1.get_capture(len(pkts))
3245
3246         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3247         nsessions = len(sessions)
3248
3249         self.vapi.nat44_ei_del_session(
3250             address=sessions[0].inside_ip_address,
3251             port=sessions[0].inside_port,
3252             protocol=sessions[0].protocol,
3253             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3254         )
3255
3256         self.vapi.nat44_ei_del_session(
3257             address=sessions[1].outside_ip_address,
3258             port=sessions[1].outside_port,
3259             protocol=sessions[1].protocol,
3260         )
3261
3262         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3263         self.assertEqual(nsessions - len(sessions), 2)
3264
3265         self.vapi.nat44_ei_del_session(
3266             address=sessions[0].inside_ip_address,
3267             port=sessions[0].inside_port,
3268             protocol=sessions[0].protocol,
3269             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3270         )
3271
3272         self.verify_no_nat44_user()
3273
3274     def test_frag_in_order(self):
3275         """NAT44EI translate fragments arriving in order"""
3276
3277         self.nat44_add_address(self.nat_addr)
3278         flags = self.config_flags.NAT44_EI_IF_INSIDE
3279         self.vapi.nat44_ei_interface_add_del_feature(
3280             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3281         )
3282         self.vapi.nat44_ei_interface_add_del_feature(
3283             sw_if_index=self.pg1.sw_if_index, is_add=1
3284         )
3285
3286         self.frag_in_order(proto=IP_PROTOS.tcp)
3287         self.frag_in_order(proto=IP_PROTOS.udp)
3288         self.frag_in_order(proto=IP_PROTOS.icmp)
3289
3290     def test_frag_forwarding(self):
3291         """NAT44EI forwarding fragment test"""
3292         self.vapi.nat44_ei_add_del_interface_addr(
3293             is_add=1, sw_if_index=self.pg1.sw_if_index
3294         )
3295         flags = self.config_flags.NAT44_EI_IF_INSIDE
3296         self.vapi.nat44_ei_interface_add_del_feature(
3297             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3298         )
3299         self.vapi.nat44_ei_interface_add_del_feature(
3300             sw_if_index=self.pg1.sw_if_index, is_add=1
3301         )
3302         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3303
3304         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3305         pkts = self.create_stream_frag(
3306             self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3307         )
3308         self.pg1.add_stream(pkts)
3309         self.pg_enable_capture(self.pg_interfaces)
3310         self.pg_start()
3311         frags = self.pg0.get_capture(len(pkts))
3312         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3313         self.assertEqual(p[UDP].sport, 4789)
3314         self.assertEqual(p[UDP].dport, 4789)
3315         self.assertEqual(data, p[Raw].load)
3316
3317     def test_reass_hairpinning(self):
3318         """NAT44EI fragments hairpinning"""
3319
3320         server_addr = self.pg0.remote_hosts[1].ip4
3321         host_in_port = random.randint(1025, 65535)
3322         server_in_port = random.randint(1025, 65535)
3323         server_out_port = random.randint(1025, 65535)
3324
3325         self.nat44_add_address(self.nat_addr)
3326         flags = self.config_flags.NAT44_EI_IF_INSIDE
3327         self.vapi.nat44_ei_interface_add_del_feature(
3328             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3329         )
3330         self.vapi.nat44_ei_interface_add_del_feature(
3331             sw_if_index=self.pg1.sw_if_index, is_add=1
3332         )
3333         # add static mapping for server
3334         self.nat44_add_static_mapping(
3335             server_addr,
3336             self.nat_addr,
3337             server_in_port,
3338             server_out_port,
3339             proto=IP_PROTOS.tcp,
3340         )
3341         self.nat44_add_static_mapping(
3342             server_addr,
3343             self.nat_addr,
3344             server_in_port,
3345             server_out_port,
3346             proto=IP_PROTOS.udp,
3347         )
3348         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3349
3350         self.reass_hairpinning(
3351             server_addr,
3352             server_in_port,
3353             server_out_port,
3354             host_in_port,
3355             proto=IP_PROTOS.tcp,
3356         )
3357         self.reass_hairpinning(
3358             server_addr,
3359             server_in_port,
3360             server_out_port,
3361             host_in_port,
3362             proto=IP_PROTOS.udp,
3363         )
3364         self.reass_hairpinning(
3365             server_addr,
3366             server_in_port,
3367             server_out_port,
3368             host_in_port,
3369             proto=IP_PROTOS.icmp,
3370         )
3371
3372     def test_frag_out_of_order(self):
3373         """NAT44EI translate fragments arriving out of order"""
3374
3375         self.nat44_add_address(self.nat_addr)
3376         flags = self.config_flags.NAT44_EI_IF_INSIDE
3377         self.vapi.nat44_ei_interface_add_del_feature(
3378             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3379         )
3380         self.vapi.nat44_ei_interface_add_del_feature(
3381             sw_if_index=self.pg1.sw_if_index, is_add=1
3382         )
3383
3384         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3385         self.frag_out_of_order(proto=IP_PROTOS.udp)
3386         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3387
3388     def test_port_restricted(self):
3389         """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
3390         self.nat44_add_address(self.nat_addr)
3391         flags = self.config_flags.NAT44_EI_IF_INSIDE
3392         self.vapi.nat44_ei_interface_add_del_feature(
3393             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3394         )
3395         self.vapi.nat44_ei_interface_add_del_feature(
3396             sw_if_index=self.pg1.sw_if_index, is_add=1
3397         )
3398         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3399             alg=1, psid_offset=6, psid_length=6, psid=10
3400         )
3401
3402         p = (
3403             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3404             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3405             / TCP(sport=4567, dport=22)
3406         )
3407         self.pg0.add_stream(p)
3408         self.pg_enable_capture(self.pg_interfaces)
3409         self.pg_start()
3410         capture = self.pg1.get_capture(1)
3411         p = capture[0]
3412         try:
3413             ip = p[IP]
3414             tcp = p[TCP]
3415             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3416             self.assertEqual(ip.src, self.nat_addr)
3417             self.assertEqual(tcp.dport, 22)
3418             self.assertNotEqual(tcp.sport, 4567)
3419             self.assertEqual((tcp.sport >> 6) & 63, 10)
3420             self.assert_packet_checksums_valid(p)
3421         except:
3422             self.logger.error(ppp("Unexpected or invalid packet:", p))
3423             raise
3424
3425     def test_port_range(self):
3426         """NAT44EI External address port range"""
3427         self.nat44_add_address(self.nat_addr)
3428         flags = self.config_flags.NAT44_EI_IF_INSIDE
3429         self.vapi.nat44_ei_interface_add_del_feature(
3430             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3431         )
3432         self.vapi.nat44_ei_interface_add_del_feature(
3433             sw_if_index=self.pg1.sw_if_index, is_add=1
3434         )
3435         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3436             alg=2, start_port=1025, end_port=1027
3437         )
3438
3439         pkts = []
3440         for port in range(0, 5):
3441             p = (
3442                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3443                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3444                 / TCP(sport=1125 + port)
3445             )
3446             pkts.append(p)
3447         self.pg0.add_stream(pkts)
3448         self.pg_enable_capture(self.pg_interfaces)
3449         self.pg_start()
3450         capture = self.pg1.get_capture(3)
3451         for p in capture:
3452             tcp = p[TCP]
3453             self.assertGreaterEqual(tcp.sport, 1025)
3454             self.assertLessEqual(tcp.sport, 1027)
3455
3456     def test_multiple_outside_vrf(self):
3457         """NAT44EI Multiple outside VRF"""
3458         vrf_id1 = 1
3459         vrf_id2 = 2
3460
3461         self.pg1.unconfig_ip4()
3462         self.pg2.unconfig_ip4()
3463         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3464         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3465         self.pg1.set_table_ip4(vrf_id1)
3466         self.pg2.set_table_ip4(vrf_id2)
3467         self.pg1.config_ip4()
3468         self.pg2.config_ip4()
3469         self.pg1.resolve_arp()
3470         self.pg2.resolve_arp()
3471
3472         self.nat44_add_address(self.nat_addr)
3473         flags = self.config_flags.NAT44_EI_IF_INSIDE
3474         self.vapi.nat44_ei_interface_add_del_feature(
3475             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3476         )
3477         self.vapi.nat44_ei_interface_add_del_feature(
3478             sw_if_index=self.pg1.sw_if_index, is_add=1
3479         )
3480         self.vapi.nat44_ei_interface_add_del_feature(
3481             sw_if_index=self.pg2.sw_if_index, is_add=1
3482         )
3483
3484         try:
3485             # first VRF
3486             pkts = self.create_stream_in(self.pg0, self.pg1)
3487             self.pg0.add_stream(pkts)
3488             self.pg_enable_capture(self.pg_interfaces)
3489             self.pg_start()
3490             capture = self.pg1.get_capture(len(pkts))
3491             self.verify_capture_out(capture, self.nat_addr)
3492
3493             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3494             self.pg1.add_stream(pkts)
3495             self.pg_enable_capture(self.pg_interfaces)
3496             self.pg_start()
3497             capture = self.pg0.get_capture(len(pkts))
3498             self.verify_capture_in(capture, self.pg0)
3499
3500             self.tcp_port_in = 60303
3501             self.udp_port_in = 60304
3502             self.icmp_id_in = 60305
3503
3504             # second VRF
3505             pkts = self.create_stream_in(self.pg0, self.pg2)
3506             self.pg0.add_stream(pkts)
3507             self.pg_enable_capture(self.pg_interfaces)
3508             self.pg_start()
3509             capture = self.pg2.get_capture(len(pkts))
3510             self.verify_capture_out(capture, self.nat_addr)
3511
3512             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3513             self.pg2.add_stream(pkts)
3514             self.pg_enable_capture(self.pg_interfaces)
3515             self.pg_start()
3516             capture = self.pg0.get_capture(len(pkts))
3517             self.verify_capture_in(capture, self.pg0)
3518
3519         finally:
3520             self.nat44_add_address(self.nat_addr, is_add=0)
3521             self.pg1.unconfig_ip4()
3522             self.pg2.unconfig_ip4()
3523             self.pg1.set_table_ip4(0)
3524             self.pg2.set_table_ip4(0)
3525             self.pg1.config_ip4()
3526             self.pg2.config_ip4()
3527             self.pg1.resolve_arp()
3528             self.pg2.resolve_arp()
3529
3530     def test_mss_clamping(self):
3531         """NAT44EI TCP MSS clamping"""
3532         self.nat44_add_address(self.nat_addr)
3533         flags = self.config_flags.NAT44_EI_IF_INSIDE
3534         self.vapi.nat44_ei_interface_add_del_feature(
3535             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3536         )
3537         self.vapi.nat44_ei_interface_add_del_feature(
3538             sw_if_index=self.pg1.sw_if_index, is_add=1
3539         )
3540
3541         p = (
3542             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3543             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3544             / TCP(
3545                 sport=self.tcp_port_in,
3546                 dport=self.tcp_external_port,
3547                 flags="S",
3548                 options=[("MSS", 1400)],
3549             )
3550         )
3551
3552         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3553         self.pg0.add_stream(p)
3554         self.pg_enable_capture(self.pg_interfaces)
3555         self.pg_start()
3556         capture = self.pg1.get_capture(1)
3557         # Negotiated MSS value greater than configured - changed
3558         self.verify_mss_value(capture[0], 1000)
3559
3560         self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3561         self.pg0.add_stream(p)
3562         self.pg_enable_capture(self.pg_interfaces)
3563         self.pg_start()
3564         capture = self.pg1.get_capture(1)
3565         # MSS clamping disabled - negotiated MSS unchanged
3566         self.verify_mss_value(capture[0], 1400)
3567
3568         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3569         self.pg0.add_stream(p)
3570         self.pg_enable_capture(self.pg_interfaces)
3571         self.pg_start()
3572         capture = self.pg1.get_capture(1)
3573         # Negotiated MSS value smaller than configured - unchanged
3574         self.verify_mss_value(capture[0], 1400)
3575
3576     def test_ha_send(self):
3577         """NAT44EI Send HA session synchronization events (active)"""
3578         flags = self.config_flags.NAT44_EI_IF_INSIDE
3579         self.vapi.nat44_ei_interface_add_del_feature(
3580             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3581         )
3582         self.vapi.nat44_ei_interface_add_del_feature(
3583             sw_if_index=self.pg1.sw_if_index, is_add=1
3584         )
3585         self.nat44_add_address(self.nat_addr)
3586
3587         self.vapi.nat44_ei_ha_set_listener(
3588             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3589         )
3590         self.vapi.nat44_ei_ha_set_failover(
3591             ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
3592         )
3593         bind_layers(UDP, HANATStateSync, sport=12345)
3594
3595         # create sessions
3596         pkts = self.create_stream_in(self.pg0, self.pg1)
3597         self.pg0.add_stream(pkts)
3598         self.pg_enable_capture(self.pg_interfaces)
3599         self.pg_start()
3600         capture = self.pg1.get_capture(len(pkts))
3601         self.verify_capture_out(capture)
3602         # active send HA events
3603         self.vapi.nat44_ei_ha_flush()
3604         stats = self.statistics["/nat44-ei/ha/add-event-send"]
3605         self.assertEqual(stats[:, 0].sum(), 3)
3606         capture = self.pg3.get_capture(1)
3607         p = capture[0]
3608         self.assert_packet_checksums_valid(p)
3609         try:
3610             ip = p[IP]
3611             udp = p[UDP]
3612             hanat = p[HANATStateSync]
3613         except IndexError:
3614             self.logger.error(ppp("Invalid packet:", p))
3615             raise
3616         else:
3617             self.assertEqual(ip.src, self.pg3.local_ip4)
3618             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3619             self.assertEqual(udp.sport, 12345)
3620             self.assertEqual(udp.dport, 12346)
3621             self.assertEqual(hanat.version, 1)
3622             # self.assertEqual(hanat.thread_index, 0)
3623             self.assertEqual(hanat.count, 3)
3624             seq = hanat.sequence_number
3625             for event in hanat.events:
3626                 self.assertEqual(event.event_type, 1)
3627                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3628                 self.assertEqual(event.out_addr, self.nat_addr)
3629                 self.assertEqual(event.fib_index, 0)
3630
3631         # ACK received events
3632         ack = (
3633             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3634             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3635             / UDP(sport=12346, dport=12345)
3636             / HANATStateSync(
3637                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3638             )
3639         )
3640         self.pg3.add_stream(ack)
3641         self.pg_start()
3642         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3643         self.assertEqual(stats[:, 0].sum(), 1)
3644
3645         # delete one session
3646         self.pg_enable_capture(self.pg_interfaces)
3647         self.vapi.nat44_ei_del_session(
3648             address=self.pg0.remote_ip4,
3649             port=self.tcp_port_in,
3650             protocol=IP_PROTOS.tcp,
3651             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3652         )
3653         self.vapi.nat44_ei_ha_flush()
3654         stats = self.statistics["/nat44-ei/ha/del-event-send"]
3655         self.assertEqual(stats[:, 0].sum(), 1)
3656         capture = self.pg3.get_capture(1)
3657         p = capture[0]
3658         try:
3659             hanat = p[HANATStateSync]
3660         except IndexError:
3661             self.logger.error(ppp("Invalid packet:", p))
3662             raise
3663         else:
3664             self.assertGreater(hanat.sequence_number, seq)
3665
3666         # do not send ACK, active retry send HA event again
3667         self.pg_enable_capture(self.pg_interfaces)
3668         self.virtual_sleep(12)
3669         stats = self.statistics["/nat44-ei/ha/retry-count"]
3670         self.assertEqual(stats[:, 0].sum(), 3)
3671         stats = self.statistics["/nat44-ei/ha/missed-count"]
3672         self.assertEqual(stats[:, 0].sum(), 1)
3673         capture = self.pg3.get_capture(3)
3674         for packet in capture:
3675             self.assertEqual(packet, p)
3676
3677         # session counters refresh
3678         pkts = self.create_stream_out(self.pg1)
3679         self.pg1.add_stream(pkts)
3680         self.pg_enable_capture(self.pg_interfaces)
3681         self.pg_start()
3682         self.pg0.get_capture(2)
3683         self.vapi.nat44_ei_ha_flush()
3684         stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3685         self.assertEqual(stats[:, 0].sum(), 2)
3686         capture = self.pg3.get_capture(1)
3687         p = capture[0]
3688         self.assert_packet_checksums_valid(p)
3689         try:
3690             ip = p[IP]
3691             udp = p[UDP]
3692             hanat = p[HANATStateSync]
3693         except IndexError:
3694             self.logger.error(ppp("Invalid packet:", p))
3695             raise
3696         else:
3697             self.assertEqual(ip.src, self.pg3.local_ip4)
3698             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3699             self.assertEqual(udp.sport, 12345)
3700             self.assertEqual(udp.dport, 12346)
3701             self.assertEqual(hanat.version, 1)
3702             self.assertEqual(hanat.count, 2)
3703             seq = hanat.sequence_number
3704             for event in hanat.events:
3705                 self.assertEqual(event.event_type, 3)
3706                 self.assertEqual(event.out_addr, self.nat_addr)
3707                 self.assertEqual(event.fib_index, 0)
3708                 self.assertEqual(event.total_pkts, 2)
3709                 self.assertGreater(event.total_bytes, 0)
3710
3711         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3712         ack = (
3713             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3714             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3715             / UDP(sport=12346, dport=12345)
3716             / HANATStateSync(
3717                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3718             )
3719         )
3720         self.pg3.add_stream(ack)
3721         self.pg_start()
3722         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3723         self.assertEqual(stats[:, 0].sum(), 2)
3724
3725     def test_ha_recv(self):
3726         """NAT44EI Receive HA session synchronization events (passive)"""
3727         self.nat44_add_address(self.nat_addr)
3728         flags = self.config_flags.NAT44_EI_IF_INSIDE
3729         self.vapi.nat44_ei_interface_add_del_feature(
3730             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3731         )
3732         self.vapi.nat44_ei_interface_add_del_feature(
3733             sw_if_index=self.pg1.sw_if_index, is_add=1
3734         )
3735         self.vapi.nat44_ei_ha_set_listener(
3736             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3737         )
3738         bind_layers(UDP, HANATStateSync, sport=12345)
3739
3740         # this is a bit tricky - HA dictates thread index due to how it's
3741         # designed, but once we use HA to create a session, we also want
3742         # to pass a packet through said session. so the session must end
3743         # up on the correct thread from both directions - in2out (based on
3744         # IP address) and out2in (based on outside port)
3745
3746         # first choose a thread index which is correct for IP
3747         thread_index = get_nat44_ei_in2out_worker_index(
3748             self.pg0.remote_ip4, self.vpp_worker_count
3749         )
3750
3751         # now pick a port which is correct for given thread
3752         port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3753         self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3754         self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3755         if self.vpp_worker_count > 0:
3756             self.tcp_port_out += port_per_thread * (thread_index - 1)
3757             self.udp_port_out += port_per_thread * (thread_index - 1)
3758
3759         # send HA session add events to failover/passive
3760         p = (
3761             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3762             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3763             / UDP(sport=12346, dport=12345)
3764             / HANATStateSync(
3765                 sequence_number=1,
3766                 events=[
3767                     Event(
3768                         event_type="add",
3769                         protocol="tcp",
3770                         in_addr=self.pg0.remote_ip4,
3771                         out_addr=self.nat_addr,
3772                         in_port=self.tcp_port_in,
3773                         out_port=self.tcp_port_out,
3774                         eh_addr=self.pg1.remote_ip4,
3775                         ehn_addr=self.pg1.remote_ip4,
3776                         eh_port=self.tcp_external_port,
3777                         ehn_port=self.tcp_external_port,
3778                         fib_index=0,
3779                     ),
3780                     Event(
3781                         event_type="add",
3782                         protocol="udp",
3783                         in_addr=self.pg0.remote_ip4,
3784                         out_addr=self.nat_addr,
3785                         in_port=self.udp_port_in,
3786                         out_port=self.udp_port_out,
3787                         eh_addr=self.pg1.remote_ip4,
3788                         ehn_addr=self.pg1.remote_ip4,
3789                         eh_port=self.udp_external_port,
3790                         ehn_port=self.udp_external_port,
3791                         fib_index=0,
3792                     ),
3793                 ],
3794                 thread_index=thread_index,
3795             )
3796         )
3797
3798         self.pg3.add_stream(p)
3799         self.pg_enable_capture(self.pg_interfaces)
3800         self.pg_start()
3801         # receive ACK
3802         capture = self.pg3.get_capture(1)
3803         p = capture[0]
3804         try:
3805             hanat = p[HANATStateSync]
3806         except IndexError:
3807             self.logger.error(ppp("Invalid packet:", p))
3808             raise
3809         else:
3810             self.assertEqual(hanat.sequence_number, 1)
3811             self.assertEqual(hanat.flags, "ACK")
3812             self.assertEqual(hanat.version, 1)
3813             self.assertEqual(hanat.thread_index, thread_index)
3814         stats = self.statistics["/nat44-ei/ha/ack-send"]
3815         self.assertEqual(stats[:, 0].sum(), 1)
3816         stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3817         self.assertEqual(stats[:, 0].sum(), 2)
3818         users = self.statistics["/nat44-ei/total-users"]
3819         self.assertEqual(users[:, 0].sum(), 1)
3820         sessions = self.statistics["/nat44-ei/total-sessions"]
3821         self.assertEqual(sessions[:, 0].sum(), 2)
3822         users = self.vapi.nat44_ei_user_dump()
3823         self.assertEqual(len(users), 1)
3824         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3825         # there should be 2 sessions created by HA
3826         sessions = self.vapi.nat44_ei_user_session_dump(
3827             users[0].ip_address, users[0].vrf_id
3828         )
3829         self.assertEqual(len(sessions), 2)
3830         for session in sessions:
3831             self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3832             self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3833             self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3834             self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3835             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3836
3837         # send HA session delete event to failover/passive
3838         p = (
3839             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3840             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3841             / UDP(sport=12346, dport=12345)
3842             / HANATStateSync(
3843                 sequence_number=2,
3844                 events=[
3845                     Event(
3846                         event_type="del",
3847                         protocol="udp",
3848                         in_addr=self.pg0.remote_ip4,
3849                         out_addr=self.nat_addr,
3850                         in_port=self.udp_port_in,
3851                         out_port=self.udp_port_out,
3852                         eh_addr=self.pg1.remote_ip4,
3853                         ehn_addr=self.pg1.remote_ip4,
3854                         eh_port=self.udp_external_port,
3855                         ehn_port=self.udp_external_port,
3856                         fib_index=0,
3857                     )
3858                 ],
3859                 thread_index=thread_index,
3860             )
3861         )
3862
3863         self.pg3.add_stream(p)
3864         self.pg_enable_capture(self.pg_interfaces)
3865         self.pg_start()
3866         # receive ACK
3867         capture = self.pg3.get_capture(1)
3868         p = capture[0]
3869         try:
3870             hanat = p[HANATStateSync]
3871         except IndexError:
3872             self.logger.error(ppp("Invalid packet:", p))
3873             raise
3874         else:
3875             self.assertEqual(hanat.sequence_number, 2)
3876             self.assertEqual(hanat.flags, "ACK")
3877             self.assertEqual(hanat.version, 1)
3878         users = self.vapi.nat44_ei_user_dump()
3879         self.assertEqual(len(users), 1)
3880         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3881         # now we should have only 1 session, 1 deleted by HA
3882         sessions = self.vapi.nat44_ei_user_session_dump(
3883             users[0].ip_address, users[0].vrf_id
3884         )
3885         self.assertEqual(len(sessions), 1)
3886         stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3887         self.assertEqual(stats[:, 0].sum(), 1)
3888
3889         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3890         self.assertEqual(stats, 2)
3891
3892         # send HA session refresh event to failover/passive
3893         p = (
3894             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3895             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3896             / UDP(sport=12346, dport=12345)
3897             / HANATStateSync(
3898                 sequence_number=3,
3899                 events=[
3900                     Event(
3901                         event_type="refresh",
3902                         protocol="tcp",
3903                         in_addr=self.pg0.remote_ip4,
3904                         out_addr=self.nat_addr,
3905                         in_port=self.tcp_port_in,
3906                         out_port=self.tcp_port_out,
3907                         eh_addr=self.pg1.remote_ip4,
3908                         ehn_addr=self.pg1.remote_ip4,
3909                         eh_port=self.tcp_external_port,
3910                         ehn_port=self.tcp_external_port,
3911                         fib_index=0,
3912                         total_bytes=1024,
3913                         total_pkts=2,
3914                     )
3915                 ],
3916                 thread_index=thread_index,
3917             )
3918         )
3919         self.pg3.add_stream(p)
3920         self.pg_enable_capture(self.pg_interfaces)
3921         self.pg_start()
3922         # receive ACK
3923         capture = self.pg3.get_capture(1)
3924         p = capture[0]
3925         try:
3926             hanat = p[HANATStateSync]
3927         except IndexError:
3928             self.logger.error(ppp("Invalid packet:", p))
3929             raise
3930         else:
3931             self.assertEqual(hanat.sequence_number, 3)
3932             self.assertEqual(hanat.flags, "ACK")
3933             self.assertEqual(hanat.version, 1)
3934         users = self.vapi.nat44_ei_user_dump()
3935         self.assertEqual(len(users), 1)
3936         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3937         sessions = self.vapi.nat44_ei_user_session_dump(
3938             users[0].ip_address, users[0].vrf_id
3939         )
3940         self.assertEqual(len(sessions), 1)
3941         session = sessions[0]
3942         self.assertEqual(session.total_bytes, 1024)
3943         self.assertEqual(session.total_pkts, 2)
3944         stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3945         self.assertEqual(stats[:, 0].sum(), 1)
3946
3947         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3948         self.assertEqual(stats, 3)
3949
3950         # send packet to test session created by HA
3951         p = (
3952             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3953             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3954             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3955         )
3956         self.pg1.add_stream(p)
3957         self.pg_enable_capture(self.pg_interfaces)
3958         self.pg_start()
3959         capture = self.pg0.get_capture(1)
3960         p = capture[0]
3961         try:
3962             ip = p[IP]
3963             tcp = p[TCP]
3964         except IndexError:
3965             self.logger.error(ppp("Invalid packet:", p))
3966             raise
3967         else:
3968             self.assertEqual(ip.src, self.pg1.remote_ip4)
3969             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3970             self.assertEqual(tcp.sport, self.tcp_external_port)
3971             self.assertEqual(tcp.dport, self.tcp_port_in)
3972
3973     def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3974         self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3975         self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3976         # keep plugin configuration persistent
3977         self.plugin_enable()
3978         return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3979
3980     def test_set_frame_queue_nelts(self):
3981         """NAT44EI API test - worker handoff frame queue elements"""
3982         self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3983
3984     def show_commands_at_teardown(self):
3985         self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3986         self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3987         self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3988         self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3989         self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3990         self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3991         self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3992         self.logger.info(self.vapi.cli("show nat44 ei ha"))
3993         self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3994
3995     def test_outside_address_distribution(self):
3996         """Outside address distribution based on source address"""
3997
3998         x = 100
3999         nat_addresses = []
4000
4001         for i in range(1, x):
4002             a = "10.0.0.%d" % i
4003             nat_addresses.append(a)
4004
4005         flags = self.config_flags.NAT44_EI_IF_INSIDE
4006         self.vapi.nat44_ei_interface_add_del_feature(
4007             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4008         )
4009         self.vapi.nat44_ei_interface_add_del_feature(
4010             sw_if_index=self.pg1.sw_if_index, is_add=1
4011         )
4012
4013         self.vapi.nat44_ei_add_del_address_range(
4014             first_ip_address=nat_addresses[0],
4015             last_ip_address=nat_addresses[-1],
4016             vrf_id=0xFFFFFFFF,
4017             is_add=1,
4018         )
4019
4020         self.pg0.generate_remote_hosts(x)
4021
4022         pkts = []
4023         for i in range(x):
4024             info = self.create_packet_info(self.pg0, self.pg1)
4025             payload = self.info_to_payload(info)
4026             p = (
4027                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4028                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4029                 / UDP(sport=7000 + i, dport=8000 + i)
4030                 / Raw(payload)
4031             )
4032             info.data = p
4033             pkts.append(p)
4034
4035         self.pg0.add_stream(pkts)
4036         self.pg_enable_capture(self.pg_interfaces)
4037         self.pg_start()
4038         recvd = self.pg1.get_capture(len(pkts))
4039         for p_recvd in recvd:
4040             payload_info = self.payload_to_info(p_recvd[Raw])
4041             packet_index = payload_info.index
4042             info = self._packet_infos[packet_index]
4043             self.assertTrue(info is not None)
4044             self.assertEqual(packet_index, info.index)
4045             p_sent = info.data
4046             packed = socket.inet_aton(p_sent[IP].src)
4047             numeric = struct.unpack("!L", packed)[0]
4048             numeric = socket.htonl(numeric)
4049             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4050             self.assertEqual(
4051                 a,
4052                 p_recvd[IP].src,
4053                 "Invalid packet (src IP %s translated to %s, but expected %s)"
4054                 % (p_sent[IP].src, p_recvd[IP].src, a),
4055             )
4056
4057     def test_default_user_sessions(self):
4058         """NAT44EI default per-user session limit is used and reported"""
4059         nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4060         # a nonzero default should be reported for user_sessions
4061         self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4062
4063     def test_delete_interface(self):
4064         """NAT44EI delete nat interface"""
4065
4066         self.nat44_add_address(self.nat_addr)
4067
4068         interfaces = self.create_loopback_interfaces(4)
4069
4070         self.vapi.nat44_ei_interface_add_del_feature(
4071             sw_if_index=interfaces[0].sw_if_index, is_add=1
4072         )
4073         flags = self.config_flags.NAT44_EI_IF_INSIDE
4074         self.vapi.nat44_ei_interface_add_del_feature(
4075             sw_if_index=interfaces[1].sw_if_index, flags=flags, is_add=1
4076         )
4077         flags |= self.config_flags.NAT44_EI_IF_OUTSIDE
4078         self.vapi.nat44_ei_interface_add_del_feature(
4079             sw_if_index=interfaces[2].sw_if_index, flags=flags, is_add=1
4080         )
4081         self.vapi.nat44_ei_add_del_output_interface(
4082             sw_if_index=interfaces[3].sw_if_index, is_add=1
4083         )
4084
4085         nat_sw_if_indices = [
4086             i.sw_if_index
4087             for i in self.vapi.nat44_ei_interface_dump()
4088             + list(self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get))
4089         ]
4090         self.assertEqual(len(nat_sw_if_indices), len(interfaces))
4091
4092         loopbacks = []
4093         for i in interfaces:
4094             # delete nat-enabled interface
4095             self.assertIn(i.sw_if_index, nat_sw_if_indices)
4096             i.remove_vpp_config()
4097
4098             # create interface with the same index
4099             lo = VppLoInterface(self)
4100             loopbacks.append(lo)
4101             self.assertEqual(lo.sw_if_index, i.sw_if_index)
4102
4103             # check interface is not nat-enabled
4104             nat_sw_if_indices = [
4105                 i.sw_if_index
4106                 for i in self.vapi.nat44_ei_interface_dump()
4107                 + list(
4108                     self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get)
4109                 )
4110             ]
4111             self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
4112
4113         for i in loopbacks:
4114             i.remove_vpp_config()
4115
4116
4117 class TestNAT44Out2InDPO(MethodHolder):
4118     """NAT44EI Test Cases using out2in DPO"""
4119
4120     @classmethod
4121     def setUpClass(cls):
4122         super(TestNAT44Out2InDPO, cls).setUpClass()
4123         cls.vapi.cli("set log class nat44-ei level debug")
4124
4125         cls.tcp_port_in = 6303
4126         cls.tcp_port_out = 6303
4127         cls.udp_port_in = 6304
4128         cls.udp_port_out = 6304
4129         cls.icmp_id_in = 6305
4130         cls.icmp_id_out = 6305
4131         cls.nat_addr = "10.0.0.3"
4132         cls.dst_ip4 = "192.168.70.1"
4133
4134         cls.create_pg_interfaces(range(2))
4135
4136         cls.pg0.admin_up()
4137         cls.pg0.config_ip4()
4138         cls.pg0.resolve_arp()
4139
4140         cls.pg1.admin_up()
4141         cls.pg1.config_ip6()
4142         cls.pg1.resolve_ndp()
4143
4144         r1 = VppIpRoute(
4145             cls,
4146             "::",
4147             0,
4148             [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
4149             register=False,
4150         )
4151         r1.add_vpp_config()
4152
4153     def setUp(self):
4154         super(TestNAT44Out2InDPO, self).setUp()
4155         flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4156         self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
4157
4158     def tearDown(self):
4159         super(TestNAT44Out2InDPO, self).tearDown()
4160         if not self.vpp_dead:
4161             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4162             self.vapi.cli("clear logging")
4163
4164     def configure_xlat(self):
4165         self.dst_ip6_pfx = "1:2:3::"
4166         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4167         self.dst_ip6_pfx_len = 96
4168         self.src_ip6_pfx = "4:5:6::"
4169         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4170         self.src_ip6_pfx_len = 96
4171         self.vapi.map_add_domain(
4172             self.dst_ip6_pfx_n,
4173             self.dst_ip6_pfx_len,
4174             self.src_ip6_pfx_n,
4175             self.src_ip6_pfx_len,
4176             "\x00\x00\x00\x00",
4177             0,
4178         )
4179
4180     @unittest.skip("Temporary disabled")
4181     def test_464xlat_ce(self):
4182         """Test 464XLAT CE with NAT44EI"""
4183
4184         self.configure_xlat()
4185
4186         flags = self.config_flags.NAT44_EI_IF_INSIDE
4187         self.vapi.nat44_ei_interface_add_del_feature(
4188             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4189         )
4190         self.vapi.nat44_ei_add_del_address_range(
4191             first_ip_address=self.nat_addr_n,
4192             last_ip_address=self.nat_addr_n,
4193             vrf_id=0xFFFFFFFF,
4194             is_add=1,
4195         )
4196
4197         out_src_ip6 = self.compose_ip6(
4198             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4199         )
4200         out_dst_ip6 = self.compose_ip6(
4201             self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
4202         )
4203
4204         try:
4205             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4206             self.pg0.add_stream(pkts)
4207             self.pg_enable_capture(self.pg_interfaces)
4208             self.pg_start()
4209             capture = self.pg1.get_capture(len(pkts))
4210             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4211
4212             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4213             self.pg1.add_stream(pkts)
4214             self.pg_enable_capture(self.pg_interfaces)
4215             self.pg_start()
4216             capture = self.pg0.get_capture(len(pkts))
4217             self.verify_capture_in(capture, self.pg0)
4218         finally:
4219             self.vapi.nat44_ei_interface_add_del_feature(
4220                 sw_if_index=self.pg0.sw_if_index, flags=flags
4221             )
4222             self.vapi.nat44_ei_add_del_address_range(
4223                 first_ip_address=self.nat_addr_n,
4224                 last_ip_address=self.nat_addr_n,
4225                 vrf_id=0xFFFFFFFF,
4226             )
4227
4228     @unittest.skip("Temporary disabled")
4229     def test_464xlat_ce_no_nat(self):
4230         """Test 464XLAT CE without NAT44EI"""
4231
4232         self.configure_xlat()
4233
4234         out_src_ip6 = self.compose_ip6(
4235             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4236         )
4237         out_dst_ip6 = self.compose_ip6(
4238             self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4239         )
4240
4241         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4242         self.pg0.add_stream(pkts)
4243         self.pg_enable_capture(self.pg_interfaces)
4244         self.pg_start()
4245         capture = self.pg1.get_capture(len(pkts))
4246         self.verify_capture_out_ip6(
4247             capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4248         )
4249
4250         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4251         self.pg1.add_stream(pkts)
4252         self.pg_enable_capture(self.pg_interfaces)
4253         self.pg_start()
4254         capture = self.pg0.get_capture(len(pkts))
4255         self.verify_capture_in(capture, self.pg0)
4256
4257
4258 class TestNAT44EIMW(MethodHolder):
4259     """NAT44EI Test Cases (multiple workers)"""
4260
4261     vpp_worker_count = 2
4262     max_translations = 10240
4263     max_users = 10240
4264
4265     @classmethod
4266     def setUpClass(cls):
4267         super(TestNAT44EIMW, cls).setUpClass()
4268         cls.vapi.cli("set log class nat level debug")
4269
4270         cls.tcp_port_in = 6303
4271         cls.tcp_port_out = 6303
4272         cls.udp_port_in = 6304
4273         cls.udp_port_out = 6304
4274         cls.icmp_id_in = 6305
4275         cls.icmp_id_out = 6305
4276         cls.nat_addr = "10.0.0.3"
4277         cls.ipfix_src_port = 4739
4278         cls.ipfix_domain_id = 1
4279         cls.tcp_external_port = 80
4280         cls.udp_external_port = 69
4281
4282         cls.create_pg_interfaces(range(10))
4283         cls.interfaces = list(cls.pg_interfaces[0:4])
4284
4285         for i in cls.interfaces:
4286             i.admin_up()
4287             i.config_ip4()
4288             i.resolve_arp()
4289
4290         cls.pg0.generate_remote_hosts(3)
4291         cls.pg0.configure_ipv4_neighbors()
4292
4293         cls.pg1.generate_remote_hosts(1)
4294         cls.pg1.configure_ipv4_neighbors()
4295
4296         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4297         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4298         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
4299
4300         cls.pg4._local_ip4 = "172.16.255.1"
4301         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4302         cls.pg4.set_table_ip4(10)
4303         cls.pg5._local_ip4 = "172.17.255.3"
4304         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4305         cls.pg5.set_table_ip4(10)
4306         cls.pg6._local_ip4 = "172.16.255.1"
4307         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4308         cls.pg6.set_table_ip4(20)
4309         for i in cls.overlapping_interfaces:
4310             i.config_ip4()
4311             i.admin_up()
4312             i.resolve_arp()
4313
4314         cls.pg7.admin_up()
4315         cls.pg8.admin_up()
4316
4317         cls.pg9.generate_remote_hosts(2)
4318         cls.pg9.config_ip4()
4319         cls.vapi.sw_interface_add_del_address(
4320             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4321         )
4322
4323         cls.pg9.admin_up()
4324         cls.pg9.resolve_arp()
4325         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4326         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4327         cls.pg9.resolve_arp()
4328
4329     def setUp(self):
4330         super(TestNAT44EIMW, self).setUp()
4331         self.vapi.nat44_ei_plugin_enable_disable(
4332             sessions=self.max_translations, users=self.max_users, enable=1
4333         )
4334
4335     def tearDown(self):
4336         super(TestNAT44EIMW, self).tearDown()
4337         if not self.vpp_dead:
4338             self.vapi.nat44_ei_ipfix_enable_disable(
4339                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
4340             )
4341             self.ipfix_src_port = 4739
4342             self.ipfix_domain_id = 1
4343
4344             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4345             self.vapi.cli("clear logging")
4346
4347     def test_hairpinning(self):
4348         """NAT44EI hairpinning - 1:1 NAPT"""
4349
4350         host = self.pg0.remote_hosts[0]
4351         server = self.pg0.remote_hosts[1]
4352         host_in_port = 1234
4353         host_out_port = 0
4354         server_in_port = 5678
4355         server_out_port = 8765
4356         worker_1 = 1
4357         worker_2 = 2
4358
4359         self.nat44_add_address(self.nat_addr)
4360         flags = self.config_flags.NAT44_EI_IF_INSIDE
4361         self.vapi.nat44_ei_interface_add_del_feature(
4362             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4363         )
4364         self.vapi.nat44_ei_interface_add_del_feature(
4365             sw_if_index=self.pg1.sw_if_index, is_add=1
4366         )
4367
4368         # add static mapping for server
4369         self.nat44_add_static_mapping(
4370             server.ip4,
4371             self.nat_addr,
4372             server_in_port,
4373             server_out_port,
4374             proto=IP_PROTOS.tcp,
4375         )
4376
4377         cnt = self.statistics["/nat44-ei/hairpinning"]
4378         # send packet from host to server
4379         p = (
4380             Ether(src=host.mac, dst=self.pg0.local_mac)
4381             / IP(src=host.ip4, dst=self.nat_addr)
4382             / TCP(sport=host_in_port, dport=server_out_port)
4383         )
4384         self.pg0.add_stream(p)
4385         self.pg_enable_capture(self.pg_interfaces)
4386         self.pg_start()
4387         capture = self.pg0.get_capture(1)
4388         p = capture[0]
4389         try:
4390             ip = p[IP]
4391             tcp = p[TCP]
4392             self.assertEqual(ip.src, self.nat_addr)
4393             self.assertEqual(ip.dst, server.ip4)
4394             self.assertNotEqual(tcp.sport, host_in_port)
4395             self.assertEqual(tcp.dport, server_in_port)
4396             self.assert_packet_checksums_valid(p)
4397             host_out_port = tcp.sport
4398         except:
4399             self.logger.error(ppp("Unexpected or invalid packet:", p))
4400             raise
4401
4402         after = self.statistics["/nat44-ei/hairpinning"]
4403
4404         if_idx = self.pg0.sw_if_index
4405         self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4406
4407         # send reply from server to host
4408         p = (
4409             Ether(src=server.mac, dst=self.pg0.local_mac)
4410             / IP(src=server.ip4, dst=self.nat_addr)
4411             / TCP(sport=server_in_port, dport=host_out_port)
4412         )
4413         self.pg0.add_stream(p)
4414         self.pg_enable_capture(self.pg_interfaces)
4415         self.pg_start()
4416         capture = self.pg0.get_capture(1)
4417         p = capture[0]
4418         try:
4419             ip = p[IP]
4420             tcp = p[TCP]
4421             self.assertEqual(ip.src, self.nat_addr)
4422             self.assertEqual(ip.dst, host.ip4)
4423             self.assertEqual(tcp.sport, server_out_port)
4424             self.assertEqual(tcp.dport, host_in_port)
4425             self.assert_packet_checksums_valid(p)
4426         except:
4427             self.logger.error(ppp("Unexpected or invalid packet:", p))
4428             raise
4429
4430         after = self.statistics["/nat44-ei/hairpinning"]
4431         if_idx = self.pg0.sw_if_index
4432         self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4433         self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4434
4435     def test_hairpinning2(self):
4436         """NAT44EI hairpinning - 1:1 NAT"""
4437
4438         server1_nat_ip = "10.0.0.10"
4439         server2_nat_ip = "10.0.0.11"
4440         host = self.pg0.remote_hosts[0]
4441         server1 = self.pg0.remote_hosts[1]
4442         server2 = self.pg0.remote_hosts[2]
4443         server_tcp_port = 22
4444         server_udp_port = 20
4445
4446         self.nat44_add_address(self.nat_addr)
4447         flags = self.config_flags.NAT44_EI_IF_INSIDE
4448         self.vapi.nat44_ei_interface_add_del_feature(
4449             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4450         )
4451         self.vapi.nat44_ei_interface_add_del_feature(
4452             sw_if_index=self.pg1.sw_if_index, is_add=1
4453         )
4454
4455         # add static mapping for servers
4456         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4457         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4458
4459         # host to server1
4460         pkts = []
4461         p = (
4462             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4463             / IP(src=host.ip4, dst=server1_nat_ip)
4464             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4465         )
4466         pkts.append(p)
4467         p = (
4468             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4469             / IP(src=host.ip4, dst=server1_nat_ip)
4470             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4471         )
4472         pkts.append(p)
4473         p = (
4474             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4475             / IP(src=host.ip4, dst=server1_nat_ip)
4476             / ICMP(id=self.icmp_id_in, type="echo-request")
4477         )
4478         pkts.append(p)
4479         self.pg0.add_stream(pkts)
4480         self.pg_enable_capture(self.pg_interfaces)
4481         self.pg_start()
4482         capture = self.pg0.get_capture(len(pkts))
4483         for packet in capture:
4484             try:
4485                 self.assertEqual(packet[IP].src, self.nat_addr)
4486                 self.assertEqual(packet[IP].dst, server1.ip4)
4487                 if packet.haslayer(TCP):
4488                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4489                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4490                     self.tcp_port_out = packet[TCP].sport
4491                     self.assert_packet_checksums_valid(packet)
4492                 elif packet.haslayer(UDP):
4493                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4494                     self.assertEqual(packet[UDP].dport, server_udp_port)
4495                     self.udp_port_out = packet[UDP].sport
4496                 else:
4497                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4498                     self.icmp_id_out = packet[ICMP].id
4499             except:
4500                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4501                 raise
4502
4503         # server1 to host
4504         pkts = []
4505         p = (
4506             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4507             / IP(src=server1.ip4, dst=self.nat_addr)
4508             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4509         )
4510         pkts.append(p)
4511         p = (
4512             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4513             / IP(src=server1.ip4, dst=self.nat_addr)
4514             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4515         )
4516         pkts.append(p)
4517         p = (
4518             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4519             / IP(src=server1.ip4, dst=self.nat_addr)
4520             / ICMP(id=self.icmp_id_out, type="echo-reply")
4521         )
4522         pkts.append(p)
4523         self.pg0.add_stream(pkts)
4524         self.pg_enable_capture(self.pg_interfaces)
4525         self.pg_start()
4526         capture = self.pg0.get_capture(len(pkts))
4527         for packet in capture:
4528             try:
4529                 self.assertEqual(packet[IP].src, server1_nat_ip)
4530                 self.assertEqual(packet[IP].dst, host.ip4)
4531                 if packet.haslayer(TCP):
4532                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4533                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4534                     self.assert_packet_checksums_valid(packet)
4535                 elif packet.haslayer(UDP):
4536                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4537                     self.assertEqual(packet[UDP].sport, server_udp_port)
4538                 else:
4539                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4540             except:
4541                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4542                 raise
4543
4544         # server2 to server1
4545         pkts = []
4546         p = (
4547             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4548             / IP(src=server2.ip4, dst=server1_nat_ip)
4549             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4550         )
4551         pkts.append(p)
4552         p = (
4553             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4554             / IP(src=server2.ip4, dst=server1_nat_ip)
4555             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4556         )
4557         pkts.append(p)
4558         p = (
4559             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4560             / IP(src=server2.ip4, dst=server1_nat_ip)
4561             / ICMP(id=self.icmp_id_in, type="echo-request")
4562         )
4563         pkts.append(p)
4564         self.pg0.add_stream(pkts)
4565         self.pg_enable_capture(self.pg_interfaces)
4566         self.pg_start()
4567         capture = self.pg0.get_capture(len(pkts))
4568         for packet in capture:
4569             try:
4570                 self.assertEqual(packet[IP].src, server2_nat_ip)
4571                 self.assertEqual(packet[IP].dst, server1.ip4)
4572                 if packet.haslayer(TCP):
4573                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4574                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4575                     self.tcp_port_out = packet[TCP].sport
4576                     self.assert_packet_checksums_valid(packet)
4577                 elif packet.haslayer(UDP):
4578                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
4579                     self.assertEqual(packet[UDP].dport, server_udp_port)
4580                     self.udp_port_out = packet[UDP].sport
4581                 else:
4582                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4583                     self.icmp_id_out = packet[ICMP].id
4584             except:
4585                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4586                 raise
4587
4588         # server1 to server2
4589         pkts = []
4590         p = (
4591             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4592             / IP(src=server1.ip4, dst=server2_nat_ip)
4593             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4594         )
4595         pkts.append(p)
4596         p = (
4597             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4598             / IP(src=server1.ip4, dst=server2_nat_ip)
4599             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4600         )
4601         pkts.append(p)
4602         p = (
4603             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4604             / IP(src=server1.ip4, dst=server2_nat_ip)
4605             / ICMP(id=self.icmp_id_out, type="echo-reply")
4606         )
4607         pkts.append(p)
4608         self.pg0.add_stream(pkts)
4609         self.pg_enable_capture(self.pg_interfaces)
4610         self.pg_start()
4611         capture = self.pg0.get_capture(len(pkts))
4612         for packet in capture:
4613             try:
4614                 self.assertEqual(packet[IP].src, server1_nat_ip)
4615                 self.assertEqual(packet[IP].dst, server2.ip4)
4616                 if packet.haslayer(TCP):
4617                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4618                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4619                     self.assert_packet_checksums_valid(packet)
4620                 elif packet.haslayer(UDP):
4621                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4622                     self.assertEqual(packet[UDP].sport, server_udp_port)
4623                 else:
4624                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4625             except:
4626                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4627                 raise
4628
4629
4630 if __name__ == "__main__":
4631     unittest.main(testRunner=VppTestRunner)