nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from config import config
12 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, is_distro_ubuntu2204
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import (
20     IPv6,
21     ICMPv6EchoRequest,
22     ICMPv6EchoReply,
23     ICMPv6ND_NS,
24     ICMPv6ND_NA,
25     ICMPv6NDOptDstLLAddr,
26     fragment6,
27 )
28 from scapy.layers.l2 import Ether, GRE
29 from scapy.packet import Raw
30 from syslog_rfc5424_parser import SyslogMessage, ParseError
31 from syslog_rfc5424_parser.constants import SyslogSeverity
32 from util import ppc, ppp
33 from vpp_papi import VppEnum
34
35
36 @tag_fixme_vpp_workers
37 @tag_fixme_ubuntu2204
38 class TestNAT64(VppTestCase):
39     """NAT64 Test Cases"""
40
41     @property
42     def SYSLOG_SEVERITY(self):
43         return VppEnum.vl_api_syslog_severity_t
44
45     @property
46     def config_flags(self):
47         return VppEnum.vl_api_nat_config_flags_t
48
49     @classmethod
50     def setUpClass(cls):
51         super(TestNAT64, cls).setUpClass()
52
53         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
54             return
55         cls.tcp_port_in = 6303
56         cls.tcp_port_out = 6303
57         cls.udp_port_in = 6304
58         cls.udp_port_out = 6304
59         cls.icmp_id_in = 6305
60         cls.icmp_id_out = 6305
61         cls.tcp_external_port = 80
62         cls.nat_addr = "10.0.0.3"
63         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
64         cls.vrf1_id = 10
65         cls.vrf1_nat_addr = "10.0.10.3"
66         cls.ipfix_src_port = 4739
67         cls.ipfix_domain_id = 1
68
69         cls.create_pg_interfaces(range(6))
70         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
71         cls.ip6_interfaces.append(cls.pg_interfaces[2])
72         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
73
74         cls.vapi.ip_table_add_del(
75             is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
76         )
77
78         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
79
80         cls.pg0.generate_remote_hosts(2)
81
82         for i in cls.ip6_interfaces:
83             i.admin_up()
84             i.config_ip6()
85             i.configure_ipv6_neighbors()
86
87         for i in cls.ip4_interfaces:
88             i.admin_up()
89             i.config_ip4()
90             i.resolve_arp()
91
92         cls.pg3.admin_up()
93         cls.pg3.config_ip4()
94         cls.pg3.resolve_arp()
95         cls.pg3.config_ip6()
96         cls.pg3.configure_ipv6_neighbors()
97
98         cls.pg5.admin_up()
99         cls.pg5.config_ip6()
100
101     @classmethod
102     def tearDownClass(cls):
103         super(TestNAT64, cls).tearDownClass()
104
105     def setUp(self):
106         super(TestNAT64, self).setUp()
107         self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
108
109     def tearDown(self):
110         super(TestNAT64, self).tearDown()
111         if not self.vpp_dead:
112             self.vapi.nat64_plugin_enable_disable(enable=0)
113
114     def show_commands_at_teardown(self):
115         self.logger.info(self.vapi.cli("show nat64 pool"))
116         self.logger.info(self.vapi.cli("show nat64 interfaces"))
117         self.logger.info(self.vapi.cli("show nat64 prefix"))
118         self.logger.info(self.vapi.cli("show nat64 bib all"))
119         self.logger.info(self.vapi.cli("show nat64 session table all"))
120
121     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
122         """
123         Create IPv6 packet stream for inside network
124
125         :param in_if: Inside interface
126         :param out_if: Outside interface
127         :param ttl: Hop Limit of generated packets
128         :param pref: NAT64 prefix
129         :param plen: NAT64 prefix length
130         """
131         pkts = []
132         if pref is None:
133             dst = "".join(["64:ff9b::", out_if.remote_ip4])
134         else:
135             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
136
137         # TCP
138         p = (
139             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
140             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
141             / TCP(sport=self.tcp_port_in, dport=20)
142         )
143         pkts.append(p)
144
145         # UDP
146         p = (
147             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
148             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
149             / UDP(sport=self.udp_port_in, dport=20)
150         )
151         pkts.append(p)
152
153         # ICMP
154         p = (
155             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
156             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
157             / ICMPv6EchoRequest(id=self.icmp_id_in)
158         )
159         pkts.append(p)
160
161         return pkts
162
163     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
164         """
165         Create packet stream for outside network
166
167         :param out_if: Outside interface
168         :param dst_ip: Destination IP address (Default use global NAT address)
169         :param ttl: TTL of generated packets
170         :param use_inside_ports: Use inside NAT ports as destination ports
171                instead of outside ports
172         """
173         if dst_ip is None:
174             dst_ip = self.nat_addr
175         if not use_inside_ports:
176             tcp_port = self.tcp_port_out
177             udp_port = self.udp_port_out
178             icmp_id = self.icmp_id_out
179         else:
180             tcp_port = self.tcp_port_in
181             udp_port = self.udp_port_in
182             icmp_id = self.icmp_id_in
183         pkts = []
184         # TCP
185         p = (
186             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
187             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
188             / TCP(dport=tcp_port, sport=20)
189         )
190         pkts.extend([p, p])
191
192         # UDP
193         p = (
194             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
195             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
196             / UDP(dport=udp_port, sport=20)
197         )
198         pkts.append(p)
199
200         # ICMP
201         p = (
202             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
203             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
204             / ICMP(id=icmp_id, type="echo-reply")
205         )
206         pkts.append(p)
207
208         return pkts
209
210     def verify_capture_out(
211         self,
212         capture,
213         nat_ip=None,
214         same_port=False,
215         dst_ip=None,
216         is_ip6=False,
217         ignore_port=False,
218     ):
219         """
220         Verify captured packets on outside network
221
222         :param capture: Captured packets
223         :param nat_ip: Translated IP address (Default use global NAT address)
224         :param same_port: Source port number is not translated (Default False)
225         :param dst_ip: Destination IP address (Default do not verify)
226         :param is_ip6: If L3 protocol is IPv6 (Default False)
227         """
228         if is_ip6:
229             IP46 = IPv6
230             ICMP46 = ICMPv6EchoRequest
231         else:
232             IP46 = IP
233             ICMP46 = ICMP
234         if nat_ip is None:
235             nat_ip = self.nat_addr
236         for packet in capture:
237             try:
238                 if not is_ip6:
239                     self.assert_packet_checksums_valid(packet)
240                 self.assertEqual(packet[IP46].src, nat_ip)
241                 if dst_ip is not None:
242                     self.assertEqual(packet[IP46].dst, dst_ip)
243                 if packet.haslayer(TCP):
244                     if not ignore_port:
245                         if same_port:
246                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
247                         else:
248                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
249                     self.tcp_port_out = packet[TCP].sport
250                     self.assert_packet_checksums_valid(packet)
251                 elif packet.haslayer(UDP):
252                     if not ignore_port:
253                         if same_port:
254                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
255                         else:
256                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
257                     self.udp_port_out = packet[UDP].sport
258                 else:
259                     if not ignore_port:
260                         if same_port:
261                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
262                         else:
263                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
264                     self.icmp_id_out = packet[ICMP46].id
265                     self.assert_packet_checksums_valid(packet)
266             except:
267                 self.logger.error(
268                     ppp("Unexpected or invalid packet (outside network):", packet)
269                 )
270                 raise
271
272     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
273         """
274         Verify captured IPv6 packets on inside network
275
276         :param capture: Captured packets
277         :param src_ip: Source IP
278         :param dst_ip: Destination IP address
279         """
280         for packet in capture:
281             try:
282                 self.assertEqual(packet[IPv6].src, src_ip)
283                 self.assertEqual(packet[IPv6].dst, dst_ip)
284                 self.assert_packet_checksums_valid(packet)
285                 if packet.haslayer(TCP):
286                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
287                 elif packet.haslayer(UDP):
288                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
289                 else:
290                     self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
291             except:
292                 self.logger.error(
293                     ppp("Unexpected or invalid packet (inside network):", packet)
294                 )
295                 raise
296
297     def create_stream_frag(
298         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
299     ):
300         """
301         Create fragmented packet stream
302
303         :param src_if: Source interface
304         :param dst: Destination IPv4 address
305         :param sport: Source port
306         :param dport: Destination port
307         :param data: Payload data
308         :param proto: protocol (TCP, UDP, ICMP)
309         :param echo_reply: use echo_reply if protocol is ICMP
310         :returns: Fragments
311         """
312         if proto == IP_PROTOS.tcp:
313             p = (
314                 IP(src=src_if.remote_ip4, dst=dst)
315                 / TCP(sport=sport, dport=dport)
316                 / Raw(data)
317             )
318             p = p.__class__(scapy.compat.raw(p))
319             chksum = p[TCP].chksum
320             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
321         elif proto == IP_PROTOS.udp:
322             proto_header = UDP(sport=sport, dport=dport)
323         elif proto == IP_PROTOS.icmp:
324             if not echo_reply:
325                 proto_header = ICMP(id=sport, type="echo-request")
326             else:
327                 proto_header = ICMP(id=sport, type="echo-reply")
328         else:
329             raise Exception("Unsupported protocol")
330         id = random.randint(0, 65535)
331         pkts = []
332         if proto == IP_PROTOS.tcp:
333             raw = Raw(data[0:4])
334         else:
335             raw = Raw(data[0:16])
336         p = (
337             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
338             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
339             / proto_header
340             / raw
341         )
342         pkts.append(p)
343         if proto == IP_PROTOS.tcp:
344             raw = Raw(data[4:20])
345         else:
346             raw = Raw(data[16:32])
347         p = (
348             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
349             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
350             / raw
351         )
352         pkts.append(p)
353         if proto == IP_PROTOS.tcp:
354             raw = Raw(data[20:])
355         else:
356             raw = Raw(data[32:])
357         p = (
358             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
359             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
360             / raw
361         )
362         pkts.append(p)
363         return pkts
364
365     def create_stream_frag_ip6(
366         self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
367     ):
368         """
369         Create fragmented packet stream
370
371         :param src_if: Source interface
372         :param dst: Destination IPv4 address
373         :param sport: Source TCP port
374         :param dport: Destination TCP port
375         :param data: Payload data
376         :param pref: NAT64 prefix
377         :param plen: NAT64 prefix length
378         :param fragsize: size of fragments
379         :returns: Fragments
380         """
381         if pref is None:
382             dst_ip6 = "".join(["64:ff9b::", dst])
383         else:
384             dst_ip6 = self.compose_ip6(dst, pref, plen)
385
386         p = (
387             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
388             / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
389             / IPv6ExtHdrFragment(id=random.randint(0, 65535))
390             / TCP(sport=sport, dport=dport)
391             / Raw(data)
392         )
393
394         return fragment6(p, frag_size)
395
396     def reass_frags_and_verify(self, frags, src, dst):
397         """
398         Reassemble and verify fragmented packet
399
400         :param frags: Captured fragments
401         :param src: Source IPv4 address to verify
402         :param dst: Destination IPv4 address to verify
403
404         :returns: Reassembled IPv4 packet
405         """
406         buffer = BytesIO()
407         for p in frags:
408             self.assertEqual(p[IP].src, src)
409             self.assertEqual(p[IP].dst, dst)
410             self.assert_ip_checksum_valid(p)
411             buffer.seek(p[IP].frag * 8)
412             buffer.write(bytes(p[IP].payload))
413         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
414         if ip.proto == IP_PROTOS.tcp:
415             p = ip / TCP(buffer.getvalue())
416             self.logger.debug(ppp("Reassembled:", p))
417             self.assert_tcp_checksum_valid(p)
418         elif ip.proto == IP_PROTOS.udp:
419             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
420         elif ip.proto == IP_PROTOS.icmp:
421             p = ip / ICMP(buffer.getvalue())
422         return p
423
424     def reass_frags_and_verify_ip6(self, frags, src, dst):
425         """
426         Reassemble and verify fragmented packet
427
428         :param frags: Captured fragments
429         :param src: Source IPv6 address to verify
430         :param dst: Destination IPv6 address to verify
431
432         :returns: Reassembled IPv6 packet
433         """
434         buffer = BytesIO()
435         for p in frags:
436             self.assertEqual(p[IPv6].src, src)
437             self.assertEqual(p[IPv6].dst, dst)
438             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
439             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
440         ip = IPv6(
441             src=frags[0][IPv6].src,
442             dst=frags[0][IPv6].dst,
443             nh=frags[0][IPv6ExtHdrFragment].nh,
444         )
445         if ip.nh == IP_PROTOS.tcp:
446             p = ip / TCP(buffer.getvalue())
447         elif ip.nh == IP_PROTOS.udp:
448             p = ip / UDP(buffer.getvalue())
449         self.logger.debug(ppp("Reassembled:", p))
450         self.assert_packet_checksums_valid(p)
451         return p
452
453     def verify_ipfix_max_bibs(self, data, limit):
454         """
455         Verify IPFIX maximum BIB entries exceeded event
456
457         :param data: Decoded IPFIX data records
458         :param limit: Number of maximum BIB entries that can be created.
459         """
460         self.assertEqual(1, len(data))
461         record = data[0]
462         # natEvent
463         self.assertEqual(scapy.compat.orb(record[230]), 13)
464         # natQuotaExceededEvent
465         self.assertEqual(struct.pack("I", 2), record[466])
466         # maxBIBEntries
467         self.assertEqual(struct.pack("I", limit), record[472])
468
469     def verify_ipfix_bib(self, data, is_create, src_addr):
470         """
471         Verify IPFIX NAT64 BIB create and delete events
472
473         :param data: Decoded IPFIX data records
474         :param is_create: Create event if nonzero value otherwise delete event
475         :param src_addr: IPv6 source address
476         """
477         self.assertEqual(1, len(data))
478         record = data[0]
479         # natEvent
480         if is_create:
481             self.assertEqual(scapy.compat.orb(record[230]), 10)
482         else:
483             self.assertEqual(scapy.compat.orb(record[230]), 11)
484         # sourceIPv6Address
485         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
486         # postNATSourceIPv4Address
487         self.assertEqual(self.nat_addr_n, record[225])
488         # protocolIdentifier
489         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
490         # ingressVRFID
491         self.assertEqual(struct.pack("!I", 0), record[234])
492         # sourceTransportPort
493         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
494         # postNAPTSourceTransportPort
495         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
496
497     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
498         """
499         Verify IPFIX NAT64 session create and delete events
500
501         :param data: Decoded IPFIX data records
502         :param is_create: Create event if nonzero value otherwise delete event
503         :param src_addr: IPv6 source address
504         :param dst_addr: IPv4 destination address
505         :param dst_port: destination TCP port
506         """
507         self.assertEqual(1, len(data))
508         record = data[0]
509         # natEvent
510         if is_create:
511             self.assertEqual(scapy.compat.orb(record[230]), 6)
512         else:
513             self.assertEqual(scapy.compat.orb(record[230]), 7)
514         # sourceIPv6Address
515         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
516         # destinationIPv6Address
517         self.assertEqual(
518             socket.inet_pton(
519                 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
520             ),
521             record[28],
522         )
523         # postNATSourceIPv4Address
524         self.assertEqual(self.nat_addr_n, record[225])
525         # postNATDestinationIPv4Address
526         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
527         # protocolIdentifier
528         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
529         # ingressVRFID
530         self.assertEqual(struct.pack("!I", 0), record[234])
531         # sourceTransportPort
532         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
533         # postNAPTSourceTransportPort
534         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
535         # destinationTransportPort
536         self.assertEqual(struct.pack("!H", dst_port), record[11])
537         # postNAPTDestinationTransportPort
538         self.assertEqual(struct.pack("!H", dst_port), record[228])
539
540     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
541         message = data.decode("utf-8")
542         try:
543             message = SyslogMessage.parse(message)
544         except ParseError as e:
545             self.logger.error(e)
546             raise
547         else:
548             self.assertEqual(message.severity, SyslogSeverity.info)
549             self.assertEqual(message.appname, "NAT")
550             self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
551             sd_params = message.sd.get("nsess")
552             self.assertTrue(sd_params is not None)
553             if is_ip6:
554                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
555                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
556             else:
557                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
558                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
559                 self.assertTrue(sd_params.get("SSUBIX") is not None)
560             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
561             self.assertEqual(sd_params.get("XATYP"), "IPv4")
562             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
563             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
564             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
565             self.assertEqual(sd_params.get("SVLAN"), "0")
566             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
567             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
568
569     def compose_ip6(self, ip4, pref, plen):
570         """
571         Compose IPv4-embedded IPv6 addresses
572
573         :param ip4: IPv4 address
574         :param pref: IPv6 prefix
575         :param plen: IPv6 prefix length
576         :returns: IPv4-embedded IPv6 addresses
577         """
578         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
579         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
580         if plen == 32:
581             pref_n[4] = ip4_n[0]
582             pref_n[5] = ip4_n[1]
583             pref_n[6] = ip4_n[2]
584             pref_n[7] = ip4_n[3]
585         elif plen == 40:
586             pref_n[5] = ip4_n[0]
587             pref_n[6] = ip4_n[1]
588             pref_n[7] = ip4_n[2]
589             pref_n[9] = ip4_n[3]
590         elif plen == 48:
591             pref_n[6] = ip4_n[0]
592             pref_n[7] = ip4_n[1]
593             pref_n[9] = ip4_n[2]
594             pref_n[10] = ip4_n[3]
595         elif plen == 56:
596             pref_n[7] = ip4_n[0]
597             pref_n[9] = ip4_n[1]
598             pref_n[10] = ip4_n[2]
599             pref_n[11] = ip4_n[3]
600         elif plen == 64:
601             pref_n[9] = ip4_n[0]
602             pref_n[10] = ip4_n[1]
603             pref_n[11] = ip4_n[2]
604             pref_n[12] = ip4_n[3]
605         elif plen == 96:
606             pref_n[12] = ip4_n[0]
607             pref_n[13] = ip4_n[1]
608             pref_n[14] = ip4_n[2]
609             pref_n[15] = ip4_n[3]
610         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
611         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
612
613     def verify_ipfix_max_sessions(self, data, limit):
614         """
615         Verify IPFIX maximum session entries exceeded event
616
617         :param data: Decoded IPFIX data records
618         :param limit: Number of maximum session entries that can be created.
619         """
620         self.assertEqual(1, len(data))
621         record = data[0]
622         # natEvent
623         self.assertEqual(scapy.compat.orb(record[230]), 13)
624         # natQuotaExceededEvent
625         self.assertEqual(struct.pack("I", 1), record[466])
626         # maxSessionEntries
627         self.assertEqual(struct.pack("I", limit), record[471])
628
629     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
630         """NAT64 inside interface handles Neighbor Advertisement"""
631
632         flags = self.config_flags.NAT_IS_INSIDE
633         self.vapi.nat64_add_del_interface(
634             is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
635         )
636
637         # Try to send ping
638         ping = (
639             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
640             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
641             / ICMPv6EchoRequest()
642         )
643         pkts = [ping]
644         self.pg5.add_stream(pkts)
645         self.pg_enable_capture(self.pg_interfaces)
646         self.pg_start()
647
648         # Wait for Neighbor Solicitation
649         capture = self.pg5.get_capture(len(pkts))
650         packet = capture[0]
651         try:
652             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
653             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
654             tgt = packet[ICMPv6ND_NS].tgt
655         except:
656             self.logger.error(ppp("Unexpected or invalid packet:", packet))
657             raise
658
659         # Send Neighbor Advertisement
660         p = (
661             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
662             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
663             / ICMPv6ND_NA(tgt=tgt)
664             / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
665         )
666         pkts = [p]
667         self.pg5.add_stream(pkts)
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670
671         # Try to send ping again
672         pkts = [ping]
673         self.pg5.add_stream(pkts)
674         self.pg_enable_capture(self.pg_interfaces)
675         self.pg_start()
676
677         # Wait for ping reply
678         capture = self.pg5.get_capture(len(pkts))
679         packet = capture[0]
680         try:
681             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
682             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
683             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
684         except:
685             self.logger.error(ppp("Unexpected or invalid packet:", packet))
686             raise
687
688     def test_pool(self):
689         """Add/delete address to NAT64 pool"""
690         nat_addr = "1.2.3.4"
691
692         self.vapi.nat64_add_del_pool_addr_range(
693             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
694         )
695
696         addresses = self.vapi.nat64_pool_addr_dump()
697         self.assertEqual(len(addresses), 1)
698         self.assertEqual(str(addresses[0].address), nat_addr)
699
700         self.vapi.nat64_add_del_pool_addr_range(
701             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
702         )
703
704         addresses = self.vapi.nat64_pool_addr_dump()
705         self.assertEqual(len(addresses), 0)
706
707     def test_interface(self):
708         """Enable/disable NAT64 feature on the interface"""
709         flags = self.config_flags.NAT_IS_INSIDE
710         self.vapi.nat64_add_del_interface(
711             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
712         )
713         self.vapi.nat64_add_del_interface(
714             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
715         )
716
717         interfaces = self.vapi.nat64_interface_dump()
718         self.assertEqual(len(interfaces), 2)
719         pg0_found = False
720         pg1_found = False
721         for intf in interfaces:
722             if intf.sw_if_index == self.pg0.sw_if_index:
723                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
724                 pg0_found = True
725             elif intf.sw_if_index == self.pg1.sw_if_index:
726                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
727                 pg1_found = True
728         self.assertTrue(pg0_found)
729         self.assertTrue(pg1_found)
730
731         features = self.vapi.cli("show interface features pg0")
732         self.assertIn("nat64-in2out", features)
733         features = self.vapi.cli("show interface features pg1")
734         self.assertIn("nat64-out2in", features)
735
736         self.vapi.nat64_add_del_interface(
737             is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
738         )
739         self.vapi.nat64_add_del_interface(
740             is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
741         )
742
743         interfaces = self.vapi.nat64_interface_dump()
744         self.assertEqual(len(interfaces), 0)
745
746     def test_static_bib(self):
747         """Add/delete static BIB entry"""
748         in_addr = "2001:db8:85a3::8a2e:370:7334"
749         out_addr = "10.1.1.3"
750         in_port = 1234
751         out_port = 5678
752         proto = IP_PROTOS.tcp
753
754         self.vapi.nat64_add_del_static_bib(
755             i_addr=in_addr,
756             o_addr=out_addr,
757             i_port=in_port,
758             o_port=out_port,
759             proto=proto,
760             vrf_id=0,
761             is_add=1,
762         )
763         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
764         static_bib_num = 0
765         for bibe in bib:
766             if bibe.flags & self.config_flags.NAT_IS_STATIC:
767                 static_bib_num += 1
768                 self.assertEqual(str(bibe.i_addr), in_addr)
769                 self.assertEqual(str(bibe.o_addr), out_addr)
770                 self.assertEqual(bibe.i_port, in_port)
771                 self.assertEqual(bibe.o_port, out_port)
772         self.assertEqual(static_bib_num, 1)
773         bibs = self.statistics.get_counter("/nat64/total-bibs")
774         self.assertEqual(bibs[0][0], 1)
775
776         self.vapi.nat64_add_del_static_bib(
777             i_addr=in_addr,
778             o_addr=out_addr,
779             i_port=in_port,
780             o_port=out_port,
781             proto=proto,
782             vrf_id=0,
783             is_add=0,
784         )
785         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
786         static_bib_num = 0
787         for bibe in bib:
788             if bibe.flags & self.config_flags.NAT_IS_STATIC:
789                 static_bib_num += 1
790         self.assertEqual(static_bib_num, 0)
791         bibs = self.statistics.get_counter("/nat64/total-bibs")
792         self.assertEqual(bibs[0][0], 0)
793
794     def test_set_timeouts(self):
795         """Set NAT64 timeouts"""
796         # verify default values
797         timeouts = self.vapi.nat64_get_timeouts()
798         self.assertEqual(timeouts.udp, 300)
799         self.assertEqual(timeouts.icmp, 60)
800         self.assertEqual(timeouts.tcp_transitory, 240)
801         self.assertEqual(timeouts.tcp_established, 7440)
802
803         # set and verify custom values
804         self.vapi.nat64_set_timeouts(
805             udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
806         )
807         timeouts = self.vapi.nat64_get_timeouts()
808         self.assertEqual(timeouts.udp, 200)
809         self.assertEqual(timeouts.icmp, 30)
810         self.assertEqual(timeouts.tcp_transitory, 250)
811         self.assertEqual(timeouts.tcp_established, 7450)
812
813     def test_dynamic(self):
814         """NAT64 dynamic translation test"""
815         self.tcp_port_in = 6303
816         self.udp_port_in = 6304
817         self.icmp_id_in = 6305
818
819         ses_num_start = self.nat64_get_ses_num()
820
821         self.vapi.nat64_add_del_pool_addr_range(
822             start_addr=self.nat_addr,
823             end_addr=self.nat_addr,
824             vrf_id=0xFFFFFFFF,
825             is_add=1,
826         )
827         flags = self.config_flags.NAT_IS_INSIDE
828         self.vapi.nat64_add_del_interface(
829             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
830         )
831         self.vapi.nat64_add_del_interface(
832             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
833         )
834
835         # in2out
836         tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
837         udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
838         icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
839         drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
840
841         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
842         self.pg0.add_stream(pkts)
843         self.pg_enable_capture(self.pg_interfaces)
844         self.pg_start()
845         capture = self.pg1.get_capture(len(pkts))
846         self.verify_capture_out(
847             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
848         )
849
850         if_idx = self.pg0.sw_if_index
851         cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
852         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
853         cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
854         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
855         cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
856         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
857         cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
858         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
859
860         # out2in
861         tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
862         udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
863         icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
864         drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
865
866         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
867         self.pg1.add_stream(pkts)
868         self.pg_enable_capture(self.pg_interfaces)
869         self.pg_start()
870         capture = self.pg0.get_capture(len(pkts))
871         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
872         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
873
874         if_idx = self.pg1.sw_if_index
875         cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
876         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
877         cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
878         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
879         cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
880         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
881         cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
882         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
883
884         bibs = self.statistics.get_counter("/nat64/total-bibs")
885         self.assertEqual(bibs[0][0], 3)
886         sessions = self.statistics.get_counter("/nat64/total-sessions")
887         self.assertEqual(sessions[0][0], 3)
888
889         # in2out
890         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
891         self.pg0.add_stream(pkts)
892         self.pg_enable_capture(self.pg_interfaces)
893         self.pg_start()
894         capture = self.pg1.get_capture(len(pkts))
895         self.verify_capture_out(
896             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
897         )
898
899         # out2in
900         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
901         self.pg1.add_stream(pkts)
902         self.pg_enable_capture(self.pg_interfaces)
903         self.pg_start()
904         capture = self.pg0.get_capture(len(pkts))
905         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
906
907         ses_num_end = self.nat64_get_ses_num()
908
909         self.assertEqual(ses_num_end - ses_num_start, 3)
910
911         # tenant with specific VRF
912         self.vapi.nat64_add_del_pool_addr_range(
913             start_addr=self.vrf1_nat_addr,
914             end_addr=self.vrf1_nat_addr,
915             vrf_id=self.vrf1_id,
916             is_add=1,
917         )
918         flags = self.config_flags.NAT_IS_INSIDE
919         self.vapi.nat64_add_del_interface(
920             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
921         )
922
923         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
924         self.pg2.add_stream(pkts)
925         self.pg_enable_capture(self.pg_interfaces)
926         self.pg_start()
927         capture = self.pg1.get_capture(len(pkts))
928         self.verify_capture_out(
929             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
930         )
931
932         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
933         self.pg1.add_stream(pkts)
934         self.pg_enable_capture(self.pg_interfaces)
935         self.pg_start()
936         capture = self.pg2.get_capture(len(pkts))
937         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
938
939     def test_static(self):
940         """NAT64 static translation test"""
941         self.tcp_port_in = 60303
942         self.udp_port_in = 60304
943         self.icmp_id_in = 60305
944         self.tcp_port_out = 60303
945         self.udp_port_out = 60304
946         self.icmp_id_out = 60305
947
948         ses_num_start = self.nat64_get_ses_num()
949
950         self.vapi.nat64_add_del_pool_addr_range(
951             start_addr=self.nat_addr,
952             end_addr=self.nat_addr,
953             vrf_id=0xFFFFFFFF,
954             is_add=1,
955         )
956         flags = self.config_flags.NAT_IS_INSIDE
957         self.vapi.nat64_add_del_interface(
958             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
959         )
960         self.vapi.nat64_add_del_interface(
961             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
962         )
963
964         self.vapi.nat64_add_del_static_bib(
965             i_addr=self.pg0.remote_ip6,
966             o_addr=self.nat_addr,
967             i_port=self.tcp_port_in,
968             o_port=self.tcp_port_out,
969             proto=IP_PROTOS.tcp,
970             vrf_id=0,
971             is_add=1,
972         )
973         self.vapi.nat64_add_del_static_bib(
974             i_addr=self.pg0.remote_ip6,
975             o_addr=self.nat_addr,
976             i_port=self.udp_port_in,
977             o_port=self.udp_port_out,
978             proto=IP_PROTOS.udp,
979             vrf_id=0,
980             is_add=1,
981         )
982         self.vapi.nat64_add_del_static_bib(
983             i_addr=self.pg0.remote_ip6,
984             o_addr=self.nat_addr,
985             i_port=self.icmp_id_in,
986             o_port=self.icmp_id_out,
987             proto=IP_PROTOS.icmp,
988             vrf_id=0,
989             is_add=1,
990         )
991
992         # in2out
993         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
994         self.pg0.add_stream(pkts)
995         self.pg_enable_capture(self.pg_interfaces)
996         self.pg_start()
997         capture = self.pg1.get_capture(len(pkts))
998         self.verify_capture_out(
999             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
1000         )
1001
1002         # out2in
1003         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1004         self.pg1.add_stream(pkts)
1005         self.pg_enable_capture(self.pg_interfaces)
1006         self.pg_start()
1007         capture = self.pg0.get_capture(len(pkts))
1008         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1009         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1010
1011         ses_num_end = self.nat64_get_ses_num()
1012
1013         self.assertEqual(ses_num_end - ses_num_start, 3)
1014
1015     def test_session_timeout(self):
1016         """NAT64 session timeout"""
1017         self.icmp_id_in = 1234
1018         self.vapi.nat64_add_del_pool_addr_range(
1019             start_addr=self.nat_addr,
1020             end_addr=self.nat_addr,
1021             vrf_id=0xFFFFFFFF,
1022             is_add=1,
1023         )
1024         flags = self.config_flags.NAT_IS_INSIDE
1025         self.vapi.nat64_add_del_interface(
1026             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1027         )
1028         self.vapi.nat64_add_del_interface(
1029             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1030         )
1031         self.vapi.nat64_set_timeouts(
1032             udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1033         )
1034
1035         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1036         self.pg0.add_stream(pkts)
1037         self.pg_enable_capture(self.pg_interfaces)
1038         self.pg_start()
1039         capture = self.pg1.get_capture(len(pkts))
1040
1041         ses_num_before_timeout = self.nat64_get_ses_num()
1042
1043         self.virtual_sleep(15)
1044
1045         # ICMP and TCP session after timeout
1046         ses_num_after_timeout = self.nat64_get_ses_num()
1047         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1048
1049     def test_icmp_error(self):
1050         """NAT64 ICMP Error message translation"""
1051         self.tcp_port_in = 6303
1052         self.udp_port_in = 6304
1053         self.icmp_id_in = 6305
1054
1055         self.vapi.nat64_add_del_pool_addr_range(
1056             start_addr=self.nat_addr,
1057             end_addr=self.nat_addr,
1058             vrf_id=0xFFFFFFFF,
1059             is_add=1,
1060         )
1061         flags = self.config_flags.NAT_IS_INSIDE
1062         self.vapi.nat64_add_del_interface(
1063             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1064         )
1065         self.vapi.nat64_add_del_interface(
1066             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1067         )
1068
1069         # send some packets to create sessions
1070         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1071         self.pg0.add_stream(pkts)
1072         self.pg_enable_capture(self.pg_interfaces)
1073         self.pg_start()
1074         capture_ip4 = self.pg1.get_capture(len(pkts))
1075         self.verify_capture_out(
1076             capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1077         )
1078
1079         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1080         self.pg1.add_stream(pkts)
1081         self.pg_enable_capture(self.pg_interfaces)
1082         self.pg_start()
1083         capture_ip6 = self.pg0.get_capture(len(pkts))
1084         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1085         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1086
1087         # in2out
1088         pkts = [
1089             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1090             / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1091             / ICMPv6DestUnreach(code=1)
1092             / packet[IPv6]
1093             for packet in capture_ip6
1094         ]
1095         self.pg0.add_stream(pkts)
1096         self.pg_enable_capture(self.pg_interfaces)
1097         self.pg_start()
1098         capture = self.pg1.get_capture(len(pkts))
1099         for packet in capture:
1100             try:
1101                 self.assertEqual(packet[IP].src, self.nat_addr)
1102                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1103                 self.assertEqual(packet[ICMP].type, 3)
1104                 self.assertEqual(packet[ICMP].code, 13)
1105                 inner = packet[IPerror]
1106                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1107                 self.assertEqual(inner.dst, self.nat_addr)
1108                 self.assert_packet_checksums_valid(packet)
1109                 if inner.haslayer(TCPerror):
1110                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1111                 elif inner.haslayer(UDPerror):
1112                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1113                 else:
1114                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1115             except:
1116                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1117                 raise
1118
1119         # out2in
1120         pkts = [
1121             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1122             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1123             / ICMP(type=3, code=13)
1124             / packet[IP]
1125             for packet in capture_ip4
1126         ]
1127         self.pg1.add_stream(pkts)
1128         self.pg_enable_capture(self.pg_interfaces)
1129         self.pg_start()
1130         capture = self.pg0.get_capture(len(pkts))
1131         for packet in capture:
1132             try:
1133                 self.assertEqual(packet[IPv6].src, ip.src)
1134                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1135                 icmp = packet[ICMPv6DestUnreach]
1136                 self.assertEqual(icmp.code, 1)
1137                 inner = icmp[IPerror6]
1138                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1139                 self.assertEqual(inner.dst, ip.src)
1140                 self.assert_icmpv6_checksum_valid(packet)
1141                 if inner.haslayer(TCPerror):
1142                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1143                 elif inner.haslayer(UDPerror):
1144                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1145                 else:
1146                     self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1147             except:
1148                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1149                 raise
1150
1151     def test_hairpinning(self):
1152         """NAT64 hairpinning"""
1153
1154         client = self.pg0.remote_hosts[0]
1155         server = self.pg0.remote_hosts[1]
1156         server_tcp_in_port = 22
1157         server_tcp_out_port = 4022
1158         server_udp_in_port = 23
1159         server_udp_out_port = 4023
1160         client_tcp_in_port = 1234
1161         client_udp_in_port = 1235
1162         client_tcp_out_port = 0
1163         client_udp_out_port = 0
1164         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1165         nat_addr_ip6 = ip.src
1166
1167         self.vapi.nat64_add_del_pool_addr_range(
1168             start_addr=self.nat_addr,
1169             end_addr=self.nat_addr,
1170             vrf_id=0xFFFFFFFF,
1171             is_add=1,
1172         )
1173         flags = self.config_flags.NAT_IS_INSIDE
1174         self.vapi.nat64_add_del_interface(
1175             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1176         )
1177         self.vapi.nat64_add_del_interface(
1178             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1179         )
1180
1181         self.vapi.nat64_add_del_static_bib(
1182             i_addr=server.ip6n,
1183             o_addr=self.nat_addr,
1184             i_port=server_tcp_in_port,
1185             o_port=server_tcp_out_port,
1186             proto=IP_PROTOS.tcp,
1187             vrf_id=0,
1188             is_add=1,
1189         )
1190         self.vapi.nat64_add_del_static_bib(
1191             i_addr=server.ip6n,
1192             o_addr=self.nat_addr,
1193             i_port=server_udp_in_port,
1194             o_port=server_udp_out_port,
1195             proto=IP_PROTOS.udp,
1196             vrf_id=0,
1197             is_add=1,
1198         )
1199
1200         # client to server
1201         pkts = []
1202         p = (
1203             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1204             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1205             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1206         )
1207         pkts.append(p)
1208         p = (
1209             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1210             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1211             / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1212         )
1213         pkts.append(p)
1214         self.pg0.add_stream(pkts)
1215         self.pg_enable_capture(self.pg_interfaces)
1216         self.pg_start()
1217         capture = self.pg0.get_capture(len(pkts))
1218         for packet in capture:
1219             try:
1220                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1221                 self.assertEqual(packet[IPv6].dst, server.ip6)
1222                 self.assert_packet_checksums_valid(packet)
1223                 if packet.haslayer(TCP):
1224                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1225                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1226                     client_tcp_out_port = packet[TCP].sport
1227                 else:
1228                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1229                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1230                     client_udp_out_port = packet[UDP].sport
1231             except:
1232                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1233                 raise
1234
1235         # server to client
1236         pkts = []
1237         p = (
1238             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1239             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1240             / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1241         )
1242         pkts.append(p)
1243         p = (
1244             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1245             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1246             / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1247         )
1248         pkts.append(p)
1249         self.pg0.add_stream(pkts)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg0.get_capture(len(pkts))
1253         for packet in capture:
1254             try:
1255                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1256                 self.assertEqual(packet[IPv6].dst, client.ip6)
1257                 self.assert_packet_checksums_valid(packet)
1258                 if packet.haslayer(TCP):
1259                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1260                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1261                 else:
1262                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1263                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1264             except:
1265                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1266                 raise
1267
1268         # ICMP error
1269         pkts = []
1270         pkts = [
1271             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1272             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1273             / ICMPv6DestUnreach(code=1)
1274             / packet[IPv6]
1275             for packet in capture
1276         ]
1277         self.pg0.add_stream(pkts)
1278         self.pg_enable_capture(self.pg_interfaces)
1279         self.pg_start()
1280         capture = self.pg0.get_capture(len(pkts))
1281         for packet in capture:
1282             try:
1283                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1284                 self.assertEqual(packet[IPv6].dst, server.ip6)
1285                 icmp = packet[ICMPv6DestUnreach]
1286                 self.assertEqual(icmp.code, 1)
1287                 inner = icmp[IPerror6]
1288                 self.assertEqual(inner.src, server.ip6)
1289                 self.assertEqual(inner.dst, nat_addr_ip6)
1290                 self.assert_packet_checksums_valid(packet)
1291                 if inner.haslayer(TCPerror):
1292                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1293                     self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1294                 else:
1295                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1296                     self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1297             except:
1298                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1299                 raise
1300
1301     def test_prefix(self):
1302         """NAT64 Network-Specific Prefix"""
1303
1304         self.vapi.nat64_add_del_pool_addr_range(
1305             start_addr=self.nat_addr,
1306             end_addr=self.nat_addr,
1307             vrf_id=0xFFFFFFFF,
1308             is_add=1,
1309         )
1310         flags = self.config_flags.NAT_IS_INSIDE
1311         self.vapi.nat64_add_del_interface(
1312             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1313         )
1314         self.vapi.nat64_add_del_interface(
1315             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1316         )
1317         self.vapi.nat64_add_del_pool_addr_range(
1318             start_addr=self.vrf1_nat_addr,
1319             end_addr=self.vrf1_nat_addr,
1320             vrf_id=self.vrf1_id,
1321             is_add=1,
1322         )
1323         self.vapi.nat64_add_del_interface(
1324             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1325         )
1326
1327         # Add global prefix
1328         global_pref64 = "2001:db8::"
1329         global_pref64_len = 32
1330         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1331         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1332
1333         prefix = self.vapi.nat64_prefix_dump()
1334         self.assertEqual(len(prefix), 1)
1335         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1336         self.assertEqual(prefix[0].vrf_id, 0)
1337
1338         # Add tenant specific prefix
1339         vrf1_pref64 = "2001:db8:122:300::"
1340         vrf1_pref64_len = 56
1341         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1342         self.vapi.nat64_add_del_prefix(
1343             prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1344         )
1345
1346         prefix = self.vapi.nat64_prefix_dump()
1347         self.assertEqual(len(prefix), 2)
1348
1349         # Global prefix
1350         pkts = self.create_stream_in_ip6(
1351             self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1352         )
1353         self.pg0.add_stream(pkts)
1354         self.pg_enable_capture(self.pg_interfaces)
1355         self.pg_start()
1356         capture = self.pg1.get_capture(len(pkts))
1357         self.verify_capture_out(
1358             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1359         )
1360
1361         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1362         self.pg1.add_stream(pkts)
1363         self.pg_enable_capture(self.pg_interfaces)
1364         self.pg_start()
1365         capture = self.pg0.get_capture(len(pkts))
1366         dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1367         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1368
1369         # Tenant specific prefix
1370         pkts = self.create_stream_in_ip6(
1371             self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1372         )
1373         self.pg2.add_stream(pkts)
1374         self.pg_enable_capture(self.pg_interfaces)
1375         self.pg_start()
1376         capture = self.pg1.get_capture(len(pkts))
1377         self.verify_capture_out(
1378             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1379         )
1380
1381         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1382         self.pg1.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg2.get_capture(len(pkts))
1386         dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1387         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1388
1389     def test_unknown_proto(self):
1390         """NAT64 translate packet with unknown protocol"""
1391
1392         self.vapi.nat64_add_del_pool_addr_range(
1393             start_addr=self.nat_addr,
1394             end_addr=self.nat_addr,
1395             vrf_id=0xFFFFFFFF,
1396             is_add=1,
1397         )
1398         flags = self.config_flags.NAT_IS_INSIDE
1399         self.vapi.nat64_add_del_interface(
1400             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1401         )
1402         self.vapi.nat64_add_del_interface(
1403             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1404         )
1405         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1406
1407         # in2out
1408         p = (
1409             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1410             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1411             / TCP(sport=self.tcp_port_in, dport=20)
1412         )
1413         self.pg0.add_stream(p)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         p = self.pg1.get_capture(1)
1417
1418         p = (
1419             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1420             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1421             / GRE()
1422             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1423             / TCP(sport=1234, dport=1234)
1424         )
1425         self.pg0.add_stream(p)
1426         self.pg_enable_capture(self.pg_interfaces)
1427         self.pg_start()
1428         p = self.pg1.get_capture(1)
1429         packet = p[0]
1430         try:
1431             self.assertEqual(packet[IP].src, self.nat_addr)
1432             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1433             self.assertEqual(packet.haslayer(GRE), 1)
1434             self.assert_packet_checksums_valid(packet)
1435         except:
1436             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1437             raise
1438
1439         # out2in
1440         p = (
1441             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1442             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1443             / GRE()
1444             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1445             / TCP(sport=1234, dport=1234)
1446         )
1447         self.pg1.add_stream(p)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450         p = self.pg0.get_capture(1)
1451         packet = p[0]
1452         try:
1453             self.assertEqual(packet[IPv6].src, remote_ip6)
1454             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1455             self.assertEqual(packet[IPv6].nh, 47)
1456         except:
1457             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1458             raise
1459
1460     def test_hairpinning_unknown_proto(self):
1461         """NAT64 translate packet with unknown protocol - hairpinning"""
1462
1463         client = self.pg0.remote_hosts[0]
1464         server = self.pg0.remote_hosts[1]
1465         server_tcp_in_port = 22
1466         server_tcp_out_port = 4022
1467         client_tcp_in_port = 1234
1468         client_tcp_out_port = 1235
1469         server_nat_ip = "10.0.0.100"
1470         client_nat_ip = "10.0.0.110"
1471         server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1472         client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1473
1474         self.vapi.nat64_add_del_pool_addr_range(
1475             start_addr=server_nat_ip,
1476             end_addr=client_nat_ip,
1477             vrf_id=0xFFFFFFFF,
1478             is_add=1,
1479         )
1480         flags = self.config_flags.NAT_IS_INSIDE
1481         self.vapi.nat64_add_del_interface(
1482             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1483         )
1484         self.vapi.nat64_add_del_interface(
1485             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1486         )
1487
1488         self.vapi.nat64_add_del_static_bib(
1489             i_addr=server.ip6n,
1490             o_addr=server_nat_ip,
1491             i_port=server_tcp_in_port,
1492             o_port=server_tcp_out_port,
1493             proto=IP_PROTOS.tcp,
1494             vrf_id=0,
1495             is_add=1,
1496         )
1497
1498         self.vapi.nat64_add_del_static_bib(
1499             i_addr=server.ip6n,
1500             o_addr=server_nat_ip,
1501             i_port=0,
1502             o_port=0,
1503             proto=IP_PROTOS.gre,
1504             vrf_id=0,
1505             is_add=1,
1506         )
1507
1508         self.vapi.nat64_add_del_static_bib(
1509             i_addr=client.ip6n,
1510             o_addr=client_nat_ip,
1511             i_port=client_tcp_in_port,
1512             o_port=client_tcp_out_port,
1513             proto=IP_PROTOS.tcp,
1514             vrf_id=0,
1515             is_add=1,
1516         )
1517
1518         # client to server
1519         p = (
1520             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1521             / IPv6(src=client.ip6, dst=server_nat_ip6)
1522             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1523         )
1524         self.pg0.add_stream(p)
1525         self.pg_enable_capture(self.pg_interfaces)
1526         self.pg_start()
1527         p = self.pg0.get_capture(1)
1528
1529         p = (
1530             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1531             / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1532             / GRE()
1533             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1534             / TCP(sport=1234, dport=1234)
1535         )
1536         self.pg0.add_stream(p)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         p = self.pg0.get_capture(1)
1540         packet = p[0]
1541         try:
1542             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1543             self.assertEqual(packet[IPv6].dst, server.ip6)
1544             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1545         except:
1546             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1547             raise
1548
1549         # server to client
1550         p = (
1551             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1552             / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1553             / GRE()
1554             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1555             / TCP(sport=1234, dport=1234)
1556         )
1557         self.pg0.add_stream(p)
1558         self.pg_enable_capture(self.pg_interfaces)
1559         self.pg_start()
1560         p = self.pg0.get_capture(1)
1561         packet = p[0]
1562         try:
1563             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1564             self.assertEqual(packet[IPv6].dst, client.ip6)
1565             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1566         except:
1567             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1568             raise
1569
1570     def test_one_armed_nat64(self):
1571         """One armed NAT64"""
1572         external_port = 0
1573         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1574
1575         self.vapi.nat64_add_del_pool_addr_range(
1576             start_addr=self.nat_addr,
1577             end_addr=self.nat_addr,
1578             vrf_id=0xFFFFFFFF,
1579             is_add=1,
1580         )
1581         flags = self.config_flags.NAT_IS_INSIDE
1582         self.vapi.nat64_add_del_interface(
1583             is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1584         )
1585         self.vapi.nat64_add_del_interface(
1586             is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1587         )
1588
1589         # in2out
1590         p = (
1591             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1592             / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1593             / TCP(sport=12345, dport=80)
1594         )
1595         self.pg3.add_stream(p)
1596         self.pg_enable_capture(self.pg_interfaces)
1597         self.pg_start()
1598         capture = self.pg3.get_capture(1)
1599         p = capture[0]
1600         try:
1601             ip = p[IP]
1602             tcp = p[TCP]
1603             self.assertEqual(ip.src, self.nat_addr)
1604             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1605             self.assertNotEqual(tcp.sport, 12345)
1606             external_port = tcp.sport
1607             self.assertEqual(tcp.dport, 80)
1608             self.assert_packet_checksums_valid(p)
1609         except:
1610             self.logger.error(ppp("Unexpected or invalid packet:", p))
1611             raise
1612
1613         # out2in
1614         p = (
1615             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1616             / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1617             / TCP(sport=80, dport=external_port)
1618         )
1619         self.pg3.add_stream(p)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg3.get_capture(1)
1623         p = capture[0]
1624         try:
1625             ip = p[IPv6]
1626             tcp = p[TCP]
1627             self.assertEqual(ip.src, remote_host_ip6)
1628             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1629             self.assertEqual(tcp.sport, 80)
1630             self.assertEqual(tcp.dport, 12345)
1631             self.assert_packet_checksums_valid(p)
1632         except:
1633             self.logger.error(ppp("Unexpected or invalid packet:", p))
1634             raise
1635
1636     def test_frag_in_order(self):
1637         """NAT64 translate fragments arriving in order"""
1638         self.tcp_port_in = random.randint(1025, 65535)
1639
1640         self.vapi.nat64_add_del_pool_addr_range(
1641             start_addr=self.nat_addr,
1642             end_addr=self.nat_addr,
1643             vrf_id=0xFFFFFFFF,
1644             is_add=1,
1645         )
1646         flags = self.config_flags.NAT_IS_INSIDE
1647         self.vapi.nat64_add_del_interface(
1648             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1649         )
1650         self.vapi.nat64_add_del_interface(
1651             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1652         )
1653
1654         # in2out
1655         data = b"a" * 200
1656         pkts = self.create_stream_frag_ip6(
1657             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1658         )
1659         self.pg0.add_stream(pkts)
1660         self.pg_enable_capture(self.pg_interfaces)
1661         self.pg_start()
1662         frags = self.pg1.get_capture(len(pkts))
1663         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1664         self.assertEqual(p[TCP].dport, 20)
1665         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1666         self.tcp_port_out = p[TCP].sport
1667         self.assertEqual(data, p[Raw].load)
1668
1669         # out2in
1670         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1671         pkts = self.create_stream_frag(
1672             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1673         )
1674         self.pg1.add_stream(pkts)
1675         self.pg_enable_capture(self.pg_interfaces)
1676         self.pg_start()
1677         frags = self.pg0.get_capture(len(pkts))
1678         self.logger.debug(ppc("Captured:", frags))
1679         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1680         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1681         self.assertEqual(p[TCP].sport, 20)
1682         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1683         self.assertEqual(data, p[Raw].load)
1684
1685     def test_reass_hairpinning(self):
1686         """NAT64 fragments hairpinning"""
1687         data = b"a" * 200
1688         server = self.pg0.remote_hosts[1]
1689         server_in_port = random.randint(1025, 65535)
1690         server_out_port = random.randint(1025, 65535)
1691         client_in_port = random.randint(1025, 65535)
1692         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1693         nat_addr_ip6 = ip.src
1694
1695         self.vapi.nat64_add_del_pool_addr_range(
1696             start_addr=self.nat_addr,
1697             end_addr=self.nat_addr,
1698             vrf_id=0xFFFFFFFF,
1699             is_add=1,
1700         )
1701         flags = self.config_flags.NAT_IS_INSIDE
1702         self.vapi.nat64_add_del_interface(
1703             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1704         )
1705         self.vapi.nat64_add_del_interface(
1706             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1707         )
1708
1709         # add static BIB entry for server
1710         self.vapi.nat64_add_del_static_bib(
1711             i_addr=server.ip6n,
1712             o_addr=self.nat_addr,
1713             i_port=server_in_port,
1714             o_port=server_out_port,
1715             proto=IP_PROTOS.tcp,
1716             vrf_id=0,
1717             is_add=1,
1718         )
1719
1720         # send packet from host to server
1721         pkts = self.create_stream_frag_ip6(
1722             self.pg0, self.nat_addr, client_in_port, server_out_port, data
1723         )
1724         self.pg0.add_stream(pkts)
1725         self.pg_enable_capture(self.pg_interfaces)
1726         self.pg_start()
1727         frags = self.pg0.get_capture(len(pkts))
1728         self.logger.debug(ppc("Captured:", frags))
1729         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1730         self.assertNotEqual(p[TCP].sport, client_in_port)
1731         self.assertEqual(p[TCP].dport, server_in_port)
1732         self.assertEqual(data, p[Raw].load)
1733
1734     def test_frag_out_of_order(self):
1735         """NAT64 translate fragments arriving out of order"""
1736         self.tcp_port_in = random.randint(1025, 65535)
1737
1738         self.vapi.nat64_add_del_pool_addr_range(
1739             start_addr=self.nat_addr,
1740             end_addr=self.nat_addr,
1741             vrf_id=0xFFFFFFFF,
1742             is_add=1,
1743         )
1744         flags = self.config_flags.NAT_IS_INSIDE
1745         self.vapi.nat64_add_del_interface(
1746             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1747         )
1748         self.vapi.nat64_add_del_interface(
1749             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1750         )
1751
1752         # in2out
1753         data = b"a" * 200
1754         pkts = self.create_stream_frag_ip6(
1755             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1756         )
1757         pkts.reverse()
1758         self.pg0.add_stream(pkts)
1759         self.pg_enable_capture(self.pg_interfaces)
1760         self.pg_start()
1761         frags = self.pg1.get_capture(len(pkts))
1762         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1763         self.assertEqual(p[TCP].dport, 20)
1764         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1765         self.tcp_port_out = p[TCP].sport
1766         self.assertEqual(data, p[Raw].load)
1767
1768         # out2in
1769         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1770         pkts = self.create_stream_frag(
1771             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1772         )
1773         pkts.reverse()
1774         self.pg1.add_stream(pkts)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         frags = self.pg0.get_capture(len(pkts))
1778         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1779         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1780         self.assertEqual(p[TCP].sport, 20)
1781         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1782         self.assertEqual(data, p[Raw].load)
1783
1784     def test_interface_addr(self):
1785         """Acquire NAT64 pool addresses from interface"""
1786         self.vapi.nat64_add_del_interface_addr(
1787             is_add=1, sw_if_index=self.pg4.sw_if_index
1788         )
1789
1790         # no address in NAT64 pool
1791         addresses = self.vapi.nat44_address_dump()
1792         self.assertEqual(0, len(addresses))
1793
1794         # configure interface address and check NAT64 address pool
1795         self.pg4.config_ip4()
1796         addresses = self.vapi.nat64_pool_addr_dump()
1797         self.assertEqual(len(addresses), 1)
1798
1799         self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1800
1801         # remove interface address and check NAT64 address pool
1802         self.pg4.unconfig_ip4()
1803         addresses = self.vapi.nat64_pool_addr_dump()
1804         self.assertEqual(0, len(addresses))
1805
1806     @unittest.skipUnless(config.extended, "part of extended tests")
1807     def test_ipfix_max_bibs_sessions(self):
1808         """IPFIX logging maximum session and BIB entries exceeded"""
1809         max_bibs = 1280
1810         max_sessions = 2560
1811         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1812
1813         self.vapi.nat64_add_del_pool_addr_range(
1814             start_addr=self.nat_addr,
1815             end_addr=self.nat_addr,
1816             vrf_id=0xFFFFFFFF,
1817             is_add=1,
1818         )
1819         flags = self.config_flags.NAT_IS_INSIDE
1820         self.vapi.nat64_add_del_interface(
1821             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1822         )
1823         self.vapi.nat64_add_del_interface(
1824             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1825         )
1826
1827         pkts = []
1828         src = ""
1829         for i in range(0, max_bibs):
1830             src = "fd01:aa::%x" % (i)
1831             p = (
1832                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1833                 / IPv6(src=src, dst=remote_host_ip6)
1834                 / TCP(sport=12345, dport=80)
1835             )
1836             pkts.append(p)
1837             p = (
1838                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1839                 / IPv6(src=src, dst=remote_host_ip6)
1840                 / TCP(sport=12345, dport=22)
1841             )
1842             pkts.append(p)
1843         self.pg0.add_stream(pkts)
1844         self.pg_enable_capture(self.pg_interfaces)
1845         self.pg_start()
1846         self.pg1.get_capture(max_sessions)
1847
1848         self.vapi.set_ipfix_exporter(
1849             collector_address=self.pg3.remote_ip4,
1850             src_address=self.pg3.local_ip4,
1851             path_mtu=512,
1852             template_interval=10,
1853         )
1854         self.vapi.nat_ipfix_enable_disable(
1855             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1856         )
1857
1858         p = (
1859             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1860             / IPv6(src=src, dst=remote_host_ip6)
1861             / TCP(sport=12345, dport=25)
1862         )
1863         self.pg0.add_stream(p)
1864         self.pg_enable_capture(self.pg_interfaces)
1865         self.pg_start()
1866         self.pg1.assert_nothing_captured()
1867         self.vapi.ipfix_flush()
1868         capture = self.pg3.get_capture(7)
1869         ipfix = IPFIXDecoder()
1870         # first load template
1871         for p in capture:
1872             self.assertTrue(p.haslayer(IPFIX))
1873             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1874             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1875             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1876             self.assertEqual(p[UDP].dport, 4739)
1877             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1878             if p.haslayer(Template):
1879                 ipfix.add_template(p.getlayer(Template))
1880         # verify events in data set
1881         for p in capture:
1882             if p.haslayer(Data):
1883                 data = ipfix.decode_data_set(p.getlayer(Set))
1884                 self.verify_ipfix_max_sessions(data, max_sessions)
1885
1886         p = (
1887             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1888             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1889             / TCP(sport=12345, dport=80)
1890         )
1891         self.pg0.add_stream(p)
1892         self.pg_enable_capture(self.pg_interfaces)
1893         self.pg_start()
1894         self.pg1.assert_nothing_captured()
1895         self.vapi.ipfix_flush()
1896         capture = self.pg3.get_capture(1)
1897         # verify events in data set
1898         for p in capture:
1899             self.assertTrue(p.haslayer(IPFIX))
1900             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1901             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1902             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1903             self.assertEqual(p[UDP].dport, 4739)
1904             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1905             if p.haslayer(Data):
1906                 data = ipfix.decode_data_set(p.getlayer(Set))
1907                 self.verify_ipfix_max_bibs(data, max_bibs)
1908
1909     def test_ipfix_bib_ses(self):
1910         """IPFIX logging NAT64 BIB/session create and delete events"""
1911         self.tcp_port_in = random.randint(1025, 65535)
1912         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1913
1914         self.vapi.nat64_add_del_pool_addr_range(
1915             start_addr=self.nat_addr,
1916             end_addr=self.nat_addr,
1917             vrf_id=0xFFFFFFFF,
1918             is_add=1,
1919         )
1920         flags = self.config_flags.NAT_IS_INSIDE
1921         self.vapi.nat64_add_del_interface(
1922             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1923         )
1924         self.vapi.nat64_add_del_interface(
1925             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1926         )
1927         self.vapi.set_ipfix_exporter(
1928             collector_address=self.pg3.remote_ip4,
1929             src_address=self.pg3.local_ip4,
1930             path_mtu=512,
1931             template_interval=10,
1932         )
1933         self.vapi.nat_ipfix_enable_disable(
1934             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1935         )
1936
1937         # Create
1938         p = (
1939             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1940             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1941             / TCP(sport=self.tcp_port_in, dport=25)
1942         )
1943         self.pg0.add_stream(p)
1944         self.pg_enable_capture(self.pg_interfaces)
1945         self.pg_start()
1946         p = self.pg1.get_capture(1)
1947         self.tcp_port_out = p[0][TCP].sport
1948         self.vapi.ipfix_flush()
1949         capture = self.pg3.get_capture(8)
1950         ipfix = IPFIXDecoder()
1951         # first load template
1952         for p in capture:
1953             self.assertTrue(p.haslayer(IPFIX))
1954             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1955             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1956             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1957             self.assertEqual(p[UDP].dport, 4739)
1958             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1959             if p.haslayer(Template):
1960                 ipfix.add_template(p.getlayer(Template))
1961         # verify events in data set
1962         for p in capture:
1963             if p.haslayer(Data):
1964                 data = ipfix.decode_data_set(p.getlayer(Set))
1965                 if scapy.compat.orb(data[0][230]) == 10:
1966                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1967                 elif scapy.compat.orb(data[0][230]) == 6:
1968                     self.verify_ipfix_nat64_ses(
1969                         data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1970                     )
1971                 else:
1972                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1973
1974         # Delete
1975         self.pg_enable_capture(self.pg_interfaces)
1976         self.vapi.nat64_add_del_pool_addr_range(
1977             start_addr=self.nat_addr,
1978             end_addr=self.nat_addr,
1979             vrf_id=0xFFFFFFFF,
1980             is_add=0,
1981         )
1982         self.vapi.ipfix_flush()
1983         capture = self.pg3.get_capture(2)
1984         # verify events in data set
1985         for p in capture:
1986             self.assertTrue(p.haslayer(IPFIX))
1987             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1988             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1989             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1990             self.assertEqual(p[UDP].dport, 4739)
1991             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1992             if p.haslayer(Data):
1993                 data = ipfix.decode_data_set(p.getlayer(Set))
1994                 if scapy.compat.orb(data[0][230]) == 11:
1995                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1996                 elif scapy.compat.orb(data[0][230]) == 7:
1997                     self.verify_ipfix_nat64_ses(
1998                         data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1999                     )
2000                 else:
2001                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
2002
2003     def test_syslog_sess(self):
2004         """Test syslog session creation and deletion"""
2005         self.tcp_port_in = random.randint(1025, 65535)
2006         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2007
2008         self.vapi.nat64_add_del_pool_addr_range(
2009             start_addr=self.nat_addr,
2010             end_addr=self.nat_addr,
2011             vrf_id=0xFFFFFFFF,
2012             is_add=1,
2013         )
2014         flags = self.config_flags.NAT_IS_INSIDE
2015         self.vapi.nat64_add_del_interface(
2016             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2017         )
2018         self.vapi.nat64_add_del_interface(
2019             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2020         )
2021         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2022         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2023
2024         p = (
2025             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2026             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2027             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2028         )
2029         self.pg0.add_stream(p)
2030         self.pg_enable_capture(self.pg_interfaces)
2031         self.pg_start()
2032         p = self.pg1.get_capture(1)
2033         self.tcp_port_out = p[0][TCP].sport
2034         capture = self.pg3.get_capture(1)
2035         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2036
2037         self.pg_enable_capture(self.pg_interfaces)
2038         self.pg_start()
2039         self.vapi.nat64_add_del_pool_addr_range(
2040             start_addr=self.nat_addr,
2041             end_addr=self.nat_addr,
2042             vrf_id=0xFFFFFFFF,
2043             is_add=0,
2044         )
2045         capture = self.pg3.get_capture(1)
2046         self.verify_syslog_sess(capture[0][Raw].load, False, True)
2047
2048     def nat64_get_ses_num(self):
2049         """
2050         Return number of active NAT64 sessions.
2051         """
2052         st = self.vapi.nat64_st_dump(proto=255)
2053         return len(st)
2054
2055     def clear_nat64(self):
2056         """
2057         Clear NAT64 configuration.
2058         """
2059         self.vapi.nat_ipfix_enable_disable(
2060             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2061         )
2062         self.ipfix_src_port = 4739
2063         self.ipfix_domain_id = 1
2064
2065         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2066
2067         self.vapi.nat64_set_timeouts(
2068             udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2069         )
2070
2071         interfaces = self.vapi.nat64_interface_dump()
2072         for intf in interfaces:
2073             self.vapi.nat64_add_del_interface(
2074                 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2075             )
2076
2077         bib = self.vapi.nat64_bib_dump(proto=255)
2078         for bibe in bib:
2079             if bibe.flags & self.config_flags.NAT_IS_STATIC:
2080                 self.vapi.nat64_add_del_static_bib(
2081                     i_addr=bibe.i_addr,
2082                     o_addr=bibe.o_addr,
2083                     i_port=bibe.i_port,
2084                     o_port=bibe.o_port,
2085                     proto=bibe.proto,
2086                     vrf_id=bibe.vrf_id,
2087                     is_add=0,
2088                 )
2089
2090         adresses = self.vapi.nat64_pool_addr_dump()
2091         for addr in adresses:
2092             self.vapi.nat64_add_del_pool_addr_range(
2093                 start_addr=addr.address,
2094                 end_addr=addr.address,
2095                 vrf_id=addr.vrf_id,
2096                 is_add=0,
2097             )
2098
2099         prefixes = self.vapi.nat64_prefix_dump()
2100         for prefix in prefixes:
2101             self.vapi.nat64_add_del_prefix(
2102                 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2103             )
2104
2105         bibs = self.statistics.get_counter("/nat64/total-bibs")
2106         self.assertEqual(bibs[0][0], 0)
2107         sessions = self.statistics.get_counter("/nat64/total-sessions")
2108         self.assertEqual(sessions[0][0], 0)
2109
2110
if __name__ == "__main__":
    # Run directly: hand this module's tests to the runner imported from
    # the test framework.
    unittest.main(
        testRunner=VppTestRunner,
    )