tests: replace pycodestyle with black
[vpp.git] / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import (
14     bind_layers,
15     Packet,
16     ByteEnumField,
17     ShortField,
18     IPField,
19     IntField,
20     LongField,
21     XByteField,
22     FlagsField,
23     FieldLenField,
24     PacketListField,
25 )
26 from scapy.data import IP_PROTOS
27 from scapy.layers.inet import IP, TCP, UDP, ICMP
28 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
29 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
30 from scapy.layers.l2 import Ether, ARP, GRE
31 from scapy.packet import Raw
32 from syslog_rfc5424_parser import SyslogMessage, ParseError
33 from syslog_rfc5424_parser.constants import SyslogSeverity
34 from util import ppp
35 from vpp_ip_route import VppIpRoute, VppRoutePath
36 from vpp_neighbor import VppNeighbor
37 from vpp_papi import VppEnum
38
39
40 # NAT HA protocol event data
41 class Event(Packet):
42     name = "Event"
43     fields_desc = [
44         ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
45         ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
46         ShortField("flags", 0),
47         IPField("in_addr", None),
48         IPField("out_addr", None),
49         ShortField("in_port", None),
50         ShortField("out_port", None),
51         IPField("eh_addr", None),
52         IPField("ehn_addr", None),
53         ShortField("eh_port", None),
54         ShortField("ehn_port", None),
55         IntField("fib_index", None),
56         IntField("total_pkts", 0),
57         LongField("total_bytes", 0),
58     ]
59
60     def extract_padding(self, s):
61         return "", s
62
63
64 # NAT HA protocol header
65 class HANATStateSync(Packet):
66     name = "HA NAT state sync"
67     fields_desc = [
68         XByteField("version", 1),
69         FlagsField("flags", 0, 8, ["ACK"]),
70         FieldLenField("count", None, count_of="events"),
71         IntField("sequence_number", 1),
72         IntField("thread_index", 0),
73         PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
74     ]
75
76
77 class MethodHolder(VppTestCase):
78     """NAT create capture and verify method holder"""
79
80     @property
81     def config_flags(self):
82         return VppEnum.vl_api_nat44_ei_config_flags_t
83
84     @property
85     def SYSLOG_SEVERITY(self):
86         return VppEnum.vl_api_syslog_severity_t
87
88     def nat44_add_static_mapping(
89         self,
90         local_ip,
91         external_ip="0.0.0.0",
92         local_port=0,
93         external_port=0,
94         vrf_id=0,
95         is_add=1,
96         external_sw_if_index=0xFFFFFFFF,
97         proto=0,
98         tag="",
99         flags=0,
100     ):
101         """
102         Add/delete NAT44EI static mapping
103
104         :param local_ip: Local IP address
105         :param external_ip: External IP address
106         :param local_port: Local port number (Optional)
107         :param external_port: External port number (Optional)
108         :param vrf_id: VRF ID (Default 0)
109         :param is_add: 1 if add, 0 if delete (Default add)
110         :param external_sw_if_index: External interface instead of IP address
111         :param proto: IP protocol (Mandatory if port specified)
112         :param tag: Opaque string tag
113         :param flags: NAT configuration flags
114         """
115
116         if not (local_port and external_port):
117             flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
118
119         self.vapi.nat44_ei_add_del_static_mapping(
120             is_add=is_add,
121             local_ip_address=local_ip,
122             external_ip_address=external_ip,
123             external_sw_if_index=external_sw_if_index,
124             local_port=local_port,
125             external_port=external_port,
126             vrf_id=vrf_id,
127             protocol=proto,
128             flags=flags,
129             tag=tag,
130         )
131
132     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
133         """
134         Add/delete NAT44EI address
135
136         :param ip: IP address
137         :param is_add: 1 if add, 0 if delete (Default add)
138         """
139         self.vapi.nat44_ei_add_del_address_range(
140             first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
141         )
142
143     def create_routes_and_neigbors(self):
144         r1 = VppIpRoute(
145             self,
146             self.pg7.remote_ip4,
147             32,
148             [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
149         )
150         r2 = VppIpRoute(
151             self,
152             self.pg8.remote_ip4,
153             32,
154             [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
155         )
156         r1.add_vpp_config()
157         r2.add_vpp_config()
158
159         n1 = VppNeighbor(
160             self,
161             self.pg7.sw_if_index,
162             self.pg7.remote_mac,
163             self.pg7.remote_ip4,
164             is_static=1,
165         )
166         n2 = VppNeighbor(
167             self,
168             self.pg8.sw_if_index,
169             self.pg8.remote_mac,
170             self.pg8.remote_ip4,
171             is_static=1,
172         )
173         n1.add_vpp_config()
174         n2.add_vpp_config()
175
176     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
177         """
178         Create packet stream for inside network
179
180         :param in_if: Inside interface
181         :param out_if: Outside interface
182         :param dst_ip: Destination address
183         :param ttl: TTL of generated packets
184         """
185         if dst_ip is None:
186             dst_ip = out_if.remote_ip4
187
188         pkts = []
189         # TCP
190         p = (
191             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
192             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
193             / TCP(sport=self.tcp_port_in, dport=20)
194         )
195         pkts.extend([p, p])
196
197         # UDP
198         p = (
199             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
200             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
201             / UDP(sport=self.udp_port_in, dport=20)
202         )
203         pkts.append(p)
204
205         # ICMP
206         p = (
207             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
208             / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
209             / ICMP(id=self.icmp_id_in, type="echo-request")
210         )
211         pkts.append(p)
212
213         return pkts
214
215     def compose_ip6(self, ip4, pref, plen):
216         """
217         Compose IPv4-embedded IPv6 addresses
218
219         :param ip4: IPv4 address
220         :param pref: IPv6 prefix
221         :param plen: IPv6 prefix length
222         :returns: IPv4-embedded IPv6 addresses
223         """
224         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
225         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
226         if plen == 32:
227             pref_n[4] = ip4_n[0]
228             pref_n[5] = ip4_n[1]
229             pref_n[6] = ip4_n[2]
230             pref_n[7] = ip4_n[3]
231         elif plen == 40:
232             pref_n[5] = ip4_n[0]
233             pref_n[6] = ip4_n[1]
234             pref_n[7] = ip4_n[2]
235             pref_n[9] = ip4_n[3]
236         elif plen == 48:
237             pref_n[6] = ip4_n[0]
238             pref_n[7] = ip4_n[1]
239             pref_n[9] = ip4_n[2]
240             pref_n[10] = ip4_n[3]
241         elif plen == 56:
242             pref_n[7] = ip4_n[0]
243             pref_n[9] = ip4_n[1]
244             pref_n[10] = ip4_n[2]
245             pref_n[11] = ip4_n[3]
246         elif plen == 64:
247             pref_n[9] = ip4_n[0]
248             pref_n[10] = ip4_n[1]
249             pref_n[11] = ip4_n[2]
250             pref_n[12] = ip4_n[3]
251         elif plen == 96:
252             pref_n[12] = ip4_n[0]
253             pref_n[13] = ip4_n[1]
254             pref_n[14] = ip4_n[2]
255             pref_n[15] = ip4_n[3]
256         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
257         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
258
259     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
260         """
261         Create packet stream for outside network
262
263         :param out_if: Outside interface
264         :param dst_ip: Destination IP address (Default use global NAT address)
265         :param ttl: TTL of generated packets
266         :param use_inside_ports: Use inside NAT ports as destination ports
267                instead of outside ports
268         """
269         if dst_ip is None:
270             dst_ip = self.nat_addr
271         if not use_inside_ports:
272             tcp_port = self.tcp_port_out
273             udp_port = self.udp_port_out
274             icmp_id = self.icmp_id_out
275         else:
276             tcp_port = self.tcp_port_in
277             udp_port = self.udp_port_in
278             icmp_id = self.icmp_id_in
279         pkts = []
280         # TCP
281         p = (
282             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
283             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
284             / TCP(dport=tcp_port, sport=20)
285         )
286         pkts.extend([p, p])
287
288         # UDP
289         p = (
290             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
291             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
292             / UDP(dport=udp_port, sport=20)
293         )
294         pkts.append(p)
295
296         # ICMP
297         p = (
298             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
299             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
300             / ICMP(id=icmp_id, type="echo-reply")
301         )
302         pkts.append(p)
303
304         return pkts
305
306     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
307         """
308         Create packet stream for outside network
309
310         :param out_if: Outside interface
311         :param dst_ip: Destination IP address (Default use global NAT address)
312         :param hl: HL of generated packets
313         """
314         pkts = []
315         # TCP
316         p = (
317             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
318             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
319             / TCP(dport=self.tcp_port_out, sport=20)
320         )
321         pkts.append(p)
322
323         # UDP
324         p = (
325             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
326             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
327             / UDP(dport=self.udp_port_out, sport=20)
328         )
329         pkts.append(p)
330
331         # ICMP
332         p = (
333             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
334             / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
335             / ICMPv6EchoReply(id=self.icmp_id_out)
336         )
337         pkts.append(p)
338
339         return pkts
340
341     def verify_capture_out(
342         self,
343         capture,
344         nat_ip=None,
345         same_port=False,
346         dst_ip=None,
347         is_ip6=False,
348         ignore_port=False,
349     ):
350         """
351         Verify captured packets on outside network
352
353         :param capture: Captured packets
354         :param nat_ip: Translated IP address (Default use global NAT address)
355         :param same_port: Source port number is not translated (Default False)
356         :param dst_ip: Destination IP address (Default do not verify)
357         :param is_ip6: If L3 protocol is IPv6 (Default False)
358         """
359         if is_ip6:
360             IP46 = IPv6
361             ICMP46 = ICMPv6EchoRequest
362         else:
363             IP46 = IP
364             ICMP46 = ICMP
365         if nat_ip is None:
366             nat_ip = self.nat_addr
367         for packet in capture:
368             try:
369                 if not is_ip6:
370                     self.assert_packet_checksums_valid(packet)
371                 self.assertEqual(packet[IP46].src, nat_ip)
372                 if dst_ip is not None:
373                     self.assertEqual(packet[IP46].dst, dst_ip)
374                 if packet.haslayer(TCP):
375                     if not ignore_port:
376                         if same_port:
377                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
378                         else:
379                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
380                     self.tcp_port_out = packet[TCP].sport
381                     self.assert_packet_checksums_valid(packet)
382                 elif packet.haslayer(UDP):
383                     if not ignore_port:
384                         if same_port:
385                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
386                         else:
387                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
388                     self.udp_port_out = packet[UDP].sport
389                 else:
390                     if not ignore_port:
391                         if same_port:
392                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
393                         else:
394                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
395                     self.icmp_id_out = packet[ICMP46].id
396                     self.assert_packet_checksums_valid(packet)
397             except:
398                 self.logger.error(
399                     ppp("Unexpected or invalid packet (outside network):", packet)
400                 )
401                 raise
402
403     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
404         """
405         Verify captured packets on outside network
406
407         :param capture: Captured packets
408         :param nat_ip: Translated IP address
409         :param same_port: Source port number is not translated (Default False)
410         :param dst_ip: Destination IP address (Default do not verify)
411         """
412         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
413
414     def verify_capture_in(self, capture, in_if):
415         """
416         Verify captured packets on inside network
417
418         :param capture: Captured packets
419         :param in_if: Inside interface
420         """
421         for packet in capture:
422             try:
423                 self.assert_packet_checksums_valid(packet)
424                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
425                 if packet.haslayer(TCP):
426                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
427                 elif packet.haslayer(UDP):
428                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
429                 else:
430                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
431             except:
432                 self.logger.error(
433                     ppp("Unexpected or invalid packet (inside network):", packet)
434                 )
435                 raise
436
437     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
438         """
439         Verify captured packet that don't have to be translated
440
441         :param capture: Captured packets
442         :param ingress_if: Ingress interface
443         :param egress_if: Egress interface
444         """
445         for packet in capture:
446             try:
447                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
448                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
449                 if packet.haslayer(TCP):
450                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
451                 elif packet.haslayer(UDP):
452                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
453                 else:
454                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
455             except:
456                 self.logger.error(
457                     ppp("Unexpected or invalid packet (inside network):", packet)
458                 )
459                 raise
460
461     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
462         """
463         Verify captured packets with ICMP errors on outside network
464
465         :param capture: Captured packets
466         :param src_ip: Translated IP address or IP address of VPP
467                        (Default use global NAT address)
468         :param icmp_type: Type of error ICMP packet
469                           we are expecting (Default 11)
470         """
471         if src_ip is None:
472             src_ip = self.nat_addr
473         for packet in capture:
474             try:
475                 self.assertEqual(packet[IP].src, src_ip)
476                 self.assertEqual(packet.haslayer(ICMP), 1)
477                 icmp = packet[ICMP]
478                 self.assertEqual(icmp.type, icmp_type)
479                 self.assertTrue(icmp.haslayer(IPerror))
480                 inner_ip = icmp[IPerror]
481                 if inner_ip.haslayer(TCPerror):
482                     self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
483                 elif inner_ip.haslayer(UDPerror):
484                     self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
485                 else:
486                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
487             except:
488                 self.logger.error(
489                     ppp("Unexpected or invalid packet (outside network):", packet)
490                 )
491                 raise
492
493     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
494         """
495         Verify captured packets with ICMP errors on inside network
496
497         :param capture: Captured packets
498         :param in_if: Inside interface
499         :param icmp_type: Type of error ICMP packet
500                           we are expecting (Default 11)
501         """
502         for packet in capture:
503             try:
504                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
505                 self.assertEqual(packet.haslayer(ICMP), 1)
506                 icmp = packet[ICMP]
507                 self.assertEqual(icmp.type, icmp_type)
508                 self.assertTrue(icmp.haslayer(IPerror))
509                 inner_ip = icmp[IPerror]
510                 if inner_ip.haslayer(TCPerror):
511                     self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
512                 elif inner_ip.haslayer(UDPerror):
513                     self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
514                 else:
515                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
516             except:
517                 self.logger.error(
518                     ppp("Unexpected or invalid packet (inside network):", packet)
519                 )
520                 raise
521
522     def create_stream_frag(
523         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
524     ):
525         """
526         Create fragmented packet stream
527
528         :param src_if: Source interface
529         :param dst: Destination IPv4 address
530         :param sport: Source port
531         :param dport: Destination port
532         :param data: Payload data
533         :param proto: protocol (TCP, UDP, ICMP)
534         :param echo_reply: use echo_reply if protocol is ICMP
535         :returns: Fragments
536         """
537         if proto == IP_PROTOS.tcp:
538             p = (
539                 IP(src=src_if.remote_ip4, dst=dst)
540                 / TCP(sport=sport, dport=dport)
541                 / Raw(data)
542             )
543             p = p.__class__(scapy.compat.raw(p))
544             chksum = p[TCP].chksum
545             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
546         elif proto == IP_PROTOS.udp:
547             proto_header = UDP(sport=sport, dport=dport)
548         elif proto == IP_PROTOS.icmp:
549             if not echo_reply:
550                 proto_header = ICMP(id=sport, type="echo-request")
551             else:
552                 proto_header = ICMP(id=sport, type="echo-reply")
553         else:
554             raise Exception("Unsupported protocol")
555         id = random.randint(0, 65535)
556         pkts = []
557         if proto == IP_PROTOS.tcp:
558             raw = Raw(data[0:4])
559         else:
560             raw = Raw(data[0:16])
561         p = (
562             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
563             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
564             / proto_header
565             / raw
566         )
567         pkts.append(p)
568         if proto == IP_PROTOS.tcp:
569             raw = Raw(data[4:20])
570         else:
571             raw = Raw(data[16:32])
572         p = (
573             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
574             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
575             / raw
576         )
577         pkts.append(p)
578         if proto == IP_PROTOS.tcp:
579             raw = Raw(data[20:])
580         else:
581             raw = Raw(data[32:])
582         p = (
583             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
584             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
585             / raw
586         )
587         pkts.append(p)
588         return pkts
589
590     def reass_frags_and_verify(self, frags, src, dst):
591         """
592         Reassemble and verify fragmented packet
593
594         :param frags: Captured fragments
595         :param src: Source IPv4 address to verify
596         :param dst: Destination IPv4 address to verify
597
598         :returns: Reassembled IPv4 packet
599         """
600         buffer = BytesIO()
601         for p in frags:
602             self.assertEqual(p[IP].src, src)
603             self.assertEqual(p[IP].dst, dst)
604             self.assert_ip_checksum_valid(p)
605             buffer.seek(p[IP].frag * 8)
606             buffer.write(bytes(p[IP].payload))
607         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
608         if ip.proto == IP_PROTOS.tcp:
609             p = ip / TCP(buffer.getvalue())
610             self.logger.debug(ppp("Reassembled:", p))
611             self.assert_tcp_checksum_valid(p)
612         elif ip.proto == IP_PROTOS.udp:
613             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
614         elif ip.proto == IP_PROTOS.icmp:
615             p = ip / ICMP(buffer.getvalue())
616         return p
617
618     def verify_ipfix_nat44_ses(self, data):
619         """
620         Verify IPFIX NAT44EI session create/delete event
621
622         :param data: Decoded IPFIX data records
623         """
624         nat44_ses_create_num = 0
625         nat44_ses_delete_num = 0
626         self.assertEqual(6, len(data))
627         for record in data:
628             # natEvent
629             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
630             if scapy.compat.orb(record[230]) == 4:
631                 nat44_ses_create_num += 1
632             else:
633                 nat44_ses_delete_num += 1
634             # sourceIPv4Address
635             self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
636             # postNATSourceIPv4Address
637             self.assertEqual(
638                 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
639             )
640             # ingressVRFID
641             self.assertEqual(struct.pack("!I", 0), record[234])
642             # protocolIdentifier/sourceTransportPort
643             # /postNAPTSourceTransportPort
644             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
645                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
646                 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
647             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
648                 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
649                 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
650             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
651                 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
652                 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
653             else:
654                 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
655         self.assertEqual(3, nat44_ses_create_num)
656         self.assertEqual(3, nat44_ses_delete_num)
657
658     def verify_ipfix_addr_exhausted(self, data):
659         self.assertEqual(1, len(data))
660         record = data[0]
661         # natEvent
662         self.assertEqual(scapy.compat.orb(record[230]), 3)
663         # natPoolID
664         self.assertEqual(struct.pack("!I", 0), record[283])
665
666     def verify_ipfix_max_sessions(self, data, limit):
667         self.assertEqual(1, len(data))
668         record = data[0]
669         # natEvent
670         self.assertEqual(scapy.compat.orb(record[230]), 13)
671         # natQuotaExceededEvent
672         self.assertEqual(struct.pack("!I", 1), record[466])
673         # maxSessionEntries
674         self.assertEqual(struct.pack("!I", limit), record[471])
675
676     def verify_no_nat44_user(self):
677         """Verify that there is no NAT44EI user"""
678         users = self.vapi.nat44_ei_user_dump()
679         self.assertEqual(len(users), 0)
680         users = self.statistics["/nat44-ei/total-users"]
681         self.assertEqual(users[0][0], 0)
682         sessions = self.statistics["/nat44-ei/total-sessions"]
683         self.assertEqual(sessions[0][0], 0)
684
685     def verify_syslog_apmap(self, data, is_add=True):
686         message = data.decode("utf-8")
687         try:
688             message = SyslogMessage.parse(message)
689         except ParseError as e:
690             self.logger.error(e)
691             raise
692         else:
693             self.assertEqual(message.severity, SyslogSeverity.info)
694             self.assertEqual(message.appname, "NAT")
695             self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
696             sd_params = message.sd.get("napmap")
697             self.assertTrue(sd_params is not None)
698             self.assertEqual(sd_params.get("IATYP"), "IPv4")
699             self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
700             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
701             self.assertEqual(sd_params.get("XATYP"), "IPv4")
702             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
703             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
704             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
705             self.assertTrue(sd_params.get("SSUBIX") is not None)
706             self.assertEqual(sd_params.get("SVLAN"), "0")
707
708     def verify_mss_value(self, pkt, mss):
709         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
710             raise TypeError("Not a TCP/IP packet")
711
712         for option in pkt[TCP].options:
713             if option[0] == "MSS":
714                 self.assertEqual(option[1], mss)
715                 self.assert_tcp_checksum_valid(pkt)
716
717     @staticmethod
718     def proto2layer(proto):
719         if proto == IP_PROTOS.tcp:
720             return TCP
721         elif proto == IP_PROTOS.udp:
722             return UDP
723         elif proto == IP_PROTOS.icmp:
724             return ICMP
725         else:
726             raise Exception("Unsupported protocol")
727
728     def frag_in_order(
729         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
730     ):
731         layer = self.proto2layer(proto)
732
733         if proto == IP_PROTOS.tcp:
734             data = b"A" * 4 + b"B" * 16 + b"C" * 3
735         else:
736             data = b"A" * 16 + b"B" * 16 + b"C" * 3
737         self.port_in = random.randint(1025, 65535)
738
739         # in2out
740         pkts = self.create_stream_frag(
741             self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
742         )
743         self.pg0.add_stream(pkts)
744         self.pg_enable_capture(self.pg_interfaces)
745         self.pg_start()
746         frags = self.pg1.get_capture(len(pkts))
747         if not dont_translate:
748             p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
749         else:
750             p = self.reass_frags_and_verify(
751                 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
752             )
753         if proto != IP_PROTOS.icmp:
754             if not dont_translate:
755                 self.assertEqual(p[layer].dport, 20)
756                 if not ignore_port:
757                     self.assertNotEqual(p[layer].sport, self.port_in)
758             else:
759                 self.assertEqual(p[layer].sport, self.port_in)
760         else:
761             if not ignore_port:
762                 if not dont_translate:
763                     self.assertNotEqual(p[layer].id, self.port_in)
764                 else:
765                     self.assertEqual(p[layer].id, self.port_in)
766         self.assertEqual(data, p[Raw].load)
767
768         # out2in
769         if not dont_translate:
770             dst_addr = self.nat_addr
771         else:
772             dst_addr = self.pg0.remote_ip4
773         if proto != IP_PROTOS.icmp:
774             sport = 20
775             dport = p[layer].sport
776         else:
777             sport = p[layer].id
778             dport = 0
779         pkts = self.create_stream_frag(
780             self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
781         )
782         self.pg1.add_stream(pkts)
783         self.pg_enable_capture(self.pg_interfaces)
784         self.pg_start()
785         frags = self.pg0.get_capture(len(pkts))
786         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
787         if proto != IP_PROTOS.icmp:
788             self.assertEqual(p[layer].sport, 20)
789             self.assertEqual(p[layer].dport, self.port_in)
790         else:
791             self.assertEqual(p[layer].id, self.port_in)
792         self.assertEqual(data, p[Raw].load)
793
794     def reass_hairpinning(
795         self,
796         server_addr,
797         server_in_port,
798         server_out_port,
799         host_in_port,
800         proto=IP_PROTOS.tcp,
801         ignore_port=False,
802     ):
803
804         layer = self.proto2layer(proto)
805
806         if proto == IP_PROTOS.tcp:
807             data = b"A" * 4 + b"B" * 16 + b"C" * 3
808         else:
809             data = b"A" * 16 + b"B" * 16 + b"C" * 3
810
811         # send packet from host to server
812         pkts = self.create_stream_frag(
813             self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
814         )
815         self.pg0.add_stream(pkts)
816         self.pg_enable_capture(self.pg_interfaces)
817         self.pg_start()
818         frags = self.pg0.get_capture(len(pkts))
819         p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
820         if proto != IP_PROTOS.icmp:
821             if not ignore_port:
822                 self.assertNotEqual(p[layer].sport, host_in_port)
823             self.assertEqual(p[layer].dport, server_in_port)
824         else:
825             if not ignore_port:
826                 self.assertNotEqual(p[layer].id, host_in_port)
827         self.assertEqual(data, p[Raw].load)
828
829     def frag_out_of_order(
830         self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
831     ):
832         layer = self.proto2layer(proto)
833
834         if proto == IP_PROTOS.tcp:
835             data = b"A" * 4 + b"B" * 16 + b"C" * 3
836         else:
837             data = b"A" * 16 + b"B" * 16 + b"C" * 3
838         self.port_in = random.randint(1025, 65535)
839
840         for i in range(2):
841             # in2out
842             pkts = self.create_stream_frag(
843                 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
844             )
845             pkts.reverse()
846             self.pg0.add_stream(pkts)
847             self.pg_enable_capture(self.pg_interfaces)
848             self.pg_start()
849             frags = self.pg1.get_capture(len(pkts))
850             if not dont_translate:
851                 p = self.reass_frags_and_verify(
852                     frags, self.nat_addr, self.pg1.remote_ip4
853                 )
854             else:
855                 p = self.reass_frags_and_verify(
856                     frags, self.pg0.remote_ip4, self.pg1.remote_ip4
857                 )
858             if proto != IP_PROTOS.icmp:
859                 if not dont_translate:
860                     self.assertEqual(p[layer].dport, 20)
861                     if not ignore_port:
862                         self.assertNotEqual(p[layer].sport, self.port_in)
863                 else:
864                     self.assertEqual(p[layer].sport, self.port_in)
865             else:
866                 if not ignore_port:
867                     if not dont_translate:
868                         self.assertNotEqual(p[layer].id, self.port_in)
869                     else:
870                         self.assertEqual(p[layer].id, self.port_in)
871             self.assertEqual(data, p[Raw].load)
872
873             # out2in
874             if not dont_translate:
875                 dst_addr = self.nat_addr
876             else:
877                 dst_addr = self.pg0.remote_ip4
878             if proto != IP_PROTOS.icmp:
879                 sport = 20
880                 dport = p[layer].sport
881             else:
882                 sport = p[layer].id
883                 dport = 0
884             pkts = self.create_stream_frag(
885                 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
886             )
887             pkts.reverse()
888             self.pg1.add_stream(pkts)
889             self.pg_enable_capture(self.pg_interfaces)
890             self.pg_start()
891             frags = self.pg0.get_capture(len(pkts))
892             p = self.reass_frags_and_verify(
893                 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
894             )
895             if proto != IP_PROTOS.icmp:
896                 self.assertEqual(p[layer].sport, 20)
897                 self.assertEqual(p[layer].dport, self.port_in)
898             else:
899                 self.assertEqual(p[layer].id, self.port_in)
900             self.assertEqual(data, p[Raw].load)
901
902
903 def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
904     if 0 == vpp_worker_count:
905         return 0
906     numeric = socket.inet_aton(ip)
907     numeric = struct.unpack("!L", numeric)[0]
908     numeric = socket.htonl(numeric)
909     h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
910     return 1 + h % vpp_worker_count
911
912
913 class TestNAT44EI(MethodHolder):
914     """NAT44EI Test Cases"""
915
916     max_translations = 10240
917     max_users = 10240
918
919     @classmethod
920     def setUpClass(cls):
921         super(TestNAT44EI, cls).setUpClass()
922         cls.vapi.cli("set log class nat44-ei level debug")
923
924         cls.tcp_port_in = 6303
925         cls.tcp_port_out = 6303
926         cls.udp_port_in = 6304
927         cls.udp_port_out = 6304
928         cls.icmp_id_in = 6305
929         cls.icmp_id_out = 6305
930         cls.nat_addr = "10.0.0.3"
931         cls.ipfix_src_port = 4739
932         cls.ipfix_domain_id = 1
933         cls.tcp_external_port = 80
934         cls.udp_external_port = 69
935
936         cls.create_pg_interfaces(range(10))
937         cls.interfaces = list(cls.pg_interfaces[0:4])
938
939         for i in cls.interfaces:
940             i.admin_up()
941             i.config_ip4()
942             i.resolve_arp()
943
944         cls.pg0.generate_remote_hosts(3)
945         cls.pg0.configure_ipv4_neighbors()
946
947         cls.pg1.generate_remote_hosts(1)
948         cls.pg1.configure_ipv4_neighbors()
949
950         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
951         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
952         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
953
954         cls.pg4._local_ip4 = "172.16.255.1"
955         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
956         cls.pg4.set_table_ip4(10)
957         cls.pg5._local_ip4 = "172.17.255.3"
958         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
959         cls.pg5.set_table_ip4(10)
960         cls.pg6._local_ip4 = "172.16.255.1"
961         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
962         cls.pg6.set_table_ip4(20)
963         for i in cls.overlapping_interfaces:
964             i.config_ip4()
965             i.admin_up()
966             i.resolve_arp()
967
968         cls.pg7.admin_up()
969         cls.pg8.admin_up()
970
971         cls.pg9.generate_remote_hosts(2)
972         cls.pg9.config_ip4()
973         cls.vapi.sw_interface_add_del_address(
974             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
975         )
976
977         cls.pg9.admin_up()
978         cls.pg9.resolve_arp()
979         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
980         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
981         cls.pg9.resolve_arp()
982
983     def plugin_enable(self):
984         self.vapi.nat44_ei_plugin_enable_disable(
985             sessions=self.max_translations, users=self.max_users, enable=1
986         )
987
988     def setUp(self):
989         super(TestNAT44EI, self).setUp()
990         self.plugin_enable()
991
992     def tearDown(self):
993         super(TestNAT44EI, self).tearDown()
994         if not self.vpp_dead:
995             self.vapi.nat44_ei_ipfix_enable_disable(
996                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
997             )
998             self.ipfix_src_port = 4739
999             self.ipfix_domain_id = 1
1000
1001             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1002             self.vapi.cli("clear logging")
1003
1004     def test_clear_sessions(self):
1005         """NAT44EI session clearing test"""
1006
1007         self.nat44_add_address(self.nat_addr)
1008         flags = self.config_flags.NAT44_EI_IF_INSIDE
1009         self.vapi.nat44_ei_interface_add_del_feature(
1010             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1011         )
1012         self.vapi.nat44_ei_interface_add_del_feature(
1013             sw_if_index=self.pg1.sw_if_index, is_add=1
1014         )
1015
1016         pkts = self.create_stream_in(self.pg0, self.pg1)
1017         self.pg0.add_stream(pkts)
1018         self.pg_enable_capture(self.pg_interfaces)
1019         self.pg_start()
1020         capture = self.pg1.get_capture(len(pkts))
1021         self.verify_capture_out(capture)
1022
1023         sessions = self.statistics["/nat44-ei/total-sessions"]
1024         self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1025         self.logger.info("sessions before clearing: %s" % sessions[0][0])
1026
1027         self.vapi.cli("clear nat44 ei sessions")
1028
1029         sessions = self.statistics["/nat44-ei/total-sessions"]
1030         self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1031         self.logger.info("sessions after clearing: %s" % sessions[0][0])
1032
1033     def test_dynamic(self):
1034         """NAT44EI dynamic translation test"""
1035         self.nat44_add_address(self.nat_addr)
1036         flags = self.config_flags.NAT44_EI_IF_INSIDE
1037         self.vapi.nat44_ei_interface_add_del_feature(
1038             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1039         )
1040         self.vapi.nat44_ei_interface_add_del_feature(
1041             sw_if_index=self.pg1.sw_if_index, is_add=1
1042         )
1043
1044         # in2out
1045         tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1046         udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1047         icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1048         drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1049
1050         pkts = self.create_stream_in(self.pg0, self.pg1)
1051         self.pg0.add_stream(pkts)
1052         self.pg_enable_capture(self.pg_interfaces)
1053         self.pg_start()
1054         capture = self.pg1.get_capture(len(pkts))
1055         self.verify_capture_out(capture)
1056
1057         if_idx = self.pg0.sw_if_index
1058         cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1059         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1060         cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1061         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1062         cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1063         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1064         cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1065         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1066
1067         # out2in
1068         tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1069         udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1070         icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1071         drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1072
1073         pkts = self.create_stream_out(self.pg1)
1074         self.pg1.add_stream(pkts)
1075         self.pg_enable_capture(self.pg_interfaces)
1076         self.pg_start()
1077         capture = self.pg0.get_capture(len(pkts))
1078         self.verify_capture_in(capture, self.pg0)
1079
1080         if_idx = self.pg1.sw_if_index
1081         cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1082         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1083         cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1084         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1085         cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1086         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1087         cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1088         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1089
1090         users = self.statistics["/nat44-ei/total-users"]
1091         self.assertEqual(users[:, 0].sum(), 1)
1092         sessions = self.statistics["/nat44-ei/total-sessions"]
1093         self.assertEqual(sessions[:, 0].sum(), 3)
1094
1095     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1096         """NAT44EI handling of client packets with TTL=1"""
1097
1098         self.nat44_add_address(self.nat_addr)
1099         flags = self.config_flags.NAT44_EI_IF_INSIDE
1100         self.vapi.nat44_ei_interface_add_del_feature(
1101             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1102         )
1103         self.vapi.nat44_ei_interface_add_del_feature(
1104             sw_if_index=self.pg1.sw_if_index, is_add=1
1105         )
1106
1107         # Client side - generate traffic
1108         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1109         capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1110
1111         # Client side - verify ICMP type 11 packets
1112         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1113
1114     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1115         """NAT44EI handling of server packets with TTL=1"""
1116
1117         self.nat44_add_address(self.nat_addr)
1118         flags = self.config_flags.NAT44_EI_IF_INSIDE
1119         self.vapi.nat44_ei_interface_add_del_feature(
1120             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1121         )
1122         self.vapi.nat44_ei_interface_add_del_feature(
1123             sw_if_index=self.pg1.sw_if_index, is_add=1
1124         )
1125
1126         # Client side - create sessions
1127         pkts = self.create_stream_in(self.pg0, self.pg1)
1128         self.pg0.add_stream(pkts)
1129         self.pg_enable_capture(self.pg_interfaces)
1130         self.pg_start()
1131
1132         # Server side - generate traffic
1133         capture = self.pg1.get_capture(len(pkts))
1134         self.verify_capture_out(capture)
1135         pkts = self.create_stream_out(self.pg1, ttl=1)
1136         capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1137
1138         # Server side - verify ICMP type 11 packets
1139         self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1140
1141     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1142         """NAT44EI handling of error responses to client packets with TTL=2"""
1143
1144         self.nat44_add_address(self.nat_addr)
1145         flags = self.config_flags.NAT44_EI_IF_INSIDE
1146         self.vapi.nat44_ei_interface_add_del_feature(
1147             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1148         )
1149         self.vapi.nat44_ei_interface_add_del_feature(
1150             sw_if_index=self.pg1.sw_if_index, is_add=1
1151         )
1152
1153         # Client side - generate traffic
1154         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1155         self.pg0.add_stream(pkts)
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158
1159         # Server side - simulate ICMP type 11 response
1160         capture = self.pg1.get_capture(len(pkts))
1161         pkts = [
1162             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1163             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1164             / ICMP(type=11)
1165             / packet[IP]
1166             for packet in capture
1167         ]
1168         self.pg1.add_stream(pkts)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171
1172         # Client side - verify ICMP type 11 packets
1173         capture = self.pg0.get_capture(len(pkts))
1174         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1175
1176     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1177         """NAT44EI handling of error responses to server packets with TTL=2"""
1178
1179         self.nat44_add_address(self.nat_addr)
1180         flags = self.config_flags.NAT44_EI_IF_INSIDE
1181         self.vapi.nat44_ei_interface_add_del_feature(
1182             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1183         )
1184         self.vapi.nat44_ei_interface_add_del_feature(
1185             sw_if_index=self.pg1.sw_if_index, is_add=1
1186         )
1187
1188         # Client side - create sessions
1189         pkts = self.create_stream_in(self.pg0, self.pg1)
1190         self.pg0.add_stream(pkts)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193
1194         # Server side - generate traffic
1195         capture = self.pg1.get_capture(len(pkts))
1196         self.verify_capture_out(capture)
1197         pkts = self.create_stream_out(self.pg1, ttl=2)
1198         self.pg1.add_stream(pkts)
1199         self.pg_enable_capture(self.pg_interfaces)
1200         self.pg_start()
1201
1202         # Client side - simulate ICMP type 11 response
1203         capture = self.pg0.get_capture(len(pkts))
1204         pkts = [
1205             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1206             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1207             / ICMP(type=11)
1208             / packet[IP]
1209             for packet in capture
1210         ]
1211         self.pg0.add_stream(pkts)
1212         self.pg_enable_capture(self.pg_interfaces)
1213         self.pg_start()
1214
1215         # Server side - verify ICMP type 11 packets
1216         capture = self.pg1.get_capture(len(pkts))
1217         self.verify_capture_out_with_icmp_errors(capture)
1218
1219     def test_ping_out_interface_from_outside(self):
1220         """NAT44EI ping out interface from outside network"""
1221
1222         self.nat44_add_address(self.nat_addr)
1223         flags = self.config_flags.NAT44_EI_IF_INSIDE
1224         self.vapi.nat44_ei_interface_add_del_feature(
1225             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1226         )
1227         self.vapi.nat44_ei_interface_add_del_feature(
1228             sw_if_index=self.pg1.sw_if_index, is_add=1
1229         )
1230
1231         p = (
1232             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1233             / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1234             / ICMP(id=self.icmp_id_out, type="echo-request")
1235         )
1236         pkts = [p]
1237         self.pg1.add_stream(pkts)
1238         self.pg_enable_capture(self.pg_interfaces)
1239         self.pg_start()
1240         capture = self.pg1.get_capture(len(pkts))
1241         packet = capture[0]
1242         try:
1243             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1244             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1245             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1246             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1247         except:
1248             self.logger.error(
1249                 ppp("Unexpected or invalid packet (outside network):", packet)
1250             )
1251             raise
1252
1253     def test_ping_internal_host_from_outside(self):
1254         """NAT44EI ping internal host from outside network"""
1255
1256         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1257         flags = self.config_flags.NAT44_EI_IF_INSIDE
1258         self.vapi.nat44_ei_interface_add_del_feature(
1259             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1260         )
1261         self.vapi.nat44_ei_interface_add_del_feature(
1262             sw_if_index=self.pg1.sw_if_index, is_add=1
1263         )
1264
1265         # out2in
1266         pkt = (
1267             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1268             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1269             / ICMP(id=self.icmp_id_out, type="echo-request")
1270         )
1271         self.pg1.add_stream(pkt)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg0.get_capture(1)
1275         self.verify_capture_in(capture, self.pg0)
1276         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1277
1278         # in2out
1279         pkt = (
1280             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1281             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1282             / ICMP(id=self.icmp_id_in, type="echo-reply")
1283         )
1284         self.pg0.add_stream(pkt)
1285         self.pg_enable_capture(self.pg_interfaces)
1286         self.pg_start()
1287         capture = self.pg1.get_capture(1)
1288         self.verify_capture_out(capture, same_port=True)
1289         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1290
1291     def test_forwarding(self):
1292         """NAT44EI forwarding test"""
1293
1294         flags = self.config_flags.NAT44_EI_IF_INSIDE
1295         self.vapi.nat44_ei_interface_add_del_feature(
1296             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1297         )
1298         self.vapi.nat44_ei_interface_add_del_feature(
1299             sw_if_index=self.pg1.sw_if_index, is_add=1
1300         )
1301         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1302
1303         real_ip = self.pg0.remote_ip4
1304         alias_ip = self.nat_addr
1305         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1306         self.vapi.nat44_ei_add_del_static_mapping(
1307             is_add=1,
1308             local_ip_address=real_ip,
1309             external_ip_address=alias_ip,
1310             external_sw_if_index=0xFFFFFFFF,
1311             flags=flags,
1312         )
1313
1314         try:
1315             # static mapping match
1316
1317             pkts = self.create_stream_out(self.pg1)
1318             self.pg1.add_stream(pkts)
1319             self.pg_enable_capture(self.pg_interfaces)
1320             self.pg_start()
1321             capture = self.pg0.get_capture(len(pkts))
1322             self.verify_capture_in(capture, self.pg0)
1323
1324             pkts = self.create_stream_in(self.pg0, self.pg1)
1325             self.pg0.add_stream(pkts)
1326             self.pg_enable_capture(self.pg_interfaces)
1327             self.pg_start()
1328             capture = self.pg1.get_capture(len(pkts))
1329             self.verify_capture_out(capture, same_port=True)
1330
1331             # no static mapping match
1332
1333             host0 = self.pg0.remote_hosts[0]
1334             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1335             try:
1336                 pkts = self.create_stream_out(
1337                     self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1338                 )
1339                 self.pg1.add_stream(pkts)
1340                 self.pg_enable_capture(self.pg_interfaces)
1341                 self.pg_start()
1342                 capture = self.pg0.get_capture(len(pkts))
1343                 self.verify_capture_in(capture, self.pg0)
1344
1345                 pkts = self.create_stream_in(self.pg0, self.pg1)
1346                 self.pg0.add_stream(pkts)
1347                 self.pg_enable_capture(self.pg_interfaces)
1348                 self.pg_start()
1349                 capture = self.pg1.get_capture(len(pkts))
1350                 self.verify_capture_out(
1351                     capture, nat_ip=self.pg0.remote_ip4, same_port=True
1352                 )
1353             finally:
1354                 self.pg0.remote_hosts[0] = host0
1355
1356         finally:
1357             self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1358             flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1359             self.vapi.nat44_ei_add_del_static_mapping(
1360                 is_add=0,
1361                 local_ip_address=real_ip,
1362                 external_ip_address=alias_ip,
1363                 external_sw_if_index=0xFFFFFFFF,
1364                 flags=flags,
1365             )
1366
1367     def test_static_in(self):
1368         """NAT44EI 1:1 NAT initialized from inside network"""
1369
1370         nat_ip = "10.0.0.10"
1371         self.tcp_port_out = 6303
1372         self.udp_port_out = 6304
1373         self.icmp_id_out = 6305
1374
1375         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376         flags = self.config_flags.NAT44_EI_IF_INSIDE
1377         self.vapi.nat44_ei_interface_add_del_feature(
1378             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1379         )
1380         self.vapi.nat44_ei_interface_add_del_feature(
1381             sw_if_index=self.pg1.sw_if_index, is_add=1
1382         )
1383         sm = self.vapi.nat44_ei_static_mapping_dump()
1384         self.assertEqual(len(sm), 1)
1385         self.assertEqual(sm[0].tag, "")
1386         self.assertEqual(sm[0].protocol, 0)
1387         self.assertEqual(sm[0].local_port, 0)
1388         self.assertEqual(sm[0].external_port, 0)
1389
1390         # in2out
1391         pkts = self.create_stream_in(self.pg0, self.pg1)
1392         self.pg0.add_stream(pkts)
1393         self.pg_enable_capture(self.pg_interfaces)
1394         self.pg_start()
1395         capture = self.pg1.get_capture(len(pkts))
1396         self.verify_capture_out(capture, nat_ip, True)
1397
1398         # out2in
1399         pkts = self.create_stream_out(self.pg1, nat_ip)
1400         self.pg1.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg0.get_capture(len(pkts))
1404         self.verify_capture_in(capture, self.pg0)
1405
1406     def test_static_out(self):
1407         """NAT44EI 1:1 NAT initialized from outside network"""
1408
1409         nat_ip = "10.0.0.20"
1410         self.tcp_port_out = 6303
1411         self.udp_port_out = 6304
1412         self.icmp_id_out = 6305
1413         tag = "testTAG"
1414
1415         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416         flags = self.config_flags.NAT44_EI_IF_INSIDE
1417         self.vapi.nat44_ei_interface_add_del_feature(
1418             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1419         )
1420         self.vapi.nat44_ei_interface_add_del_feature(
1421             sw_if_index=self.pg1.sw_if_index, is_add=1
1422         )
1423         sm = self.vapi.nat44_ei_static_mapping_dump()
1424         self.assertEqual(len(sm), 1)
1425         self.assertEqual(sm[0].tag, tag)
1426
1427         # out2in
1428         pkts = self.create_stream_out(self.pg1, nat_ip)
1429         self.pg1.add_stream(pkts)
1430         self.pg_enable_capture(self.pg_interfaces)
1431         self.pg_start()
1432         capture = self.pg0.get_capture(len(pkts))
1433         self.verify_capture_in(capture, self.pg0)
1434
1435         # in2out
1436         pkts = self.create_stream_in(self.pg0, self.pg1)
1437         self.pg0.add_stream(pkts)
1438         self.pg_enable_capture(self.pg_interfaces)
1439         self.pg_start()
1440         capture = self.pg1.get_capture(len(pkts))
1441         self.verify_capture_out(capture, nat_ip, True)
1442
1443     def test_static_with_port_in(self):
1444         """NAT44EI 1:1 NAPT initialized from inside network"""
1445
1446         self.tcp_port_out = 3606
1447         self.udp_port_out = 3607
1448         self.icmp_id_out = 3608
1449
1450         self.nat44_add_address(self.nat_addr)
1451         self.nat44_add_static_mapping(
1452             self.pg0.remote_ip4,
1453             self.nat_addr,
1454             self.tcp_port_in,
1455             self.tcp_port_out,
1456             proto=IP_PROTOS.tcp,
1457         )
1458         self.nat44_add_static_mapping(
1459             self.pg0.remote_ip4,
1460             self.nat_addr,
1461             self.udp_port_in,
1462             self.udp_port_out,
1463             proto=IP_PROTOS.udp,
1464         )
1465         self.nat44_add_static_mapping(
1466             self.pg0.remote_ip4,
1467             self.nat_addr,
1468             self.icmp_id_in,
1469             self.icmp_id_out,
1470             proto=IP_PROTOS.icmp,
1471         )
1472         flags = self.config_flags.NAT44_EI_IF_INSIDE
1473         self.vapi.nat44_ei_interface_add_del_feature(
1474             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1475         )
1476         self.vapi.nat44_ei_interface_add_del_feature(
1477             sw_if_index=self.pg1.sw_if_index, is_add=1
1478         )
1479
1480         # in2out
1481         pkts = self.create_stream_in(self.pg0, self.pg1)
1482         self.pg0.add_stream(pkts)
1483         self.pg_enable_capture(self.pg_interfaces)
1484         self.pg_start()
1485         capture = self.pg1.get_capture(len(pkts))
1486         self.verify_capture_out(capture)
1487
1488         # out2in
1489         pkts = self.create_stream_out(self.pg1)
1490         self.pg1.add_stream(pkts)
1491         self.pg_enable_capture(self.pg_interfaces)
1492         self.pg_start()
1493         capture = self.pg0.get_capture(len(pkts))
1494         self.verify_capture_in(capture, self.pg0)
1495
1496     def test_static_with_port_out(self):
1497         """NAT44EI 1:1 NAPT initialized from outside network"""
1498
1499         self.tcp_port_out = 30606
1500         self.udp_port_out = 30607
1501         self.icmp_id_out = 30608
1502
1503         self.nat44_add_address(self.nat_addr)
1504         self.nat44_add_static_mapping(
1505             self.pg0.remote_ip4,
1506             self.nat_addr,
1507             self.tcp_port_in,
1508             self.tcp_port_out,
1509             proto=IP_PROTOS.tcp,
1510         )
1511         self.nat44_add_static_mapping(
1512             self.pg0.remote_ip4,
1513             self.nat_addr,
1514             self.udp_port_in,
1515             self.udp_port_out,
1516             proto=IP_PROTOS.udp,
1517         )
1518         self.nat44_add_static_mapping(
1519             self.pg0.remote_ip4,
1520             self.nat_addr,
1521             self.icmp_id_in,
1522             self.icmp_id_out,
1523             proto=IP_PROTOS.icmp,
1524         )
1525         flags = self.config_flags.NAT44_EI_IF_INSIDE
1526         self.vapi.nat44_ei_interface_add_del_feature(
1527             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1528         )
1529         self.vapi.nat44_ei_interface_add_del_feature(
1530             sw_if_index=self.pg1.sw_if_index, is_add=1
1531         )
1532
1533         # out2in
1534         pkts = self.create_stream_out(self.pg1)
1535         self.pg1.add_stream(pkts)
1536         self.pg_enable_capture(self.pg_interfaces)
1537         self.pg_start()
1538         capture = self.pg0.get_capture(len(pkts))
1539         self.verify_capture_in(capture, self.pg0)
1540
1541         # in2out
1542         pkts = self.create_stream_in(self.pg0, self.pg1)
1543         self.pg0.add_stream(pkts)
1544         self.pg_enable_capture(self.pg_interfaces)
1545         self.pg_start()
1546         capture = self.pg1.get_capture(len(pkts))
1547         self.verify_capture_out(capture)
1548
1549     def test_static_vrf_aware(self):
1550         """NAT44EI 1:1 NAT VRF awareness"""
1551
1552         nat_ip1 = "10.0.0.30"
1553         nat_ip2 = "10.0.0.40"
1554         self.tcp_port_out = 6303
1555         self.udp_port_out = 6304
1556         self.icmp_id_out = 6305
1557
1558         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1559         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1560         flags = self.config_flags.NAT44_EI_IF_INSIDE
1561         self.vapi.nat44_ei_interface_add_del_feature(
1562             sw_if_index=self.pg3.sw_if_index, is_add=1
1563         )
1564         self.vapi.nat44_ei_interface_add_del_feature(
1565             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1566         )
1567         self.vapi.nat44_ei_interface_add_del_feature(
1568             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1569         )
1570
1571         # inside interface VRF match NAT44EI static mapping VRF
1572         pkts = self.create_stream_in(self.pg4, self.pg3)
1573         self.pg4.add_stream(pkts)
1574         self.pg_enable_capture(self.pg_interfaces)
1575         self.pg_start()
1576         capture = self.pg3.get_capture(len(pkts))
1577         self.verify_capture_out(capture, nat_ip1, True)
1578
1579         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1580         # are dropped)
1581         pkts = self.create_stream_in(self.pg0, self.pg3)
1582         self.pg0.add_stream(pkts)
1583         self.pg_enable_capture(self.pg_interfaces)
1584         self.pg_start()
1585         self.pg3.assert_nothing_captured()
1586
1587     def test_dynamic_to_static(self):
1588         """NAT44EI Switch from dynamic translation to 1:1NAT"""
1589         nat_ip = "10.0.0.10"
1590         self.tcp_port_out = 6303
1591         self.udp_port_out = 6304
1592         self.icmp_id_out = 6305
1593
1594         self.nat44_add_address(self.nat_addr)
1595         flags = self.config_flags.NAT44_EI_IF_INSIDE
1596         self.vapi.nat44_ei_interface_add_del_feature(
1597             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1598         )
1599         self.vapi.nat44_ei_interface_add_del_feature(
1600             sw_if_index=self.pg1.sw_if_index, is_add=1
1601         )
1602
1603         # dynamic
1604         pkts = self.create_stream_in(self.pg0, self.pg1)
1605         self.pg0.add_stream(pkts)
1606         self.pg_enable_capture(self.pg_interfaces)
1607         self.pg_start()
1608         capture = self.pg1.get_capture(len(pkts))
1609         self.verify_capture_out(capture)
1610
1611         # 1:1NAT
1612         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1613         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1614         self.assertEqual(len(sessions), 0)
1615         pkts = self.create_stream_in(self.pg0, self.pg1)
1616         self.pg0.add_stream(pkts)
1617         self.pg_enable_capture(self.pg_interfaces)
1618         self.pg_start()
1619         capture = self.pg1.get_capture(len(pkts))
1620         self.verify_capture_out(capture, nat_ip, True)
1621
1622     def test_identity_nat(self):
1623         """NAT44EI Identity NAT"""
1624         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1625         self.vapi.nat44_ei_add_del_identity_mapping(
1626             ip_address=self.pg0.remote_ip4,
1627             sw_if_index=0xFFFFFFFF,
1628             flags=flags,
1629             is_add=1,
1630         )
1631         flags = self.config_flags.NAT44_EI_IF_INSIDE
1632         self.vapi.nat44_ei_interface_add_del_feature(
1633             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1634         )
1635         self.vapi.nat44_ei_interface_add_del_feature(
1636             sw_if_index=self.pg1.sw_if_index, is_add=1
1637         )
1638
1639         p = (
1640             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1641             / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1642             / TCP(sport=12345, dport=56789)
1643         )
1644         self.pg1.add_stream(p)
1645         self.pg_enable_capture(self.pg_interfaces)
1646         self.pg_start()
1647         capture = self.pg0.get_capture(1)
1648         p = capture[0]
1649         try:
1650             ip = p[IP]
1651             tcp = p[TCP]
1652             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1653             self.assertEqual(ip.src, self.pg1.remote_ip4)
1654             self.assertEqual(tcp.dport, 56789)
1655             self.assertEqual(tcp.sport, 12345)
1656             self.assert_packet_checksums_valid(p)
1657         except:
1658             self.logger.error(ppp("Unexpected or invalid packet:", p))
1659             raise
1660
1661         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1662         self.assertEqual(len(sessions), 0)
1663         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1664         self.vapi.nat44_ei_add_del_identity_mapping(
1665             ip_address=self.pg0.remote_ip4,
1666             sw_if_index=0xFFFFFFFF,
1667             flags=flags,
1668             vrf_id=1,
1669             is_add=1,
1670         )
1671         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1672         self.assertEqual(len(identity_mappings), 2)
1673
1674     def test_multiple_inside_interfaces(self):
1675         """NAT44EI multiple non-overlapping address space inside interfaces"""
1676
1677         self.nat44_add_address(self.nat_addr)
1678         flags = self.config_flags.NAT44_EI_IF_INSIDE
1679         self.vapi.nat44_ei_interface_add_del_feature(
1680             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1681         )
1682         self.vapi.nat44_ei_interface_add_del_feature(
1683             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1684         )
1685         self.vapi.nat44_ei_interface_add_del_feature(
1686             sw_if_index=self.pg3.sw_if_index, is_add=1
1687         )
1688
1689         # between two NAT44EI inside interfaces (no translation)
1690         pkts = self.create_stream_in(self.pg0, self.pg1)
1691         self.pg0.add_stream(pkts)
1692         self.pg_enable_capture(self.pg_interfaces)
1693         self.pg_start()
1694         capture = self.pg1.get_capture(len(pkts))
1695         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1696
1697         # from inside to interface without translation
1698         pkts = self.create_stream_in(self.pg0, self.pg2)
1699         self.pg0.add_stream(pkts)
1700         self.pg_enable_capture(self.pg_interfaces)
1701         self.pg_start()
1702         capture = self.pg2.get_capture(len(pkts))
1703         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1704
1705         # in2out 1st interface
1706         pkts = self.create_stream_in(self.pg0, self.pg3)
1707         self.pg0.add_stream(pkts)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg3.get_capture(len(pkts))
1711         self.verify_capture_out(capture)
1712
1713         # out2in 1st interface
1714         pkts = self.create_stream_out(self.pg3)
1715         self.pg3.add_stream(pkts)
1716         self.pg_enable_capture(self.pg_interfaces)
1717         self.pg_start()
1718         capture = self.pg0.get_capture(len(pkts))
1719         self.verify_capture_in(capture, self.pg0)
1720
1721         # in2out 2nd interface
1722         pkts = self.create_stream_in(self.pg1, self.pg3)
1723         self.pg1.add_stream(pkts)
1724         self.pg_enable_capture(self.pg_interfaces)
1725         self.pg_start()
1726         capture = self.pg3.get_capture(len(pkts))
1727         self.verify_capture_out(capture)
1728
1729         # out2in 2nd interface
1730         pkts = self.create_stream_out(self.pg3)
1731         self.pg3.add_stream(pkts)
1732         self.pg_enable_capture(self.pg_interfaces)
1733         self.pg_start()
1734         capture = self.pg1.get_capture(len(pkts))
1735         self.verify_capture_in(capture, self.pg1)
1736
1737     def test_inside_overlapping_interfaces(self):
1738         """NAT44EI multiple inside interfaces with overlapping address space"""
1739
1740         static_nat_ip = "10.0.0.10"
1741         self.nat44_add_address(self.nat_addr)
1742         flags = self.config_flags.NAT44_EI_IF_INSIDE
1743         self.vapi.nat44_ei_interface_add_del_feature(
1744             sw_if_index=self.pg3.sw_if_index, is_add=1
1745         )
1746         self.vapi.nat44_ei_interface_add_del_feature(
1747             sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1748         )
1749         self.vapi.nat44_ei_interface_add_del_feature(
1750             sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1751         )
1752         self.vapi.nat44_ei_interface_add_del_feature(
1753             sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1754         )
1755         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1756
1757         # between NAT44EI inside interfaces with same VRF (no translation)
1758         pkts = self.create_stream_in(self.pg4, self.pg5)
1759         self.pg4.add_stream(pkts)
1760         self.pg_enable_capture(self.pg_interfaces)
1761         self.pg_start()
1762         capture = self.pg5.get_capture(len(pkts))
1763         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1764
1765         # between NAT44EI inside interfaces with different VRF (hairpinning)
1766         p = (
1767             Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1768             / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1769             / TCP(sport=1234, dport=5678)
1770         )
1771         self.pg4.add_stream(p)
1772         self.pg_enable_capture(self.pg_interfaces)
1773         self.pg_start()
1774         capture = self.pg6.get_capture(1)
1775         p = capture[0]
1776         try:
1777             ip = p[IP]
1778             tcp = p[TCP]
1779             self.assertEqual(ip.src, self.nat_addr)
1780             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1781             self.assertNotEqual(tcp.sport, 1234)
1782             self.assertEqual(tcp.dport, 5678)
1783         except:
1784             self.logger.error(ppp("Unexpected or invalid packet:", p))
1785             raise
1786
1787         # in2out 1st interface
1788         pkts = self.create_stream_in(self.pg4, self.pg3)
1789         self.pg4.add_stream(pkts)
1790         self.pg_enable_capture(self.pg_interfaces)
1791         self.pg_start()
1792         capture = self.pg3.get_capture(len(pkts))
1793         self.verify_capture_out(capture)
1794
1795         # out2in 1st interface
1796         pkts = self.create_stream_out(self.pg3)
1797         self.pg3.add_stream(pkts)
1798         self.pg_enable_capture(self.pg_interfaces)
1799         self.pg_start()
1800         capture = self.pg4.get_capture(len(pkts))
1801         self.verify_capture_in(capture, self.pg4)
1802
1803         # in2out 2nd interface
1804         pkts = self.create_stream_in(self.pg5, self.pg3)
1805         self.pg5.add_stream(pkts)
1806         self.pg_enable_capture(self.pg_interfaces)
1807         self.pg_start()
1808         capture = self.pg3.get_capture(len(pkts))
1809         self.verify_capture_out(capture)
1810
1811         # out2in 2nd interface
1812         pkts = self.create_stream_out(self.pg3)
1813         self.pg3.add_stream(pkts)
1814         self.pg_enable_capture(self.pg_interfaces)
1815         self.pg_start()
1816         capture = self.pg5.get_capture(len(pkts))
1817         self.verify_capture_in(capture, self.pg5)
1818
1819         # pg5 session dump
1820         addresses = self.vapi.nat44_ei_address_dump()
1821         self.assertEqual(len(addresses), 1)
1822         sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1823         self.assertEqual(len(sessions), 3)
1824         for session in sessions:
1825             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1826             self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1827             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1828         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1829         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1830         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1831         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1832         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1833         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1834         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1835         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1836         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1837
1838         # in2out 3rd interface
1839         pkts = self.create_stream_in(self.pg6, self.pg3)
1840         self.pg6.add_stream(pkts)
1841         self.pg_enable_capture(self.pg_interfaces)
1842         self.pg_start()
1843         capture = self.pg3.get_capture(len(pkts))
1844         self.verify_capture_out(capture, static_nat_ip, True)
1845
1846         # out2in 3rd interface
1847         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1848         self.pg3.add_stream(pkts)
1849         self.pg_enable_capture(self.pg_interfaces)
1850         self.pg_start()
1851         capture = self.pg6.get_capture(len(pkts))
1852         self.verify_capture_in(capture, self.pg6)
1853
1854         # general user and session dump verifications
1855         users = self.vapi.nat44_ei_user_dump()
1856         self.assertGreaterEqual(len(users), 3)
1857         addresses = self.vapi.nat44_ei_address_dump()
1858         self.assertEqual(len(addresses), 1)
1859         for user in users:
1860             sessions = self.vapi.nat44_ei_user_session_dump(
1861                 user.ip_address, user.vrf_id
1862             )
1863             for session in sessions:
1864                 self.assertEqual(user.ip_address, session.inside_ip_address)
1865                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1866                 self.assertTrue(
1867                     session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
1868                 )
1869
1870         # pg4 session dump
1871         sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1872         self.assertGreaterEqual(len(sessions), 4)
1873         for session in sessions:
1874             self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1875             self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1876             self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1877
1878         # pg6 session dump
1879         sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1880         self.assertGreaterEqual(len(sessions), 3)
1881         for session in sessions:
1882             self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1883             self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1884             self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1885             self.assertTrue(
1886                 session.inside_port
1887                 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1888             )
1889
1890     def test_hairpinning(self):
1891         """NAT44EI hairpinning - 1:1 NAPT"""
1892
1893         host = self.pg0.remote_hosts[0]
1894         server = self.pg0.remote_hosts[1]
1895         host_in_port = 1234
1896         host_out_port = 0
1897         server_in_port = 5678
1898         server_out_port = 8765
1899
1900         self.nat44_add_address(self.nat_addr)
1901         flags = self.config_flags.NAT44_EI_IF_INSIDE
1902         self.vapi.nat44_ei_interface_add_del_feature(
1903             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1904         )
1905         self.vapi.nat44_ei_interface_add_del_feature(
1906             sw_if_index=self.pg1.sw_if_index, is_add=1
1907         )
1908
1909         # add static mapping for server
1910         self.nat44_add_static_mapping(
1911             server.ip4,
1912             self.nat_addr,
1913             server_in_port,
1914             server_out_port,
1915             proto=IP_PROTOS.tcp,
1916         )
1917
1918         cnt = self.statistics["/nat44-ei/hairpinning"]
1919         # send packet from host to server
1920         p = (
1921             Ether(src=host.mac, dst=self.pg0.local_mac)
1922             / IP(src=host.ip4, dst=self.nat_addr)
1923             / TCP(sport=host_in_port, dport=server_out_port)
1924         )
1925         self.pg0.add_stream(p)
1926         self.pg_enable_capture(self.pg_interfaces)
1927         self.pg_start()
1928         capture = self.pg0.get_capture(1)
1929         p = capture[0]
1930         try:
1931             ip = p[IP]
1932             tcp = p[TCP]
1933             self.assertEqual(ip.src, self.nat_addr)
1934             self.assertEqual(ip.dst, server.ip4)
1935             self.assertNotEqual(tcp.sport, host_in_port)
1936             self.assertEqual(tcp.dport, server_in_port)
1937             self.assert_packet_checksums_valid(p)
1938             host_out_port = tcp.sport
1939         except:
1940             self.logger.error(ppp("Unexpected or invalid packet:", p))
1941             raise
1942
1943         after = self.statistics["/nat44-ei/hairpinning"]
1944         if_idx = self.pg0.sw_if_index
1945         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1946
1947         # send reply from server to host
1948         p = (
1949             Ether(src=server.mac, dst=self.pg0.local_mac)
1950             / IP(src=server.ip4, dst=self.nat_addr)
1951             / TCP(sport=server_in_port, dport=host_out_port)
1952         )
1953         self.pg0.add_stream(p)
1954         self.pg_enable_capture(self.pg_interfaces)
1955         self.pg_start()
1956         capture = self.pg0.get_capture(1)
1957         p = capture[0]
1958         try:
1959             ip = p[IP]
1960             tcp = p[TCP]
1961             self.assertEqual(ip.src, self.nat_addr)
1962             self.assertEqual(ip.dst, host.ip4)
1963             self.assertEqual(tcp.sport, server_out_port)
1964             self.assertEqual(tcp.dport, host_in_port)
1965             self.assert_packet_checksums_valid(p)
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", p))
1968             raise
1969
1970         after = self.statistics["/nat44-ei/hairpinning"]
1971         if_idx = self.pg0.sw_if_index
1972         self.assertEqual(
1973             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1974             2 + (1 if self.vpp_worker_count > 0 else 0),
1975         )
1976
1977     def test_hairpinning2(self):
1978         """NAT44EI hairpinning - 1:1 NAT"""
1979
1980         server1_nat_ip = "10.0.0.10"
1981         server2_nat_ip = "10.0.0.11"
1982         host = self.pg0.remote_hosts[0]
1983         server1 = self.pg0.remote_hosts[1]
1984         server2 = self.pg0.remote_hosts[2]
1985         server_tcp_port = 22
1986         server_udp_port = 20
1987
1988         self.nat44_add_address(self.nat_addr)
1989         flags = self.config_flags.NAT44_EI_IF_INSIDE
1990         self.vapi.nat44_ei_interface_add_del_feature(
1991             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1992         )
1993         self.vapi.nat44_ei_interface_add_del_feature(
1994             sw_if_index=self.pg1.sw_if_index, is_add=1
1995         )
1996
1997         # add static mapping for servers
1998         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1999         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2000
2001         # host to server1
2002         pkts = []
2003         p = (
2004             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2005             / IP(src=host.ip4, dst=server1_nat_ip)
2006             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2007         )
2008         pkts.append(p)
2009         p = (
2010             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2011             / IP(src=host.ip4, dst=server1_nat_ip)
2012             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2013         )
2014         pkts.append(p)
2015         p = (
2016             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2017             / IP(src=host.ip4, dst=server1_nat_ip)
2018             / ICMP(id=self.icmp_id_in, type="echo-request")
2019         )
2020         pkts.append(p)
2021         self.pg0.add_stream(pkts)
2022         self.pg_enable_capture(self.pg_interfaces)
2023         self.pg_start()
2024         capture = self.pg0.get_capture(len(pkts))
2025         for packet in capture:
2026             try:
2027                 self.assertEqual(packet[IP].src, self.nat_addr)
2028                 self.assertEqual(packet[IP].dst, server1.ip4)
2029                 if packet.haslayer(TCP):
2030                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2031                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2032                     self.tcp_port_out = packet[TCP].sport
2033                     self.assert_packet_checksums_valid(packet)
2034                 elif packet.haslayer(UDP):
2035                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2036                     self.assertEqual(packet[UDP].dport, server_udp_port)
2037                     self.udp_port_out = packet[UDP].sport
2038                 else:
2039                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2040                     self.icmp_id_out = packet[ICMP].id
2041             except:
2042                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2043                 raise
2044
2045         # server1 to host
2046         pkts = []
2047         p = (
2048             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2049             / IP(src=server1.ip4, dst=self.nat_addr)
2050             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2051         )
2052         pkts.append(p)
2053         p = (
2054             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2055             / IP(src=server1.ip4, dst=self.nat_addr)
2056             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2057         )
2058         pkts.append(p)
2059         p = (
2060             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2061             / IP(src=server1.ip4, dst=self.nat_addr)
2062             / ICMP(id=self.icmp_id_out, type="echo-reply")
2063         )
2064         pkts.append(p)
2065         self.pg0.add_stream(pkts)
2066         self.pg_enable_capture(self.pg_interfaces)
2067         self.pg_start()
2068         capture = self.pg0.get_capture(len(pkts))
2069         for packet in capture:
2070             try:
2071                 self.assertEqual(packet[IP].src, server1_nat_ip)
2072                 self.assertEqual(packet[IP].dst, host.ip4)
2073                 if packet.haslayer(TCP):
2074                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2075                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2076                     self.assert_packet_checksums_valid(packet)
2077                 elif packet.haslayer(UDP):
2078                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2079                     self.assertEqual(packet[UDP].sport, server_udp_port)
2080                 else:
2081                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2082             except:
2083                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2084                 raise
2085
2086         # server2 to server1
2087         pkts = []
2088         p = (
2089             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2090             / IP(src=server2.ip4, dst=server1_nat_ip)
2091             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2092         )
2093         pkts.append(p)
2094         p = (
2095             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2096             / IP(src=server2.ip4, dst=server1_nat_ip)
2097             / UDP(sport=self.udp_port_in, dport=server_udp_port)
2098         )
2099         pkts.append(p)
2100         p = (
2101             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2102             / IP(src=server2.ip4, dst=server1_nat_ip)
2103             / ICMP(id=self.icmp_id_in, type="echo-request")
2104         )
2105         pkts.append(p)
2106         self.pg0.add_stream(pkts)
2107         self.pg_enable_capture(self.pg_interfaces)
2108         self.pg_start()
2109         capture = self.pg0.get_capture(len(pkts))
2110         for packet in capture:
2111             try:
2112                 self.assertEqual(packet[IP].src, server2_nat_ip)
2113                 self.assertEqual(packet[IP].dst, server1.ip4)
2114                 if packet.haslayer(TCP):
2115                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2116                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2117                     self.tcp_port_out = packet[TCP].sport
2118                     self.assert_packet_checksums_valid(packet)
2119                 elif packet.haslayer(UDP):
2120                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2121                     self.assertEqual(packet[UDP].dport, server_udp_port)
2122                     self.udp_port_out = packet[UDP].sport
2123                 else:
2124                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2125                     self.icmp_id_out = packet[ICMP].id
2126             except:
2127                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2128                 raise
2129
2130         # server1 to server2
2131         pkts = []
2132         p = (
2133             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2134             / IP(src=server1.ip4, dst=server2_nat_ip)
2135             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2136         )
2137         pkts.append(p)
2138         p = (
2139             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2140             / IP(src=server1.ip4, dst=server2_nat_ip)
2141             / UDP(sport=server_udp_port, dport=self.udp_port_out)
2142         )
2143         pkts.append(p)
2144         p = (
2145             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2146             / IP(src=server1.ip4, dst=server2_nat_ip)
2147             / ICMP(id=self.icmp_id_out, type="echo-reply")
2148         )
2149         pkts.append(p)
2150         self.pg0.add_stream(pkts)
2151         self.pg_enable_capture(self.pg_interfaces)
2152         self.pg_start()
2153         capture = self.pg0.get_capture(len(pkts))
2154         for packet in capture:
2155             try:
2156                 self.assertEqual(packet[IP].src, server1_nat_ip)
2157                 self.assertEqual(packet[IP].dst, server2.ip4)
2158                 if packet.haslayer(TCP):
2159                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2160                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2161                     self.assert_packet_checksums_valid(packet)
2162                 elif packet.haslayer(UDP):
2163                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2164                     self.assertEqual(packet[UDP].sport, server_udp_port)
2165                 else:
2166                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2167             except:
2168                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2169                 raise
2170
2171     def test_hairpinning_avoid_inf_loop(self):
2172         """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
2173
2174         host = self.pg0.remote_hosts[0]
2175         server = self.pg0.remote_hosts[1]
2176         host_in_port = 1234
2177         host_out_port = 0
2178         server_in_port = 5678
2179         server_out_port = 8765
2180
2181         self.nat44_add_address(self.nat_addr)
2182         flags = self.config_flags.NAT44_EI_IF_INSIDE
2183         self.vapi.nat44_ei_interface_add_del_feature(
2184             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2185         )
2186         self.vapi.nat44_ei_interface_add_del_feature(
2187             sw_if_index=self.pg1.sw_if_index, is_add=1
2188         )
2189
2190         # add static mapping for server
2191         self.nat44_add_static_mapping(
2192             server.ip4,
2193             self.nat_addr,
2194             server_in_port,
2195             server_out_port,
2196             proto=IP_PROTOS.tcp,
2197         )
2198
2199         # add another static mapping that maps pg0.local_ip4 address to itself
2200         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2201
2202         # send packet from host to VPP (the packet should get dropped)
2203         p = (
2204             Ether(src=host.mac, dst=self.pg0.local_mac)
2205             / IP(src=host.ip4, dst=self.pg0.local_ip4)
2206             / TCP(sport=host_in_port, dport=server_out_port)
2207         )
2208         self.pg0.add_stream(p)
2209         self.pg_enable_capture(self.pg_interfaces)
2210         self.pg_start()
2211         # Here VPP used to crash due to an infinite loop
2212
2213         cnt = self.statistics["/nat44-ei/hairpinning"]
2214         # send packet from host to server
2215         p = (
2216             Ether(src=host.mac, dst=self.pg0.local_mac)
2217             / IP(src=host.ip4, dst=self.nat_addr)
2218             / TCP(sport=host_in_port, dport=server_out_port)
2219         )
2220         self.pg0.add_stream(p)
2221         self.pg_enable_capture(self.pg_interfaces)
2222         self.pg_start()
2223         capture = self.pg0.get_capture(1)
2224         p = capture[0]
2225         try:
2226             ip = p[IP]
2227             tcp = p[TCP]
2228             self.assertEqual(ip.src, self.nat_addr)
2229             self.assertEqual(ip.dst, server.ip4)
2230             self.assertNotEqual(tcp.sport, host_in_port)
2231             self.assertEqual(tcp.dport, server_in_port)
2232             self.assert_packet_checksums_valid(p)
2233             host_out_port = tcp.sport
2234         except:
2235             self.logger.error(ppp("Unexpected or invalid packet:", p))
2236             raise
2237
2238         after = self.statistics["/nat44-ei/hairpinning"]
2239         if_idx = self.pg0.sw_if_index
2240         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2241
2242         # send reply from server to host
2243         p = (
2244             Ether(src=server.mac, dst=self.pg0.local_mac)
2245             / IP(src=server.ip4, dst=self.nat_addr)
2246             / TCP(sport=server_in_port, dport=host_out_port)
2247         )
2248         self.pg0.add_stream(p)
2249         self.pg_enable_capture(self.pg_interfaces)
2250         self.pg_start()
2251         capture = self.pg0.get_capture(1)
2252         p = capture[0]
2253         try:
2254             ip = p[IP]
2255             tcp = p[TCP]
2256             self.assertEqual(ip.src, self.nat_addr)
2257             self.assertEqual(ip.dst, host.ip4)
2258             self.assertEqual(tcp.sport, server_out_port)
2259             self.assertEqual(tcp.dport, host_in_port)
2260             self.assert_packet_checksums_valid(p)
2261         except:
2262             self.logger.error(ppp("Unexpected or invalid packet:", p))
2263             raise
2264
2265         after = self.statistics["/nat44-ei/hairpinning"]
2266         if_idx = self.pg0.sw_if_index
2267         self.assertEqual(
2268             after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2269             2 + (1 if self.vpp_worker_count > 0 else 0),
2270         )
2271
2272     def test_interface_addr(self):
2273         """NAT44EI acquire addresses from interface"""
2274         self.vapi.nat44_ei_add_del_interface_addr(
2275             is_add=1, sw_if_index=self.pg7.sw_if_index
2276         )
2277
2278         # no address in NAT pool
2279         addresses = self.vapi.nat44_ei_address_dump()
2280         self.assertEqual(0, len(addresses))
2281
2282         # configure interface address and check NAT address pool
2283         self.pg7.config_ip4()
2284         addresses = self.vapi.nat44_ei_address_dump()
2285         self.assertEqual(1, len(addresses))
2286         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2287
2288         # remove interface address and check NAT address pool
2289         self.pg7.unconfig_ip4()
2290         addresses = self.vapi.nat44_ei_address_dump()
2291         self.assertEqual(0, len(addresses))
2292
2293     def test_interface_addr_static_mapping(self):
2294         """NAT44EI Static mapping with addresses from interface"""
2295         tag = "testTAG"
2296
2297         self.vapi.nat44_ei_add_del_interface_addr(
2298             is_add=1, sw_if_index=self.pg7.sw_if_index
2299         )
2300         self.nat44_add_static_mapping(
2301             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2302         )
2303
2304         # static mappings with external interface
2305         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2306         self.assertEqual(1, len(static_mappings))
2307         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2308         self.assertEqual(static_mappings[0].tag, tag)
2309
2310         # configure interface address and check static mappings
2311         self.pg7.config_ip4()
2312         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2313         self.assertEqual(2, len(static_mappings))
2314         resolved = False
2315         for sm in static_mappings:
2316             if sm.external_sw_if_index == 0xFFFFFFFF:
2317                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2318                 self.assertEqual(sm.tag, tag)
2319                 resolved = True
2320         self.assertTrue(resolved)
2321
2322         # remove interface address and check static mappings
2323         self.pg7.unconfig_ip4()
2324         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2325         self.assertEqual(1, len(static_mappings))
2326         self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2327         self.assertEqual(static_mappings[0].tag, tag)
2328
2329         # configure interface address again and check static mappings
2330         self.pg7.config_ip4()
2331         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2332         self.assertEqual(2, len(static_mappings))
2333         resolved = False
2334         for sm in static_mappings:
2335             if sm.external_sw_if_index == 0xFFFFFFFF:
2336                 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2337                 self.assertEqual(sm.tag, tag)
2338                 resolved = True
2339         self.assertTrue(resolved)
2340
2341         # remove static mapping
2342         self.nat44_add_static_mapping(
2343             "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2344         )
2345         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2346         self.assertEqual(0, len(static_mappings))
2347
2348     def test_interface_addr_identity_nat(self):
2349         """NAT44EI Identity NAT with addresses from interface"""
2350
2351         port = 53053
2352         self.vapi.nat44_ei_add_del_interface_addr(
2353             is_add=1, sw_if_index=self.pg7.sw_if_index
2354         )
2355         self.vapi.nat44_ei_add_del_identity_mapping(
2356             ip_address=b"0",
2357             sw_if_index=self.pg7.sw_if_index,
2358             port=port,
2359             protocol=IP_PROTOS.tcp,
2360             is_add=1,
2361         )
2362
2363         # identity mappings with external interface
2364         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2365         self.assertEqual(1, len(identity_mappings))
2366         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2367
2368         # configure interface address and check identity mappings
2369         self.pg7.config_ip4()
2370         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2371         resolved = False
2372         self.assertEqual(2, len(identity_mappings))
2373         for sm in identity_mappings:
2374             if sm.sw_if_index == 0xFFFFFFFF:
2375                 self.assertEqual(
2376                     str(identity_mappings[0].ip_address), self.pg7.local_ip4
2377                 )
2378                 self.assertEqual(port, identity_mappings[0].port)
2379                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2380                 resolved = True
2381         self.assertTrue(resolved)
2382
2383         # remove interface address and check identity mappings
2384         self.pg7.unconfig_ip4()
2385         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2386         self.assertEqual(1, len(identity_mappings))
2387         self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2388
2389     def test_ipfix_nat44_sess(self):
2390         """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2391         self.ipfix_domain_id = 10
2392         self.ipfix_src_port = 20202
2393         collector_port = 30303
2394         bind_layers(UDP, IPFIX, dport=30303)
2395         self.nat44_add_address(self.nat_addr)
2396         flags = self.config_flags.NAT44_EI_IF_INSIDE
2397         self.vapi.nat44_ei_interface_add_del_feature(
2398             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2399         )
2400         self.vapi.nat44_ei_interface_add_del_feature(
2401             sw_if_index=self.pg1.sw_if_index, is_add=1
2402         )
2403         self.vapi.set_ipfix_exporter(
2404             collector_address=self.pg3.remote_ip4,
2405             src_address=self.pg3.local_ip4,
2406             path_mtu=512,
2407             template_interval=10,
2408             collector_port=collector_port,
2409         )
2410         self.vapi.nat44_ei_ipfix_enable_disable(
2411             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2412         )
2413
2414         pkts = self.create_stream_in(self.pg0, self.pg1)
2415         self.pg0.add_stream(pkts)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         capture = self.pg1.get_capture(len(pkts))
2419         self.verify_capture_out(capture)
2420         self.nat44_add_address(self.nat_addr, is_add=0)
2421         self.vapi.ipfix_flush()
2422         capture = self.pg3.get_capture(7)
2423         ipfix = IPFIXDecoder()
2424         # first load template
2425         for p in capture:
2426             self.assertTrue(p.haslayer(IPFIX))
2427             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2428             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2429             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2430             self.assertEqual(p[UDP].dport, collector_port)
2431             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2432             if p.haslayer(Template):
2433                 ipfix.add_template(p.getlayer(Template))
2434         # verify events in data set
2435         for p in capture:
2436             if p.haslayer(Data):
2437                 data = ipfix.decode_data_set(p.getlayer(Set))
2438                 self.verify_ipfix_nat44_ses(data)
2439
2440     def test_ipfix_addr_exhausted(self):
2441         """NAT44EI IPFIX logging NAT addresses exhausted"""
2442         flags = self.config_flags.NAT44_EI_IF_INSIDE
2443         self.vapi.nat44_ei_interface_add_del_feature(
2444             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2445         )
2446         self.vapi.nat44_ei_interface_add_del_feature(
2447             sw_if_index=self.pg1.sw_if_index, is_add=1
2448         )
2449         self.vapi.set_ipfix_exporter(
2450             collector_address=self.pg3.remote_ip4,
2451             src_address=self.pg3.local_ip4,
2452             path_mtu=512,
2453             template_interval=10,
2454         )
2455         self.vapi.nat44_ei_ipfix_enable_disable(
2456             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2457         )
2458
2459         p = (
2460             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2461             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2462             / TCP(sport=3025)
2463         )
2464         self.pg0.add_stream(p)
2465         self.pg_enable_capture(self.pg_interfaces)
2466         self.pg_start()
2467         self.pg1.assert_nothing_captured()
2468         self.vapi.ipfix_flush()
2469         capture = self.pg3.get_capture(7)
2470         ipfix = IPFIXDecoder()
2471         # first load template
2472         for p in capture:
2473             self.assertTrue(p.haslayer(IPFIX))
2474             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2475             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2476             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2477             self.assertEqual(p[UDP].dport, 4739)
2478             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2479             if p.haslayer(Template):
2480                 ipfix.add_template(p.getlayer(Template))
2481         # verify events in data set
2482         for p in capture:
2483             if p.haslayer(Data):
2484                 data = ipfix.decode_data_set(p.getlayer(Set))
2485                 self.verify_ipfix_addr_exhausted(data)
2486
2487     def test_ipfix_max_sessions(self):
2488         """NAT44EI IPFIX logging maximum session entries exceeded"""
2489         self.nat44_add_address(self.nat_addr)
2490         flags = self.config_flags.NAT44_EI_IF_INSIDE
2491         self.vapi.nat44_ei_interface_add_del_feature(
2492             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2493         )
2494         self.vapi.nat44_ei_interface_add_del_feature(
2495             sw_if_index=self.pg1.sw_if_index, is_add=1
2496         )
2497
2498         max_sessions_per_thread = self.max_translations
2499         max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2500
2501         pkts = []
2502         for i in range(0, max_sessions):
2503             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2504             p = (
2505                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2506                 / IP(src=src, dst=self.pg1.remote_ip4)
2507                 / TCP(sport=1025)
2508             )
2509             pkts.append(p)
2510         self.pg0.add_stream(pkts)
2511         self.pg_enable_capture(self.pg_interfaces)
2512         self.pg_start()
2513
2514         self.pg1.get_capture(max_sessions)
2515         self.vapi.set_ipfix_exporter(
2516             collector_address=self.pg3.remote_ip4,
2517             src_address=self.pg3.local_ip4,
2518             path_mtu=512,
2519             template_interval=10,
2520         )
2521         self.vapi.nat44_ei_ipfix_enable_disable(
2522             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2523         )
2524
2525         p = (
2526             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2527             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2528             / TCP(sport=1025)
2529         )
2530         self.pg0.add_stream(p)
2531         self.pg_enable_capture(self.pg_interfaces)
2532         self.pg_start()
2533         self.pg1.assert_nothing_captured()
2534         self.vapi.ipfix_flush()
2535         capture = self.pg3.get_capture(7)
2536         ipfix = IPFIXDecoder()
2537         # first load template
2538         for p in capture:
2539             self.assertTrue(p.haslayer(IPFIX))
2540             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543             self.assertEqual(p[UDP].dport, 4739)
2544             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2545             if p.haslayer(Template):
2546                 ipfix.add_template(p.getlayer(Template))
2547         # verify events in data set
2548         for p in capture:
2549             if p.haslayer(Data):
2550                 data = ipfix.decode_data_set(p.getlayer(Set))
2551                 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2552
2553     def test_syslog_apmap(self):
2554         """NAT44EI syslog address and port mapping creation and deletion"""
2555         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2556         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2557         self.nat44_add_address(self.nat_addr)
2558         flags = self.config_flags.NAT44_EI_IF_INSIDE
2559         self.vapi.nat44_ei_interface_add_del_feature(
2560             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2561         )
2562         self.vapi.nat44_ei_interface_add_del_feature(
2563             sw_if_index=self.pg1.sw_if_index, is_add=1
2564         )
2565
2566         p = (
2567             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2568             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2569             / TCP(sport=self.tcp_port_in, dport=20)
2570         )
2571         self.pg0.add_stream(p)
2572         self.pg_enable_capture(self.pg_interfaces)
2573         self.pg_start()
2574         capture = self.pg1.get_capture(1)
2575         self.tcp_port_out = capture[0][TCP].sport
2576         capture = self.pg3.get_capture(1)
2577         self.verify_syslog_apmap(capture[0][Raw].load)
2578
2579         self.pg_enable_capture(self.pg_interfaces)
2580         self.pg_start()
2581         self.nat44_add_address(self.nat_addr, is_add=0)
2582         capture = self.pg3.get_capture(1)
2583         self.verify_syslog_apmap(capture[0][Raw].load, False)
2584
2585     def test_pool_addr_fib(self):
2586         """NAT44EI add pool addresses to FIB"""
2587         static_addr = "10.0.0.10"
2588         self.nat44_add_address(self.nat_addr)
2589         flags = self.config_flags.NAT44_EI_IF_INSIDE
2590         self.vapi.nat44_ei_interface_add_del_feature(
2591             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2592         )
2593         self.vapi.nat44_ei_interface_add_del_feature(
2594             sw_if_index=self.pg1.sw_if_index, is_add=1
2595         )
2596         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2597
2598         # NAT44EI address
2599         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2600             op=ARP.who_has,
2601             pdst=self.nat_addr,
2602             psrc=self.pg1.remote_ip4,
2603             hwsrc=self.pg1.remote_mac,
2604         )
2605         self.pg1.add_stream(p)
2606         self.pg_enable_capture(self.pg_interfaces)
2607         self.pg_start()
2608         capture = self.pg1.get_capture(1)
2609         self.assertTrue(capture[0].haslayer(ARP))
2610         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2611
2612         # 1:1 NAT address
2613         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2614             op=ARP.who_has,
2615             pdst=static_addr,
2616             psrc=self.pg1.remote_ip4,
2617             hwsrc=self.pg1.remote_mac,
2618         )
2619         self.pg1.add_stream(p)
2620         self.pg_enable_capture(self.pg_interfaces)
2621         self.pg_start()
2622         capture = self.pg1.get_capture(1)
2623         self.assertTrue(capture[0].haslayer(ARP))
2624         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2625
2626         # send ARP to non-NAT44EI interface
2627         p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2628             op=ARP.who_has,
2629             pdst=self.nat_addr,
2630             psrc=self.pg2.remote_ip4,
2631             hwsrc=self.pg2.remote_mac,
2632         )
2633         self.pg2.add_stream(p)
2634         self.pg_enable_capture(self.pg_interfaces)
2635         self.pg_start()
2636         self.pg1.assert_nothing_captured()
2637
2638         # remove addresses and verify
2639         self.nat44_add_address(self.nat_addr, is_add=0)
2640         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2641
2642         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2643             op=ARP.who_has,
2644             pdst=self.nat_addr,
2645             psrc=self.pg1.remote_ip4,
2646             hwsrc=self.pg1.remote_mac,
2647         )
2648         self.pg1.add_stream(p)
2649         self.pg_enable_capture(self.pg_interfaces)
2650         self.pg_start()
2651         self.pg1.assert_nothing_captured()
2652
2653         p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2654             op=ARP.who_has,
2655             pdst=static_addr,
2656             psrc=self.pg1.remote_ip4,
2657             hwsrc=self.pg1.remote_mac,
2658         )
2659         self.pg1.add_stream(p)
2660         self.pg_enable_capture(self.pg_interfaces)
2661         self.pg_start()
2662         self.pg1.assert_nothing_captured()
2663
2664     def test_vrf_mode(self):
2665         """NAT44EI tenant VRF aware address pool mode"""
2666
2667         vrf_id1 = 1
2668         vrf_id2 = 2
2669         nat_ip1 = "10.0.0.10"
2670         nat_ip2 = "10.0.0.11"
2671
2672         self.pg0.unconfig_ip4()
2673         self.pg1.unconfig_ip4()
2674         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2675         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2676         self.pg0.set_table_ip4(vrf_id1)
2677         self.pg1.set_table_ip4(vrf_id2)
2678         self.pg0.config_ip4()
2679         self.pg1.config_ip4()
2680         self.pg0.resolve_arp()
2681         self.pg1.resolve_arp()
2682
2683         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2684         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2685         flags = self.config_flags.NAT44_EI_IF_INSIDE
2686         self.vapi.nat44_ei_interface_add_del_feature(
2687             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2688         )
2689         self.vapi.nat44_ei_interface_add_del_feature(
2690             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2691         )
2692         self.vapi.nat44_ei_interface_add_del_feature(
2693             sw_if_index=self.pg2.sw_if_index, is_add=1
2694         )
2695
2696         try:
2697             # first VRF
2698             pkts = self.create_stream_in(self.pg0, self.pg2)
2699             self.pg0.add_stream(pkts)
2700             self.pg_enable_capture(self.pg_interfaces)
2701             self.pg_start()
2702             capture = self.pg2.get_capture(len(pkts))
2703             self.verify_capture_out(capture, nat_ip1)
2704
2705             # second VRF
2706             pkts = self.create_stream_in(self.pg1, self.pg2)
2707             self.pg1.add_stream(pkts)
2708             self.pg_enable_capture(self.pg_interfaces)
2709             self.pg_start()
2710             capture = self.pg2.get_capture(len(pkts))
2711             self.verify_capture_out(capture, nat_ip2)
2712
2713         finally:
2714             self.pg0.unconfig_ip4()
2715             self.pg1.unconfig_ip4()
2716             self.pg0.set_table_ip4(0)
2717             self.pg1.set_table_ip4(0)
2718             self.pg0.config_ip4()
2719             self.pg1.config_ip4()
2720             self.pg0.resolve_arp()
2721             self.pg1.resolve_arp()
2722             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2723             self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2724
2725     def test_vrf_feature_independent(self):
2726         """NAT44EI tenant VRF independent address pool mode"""
2727
2728         nat_ip1 = "10.0.0.10"
2729         nat_ip2 = "10.0.0.11"
2730
2731         self.nat44_add_address(nat_ip1)
2732         self.nat44_add_address(nat_ip2, vrf_id=99)
2733         flags = self.config_flags.NAT44_EI_IF_INSIDE
2734         self.vapi.nat44_ei_interface_add_del_feature(
2735             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2736         )
2737         self.vapi.nat44_ei_interface_add_del_feature(
2738             sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2739         )
2740         self.vapi.nat44_ei_interface_add_del_feature(
2741             sw_if_index=self.pg2.sw_if_index, is_add=1
2742         )
2743
2744         # first VRF
2745         pkts = self.create_stream_in(self.pg0, self.pg2)
2746         self.pg0.add_stream(pkts)
2747         self.pg_enable_capture(self.pg_interfaces)
2748         self.pg_start()
2749         capture = self.pg2.get_capture(len(pkts))
2750         self.verify_capture_out(capture, nat_ip1)
2751
2752         # second VRF
2753         pkts = self.create_stream_in(self.pg1, self.pg2)
2754         self.pg1.add_stream(pkts)
2755         self.pg_enable_capture(self.pg_interfaces)
2756         self.pg_start()
2757         capture = self.pg2.get_capture(len(pkts))
2758         self.verify_capture_out(capture, nat_ip1)
2759
2760     def test_dynamic_ipless_interfaces(self):
2761         """NAT44EI interfaces without configured IP address"""
2762         self.create_routes_and_neigbors()
2763         self.nat44_add_address(self.nat_addr)
2764         flags = self.config_flags.NAT44_EI_IF_INSIDE
2765         self.vapi.nat44_ei_interface_add_del_feature(
2766             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2767         )
2768         self.vapi.nat44_ei_interface_add_del_feature(
2769             sw_if_index=self.pg8.sw_if_index, is_add=1
2770         )
2771
2772         # in2out
2773         pkts = self.create_stream_in(self.pg7, self.pg8)
2774         self.pg7.add_stream(pkts)
2775         self.pg_enable_capture(self.pg_interfaces)
2776         self.pg_start()
2777         capture = self.pg8.get_capture(len(pkts))
2778         self.verify_capture_out(capture)
2779
2780         # out2in
2781         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2782         self.pg8.add_stream(pkts)
2783         self.pg_enable_capture(self.pg_interfaces)
2784         self.pg_start()
2785         capture = self.pg7.get_capture(len(pkts))
2786         self.verify_capture_in(capture, self.pg7)
2787
2788     def test_static_ipless_interfaces(self):
2789         """NAT44EI interfaces without configured IP address - 1:1 NAT"""
2790
2791         self.create_routes_and_neigbors()
2792         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2793         flags = self.config_flags.NAT44_EI_IF_INSIDE
2794         self.vapi.nat44_ei_interface_add_del_feature(
2795             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2796         )
2797         self.vapi.nat44_ei_interface_add_del_feature(
2798             sw_if_index=self.pg8.sw_if_index, is_add=1
2799         )
2800
2801         # out2in
2802         pkts = self.create_stream_out(self.pg8)
2803         self.pg8.add_stream(pkts)
2804         self.pg_enable_capture(self.pg_interfaces)
2805         self.pg_start()
2806         capture = self.pg7.get_capture(len(pkts))
2807         self.verify_capture_in(capture, self.pg7)
2808
2809         # in2out
2810         pkts = self.create_stream_in(self.pg7, self.pg8)
2811         self.pg7.add_stream(pkts)
2812         self.pg_enable_capture(self.pg_interfaces)
2813         self.pg_start()
2814         capture = self.pg8.get_capture(len(pkts))
2815         self.verify_capture_out(capture, self.nat_addr, True)
2816
2817     def test_static_with_port_ipless_interfaces(self):
2818         """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
2819
2820         self.tcp_port_out = 30606
2821         self.udp_port_out = 30607
2822         self.icmp_id_out = 30608
2823
2824         self.create_routes_and_neigbors()
2825         self.nat44_add_address(self.nat_addr)
2826         self.nat44_add_static_mapping(
2827             self.pg7.remote_ip4,
2828             self.nat_addr,
2829             self.tcp_port_in,
2830             self.tcp_port_out,
2831             proto=IP_PROTOS.tcp,
2832         )
2833         self.nat44_add_static_mapping(
2834             self.pg7.remote_ip4,
2835             self.nat_addr,
2836             self.udp_port_in,
2837             self.udp_port_out,
2838             proto=IP_PROTOS.udp,
2839         )
2840         self.nat44_add_static_mapping(
2841             self.pg7.remote_ip4,
2842             self.nat_addr,
2843             self.icmp_id_in,
2844             self.icmp_id_out,
2845             proto=IP_PROTOS.icmp,
2846         )
2847         flags = self.config_flags.NAT44_EI_IF_INSIDE
2848         self.vapi.nat44_ei_interface_add_del_feature(
2849             sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2850         )
2851         self.vapi.nat44_ei_interface_add_del_feature(
2852             sw_if_index=self.pg8.sw_if_index, is_add=1
2853         )
2854
2855         # out2in
2856         pkts = self.create_stream_out(self.pg8)
2857         self.pg8.add_stream(pkts)
2858         self.pg_enable_capture(self.pg_interfaces)
2859         self.pg_start()
2860         capture = self.pg7.get_capture(len(pkts))
2861         self.verify_capture_in(capture, self.pg7)
2862
2863         # in2out
2864         pkts = self.create_stream_in(self.pg7, self.pg8)
2865         self.pg7.add_stream(pkts)
2866         self.pg_enable_capture(self.pg_interfaces)
2867         self.pg_start()
2868         capture = self.pg8.get_capture(len(pkts))
2869         self.verify_capture_out(capture)
2870
2871     def test_static_unknown_proto(self):
2872         """NAT44EI 1:1 translate packet with unknown protocol"""
2873         nat_ip = "10.0.0.10"
2874         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2875         flags = self.config_flags.NAT44_EI_IF_INSIDE
2876         self.vapi.nat44_ei_interface_add_del_feature(
2877             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2878         )
2879         self.vapi.nat44_ei_interface_add_del_feature(
2880             sw_if_index=self.pg1.sw_if_index, is_add=1
2881         )
2882
2883         # in2out
2884         p = (
2885             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2886             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2887             / GRE()
2888             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2889             / TCP(sport=1234, dport=1234)
2890         )
2891         self.pg0.add_stream(p)
2892         self.pg_enable_capture(self.pg_interfaces)
2893         self.pg_start()
2894         p = self.pg1.get_capture(1)
2895         packet = p[0]
2896         try:
2897             self.assertEqual(packet[IP].src, nat_ip)
2898             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2899             self.assertEqual(packet.haslayer(GRE), 1)
2900             self.assert_packet_checksums_valid(packet)
2901         except:
2902             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2903             raise
2904
2905         # out2in
2906         p = (
2907             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2908             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2909             / GRE()
2910             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2911             / TCP(sport=1234, dport=1234)
2912         )
2913         self.pg1.add_stream(p)
2914         self.pg_enable_capture(self.pg_interfaces)
2915         self.pg_start()
2916         p = self.pg0.get_capture(1)
2917         packet = p[0]
2918         try:
2919             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2920             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2921             self.assertEqual(packet.haslayer(GRE), 1)
2922             self.assert_packet_checksums_valid(packet)
2923         except:
2924             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2925             raise
2926
2927     def test_hairpinning_static_unknown_proto(self):
2928         """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
2929
2930         host = self.pg0.remote_hosts[0]
2931         server = self.pg0.remote_hosts[1]
2932
2933         host_nat_ip = "10.0.0.10"
2934         server_nat_ip = "10.0.0.11"
2935
2936         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2937         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2938         flags = self.config_flags.NAT44_EI_IF_INSIDE
2939         self.vapi.nat44_ei_interface_add_del_feature(
2940             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2941         )
2942         self.vapi.nat44_ei_interface_add_del_feature(
2943             sw_if_index=self.pg1.sw_if_index, is_add=1
2944         )
2945
2946         # host to server
2947         p = (
2948             Ether(dst=self.pg0.local_mac, src=host.mac)
2949             / IP(src=host.ip4, dst=server_nat_ip)
2950             / GRE()
2951             / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2952             / TCP(sport=1234, dport=1234)
2953         )
2954         self.pg0.add_stream(p)
2955         self.pg_enable_capture(self.pg_interfaces)
2956         self.pg_start()
2957         p = self.pg0.get_capture(1)
2958         packet = p[0]
2959         try:
2960             self.assertEqual(packet[IP].src, host_nat_ip)
2961             self.assertEqual(packet[IP].dst, server.ip4)
2962             self.assertEqual(packet.haslayer(GRE), 1)
2963             self.assert_packet_checksums_valid(packet)
2964         except:
2965             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2966             raise
2967
2968         # server to host
2969         p = (
2970             Ether(dst=self.pg0.local_mac, src=server.mac)
2971             / IP(src=server.ip4, dst=host_nat_ip)
2972             / GRE()
2973             / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2974             / TCP(sport=1234, dport=1234)
2975         )
2976         self.pg0.add_stream(p)
2977         self.pg_enable_capture(self.pg_interfaces)
2978         self.pg_start()
2979         p = self.pg0.get_capture(1)
2980         packet = p[0]
2981         try:
2982             self.assertEqual(packet[IP].src, server_nat_ip)
2983             self.assertEqual(packet[IP].dst, host.ip4)
2984             self.assertEqual(packet.haslayer(GRE), 1)
2985             self.assert_packet_checksums_valid(packet)
2986         except:
2987             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2988             raise
2989
2990     def test_output_feature(self):
2991         """NAT44EI output feature (in2out postrouting)"""
2992         self.nat44_add_address(self.nat_addr)
2993         self.vapi.nat44_ei_add_del_output_interface(
2994             sw_if_index=self.pg3.sw_if_index, is_add=1
2995         )
2996
2997         # in2out
2998         pkts = self.create_stream_in(self.pg0, self.pg3)
2999         self.pg0.add_stream(pkts)
3000         self.pg_enable_capture(self.pg_interfaces)
3001         self.pg_start()
3002         capture = self.pg3.get_capture(len(pkts))
3003         self.verify_capture_out(capture)
3004
3005         # out2in
3006         pkts = self.create_stream_out(self.pg3)
3007         self.pg3.add_stream(pkts)
3008         self.pg_enable_capture(self.pg_interfaces)
3009         self.pg_start()
3010         capture = self.pg0.get_capture(len(pkts))
3011         self.verify_capture_in(capture, self.pg0)
3012
3013         # from non-NAT interface to NAT inside interface
3014         pkts = self.create_stream_in(self.pg2, self.pg0)
3015         self.pg2.add_stream(pkts)
3016         self.pg_enable_capture(self.pg_interfaces)
3017         self.pg_start()
3018         capture = self.pg0.get_capture(len(pkts))
3019         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3020
3021     def test_output_feature_vrf_aware(self):
3022         """NAT44EI output feature VRF aware (in2out postrouting)"""
3023         nat_ip_vrf10 = "10.0.0.10"
3024         nat_ip_vrf20 = "10.0.0.20"
3025
3026         r1 = VppIpRoute(
3027             self,
3028             self.pg3.remote_ip4,
3029             32,
3030             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3031             table_id=10,
3032         )
3033         r2 = VppIpRoute(
3034             self,
3035             self.pg3.remote_ip4,
3036             32,
3037             [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3038             table_id=20,
3039         )
3040         r1.add_vpp_config()
3041         r2.add_vpp_config()
3042
3043         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3044         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3045         self.vapi.nat44_ei_add_del_output_interface(
3046             sw_if_index=self.pg3.sw_if_index, is_add=1
3047         )
3048
3049         # in2out VRF 10
3050         pkts = self.create_stream_in(self.pg4, self.pg3)
3051         self.pg4.add_stream(pkts)
3052         self.pg_enable_capture(self.pg_interfaces)
3053         self.pg_start()
3054         capture = self.pg3.get_capture(len(pkts))
3055         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3056
3057         # out2in VRF 10
3058         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3059         self.pg3.add_stream(pkts)
3060         self.pg_enable_capture(self.pg_interfaces)
3061         self.pg_start()
3062         capture = self.pg4.get_capture(len(pkts))
3063         self.verify_capture_in(capture, self.pg4)
3064
3065         # in2out VRF 20
3066         pkts = self.create_stream_in(self.pg6, self.pg3)
3067         self.pg6.add_stream(pkts)
3068         self.pg_enable_capture(self.pg_interfaces)
3069         self.pg_start()
3070         capture = self.pg3.get_capture(len(pkts))
3071         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3072
3073         # out2in VRF 20
3074         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3075         self.pg3.add_stream(pkts)
3076         self.pg_enable_capture(self.pg_interfaces)
3077         self.pg_start()
3078         capture = self.pg6.get_capture(len(pkts))
3079         self.verify_capture_in(capture, self.pg6)
3080
3081     def test_output_feature_hairpinning(self):
3082         """NAT44EI output feature hairpinning (in2out postrouting)"""
3083         host = self.pg0.remote_hosts[0]
3084         server = self.pg0.remote_hosts[1]
3085         host_in_port = 1234
3086         host_out_port = 0
3087         server_in_port = 5678
3088         server_out_port = 8765
3089
3090         self.nat44_add_address(self.nat_addr)
3091         self.vapi.nat44_ei_add_del_output_interface(
3092             sw_if_index=self.pg0.sw_if_index, is_add=1
3093         )
3094         self.vapi.nat44_ei_add_del_output_interface(
3095             sw_if_index=self.pg1.sw_if_index, is_add=1
3096         )
3097
3098         # add static mapping for server
3099         self.nat44_add_static_mapping(
3100             server.ip4,
3101             self.nat_addr,
3102             server_in_port,
3103             server_out_port,
3104             proto=IP_PROTOS.tcp,
3105         )
3106
3107         # send packet from host to server
3108         p = (
3109             Ether(src=host.mac, dst=self.pg0.local_mac)
3110             / IP(src=host.ip4, dst=self.nat_addr)
3111             / TCP(sport=host_in_port, dport=server_out_port)
3112         )
3113         self.pg0.add_stream(p)
3114         self.pg_enable_capture(self.pg_interfaces)
3115         self.pg_start()
3116         capture = self.pg0.get_capture(1)
3117         p = capture[0]
3118         try:
3119             ip = p[IP]
3120             tcp = p[TCP]
3121             self.assertEqual(ip.src, self.nat_addr)
3122             self.assertEqual(ip.dst, server.ip4)
3123             self.assertNotEqual(tcp.sport, host_in_port)
3124             self.assertEqual(tcp.dport, server_in_port)
3125             self.assert_packet_checksums_valid(p)
3126             host_out_port = tcp.sport
3127         except:
3128             self.logger.error(ppp("Unexpected or invalid packet:", p))
3129             raise
3130
3131         # send reply from server to host
3132         p = (
3133             Ether(src=server.mac, dst=self.pg0.local_mac)
3134             / IP(src=server.ip4, dst=self.nat_addr)
3135             / TCP(sport=server_in_port, dport=host_out_port)
3136         )
3137         self.pg0.add_stream(p)
3138         self.pg_enable_capture(self.pg_interfaces)
3139         self.pg_start()
3140         capture = self.pg0.get_capture(1)
3141         p = capture[0]
3142         try:
3143             ip = p[IP]
3144             tcp = p[TCP]
3145             self.assertEqual(ip.src, self.nat_addr)
3146             self.assertEqual(ip.dst, host.ip4)
3147             self.assertEqual(tcp.sport, server_out_port)
3148             self.assertEqual(tcp.dport, host_in_port)
3149             self.assert_packet_checksums_valid(p)
3150         except:
3151             self.logger.error(ppp("Unexpected or invalid packet:", p))
3152             raise
3153
3154     def test_one_armed_nat44(self):
3155         """NAT44EI One armed NAT"""
3156         remote_host = self.pg9.remote_hosts[0]
3157         local_host = self.pg9.remote_hosts[1]
3158         external_port = 0
3159
3160         self.nat44_add_address(self.nat_addr)
3161         flags = self.config_flags.NAT44_EI_IF_INSIDE
3162         self.vapi.nat44_ei_interface_add_del_feature(
3163             sw_if_index=self.pg9.sw_if_index, is_add=1
3164         )
3165         self.vapi.nat44_ei_interface_add_del_feature(
3166             sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3167         )
3168
3169         # in2out
3170         p = (
3171             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3172             / IP(src=local_host.ip4, dst=remote_host.ip4)
3173             / TCP(sport=12345, dport=80)
3174         )
3175         self.pg9.add_stream(p)
3176         self.pg_enable_capture(self.pg_interfaces)
3177         self.pg_start()
3178         capture = self.pg9.get_capture(1)
3179         p = capture[0]
3180         try:
3181             ip = p[IP]
3182             tcp = p[TCP]
3183             self.assertEqual(ip.src, self.nat_addr)
3184             self.assertEqual(ip.dst, remote_host.ip4)
3185             self.assertNotEqual(tcp.sport, 12345)
3186             external_port = tcp.sport
3187             self.assertEqual(tcp.dport, 80)
3188             self.assert_packet_checksums_valid(p)
3189         except:
3190             self.logger.error(ppp("Unexpected or invalid packet:", p))
3191             raise
3192
3193         # out2in
3194         p = (
3195             Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3196             / IP(src=remote_host.ip4, dst=self.nat_addr)
3197             / TCP(sport=80, dport=external_port)
3198         )
3199         self.pg9.add_stream(p)
3200         self.pg_enable_capture(self.pg_interfaces)
3201         self.pg_start()
3202         capture = self.pg9.get_capture(1)
3203         p = capture[0]
3204         try:
3205             ip = p[IP]
3206             tcp = p[TCP]
3207             self.assertEqual(ip.src, remote_host.ip4)
3208             self.assertEqual(ip.dst, local_host.ip4)
3209             self.assertEqual(tcp.sport, 80)
3210             self.assertEqual(tcp.dport, 12345)
3211             self.assert_packet_checksums_valid(p)
3212         except:
3213             self.logger.error(ppp("Unexpected or invalid packet:", p))
3214             raise
3215
3216         if self.vpp_worker_count > 1:
3217             node = "nat44-ei-handoff-classify"
3218         else:
3219             node = "nat44-ei-classify"
3220
3221         err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3222         self.assertEqual(err, 1)
3223         err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3224         self.assertEqual(err, 1)
3225
3226     def test_del_session(self):
3227         """NAT44EI delete session"""
3228         self.nat44_add_address(self.nat_addr)
3229         flags = self.config_flags.NAT44_EI_IF_INSIDE
3230         self.vapi.nat44_ei_interface_add_del_feature(
3231             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3232         )
3233         self.vapi.nat44_ei_interface_add_del_feature(
3234             sw_if_index=self.pg1.sw_if_index, is_add=1
3235         )
3236
3237         pkts = self.create_stream_in(self.pg0, self.pg1)
3238         self.pg0.add_stream(pkts)
3239         self.pg_enable_capture(self.pg_interfaces)
3240         self.pg_start()
3241         self.pg1.get_capture(len(pkts))
3242
3243         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3244         nsessions = len(sessions)
3245
3246         self.vapi.nat44_ei_del_session(
3247             address=sessions[0].inside_ip_address,
3248             port=sessions[0].inside_port,
3249             protocol=sessions[0].protocol,
3250             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3251         )
3252
3253         self.vapi.nat44_ei_del_session(
3254             address=sessions[1].outside_ip_address,
3255             port=sessions[1].outside_port,
3256             protocol=sessions[1].protocol,
3257         )
3258
3259         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3260         self.assertEqual(nsessions - len(sessions), 2)
3261
3262         self.vapi.nat44_ei_del_session(
3263             address=sessions[0].inside_ip_address,
3264             port=sessions[0].inside_port,
3265             protocol=sessions[0].protocol,
3266             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3267         )
3268
3269         self.verify_no_nat44_user()
3270
3271     def test_frag_in_order(self):
3272         """NAT44EI translate fragments arriving in order"""
3273
3274         self.nat44_add_address(self.nat_addr)
3275         flags = self.config_flags.NAT44_EI_IF_INSIDE
3276         self.vapi.nat44_ei_interface_add_del_feature(
3277             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3278         )
3279         self.vapi.nat44_ei_interface_add_del_feature(
3280             sw_if_index=self.pg1.sw_if_index, is_add=1
3281         )
3282
3283         self.frag_in_order(proto=IP_PROTOS.tcp)
3284         self.frag_in_order(proto=IP_PROTOS.udp)
3285         self.frag_in_order(proto=IP_PROTOS.icmp)
3286
3287     def test_frag_forwarding(self):
3288         """NAT44EI forwarding fragment test"""
3289         self.vapi.nat44_ei_add_del_interface_addr(
3290             is_add=1, sw_if_index=self.pg1.sw_if_index
3291         )
3292         flags = self.config_flags.NAT44_EI_IF_INSIDE
3293         self.vapi.nat44_ei_interface_add_del_feature(
3294             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3295         )
3296         self.vapi.nat44_ei_interface_add_del_feature(
3297             sw_if_index=self.pg1.sw_if_index, is_add=1
3298         )
3299         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3300
3301         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3302         pkts = self.create_stream_frag(
3303             self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3304         )
3305         self.pg1.add_stream(pkts)
3306         self.pg_enable_capture(self.pg_interfaces)
3307         self.pg_start()
3308         frags = self.pg0.get_capture(len(pkts))
3309         p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3310         self.assertEqual(p[UDP].sport, 4789)
3311         self.assertEqual(p[UDP].dport, 4789)
3312         self.assertEqual(data, p[Raw].load)
3313
3314     def test_reass_hairpinning(self):
3315         """NAT44EI fragments hairpinning"""
3316
3317         server_addr = self.pg0.remote_hosts[1].ip4
3318         host_in_port = random.randint(1025, 65535)
3319         server_in_port = random.randint(1025, 65535)
3320         server_out_port = random.randint(1025, 65535)
3321
3322         self.nat44_add_address(self.nat_addr)
3323         flags = self.config_flags.NAT44_EI_IF_INSIDE
3324         self.vapi.nat44_ei_interface_add_del_feature(
3325             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3326         )
3327         self.vapi.nat44_ei_interface_add_del_feature(
3328             sw_if_index=self.pg1.sw_if_index, is_add=1
3329         )
3330         # add static mapping for server
3331         self.nat44_add_static_mapping(
3332             server_addr,
3333             self.nat_addr,
3334             server_in_port,
3335             server_out_port,
3336             proto=IP_PROTOS.tcp,
3337         )
3338         self.nat44_add_static_mapping(
3339             server_addr,
3340             self.nat_addr,
3341             server_in_port,
3342             server_out_port,
3343             proto=IP_PROTOS.udp,
3344         )
3345         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3346
3347         self.reass_hairpinning(
3348             server_addr,
3349             server_in_port,
3350             server_out_port,
3351             host_in_port,
3352             proto=IP_PROTOS.tcp,
3353         )
3354         self.reass_hairpinning(
3355             server_addr,
3356             server_in_port,
3357             server_out_port,
3358             host_in_port,
3359             proto=IP_PROTOS.udp,
3360         )
3361         self.reass_hairpinning(
3362             server_addr,
3363             server_in_port,
3364             server_out_port,
3365             host_in_port,
3366             proto=IP_PROTOS.icmp,
3367         )
3368
3369     def test_frag_out_of_order(self):
3370         """NAT44EI translate fragments arriving out of order"""
3371
3372         self.nat44_add_address(self.nat_addr)
3373         flags = self.config_flags.NAT44_EI_IF_INSIDE
3374         self.vapi.nat44_ei_interface_add_del_feature(
3375             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3376         )
3377         self.vapi.nat44_ei_interface_add_del_feature(
3378             sw_if_index=self.pg1.sw_if_index, is_add=1
3379         )
3380
3381         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3382         self.frag_out_of_order(proto=IP_PROTOS.udp)
3383         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3384
3385     def test_port_restricted(self):
3386         """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
3387         self.nat44_add_address(self.nat_addr)
3388         flags = self.config_flags.NAT44_EI_IF_INSIDE
3389         self.vapi.nat44_ei_interface_add_del_feature(
3390             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3391         )
3392         self.vapi.nat44_ei_interface_add_del_feature(
3393             sw_if_index=self.pg1.sw_if_index, is_add=1
3394         )
3395         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3396             alg=1, psid_offset=6, psid_length=6, psid=10
3397         )
3398
3399         p = (
3400             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3401             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3402             / TCP(sport=4567, dport=22)
3403         )
3404         self.pg0.add_stream(p)
3405         self.pg_enable_capture(self.pg_interfaces)
3406         self.pg_start()
3407         capture = self.pg1.get_capture(1)
3408         p = capture[0]
3409         try:
3410             ip = p[IP]
3411             tcp = p[TCP]
3412             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3413             self.assertEqual(ip.src, self.nat_addr)
3414             self.assertEqual(tcp.dport, 22)
3415             self.assertNotEqual(tcp.sport, 4567)
3416             self.assertEqual((tcp.sport >> 6) & 63, 10)
3417             self.assert_packet_checksums_valid(p)
3418         except:
3419             self.logger.error(ppp("Unexpected or invalid packet:", p))
3420             raise
3421
3422     def test_port_range(self):
3423         """NAT44EI External address port range"""
3424         self.nat44_add_address(self.nat_addr)
3425         flags = self.config_flags.NAT44_EI_IF_INSIDE
3426         self.vapi.nat44_ei_interface_add_del_feature(
3427             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3428         )
3429         self.vapi.nat44_ei_interface_add_del_feature(
3430             sw_if_index=self.pg1.sw_if_index, is_add=1
3431         )
3432         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3433             alg=2, start_port=1025, end_port=1027
3434         )
3435
3436         pkts = []
3437         for port in range(0, 5):
3438             p = (
3439                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3440                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3441                 / TCP(sport=1125 + port)
3442             )
3443             pkts.append(p)
3444         self.pg0.add_stream(pkts)
3445         self.pg_enable_capture(self.pg_interfaces)
3446         self.pg_start()
3447         capture = self.pg1.get_capture(3)
3448         for p in capture:
3449             tcp = p[TCP]
3450             self.assertGreaterEqual(tcp.sport, 1025)
3451             self.assertLessEqual(tcp.sport, 1027)
3452
3453     def test_multiple_outside_vrf(self):
3454         """NAT44EI Multiple outside VRF"""
3455         vrf_id1 = 1
3456         vrf_id2 = 2
3457
3458         self.pg1.unconfig_ip4()
3459         self.pg2.unconfig_ip4()
3460         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3461         self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3462         self.pg1.set_table_ip4(vrf_id1)
3463         self.pg2.set_table_ip4(vrf_id2)
3464         self.pg1.config_ip4()
3465         self.pg2.config_ip4()
3466         self.pg1.resolve_arp()
3467         self.pg2.resolve_arp()
3468
3469         self.nat44_add_address(self.nat_addr)
3470         flags = self.config_flags.NAT44_EI_IF_INSIDE
3471         self.vapi.nat44_ei_interface_add_del_feature(
3472             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3473         )
3474         self.vapi.nat44_ei_interface_add_del_feature(
3475             sw_if_index=self.pg1.sw_if_index, is_add=1
3476         )
3477         self.vapi.nat44_ei_interface_add_del_feature(
3478             sw_if_index=self.pg2.sw_if_index, is_add=1
3479         )
3480
3481         try:
3482             # first VRF
3483             pkts = self.create_stream_in(self.pg0, self.pg1)
3484             self.pg0.add_stream(pkts)
3485             self.pg_enable_capture(self.pg_interfaces)
3486             self.pg_start()
3487             capture = self.pg1.get_capture(len(pkts))
3488             self.verify_capture_out(capture, self.nat_addr)
3489
3490             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3491             self.pg1.add_stream(pkts)
3492             self.pg_enable_capture(self.pg_interfaces)
3493             self.pg_start()
3494             capture = self.pg0.get_capture(len(pkts))
3495             self.verify_capture_in(capture, self.pg0)
3496
3497             self.tcp_port_in = 60303
3498             self.udp_port_in = 60304
3499             self.icmp_id_in = 60305
3500
3501             # second VRF
3502             pkts = self.create_stream_in(self.pg0, self.pg2)
3503             self.pg0.add_stream(pkts)
3504             self.pg_enable_capture(self.pg_interfaces)
3505             self.pg_start()
3506             capture = self.pg2.get_capture(len(pkts))
3507             self.verify_capture_out(capture, self.nat_addr)
3508
3509             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3510             self.pg2.add_stream(pkts)
3511             self.pg_enable_capture(self.pg_interfaces)
3512             self.pg_start()
3513             capture = self.pg0.get_capture(len(pkts))
3514             self.verify_capture_in(capture, self.pg0)
3515
3516         finally:
3517             self.nat44_add_address(self.nat_addr, is_add=0)
3518             self.pg1.unconfig_ip4()
3519             self.pg2.unconfig_ip4()
3520             self.pg1.set_table_ip4(0)
3521             self.pg2.set_table_ip4(0)
3522             self.pg1.config_ip4()
3523             self.pg2.config_ip4()
3524             self.pg1.resolve_arp()
3525             self.pg2.resolve_arp()
3526
3527     def test_mss_clamping(self):
3528         """NAT44EI TCP MSS clamping"""
3529         self.nat44_add_address(self.nat_addr)
3530         flags = self.config_flags.NAT44_EI_IF_INSIDE
3531         self.vapi.nat44_ei_interface_add_del_feature(
3532             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3533         )
3534         self.vapi.nat44_ei_interface_add_del_feature(
3535             sw_if_index=self.pg1.sw_if_index, is_add=1
3536         )
3537
3538         p = (
3539             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3540             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3541             / TCP(
3542                 sport=self.tcp_port_in,
3543                 dport=self.tcp_external_port,
3544                 flags="S",
3545                 options=[("MSS", 1400)],
3546             )
3547         )
3548
3549         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3550         self.pg0.add_stream(p)
3551         self.pg_enable_capture(self.pg_interfaces)
3552         self.pg_start()
3553         capture = self.pg1.get_capture(1)
3554         # Negotiated MSS value greater than configured - changed
3555         self.verify_mss_value(capture[0], 1000)
3556
3557         self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3558         self.pg0.add_stream(p)
3559         self.pg_enable_capture(self.pg_interfaces)
3560         self.pg_start()
3561         capture = self.pg1.get_capture(1)
3562         # MSS clamping disabled - negotiated MSS unchanged
3563         self.verify_mss_value(capture[0], 1400)
3564
3565         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3566         self.pg0.add_stream(p)
3567         self.pg_enable_capture(self.pg_interfaces)
3568         self.pg_start()
3569         capture = self.pg1.get_capture(1)
3570         # Negotiated MSS value smaller than configured - unchanged
3571         self.verify_mss_value(capture[0], 1400)
3572
3573     def test_ha_send(self):
3574         """NAT44EI Send HA session synchronization events (active)"""
3575         flags = self.config_flags.NAT44_EI_IF_INSIDE
3576         self.vapi.nat44_ei_interface_add_del_feature(
3577             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3578         )
3579         self.vapi.nat44_ei_interface_add_del_feature(
3580             sw_if_index=self.pg1.sw_if_index, is_add=1
3581         )
3582         self.nat44_add_address(self.nat_addr)
3583
3584         self.vapi.nat44_ei_ha_set_listener(
3585             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3586         )
3587         self.vapi.nat44_ei_ha_set_failover(
3588             ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
3589         )
3590         bind_layers(UDP, HANATStateSync, sport=12345)
3591
3592         # create sessions
3593         pkts = self.create_stream_in(self.pg0, self.pg1)
3594         self.pg0.add_stream(pkts)
3595         self.pg_enable_capture(self.pg_interfaces)
3596         self.pg_start()
3597         capture = self.pg1.get_capture(len(pkts))
3598         self.verify_capture_out(capture)
3599         # active send HA events
3600         self.vapi.nat44_ei_ha_flush()
3601         stats = self.statistics["/nat44-ei/ha/add-event-send"]
3602         self.assertEqual(stats[:, 0].sum(), 3)
3603         capture = self.pg3.get_capture(1)
3604         p = capture[0]
3605         self.assert_packet_checksums_valid(p)
3606         try:
3607             ip = p[IP]
3608             udp = p[UDP]
3609             hanat = p[HANATStateSync]
3610         except IndexError:
3611             self.logger.error(ppp("Invalid packet:", p))
3612             raise
3613         else:
3614             self.assertEqual(ip.src, self.pg3.local_ip4)
3615             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3616             self.assertEqual(udp.sport, 12345)
3617             self.assertEqual(udp.dport, 12346)
3618             self.assertEqual(hanat.version, 1)
3619             # self.assertEqual(hanat.thread_index, 0)
3620             self.assertEqual(hanat.count, 3)
3621             seq = hanat.sequence_number
3622             for event in hanat.events:
3623                 self.assertEqual(event.event_type, 1)
3624                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3625                 self.assertEqual(event.out_addr, self.nat_addr)
3626                 self.assertEqual(event.fib_index, 0)
3627
3628         # ACK received events
3629         ack = (
3630             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3631             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3632             / UDP(sport=12346, dport=12345)
3633             / HANATStateSync(
3634                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3635             )
3636         )
3637         self.pg3.add_stream(ack)
3638         self.pg_start()
3639         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3640         self.assertEqual(stats[:, 0].sum(), 1)
3641
3642         # delete one session
3643         self.pg_enable_capture(self.pg_interfaces)
3644         self.vapi.nat44_ei_del_session(
3645             address=self.pg0.remote_ip4,
3646             port=self.tcp_port_in,
3647             protocol=IP_PROTOS.tcp,
3648             flags=self.config_flags.NAT44_EI_IF_INSIDE,
3649         )
3650         self.vapi.nat44_ei_ha_flush()
3651         stats = self.statistics["/nat44-ei/ha/del-event-send"]
3652         self.assertEqual(stats[:, 0].sum(), 1)
3653         capture = self.pg3.get_capture(1)
3654         p = capture[0]
3655         try:
3656             hanat = p[HANATStateSync]
3657         except IndexError:
3658             self.logger.error(ppp("Invalid packet:", p))
3659             raise
3660         else:
3661             self.assertGreater(hanat.sequence_number, seq)
3662
3663         # do not send ACK, active retry send HA event again
3664         self.pg_enable_capture(self.pg_interfaces)
3665         self.virtual_sleep(12)
3666         stats = self.statistics["/nat44-ei/ha/retry-count"]
3667         self.assertEqual(stats[:, 0].sum(), 3)
3668         stats = self.statistics["/nat44-ei/ha/missed-count"]
3669         self.assertEqual(stats[:, 0].sum(), 1)
3670         capture = self.pg3.get_capture(3)
3671         for packet in capture:
3672             self.assertEqual(packet, p)
3673
3674         # session counters refresh
3675         pkts = self.create_stream_out(self.pg1)
3676         self.pg1.add_stream(pkts)
3677         self.pg_enable_capture(self.pg_interfaces)
3678         self.pg_start()
3679         self.pg0.get_capture(2)
3680         self.vapi.nat44_ei_ha_flush()
3681         stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3682         self.assertEqual(stats[:, 0].sum(), 2)
3683         capture = self.pg3.get_capture(1)
3684         p = capture[0]
3685         self.assert_packet_checksums_valid(p)
3686         try:
3687             ip = p[IP]
3688             udp = p[UDP]
3689             hanat = p[HANATStateSync]
3690         except IndexError:
3691             self.logger.error(ppp("Invalid packet:", p))
3692             raise
3693         else:
3694             self.assertEqual(ip.src, self.pg3.local_ip4)
3695             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3696             self.assertEqual(udp.sport, 12345)
3697             self.assertEqual(udp.dport, 12346)
3698             self.assertEqual(hanat.version, 1)
3699             self.assertEqual(hanat.count, 2)
3700             seq = hanat.sequence_number
3701             for event in hanat.events:
3702                 self.assertEqual(event.event_type, 3)
3703                 self.assertEqual(event.out_addr, self.nat_addr)
3704                 self.assertEqual(event.fib_index, 0)
3705                 self.assertEqual(event.total_pkts, 2)
3706                 self.assertGreater(event.total_bytes, 0)
3707
3708         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3709         ack = (
3710             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3711             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3712             / UDP(sport=12346, dport=12345)
3713             / HANATStateSync(
3714                 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3715             )
3716         )
3717         self.pg3.add_stream(ack)
3718         self.pg_start()
3719         stats = self.statistics["/nat44-ei/ha/ack-recv"]
3720         self.assertEqual(stats[:, 0].sum(), 2)
3721
3722     def test_ha_recv(self):
3723         """NAT44EI Receive HA session synchronization events (passive)"""
3724         self.nat44_add_address(self.nat_addr)
3725         flags = self.config_flags.NAT44_EI_IF_INSIDE
3726         self.vapi.nat44_ei_interface_add_del_feature(
3727             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3728         )
3729         self.vapi.nat44_ei_interface_add_del_feature(
3730             sw_if_index=self.pg1.sw_if_index, is_add=1
3731         )
3732         self.vapi.nat44_ei_ha_set_listener(
3733             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3734         )
3735         bind_layers(UDP, HANATStateSync, sport=12345)
3736
3737         # this is a bit tricky - HA dictates thread index due to how it's
3738         # designed, but once we use HA to create a session, we also want
3739         # to pass a packet through said session. so the session must end
3740         # up on the correct thread from both directions - in2out (based on
3741         # IP address) and out2in (based on outside port)
3742
3743         # first choose a thread index which is correct for IP
3744         thread_index = get_nat44_ei_in2out_worker_index(
3745             self.pg0.remote_ip4, self.vpp_worker_count
3746         )
3747
3748         # now pick a port which is correct for given thread
3749         port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3750         self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3751         self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3752         if self.vpp_worker_count > 0:
3753             self.tcp_port_out += port_per_thread * (thread_index - 1)
3754             self.udp_port_out += port_per_thread * (thread_index - 1)
3755
3756         # send HA session add events to failover/passive
3757         p = (
3758             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3759             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3760             / UDP(sport=12346, dport=12345)
3761             / HANATStateSync(
3762                 sequence_number=1,
3763                 events=[
3764                     Event(
3765                         event_type="add",
3766                         protocol="tcp",
3767                         in_addr=self.pg0.remote_ip4,
3768                         out_addr=self.nat_addr,
3769                         in_port=self.tcp_port_in,
3770                         out_port=self.tcp_port_out,
3771                         eh_addr=self.pg1.remote_ip4,
3772                         ehn_addr=self.pg1.remote_ip4,
3773                         eh_port=self.tcp_external_port,
3774                         ehn_port=self.tcp_external_port,
3775                         fib_index=0,
3776                     ),
3777                     Event(
3778                         event_type="add",
3779                         protocol="udp",
3780                         in_addr=self.pg0.remote_ip4,
3781                         out_addr=self.nat_addr,
3782                         in_port=self.udp_port_in,
3783                         out_port=self.udp_port_out,
3784                         eh_addr=self.pg1.remote_ip4,
3785                         ehn_addr=self.pg1.remote_ip4,
3786                         eh_port=self.udp_external_port,
3787                         ehn_port=self.udp_external_port,
3788                         fib_index=0,
3789                     ),
3790                 ],
3791                 thread_index=thread_index,
3792             )
3793         )
3794
3795         self.pg3.add_stream(p)
3796         self.pg_enable_capture(self.pg_interfaces)
3797         self.pg_start()
3798         # receive ACK
3799         capture = self.pg3.get_capture(1)
3800         p = capture[0]
3801         try:
3802             hanat = p[HANATStateSync]
3803         except IndexError:
3804             self.logger.error(ppp("Invalid packet:", p))
3805             raise
3806         else:
3807             self.assertEqual(hanat.sequence_number, 1)
3808             self.assertEqual(hanat.flags, "ACK")
3809             self.assertEqual(hanat.version, 1)
3810             self.assertEqual(hanat.thread_index, thread_index)
3811         stats = self.statistics["/nat44-ei/ha/ack-send"]
3812         self.assertEqual(stats[:, 0].sum(), 1)
3813         stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3814         self.assertEqual(stats[:, 0].sum(), 2)
3815         users = self.statistics["/nat44-ei/total-users"]
3816         self.assertEqual(users[:, 0].sum(), 1)
3817         sessions = self.statistics["/nat44-ei/total-sessions"]
3818         self.assertEqual(sessions[:, 0].sum(), 2)
3819         users = self.vapi.nat44_ei_user_dump()
3820         self.assertEqual(len(users), 1)
3821         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3822         # there should be 2 sessions created by HA
3823         sessions = self.vapi.nat44_ei_user_session_dump(
3824             users[0].ip_address, users[0].vrf_id
3825         )
3826         self.assertEqual(len(sessions), 2)
3827         for session in sessions:
3828             self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3829             self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3830             self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3831             self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3832             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3833
3834         # send HA session delete event to failover/passive
3835         p = (
3836             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3837             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3838             / UDP(sport=12346, dport=12345)
3839             / HANATStateSync(
3840                 sequence_number=2,
3841                 events=[
3842                     Event(
3843                         event_type="del",
3844                         protocol="udp",
3845                         in_addr=self.pg0.remote_ip4,
3846                         out_addr=self.nat_addr,
3847                         in_port=self.udp_port_in,
3848                         out_port=self.udp_port_out,
3849                         eh_addr=self.pg1.remote_ip4,
3850                         ehn_addr=self.pg1.remote_ip4,
3851                         eh_port=self.udp_external_port,
3852                         ehn_port=self.udp_external_port,
3853                         fib_index=0,
3854                     )
3855                 ],
3856                 thread_index=thread_index,
3857             )
3858         )
3859
3860         self.pg3.add_stream(p)
3861         self.pg_enable_capture(self.pg_interfaces)
3862         self.pg_start()
3863         # receive ACK
3864         capture = self.pg3.get_capture(1)
3865         p = capture[0]
3866         try:
3867             hanat = p[HANATStateSync]
3868         except IndexError:
3869             self.logger.error(ppp("Invalid packet:", p))
3870             raise
3871         else:
3872             self.assertEqual(hanat.sequence_number, 2)
3873             self.assertEqual(hanat.flags, "ACK")
3874             self.assertEqual(hanat.version, 1)
3875         users = self.vapi.nat44_ei_user_dump()
3876         self.assertEqual(len(users), 1)
3877         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3878         # now we should have only 1 session, 1 deleted by HA
3879         sessions = self.vapi.nat44_ei_user_session_dump(
3880             users[0].ip_address, users[0].vrf_id
3881         )
3882         self.assertEqual(len(sessions), 1)
3883         stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3884         self.assertEqual(stats[:, 0].sum(), 1)
3885
3886         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3887         self.assertEqual(stats, 2)
3888
3889         # send HA session refresh event to failover/passive
3890         p = (
3891             Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3892             / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3893             / UDP(sport=12346, dport=12345)
3894             / HANATStateSync(
3895                 sequence_number=3,
3896                 events=[
3897                     Event(
3898                         event_type="refresh",
3899                         protocol="tcp",
3900                         in_addr=self.pg0.remote_ip4,
3901                         out_addr=self.nat_addr,
3902                         in_port=self.tcp_port_in,
3903                         out_port=self.tcp_port_out,
3904                         eh_addr=self.pg1.remote_ip4,
3905                         ehn_addr=self.pg1.remote_ip4,
3906                         eh_port=self.tcp_external_port,
3907                         ehn_port=self.tcp_external_port,
3908                         fib_index=0,
3909                         total_bytes=1024,
3910                         total_pkts=2,
3911                     )
3912                 ],
3913                 thread_index=thread_index,
3914             )
3915         )
3916         self.pg3.add_stream(p)
3917         self.pg_enable_capture(self.pg_interfaces)
3918         self.pg_start()
3919         # receive ACK
3920         capture = self.pg3.get_capture(1)
3921         p = capture[0]
3922         try:
3923             hanat = p[HANATStateSync]
3924         except IndexError:
3925             self.logger.error(ppp("Invalid packet:", p))
3926             raise
3927         else:
3928             self.assertEqual(hanat.sequence_number, 3)
3929             self.assertEqual(hanat.flags, "ACK")
3930             self.assertEqual(hanat.version, 1)
3931         users = self.vapi.nat44_ei_user_dump()
3932         self.assertEqual(len(users), 1)
3933         self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3934         sessions = self.vapi.nat44_ei_user_session_dump(
3935             users[0].ip_address, users[0].vrf_id
3936         )
3937         self.assertEqual(len(sessions), 1)
3938         session = sessions[0]
3939         self.assertEqual(session.total_bytes, 1024)
3940         self.assertEqual(session.total_pkts, 2)
3941         stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3942         self.assertEqual(stats[:, 0].sum(), 1)
3943
3944         stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3945         self.assertEqual(stats, 3)
3946
3947         # send packet to test session created by HA
3948         p = (
3949             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3950             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3951             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3952         )
3953         self.pg1.add_stream(p)
3954         self.pg_enable_capture(self.pg_interfaces)
3955         self.pg_start()
3956         capture = self.pg0.get_capture(1)
3957         p = capture[0]
3958         try:
3959             ip = p[IP]
3960             tcp = p[TCP]
3961         except IndexError:
3962             self.logger.error(ppp("Invalid packet:", p))
3963             raise
3964         else:
3965             self.assertEqual(ip.src, self.pg1.remote_ip4)
3966             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3967             self.assertEqual(tcp.sport, self.tcp_external_port)
3968             self.assertEqual(tcp.dport, self.tcp_port_in)
3969
3970     def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3971         self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3972         self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3973         # keep plugin configuration persistent
3974         self.plugin_enable()
3975         return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3976
3977     def test_set_frame_queue_nelts(self):
3978         """NAT44EI API test - worker handoff frame queue elements"""
3979         self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3980
3981     def show_commands_at_teardown(self):
3982         self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3983         self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3984         self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3985         self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3986         self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3987         self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3988         self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3989         self.logger.info(self.vapi.cli("show nat44 ei ha"))
3990         self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3991
3992     def test_outside_address_distribution(self):
3993         """Outside address distribution based on source address"""
3994
3995         x = 100
3996         nat_addresses = []
3997
3998         for i in range(1, x):
3999             a = "10.0.0.%d" % i
4000             nat_addresses.append(a)
4001
4002         flags = self.config_flags.NAT44_EI_IF_INSIDE
4003         self.vapi.nat44_ei_interface_add_del_feature(
4004             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4005         )
4006         self.vapi.nat44_ei_interface_add_del_feature(
4007             sw_if_index=self.pg1.sw_if_index, is_add=1
4008         )
4009
4010         self.vapi.nat44_ei_add_del_address_range(
4011             first_ip_address=nat_addresses[0],
4012             last_ip_address=nat_addresses[-1],
4013             vrf_id=0xFFFFFFFF,
4014             is_add=1,
4015         )
4016
4017         self.pg0.generate_remote_hosts(x)
4018
4019         pkts = []
4020         for i in range(x):
4021             info = self.create_packet_info(self.pg0, self.pg1)
4022             payload = self.info_to_payload(info)
4023             p = (
4024                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4025                 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4026                 / UDP(sport=7000 + i, dport=8000 + i)
4027                 / Raw(payload)
4028             )
4029             info.data = p
4030             pkts.append(p)
4031
4032         self.pg0.add_stream(pkts)
4033         self.pg_enable_capture(self.pg_interfaces)
4034         self.pg_start()
4035         recvd = self.pg1.get_capture(len(pkts))
4036         for p_recvd in recvd:
4037             payload_info = self.payload_to_info(p_recvd[Raw])
4038             packet_index = payload_info.index
4039             info = self._packet_infos[packet_index]
4040             self.assertTrue(info is not None)
4041             self.assertEqual(packet_index, info.index)
4042             p_sent = info.data
4043             packed = socket.inet_aton(p_sent[IP].src)
4044             numeric = struct.unpack("!L", packed)[0]
4045             numeric = socket.htonl(numeric)
4046             a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4047             self.assertEqual(
4048                 a,
4049                 p_recvd[IP].src,
4050                 "Invalid packet (src IP %s translated to %s, but expected %s)"
4051                 % (p_sent[IP].src, p_recvd[IP].src, a),
4052             )
4053
4054     def test_default_user_sessions(self):
4055         """NAT44EI default per-user session limit is used and reported"""
4056         nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4057         # a nonzero default should be reported for user_sessions
4058         self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4059
4060
4061 class TestNAT44Out2InDPO(MethodHolder):
4062     """NAT44EI Test Cases using out2in DPO"""
4063
4064     @classmethod
4065     def setUpClass(cls):
4066         super(TestNAT44Out2InDPO, cls).setUpClass()
4067         cls.vapi.cli("set log class nat44-ei level debug")
4068
4069         cls.tcp_port_in = 6303
4070         cls.tcp_port_out = 6303
4071         cls.udp_port_in = 6304
4072         cls.udp_port_out = 6304
4073         cls.icmp_id_in = 6305
4074         cls.icmp_id_out = 6305
4075         cls.nat_addr = "10.0.0.3"
4076         cls.dst_ip4 = "192.168.70.1"
4077
4078         cls.create_pg_interfaces(range(2))
4079
4080         cls.pg0.admin_up()
4081         cls.pg0.config_ip4()
4082         cls.pg0.resolve_arp()
4083
4084         cls.pg1.admin_up()
4085         cls.pg1.config_ip6()
4086         cls.pg1.resolve_ndp()
4087
4088         r1 = VppIpRoute(
4089             cls,
4090             "::",
4091             0,
4092             [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
4093             register=False,
4094         )
4095         r1.add_vpp_config()
4096
4097     def setUp(self):
4098         super(TestNAT44Out2InDPO, self).setUp()
4099         flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4100         self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
4101
4102     def tearDown(self):
4103         super(TestNAT44Out2InDPO, self).tearDown()
4104         if not self.vpp_dead:
4105             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4106             self.vapi.cli("clear logging")
4107
4108     def configure_xlat(self):
4109         self.dst_ip6_pfx = "1:2:3::"
4110         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4111         self.dst_ip6_pfx_len = 96
4112         self.src_ip6_pfx = "4:5:6::"
4113         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4114         self.src_ip6_pfx_len = 96
4115         self.vapi.map_add_domain(
4116             self.dst_ip6_pfx_n,
4117             self.dst_ip6_pfx_len,
4118             self.src_ip6_pfx_n,
4119             self.src_ip6_pfx_len,
4120             "\x00\x00\x00\x00",
4121             0,
4122         )
4123
4124     @unittest.skip("Temporary disabled")
4125     def test_464xlat_ce(self):
4126         """Test 464XLAT CE with NAT44EI"""
4127
4128         self.configure_xlat()
4129
4130         flags = self.config_flags.NAT44_EI_IF_INSIDE
4131         self.vapi.nat44_ei_interface_add_del_feature(
4132             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4133         )
4134         self.vapi.nat44_ei_add_del_address_range(
4135             first_ip_address=self.nat_addr_n,
4136             last_ip_address=self.nat_addr_n,
4137             vrf_id=0xFFFFFFFF,
4138             is_add=1,
4139         )
4140
4141         out_src_ip6 = self.compose_ip6(
4142             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4143         )
4144         out_dst_ip6 = self.compose_ip6(
4145             self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
4146         )
4147
4148         try:
4149             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4150             self.pg0.add_stream(pkts)
4151             self.pg_enable_capture(self.pg_interfaces)
4152             self.pg_start()
4153             capture = self.pg1.get_capture(len(pkts))
4154             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4155
4156             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4157             self.pg1.add_stream(pkts)
4158             self.pg_enable_capture(self.pg_interfaces)
4159             self.pg_start()
4160             capture = self.pg0.get_capture(len(pkts))
4161             self.verify_capture_in(capture, self.pg0)
4162         finally:
4163             self.vapi.nat44_ei_interface_add_del_feature(
4164                 sw_if_index=self.pg0.sw_if_index, flags=flags
4165             )
4166             self.vapi.nat44_ei_add_del_address_range(
4167                 first_ip_address=self.nat_addr_n,
4168                 last_ip_address=self.nat_addr_n,
4169                 vrf_id=0xFFFFFFFF,
4170             )
4171
4172     @unittest.skip("Temporary disabled")
4173     def test_464xlat_ce_no_nat(self):
4174         """Test 464XLAT CE without NAT44EI"""
4175
4176         self.configure_xlat()
4177
4178         out_src_ip6 = self.compose_ip6(
4179             self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4180         )
4181         out_dst_ip6 = self.compose_ip6(
4182             self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4183         )
4184
4185         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4186         self.pg0.add_stream(pkts)
4187         self.pg_enable_capture(self.pg_interfaces)
4188         self.pg_start()
4189         capture = self.pg1.get_capture(len(pkts))
4190         self.verify_capture_out_ip6(
4191             capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4192         )
4193
4194         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4195         self.pg1.add_stream(pkts)
4196         self.pg_enable_capture(self.pg_interfaces)
4197         self.pg_start()
4198         capture = self.pg0.get_capture(len(pkts))
4199         self.verify_capture_in(capture, self.pg0)
4200
4201
4202 class TestNAT44EIMW(MethodHolder):
4203     """NAT44EI Test Cases (multiple workers)"""
4204
4205     vpp_worker_count = 2
4206     max_translations = 10240
4207     max_users = 10240
4208
4209     @classmethod
4210     def setUpClass(cls):
4211         super(TestNAT44EIMW, cls).setUpClass()
4212         cls.vapi.cli("set log class nat level debug")
4213
4214         cls.tcp_port_in = 6303
4215         cls.tcp_port_out = 6303
4216         cls.udp_port_in = 6304
4217         cls.udp_port_out = 6304
4218         cls.icmp_id_in = 6305
4219         cls.icmp_id_out = 6305
4220         cls.nat_addr = "10.0.0.3"
4221         cls.ipfix_src_port = 4739
4222         cls.ipfix_domain_id = 1
4223         cls.tcp_external_port = 80
4224         cls.udp_external_port = 69
4225
4226         cls.create_pg_interfaces(range(10))
4227         cls.interfaces = list(cls.pg_interfaces[0:4])
4228
4229         for i in cls.interfaces:
4230             i.admin_up()
4231             i.config_ip4()
4232             i.resolve_arp()
4233
4234         cls.pg0.generate_remote_hosts(3)
4235         cls.pg0.configure_ipv4_neighbors()
4236
4237         cls.pg1.generate_remote_hosts(1)
4238         cls.pg1.configure_ipv4_neighbors()
4239
4240         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4241         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4242         cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
4243
4244         cls.pg4._local_ip4 = "172.16.255.1"
4245         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4246         cls.pg4.set_table_ip4(10)
4247         cls.pg5._local_ip4 = "172.17.255.3"
4248         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4249         cls.pg5.set_table_ip4(10)
4250         cls.pg6._local_ip4 = "172.16.255.1"
4251         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4252         cls.pg6.set_table_ip4(20)
4253         for i in cls.overlapping_interfaces:
4254             i.config_ip4()
4255             i.admin_up()
4256             i.resolve_arp()
4257
4258         cls.pg7.admin_up()
4259         cls.pg8.admin_up()
4260
4261         cls.pg9.generate_remote_hosts(2)
4262         cls.pg9.config_ip4()
4263         cls.vapi.sw_interface_add_del_address(
4264             sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4265         )
4266
4267         cls.pg9.admin_up()
4268         cls.pg9.resolve_arp()
4269         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4270         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4271         cls.pg9.resolve_arp()
4272
4273     def setUp(self):
4274         super(TestNAT44EIMW, self).setUp()
4275         self.vapi.nat44_ei_plugin_enable_disable(
4276             sessions=self.max_translations, users=self.max_users, enable=1
4277         )
4278
4279     def tearDown(self):
4280         super(TestNAT44EIMW, self).tearDown()
4281         if not self.vpp_dead:
4282             self.vapi.nat44_ei_ipfix_enable_disable(
4283                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
4284             )
4285             self.ipfix_src_port = 4739
4286             self.ipfix_domain_id = 1
4287
4288             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4289             self.vapi.cli("clear logging")
4290
4291     def test_hairpinning(self):
4292         """NAT44EI hairpinning - 1:1 NAPT"""
4293
4294         host = self.pg0.remote_hosts[0]
4295         server = self.pg0.remote_hosts[1]
4296         host_in_port = 1234
4297         host_out_port = 0
4298         server_in_port = 5678
4299         server_out_port = 8765
4300         worker_1 = 1
4301         worker_2 = 2
4302
4303         self.nat44_add_address(self.nat_addr)
4304         flags = self.config_flags.NAT44_EI_IF_INSIDE
4305         self.vapi.nat44_ei_interface_add_del_feature(
4306             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4307         )
4308         self.vapi.nat44_ei_interface_add_del_feature(
4309             sw_if_index=self.pg1.sw_if_index, is_add=1
4310         )
4311
4312         # add static mapping for server
4313         self.nat44_add_static_mapping(
4314             server.ip4,
4315             self.nat_addr,
4316             server_in_port,
4317             server_out_port,
4318             proto=IP_PROTOS.tcp,
4319         )
4320
4321         cnt = self.statistics["/nat44-ei/hairpinning"]
4322         # send packet from host to server
4323         p = (
4324             Ether(src=host.mac, dst=self.pg0.local_mac)
4325             / IP(src=host.ip4, dst=self.nat_addr)
4326             / TCP(sport=host_in_port, dport=server_out_port)
4327         )
4328         self.pg0.add_stream(p)
4329         self.pg_enable_capture(self.pg_interfaces)
4330         self.pg_start()
4331         capture = self.pg0.get_capture(1)
4332         p = capture[0]
4333         try:
4334             ip = p[IP]
4335             tcp = p[TCP]
4336             self.assertEqual(ip.src, self.nat_addr)
4337             self.assertEqual(ip.dst, server.ip4)
4338             self.assertNotEqual(tcp.sport, host_in_port)
4339             self.assertEqual(tcp.dport, server_in_port)
4340             self.assert_packet_checksums_valid(p)
4341             host_out_port = tcp.sport
4342         except:
4343             self.logger.error(ppp("Unexpected or invalid packet:", p))
4344             raise
4345
4346         after = self.statistics["/nat44-ei/hairpinning"]
4347
4348         if_idx = self.pg0.sw_if_index
4349         self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4350
4351         # send reply from server to host
4352         p = (
4353             Ether(src=server.mac, dst=self.pg0.local_mac)
4354             / IP(src=server.ip4, dst=self.nat_addr)
4355             / TCP(sport=server_in_port, dport=host_out_port)
4356         )
4357         self.pg0.add_stream(p)
4358         self.pg_enable_capture(self.pg_interfaces)
4359         self.pg_start()
4360         capture = self.pg0.get_capture(1)
4361         p = capture[0]
4362         try:
4363             ip = p[IP]
4364             tcp = p[TCP]
4365             self.assertEqual(ip.src, self.nat_addr)
4366             self.assertEqual(ip.dst, host.ip4)
4367             self.assertEqual(tcp.sport, server_out_port)
4368             self.assertEqual(tcp.dport, host_in_port)
4369             self.assert_packet_checksums_valid(p)
4370         except:
4371             self.logger.error(ppp("Unexpected or invalid packet:", p))
4372             raise
4373
4374         after = self.statistics["/nat44-ei/hairpinning"]
4375         if_idx = self.pg0.sw_if_index
4376         self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4377         self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4378
4379     def test_hairpinning2(self):
4380         """NAT44EI hairpinning - 1:1 NAT"""
4381
4382         server1_nat_ip = "10.0.0.10"
4383         server2_nat_ip = "10.0.0.11"
4384         host = self.pg0.remote_hosts[0]
4385         server1 = self.pg0.remote_hosts[1]
4386         server2 = self.pg0.remote_hosts[2]
4387         server_tcp_port = 22
4388         server_udp_port = 20
4389
4390         self.nat44_add_address(self.nat_addr)
4391         flags = self.config_flags.NAT44_EI_IF_INSIDE
4392         self.vapi.nat44_ei_interface_add_del_feature(
4393             sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4394         )
4395         self.vapi.nat44_ei_interface_add_del_feature(
4396             sw_if_index=self.pg1.sw_if_index, is_add=1
4397         )
4398
4399         # add static mapping for servers
4400         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4401         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4402
4403         # host to server1
4404         pkts = []
4405         p = (
4406             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4407             / IP(src=host.ip4, dst=server1_nat_ip)
4408             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4409         )
4410         pkts.append(p)
4411         p = (
4412             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4413             / IP(src=host.ip4, dst=server1_nat_ip)
4414             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4415         )
4416         pkts.append(p)
4417         p = (
4418             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4419             / IP(src=host.ip4, dst=server1_nat_ip)
4420             / ICMP(id=self.icmp_id_in, type="echo-request")
4421         )
4422         pkts.append(p)
4423         self.pg0.add_stream(pkts)
4424         self.pg_enable_capture(self.pg_interfaces)
4425         self.pg_start()
4426         capture = self.pg0.get_capture(len(pkts))
4427         for packet in capture:
4428             try:
4429                 self.assertEqual(packet[IP].src, self.nat_addr)
4430                 self.assertEqual(packet[IP].dst, server1.ip4)
4431                 if packet.haslayer(TCP):
4432                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4433                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4434                     self.tcp_port_out = packet[TCP].sport
4435                     self.assert_packet_checksums_valid(packet)
4436                 elif packet.haslayer(UDP):
4437                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4438                     self.assertEqual(packet[UDP].dport, server_udp_port)
4439                     self.udp_port_out = packet[UDP].sport
4440                 else:
4441                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4442                     self.icmp_id_out = packet[ICMP].id
4443             except:
4444                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4445                 raise
4446
4447         # server1 to host
4448         pkts = []
4449         p = (
4450             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4451             / IP(src=server1.ip4, dst=self.nat_addr)
4452             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4453         )
4454         pkts.append(p)
4455         p = (
4456             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4457             / IP(src=server1.ip4, dst=self.nat_addr)
4458             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4459         )
4460         pkts.append(p)
4461         p = (
4462             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4463             / IP(src=server1.ip4, dst=self.nat_addr)
4464             / ICMP(id=self.icmp_id_out, type="echo-reply")
4465         )
4466         pkts.append(p)
4467         self.pg0.add_stream(pkts)
4468         self.pg_enable_capture(self.pg_interfaces)
4469         self.pg_start()
4470         capture = self.pg0.get_capture(len(pkts))
4471         for packet in capture:
4472             try:
4473                 self.assertEqual(packet[IP].src, server1_nat_ip)
4474                 self.assertEqual(packet[IP].dst, host.ip4)
4475                 if packet.haslayer(TCP):
4476                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4477                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4478                     self.assert_packet_checksums_valid(packet)
4479                 elif packet.haslayer(UDP):
4480                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4481                     self.assertEqual(packet[UDP].sport, server_udp_port)
4482                 else:
4483                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4484             except:
4485                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4486                 raise
4487
4488         # server2 to server1
4489         pkts = []
4490         p = (
4491             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4492             / IP(src=server2.ip4, dst=server1_nat_ip)
4493             / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4494         )
4495         pkts.append(p)
4496         p = (
4497             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4498             / IP(src=server2.ip4, dst=server1_nat_ip)
4499             / UDP(sport=self.udp_port_in, dport=server_udp_port)
4500         )
4501         pkts.append(p)
4502         p = (
4503             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4504             / IP(src=server2.ip4, dst=server1_nat_ip)
4505             / ICMP(id=self.icmp_id_in, type="echo-request")
4506         )
4507         pkts.append(p)
4508         self.pg0.add_stream(pkts)
4509         self.pg_enable_capture(self.pg_interfaces)
4510         self.pg_start()
4511         capture = self.pg0.get_capture(len(pkts))
4512         for packet in capture:
4513             try:
4514                 self.assertEqual(packet[IP].src, server2_nat_ip)
4515                 self.assertEqual(packet[IP].dst, server1.ip4)
4516                 if packet.haslayer(TCP):
4517                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4518                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4519                     self.tcp_port_out = packet[TCP].sport
4520                     self.assert_packet_checksums_valid(packet)
4521                 elif packet.haslayer(UDP):
4522                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
4523                     self.assertEqual(packet[UDP].dport, server_udp_port)
4524                     self.udp_port_out = packet[UDP].sport
4525                 else:
4526                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4527                     self.icmp_id_out = packet[ICMP].id
4528             except:
4529                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4530                 raise
4531
4532         # server1 to server2
4533         pkts = []
4534         p = (
4535             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4536             / IP(src=server1.ip4, dst=server2_nat_ip)
4537             / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4538         )
4539         pkts.append(p)
4540         p = (
4541             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4542             / IP(src=server1.ip4, dst=server2_nat_ip)
4543             / UDP(sport=server_udp_port, dport=self.udp_port_out)
4544         )
4545         pkts.append(p)
4546         p = (
4547             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4548             / IP(src=server1.ip4, dst=server2_nat_ip)
4549             / ICMP(id=self.icmp_id_out, type="echo-reply")
4550         )
4551         pkts.append(p)
4552         self.pg0.add_stream(pkts)
4553         self.pg_enable_capture(self.pg_interfaces)
4554         self.pg_start()
4555         capture = self.pg0.get_capture(len(pkts))
4556         for packet in capture:
4557             try:
4558                 self.assertEqual(packet[IP].src, server1_nat_ip)
4559                 self.assertEqual(packet[IP].dst, server2.ip4)
4560                 if packet.haslayer(TCP):
4561                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4562                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4563                     self.assert_packet_checksums_valid(packet)
4564                 elif packet.haslayer(UDP):
4565                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4566                     self.assertEqual(packet[UDP].sport, server_udp_port)
4567                 else:
4568                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4569             except:
4570                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4571                 raise
4572
4573
4574 if __name__ == "__main__":
4575     unittest.main(testRunner=VppTestRunner)