ip-neighbor: do not use sas to determine NS source address
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from config import config
12 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, is_distro_ubuntu2204
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import (
20     IPv6,
21     ICMPv6EchoRequest,
22     ICMPv6EchoReply,
23     ICMPv6ND_NS,
24     ICMPv6ND_NA,
25     ICMPv6NDOptDstLLAddr,
26     fragment6,
27 )
28 from scapy.layers.l2 import Ether, GRE
29 from scapy.packet import Raw
30 from syslog_rfc5424_parser import SyslogMessage, ParseError
31 from syslog_rfc5424_parser.constants import SyslogSeverity
32 from util import ppc, ppp
33 from vpp_papi import VppEnum
34
35
36 @tag_fixme_vpp_workers
37 @tag_fixme_ubuntu2204
38 class TestNAT64(VppTestCase):
39     """NAT64 Test Cases"""
40
41     @property
42     def SYSLOG_SEVERITY(self):
43         return VppEnum.vl_api_syslog_severity_t
44
45     @property
46     def config_flags(self):
47         return VppEnum.vl_api_nat_config_flags_t
48
49     @classmethod
50     def setUpClass(cls):
51         super(TestNAT64, cls).setUpClass()
52
53         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
54             return
55         cls.tcp_port_in = 6303
56         cls.tcp_port_out = 6303
57         cls.udp_port_in = 6304
58         cls.udp_port_out = 6304
59         cls.icmp_id_in = 6305
60         cls.icmp_id_out = 6305
61         cls.tcp_external_port = 80
62         cls.nat_addr = "10.0.0.3"
63         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
64         cls.vrf1_id = 10
65         cls.vrf1_nat_addr = "10.0.10.3"
66         cls.ipfix_src_port = 4739
67         cls.ipfix_domain_id = 1
68
69         cls.create_pg_interfaces(range(6))
70         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
71         cls.ip6_interfaces.append(cls.pg_interfaces[2])
72         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
73
74         cls.vapi.ip_table_add_del(
75             is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
76         )
77
78         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
79
80         cls.pg0.generate_remote_hosts(2)
81
82         for i in cls.ip6_interfaces:
83             i.admin_up()
84             i.config_ip6()
85             i.configure_ipv6_neighbors()
86
87         for i in cls.ip4_interfaces:
88             i.admin_up()
89             i.config_ip4()
90             i.resolve_arp()
91
92         cls.pg3.admin_up()
93         cls.pg3.config_ip4()
94         cls.pg3.resolve_arp()
95         cls.pg3.config_ip6()
96         cls.pg3.configure_ipv6_neighbors()
97
98         cls.pg5.admin_up()
99         cls.pg5.config_ip6()
100
101     @classmethod
102     def tearDownClass(cls):
103         super(TestNAT64, cls).tearDownClass()
104
105     def setUp(self):
106         super(TestNAT64, self).setUp()
107         self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
108
109     def tearDown(self):
110         super(TestNAT64, self).tearDown()
111         if not self.vpp_dead:
112             self.vapi.nat64_plugin_enable_disable(enable=0)
113
114     def show_commands_at_teardown(self):
115         self.logger.info(self.vapi.cli("show nat64 pool"))
116         self.logger.info(self.vapi.cli("show nat64 interfaces"))
117         self.logger.info(self.vapi.cli("show nat64 prefix"))
118         self.logger.info(self.vapi.cli("show nat64 bib all"))
119         self.logger.info(self.vapi.cli("show nat64 session table all"))
120
121     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
122         """
123         Create IPv6 packet stream for inside network
124
125         :param in_if: Inside interface
126         :param out_if: Outside interface
127         :param ttl: Hop Limit of generated packets
128         :param pref: NAT64 prefix
129         :param plen: NAT64 prefix length
130         """
131         pkts = []
132         if pref is None:
133             dst = "".join(["64:ff9b::", out_if.remote_ip4])
134         else:
135             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
136
137         # TCP
138         p = (
139             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
140             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
141             / TCP(sport=self.tcp_port_in, dport=20)
142         )
143         pkts.append(p)
144
145         # UDP
146         p = (
147             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
148             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
149             / UDP(sport=self.udp_port_in, dport=20)
150         )
151         pkts.append(p)
152
153         # ICMP
154         p = (
155             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
156             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
157             / ICMPv6EchoRequest(id=self.icmp_id_in)
158         )
159         pkts.append(p)
160
161         return pkts
162
163     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
164         """
165         Create packet stream for outside network
166
167         :param out_if: Outside interface
168         :param dst_ip: Destination IP address (Default use global NAT address)
169         :param ttl: TTL of generated packets
170         :param use_inside_ports: Use inside NAT ports as destination ports
171                instead of outside ports
172         """
173         if dst_ip is None:
174             dst_ip = self.nat_addr
175         if not use_inside_ports:
176             tcp_port = self.tcp_port_out
177             udp_port = self.udp_port_out
178             icmp_id = self.icmp_id_out
179         else:
180             tcp_port = self.tcp_port_in
181             udp_port = self.udp_port_in
182             icmp_id = self.icmp_id_in
183         pkts = []
184         # TCP
185         p = (
186             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
187             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
188             / TCP(dport=tcp_port, sport=20)
189         )
190         pkts.extend([p, p])
191
192         # UDP
193         p = (
194             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
195             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
196             / UDP(dport=udp_port, sport=20)
197         )
198         pkts.append(p)
199
200         # ICMP
201         p = (
202             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
203             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
204             / ICMP(id=icmp_id, type="echo-reply")
205         )
206         pkts.append(p)
207
208         return pkts
209
210     def verify_capture_out(
211         self,
212         capture,
213         nat_ip=None,
214         same_port=False,
215         dst_ip=None,
216         is_ip6=False,
217         ignore_port=False,
218     ):
219         """
220         Verify captured packets on outside network
221
222         :param capture: Captured packets
223         :param nat_ip: Translated IP address (Default use global NAT address)
224         :param same_port: Source port number is not translated (Default False)
225         :param dst_ip: Destination IP address (Default do not verify)
226         :param is_ip6: If L3 protocol is IPv6 (Default False)
227         """
228         if is_ip6:
229             IP46 = IPv6
230             ICMP46 = ICMPv6EchoRequest
231         else:
232             IP46 = IP
233             ICMP46 = ICMP
234         if nat_ip is None:
235             nat_ip = self.nat_addr
236         for packet in capture:
237             try:
238                 if not is_ip6:
239                     self.assert_packet_checksums_valid(packet)
240                 self.assertEqual(packet[IP46].src, nat_ip)
241                 if dst_ip is not None:
242                     self.assertEqual(packet[IP46].dst, dst_ip)
243                 if packet.haslayer(TCP):
244                     if not ignore_port:
245                         if same_port:
246                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
247                         else:
248                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
249                     self.tcp_port_out = packet[TCP].sport
250                     self.assert_packet_checksums_valid(packet)
251                 elif packet.haslayer(UDP):
252                     if not ignore_port:
253                         if same_port:
254                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
255                         else:
256                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
257                     self.udp_port_out = packet[UDP].sport
258                 else:
259                     if not ignore_port:
260                         if same_port:
261                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
262                         else:
263                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
264                     self.icmp_id_out = packet[ICMP46].id
265                     self.assert_packet_checksums_valid(packet)
266             except:
267                 self.logger.error(
268                     ppp("Unexpected or invalid packet (outside network):", packet)
269                 )
270                 raise
271
272     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
273         """
274         Verify captured IPv6 packets on inside network
275
276         :param capture: Captured packets
277         :param src_ip: Source IP
278         :param dst_ip: Destination IP address
279         """
280         for packet in capture:
281             try:
282                 self.assertEqual(packet[IPv6].src, src_ip)
283                 self.assertEqual(packet[IPv6].dst, dst_ip)
284                 self.assert_packet_checksums_valid(packet)
285                 if packet.haslayer(TCP):
286                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
287                 elif packet.haslayer(UDP):
288                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
289                 else:
290                     self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
291             except:
292                 self.logger.error(
293                     ppp("Unexpected or invalid packet (inside network):", packet)
294                 )
295                 raise
296
297     def create_stream_frag(
298         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
299     ):
300         """
301         Create fragmented packet stream
302
303         :param src_if: Source interface
304         :param dst: Destination IPv4 address
305         :param sport: Source port
306         :param dport: Destination port
307         :param data: Payload data
308         :param proto: protocol (TCP, UDP, ICMP)
309         :param echo_reply: use echo_reply if protocol is ICMP
310         :returns: Fragments
311         """
312         if proto == IP_PROTOS.tcp:
313             p = (
314                 IP(src=src_if.remote_ip4, dst=dst)
315                 / TCP(sport=sport, dport=dport)
316                 / Raw(data)
317             )
318             p = p.__class__(scapy.compat.raw(p))
319             chksum = p[TCP].chksum
320             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
321         elif proto == IP_PROTOS.udp:
322             proto_header = UDP(sport=sport, dport=dport)
323         elif proto == IP_PROTOS.icmp:
324             if not echo_reply:
325                 proto_header = ICMP(id=sport, type="echo-request")
326             else:
327                 proto_header = ICMP(id=sport, type="echo-reply")
328         else:
329             raise Exception("Unsupported protocol")
330         id = random.randint(0, 65535)
331         pkts = []
332         if proto == IP_PROTOS.tcp:
333             raw = Raw(data[0:4])
334         else:
335             raw = Raw(data[0:16])
336         p = (
337             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
338             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
339             / proto_header
340             / raw
341         )
342         pkts.append(p)
343         if proto == IP_PROTOS.tcp:
344             raw = Raw(data[4:20])
345         else:
346             raw = Raw(data[16:32])
347         p = (
348             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
349             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
350             / raw
351         )
352         pkts.append(p)
353         if proto == IP_PROTOS.tcp:
354             raw = Raw(data[20:])
355         else:
356             raw = Raw(data[32:])
357         p = (
358             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
359             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
360             / raw
361         )
362         pkts.append(p)
363         return pkts
364
365     def create_stream_frag_ip6(
366         self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
367     ):
368         """
369         Create fragmented packet stream
370
371         :param src_if: Source interface
372         :param dst: Destination IPv4 address
373         :param sport: Source TCP port
374         :param dport: Destination TCP port
375         :param data: Payload data
376         :param pref: NAT64 prefix
377         :param plen: NAT64 prefix length
378         :param fragsize: size of fragments
379         :returns: Fragments
380         """
381         if pref is None:
382             dst_ip6 = "".join(["64:ff9b::", dst])
383         else:
384             dst_ip6 = self.compose_ip6(dst, pref, plen)
385
386         p = (
387             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
388             / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
389             / IPv6ExtHdrFragment(id=random.randint(0, 65535))
390             / TCP(sport=sport, dport=dport)
391             / Raw(data)
392         )
393
394         return fragment6(p, frag_size)
395
396     def reass_frags_and_verify(self, frags, src, dst):
397         """
398         Reassemble and verify fragmented packet
399
400         :param frags: Captured fragments
401         :param src: Source IPv4 address to verify
402         :param dst: Destination IPv4 address to verify
403
404         :returns: Reassembled IPv4 packet
405         """
406         buffer = BytesIO()
407         for p in frags:
408             self.assertEqual(p[IP].src, src)
409             self.assertEqual(p[IP].dst, dst)
410             self.assert_ip_checksum_valid(p)
411             buffer.seek(p[IP].frag * 8)
412             buffer.write(bytes(p[IP].payload))
413         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
414         if ip.proto == IP_PROTOS.tcp:
415             p = ip / TCP(buffer.getvalue())
416             self.logger.debug(ppp("Reassembled:", p))
417             self.assert_tcp_checksum_valid(p)
418         elif ip.proto == IP_PROTOS.udp:
419             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
420         elif ip.proto == IP_PROTOS.icmp:
421             p = ip / ICMP(buffer.getvalue())
422         return p
423
424     def reass_frags_and_verify_ip6(self, frags, src, dst):
425         """
426         Reassemble and verify fragmented packet
427
428         :param frags: Captured fragments
429         :param src: Source IPv6 address to verify
430         :param dst: Destination IPv6 address to verify
431
432         :returns: Reassembled IPv6 packet
433         """
434         buffer = BytesIO()
435         for p in frags:
436             self.assertEqual(p[IPv6].src, src)
437             self.assertEqual(p[IPv6].dst, dst)
438             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
439             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
440         ip = IPv6(
441             src=frags[0][IPv6].src,
442             dst=frags[0][IPv6].dst,
443             nh=frags[0][IPv6ExtHdrFragment].nh,
444         )
445         if ip.nh == IP_PROTOS.tcp:
446             p = ip / TCP(buffer.getvalue())
447         elif ip.nh == IP_PROTOS.udp:
448             p = ip / UDP(buffer.getvalue())
449         self.logger.debug(ppp("Reassembled:", p))
450         self.assert_packet_checksums_valid(p)
451         return p
452
453     def verify_ipfix_max_bibs(self, data, limit):
454         """
455         Verify IPFIX maximum BIB entries exceeded event
456
457         :param data: Decoded IPFIX data records
458         :param limit: Number of maximum BIB entries that can be created.
459         """
460         self.assertEqual(1, len(data))
461         record = data[0]
462         # natEvent
463         self.assertEqual(scapy.compat.orb(record[230]), 13)
464         # natQuotaExceededEvent
465         self.assertEqual(struct.pack("!I", 2), record[466])
466         # maxBIBEntries
467         self.assertEqual(struct.pack("!I", limit), record[472])
468         return len(data)
469
470     def verify_ipfix_bib(self, data, is_create, src_addr):
471         """
472         Verify IPFIX NAT64 BIB create and delete events
473
474         :param data: Decoded IPFIX data records
475         :param is_create: Create event if nonzero value otherwise delete event
476         :param src_addr: IPv6 source address
477         """
478         self.assertEqual(1, len(data))
479         record = data[0]
480         # natEvent
481         if is_create:
482             self.assertEqual(scapy.compat.orb(record[230]), 10)
483         else:
484             self.assertEqual(scapy.compat.orb(record[230]), 11)
485         # sourceIPv6Address
486         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
487         # postNATSourceIPv4Address
488         self.assertEqual(self.nat_addr_n, record[225])
489         # protocolIdentifier
490         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
491         # ingressVRFID
492         self.assertEqual(struct.pack("!I", 0), record[234])
493         # sourceTransportPort
494         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
495         # postNAPTSourceTransportPort
496         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
497
498     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
499         """
500         Verify IPFIX NAT64 session create and delete events
501
502         :param data: Decoded IPFIX data records
503         :param is_create: Create event if nonzero value otherwise delete event
504         :param src_addr: IPv6 source address
505         :param dst_addr: IPv4 destination address
506         :param dst_port: destination TCP port
507         """
508         self.assertEqual(1, len(data))
509         record = data[0]
510         # natEvent
511         if is_create:
512             self.assertEqual(scapy.compat.orb(record[230]), 6)
513         else:
514             self.assertEqual(scapy.compat.orb(record[230]), 7)
515         # sourceIPv6Address
516         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
517         # destinationIPv6Address
518         self.assertEqual(
519             socket.inet_pton(
520                 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
521             ),
522             record[28],
523         )
524         # postNATSourceIPv4Address
525         self.assertEqual(self.nat_addr_n, record[225])
526         # postNATDestinationIPv4Address
527         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
528         # protocolIdentifier
529         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
530         # ingressVRFID
531         self.assertEqual(struct.pack("!I", 0), record[234])
532         # sourceTransportPort
533         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
534         # postNAPTSourceTransportPort
535         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
536         # destinationTransportPort
537         self.assertEqual(struct.pack("!H", dst_port), record[11])
538         # postNAPTDestinationTransportPort
539         self.assertEqual(struct.pack("!H", dst_port), record[228])
540
541     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
542         message = data.decode("utf-8")
543         try:
544             message = SyslogMessage.parse(message)
545         except ParseError as e:
546             self.logger.error(e)
547             raise
548         else:
549             self.assertEqual(message.severity, SyslogSeverity.info)
550             self.assertEqual(message.appname, "NAT")
551             self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
552             sd_params = message.sd.get("nsess")
553             self.assertTrue(sd_params is not None)
554             if is_ip6:
555                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
556                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
557             else:
558                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
559                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
560                 self.assertTrue(sd_params.get("SSUBIX") is not None)
561             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
562             self.assertEqual(sd_params.get("XATYP"), "IPv4")
563             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
564             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
565             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
566             self.assertEqual(sd_params.get("SVLAN"), "0")
567             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
568             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
569
570     def compose_ip6(self, ip4, pref, plen):
571         """
572         Compose IPv4-embedded IPv6 addresses
573
574         :param ip4: IPv4 address
575         :param pref: IPv6 prefix
576         :param plen: IPv6 prefix length
577         :returns: IPv4-embedded IPv6 addresses
578         """
579         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
580         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
581         if plen == 32:
582             pref_n[4] = ip4_n[0]
583             pref_n[5] = ip4_n[1]
584             pref_n[6] = ip4_n[2]
585             pref_n[7] = ip4_n[3]
586         elif plen == 40:
587             pref_n[5] = ip4_n[0]
588             pref_n[6] = ip4_n[1]
589             pref_n[7] = ip4_n[2]
590             pref_n[9] = ip4_n[3]
591         elif plen == 48:
592             pref_n[6] = ip4_n[0]
593             pref_n[7] = ip4_n[1]
594             pref_n[9] = ip4_n[2]
595             pref_n[10] = ip4_n[3]
596         elif plen == 56:
597             pref_n[7] = ip4_n[0]
598             pref_n[9] = ip4_n[1]
599             pref_n[10] = ip4_n[2]
600             pref_n[11] = ip4_n[3]
601         elif plen == 64:
602             pref_n[9] = ip4_n[0]
603             pref_n[10] = ip4_n[1]
604             pref_n[11] = ip4_n[2]
605             pref_n[12] = ip4_n[3]
606         elif plen == 96:
607             pref_n[12] = ip4_n[0]
608             pref_n[13] = ip4_n[1]
609             pref_n[14] = ip4_n[2]
610             pref_n[15] = ip4_n[3]
611         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
612         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
613
614     def verify_ipfix_max_sessions(self, data, limit):
615         """
616         Verify IPFIX maximum session entries exceeded event
617
618         :param data: Decoded IPFIX data records
619         :param limit: Number of maximum session entries that can be created.
620         """
621         self.assertEqual(1, len(data))
622         record = data[0]
623         # natEvent
624         self.assertEqual(scapy.compat.orb(record[230]), 13)
625         # natQuotaExceededEvent
626         self.assertEqual(struct.pack("!I", 1), record[466])
627         # maxSessionEntries
628         self.assertEqual(struct.pack("!I", limit), record[471])
629         return len(data)
630
631     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
632         """NAT64 inside interface handles Neighbor Advertisement"""
633
634         flags = self.config_flags.NAT_IS_INSIDE
635         self.vapi.nat64_add_del_interface(
636             is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
637         )
638
639         # Try to send ping
640         ping = (
641             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
642             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
643             / ICMPv6EchoRequest()
644         )
645         pkts = [ping]
646         self.pg5.add_stream(pkts)
647         self.pg_enable_capture(self.pg_interfaces)
648         self.pg_start()
649
650         # Wait for Neighbor Solicitation
651         capture = self.pg5.get_capture(len(pkts))
652         packet = capture[0]
653         try:
654             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6_ll)
655             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
656             tgt = packet[ICMPv6ND_NS].tgt
657         except:
658             self.logger.error(ppp("Unexpected or invalid packet:", packet))
659             raise
660
661         # Send Neighbor Advertisement
662         p = (
663             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
664             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
665             / ICMPv6ND_NA(tgt=tgt)
666             / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
667         )
668         pkts = [p]
669         self.pg5.add_stream(pkts)
670         self.pg_enable_capture(self.pg_interfaces)
671         self.pg_start()
672
673         # Try to send ping again
674         pkts = [ping]
675         self.pg5.add_stream(pkts)
676         self.pg_enable_capture(self.pg_interfaces)
677         self.pg_start()
678
679         # Wait for ping reply
680         capture = self.pg5.get_capture(len(pkts))
681         packet = capture[0]
682         try:
683             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
684             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
685             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
686         except:
687             self.logger.error(ppp("Unexpected or invalid packet:", packet))
688             raise
689
690     def test_pool(self):
691         """Add/delete address to NAT64 pool"""
692         nat_addr = "1.2.3.4"
693
694         self.vapi.nat64_add_del_pool_addr_range(
695             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
696         )
697
698         addresses = self.vapi.nat64_pool_addr_dump()
699         self.assertEqual(len(addresses), 1)
700         self.assertEqual(str(addresses[0].address), nat_addr)
701
702         self.vapi.nat64_add_del_pool_addr_range(
703             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
704         )
705
706         addresses = self.vapi.nat64_pool_addr_dump()
707         self.assertEqual(len(addresses), 0)
708
709     def test_interface(self):
710         """Enable/disable NAT64 feature on the interface"""
711         flags = self.config_flags.NAT_IS_INSIDE
712         self.vapi.nat64_add_del_interface(
713             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
714         )
715         self.vapi.nat64_add_del_interface(
716             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
717         )
718
719         interfaces = self.vapi.nat64_interface_dump()
720         self.assertEqual(len(interfaces), 2)
721         pg0_found = False
722         pg1_found = False
723         for intf in interfaces:
724             if intf.sw_if_index == self.pg0.sw_if_index:
725                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
726                 pg0_found = True
727             elif intf.sw_if_index == self.pg1.sw_if_index:
728                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
729                 pg1_found = True
730         self.assertTrue(pg0_found)
731         self.assertTrue(pg1_found)
732
733         features = self.vapi.cli("show interface features pg0")
734         self.assertIn("nat64-in2out", features)
735         features = self.vapi.cli("show interface features pg1")
736         self.assertIn("nat64-out2in", features)
737
738         self.vapi.nat64_add_del_interface(
739             is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
740         )
741         self.vapi.nat64_add_del_interface(
742             is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
743         )
744
745         interfaces = self.vapi.nat64_interface_dump()
746         self.assertEqual(len(interfaces), 0)
747
748     def test_static_bib(self):
749         """Add/delete static BIB entry"""
750         in_addr = "2001:db8:85a3::8a2e:370:7334"
751         out_addr = "10.1.1.3"
752         in_port = 1234
753         out_port = 5678
754         proto = IP_PROTOS.tcp
755
756         self.vapi.nat64_add_del_static_bib(
757             i_addr=in_addr,
758             o_addr=out_addr,
759             i_port=in_port,
760             o_port=out_port,
761             proto=proto,
762             vrf_id=0,
763             is_add=1,
764         )
765         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
766         static_bib_num = 0
767         for bibe in bib:
768             if bibe.flags & self.config_flags.NAT_IS_STATIC:
769                 static_bib_num += 1
770                 self.assertEqual(str(bibe.i_addr), in_addr)
771                 self.assertEqual(str(bibe.o_addr), out_addr)
772                 self.assertEqual(bibe.i_port, in_port)
773                 self.assertEqual(bibe.o_port, out_port)
774         self.assertEqual(static_bib_num, 1)
775         bibs = self.statistics.get_counter("/nat64/total-bibs")
776         self.assertEqual(bibs[0][0], 1)
777
778         self.vapi.nat64_add_del_static_bib(
779             i_addr=in_addr,
780             o_addr=out_addr,
781             i_port=in_port,
782             o_port=out_port,
783             proto=proto,
784             vrf_id=0,
785             is_add=0,
786         )
787         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
788         static_bib_num = 0
789         for bibe in bib:
790             if bibe.flags & self.config_flags.NAT_IS_STATIC:
791                 static_bib_num += 1
792         self.assertEqual(static_bib_num, 0)
793         bibs = self.statistics.get_counter("/nat64/total-bibs")
794         self.assertEqual(bibs[0][0], 0)
795
796     def test_set_timeouts(self):
797         """Set NAT64 timeouts"""
798         # verify default values
799         timeouts = self.vapi.nat64_get_timeouts()
800         self.assertEqual(timeouts.udp, 300)
801         self.assertEqual(timeouts.icmp, 60)
802         self.assertEqual(timeouts.tcp_transitory, 240)
803         self.assertEqual(timeouts.tcp_established, 7440)
804
805         # set and verify custom values
806         self.vapi.nat64_set_timeouts(
807             udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
808         )
809         timeouts = self.vapi.nat64_get_timeouts()
810         self.assertEqual(timeouts.udp, 200)
811         self.assertEqual(timeouts.icmp, 30)
812         self.assertEqual(timeouts.tcp_transitory, 250)
813         self.assertEqual(timeouts.tcp_established, 7450)
814
815     def test_dynamic(self):
816         """NAT64 dynamic translation test"""
817         self.tcp_port_in = 6303
818         self.udp_port_in = 6304
819         self.icmp_id_in = 6305
820
821         ses_num_start = self.nat64_get_ses_num()
822
823         self.vapi.nat64_add_del_pool_addr_range(
824             start_addr=self.nat_addr,
825             end_addr=self.nat_addr,
826             vrf_id=0xFFFFFFFF,
827             is_add=1,
828         )
829         flags = self.config_flags.NAT_IS_INSIDE
830         self.vapi.nat64_add_del_interface(
831             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
832         )
833         self.vapi.nat64_add_del_interface(
834             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
835         )
836
837         # in2out
838         tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
839         udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
840         icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
841         drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
842
843         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
844         self.pg0.add_stream(pkts)
845         self.pg_enable_capture(self.pg_interfaces)
846         self.pg_start()
847         capture = self.pg1.get_capture(len(pkts))
848         self.verify_capture_out(
849             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
850         )
851
852         if_idx = self.pg0.sw_if_index
853         cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
854         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
855         cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
856         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
857         cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
858         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
859         cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
860         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
861
862         # out2in
863         tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
864         udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
865         icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
866         drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
867
868         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
869         self.pg1.add_stream(pkts)
870         self.pg_enable_capture(self.pg_interfaces)
871         self.pg_start()
872         capture = self.pg0.get_capture(len(pkts))
873         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
874         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
875
876         if_idx = self.pg1.sw_if_index
877         cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
878         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
879         cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
880         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
881         cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
882         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
883         cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
884         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
885
886         bibs = self.statistics.get_counter("/nat64/total-bibs")
887         self.assertEqual(bibs[0][0], 3)
888         sessions = self.statistics.get_counter("/nat64/total-sessions")
889         self.assertEqual(sessions[0][0], 3)
890
891         # in2out
892         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
893         self.pg0.add_stream(pkts)
894         self.pg_enable_capture(self.pg_interfaces)
895         self.pg_start()
896         capture = self.pg1.get_capture(len(pkts))
897         self.verify_capture_out(
898             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
899         )
900
901         # out2in
902         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
903         self.pg1.add_stream(pkts)
904         self.pg_enable_capture(self.pg_interfaces)
905         self.pg_start()
906         capture = self.pg0.get_capture(len(pkts))
907         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
908
909         ses_num_end = self.nat64_get_ses_num()
910
911         self.assertEqual(ses_num_end - ses_num_start, 3)
912
913         # tenant with specific VRF
914         self.vapi.nat64_add_del_pool_addr_range(
915             start_addr=self.vrf1_nat_addr,
916             end_addr=self.vrf1_nat_addr,
917             vrf_id=self.vrf1_id,
918             is_add=1,
919         )
920         flags = self.config_flags.NAT_IS_INSIDE
921         self.vapi.nat64_add_del_interface(
922             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
923         )
924
925         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
926         self.pg2.add_stream(pkts)
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929         capture = self.pg1.get_capture(len(pkts))
930         self.verify_capture_out(
931             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
932         )
933
934         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
935         self.pg1.add_stream(pkts)
936         self.pg_enable_capture(self.pg_interfaces)
937         self.pg_start()
938         capture = self.pg2.get_capture(len(pkts))
939         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
940
941     def test_static(self):
942         """NAT64 static translation test"""
943         self.tcp_port_in = 60303
944         self.udp_port_in = 60304
945         self.icmp_id_in = 60305
946         self.tcp_port_out = 60303
947         self.udp_port_out = 60304
948         self.icmp_id_out = 60305
949
950         ses_num_start = self.nat64_get_ses_num()
951
952         self.vapi.nat64_add_del_pool_addr_range(
953             start_addr=self.nat_addr,
954             end_addr=self.nat_addr,
955             vrf_id=0xFFFFFFFF,
956             is_add=1,
957         )
958         flags = self.config_flags.NAT_IS_INSIDE
959         self.vapi.nat64_add_del_interface(
960             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
961         )
962         self.vapi.nat64_add_del_interface(
963             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
964         )
965
966         self.vapi.nat64_add_del_static_bib(
967             i_addr=self.pg0.remote_ip6,
968             o_addr=self.nat_addr,
969             i_port=self.tcp_port_in,
970             o_port=self.tcp_port_out,
971             proto=IP_PROTOS.tcp,
972             vrf_id=0,
973             is_add=1,
974         )
975         self.vapi.nat64_add_del_static_bib(
976             i_addr=self.pg0.remote_ip6,
977             o_addr=self.nat_addr,
978             i_port=self.udp_port_in,
979             o_port=self.udp_port_out,
980             proto=IP_PROTOS.udp,
981             vrf_id=0,
982             is_add=1,
983         )
984         self.vapi.nat64_add_del_static_bib(
985             i_addr=self.pg0.remote_ip6,
986             o_addr=self.nat_addr,
987             i_port=self.icmp_id_in,
988             o_port=self.icmp_id_out,
989             proto=IP_PROTOS.icmp,
990             vrf_id=0,
991             is_add=1,
992         )
993
994         # in2out
995         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
996         self.pg0.add_stream(pkts)
997         self.pg_enable_capture(self.pg_interfaces)
998         self.pg_start()
999         capture = self.pg1.get_capture(len(pkts))
1000         self.verify_capture_out(
1001             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
1002         )
1003
1004         # out2in
1005         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1006         self.pg1.add_stream(pkts)
1007         self.pg_enable_capture(self.pg_interfaces)
1008         self.pg_start()
1009         capture = self.pg0.get_capture(len(pkts))
1010         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1011         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1012
1013         ses_num_end = self.nat64_get_ses_num()
1014
1015         self.assertEqual(ses_num_end - ses_num_start, 3)
1016
1017     def test_session_timeout(self):
1018         """NAT64 session timeout"""
1019         self.icmp_id_in = 1234
1020         self.vapi.nat64_add_del_pool_addr_range(
1021             start_addr=self.nat_addr,
1022             end_addr=self.nat_addr,
1023             vrf_id=0xFFFFFFFF,
1024             is_add=1,
1025         )
1026         flags = self.config_flags.NAT_IS_INSIDE
1027         self.vapi.nat64_add_del_interface(
1028             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1029         )
1030         self.vapi.nat64_add_del_interface(
1031             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1032         )
1033         self.vapi.nat64_set_timeouts(
1034             udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1035         )
1036
1037         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1038         self.pg0.add_stream(pkts)
1039         self.pg_enable_capture(self.pg_interfaces)
1040         self.pg_start()
1041         capture = self.pg1.get_capture(len(pkts))
1042
1043         ses_num_before_timeout = self.nat64_get_ses_num()
1044
1045         self.virtual_sleep(15)
1046
1047         # ICMP and TCP session after timeout
1048         ses_num_after_timeout = self.nat64_get_ses_num()
1049         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1050
1051     def test_icmp_error(self):
1052         """NAT64 ICMP Error message translation"""
1053         self.tcp_port_in = 6303
1054         self.udp_port_in = 6304
1055         self.icmp_id_in = 6305
1056
1057         self.vapi.nat64_add_del_pool_addr_range(
1058             start_addr=self.nat_addr,
1059             end_addr=self.nat_addr,
1060             vrf_id=0xFFFFFFFF,
1061             is_add=1,
1062         )
1063         flags = self.config_flags.NAT_IS_INSIDE
1064         self.vapi.nat64_add_del_interface(
1065             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1066         )
1067         self.vapi.nat64_add_del_interface(
1068             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1069         )
1070
1071         # send some packets to create sessions
1072         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1073         self.pg0.add_stream(pkts)
1074         self.pg_enable_capture(self.pg_interfaces)
1075         self.pg_start()
1076         capture_ip4 = self.pg1.get_capture(len(pkts))
1077         self.verify_capture_out(
1078             capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1079         )
1080
1081         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1082         self.pg1.add_stream(pkts)
1083         self.pg_enable_capture(self.pg_interfaces)
1084         self.pg_start()
1085         capture_ip6 = self.pg0.get_capture(len(pkts))
1086         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1087         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1088
1089         # in2out
1090         pkts = [
1091             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1092             / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1093             / ICMPv6DestUnreach(code=1)
1094             / packet[IPv6]
1095             for packet in capture_ip6
1096         ]
1097         self.pg0.add_stream(pkts)
1098         self.pg_enable_capture(self.pg_interfaces)
1099         self.pg_start()
1100         capture = self.pg1.get_capture(len(pkts))
1101         for packet in capture:
1102             try:
1103                 self.assertEqual(packet[IP].src, self.nat_addr)
1104                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1105                 self.assertEqual(packet[ICMP].type, 3)
1106                 self.assertEqual(packet[ICMP].code, 13)
1107                 inner = packet[IPerror]
1108                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1109                 self.assertEqual(inner.dst, self.nat_addr)
1110                 self.assert_packet_checksums_valid(packet)
1111                 if inner.haslayer(TCPerror):
1112                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1113                 elif inner.haslayer(UDPerror):
1114                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1115                 else:
1116                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1117             except:
1118                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1119                 raise
1120
1121         # out2in
1122         pkts = [
1123             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1124             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1125             / ICMP(type=3, code=13)
1126             / packet[IP]
1127             for packet in capture_ip4
1128         ]
1129         self.pg1.add_stream(pkts)
1130         self.pg_enable_capture(self.pg_interfaces)
1131         self.pg_start()
1132         capture = self.pg0.get_capture(len(pkts))
1133         for packet in capture:
1134             try:
1135                 self.assertEqual(packet[IPv6].src, ip.src)
1136                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1137                 icmp = packet[ICMPv6DestUnreach]
1138                 self.assertEqual(icmp.code, 1)
1139                 inner = icmp[IPerror6]
1140                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1141                 self.assertEqual(inner.dst, ip.src)
1142                 self.assert_icmpv6_checksum_valid(packet)
1143                 if inner.haslayer(TCPerror):
1144                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1145                 elif inner.haslayer(UDPerror):
1146                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1147                 else:
1148                     self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1149             except:
1150                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1151                 raise
1152
1153     def test_hairpinning(self):
1154         """NAT64 hairpinning"""
1155
1156         client = self.pg0.remote_hosts[0]
1157         server = self.pg0.remote_hosts[1]
1158         server_tcp_in_port = 22
1159         server_tcp_out_port = 4022
1160         server_udp_in_port = 23
1161         server_udp_out_port = 4023
1162         client_tcp_in_port = 1234
1163         client_udp_in_port = 1235
1164         client_tcp_out_port = 0
1165         client_udp_out_port = 0
1166         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1167         nat_addr_ip6 = ip.src
1168
1169         self.vapi.nat64_add_del_pool_addr_range(
1170             start_addr=self.nat_addr,
1171             end_addr=self.nat_addr,
1172             vrf_id=0xFFFFFFFF,
1173             is_add=1,
1174         )
1175         flags = self.config_flags.NAT_IS_INSIDE
1176         self.vapi.nat64_add_del_interface(
1177             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1178         )
1179         self.vapi.nat64_add_del_interface(
1180             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1181         )
1182
1183         self.vapi.nat64_add_del_static_bib(
1184             i_addr=server.ip6n,
1185             o_addr=self.nat_addr,
1186             i_port=server_tcp_in_port,
1187             o_port=server_tcp_out_port,
1188             proto=IP_PROTOS.tcp,
1189             vrf_id=0,
1190             is_add=1,
1191         )
1192         self.vapi.nat64_add_del_static_bib(
1193             i_addr=server.ip6n,
1194             o_addr=self.nat_addr,
1195             i_port=server_udp_in_port,
1196             o_port=server_udp_out_port,
1197             proto=IP_PROTOS.udp,
1198             vrf_id=0,
1199             is_add=1,
1200         )
1201
1202         # client to server
1203         pkts = []
1204         p = (
1205             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1206             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1207             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1208         )
1209         pkts.append(p)
1210         p = (
1211             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1212             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1213             / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1214         )
1215         pkts.append(p)
1216         self.pg0.add_stream(pkts)
1217         self.pg_enable_capture(self.pg_interfaces)
1218         self.pg_start()
1219         capture = self.pg0.get_capture(len(pkts))
1220         for packet in capture:
1221             try:
1222                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1223                 self.assertEqual(packet[IPv6].dst, server.ip6)
1224                 self.assert_packet_checksums_valid(packet)
1225                 if packet.haslayer(TCP):
1226                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1227                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1228                     client_tcp_out_port = packet[TCP].sport
1229                 else:
1230                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1231                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1232                     client_udp_out_port = packet[UDP].sport
1233             except:
1234                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1235                 raise
1236
1237         # server to client
1238         pkts = []
1239         p = (
1240             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1241             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1242             / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1243         )
1244         pkts.append(p)
1245         p = (
1246             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1247             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1248             / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1249         )
1250         pkts.append(p)
1251         self.pg0.add_stream(pkts)
1252         self.pg_enable_capture(self.pg_interfaces)
1253         self.pg_start()
1254         capture = self.pg0.get_capture(len(pkts))
1255         for packet in capture:
1256             try:
1257                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1258                 self.assertEqual(packet[IPv6].dst, client.ip6)
1259                 self.assert_packet_checksums_valid(packet)
1260                 if packet.haslayer(TCP):
1261                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1262                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1263                 else:
1264                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1265                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1266             except:
1267                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1268                 raise
1269
1270         # ICMP error
1271         pkts = []
1272         pkts = [
1273             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1274             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1275             / ICMPv6DestUnreach(code=1)
1276             / packet[IPv6]
1277             for packet in capture
1278         ]
1279         self.pg0.add_stream(pkts)
1280         self.pg_enable_capture(self.pg_interfaces)
1281         self.pg_start()
1282         capture = self.pg0.get_capture(len(pkts))
1283         for packet in capture:
1284             try:
1285                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1286                 self.assertEqual(packet[IPv6].dst, server.ip6)
1287                 icmp = packet[ICMPv6DestUnreach]
1288                 self.assertEqual(icmp.code, 1)
1289                 inner = icmp[IPerror6]
1290                 self.assertEqual(inner.src, server.ip6)
1291                 self.assertEqual(inner.dst, nat_addr_ip6)
1292                 self.assert_packet_checksums_valid(packet)
1293                 if inner.haslayer(TCPerror):
1294                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1295                     self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1296                 else:
1297                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1298                     self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1299             except:
1300                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1301                 raise
1302
1303     def test_prefix(self):
1304         """NAT64 Network-Specific Prefix"""
1305
1306         self.vapi.nat64_add_del_pool_addr_range(
1307             start_addr=self.nat_addr,
1308             end_addr=self.nat_addr,
1309             vrf_id=0xFFFFFFFF,
1310             is_add=1,
1311         )
1312         flags = self.config_flags.NAT_IS_INSIDE
1313         self.vapi.nat64_add_del_interface(
1314             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1315         )
1316         self.vapi.nat64_add_del_interface(
1317             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1318         )
1319         self.vapi.nat64_add_del_pool_addr_range(
1320             start_addr=self.vrf1_nat_addr,
1321             end_addr=self.vrf1_nat_addr,
1322             vrf_id=self.vrf1_id,
1323             is_add=1,
1324         )
1325         self.vapi.nat64_add_del_interface(
1326             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1327         )
1328
1329         # Add global prefix
1330         global_pref64 = "2001:db8::"
1331         global_pref64_len = 32
1332         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1333         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1334
1335         prefix = self.vapi.nat64_prefix_dump()
1336         self.assertEqual(len(prefix), 1)
1337         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1338         self.assertEqual(prefix[0].vrf_id, 0)
1339
1340         # Add tenant specific prefix
1341         vrf1_pref64 = "2001:db8:122:300::"
1342         vrf1_pref64_len = 56
1343         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1344         self.vapi.nat64_add_del_prefix(
1345             prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1346         )
1347
1348         prefix = self.vapi.nat64_prefix_dump()
1349         self.assertEqual(len(prefix), 2)
1350
1351         # Global prefix
1352         pkts = self.create_stream_in_ip6(
1353             self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1354         )
1355         self.pg0.add_stream(pkts)
1356         self.pg_enable_capture(self.pg_interfaces)
1357         self.pg_start()
1358         capture = self.pg1.get_capture(len(pkts))
1359         self.verify_capture_out(
1360             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1361         )
1362
1363         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1364         self.pg1.add_stream(pkts)
1365         self.pg_enable_capture(self.pg_interfaces)
1366         self.pg_start()
1367         capture = self.pg0.get_capture(len(pkts))
1368         dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1369         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1370
1371         # Tenant specific prefix
1372         pkts = self.create_stream_in_ip6(
1373             self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1374         )
1375         self.pg2.add_stream(pkts)
1376         self.pg_enable_capture(self.pg_interfaces)
1377         self.pg_start()
1378         capture = self.pg1.get_capture(len(pkts))
1379         self.verify_capture_out(
1380             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1381         )
1382
1383         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1384         self.pg1.add_stream(pkts)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         capture = self.pg2.get_capture(len(pkts))
1388         dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1389         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1390
1391     def test_unknown_proto(self):
1392         """NAT64 translate packet with unknown protocol"""
1393
1394         self.vapi.nat64_add_del_pool_addr_range(
1395             start_addr=self.nat_addr,
1396             end_addr=self.nat_addr,
1397             vrf_id=0xFFFFFFFF,
1398             is_add=1,
1399         )
1400         flags = self.config_flags.NAT_IS_INSIDE
1401         self.vapi.nat64_add_del_interface(
1402             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1403         )
1404         self.vapi.nat64_add_del_interface(
1405             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1406         )
1407         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1408
1409         # in2out
1410         p = (
1411             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1412             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1413             / TCP(sport=self.tcp_port_in, dport=20)
1414         )
1415         self.pg0.add_stream(p)
1416         self.pg_enable_capture(self.pg_interfaces)
1417         self.pg_start()
1418         p = self.pg1.get_capture(1)
1419
1420         p = (
1421             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1422             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1423             / GRE()
1424             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1425             / TCP(sport=1234, dport=1234)
1426         )
1427         self.pg0.add_stream(p)
1428         self.pg_enable_capture(self.pg_interfaces)
1429         self.pg_start()
1430         p = self.pg1.get_capture(1)
1431         packet = p[0]
1432         try:
1433             self.assertEqual(packet[IP].src, self.nat_addr)
1434             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1435             self.assertEqual(packet.haslayer(GRE), 1)
1436             self.assert_packet_checksums_valid(packet)
1437         except:
1438             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1439             raise
1440
1441         # out2in
1442         p = (
1443             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1444             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1445             / GRE()
1446             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1447             / TCP(sport=1234, dport=1234)
1448         )
1449         self.pg1.add_stream(p)
1450         self.pg_enable_capture(self.pg_interfaces)
1451         self.pg_start()
1452         p = self.pg0.get_capture(1)
1453         packet = p[0]
1454         try:
1455             self.assertEqual(packet[IPv6].src, remote_ip6)
1456             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1457             self.assertEqual(packet[IPv6].nh, 47)
1458         except:
1459             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1460             raise
1461
1462     def test_hairpinning_unknown_proto(self):
1463         """NAT64 translate packet with unknown protocol - hairpinning"""
1464
1465         client = self.pg0.remote_hosts[0]
1466         server = self.pg0.remote_hosts[1]
1467         server_tcp_in_port = 22
1468         server_tcp_out_port = 4022
1469         client_tcp_in_port = 1234
1470         client_tcp_out_port = 1235
1471         server_nat_ip = "10.0.0.100"
1472         client_nat_ip = "10.0.0.110"
1473         server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1474         client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1475
1476         self.vapi.nat64_add_del_pool_addr_range(
1477             start_addr=server_nat_ip,
1478             end_addr=client_nat_ip,
1479             vrf_id=0xFFFFFFFF,
1480             is_add=1,
1481         )
1482         flags = self.config_flags.NAT_IS_INSIDE
1483         self.vapi.nat64_add_del_interface(
1484             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1485         )
1486         self.vapi.nat64_add_del_interface(
1487             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1488         )
1489
1490         self.vapi.nat64_add_del_static_bib(
1491             i_addr=server.ip6n,
1492             o_addr=server_nat_ip,
1493             i_port=server_tcp_in_port,
1494             o_port=server_tcp_out_port,
1495             proto=IP_PROTOS.tcp,
1496             vrf_id=0,
1497             is_add=1,
1498         )
1499
1500         self.vapi.nat64_add_del_static_bib(
1501             i_addr=server.ip6n,
1502             o_addr=server_nat_ip,
1503             i_port=0,
1504             o_port=0,
1505             proto=IP_PROTOS.gre,
1506             vrf_id=0,
1507             is_add=1,
1508         )
1509
1510         self.vapi.nat64_add_del_static_bib(
1511             i_addr=client.ip6n,
1512             o_addr=client_nat_ip,
1513             i_port=client_tcp_in_port,
1514             o_port=client_tcp_out_port,
1515             proto=IP_PROTOS.tcp,
1516             vrf_id=0,
1517             is_add=1,
1518         )
1519
1520         # client to server
1521         p = (
1522             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1523             / IPv6(src=client.ip6, dst=server_nat_ip6)
1524             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1525         )
1526         self.pg0.add_stream(p)
1527         self.pg_enable_capture(self.pg_interfaces)
1528         self.pg_start()
1529         p = self.pg0.get_capture(1)
1530
1531         p = (
1532             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1533             / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1534             / GRE()
1535             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1536             / TCP(sport=1234, dport=1234)
1537         )
1538         self.pg0.add_stream(p)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         p = self.pg0.get_capture(1)
1542         packet = p[0]
1543         try:
1544             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1545             self.assertEqual(packet[IPv6].dst, server.ip6)
1546             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1547         except:
1548             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1549             raise
1550
1551         # server to client
1552         p = (
1553             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1554             / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1555             / GRE()
1556             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1557             / TCP(sport=1234, dport=1234)
1558         )
1559         self.pg0.add_stream(p)
1560         self.pg_enable_capture(self.pg_interfaces)
1561         self.pg_start()
1562         p = self.pg0.get_capture(1)
1563         packet = p[0]
1564         try:
1565             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1566             self.assertEqual(packet[IPv6].dst, client.ip6)
1567             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1568         except:
1569             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1570             raise
1571
1572     def test_one_armed_nat64(self):
1573         """One armed NAT64"""
1574         external_port = 0
1575         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1576
1577         self.vapi.nat64_add_del_pool_addr_range(
1578             start_addr=self.nat_addr,
1579             end_addr=self.nat_addr,
1580             vrf_id=0xFFFFFFFF,
1581             is_add=1,
1582         )
1583         flags = self.config_flags.NAT_IS_INSIDE
1584         self.vapi.nat64_add_del_interface(
1585             is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1586         )
1587         self.vapi.nat64_add_del_interface(
1588             is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1589         )
1590
1591         # in2out
1592         p = (
1593             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1594             / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1595             / TCP(sport=12345, dport=80)
1596         )
1597         self.pg3.add_stream(p)
1598         self.pg_enable_capture(self.pg_interfaces)
1599         self.pg_start()
1600         capture = self.pg3.get_capture(1)
1601         p = capture[0]
1602         try:
1603             ip = p[IP]
1604             tcp = p[TCP]
1605             self.assertEqual(ip.src, self.nat_addr)
1606             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1607             self.assertNotEqual(tcp.sport, 12345)
1608             external_port = tcp.sport
1609             self.assertEqual(tcp.dport, 80)
1610             self.assert_packet_checksums_valid(p)
1611         except:
1612             self.logger.error(ppp("Unexpected or invalid packet:", p))
1613             raise
1614
1615         # out2in
1616         p = (
1617             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1618             / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1619             / TCP(sport=80, dport=external_port)
1620         )
1621         self.pg3.add_stream(p)
1622         self.pg_enable_capture(self.pg_interfaces)
1623         self.pg_start()
1624         capture = self.pg3.get_capture(1)
1625         p = capture[0]
1626         try:
1627             ip = p[IPv6]
1628             tcp = p[TCP]
1629             self.assertEqual(ip.src, remote_host_ip6)
1630             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1631             self.assertEqual(tcp.sport, 80)
1632             self.assertEqual(tcp.dport, 12345)
1633             self.assert_packet_checksums_valid(p)
1634         except:
1635             self.logger.error(ppp("Unexpected or invalid packet:", p))
1636             raise
1637
1638     def test_frag_in_order(self):
1639         """NAT64 translate fragments arriving in order"""
1640         self.tcp_port_in = random.randint(1025, 65535)
1641
1642         self.vapi.nat64_add_del_pool_addr_range(
1643             start_addr=self.nat_addr,
1644             end_addr=self.nat_addr,
1645             vrf_id=0xFFFFFFFF,
1646             is_add=1,
1647         )
1648         flags = self.config_flags.NAT_IS_INSIDE
1649         self.vapi.nat64_add_del_interface(
1650             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1651         )
1652         self.vapi.nat64_add_del_interface(
1653             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1654         )
1655
1656         # in2out
1657         data = b"a" * 200
1658         pkts = self.create_stream_frag_ip6(
1659             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1660         )
1661         self.pg0.add_stream(pkts)
1662         self.pg_enable_capture(self.pg_interfaces)
1663         self.pg_start()
1664         frags = self.pg1.get_capture(len(pkts))
1665         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1666         self.assertEqual(p[TCP].dport, 20)
1667         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1668         self.tcp_port_out = p[TCP].sport
1669         self.assertEqual(data, p[Raw].load)
1670
1671         # out2in
1672         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1673         pkts = self.create_stream_frag(
1674             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1675         )
1676         self.pg1.add_stream(pkts)
1677         self.pg_enable_capture(self.pg_interfaces)
1678         self.pg_start()
1679         frags = self.pg0.get_capture(len(pkts))
1680         self.logger.debug(ppc("Captured:", frags))
1681         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1682         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1683         self.assertEqual(p[TCP].sport, 20)
1684         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1685         self.assertEqual(data, p[Raw].load)
1686
1687     def test_reass_hairpinning(self):
1688         """NAT64 fragments hairpinning"""
1689         data = b"a" * 200
1690         server = self.pg0.remote_hosts[1]
1691         server_in_port = random.randint(1025, 65535)
1692         server_out_port = random.randint(1025, 65535)
1693         client_in_port = random.randint(1025, 65535)
1694         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1695         nat_addr_ip6 = ip.src
1696
1697         self.vapi.nat64_add_del_pool_addr_range(
1698             start_addr=self.nat_addr,
1699             end_addr=self.nat_addr,
1700             vrf_id=0xFFFFFFFF,
1701             is_add=1,
1702         )
1703         flags = self.config_flags.NAT_IS_INSIDE
1704         self.vapi.nat64_add_del_interface(
1705             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1706         )
1707         self.vapi.nat64_add_del_interface(
1708             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1709         )
1710
1711         # add static BIB entry for server
1712         self.vapi.nat64_add_del_static_bib(
1713             i_addr=server.ip6n,
1714             o_addr=self.nat_addr,
1715             i_port=server_in_port,
1716             o_port=server_out_port,
1717             proto=IP_PROTOS.tcp,
1718             vrf_id=0,
1719             is_add=1,
1720         )
1721
1722         # send packet from host to server
1723         pkts = self.create_stream_frag_ip6(
1724             self.pg0, self.nat_addr, client_in_port, server_out_port, data
1725         )
1726         self.pg0.add_stream(pkts)
1727         self.pg_enable_capture(self.pg_interfaces)
1728         self.pg_start()
1729         frags = self.pg0.get_capture(len(pkts))
1730         self.logger.debug(ppc("Captured:", frags))
1731         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1732         self.assertNotEqual(p[TCP].sport, client_in_port)
1733         self.assertEqual(p[TCP].dport, server_in_port)
1734         self.assertEqual(data, p[Raw].load)
1735
1736     def test_frag_out_of_order(self):
1737         """NAT64 translate fragments arriving out of order"""
1738         self.tcp_port_in = random.randint(1025, 65535)
1739
1740         self.vapi.nat64_add_del_pool_addr_range(
1741             start_addr=self.nat_addr,
1742             end_addr=self.nat_addr,
1743             vrf_id=0xFFFFFFFF,
1744             is_add=1,
1745         )
1746         flags = self.config_flags.NAT_IS_INSIDE
1747         self.vapi.nat64_add_del_interface(
1748             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1749         )
1750         self.vapi.nat64_add_del_interface(
1751             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1752         )
1753
1754         # in2out
1755         data = b"a" * 200
1756         pkts = self.create_stream_frag_ip6(
1757             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1758         )
1759         pkts.reverse()
1760         self.pg0.add_stream(pkts)
1761         self.pg_enable_capture(self.pg_interfaces)
1762         self.pg_start()
1763         frags = self.pg1.get_capture(len(pkts))
1764         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1765         self.assertEqual(p[TCP].dport, 20)
1766         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1767         self.tcp_port_out = p[TCP].sport
1768         self.assertEqual(data, p[Raw].load)
1769
1770         # out2in
1771         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1772         pkts = self.create_stream_frag(
1773             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1774         )
1775         pkts.reverse()
1776         self.pg1.add_stream(pkts)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         frags = self.pg0.get_capture(len(pkts))
1780         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1781         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1782         self.assertEqual(p[TCP].sport, 20)
1783         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1784         self.assertEqual(data, p[Raw].load)
1785
1786     def test_interface_addr(self):
1787         """Acquire NAT64 pool addresses from interface"""
1788         self.vapi.nat64_add_del_interface_addr(
1789             is_add=1, sw_if_index=self.pg4.sw_if_index
1790         )
1791
1792         # no address in NAT64 pool
1793         addresses = self.vapi.nat44_address_dump()
1794         self.assertEqual(0, len(addresses))
1795
1796         # configure interface address and check NAT64 address pool
1797         self.pg4.config_ip4()
1798         addresses = self.vapi.nat64_pool_addr_dump()
1799         self.assertEqual(len(addresses), 1)
1800
1801         self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1802
1803         # remove interface address and check NAT64 address pool
1804         self.pg4.unconfig_ip4()
1805         addresses = self.vapi.nat64_pool_addr_dump()
1806         self.assertEqual(0, len(addresses))
1807
1808     @unittest.skipUnless(config.extended, "part of extended tests")
1809     def test_ipfix_max_bibs_sessions(self):
1810         """IPFIX logging maximum session and BIB entries exceeded"""
1811         max_bibs = 1280
1812         max_sessions = 2560
1813         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1814
1815         self.vapi.nat64_add_del_pool_addr_range(
1816             start_addr=self.nat_addr,
1817             end_addr=self.nat_addr,
1818             vrf_id=0xFFFFFFFF,
1819             is_add=1,
1820         )
1821         flags = self.config_flags.NAT_IS_INSIDE
1822         self.vapi.nat64_add_del_interface(
1823             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1824         )
1825         self.vapi.nat64_add_del_interface(
1826             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1827         )
1828
1829         pkts = []
1830         src = ""
1831         for i in range(0, max_bibs):
1832             src = "fd01:aa::%x" % (i)
1833             p = (
1834                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1835                 / IPv6(src=src, dst=remote_host_ip6)
1836                 / TCP(sport=12345, dport=80)
1837             )
1838             pkts.append(p)
1839             p = (
1840                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1841                 / IPv6(src=src, dst=remote_host_ip6)
1842                 / TCP(sport=12345, dport=22)
1843             )
1844             pkts.append(p)
1845         self.pg0.add_stream(pkts)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         self.pg1.get_capture(max_sessions)
1849
1850         self.vapi.set_ipfix_exporter(
1851             collector_address=self.pg3.remote_ip4,
1852             src_address=self.pg3.local_ip4,
1853             path_mtu=512,
1854             template_interval=10,
1855         )
1856         self.vapi.nat_ipfix_enable_disable(
1857             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1858         )
1859
1860         p = (
1861             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1862             / IPv6(src=src, dst=remote_host_ip6)
1863             / TCP(sport=12345, dport=25)
1864         ) * 3
1865         self.pg0.add_stream(p)
1866         self.pg_enable_capture(self.pg_interfaces)
1867         self.pg_start()
1868         self.pg1.assert_nothing_captured()
1869         self.vapi.ipfix_flush()
1870         capture = self.pg3.get_capture(7)
1871         ipfix = IPFIXDecoder()
1872         # first load template
1873         for p in capture:
1874             self.assertTrue(p.haslayer(IPFIX))
1875             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1876             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1877             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1878             self.assertEqual(p[UDP].dport, 4739)
1879             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1880             if p.haslayer(Template):
1881                 ipfix.add_template(p.getlayer(Template))
1882         # verify events in data set
1883         event_count = 0
1884         for p in capture:
1885             if p.haslayer(Data):
1886                 data = ipfix.decode_data_set(p.getlayer(Set))
1887                 event_count += self.verify_ipfix_max_sessions(data, max_sessions)
1888         self.assertEqual(event_count, 1)
1889
1890         p = (
1891             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1892             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1893             / TCP(sport=12345, dport=80)
1894         ) * 3
1895         self.pg0.add_stream(p)
1896         self.pg_enable_capture(self.pg_interfaces)
1897         self.pg_start()
1898         self.pg1.assert_nothing_captured()
1899         self.vapi.ipfix_flush()
1900         capture = self.pg3.get_capture(1)
1901         # verify events in data set
1902         event_count = 0
1903         for p in capture:
1904             self.assertTrue(p.haslayer(IPFIX))
1905             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1906             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1907             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1908             self.assertEqual(p[UDP].dport, 4739)
1909             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1910             if p.haslayer(Data):
1911                 data = ipfix.decode_data_set(p.getlayer(Set))
1912                 event_count += self.verify_ipfix_max_bibs(data, max_bibs)
1913         self.assertEqual(event_count, 1)
1914
1915     def test_ipfix_bib_ses(self):
1916         """IPFIX logging NAT64 BIB/session create and delete events"""
1917         self.tcp_port_in = random.randint(1025, 65535)
1918         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1919
1920         self.vapi.nat64_add_del_pool_addr_range(
1921             start_addr=self.nat_addr,
1922             end_addr=self.nat_addr,
1923             vrf_id=0xFFFFFFFF,
1924             is_add=1,
1925         )
1926         flags = self.config_flags.NAT_IS_INSIDE
1927         self.vapi.nat64_add_del_interface(
1928             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1929         )
1930         self.vapi.nat64_add_del_interface(
1931             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1932         )
1933         self.vapi.set_ipfix_exporter(
1934             collector_address=self.pg3.remote_ip4,
1935             src_address=self.pg3.local_ip4,
1936             path_mtu=512,
1937             template_interval=10,
1938         )
1939         self.vapi.nat_ipfix_enable_disable(
1940             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1941         )
1942
1943         # Create
1944         p = (
1945             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1946             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1947             / TCP(sport=self.tcp_port_in, dport=25)
1948         )
1949         self.pg0.add_stream(p)
1950         self.pg_enable_capture(self.pg_interfaces)
1951         self.pg_start()
1952         p = self.pg1.get_capture(1)
1953         self.tcp_port_out = p[0][TCP].sport
1954         self.vapi.ipfix_flush()
1955         capture = self.pg3.get_capture(8)
1956         ipfix = IPFIXDecoder()
1957         # first load template
1958         for p in capture:
1959             self.assertTrue(p.haslayer(IPFIX))
1960             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1961             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1962             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1963             self.assertEqual(p[UDP].dport, 4739)
1964             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1965             if p.haslayer(Template):
1966                 ipfix.add_template(p.getlayer(Template))
1967         # verify events in data set
1968         for p in capture:
1969             if p.haslayer(Data):
1970                 data = ipfix.decode_data_set(p.getlayer(Set))
1971                 if scapy.compat.orb(data[0][230]) == 10:
1972                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1973                 elif scapy.compat.orb(data[0][230]) == 6:
1974                     self.verify_ipfix_nat64_ses(
1975                         data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1976                     )
1977                 else:
1978                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1979
1980         # Delete
1981         self.pg_enable_capture(self.pg_interfaces)
1982         self.vapi.nat64_add_del_pool_addr_range(
1983             start_addr=self.nat_addr,
1984             end_addr=self.nat_addr,
1985             vrf_id=0xFFFFFFFF,
1986             is_add=0,
1987         )
1988         self.vapi.ipfix_flush()
1989         capture = self.pg3.get_capture(2)
1990         # verify events in data set
1991         for p in capture:
1992             self.assertTrue(p.haslayer(IPFIX))
1993             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1994             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1995             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1996             self.assertEqual(p[UDP].dport, 4739)
1997             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1998             if p.haslayer(Data):
1999                 data = ipfix.decode_data_set(p.getlayer(Set))
2000                 if scapy.compat.orb(data[0][230]) == 11:
2001                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
2002                 elif scapy.compat.orb(data[0][230]) == 7:
2003                     self.verify_ipfix_nat64_ses(
2004                         data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
2005                     )
2006                 else:
2007                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
2008
2009     def test_syslog_sess(self):
2010         """Test syslog session creation and deletion"""
2011         self.tcp_port_in = random.randint(1025, 65535)
2012         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2013
2014         self.vapi.nat64_add_del_pool_addr_range(
2015             start_addr=self.nat_addr,
2016             end_addr=self.nat_addr,
2017             vrf_id=0xFFFFFFFF,
2018             is_add=1,
2019         )
2020         flags = self.config_flags.NAT_IS_INSIDE
2021         self.vapi.nat64_add_del_interface(
2022             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2023         )
2024         self.vapi.nat64_add_del_interface(
2025             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2026         )
2027         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2028         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2029
2030         p = (
2031             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2032             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2033             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2034         )
2035         self.pg0.add_stream(p)
2036         self.pg_enable_capture(self.pg_interfaces)
2037         self.pg_start()
2038         p = self.pg1.get_capture(1)
2039         self.tcp_port_out = p[0][TCP].sport
2040         capture = self.pg3.get_capture(1)
2041         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2042
2043         self.pg_enable_capture(self.pg_interfaces)
2044         self.pg_start()
2045         self.vapi.nat64_add_del_pool_addr_range(
2046             start_addr=self.nat_addr,
2047             end_addr=self.nat_addr,
2048             vrf_id=0xFFFFFFFF,
2049             is_add=0,
2050         )
2051         capture = self.pg3.get_capture(1)
2052         self.verify_syslog_sess(capture[0][Raw].load, False, True)
2053
2054     def nat64_get_ses_num(self):
2055         """
2056         Return number of active NAT64 sessions.
2057         """
2058         st = self.vapi.nat64_st_dump(proto=255)
2059         return len(st)
2060
2061     def clear_nat64(self):
2062         """
2063         Clear NAT64 configuration.
2064         """
2065         self.vapi.nat_ipfix_enable_disable(
2066             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2067         )
2068         self.ipfix_src_port = 4739
2069         self.ipfix_domain_id = 1
2070
2071         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2072
2073         self.vapi.nat64_set_timeouts(
2074             udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2075         )
2076
2077         interfaces = self.vapi.nat64_interface_dump()
2078         for intf in interfaces:
2079             self.vapi.nat64_add_del_interface(
2080                 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2081             )
2082
2083         bib = self.vapi.nat64_bib_dump(proto=255)
2084         for bibe in bib:
2085             if bibe.flags & self.config_flags.NAT_IS_STATIC:
2086                 self.vapi.nat64_add_del_static_bib(
2087                     i_addr=bibe.i_addr,
2088                     o_addr=bibe.o_addr,
2089                     i_port=bibe.i_port,
2090                     o_port=bibe.o_port,
2091                     proto=bibe.proto,
2092                     vrf_id=bibe.vrf_id,
2093                     is_add=0,
2094                 )
2095
2096         adresses = self.vapi.nat64_pool_addr_dump()
2097         for addr in adresses:
2098             self.vapi.nat64_add_del_pool_addr_range(
2099                 start_addr=addr.address,
2100                 end_addr=addr.address,
2101                 vrf_id=addr.vrf_id,
2102                 is_add=0,
2103             )
2104
2105         prefixes = self.vapi.nat64_prefix_dump()
2106         for prefix in prefixes:
2107             self.vapi.nat64_add_del_prefix(
2108                 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2109             )
2110
2111         bibs = self.statistics.get_counter("/nat64/total-bibs")
2112         self.assertEqual(bibs[0][0], 0)
2113         sessions = self.statistics.get_counter("/nat64/total-sessions")
2114         self.assertEqual(sessions[0][0], 0)
2115
2116
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)