tests: replace pycodestyle with black
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from config import config
12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import (
20     IPv6,
21     ICMPv6EchoRequest,
22     ICMPv6EchoReply,
23     ICMPv6ND_NS,
24     ICMPv6ND_NA,
25     ICMPv6NDOptDstLLAddr,
26     fragment6,
27 )
28 from scapy.layers.l2 import Ether, GRE
29 from scapy.packet import Raw
30 from syslog_rfc5424_parser import SyslogMessage, ParseError
31 from syslog_rfc5424_parser.constants import SyslogSeverity
32 from util import ppc, ppp
33 from vpp_papi import VppEnum
34
35
36 @tag_fixme_vpp_workers
37 class TestNAT64(VppTestCase):
38     """NAT64 Test Cases"""
39
40     @property
41     def SYSLOG_SEVERITY(self):
42         return VppEnum.vl_api_syslog_severity_t
43
44     @property
45     def config_flags(self):
46         return VppEnum.vl_api_nat_config_flags_t
47
48     @classmethod
49     def setUpClass(cls):
50         super(TestNAT64, cls).setUpClass()
51
52         cls.tcp_port_in = 6303
53         cls.tcp_port_out = 6303
54         cls.udp_port_in = 6304
55         cls.udp_port_out = 6304
56         cls.icmp_id_in = 6305
57         cls.icmp_id_out = 6305
58         cls.tcp_external_port = 80
59         cls.nat_addr = "10.0.0.3"
60         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
61         cls.vrf1_id = 10
62         cls.vrf1_nat_addr = "10.0.10.3"
63         cls.ipfix_src_port = 4739
64         cls.ipfix_domain_id = 1
65
66         cls.create_pg_interfaces(range(6))
67         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
68         cls.ip6_interfaces.append(cls.pg_interfaces[2])
69         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
70
71         cls.vapi.ip_table_add_del(
72             is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
73         )
74
75         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
76
77         cls.pg0.generate_remote_hosts(2)
78
79         for i in cls.ip6_interfaces:
80             i.admin_up()
81             i.config_ip6()
82             i.configure_ipv6_neighbors()
83
84         for i in cls.ip4_interfaces:
85             i.admin_up()
86             i.config_ip4()
87             i.resolve_arp()
88
89         cls.pg3.admin_up()
90         cls.pg3.config_ip4()
91         cls.pg3.resolve_arp()
92         cls.pg3.config_ip6()
93         cls.pg3.configure_ipv6_neighbors()
94
95         cls.pg5.admin_up()
96         cls.pg5.config_ip6()
97
98     @classmethod
99     def tearDownClass(cls):
100         super(TestNAT64, cls).tearDownClass()
101
102     def setUp(self):
103         super(TestNAT64, self).setUp()
104         self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
105
106     def tearDown(self):
107         super(TestNAT64, self).tearDown()
108         if not self.vpp_dead:
109             self.vapi.nat64_plugin_enable_disable(enable=0)
110
111     def show_commands_at_teardown(self):
112         self.logger.info(self.vapi.cli("show nat64 pool"))
113         self.logger.info(self.vapi.cli("show nat64 interfaces"))
114         self.logger.info(self.vapi.cli("show nat64 prefix"))
115         self.logger.info(self.vapi.cli("show nat64 bib all"))
116         self.logger.info(self.vapi.cli("show nat64 session table all"))
117
118     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
119         """
120         Create IPv6 packet stream for inside network
121
122         :param in_if: Inside interface
123         :param out_if: Outside interface
124         :param ttl: Hop Limit of generated packets
125         :param pref: NAT64 prefix
126         :param plen: NAT64 prefix length
127         """
128         pkts = []
129         if pref is None:
130             dst = "".join(["64:ff9b::", out_if.remote_ip4])
131         else:
132             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
133
134         # TCP
135         p = (
136             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
137             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
138             / TCP(sport=self.tcp_port_in, dport=20)
139         )
140         pkts.append(p)
141
142         # UDP
143         p = (
144             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
145             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
146             / UDP(sport=self.udp_port_in, dport=20)
147         )
148         pkts.append(p)
149
150         # ICMP
151         p = (
152             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
153             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
154             / ICMPv6EchoRequest(id=self.icmp_id_in)
155         )
156         pkts.append(p)
157
158         return pkts
159
160     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
161         """
162         Create packet stream for outside network
163
164         :param out_if: Outside interface
165         :param dst_ip: Destination IP address (Default use global NAT address)
166         :param ttl: TTL of generated packets
167         :param use_inside_ports: Use inside NAT ports as destination ports
168                instead of outside ports
169         """
170         if dst_ip is None:
171             dst_ip = self.nat_addr
172         if not use_inside_ports:
173             tcp_port = self.tcp_port_out
174             udp_port = self.udp_port_out
175             icmp_id = self.icmp_id_out
176         else:
177             tcp_port = self.tcp_port_in
178             udp_port = self.udp_port_in
179             icmp_id = self.icmp_id_in
180         pkts = []
181         # TCP
182         p = (
183             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
184             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
185             / TCP(dport=tcp_port, sport=20)
186         )
187         pkts.extend([p, p])
188
189         # UDP
190         p = (
191             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
192             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
193             / UDP(dport=udp_port, sport=20)
194         )
195         pkts.append(p)
196
197         # ICMP
198         p = (
199             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
200             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
201             / ICMP(id=icmp_id, type="echo-reply")
202         )
203         pkts.append(p)
204
205         return pkts
206
207     def verify_capture_out(
208         self,
209         capture,
210         nat_ip=None,
211         same_port=False,
212         dst_ip=None,
213         is_ip6=False,
214         ignore_port=False,
215     ):
216         """
217         Verify captured packets on outside network
218
219         :param capture: Captured packets
220         :param nat_ip: Translated IP address (Default use global NAT address)
221         :param same_port: Source port number is not translated (Default False)
222         :param dst_ip: Destination IP address (Default do not verify)
223         :param is_ip6: If L3 protocol is IPv6 (Default False)
224         """
225         if is_ip6:
226             IP46 = IPv6
227             ICMP46 = ICMPv6EchoRequest
228         else:
229             IP46 = IP
230             ICMP46 = ICMP
231         if nat_ip is None:
232             nat_ip = self.nat_addr
233         for packet in capture:
234             try:
235                 if not is_ip6:
236                     self.assert_packet_checksums_valid(packet)
237                 self.assertEqual(packet[IP46].src, nat_ip)
238                 if dst_ip is not None:
239                     self.assertEqual(packet[IP46].dst, dst_ip)
240                 if packet.haslayer(TCP):
241                     if not ignore_port:
242                         if same_port:
243                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
244                         else:
245                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
246                     self.tcp_port_out = packet[TCP].sport
247                     self.assert_packet_checksums_valid(packet)
248                 elif packet.haslayer(UDP):
249                     if not ignore_port:
250                         if same_port:
251                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
252                         else:
253                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
254                     self.udp_port_out = packet[UDP].sport
255                 else:
256                     if not ignore_port:
257                         if same_port:
258                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
259                         else:
260                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
261                     self.icmp_id_out = packet[ICMP46].id
262                     self.assert_packet_checksums_valid(packet)
263             except:
264                 self.logger.error(
265                     ppp("Unexpected or invalid packet (outside network):", packet)
266                 )
267                 raise
268
269     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
270         """
271         Verify captured IPv6 packets on inside network
272
273         :param capture: Captured packets
274         :param src_ip: Source IP
275         :param dst_ip: Destination IP address
276         """
277         for packet in capture:
278             try:
279                 self.assertEqual(packet[IPv6].src, src_ip)
280                 self.assertEqual(packet[IPv6].dst, dst_ip)
281                 self.assert_packet_checksums_valid(packet)
282                 if packet.haslayer(TCP):
283                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
284                 elif packet.haslayer(UDP):
285                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
286                 else:
287                     self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
288             except:
289                 self.logger.error(
290                     ppp("Unexpected or invalid packet (inside network):", packet)
291                 )
292                 raise
293
294     def create_stream_frag(
295         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
296     ):
297         """
298         Create fragmented packet stream
299
300         :param src_if: Source interface
301         :param dst: Destination IPv4 address
302         :param sport: Source port
303         :param dport: Destination port
304         :param data: Payload data
305         :param proto: protocol (TCP, UDP, ICMP)
306         :param echo_reply: use echo_reply if protocol is ICMP
307         :returns: Fragments
308         """
309         if proto == IP_PROTOS.tcp:
310             p = (
311                 IP(src=src_if.remote_ip4, dst=dst)
312                 / TCP(sport=sport, dport=dport)
313                 / Raw(data)
314             )
315             p = p.__class__(scapy.compat.raw(p))
316             chksum = p[TCP].chksum
317             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
318         elif proto == IP_PROTOS.udp:
319             proto_header = UDP(sport=sport, dport=dport)
320         elif proto == IP_PROTOS.icmp:
321             if not echo_reply:
322                 proto_header = ICMP(id=sport, type="echo-request")
323             else:
324                 proto_header = ICMP(id=sport, type="echo-reply")
325         else:
326             raise Exception("Unsupported protocol")
327         id = random.randint(0, 65535)
328         pkts = []
329         if proto == IP_PROTOS.tcp:
330             raw = Raw(data[0:4])
331         else:
332             raw = Raw(data[0:16])
333         p = (
334             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
335             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
336             / proto_header
337             / raw
338         )
339         pkts.append(p)
340         if proto == IP_PROTOS.tcp:
341             raw = Raw(data[4:20])
342         else:
343             raw = Raw(data[16:32])
344         p = (
345             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
346             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
347             / raw
348         )
349         pkts.append(p)
350         if proto == IP_PROTOS.tcp:
351             raw = Raw(data[20:])
352         else:
353             raw = Raw(data[32:])
354         p = (
355             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
356             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
357             / raw
358         )
359         pkts.append(p)
360         return pkts
361
362     def create_stream_frag_ip6(
363         self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
364     ):
365         """
366         Create fragmented packet stream
367
368         :param src_if: Source interface
369         :param dst: Destination IPv4 address
370         :param sport: Source TCP port
371         :param dport: Destination TCP port
372         :param data: Payload data
373         :param pref: NAT64 prefix
374         :param plen: NAT64 prefix length
375         :param fragsize: size of fragments
376         :returns: Fragments
377         """
378         if pref is None:
379             dst_ip6 = "".join(["64:ff9b::", dst])
380         else:
381             dst_ip6 = self.compose_ip6(dst, pref, plen)
382
383         p = (
384             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
385             / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
386             / IPv6ExtHdrFragment(id=random.randint(0, 65535))
387             / TCP(sport=sport, dport=dport)
388             / Raw(data)
389         )
390
391         return fragment6(p, frag_size)
392
393     def reass_frags_and_verify(self, frags, src, dst):
394         """
395         Reassemble and verify fragmented packet
396
397         :param frags: Captured fragments
398         :param src: Source IPv4 address to verify
399         :param dst: Destination IPv4 address to verify
400
401         :returns: Reassembled IPv4 packet
402         """
403         buffer = BytesIO()
404         for p in frags:
405             self.assertEqual(p[IP].src, src)
406             self.assertEqual(p[IP].dst, dst)
407             self.assert_ip_checksum_valid(p)
408             buffer.seek(p[IP].frag * 8)
409             buffer.write(bytes(p[IP].payload))
410         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
411         if ip.proto == IP_PROTOS.tcp:
412             p = ip / TCP(buffer.getvalue())
413             self.logger.debug(ppp("Reassembled:", p))
414             self.assert_tcp_checksum_valid(p)
415         elif ip.proto == IP_PROTOS.udp:
416             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
417         elif ip.proto == IP_PROTOS.icmp:
418             p = ip / ICMP(buffer.getvalue())
419         return p
420
421     def reass_frags_and_verify_ip6(self, frags, src, dst):
422         """
423         Reassemble and verify fragmented packet
424
425         :param frags: Captured fragments
426         :param src: Source IPv6 address to verify
427         :param dst: Destination IPv6 address to verify
428
429         :returns: Reassembled IPv6 packet
430         """
431         buffer = BytesIO()
432         for p in frags:
433             self.assertEqual(p[IPv6].src, src)
434             self.assertEqual(p[IPv6].dst, dst)
435             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
436             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
437         ip = IPv6(
438             src=frags[0][IPv6].src,
439             dst=frags[0][IPv6].dst,
440             nh=frags[0][IPv6ExtHdrFragment].nh,
441         )
442         if ip.nh == IP_PROTOS.tcp:
443             p = ip / TCP(buffer.getvalue())
444         elif ip.nh == IP_PROTOS.udp:
445             p = ip / UDP(buffer.getvalue())
446         self.logger.debug(ppp("Reassembled:", p))
447         self.assert_packet_checksums_valid(p)
448         return p
449
450     def verify_ipfix_max_bibs(self, data, limit):
451         """
452         Verify IPFIX maximum BIB entries exceeded event
453
454         :param data: Decoded IPFIX data records
455         :param limit: Number of maximum BIB entries that can be created.
456         """
457         self.assertEqual(1, len(data))
458         record = data[0]
459         # natEvent
460         self.assertEqual(scapy.compat.orb(record[230]), 13)
461         # natQuotaExceededEvent
462         self.assertEqual(struct.pack("I", 2), record[466])
463         # maxBIBEntries
464         self.assertEqual(struct.pack("I", limit), record[472])
465
466     def verify_ipfix_bib(self, data, is_create, src_addr):
467         """
468         Verify IPFIX NAT64 BIB create and delete events
469
470         :param data: Decoded IPFIX data records
471         :param is_create: Create event if nonzero value otherwise delete event
472         :param src_addr: IPv6 source address
473         """
474         self.assertEqual(1, len(data))
475         record = data[0]
476         # natEvent
477         if is_create:
478             self.assertEqual(scapy.compat.orb(record[230]), 10)
479         else:
480             self.assertEqual(scapy.compat.orb(record[230]), 11)
481         # sourceIPv6Address
482         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
483         # postNATSourceIPv4Address
484         self.assertEqual(self.nat_addr_n, record[225])
485         # protocolIdentifier
486         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
487         # ingressVRFID
488         self.assertEqual(struct.pack("!I", 0), record[234])
489         # sourceTransportPort
490         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
491         # postNAPTSourceTransportPort
492         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
493
494     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
495         """
496         Verify IPFIX NAT64 session create and delete events
497
498         :param data: Decoded IPFIX data records
499         :param is_create: Create event if nonzero value otherwise delete event
500         :param src_addr: IPv6 source address
501         :param dst_addr: IPv4 destination address
502         :param dst_port: destination TCP port
503         """
504         self.assertEqual(1, len(data))
505         record = data[0]
506         # natEvent
507         if is_create:
508             self.assertEqual(scapy.compat.orb(record[230]), 6)
509         else:
510             self.assertEqual(scapy.compat.orb(record[230]), 7)
511         # sourceIPv6Address
512         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
513         # destinationIPv6Address
514         self.assertEqual(
515             socket.inet_pton(
516                 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
517             ),
518             record[28],
519         )
520         # postNATSourceIPv4Address
521         self.assertEqual(self.nat_addr_n, record[225])
522         # postNATDestinationIPv4Address
523         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
524         # protocolIdentifier
525         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
526         # ingressVRFID
527         self.assertEqual(struct.pack("!I", 0), record[234])
528         # sourceTransportPort
529         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
530         # postNAPTSourceTransportPort
531         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
532         # destinationTransportPort
533         self.assertEqual(struct.pack("!H", dst_port), record[11])
534         # postNAPTDestinationTransportPort
535         self.assertEqual(struct.pack("!H", dst_port), record[228])
536
537     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
538         message = data.decode("utf-8")
539         try:
540             message = SyslogMessage.parse(message)
541         except ParseError as e:
542             self.logger.error(e)
543             raise
544         else:
545             self.assertEqual(message.severity, SyslogSeverity.info)
546             self.assertEqual(message.appname, "NAT")
547             self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
548             sd_params = message.sd.get("nsess")
549             self.assertTrue(sd_params is not None)
550             if is_ip6:
551                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
552                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
553             else:
554                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
555                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
556                 self.assertTrue(sd_params.get("SSUBIX") is not None)
557             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
558             self.assertEqual(sd_params.get("XATYP"), "IPv4")
559             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
560             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
561             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
562             self.assertEqual(sd_params.get("SVLAN"), "0")
563             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
564             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
565
566     def compose_ip6(self, ip4, pref, plen):
567         """
568         Compose IPv4-embedded IPv6 addresses
569
570         :param ip4: IPv4 address
571         :param pref: IPv6 prefix
572         :param plen: IPv6 prefix length
573         :returns: IPv4-embedded IPv6 addresses
574         """
575         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
576         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
577         if plen == 32:
578             pref_n[4] = ip4_n[0]
579             pref_n[5] = ip4_n[1]
580             pref_n[6] = ip4_n[2]
581             pref_n[7] = ip4_n[3]
582         elif plen == 40:
583             pref_n[5] = ip4_n[0]
584             pref_n[6] = ip4_n[1]
585             pref_n[7] = ip4_n[2]
586             pref_n[9] = ip4_n[3]
587         elif plen == 48:
588             pref_n[6] = ip4_n[0]
589             pref_n[7] = ip4_n[1]
590             pref_n[9] = ip4_n[2]
591             pref_n[10] = ip4_n[3]
592         elif plen == 56:
593             pref_n[7] = ip4_n[0]
594             pref_n[9] = ip4_n[1]
595             pref_n[10] = ip4_n[2]
596             pref_n[11] = ip4_n[3]
597         elif plen == 64:
598             pref_n[9] = ip4_n[0]
599             pref_n[10] = ip4_n[1]
600             pref_n[11] = ip4_n[2]
601             pref_n[12] = ip4_n[3]
602         elif plen == 96:
603             pref_n[12] = ip4_n[0]
604             pref_n[13] = ip4_n[1]
605             pref_n[14] = ip4_n[2]
606             pref_n[15] = ip4_n[3]
607         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
608         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
609
610     def verify_ipfix_max_sessions(self, data, limit):
611         """
612         Verify IPFIX maximum session entries exceeded event
613
614         :param data: Decoded IPFIX data records
615         :param limit: Number of maximum session entries that can be created.
616         """
617         self.assertEqual(1, len(data))
618         record = data[0]
619         # natEvent
620         self.assertEqual(scapy.compat.orb(record[230]), 13)
621         # natQuotaExceededEvent
622         self.assertEqual(struct.pack("I", 1), record[466])
623         # maxSessionEntries
624         self.assertEqual(struct.pack("I", limit), record[471])
625
626     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
627         """NAT64 inside interface handles Neighbor Advertisement"""
628
629         flags = self.config_flags.NAT_IS_INSIDE
630         self.vapi.nat64_add_del_interface(
631             is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
632         )
633
634         # Try to send ping
635         ping = (
636             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
637             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
638             / ICMPv6EchoRequest()
639         )
640         pkts = [ping]
641         self.pg5.add_stream(pkts)
642         self.pg_enable_capture(self.pg_interfaces)
643         self.pg_start()
644
645         # Wait for Neighbor Solicitation
646         capture = self.pg5.get_capture(len(pkts))
647         packet = capture[0]
648         try:
649             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
650             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
651             tgt = packet[ICMPv6ND_NS].tgt
652         except:
653             self.logger.error(ppp("Unexpected or invalid packet:", packet))
654             raise
655
656         # Send Neighbor Advertisement
657         p = (
658             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
659             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
660             / ICMPv6ND_NA(tgt=tgt)
661             / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
662         )
663         pkts = [p]
664         self.pg5.add_stream(pkts)
665         self.pg_enable_capture(self.pg_interfaces)
666         self.pg_start()
667
668         # Try to send ping again
669         pkts = [ping]
670         self.pg5.add_stream(pkts)
671         self.pg_enable_capture(self.pg_interfaces)
672         self.pg_start()
673
674         # Wait for ping reply
675         capture = self.pg5.get_capture(len(pkts))
676         packet = capture[0]
677         try:
678             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
679             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
680             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
681         except:
682             self.logger.error(ppp("Unexpected or invalid packet:", packet))
683             raise
684
685     def test_pool(self):
686         """Add/delete address to NAT64 pool"""
687         nat_addr = "1.2.3.4"
688
689         self.vapi.nat64_add_del_pool_addr_range(
690             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
691         )
692
693         addresses = self.vapi.nat64_pool_addr_dump()
694         self.assertEqual(len(addresses), 1)
695         self.assertEqual(str(addresses[0].address), nat_addr)
696
697         self.vapi.nat64_add_del_pool_addr_range(
698             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
699         )
700
701         addresses = self.vapi.nat64_pool_addr_dump()
702         self.assertEqual(len(addresses), 0)
703
704     def test_interface(self):
705         """Enable/disable NAT64 feature on the interface"""
706         flags = self.config_flags.NAT_IS_INSIDE
707         self.vapi.nat64_add_del_interface(
708             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
709         )
710         self.vapi.nat64_add_del_interface(
711             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
712         )
713
714         interfaces = self.vapi.nat64_interface_dump()
715         self.assertEqual(len(interfaces), 2)
716         pg0_found = False
717         pg1_found = False
718         for intf in interfaces:
719             if intf.sw_if_index == self.pg0.sw_if_index:
720                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
721                 pg0_found = True
722             elif intf.sw_if_index == self.pg1.sw_if_index:
723                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
724                 pg1_found = True
725         self.assertTrue(pg0_found)
726         self.assertTrue(pg1_found)
727
728         features = self.vapi.cli("show interface features pg0")
729         self.assertIn("nat64-in2out", features)
730         features = self.vapi.cli("show interface features pg1")
731         self.assertIn("nat64-out2in", features)
732
733         self.vapi.nat64_add_del_interface(
734             is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
735         )
736         self.vapi.nat64_add_del_interface(
737             is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
738         )
739
740         interfaces = self.vapi.nat64_interface_dump()
741         self.assertEqual(len(interfaces), 0)
742
743     def test_static_bib(self):
744         """Add/delete static BIB entry"""
745         in_addr = "2001:db8:85a3::8a2e:370:7334"
746         out_addr = "10.1.1.3"
747         in_port = 1234
748         out_port = 5678
749         proto = IP_PROTOS.tcp
750
751         self.vapi.nat64_add_del_static_bib(
752             i_addr=in_addr,
753             o_addr=out_addr,
754             i_port=in_port,
755             o_port=out_port,
756             proto=proto,
757             vrf_id=0,
758             is_add=1,
759         )
760         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
761         static_bib_num = 0
762         for bibe in bib:
763             if bibe.flags & self.config_flags.NAT_IS_STATIC:
764                 static_bib_num += 1
765                 self.assertEqual(str(bibe.i_addr), in_addr)
766                 self.assertEqual(str(bibe.o_addr), out_addr)
767                 self.assertEqual(bibe.i_port, in_port)
768                 self.assertEqual(bibe.o_port, out_port)
769         self.assertEqual(static_bib_num, 1)
770         bibs = self.statistics.get_counter("/nat64/total-bibs")
771         self.assertEqual(bibs[0][0], 1)
772
773         self.vapi.nat64_add_del_static_bib(
774             i_addr=in_addr,
775             o_addr=out_addr,
776             i_port=in_port,
777             o_port=out_port,
778             proto=proto,
779             vrf_id=0,
780             is_add=0,
781         )
782         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
783         static_bib_num = 0
784         for bibe in bib:
785             if bibe.flags & self.config_flags.NAT_IS_STATIC:
786                 static_bib_num += 1
787         self.assertEqual(static_bib_num, 0)
788         bibs = self.statistics.get_counter("/nat64/total-bibs")
789         self.assertEqual(bibs[0][0], 0)
790
791     def test_set_timeouts(self):
792         """Set NAT64 timeouts"""
793         # verify default values
794         timeouts = self.vapi.nat64_get_timeouts()
795         self.assertEqual(timeouts.udp, 300)
796         self.assertEqual(timeouts.icmp, 60)
797         self.assertEqual(timeouts.tcp_transitory, 240)
798         self.assertEqual(timeouts.tcp_established, 7440)
799
800         # set and verify custom values
801         self.vapi.nat64_set_timeouts(
802             udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
803         )
804         timeouts = self.vapi.nat64_get_timeouts()
805         self.assertEqual(timeouts.udp, 200)
806         self.assertEqual(timeouts.icmp, 30)
807         self.assertEqual(timeouts.tcp_transitory, 250)
808         self.assertEqual(timeouts.tcp_established, 7450)
809
810     def test_dynamic(self):
811         """NAT64 dynamic translation test"""
812         self.tcp_port_in = 6303
813         self.udp_port_in = 6304
814         self.icmp_id_in = 6305
815
816         ses_num_start = self.nat64_get_ses_num()
817
818         self.vapi.nat64_add_del_pool_addr_range(
819             start_addr=self.nat_addr,
820             end_addr=self.nat_addr,
821             vrf_id=0xFFFFFFFF,
822             is_add=1,
823         )
824         flags = self.config_flags.NAT_IS_INSIDE
825         self.vapi.nat64_add_del_interface(
826             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
827         )
828         self.vapi.nat64_add_del_interface(
829             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
830         )
831
832         # in2out
833         tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
834         udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
835         icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
836         drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
837
838         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
839         self.pg0.add_stream(pkts)
840         self.pg_enable_capture(self.pg_interfaces)
841         self.pg_start()
842         capture = self.pg1.get_capture(len(pkts))
843         self.verify_capture_out(
844             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
845         )
846
847         if_idx = self.pg0.sw_if_index
848         cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
849         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
850         cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
851         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
852         cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
853         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
854         cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
855         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
856
857         # out2in
858         tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
859         udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
860         icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
861         drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
862
863         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
864         self.pg1.add_stream(pkts)
865         self.pg_enable_capture(self.pg_interfaces)
866         self.pg_start()
867         capture = self.pg0.get_capture(len(pkts))
868         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
869         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
870
871         if_idx = self.pg1.sw_if_index
872         cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
873         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
874         cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
875         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
876         cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
877         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
878         cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
879         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
880
881         bibs = self.statistics.get_counter("/nat64/total-bibs")
882         self.assertEqual(bibs[0][0], 3)
883         sessions = self.statistics.get_counter("/nat64/total-sessions")
884         self.assertEqual(sessions[0][0], 3)
885
886         # in2out
887         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
888         self.pg0.add_stream(pkts)
889         self.pg_enable_capture(self.pg_interfaces)
890         self.pg_start()
891         capture = self.pg1.get_capture(len(pkts))
892         self.verify_capture_out(
893             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
894         )
895
896         # out2in
897         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
898         self.pg1.add_stream(pkts)
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901         capture = self.pg0.get_capture(len(pkts))
902         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
903
904         ses_num_end = self.nat64_get_ses_num()
905
906         self.assertEqual(ses_num_end - ses_num_start, 3)
907
908         # tenant with specific VRF
909         self.vapi.nat64_add_del_pool_addr_range(
910             start_addr=self.vrf1_nat_addr,
911             end_addr=self.vrf1_nat_addr,
912             vrf_id=self.vrf1_id,
913             is_add=1,
914         )
915         flags = self.config_flags.NAT_IS_INSIDE
916         self.vapi.nat64_add_del_interface(
917             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
918         )
919
920         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
921         self.pg2.add_stream(pkts)
922         self.pg_enable_capture(self.pg_interfaces)
923         self.pg_start()
924         capture = self.pg1.get_capture(len(pkts))
925         self.verify_capture_out(
926             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
927         )
928
929         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
930         self.pg1.add_stream(pkts)
931         self.pg_enable_capture(self.pg_interfaces)
932         self.pg_start()
933         capture = self.pg2.get_capture(len(pkts))
934         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
935
936     def test_static(self):
937         """NAT64 static translation test"""
938         self.tcp_port_in = 60303
939         self.udp_port_in = 60304
940         self.icmp_id_in = 60305
941         self.tcp_port_out = 60303
942         self.udp_port_out = 60304
943         self.icmp_id_out = 60305
944
945         ses_num_start = self.nat64_get_ses_num()
946
947         self.vapi.nat64_add_del_pool_addr_range(
948             start_addr=self.nat_addr,
949             end_addr=self.nat_addr,
950             vrf_id=0xFFFFFFFF,
951             is_add=1,
952         )
953         flags = self.config_flags.NAT_IS_INSIDE
954         self.vapi.nat64_add_del_interface(
955             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
956         )
957         self.vapi.nat64_add_del_interface(
958             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
959         )
960
961         self.vapi.nat64_add_del_static_bib(
962             i_addr=self.pg0.remote_ip6,
963             o_addr=self.nat_addr,
964             i_port=self.tcp_port_in,
965             o_port=self.tcp_port_out,
966             proto=IP_PROTOS.tcp,
967             vrf_id=0,
968             is_add=1,
969         )
970         self.vapi.nat64_add_del_static_bib(
971             i_addr=self.pg0.remote_ip6,
972             o_addr=self.nat_addr,
973             i_port=self.udp_port_in,
974             o_port=self.udp_port_out,
975             proto=IP_PROTOS.udp,
976             vrf_id=0,
977             is_add=1,
978         )
979         self.vapi.nat64_add_del_static_bib(
980             i_addr=self.pg0.remote_ip6,
981             o_addr=self.nat_addr,
982             i_port=self.icmp_id_in,
983             o_port=self.icmp_id_out,
984             proto=IP_PROTOS.icmp,
985             vrf_id=0,
986             is_add=1,
987         )
988
989         # in2out
990         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
991         self.pg0.add_stream(pkts)
992         self.pg_enable_capture(self.pg_interfaces)
993         self.pg_start()
994         capture = self.pg1.get_capture(len(pkts))
995         self.verify_capture_out(
996             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
997         )
998
999         # out2in
1000         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1001         self.pg1.add_stream(pkts)
1002         self.pg_enable_capture(self.pg_interfaces)
1003         self.pg_start()
1004         capture = self.pg0.get_capture(len(pkts))
1005         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1006         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1007
1008         ses_num_end = self.nat64_get_ses_num()
1009
1010         self.assertEqual(ses_num_end - ses_num_start, 3)
1011
1012     def test_session_timeout(self):
1013         """NAT64 session timeout"""
1014         self.icmp_id_in = 1234
1015         self.vapi.nat64_add_del_pool_addr_range(
1016             start_addr=self.nat_addr,
1017             end_addr=self.nat_addr,
1018             vrf_id=0xFFFFFFFF,
1019             is_add=1,
1020         )
1021         flags = self.config_flags.NAT_IS_INSIDE
1022         self.vapi.nat64_add_del_interface(
1023             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1024         )
1025         self.vapi.nat64_add_del_interface(
1026             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1027         )
1028         self.vapi.nat64_set_timeouts(
1029             udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1030         )
1031
1032         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1033         self.pg0.add_stream(pkts)
1034         self.pg_enable_capture(self.pg_interfaces)
1035         self.pg_start()
1036         capture = self.pg1.get_capture(len(pkts))
1037
1038         ses_num_before_timeout = self.nat64_get_ses_num()
1039
1040         self.virtual_sleep(15)
1041
1042         # ICMP and TCP session after timeout
1043         ses_num_after_timeout = self.nat64_get_ses_num()
1044         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1045
1046     def test_icmp_error(self):
1047         """NAT64 ICMP Error message translation"""
1048         self.tcp_port_in = 6303
1049         self.udp_port_in = 6304
1050         self.icmp_id_in = 6305
1051
1052         self.vapi.nat64_add_del_pool_addr_range(
1053             start_addr=self.nat_addr,
1054             end_addr=self.nat_addr,
1055             vrf_id=0xFFFFFFFF,
1056             is_add=1,
1057         )
1058         flags = self.config_flags.NAT_IS_INSIDE
1059         self.vapi.nat64_add_del_interface(
1060             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1061         )
1062         self.vapi.nat64_add_del_interface(
1063             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1064         )
1065
1066         # send some packets to create sessions
1067         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1068         self.pg0.add_stream(pkts)
1069         self.pg_enable_capture(self.pg_interfaces)
1070         self.pg_start()
1071         capture_ip4 = self.pg1.get_capture(len(pkts))
1072         self.verify_capture_out(
1073             capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1074         )
1075
1076         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1077         self.pg1.add_stream(pkts)
1078         self.pg_enable_capture(self.pg_interfaces)
1079         self.pg_start()
1080         capture_ip6 = self.pg0.get_capture(len(pkts))
1081         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1082         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1083
1084         # in2out
1085         pkts = [
1086             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1087             / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1088             / ICMPv6DestUnreach(code=1)
1089             / packet[IPv6]
1090             for packet in capture_ip6
1091         ]
1092         self.pg0.add_stream(pkts)
1093         self.pg_enable_capture(self.pg_interfaces)
1094         self.pg_start()
1095         capture = self.pg1.get_capture(len(pkts))
1096         for packet in capture:
1097             try:
1098                 self.assertEqual(packet[IP].src, self.nat_addr)
1099                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1100                 self.assertEqual(packet[ICMP].type, 3)
1101                 self.assertEqual(packet[ICMP].code, 13)
1102                 inner = packet[IPerror]
1103                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1104                 self.assertEqual(inner.dst, self.nat_addr)
1105                 self.assert_packet_checksums_valid(packet)
1106                 if inner.haslayer(TCPerror):
1107                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1108                 elif inner.haslayer(UDPerror):
1109                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1110                 else:
1111                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1112             except:
1113                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1114                 raise
1115
1116         # out2in
1117         pkts = [
1118             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1119             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1120             / ICMP(type=3, code=13)
1121             / packet[IP]
1122             for packet in capture_ip4
1123         ]
1124         self.pg1.add_stream(pkts)
1125         self.pg_enable_capture(self.pg_interfaces)
1126         self.pg_start()
1127         capture = self.pg0.get_capture(len(pkts))
1128         for packet in capture:
1129             try:
1130                 self.assertEqual(packet[IPv6].src, ip.src)
1131                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1132                 icmp = packet[ICMPv6DestUnreach]
1133                 self.assertEqual(icmp.code, 1)
1134                 inner = icmp[IPerror6]
1135                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1136                 self.assertEqual(inner.dst, ip.src)
1137                 self.assert_icmpv6_checksum_valid(packet)
1138                 if inner.haslayer(TCPerror):
1139                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1140                 elif inner.haslayer(UDPerror):
1141                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1142                 else:
1143                     self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1144             except:
1145                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1146                 raise
1147
1148     def test_hairpinning(self):
1149         """NAT64 hairpinning"""
1150
1151         client = self.pg0.remote_hosts[0]
1152         server = self.pg0.remote_hosts[1]
1153         server_tcp_in_port = 22
1154         server_tcp_out_port = 4022
1155         server_udp_in_port = 23
1156         server_udp_out_port = 4023
1157         client_tcp_in_port = 1234
1158         client_udp_in_port = 1235
1159         client_tcp_out_port = 0
1160         client_udp_out_port = 0
1161         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1162         nat_addr_ip6 = ip.src
1163
1164         self.vapi.nat64_add_del_pool_addr_range(
1165             start_addr=self.nat_addr,
1166             end_addr=self.nat_addr,
1167             vrf_id=0xFFFFFFFF,
1168             is_add=1,
1169         )
1170         flags = self.config_flags.NAT_IS_INSIDE
1171         self.vapi.nat64_add_del_interface(
1172             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1173         )
1174         self.vapi.nat64_add_del_interface(
1175             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1176         )
1177
1178         self.vapi.nat64_add_del_static_bib(
1179             i_addr=server.ip6n,
1180             o_addr=self.nat_addr,
1181             i_port=server_tcp_in_port,
1182             o_port=server_tcp_out_port,
1183             proto=IP_PROTOS.tcp,
1184             vrf_id=0,
1185             is_add=1,
1186         )
1187         self.vapi.nat64_add_del_static_bib(
1188             i_addr=server.ip6n,
1189             o_addr=self.nat_addr,
1190             i_port=server_udp_in_port,
1191             o_port=server_udp_out_port,
1192             proto=IP_PROTOS.udp,
1193             vrf_id=0,
1194             is_add=1,
1195         )
1196
1197         # client to server
1198         pkts = []
1199         p = (
1200             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1201             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1202             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1203         )
1204         pkts.append(p)
1205         p = (
1206             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1207             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1208             / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1209         )
1210         pkts.append(p)
1211         self.pg0.add_stream(pkts)
1212         self.pg_enable_capture(self.pg_interfaces)
1213         self.pg_start()
1214         capture = self.pg0.get_capture(len(pkts))
1215         for packet in capture:
1216             try:
1217                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1218                 self.assertEqual(packet[IPv6].dst, server.ip6)
1219                 self.assert_packet_checksums_valid(packet)
1220                 if packet.haslayer(TCP):
1221                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1222                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1223                     client_tcp_out_port = packet[TCP].sport
1224                 else:
1225                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1226                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1227                     client_udp_out_port = packet[UDP].sport
1228             except:
1229                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1230                 raise
1231
1232         # server to client
1233         pkts = []
1234         p = (
1235             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1236             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1237             / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1238         )
1239         pkts.append(p)
1240         p = (
1241             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1242             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1243             / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1244         )
1245         pkts.append(p)
1246         self.pg0.add_stream(pkts)
1247         self.pg_enable_capture(self.pg_interfaces)
1248         self.pg_start()
1249         capture = self.pg0.get_capture(len(pkts))
1250         for packet in capture:
1251             try:
1252                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1253                 self.assertEqual(packet[IPv6].dst, client.ip6)
1254                 self.assert_packet_checksums_valid(packet)
1255                 if packet.haslayer(TCP):
1256                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1257                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1258                 else:
1259                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1260                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1261             except:
1262                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1263                 raise
1264
1265         # ICMP error
1266         pkts = []
1267         pkts = [
1268             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1269             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1270             / ICMPv6DestUnreach(code=1)
1271             / packet[IPv6]
1272             for packet in capture
1273         ]
1274         self.pg0.add_stream(pkts)
1275         self.pg_enable_capture(self.pg_interfaces)
1276         self.pg_start()
1277         capture = self.pg0.get_capture(len(pkts))
1278         for packet in capture:
1279             try:
1280                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1281                 self.assertEqual(packet[IPv6].dst, server.ip6)
1282                 icmp = packet[ICMPv6DestUnreach]
1283                 self.assertEqual(icmp.code, 1)
1284                 inner = icmp[IPerror6]
1285                 self.assertEqual(inner.src, server.ip6)
1286                 self.assertEqual(inner.dst, nat_addr_ip6)
1287                 self.assert_packet_checksums_valid(packet)
1288                 if inner.haslayer(TCPerror):
1289                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1290                     self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1291                 else:
1292                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1293                     self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1294             except:
1295                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1296                 raise
1297
1298     def test_prefix(self):
1299         """NAT64 Network-Specific Prefix"""
1300
1301         self.vapi.nat64_add_del_pool_addr_range(
1302             start_addr=self.nat_addr,
1303             end_addr=self.nat_addr,
1304             vrf_id=0xFFFFFFFF,
1305             is_add=1,
1306         )
1307         flags = self.config_flags.NAT_IS_INSIDE
1308         self.vapi.nat64_add_del_interface(
1309             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1310         )
1311         self.vapi.nat64_add_del_interface(
1312             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1313         )
1314         self.vapi.nat64_add_del_pool_addr_range(
1315             start_addr=self.vrf1_nat_addr,
1316             end_addr=self.vrf1_nat_addr,
1317             vrf_id=self.vrf1_id,
1318             is_add=1,
1319         )
1320         self.vapi.nat64_add_del_interface(
1321             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1322         )
1323
1324         # Add global prefix
1325         global_pref64 = "2001:db8::"
1326         global_pref64_len = 32
1327         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1328         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1329
1330         prefix = self.vapi.nat64_prefix_dump()
1331         self.assertEqual(len(prefix), 1)
1332         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1333         self.assertEqual(prefix[0].vrf_id, 0)
1334
1335         # Add tenant specific prefix
1336         vrf1_pref64 = "2001:db8:122:300::"
1337         vrf1_pref64_len = 56
1338         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1339         self.vapi.nat64_add_del_prefix(
1340             prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1341         )
1342
1343         prefix = self.vapi.nat64_prefix_dump()
1344         self.assertEqual(len(prefix), 2)
1345
1346         # Global prefix
1347         pkts = self.create_stream_in_ip6(
1348             self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1349         )
1350         self.pg0.add_stream(pkts)
1351         self.pg_enable_capture(self.pg_interfaces)
1352         self.pg_start()
1353         capture = self.pg1.get_capture(len(pkts))
1354         self.verify_capture_out(
1355             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1356         )
1357
1358         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1359         self.pg1.add_stream(pkts)
1360         self.pg_enable_capture(self.pg_interfaces)
1361         self.pg_start()
1362         capture = self.pg0.get_capture(len(pkts))
1363         dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1364         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1365
1366         # Tenant specific prefix
1367         pkts = self.create_stream_in_ip6(
1368             self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1369         )
1370         self.pg2.add_stream(pkts)
1371         self.pg_enable_capture(self.pg_interfaces)
1372         self.pg_start()
1373         capture = self.pg1.get_capture(len(pkts))
1374         self.verify_capture_out(
1375             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1376         )
1377
1378         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1379         self.pg1.add_stream(pkts)
1380         self.pg_enable_capture(self.pg_interfaces)
1381         self.pg_start()
1382         capture = self.pg2.get_capture(len(pkts))
1383         dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1384         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1385
1386     def test_unknown_proto(self):
1387         """NAT64 translate packet with unknown protocol"""
1388
1389         self.vapi.nat64_add_del_pool_addr_range(
1390             start_addr=self.nat_addr,
1391             end_addr=self.nat_addr,
1392             vrf_id=0xFFFFFFFF,
1393             is_add=1,
1394         )
1395         flags = self.config_flags.NAT_IS_INSIDE
1396         self.vapi.nat64_add_del_interface(
1397             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1398         )
1399         self.vapi.nat64_add_del_interface(
1400             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1401         )
1402         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1403
1404         # in2out
1405         p = (
1406             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1407             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1408             / TCP(sport=self.tcp_port_in, dport=20)
1409         )
1410         self.pg0.add_stream(p)
1411         self.pg_enable_capture(self.pg_interfaces)
1412         self.pg_start()
1413         p = self.pg1.get_capture(1)
1414
1415         p = (
1416             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1417             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1418             / GRE()
1419             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1420             / TCP(sport=1234, dport=1234)
1421         )
1422         self.pg0.add_stream(p)
1423         self.pg_enable_capture(self.pg_interfaces)
1424         self.pg_start()
1425         p = self.pg1.get_capture(1)
1426         packet = p[0]
1427         try:
1428             self.assertEqual(packet[IP].src, self.nat_addr)
1429             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1430             self.assertEqual(packet.haslayer(GRE), 1)
1431             self.assert_packet_checksums_valid(packet)
1432         except:
1433             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1434             raise
1435
1436         # out2in
1437         p = (
1438             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1439             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1440             / GRE()
1441             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1442             / TCP(sport=1234, dport=1234)
1443         )
1444         self.pg1.add_stream(p)
1445         self.pg_enable_capture(self.pg_interfaces)
1446         self.pg_start()
1447         p = self.pg0.get_capture(1)
1448         packet = p[0]
1449         try:
1450             self.assertEqual(packet[IPv6].src, remote_ip6)
1451             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1452             self.assertEqual(packet[IPv6].nh, 47)
1453         except:
1454             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1455             raise
1456
1457     def test_hairpinning_unknown_proto(self):
1458         """NAT64 translate packet with unknown protocol - hairpinning"""
1459
1460         client = self.pg0.remote_hosts[0]
1461         server = self.pg0.remote_hosts[1]
1462         server_tcp_in_port = 22
1463         server_tcp_out_port = 4022
1464         client_tcp_in_port = 1234
1465         client_tcp_out_port = 1235
1466         server_nat_ip = "10.0.0.100"
1467         client_nat_ip = "10.0.0.110"
1468         server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1469         client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1470
1471         self.vapi.nat64_add_del_pool_addr_range(
1472             start_addr=server_nat_ip,
1473             end_addr=client_nat_ip,
1474             vrf_id=0xFFFFFFFF,
1475             is_add=1,
1476         )
1477         flags = self.config_flags.NAT_IS_INSIDE
1478         self.vapi.nat64_add_del_interface(
1479             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1480         )
1481         self.vapi.nat64_add_del_interface(
1482             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1483         )
1484
1485         self.vapi.nat64_add_del_static_bib(
1486             i_addr=server.ip6n,
1487             o_addr=server_nat_ip,
1488             i_port=server_tcp_in_port,
1489             o_port=server_tcp_out_port,
1490             proto=IP_PROTOS.tcp,
1491             vrf_id=0,
1492             is_add=1,
1493         )
1494
1495         self.vapi.nat64_add_del_static_bib(
1496             i_addr=server.ip6n,
1497             o_addr=server_nat_ip,
1498             i_port=0,
1499             o_port=0,
1500             proto=IP_PROTOS.gre,
1501             vrf_id=0,
1502             is_add=1,
1503         )
1504
1505         self.vapi.nat64_add_del_static_bib(
1506             i_addr=client.ip6n,
1507             o_addr=client_nat_ip,
1508             i_port=client_tcp_in_port,
1509             o_port=client_tcp_out_port,
1510             proto=IP_PROTOS.tcp,
1511             vrf_id=0,
1512             is_add=1,
1513         )
1514
1515         # client to server
1516         p = (
1517             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1518             / IPv6(src=client.ip6, dst=server_nat_ip6)
1519             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1520         )
1521         self.pg0.add_stream(p)
1522         self.pg_enable_capture(self.pg_interfaces)
1523         self.pg_start()
1524         p = self.pg0.get_capture(1)
1525
1526         p = (
1527             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1528             / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1529             / GRE()
1530             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1531             / TCP(sport=1234, dport=1234)
1532         )
1533         self.pg0.add_stream(p)
1534         self.pg_enable_capture(self.pg_interfaces)
1535         self.pg_start()
1536         p = self.pg0.get_capture(1)
1537         packet = p[0]
1538         try:
1539             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1540             self.assertEqual(packet[IPv6].dst, server.ip6)
1541             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1542         except:
1543             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1544             raise
1545
1546         # server to client
1547         p = (
1548             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1549             / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1550             / GRE()
1551             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1552             / TCP(sport=1234, dport=1234)
1553         )
1554         self.pg0.add_stream(p)
1555         self.pg_enable_capture(self.pg_interfaces)
1556         self.pg_start()
1557         p = self.pg0.get_capture(1)
1558         packet = p[0]
1559         try:
1560             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1561             self.assertEqual(packet[IPv6].dst, client.ip6)
1562             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1563         except:
1564             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1565             raise
1566
1567     def test_one_armed_nat64(self):
1568         """One armed NAT64"""
1569         external_port = 0
1570         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1571
1572         self.vapi.nat64_add_del_pool_addr_range(
1573             start_addr=self.nat_addr,
1574             end_addr=self.nat_addr,
1575             vrf_id=0xFFFFFFFF,
1576             is_add=1,
1577         )
1578         flags = self.config_flags.NAT_IS_INSIDE
1579         self.vapi.nat64_add_del_interface(
1580             is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1581         )
1582         self.vapi.nat64_add_del_interface(
1583             is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1584         )
1585
1586         # in2out
1587         p = (
1588             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1589             / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1590             / TCP(sport=12345, dport=80)
1591         )
1592         self.pg3.add_stream(p)
1593         self.pg_enable_capture(self.pg_interfaces)
1594         self.pg_start()
1595         capture = self.pg3.get_capture(1)
1596         p = capture[0]
1597         try:
1598             ip = p[IP]
1599             tcp = p[TCP]
1600             self.assertEqual(ip.src, self.nat_addr)
1601             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1602             self.assertNotEqual(tcp.sport, 12345)
1603             external_port = tcp.sport
1604             self.assertEqual(tcp.dport, 80)
1605             self.assert_packet_checksums_valid(p)
1606         except:
1607             self.logger.error(ppp("Unexpected or invalid packet:", p))
1608             raise
1609
1610         # out2in
1611         p = (
1612             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1613             / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1614             / TCP(sport=80, dport=external_port)
1615         )
1616         self.pg3.add_stream(p)
1617         self.pg_enable_capture(self.pg_interfaces)
1618         self.pg_start()
1619         capture = self.pg3.get_capture(1)
1620         p = capture[0]
1621         try:
1622             ip = p[IPv6]
1623             tcp = p[TCP]
1624             self.assertEqual(ip.src, remote_host_ip6)
1625             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1626             self.assertEqual(tcp.sport, 80)
1627             self.assertEqual(tcp.dport, 12345)
1628             self.assert_packet_checksums_valid(p)
1629         except:
1630             self.logger.error(ppp("Unexpected or invalid packet:", p))
1631             raise
1632
1633     def test_frag_in_order(self):
1634         """NAT64 translate fragments arriving in order"""
1635         self.tcp_port_in = random.randint(1025, 65535)
1636
1637         self.vapi.nat64_add_del_pool_addr_range(
1638             start_addr=self.nat_addr,
1639             end_addr=self.nat_addr,
1640             vrf_id=0xFFFFFFFF,
1641             is_add=1,
1642         )
1643         flags = self.config_flags.NAT_IS_INSIDE
1644         self.vapi.nat64_add_del_interface(
1645             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1646         )
1647         self.vapi.nat64_add_del_interface(
1648             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1649         )
1650
1651         # in2out
1652         data = b"a" * 200
1653         pkts = self.create_stream_frag_ip6(
1654             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1655         )
1656         self.pg0.add_stream(pkts)
1657         self.pg_enable_capture(self.pg_interfaces)
1658         self.pg_start()
1659         frags = self.pg1.get_capture(len(pkts))
1660         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1661         self.assertEqual(p[TCP].dport, 20)
1662         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1663         self.tcp_port_out = p[TCP].sport
1664         self.assertEqual(data, p[Raw].load)
1665
1666         # out2in
1667         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1668         pkts = self.create_stream_frag(
1669             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1670         )
1671         self.pg1.add_stream(pkts)
1672         self.pg_enable_capture(self.pg_interfaces)
1673         self.pg_start()
1674         frags = self.pg0.get_capture(len(pkts))
1675         self.logger.debug(ppc("Captured:", frags))
1676         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1677         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1678         self.assertEqual(p[TCP].sport, 20)
1679         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1680         self.assertEqual(data, p[Raw].load)
1681
1682     def test_reass_hairpinning(self):
1683         """NAT64 fragments hairpinning"""
1684         data = b"a" * 200
1685         server = self.pg0.remote_hosts[1]
1686         server_in_port = random.randint(1025, 65535)
1687         server_out_port = random.randint(1025, 65535)
1688         client_in_port = random.randint(1025, 65535)
1689         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1690         nat_addr_ip6 = ip.src
1691
1692         self.vapi.nat64_add_del_pool_addr_range(
1693             start_addr=self.nat_addr,
1694             end_addr=self.nat_addr,
1695             vrf_id=0xFFFFFFFF,
1696             is_add=1,
1697         )
1698         flags = self.config_flags.NAT_IS_INSIDE
1699         self.vapi.nat64_add_del_interface(
1700             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1701         )
1702         self.vapi.nat64_add_del_interface(
1703             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1704         )
1705
1706         # add static BIB entry for server
1707         self.vapi.nat64_add_del_static_bib(
1708             i_addr=server.ip6n,
1709             o_addr=self.nat_addr,
1710             i_port=server_in_port,
1711             o_port=server_out_port,
1712             proto=IP_PROTOS.tcp,
1713             vrf_id=0,
1714             is_add=1,
1715         )
1716
1717         # send packet from host to server
1718         pkts = self.create_stream_frag_ip6(
1719             self.pg0, self.nat_addr, client_in_port, server_out_port, data
1720         )
1721         self.pg0.add_stream(pkts)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         frags = self.pg0.get_capture(len(pkts))
1725         self.logger.debug(ppc("Captured:", frags))
1726         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1727         self.assertNotEqual(p[TCP].sport, client_in_port)
1728         self.assertEqual(p[TCP].dport, server_in_port)
1729         self.assertEqual(data, p[Raw].load)
1730
1731     def test_frag_out_of_order(self):
1732         """NAT64 translate fragments arriving out of order"""
1733         self.tcp_port_in = random.randint(1025, 65535)
1734
1735         self.vapi.nat64_add_del_pool_addr_range(
1736             start_addr=self.nat_addr,
1737             end_addr=self.nat_addr,
1738             vrf_id=0xFFFFFFFF,
1739             is_add=1,
1740         )
1741         flags = self.config_flags.NAT_IS_INSIDE
1742         self.vapi.nat64_add_del_interface(
1743             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1744         )
1745         self.vapi.nat64_add_del_interface(
1746             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1747         )
1748
1749         # in2out
1750         data = b"a" * 200
1751         pkts = self.create_stream_frag_ip6(
1752             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1753         )
1754         pkts.reverse()
1755         self.pg0.add_stream(pkts)
1756         self.pg_enable_capture(self.pg_interfaces)
1757         self.pg_start()
1758         frags = self.pg1.get_capture(len(pkts))
1759         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1760         self.assertEqual(p[TCP].dport, 20)
1761         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1762         self.tcp_port_out = p[TCP].sport
1763         self.assertEqual(data, p[Raw].load)
1764
1765         # out2in
1766         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1767         pkts = self.create_stream_frag(
1768             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1769         )
1770         pkts.reverse()
1771         self.pg1.add_stream(pkts)
1772         self.pg_enable_capture(self.pg_interfaces)
1773         self.pg_start()
1774         frags = self.pg0.get_capture(len(pkts))
1775         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1776         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1777         self.assertEqual(p[TCP].sport, 20)
1778         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1779         self.assertEqual(data, p[Raw].load)
1780
1781     def test_interface_addr(self):
1782         """Acquire NAT64 pool addresses from interface"""
1783         self.vapi.nat64_add_del_interface_addr(
1784             is_add=1, sw_if_index=self.pg4.sw_if_index
1785         )
1786
1787         # no address in NAT64 pool
1788         addresses = self.vapi.nat44_address_dump()
1789         self.assertEqual(0, len(addresses))
1790
1791         # configure interface address and check NAT64 address pool
1792         self.pg4.config_ip4()
1793         addresses = self.vapi.nat64_pool_addr_dump()
1794         self.assertEqual(len(addresses), 1)
1795
1796         self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1797
1798         # remove interface address and check NAT64 address pool
1799         self.pg4.unconfig_ip4()
1800         addresses = self.vapi.nat64_pool_addr_dump()
1801         self.assertEqual(0, len(addresses))
1802
1803     @unittest.skipUnless(config.extended, "part of extended tests")
1804     def test_ipfix_max_bibs_sessions(self):
1805         """IPFIX logging maximum session and BIB entries exceeded"""
1806         max_bibs = 1280
1807         max_sessions = 2560
1808         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1809
1810         self.vapi.nat64_add_del_pool_addr_range(
1811             start_addr=self.nat_addr,
1812             end_addr=self.nat_addr,
1813             vrf_id=0xFFFFFFFF,
1814             is_add=1,
1815         )
1816         flags = self.config_flags.NAT_IS_INSIDE
1817         self.vapi.nat64_add_del_interface(
1818             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1819         )
1820         self.vapi.nat64_add_del_interface(
1821             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1822         )
1823
1824         pkts = []
1825         src = ""
1826         for i in range(0, max_bibs):
1827             src = "fd01:aa::%x" % (i)
1828             p = (
1829                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1830                 / IPv6(src=src, dst=remote_host_ip6)
1831                 / TCP(sport=12345, dport=80)
1832             )
1833             pkts.append(p)
1834             p = (
1835                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1836                 / IPv6(src=src, dst=remote_host_ip6)
1837                 / TCP(sport=12345, dport=22)
1838             )
1839             pkts.append(p)
1840         self.pg0.add_stream(pkts)
1841         self.pg_enable_capture(self.pg_interfaces)
1842         self.pg_start()
1843         self.pg1.get_capture(max_sessions)
1844
1845         self.vapi.set_ipfix_exporter(
1846             collector_address=self.pg3.remote_ip4,
1847             src_address=self.pg3.local_ip4,
1848             path_mtu=512,
1849             template_interval=10,
1850         )
1851         self.vapi.nat_ipfix_enable_disable(
1852             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1853         )
1854
1855         p = (
1856             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1857             / IPv6(src=src, dst=remote_host_ip6)
1858             / TCP(sport=12345, dport=25)
1859         )
1860         self.pg0.add_stream(p)
1861         self.pg_enable_capture(self.pg_interfaces)
1862         self.pg_start()
1863         self.pg1.assert_nothing_captured()
1864         self.vapi.ipfix_flush()
1865         capture = self.pg3.get_capture(7)
1866         ipfix = IPFIXDecoder()
1867         # first load template
1868         for p in capture:
1869             self.assertTrue(p.haslayer(IPFIX))
1870             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1871             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1872             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1873             self.assertEqual(p[UDP].dport, 4739)
1874             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1875             if p.haslayer(Template):
1876                 ipfix.add_template(p.getlayer(Template))
1877         # verify events in data set
1878         for p in capture:
1879             if p.haslayer(Data):
1880                 data = ipfix.decode_data_set(p.getlayer(Set))
1881                 self.verify_ipfix_max_sessions(data, max_sessions)
1882
1883         p = (
1884             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1885             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1886             / TCP(sport=12345, dport=80)
1887         )
1888         self.pg0.add_stream(p)
1889         self.pg_enable_capture(self.pg_interfaces)
1890         self.pg_start()
1891         self.pg1.assert_nothing_captured()
1892         self.vapi.ipfix_flush()
1893         capture = self.pg3.get_capture(1)
1894         # verify events in data set
1895         for p in capture:
1896             self.assertTrue(p.haslayer(IPFIX))
1897             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1898             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1899             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1900             self.assertEqual(p[UDP].dport, 4739)
1901             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1902             if p.haslayer(Data):
1903                 data = ipfix.decode_data_set(p.getlayer(Set))
1904                 self.verify_ipfix_max_bibs(data, max_bibs)
1905
1906     def test_ipfix_bib_ses(self):
1907         """IPFIX logging NAT64 BIB/session create and delete events"""
1908         self.tcp_port_in = random.randint(1025, 65535)
1909         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1910
1911         self.vapi.nat64_add_del_pool_addr_range(
1912             start_addr=self.nat_addr,
1913             end_addr=self.nat_addr,
1914             vrf_id=0xFFFFFFFF,
1915             is_add=1,
1916         )
1917         flags = self.config_flags.NAT_IS_INSIDE
1918         self.vapi.nat64_add_del_interface(
1919             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1920         )
1921         self.vapi.nat64_add_del_interface(
1922             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1923         )
1924         self.vapi.set_ipfix_exporter(
1925             collector_address=self.pg3.remote_ip4,
1926             src_address=self.pg3.local_ip4,
1927             path_mtu=512,
1928             template_interval=10,
1929         )
1930         self.vapi.nat_ipfix_enable_disable(
1931             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1932         )
1933
1934         # Create
1935         p = (
1936             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1937             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1938             / TCP(sport=self.tcp_port_in, dport=25)
1939         )
1940         self.pg0.add_stream(p)
1941         self.pg_enable_capture(self.pg_interfaces)
1942         self.pg_start()
1943         p = self.pg1.get_capture(1)
1944         self.tcp_port_out = p[0][TCP].sport
1945         self.vapi.ipfix_flush()
1946         capture = self.pg3.get_capture(8)
1947         ipfix = IPFIXDecoder()
1948         # first load template
1949         for p in capture:
1950             self.assertTrue(p.haslayer(IPFIX))
1951             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1952             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1953             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1954             self.assertEqual(p[UDP].dport, 4739)
1955             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1956             if p.haslayer(Template):
1957                 ipfix.add_template(p.getlayer(Template))
1958         # verify events in data set
1959         for p in capture:
1960             if p.haslayer(Data):
1961                 data = ipfix.decode_data_set(p.getlayer(Set))
1962                 if scapy.compat.orb(data[0][230]) == 10:
1963                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1964                 elif scapy.compat.orb(data[0][230]) == 6:
1965                     self.verify_ipfix_nat64_ses(
1966                         data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1967                     )
1968                 else:
1969                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1970
1971         # Delete
1972         self.pg_enable_capture(self.pg_interfaces)
1973         self.vapi.nat64_add_del_pool_addr_range(
1974             start_addr=self.nat_addr,
1975             end_addr=self.nat_addr,
1976             vrf_id=0xFFFFFFFF,
1977             is_add=0,
1978         )
1979         self.vapi.ipfix_flush()
1980         capture = self.pg3.get_capture(2)
1981         # verify events in data set
1982         for p in capture:
1983             self.assertTrue(p.haslayer(IPFIX))
1984             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1985             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1986             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1987             self.assertEqual(p[UDP].dport, 4739)
1988             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1989             if p.haslayer(Data):
1990                 data = ipfix.decode_data_set(p.getlayer(Set))
1991                 if scapy.compat.orb(data[0][230]) == 11:
1992                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1993                 elif scapy.compat.orb(data[0][230]) == 7:
1994                     self.verify_ipfix_nat64_ses(
1995                         data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1996                     )
1997                 else:
1998                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1999
2000     def test_syslog_sess(self):
2001         """Test syslog session creation and deletion"""
2002         self.tcp_port_in = random.randint(1025, 65535)
2003         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2004
2005         self.vapi.nat64_add_del_pool_addr_range(
2006             start_addr=self.nat_addr,
2007             end_addr=self.nat_addr,
2008             vrf_id=0xFFFFFFFF,
2009             is_add=1,
2010         )
2011         flags = self.config_flags.NAT_IS_INSIDE
2012         self.vapi.nat64_add_del_interface(
2013             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2014         )
2015         self.vapi.nat64_add_del_interface(
2016             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2017         )
2018         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2019         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2020
2021         p = (
2022             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2023             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2024             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2025         )
2026         self.pg0.add_stream(p)
2027         self.pg_enable_capture(self.pg_interfaces)
2028         self.pg_start()
2029         p = self.pg1.get_capture(1)
2030         self.tcp_port_out = p[0][TCP].sport
2031         capture = self.pg3.get_capture(1)
2032         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2033
2034         self.pg_enable_capture(self.pg_interfaces)
2035         self.pg_start()
2036         self.vapi.nat64_add_del_pool_addr_range(
2037             start_addr=self.nat_addr,
2038             end_addr=self.nat_addr,
2039             vrf_id=0xFFFFFFFF,
2040             is_add=0,
2041         )
2042         capture = self.pg3.get_capture(1)
2043         self.verify_syslog_sess(capture[0][Raw].load, False, True)
2044
2045     def nat64_get_ses_num(self):
2046         """
2047         Return number of active NAT64 sessions.
2048         """
2049         st = self.vapi.nat64_st_dump(proto=255)
2050         return len(st)
2051
2052     def clear_nat64(self):
2053         """
2054         Clear NAT64 configuration.
2055         """
2056         self.vapi.nat_ipfix_enable_disable(
2057             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2058         )
2059         self.ipfix_src_port = 4739
2060         self.ipfix_domain_id = 1
2061
2062         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2063
2064         self.vapi.nat64_set_timeouts(
2065             udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2066         )
2067
2068         interfaces = self.vapi.nat64_interface_dump()
2069         for intf in interfaces:
2070             self.vapi.nat64_add_del_interface(
2071                 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2072             )
2073
2074         bib = self.vapi.nat64_bib_dump(proto=255)
2075         for bibe in bib:
2076             if bibe.flags & self.config_flags.NAT_IS_STATIC:
2077                 self.vapi.nat64_add_del_static_bib(
2078                     i_addr=bibe.i_addr,
2079                     o_addr=bibe.o_addr,
2080                     i_port=bibe.i_port,
2081                     o_port=bibe.o_port,
2082                     proto=bibe.proto,
2083                     vrf_id=bibe.vrf_id,
2084                     is_add=0,
2085                 )
2086
2087         adresses = self.vapi.nat64_pool_addr_dump()
2088         for addr in adresses:
2089             self.vapi.nat64_add_del_pool_addr_range(
2090                 start_addr=addr.address,
2091                 end_addr=addr.address,
2092                 vrf_id=addr.vrf_id,
2093                 is_add=0,
2094             )
2095
2096         prefixes = self.vapi.nat64_prefix_dump()
2097         for prefix in prefixes:
2098             self.vapi.nat64_add_del_prefix(
2099                 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2100             )
2101
2102         bibs = self.statistics.get_counter("/nat64/total-bibs")
2103         self.assertEqual(bibs[0][0], 0)
2104         sessions = self.statistics.get_counter("/nat64/total-sessions")
2105         self.assertEqual(sessions[0][0], 0)
2106
2107
2108 if __name__ == "__main__":
2109     unittest.main(testRunner=VppTestRunner)