tests: Use errno value rather than a specific int
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from config import config
12 from framework import VppTestCase
13 from asfframework import (
14     tag_fixme_vpp_workers,
15     tag_fixme_ubuntu2204,
16     is_distro_ubuntu2204,
17     VppTestRunner,
18 )
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from scapy.data import IP_PROTOS
21 from scapy.layers.inet import IP, TCP, UDP, ICMP
22 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
23 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
24 from scapy.layers.inet6 import (
25     IPv6,
26     ICMPv6EchoRequest,
27     ICMPv6EchoReply,
28     ICMPv6ND_NS,
29     ICMPv6ND_NA,
30     ICMPv6NDOptDstLLAddr,
31     fragment6,
32 )
33 from scapy.layers.l2 import Ether, GRE
34 from scapy.packet import Raw
35 from syslog_rfc5424_parser import SyslogMessage, ParseError
36 from syslog_rfc5424_parser.constants import SyslogSeverity
37 from util import ppc, ppp
38 from vpp_papi import VppEnum
39
40
41 @tag_fixme_vpp_workers
42 @tag_fixme_ubuntu2204
43 class TestNAT64(VppTestCase):
44     """NAT64 Test Cases"""
45
46     @property
47     def SYSLOG_SEVERITY(self):
48         return VppEnum.vl_api_syslog_severity_t
49
50     @property
51     def config_flags(self):
52         return VppEnum.vl_api_nat_config_flags_t
53
54     @classmethod
55     def setUpClass(cls):
56         super(TestNAT64, cls).setUpClass()
57
58         if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
59             return
60         cls.tcp_port_in = 6303
61         cls.tcp_port_out = 6303
62         cls.udp_port_in = 6304
63         cls.udp_port_out = 6304
64         cls.icmp_id_in = 6305
65         cls.icmp_id_out = 6305
66         cls.tcp_external_port = 80
67         cls.nat_addr = "10.0.0.3"
68         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
69         cls.vrf1_id = 10
70         cls.vrf1_nat_addr = "10.0.10.3"
71         cls.ipfix_src_port = 4739
72         cls.ipfix_domain_id = 1
73
74         cls.create_pg_interfaces(range(6))
75         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
76         cls.ip6_interfaces.append(cls.pg_interfaces[2])
77         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
78
79         cls.vapi.ip_table_add_del(
80             is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
81         )
82
83         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
84
85         cls.pg0.generate_remote_hosts(2)
86
87         for i in cls.ip6_interfaces:
88             i.admin_up()
89             i.config_ip6()
90             i.configure_ipv6_neighbors()
91
92         for i in cls.ip4_interfaces:
93             i.admin_up()
94             i.config_ip4()
95             i.resolve_arp()
96
97         cls.pg3.admin_up()
98         cls.pg3.config_ip4()
99         cls.pg3.resolve_arp()
100         cls.pg3.config_ip6()
101         cls.pg3.configure_ipv6_neighbors()
102
103         cls.pg5.admin_up()
104         cls.pg5.config_ip6()
105
106     @classmethod
107     def tearDownClass(cls):
108         super(TestNAT64, cls).tearDownClass()
109
110     def setUp(self):
111         super(TestNAT64, self).setUp()
112         self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
113
114     def tearDown(self):
115         super(TestNAT64, self).tearDown()
116         if not self.vpp_dead:
117             self.vapi.nat64_plugin_enable_disable(enable=0)
118
119     def show_commands_at_teardown(self):
120         self.logger.info(self.vapi.cli("show nat64 pool"))
121         self.logger.info(self.vapi.cli("show nat64 interfaces"))
122         self.logger.info(self.vapi.cli("show nat64 prefix"))
123         self.logger.info(self.vapi.cli("show nat64 bib all"))
124         self.logger.info(self.vapi.cli("show nat64 session table all"))
125
126     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
127         """
128         Create IPv6 packet stream for inside network
129
130         :param in_if: Inside interface
131         :param out_if: Outside interface
132         :param ttl: Hop Limit of generated packets
133         :param pref: NAT64 prefix
134         :param plen: NAT64 prefix length
135         """
136         pkts = []
137         if pref is None:
138             dst = "".join(["64:ff9b::", out_if.remote_ip4])
139         else:
140             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
141
142         # TCP
143         p = (
144             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
145             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
146             / TCP(sport=self.tcp_port_in, dport=20)
147         )
148         pkts.append(p)
149
150         # UDP
151         p = (
152             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
153             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
154             / UDP(sport=self.udp_port_in, dport=20)
155         )
156         pkts.append(p)
157
158         # ICMP
159         p = (
160             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
161             / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
162             / ICMPv6EchoRequest(id=self.icmp_id_in)
163         )
164         pkts.append(p)
165
166         return pkts
167
168     def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
169         """
170         Create packet stream for outside network
171
172         :param out_if: Outside interface
173         :param dst_ip: Destination IP address (Default use global NAT address)
174         :param ttl: TTL of generated packets
175         :param use_inside_ports: Use inside NAT ports as destination ports
176                instead of outside ports
177         """
178         if dst_ip is None:
179             dst_ip = self.nat_addr
180         if not use_inside_ports:
181             tcp_port = self.tcp_port_out
182             udp_port = self.udp_port_out
183             icmp_id = self.icmp_id_out
184         else:
185             tcp_port = self.tcp_port_in
186             udp_port = self.udp_port_in
187             icmp_id = self.icmp_id_in
188         pkts = []
189         # TCP
190         p = (
191             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
192             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
193             / TCP(dport=tcp_port, sport=20)
194         )
195         pkts.extend([p, p])
196
197         # UDP
198         p = (
199             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
200             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
201             / UDP(dport=udp_port, sport=20)
202         )
203         pkts.append(p)
204
205         # ICMP
206         p = (
207             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
208             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
209             / ICMP(id=icmp_id, type="echo-reply")
210         )
211         pkts.append(p)
212
213         return pkts
214
215     def verify_capture_out(
216         self,
217         capture,
218         nat_ip=None,
219         same_port=False,
220         dst_ip=None,
221         is_ip6=False,
222         ignore_port=False,
223     ):
224         """
225         Verify captured packets on outside network
226
227         :param capture: Captured packets
228         :param nat_ip: Translated IP address (Default use global NAT address)
229         :param same_port: Source port number is not translated (Default False)
230         :param dst_ip: Destination IP address (Default do not verify)
231         :param is_ip6: If L3 protocol is IPv6 (Default False)
232         """
233         if is_ip6:
234             IP46 = IPv6
235             ICMP46 = ICMPv6EchoRequest
236         else:
237             IP46 = IP
238             ICMP46 = ICMP
239         if nat_ip is None:
240             nat_ip = self.nat_addr
241         for packet in capture:
242             try:
243                 if not is_ip6:
244                     self.assert_packet_checksums_valid(packet)
245                 self.assertEqual(packet[IP46].src, nat_ip)
246                 if dst_ip is not None:
247                     self.assertEqual(packet[IP46].dst, dst_ip)
248                 if packet.haslayer(TCP):
249                     if not ignore_port:
250                         if same_port:
251                             self.assertEqual(packet[TCP].sport, self.tcp_port_in)
252                         else:
253                             self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
254                     self.tcp_port_out = packet[TCP].sport
255                     self.assert_packet_checksums_valid(packet)
256                 elif packet.haslayer(UDP):
257                     if not ignore_port:
258                         if same_port:
259                             self.assertEqual(packet[UDP].sport, self.udp_port_in)
260                         else:
261                             self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
262                     self.udp_port_out = packet[UDP].sport
263                 else:
264                     if not ignore_port:
265                         if same_port:
266                             self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
267                         else:
268                             self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
269                     self.icmp_id_out = packet[ICMP46].id
270                     self.assert_packet_checksums_valid(packet)
271             except:
272                 self.logger.error(
273                     ppp("Unexpected or invalid packet (outside network):", packet)
274                 )
275                 raise
276
277     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
278         """
279         Verify captured IPv6 packets on inside network
280
281         :param capture: Captured packets
282         :param src_ip: Source IP
283         :param dst_ip: Destination IP address
284         """
285         for packet in capture:
286             try:
287                 self.assertEqual(packet[IPv6].src, src_ip)
288                 self.assertEqual(packet[IPv6].dst, dst_ip)
289                 self.assert_packet_checksums_valid(packet)
290                 if packet.haslayer(TCP):
291                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
292                 elif packet.haslayer(UDP):
293                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
294                 else:
295                     self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
296             except:
297                 self.logger.error(
298                     ppp("Unexpected or invalid packet (inside network):", packet)
299                 )
300                 raise
301
302     def create_stream_frag(
303         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
304     ):
305         """
306         Create fragmented packet stream
307
308         :param src_if: Source interface
309         :param dst: Destination IPv4 address
310         :param sport: Source port
311         :param dport: Destination port
312         :param data: Payload data
313         :param proto: protocol (TCP, UDP, ICMP)
314         :param echo_reply: use echo_reply if protocol is ICMP
315         :returns: Fragments
316         """
317         if proto == IP_PROTOS.tcp:
318             p = (
319                 IP(src=src_if.remote_ip4, dst=dst)
320                 / TCP(sport=sport, dport=dport)
321                 / Raw(data)
322             )
323             p = p.__class__(scapy.compat.raw(p))
324             chksum = p[TCP].chksum
325             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
326         elif proto == IP_PROTOS.udp:
327             proto_header = UDP(sport=sport, dport=dport)
328         elif proto == IP_PROTOS.icmp:
329             if not echo_reply:
330                 proto_header = ICMP(id=sport, type="echo-request")
331             else:
332                 proto_header = ICMP(id=sport, type="echo-reply")
333         else:
334             raise Exception("Unsupported protocol")
335         id = random.randint(0, 65535)
336         pkts = []
337         if proto == IP_PROTOS.tcp:
338             raw = Raw(data[0:4])
339         else:
340             raw = Raw(data[0:16])
341         p = (
342             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
343             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
344             / proto_header
345             / raw
346         )
347         pkts.append(p)
348         if proto == IP_PROTOS.tcp:
349             raw = Raw(data[4:20])
350         else:
351             raw = Raw(data[16:32])
352         p = (
353             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
354             / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
355             / raw
356         )
357         pkts.append(p)
358         if proto == IP_PROTOS.tcp:
359             raw = Raw(data[20:])
360         else:
361             raw = Raw(data[32:])
362         p = (
363             Ether(src=src_if.remote_mac, dst=src_if.local_mac)
364             / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
365             / raw
366         )
367         pkts.append(p)
368         return pkts
369
370     def create_stream_frag_ip6(
371         self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
372     ):
373         """
374         Create fragmented packet stream
375
376         :param src_if: Source interface
377         :param dst: Destination IPv4 address
378         :param sport: Source TCP port
379         :param dport: Destination TCP port
380         :param data: Payload data
381         :param pref: NAT64 prefix
382         :param plen: NAT64 prefix length
383         :param fragsize: size of fragments
384         :returns: Fragments
385         """
386         if pref is None:
387             dst_ip6 = "".join(["64:ff9b::", dst])
388         else:
389             dst_ip6 = self.compose_ip6(dst, pref, plen)
390
391         p = (
392             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
393             / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
394             / IPv6ExtHdrFragment(id=random.randint(0, 65535))
395             / TCP(sport=sport, dport=dport)
396             / Raw(data)
397         )
398
399         return fragment6(p, frag_size)
400
401     def reass_frags_and_verify(self, frags, src, dst):
402         """
403         Reassemble and verify fragmented packet
404
405         :param frags: Captured fragments
406         :param src: Source IPv4 address to verify
407         :param dst: Destination IPv4 address to verify
408
409         :returns: Reassembled IPv4 packet
410         """
411         buffer = BytesIO()
412         for p in frags:
413             self.assertEqual(p[IP].src, src)
414             self.assertEqual(p[IP].dst, dst)
415             self.assert_ip_checksum_valid(p)
416             buffer.seek(p[IP].frag * 8)
417             buffer.write(bytes(p[IP].payload))
418         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
419         if ip.proto == IP_PROTOS.tcp:
420             p = ip / TCP(buffer.getvalue())
421             self.logger.debug(ppp("Reassembled:", p))
422             self.assert_tcp_checksum_valid(p)
423         elif ip.proto == IP_PROTOS.udp:
424             p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
425         elif ip.proto == IP_PROTOS.icmp:
426             p = ip / ICMP(buffer.getvalue())
427         return p
428
429     def reass_frags_and_verify_ip6(self, frags, src, dst):
430         """
431         Reassemble and verify fragmented packet
432
433         :param frags: Captured fragments
434         :param src: Source IPv6 address to verify
435         :param dst: Destination IPv6 address to verify
436
437         :returns: Reassembled IPv6 packet
438         """
439         buffer = BytesIO()
440         for p in frags:
441             self.assertEqual(p[IPv6].src, src)
442             self.assertEqual(p[IPv6].dst, dst)
443             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
444             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
445         ip = IPv6(
446             src=frags[0][IPv6].src,
447             dst=frags[0][IPv6].dst,
448             nh=frags[0][IPv6ExtHdrFragment].nh,
449         )
450         if ip.nh == IP_PROTOS.tcp:
451             p = ip / TCP(buffer.getvalue())
452         elif ip.nh == IP_PROTOS.udp:
453             p = ip / UDP(buffer.getvalue())
454         self.logger.debug(ppp("Reassembled:", p))
455         self.assert_packet_checksums_valid(p)
456         return p
457
458     def verify_ipfix_max_bibs(self, data, limit):
459         """
460         Verify IPFIX maximum BIB entries exceeded event
461
462         :param data: Decoded IPFIX data records
463         :param limit: Number of maximum BIB entries that can be created.
464         """
465         self.assertEqual(1, len(data))
466         record = data[0]
467         # natEvent
468         self.assertEqual(scapy.compat.orb(record[230]), 13)
469         # natQuotaExceededEvent
470         self.assertEqual(struct.pack("!I", 2), record[466])
471         # maxBIBEntries
472         self.assertEqual(struct.pack("!I", limit), record[472])
473         return len(data)
474
475     def verify_ipfix_bib(self, data, is_create, src_addr):
476         """
477         Verify IPFIX NAT64 BIB create and delete events
478
479         :param data: Decoded IPFIX data records
480         :param is_create: Create event if nonzero value otherwise delete event
481         :param src_addr: IPv6 source address
482         """
483         self.assertEqual(1, len(data))
484         record = data[0]
485         # natEvent
486         if is_create:
487             self.assertEqual(scapy.compat.orb(record[230]), 10)
488         else:
489             self.assertEqual(scapy.compat.orb(record[230]), 11)
490         # sourceIPv6Address
491         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
492         # postNATSourceIPv4Address
493         self.assertEqual(self.nat_addr_n, record[225])
494         # protocolIdentifier
495         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
496         # ingressVRFID
497         self.assertEqual(struct.pack("!I", 0), record[234])
498         # sourceTransportPort
499         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
500         # postNAPTSourceTransportPort
501         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
502
503     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
504         """
505         Verify IPFIX NAT64 session create and delete events
506
507         :param data: Decoded IPFIX data records
508         :param is_create: Create event if nonzero value otherwise delete event
509         :param src_addr: IPv6 source address
510         :param dst_addr: IPv4 destination address
511         :param dst_port: destination TCP port
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         if is_create:
517             self.assertEqual(scapy.compat.orb(record[230]), 6)
518         else:
519             self.assertEqual(scapy.compat.orb(record[230]), 7)
520         # sourceIPv6Address
521         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
522         # destinationIPv6Address
523         self.assertEqual(
524             socket.inet_pton(
525                 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
526             ),
527             record[28],
528         )
529         # postNATSourceIPv4Address
530         self.assertEqual(self.nat_addr_n, record[225])
531         # postNATDestinationIPv4Address
532         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
533         # protocolIdentifier
534         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
535         # ingressVRFID
536         self.assertEqual(struct.pack("!I", 0), record[234])
537         # sourceTransportPort
538         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
539         # postNAPTSourceTransportPort
540         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
541         # destinationTransportPort
542         self.assertEqual(struct.pack("!H", dst_port), record[11])
543         # postNAPTDestinationTransportPort
544         self.assertEqual(struct.pack("!H", dst_port), record[228])
545
546     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
547         message = data.decode("utf-8")
548         try:
549             message = SyslogMessage.parse(message)
550         except ParseError as e:
551             self.logger.error(e)
552             raise
553         else:
554             self.assertEqual(message.severity, SyslogSeverity.info)
555             self.assertEqual(message.appname, "NAT")
556             self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
557             sd_params = message.sd.get("nsess")
558             self.assertTrue(sd_params is not None)
559             if is_ip6:
560                 self.assertEqual(sd_params.get("IATYP"), "IPv6")
561                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
562             else:
563                 self.assertEqual(sd_params.get("IATYP"), "IPv4")
564                 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
565                 self.assertTrue(sd_params.get("SSUBIX") is not None)
566             self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
567             self.assertEqual(sd_params.get("XATYP"), "IPv4")
568             self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
569             self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
570             self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
571             self.assertEqual(sd_params.get("SVLAN"), "0")
572             self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
573             self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
574
575     def compose_ip6(self, ip4, pref, plen):
576         """
577         Compose IPv4-embedded IPv6 addresses
578
579         :param ip4: IPv4 address
580         :param pref: IPv6 prefix
581         :param plen: IPv6 prefix length
582         :returns: IPv4-embedded IPv6 addresses
583         """
584         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
585         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
586         if plen == 32:
587             pref_n[4] = ip4_n[0]
588             pref_n[5] = ip4_n[1]
589             pref_n[6] = ip4_n[2]
590             pref_n[7] = ip4_n[3]
591         elif plen == 40:
592             pref_n[5] = ip4_n[0]
593             pref_n[6] = ip4_n[1]
594             pref_n[7] = ip4_n[2]
595             pref_n[9] = ip4_n[3]
596         elif plen == 48:
597             pref_n[6] = ip4_n[0]
598             pref_n[7] = ip4_n[1]
599             pref_n[9] = ip4_n[2]
600             pref_n[10] = ip4_n[3]
601         elif plen == 56:
602             pref_n[7] = ip4_n[0]
603             pref_n[9] = ip4_n[1]
604             pref_n[10] = ip4_n[2]
605             pref_n[11] = ip4_n[3]
606         elif plen == 64:
607             pref_n[9] = ip4_n[0]
608             pref_n[10] = ip4_n[1]
609             pref_n[11] = ip4_n[2]
610             pref_n[12] = ip4_n[3]
611         elif plen == 96:
612             pref_n[12] = ip4_n[0]
613             pref_n[13] = ip4_n[1]
614             pref_n[14] = ip4_n[2]
615             pref_n[15] = ip4_n[3]
616         packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
617         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
618
619     def verify_ipfix_max_sessions(self, data, limit):
620         """
621         Verify IPFIX maximum session entries exceeded event
622
623         :param data: Decoded IPFIX data records
624         :param limit: Number of maximum session entries that can be created.
625         """
626         self.assertEqual(1, len(data))
627         record = data[0]
628         # natEvent
629         self.assertEqual(scapy.compat.orb(record[230]), 13)
630         # natQuotaExceededEvent
631         self.assertEqual(struct.pack("!I", 1), record[466])
632         # maxSessionEntries
633         self.assertEqual(struct.pack("!I", limit), record[471])
634         return len(data)
635
636     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
637         """NAT64 inside interface handles Neighbor Advertisement"""
638
639         flags = self.config_flags.NAT_IS_INSIDE
640         self.vapi.nat64_add_del_interface(
641             is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
642         )
643
644         # Try to send ping
645         ping = (
646             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
647             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
648             / ICMPv6EchoRequest()
649         )
650         pkts = [ping]
651         self.pg5.add_stream(pkts)
652         self.pg_enable_capture(self.pg_interfaces)
653         self.pg_start()
654
655         # Wait for Neighbor Solicitation
656         capture = self.pg5.get_capture(len(pkts))
657         packet = capture[0]
658         try:
659             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6_ll)
660             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
661             tgt = packet[ICMPv6ND_NS].tgt
662         except:
663             self.logger.error(ppp("Unexpected or invalid packet:", packet))
664             raise
665
666         # Send Neighbor Advertisement
667         p = (
668             Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
669             / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
670             / ICMPv6ND_NA(tgt=tgt)
671             / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
672         )
673         pkts = [p]
674         self.pg5.add_stream(pkts)
675         self.pg_enable_capture(self.pg_interfaces)
676         self.pg_start()
677
678         # Try to send ping again
679         pkts = [ping]
680         self.pg5.add_stream(pkts)
681         self.pg_enable_capture(self.pg_interfaces)
682         self.pg_start()
683
684         # Wait for ping reply
685         capture = self.pg5.get_capture(len(pkts))
686         packet = capture[0]
687         try:
688             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
689             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
690             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
691         except:
692             self.logger.error(ppp("Unexpected or invalid packet:", packet))
693             raise
694
695     def test_pool(self):
696         """Add/delete address to NAT64 pool"""
697         nat_addr = "1.2.3.4"
698
699         self.vapi.nat64_add_del_pool_addr_range(
700             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
701         )
702
703         addresses = self.vapi.nat64_pool_addr_dump()
704         self.assertEqual(len(addresses), 1)
705         self.assertEqual(str(addresses[0].address), nat_addr)
706
707         self.vapi.nat64_add_del_pool_addr_range(
708             start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
709         )
710
711         addresses = self.vapi.nat64_pool_addr_dump()
712         self.assertEqual(len(addresses), 0)
713
714     def test_interface(self):
715         """Enable/disable NAT64 feature on the interface"""
716         flags = self.config_flags.NAT_IS_INSIDE
717         self.vapi.nat64_add_del_interface(
718             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
719         )
720         self.vapi.nat64_add_del_interface(
721             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
722         )
723
724         interfaces = self.vapi.nat64_interface_dump()
725         self.assertEqual(len(interfaces), 2)
726         pg0_found = False
727         pg1_found = False
728         for intf in interfaces:
729             if intf.sw_if_index == self.pg0.sw_if_index:
730                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
731                 pg0_found = True
732             elif intf.sw_if_index == self.pg1.sw_if_index:
733                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
734                 pg1_found = True
735         self.assertTrue(pg0_found)
736         self.assertTrue(pg1_found)
737
738         features = self.vapi.cli("show interface features pg0")
739         self.assertIn("nat64-in2out", features)
740         features = self.vapi.cli("show interface features pg1")
741         self.assertIn("nat64-out2in", features)
742
743         self.vapi.nat64_add_del_interface(
744             is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
745         )
746         self.vapi.nat64_add_del_interface(
747             is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
748         )
749
750         interfaces = self.vapi.nat64_interface_dump()
751         self.assertEqual(len(interfaces), 0)
752
753     def test_static_bib(self):
754         """Add/delete static BIB entry"""
755         in_addr = "2001:db8:85a3::8a2e:370:7334"
756         out_addr = "10.1.1.3"
757         in_port = 1234
758         out_port = 5678
759         proto = IP_PROTOS.tcp
760
761         self.vapi.nat64_add_del_static_bib(
762             i_addr=in_addr,
763             o_addr=out_addr,
764             i_port=in_port,
765             o_port=out_port,
766             proto=proto,
767             vrf_id=0,
768             is_add=1,
769         )
770         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
771         static_bib_num = 0
772         for bibe in bib:
773             if bibe.flags & self.config_flags.NAT_IS_STATIC:
774                 static_bib_num += 1
775                 self.assertEqual(str(bibe.i_addr), in_addr)
776                 self.assertEqual(str(bibe.o_addr), out_addr)
777                 self.assertEqual(bibe.i_port, in_port)
778                 self.assertEqual(bibe.o_port, out_port)
779         self.assertEqual(static_bib_num, 1)
780         bibs = self.statistics.get_counter("/nat64/total-bibs")
781         self.assertEqual(bibs[0][0], 1)
782
783         self.vapi.nat64_add_del_static_bib(
784             i_addr=in_addr,
785             o_addr=out_addr,
786             i_port=in_port,
787             o_port=out_port,
788             proto=proto,
789             vrf_id=0,
790             is_add=0,
791         )
792         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
793         static_bib_num = 0
794         for bibe in bib:
795             if bibe.flags & self.config_flags.NAT_IS_STATIC:
796                 static_bib_num += 1
797         self.assertEqual(static_bib_num, 0)
798         bibs = self.statistics.get_counter("/nat64/total-bibs")
799         self.assertEqual(bibs[0][0], 0)
800
801     def test_set_timeouts(self):
802         """Set NAT64 timeouts"""
803         # verify default values
804         timeouts = self.vapi.nat64_get_timeouts()
805         self.assertEqual(timeouts.udp, 300)
806         self.assertEqual(timeouts.icmp, 60)
807         self.assertEqual(timeouts.tcp_transitory, 240)
808         self.assertEqual(timeouts.tcp_established, 7440)
809
810         # set and verify custom values
811         self.vapi.nat64_set_timeouts(
812             udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
813         )
814         timeouts = self.vapi.nat64_get_timeouts()
815         self.assertEqual(timeouts.udp, 200)
816         self.assertEqual(timeouts.icmp, 30)
817         self.assertEqual(timeouts.tcp_transitory, 250)
818         self.assertEqual(timeouts.tcp_established, 7450)
819
820     def test_dynamic(self):
821         """NAT64 dynamic translation test"""
822         self.tcp_port_in = 6303
823         self.udp_port_in = 6304
824         self.icmp_id_in = 6305
825
826         ses_num_start = self.nat64_get_ses_num()
827
828         self.vapi.nat64_add_del_pool_addr_range(
829             start_addr=self.nat_addr,
830             end_addr=self.nat_addr,
831             vrf_id=0xFFFFFFFF,
832             is_add=1,
833         )
834         flags = self.config_flags.NAT_IS_INSIDE
835         self.vapi.nat64_add_del_interface(
836             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
837         )
838         self.vapi.nat64_add_del_interface(
839             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
840         )
841
842         # in2out
843         tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
844         udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
845         icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
846         drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
847
848         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
849         self.pg0.add_stream(pkts)
850         self.pg_enable_capture(self.pg_interfaces)
851         self.pg_start()
852         capture = self.pg1.get_capture(len(pkts))
853         self.verify_capture_out(
854             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
855         )
856
857         if_idx = self.pg0.sw_if_index
858         cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
859         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
860         cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
861         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
862         cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
863         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
864         cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
865         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
866
867         # out2in
868         tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
869         udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
870         icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
871         drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
872
873         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
874         self.pg1.add_stream(pkts)
875         self.pg_enable_capture(self.pg_interfaces)
876         self.pg_start()
877         capture = self.pg0.get_capture(len(pkts))
878         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
879         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
880
881         if_idx = self.pg1.sw_if_index
882         cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
883         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
884         cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
885         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
886         cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
887         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
888         cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
889         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
890
891         bibs = self.statistics.get_counter("/nat64/total-bibs")
892         self.assertEqual(bibs[0][0], 3)
893         sessions = self.statistics.get_counter("/nat64/total-sessions")
894         self.assertEqual(sessions[0][0], 3)
895
896         # in2out
897         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
898         self.pg0.add_stream(pkts)
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901         capture = self.pg1.get_capture(len(pkts))
902         self.verify_capture_out(
903             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
904         )
905
906         # out2in
907         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
908         self.pg1.add_stream(pkts)
909         self.pg_enable_capture(self.pg_interfaces)
910         self.pg_start()
911         capture = self.pg0.get_capture(len(pkts))
912         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
913
914         ses_num_end = self.nat64_get_ses_num()
915
916         self.assertEqual(ses_num_end - ses_num_start, 3)
917
918         # tenant with specific VRF
919         self.vapi.nat64_add_del_pool_addr_range(
920             start_addr=self.vrf1_nat_addr,
921             end_addr=self.vrf1_nat_addr,
922             vrf_id=self.vrf1_id,
923             is_add=1,
924         )
925         flags = self.config_flags.NAT_IS_INSIDE
926         self.vapi.nat64_add_del_interface(
927             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
928         )
929
930         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
931         self.pg2.add_stream(pkts)
932         self.pg_enable_capture(self.pg_interfaces)
933         self.pg_start()
934         capture = self.pg1.get_capture(len(pkts))
935         self.verify_capture_out(
936             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
937         )
938
939         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
940         self.pg1.add_stream(pkts)
941         self.pg_enable_capture(self.pg_interfaces)
942         self.pg_start()
943         capture = self.pg2.get_capture(len(pkts))
944         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
945
946     def test_static(self):
947         """NAT64 static translation test"""
948         self.tcp_port_in = 60303
949         self.udp_port_in = 60304
950         self.icmp_id_in = 60305
951         self.tcp_port_out = 60303
952         self.udp_port_out = 60304
953         self.icmp_id_out = 60305
954
955         ses_num_start = self.nat64_get_ses_num()
956
957         self.vapi.nat64_add_del_pool_addr_range(
958             start_addr=self.nat_addr,
959             end_addr=self.nat_addr,
960             vrf_id=0xFFFFFFFF,
961             is_add=1,
962         )
963         flags = self.config_flags.NAT_IS_INSIDE
964         self.vapi.nat64_add_del_interface(
965             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
966         )
967         self.vapi.nat64_add_del_interface(
968             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
969         )
970
971         self.vapi.nat64_add_del_static_bib(
972             i_addr=self.pg0.remote_ip6,
973             o_addr=self.nat_addr,
974             i_port=self.tcp_port_in,
975             o_port=self.tcp_port_out,
976             proto=IP_PROTOS.tcp,
977             vrf_id=0,
978             is_add=1,
979         )
980         self.vapi.nat64_add_del_static_bib(
981             i_addr=self.pg0.remote_ip6,
982             o_addr=self.nat_addr,
983             i_port=self.udp_port_in,
984             o_port=self.udp_port_out,
985             proto=IP_PROTOS.udp,
986             vrf_id=0,
987             is_add=1,
988         )
989         self.vapi.nat64_add_del_static_bib(
990             i_addr=self.pg0.remote_ip6,
991             o_addr=self.nat_addr,
992             i_port=self.icmp_id_in,
993             o_port=self.icmp_id_out,
994             proto=IP_PROTOS.icmp,
995             vrf_id=0,
996             is_add=1,
997         )
998
999         # in2out
1000         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1001         self.pg0.add_stream(pkts)
1002         self.pg_enable_capture(self.pg_interfaces)
1003         self.pg_start()
1004         capture = self.pg1.get_capture(len(pkts))
1005         self.verify_capture_out(
1006             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
1007         )
1008
1009         # out2in
1010         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1011         self.pg1.add_stream(pkts)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg0.get_capture(len(pkts))
1015         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1016         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1017
1018         ses_num_end = self.nat64_get_ses_num()
1019
1020         self.assertEqual(ses_num_end - ses_num_start, 3)
1021
1022     def test_session_timeout(self):
1023         """NAT64 session timeout"""
1024         self.icmp_id_in = 1234
1025         self.vapi.nat64_add_del_pool_addr_range(
1026             start_addr=self.nat_addr,
1027             end_addr=self.nat_addr,
1028             vrf_id=0xFFFFFFFF,
1029             is_add=1,
1030         )
1031         flags = self.config_flags.NAT_IS_INSIDE
1032         self.vapi.nat64_add_del_interface(
1033             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1034         )
1035         self.vapi.nat64_add_del_interface(
1036             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1037         )
1038         self.vapi.nat64_set_timeouts(
1039             udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1040         )
1041
1042         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1043         self.pg0.add_stream(pkts)
1044         self.pg_enable_capture(self.pg_interfaces)
1045         self.pg_start()
1046         capture = self.pg1.get_capture(len(pkts))
1047
1048         ses_num_before_timeout = self.nat64_get_ses_num()
1049
1050         self.virtual_sleep(15)
1051
1052         # ICMP and TCP session after timeout
1053         ses_num_after_timeout = self.nat64_get_ses_num()
1054         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1055
1056     def test_icmp_error(self):
1057         """NAT64 ICMP Error message translation"""
1058         self.tcp_port_in = 6303
1059         self.udp_port_in = 6304
1060         self.icmp_id_in = 6305
1061
1062         self.vapi.nat64_add_del_pool_addr_range(
1063             start_addr=self.nat_addr,
1064             end_addr=self.nat_addr,
1065             vrf_id=0xFFFFFFFF,
1066             is_add=1,
1067         )
1068         flags = self.config_flags.NAT_IS_INSIDE
1069         self.vapi.nat64_add_del_interface(
1070             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1071         )
1072         self.vapi.nat64_add_del_interface(
1073             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1074         )
1075
1076         # send some packets to create sessions
1077         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1078         self.pg0.add_stream(pkts)
1079         self.pg_enable_capture(self.pg_interfaces)
1080         self.pg_start()
1081         capture_ip4 = self.pg1.get_capture(len(pkts))
1082         self.verify_capture_out(
1083             capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1084         )
1085
1086         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1087         self.pg1.add_stream(pkts)
1088         self.pg_enable_capture(self.pg_interfaces)
1089         self.pg_start()
1090         capture_ip6 = self.pg0.get_capture(len(pkts))
1091         ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1092         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1093
1094         # in2out
1095         pkts = [
1096             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1097             / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1098             / ICMPv6DestUnreach(code=1)
1099             / packet[IPv6]
1100             for packet in capture_ip6
1101         ]
1102         self.pg0.add_stream(pkts)
1103         self.pg_enable_capture(self.pg_interfaces)
1104         self.pg_start()
1105         capture = self.pg1.get_capture(len(pkts))
1106         for packet in capture:
1107             try:
1108                 self.assertEqual(packet[IP].src, self.nat_addr)
1109                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1110                 self.assertEqual(packet[ICMP].type, 3)
1111                 self.assertEqual(packet[ICMP].code, 13)
1112                 inner = packet[IPerror]
1113                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1114                 self.assertEqual(inner.dst, self.nat_addr)
1115                 self.assert_packet_checksums_valid(packet)
1116                 if inner.haslayer(TCPerror):
1117                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1118                 elif inner.haslayer(UDPerror):
1119                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1120                 else:
1121                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1122             except:
1123                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1124                 raise
1125
1126         # out2in
1127         pkts = [
1128             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1129             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1130             / ICMP(type=3, code=13)
1131             / packet[IP]
1132             for packet in capture_ip4
1133         ]
1134         self.pg1.add_stream(pkts)
1135         self.pg_enable_capture(self.pg_interfaces)
1136         self.pg_start()
1137         capture = self.pg0.get_capture(len(pkts))
1138         for packet in capture:
1139             try:
1140                 self.assertEqual(packet[IPv6].src, ip.src)
1141                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1142                 icmp = packet[ICMPv6DestUnreach]
1143                 self.assertEqual(icmp.code, 1)
1144                 inner = icmp[IPerror6]
1145                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1146                 self.assertEqual(inner.dst, ip.src)
1147                 self.assert_icmpv6_checksum_valid(packet)
1148                 if inner.haslayer(TCPerror):
1149                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1150                 elif inner.haslayer(UDPerror):
1151                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1152                 else:
1153                     self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1154             except:
1155                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1156                 raise
1157
1158     def test_hairpinning(self):
1159         """NAT64 hairpinning"""
1160
1161         client = self.pg0.remote_hosts[0]
1162         server = self.pg0.remote_hosts[1]
1163         server_tcp_in_port = 22
1164         server_tcp_out_port = 4022
1165         server_udp_in_port = 23
1166         server_udp_out_port = 4023
1167         client_tcp_in_port = 1234
1168         client_udp_in_port = 1235
1169         client_tcp_out_port = 0
1170         client_udp_out_port = 0
1171         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1172         nat_addr_ip6 = ip.src
1173
1174         self.vapi.nat64_add_del_pool_addr_range(
1175             start_addr=self.nat_addr,
1176             end_addr=self.nat_addr,
1177             vrf_id=0xFFFFFFFF,
1178             is_add=1,
1179         )
1180         flags = self.config_flags.NAT_IS_INSIDE
1181         self.vapi.nat64_add_del_interface(
1182             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1183         )
1184         self.vapi.nat64_add_del_interface(
1185             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1186         )
1187
1188         self.vapi.nat64_add_del_static_bib(
1189             i_addr=server.ip6n,
1190             o_addr=self.nat_addr,
1191             i_port=server_tcp_in_port,
1192             o_port=server_tcp_out_port,
1193             proto=IP_PROTOS.tcp,
1194             vrf_id=0,
1195             is_add=1,
1196         )
1197         self.vapi.nat64_add_del_static_bib(
1198             i_addr=server.ip6n,
1199             o_addr=self.nat_addr,
1200             i_port=server_udp_in_port,
1201             o_port=server_udp_out_port,
1202             proto=IP_PROTOS.udp,
1203             vrf_id=0,
1204             is_add=1,
1205         )
1206
1207         # client to server
1208         pkts = []
1209         p = (
1210             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1211             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1212             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1213         )
1214         pkts.append(p)
1215         p = (
1216             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1217             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1218             / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1219         )
1220         pkts.append(p)
1221         self.pg0.add_stream(pkts)
1222         self.pg_enable_capture(self.pg_interfaces)
1223         self.pg_start()
1224         capture = self.pg0.get_capture(len(pkts))
1225         for packet in capture:
1226             try:
1227                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1228                 self.assertEqual(packet[IPv6].dst, server.ip6)
1229                 self.assert_packet_checksums_valid(packet)
1230                 if packet.haslayer(TCP):
1231                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1232                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1233                     client_tcp_out_port = packet[TCP].sport
1234                 else:
1235                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1236                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1237                     client_udp_out_port = packet[UDP].sport
1238             except:
1239                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1240                 raise
1241
1242         # server to client
1243         pkts = []
1244         p = (
1245             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1246             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1247             / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1248         )
1249         pkts.append(p)
1250         p = (
1251             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1252             / IPv6(src=server.ip6, dst=nat_addr_ip6)
1253             / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1254         )
1255         pkts.append(p)
1256         self.pg0.add_stream(pkts)
1257         self.pg_enable_capture(self.pg_interfaces)
1258         self.pg_start()
1259         capture = self.pg0.get_capture(len(pkts))
1260         for packet in capture:
1261             try:
1262                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1263                 self.assertEqual(packet[IPv6].dst, client.ip6)
1264                 self.assert_packet_checksums_valid(packet)
1265                 if packet.haslayer(TCP):
1266                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1267                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1268                 else:
1269                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1270                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1271             except:
1272                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1273                 raise
1274
1275         # ICMP error
1276         pkts = []
1277         pkts = [
1278             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1279             / IPv6(src=client.ip6, dst=nat_addr_ip6)
1280             / ICMPv6DestUnreach(code=1)
1281             / packet[IPv6]
1282             for packet in capture
1283         ]
1284         self.pg0.add_stream(pkts)
1285         self.pg_enable_capture(self.pg_interfaces)
1286         self.pg_start()
1287         capture = self.pg0.get_capture(len(pkts))
1288         for packet in capture:
1289             try:
1290                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1291                 self.assertEqual(packet[IPv6].dst, server.ip6)
1292                 icmp = packet[ICMPv6DestUnreach]
1293                 self.assertEqual(icmp.code, 1)
1294                 inner = icmp[IPerror6]
1295                 self.assertEqual(inner.src, server.ip6)
1296                 self.assertEqual(inner.dst, nat_addr_ip6)
1297                 self.assert_packet_checksums_valid(packet)
1298                 if inner.haslayer(TCPerror):
1299                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1300                     self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1301                 else:
1302                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1303                     self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1304             except:
1305                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1306                 raise
1307
1308     def test_prefix(self):
1309         """NAT64 Network-Specific Prefix"""
1310
1311         self.vapi.nat64_add_del_pool_addr_range(
1312             start_addr=self.nat_addr,
1313             end_addr=self.nat_addr,
1314             vrf_id=0xFFFFFFFF,
1315             is_add=1,
1316         )
1317         flags = self.config_flags.NAT_IS_INSIDE
1318         self.vapi.nat64_add_del_interface(
1319             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1320         )
1321         self.vapi.nat64_add_del_interface(
1322             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1323         )
1324         self.vapi.nat64_add_del_pool_addr_range(
1325             start_addr=self.vrf1_nat_addr,
1326             end_addr=self.vrf1_nat_addr,
1327             vrf_id=self.vrf1_id,
1328             is_add=1,
1329         )
1330         self.vapi.nat64_add_del_interface(
1331             is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1332         )
1333
1334         # Add global prefix
1335         global_pref64 = "2001:db8::"
1336         global_pref64_len = 32
1337         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1338         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1339
1340         prefix = self.vapi.nat64_prefix_dump()
1341         self.assertEqual(len(prefix), 1)
1342         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1343         self.assertEqual(prefix[0].vrf_id, 0)
1344
1345         # Add tenant specific prefix
1346         vrf1_pref64 = "2001:db8:122:300::"
1347         vrf1_pref64_len = 56
1348         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1349         self.vapi.nat64_add_del_prefix(
1350             prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1351         )
1352
1353         prefix = self.vapi.nat64_prefix_dump()
1354         self.assertEqual(len(prefix), 2)
1355
1356         # Global prefix
1357         pkts = self.create_stream_in_ip6(
1358             self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1359         )
1360         self.pg0.add_stream(pkts)
1361         self.pg_enable_capture(self.pg_interfaces)
1362         self.pg_start()
1363         capture = self.pg1.get_capture(len(pkts))
1364         self.verify_capture_out(
1365             capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1366         )
1367
1368         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1369         self.pg1.add_stream(pkts)
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pg_start()
1372         capture = self.pg0.get_capture(len(pkts))
1373         dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1374         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1375
1376         # Tenant specific prefix
1377         pkts = self.create_stream_in_ip6(
1378             self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1379         )
1380         self.pg2.add_stream(pkts)
1381         self.pg_enable_capture(self.pg_interfaces)
1382         self.pg_start()
1383         capture = self.pg1.get_capture(len(pkts))
1384         self.verify_capture_out(
1385             capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1386         )
1387
1388         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1389         self.pg1.add_stream(pkts)
1390         self.pg_enable_capture(self.pg_interfaces)
1391         self.pg_start()
1392         capture = self.pg2.get_capture(len(pkts))
1393         dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1394         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1395
1396     def test_unknown_proto(self):
1397         """NAT64 translate packet with unknown protocol"""
1398
1399         self.vapi.nat64_add_del_pool_addr_range(
1400             start_addr=self.nat_addr,
1401             end_addr=self.nat_addr,
1402             vrf_id=0xFFFFFFFF,
1403             is_add=1,
1404         )
1405         flags = self.config_flags.NAT_IS_INSIDE
1406         self.vapi.nat64_add_del_interface(
1407             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1408         )
1409         self.vapi.nat64_add_del_interface(
1410             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1411         )
1412         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1413
1414         # in2out
1415         p = (
1416             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1417             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1418             / TCP(sport=self.tcp_port_in, dport=20)
1419         )
1420         self.pg0.add_stream(p)
1421         self.pg_enable_capture(self.pg_interfaces)
1422         self.pg_start()
1423         p = self.pg1.get_capture(1)
1424
1425         p = (
1426             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1427             / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1428             / GRE()
1429             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1430             / TCP(sport=1234, dport=1234)
1431         )
1432         self.pg0.add_stream(p)
1433         self.pg_enable_capture(self.pg_interfaces)
1434         self.pg_start()
1435         p = self.pg1.get_capture(1)
1436         packet = p[0]
1437         try:
1438             self.assertEqual(packet[IP].src, self.nat_addr)
1439             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1440             self.assertEqual(packet.haslayer(GRE), 1)
1441             self.assert_packet_checksums_valid(packet)
1442         except:
1443             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1444             raise
1445
1446         # out2in
1447         p = (
1448             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1449             / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1450             / GRE()
1451             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1452             / TCP(sport=1234, dport=1234)
1453         )
1454         self.pg1.add_stream(p)
1455         self.pg_enable_capture(self.pg_interfaces)
1456         self.pg_start()
1457         p = self.pg0.get_capture(1)
1458         packet = p[0]
1459         try:
1460             self.assertEqual(packet[IPv6].src, remote_ip6)
1461             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1462             self.assertEqual(packet[IPv6].nh, 47)
1463         except:
1464             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1465             raise
1466
1467     def test_hairpinning_unknown_proto(self):
1468         """NAT64 translate packet with unknown protocol - hairpinning"""
1469
1470         client = self.pg0.remote_hosts[0]
1471         server = self.pg0.remote_hosts[1]
1472         server_tcp_in_port = 22
1473         server_tcp_out_port = 4022
1474         client_tcp_in_port = 1234
1475         client_tcp_out_port = 1235
1476         server_nat_ip = "10.0.0.100"
1477         client_nat_ip = "10.0.0.110"
1478         server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1479         client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1480
1481         self.vapi.nat64_add_del_pool_addr_range(
1482             start_addr=server_nat_ip,
1483             end_addr=client_nat_ip,
1484             vrf_id=0xFFFFFFFF,
1485             is_add=1,
1486         )
1487         flags = self.config_flags.NAT_IS_INSIDE
1488         self.vapi.nat64_add_del_interface(
1489             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1490         )
1491         self.vapi.nat64_add_del_interface(
1492             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1493         )
1494
1495         self.vapi.nat64_add_del_static_bib(
1496             i_addr=server.ip6n,
1497             o_addr=server_nat_ip,
1498             i_port=server_tcp_in_port,
1499             o_port=server_tcp_out_port,
1500             proto=IP_PROTOS.tcp,
1501             vrf_id=0,
1502             is_add=1,
1503         )
1504
1505         self.vapi.nat64_add_del_static_bib(
1506             i_addr=server.ip6n,
1507             o_addr=server_nat_ip,
1508             i_port=0,
1509             o_port=0,
1510             proto=IP_PROTOS.gre,
1511             vrf_id=0,
1512             is_add=1,
1513         )
1514
1515         self.vapi.nat64_add_del_static_bib(
1516             i_addr=client.ip6n,
1517             o_addr=client_nat_ip,
1518             i_port=client_tcp_in_port,
1519             o_port=client_tcp_out_port,
1520             proto=IP_PROTOS.tcp,
1521             vrf_id=0,
1522             is_add=1,
1523         )
1524
1525         # client to server
1526         p = (
1527             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1528             / IPv6(src=client.ip6, dst=server_nat_ip6)
1529             / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1530         )
1531         self.pg0.add_stream(p)
1532         self.pg_enable_capture(self.pg_interfaces)
1533         self.pg_start()
1534         p = self.pg0.get_capture(1)
1535
1536         p = (
1537             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1538             / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1539             / GRE()
1540             / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1541             / TCP(sport=1234, dport=1234)
1542         )
1543         self.pg0.add_stream(p)
1544         self.pg_enable_capture(self.pg_interfaces)
1545         self.pg_start()
1546         p = self.pg0.get_capture(1)
1547         packet = p[0]
1548         try:
1549             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1550             self.assertEqual(packet[IPv6].dst, server.ip6)
1551             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1552         except:
1553             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1554             raise
1555
1556         # server to client
1557         p = (
1558             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1559             / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1560             / GRE()
1561             / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1562             / TCP(sport=1234, dport=1234)
1563         )
1564         self.pg0.add_stream(p)
1565         self.pg_enable_capture(self.pg_interfaces)
1566         self.pg_start()
1567         p = self.pg0.get_capture(1)
1568         packet = p[0]
1569         try:
1570             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1571             self.assertEqual(packet[IPv6].dst, client.ip6)
1572             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1573         except:
1574             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1575             raise
1576
1577     def test_one_armed_nat64(self):
1578         """One armed NAT64"""
1579         external_port = 0
1580         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1581
1582         self.vapi.nat64_add_del_pool_addr_range(
1583             start_addr=self.nat_addr,
1584             end_addr=self.nat_addr,
1585             vrf_id=0xFFFFFFFF,
1586             is_add=1,
1587         )
1588         flags = self.config_flags.NAT_IS_INSIDE
1589         self.vapi.nat64_add_del_interface(
1590             is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1591         )
1592         self.vapi.nat64_add_del_interface(
1593             is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1594         )
1595
1596         # in2out
1597         p = (
1598             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1599             / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1600             / TCP(sport=12345, dport=80)
1601         )
1602         self.pg3.add_stream(p)
1603         self.pg_enable_capture(self.pg_interfaces)
1604         self.pg_start()
1605         capture = self.pg3.get_capture(1)
1606         p = capture[0]
1607         try:
1608             ip = p[IP]
1609             tcp = p[TCP]
1610             self.assertEqual(ip.src, self.nat_addr)
1611             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1612             self.assertNotEqual(tcp.sport, 12345)
1613             external_port = tcp.sport
1614             self.assertEqual(tcp.dport, 80)
1615             self.assert_packet_checksums_valid(p)
1616         except:
1617             self.logger.error(ppp("Unexpected or invalid packet:", p))
1618             raise
1619
1620         # out2in
1621         p = (
1622             Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1623             / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1624             / TCP(sport=80, dport=external_port)
1625         )
1626         self.pg3.add_stream(p)
1627         self.pg_enable_capture(self.pg_interfaces)
1628         self.pg_start()
1629         capture = self.pg3.get_capture(1)
1630         p = capture[0]
1631         try:
1632             ip = p[IPv6]
1633             tcp = p[TCP]
1634             self.assertEqual(ip.src, remote_host_ip6)
1635             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1636             self.assertEqual(tcp.sport, 80)
1637             self.assertEqual(tcp.dport, 12345)
1638             self.assert_packet_checksums_valid(p)
1639         except:
1640             self.logger.error(ppp("Unexpected or invalid packet:", p))
1641             raise
1642
1643     def test_frag_in_order(self):
1644         """NAT64 translate fragments arriving in order"""
1645         self.tcp_port_in = random.randint(1025, 65535)
1646
1647         self.vapi.nat64_add_del_pool_addr_range(
1648             start_addr=self.nat_addr,
1649             end_addr=self.nat_addr,
1650             vrf_id=0xFFFFFFFF,
1651             is_add=1,
1652         )
1653         flags = self.config_flags.NAT_IS_INSIDE
1654         self.vapi.nat64_add_del_interface(
1655             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1656         )
1657         self.vapi.nat64_add_del_interface(
1658             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1659         )
1660
1661         # in2out
1662         data = b"a" * 200
1663         pkts = self.create_stream_frag_ip6(
1664             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1665         )
1666         self.pg0.add_stream(pkts)
1667         self.pg_enable_capture(self.pg_interfaces)
1668         self.pg_start()
1669         frags = self.pg1.get_capture(len(pkts))
1670         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1671         self.assertEqual(p[TCP].dport, 20)
1672         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1673         self.tcp_port_out = p[TCP].sport
1674         self.assertEqual(data, p[Raw].load)
1675
1676         # out2in
1677         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1678         pkts = self.create_stream_frag(
1679             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1680         )
1681         self.pg1.add_stream(pkts)
1682         self.pg_enable_capture(self.pg_interfaces)
1683         self.pg_start()
1684         frags = self.pg0.get_capture(len(pkts))
1685         self.logger.debug(ppc("Captured:", frags))
1686         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1687         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1688         self.assertEqual(p[TCP].sport, 20)
1689         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1690         self.assertEqual(data, p[Raw].load)
1691
1692     def test_reass_hairpinning(self):
1693         """NAT64 fragments hairpinning"""
1694         data = b"a" * 200
1695         server = self.pg0.remote_hosts[1]
1696         server_in_port = random.randint(1025, 65535)
1697         server_out_port = random.randint(1025, 65535)
1698         client_in_port = random.randint(1025, 65535)
1699         ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1700         nat_addr_ip6 = ip.src
1701
1702         self.vapi.nat64_add_del_pool_addr_range(
1703             start_addr=self.nat_addr,
1704             end_addr=self.nat_addr,
1705             vrf_id=0xFFFFFFFF,
1706             is_add=1,
1707         )
1708         flags = self.config_flags.NAT_IS_INSIDE
1709         self.vapi.nat64_add_del_interface(
1710             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1711         )
1712         self.vapi.nat64_add_del_interface(
1713             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1714         )
1715
1716         # add static BIB entry for server
1717         self.vapi.nat64_add_del_static_bib(
1718             i_addr=server.ip6n,
1719             o_addr=self.nat_addr,
1720             i_port=server_in_port,
1721             o_port=server_out_port,
1722             proto=IP_PROTOS.tcp,
1723             vrf_id=0,
1724             is_add=1,
1725         )
1726
1727         # send packet from host to server
1728         pkts = self.create_stream_frag_ip6(
1729             self.pg0, self.nat_addr, client_in_port, server_out_port, data
1730         )
1731         self.pg0.add_stream(pkts)
1732         self.pg_enable_capture(self.pg_interfaces)
1733         self.pg_start()
1734         frags = self.pg0.get_capture(len(pkts))
1735         self.logger.debug(ppc("Captured:", frags))
1736         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1737         self.assertNotEqual(p[TCP].sport, client_in_port)
1738         self.assertEqual(p[TCP].dport, server_in_port)
1739         self.assertEqual(data, p[Raw].load)
1740
1741     def test_frag_out_of_order(self):
1742         """NAT64 translate fragments arriving out of order"""
1743         self.tcp_port_in = random.randint(1025, 65535)
1744
1745         self.vapi.nat64_add_del_pool_addr_range(
1746             start_addr=self.nat_addr,
1747             end_addr=self.nat_addr,
1748             vrf_id=0xFFFFFFFF,
1749             is_add=1,
1750         )
1751         flags = self.config_flags.NAT_IS_INSIDE
1752         self.vapi.nat64_add_del_interface(
1753             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1754         )
1755         self.vapi.nat64_add_del_interface(
1756             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1757         )
1758
1759         # in2out
1760         data = b"a" * 200
1761         pkts = self.create_stream_frag_ip6(
1762             self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1763         )
1764         pkts.reverse()
1765         self.pg0.add_stream(pkts)
1766         self.pg_enable_capture(self.pg_interfaces)
1767         self.pg_start()
1768         frags = self.pg1.get_capture(len(pkts))
1769         p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1770         self.assertEqual(p[TCP].dport, 20)
1771         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1772         self.tcp_port_out = p[TCP].sport
1773         self.assertEqual(data, p[Raw].load)
1774
1775         # out2in
1776         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1777         pkts = self.create_stream_frag(
1778             self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1779         )
1780         pkts.reverse()
1781         self.pg1.add_stream(pkts)
1782         self.pg_enable_capture(self.pg_interfaces)
1783         self.pg_start()
1784         frags = self.pg0.get_capture(len(pkts))
1785         src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1786         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1787         self.assertEqual(p[TCP].sport, 20)
1788         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1789         self.assertEqual(data, p[Raw].load)
1790
1791     def test_interface_addr(self):
1792         """Acquire NAT64 pool addresses from interface"""
1793         self.vapi.nat64_add_del_interface_addr(
1794             is_add=1, sw_if_index=self.pg4.sw_if_index
1795         )
1796
1797         # no address in NAT64 pool
1798         addresses = self.vapi.nat44_address_dump()
1799         self.assertEqual(0, len(addresses))
1800
1801         # configure interface address and check NAT64 address pool
1802         self.pg4.config_ip4()
1803         addresses = self.vapi.nat64_pool_addr_dump()
1804         self.assertEqual(len(addresses), 1)
1805
1806         self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1807
1808         # remove interface address and check NAT64 address pool
1809         self.pg4.unconfig_ip4()
1810         addresses = self.vapi.nat64_pool_addr_dump()
1811         self.assertEqual(0, len(addresses))
1812
1813     @unittest.skipUnless(config.extended, "part of extended tests")
1814     def test_ipfix_max_bibs_sessions(self):
1815         """IPFIX logging maximum session and BIB entries exceeded"""
1816         max_bibs = 1280
1817         max_sessions = 2560
1818         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1819
1820         self.vapi.nat64_add_del_pool_addr_range(
1821             start_addr=self.nat_addr,
1822             end_addr=self.nat_addr,
1823             vrf_id=0xFFFFFFFF,
1824             is_add=1,
1825         )
1826         flags = self.config_flags.NAT_IS_INSIDE
1827         self.vapi.nat64_add_del_interface(
1828             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1829         )
1830         self.vapi.nat64_add_del_interface(
1831             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1832         )
1833
1834         pkts = []
1835         src = ""
1836         for i in range(0, max_bibs):
1837             src = "fd01:aa::%x" % (i)
1838             p = (
1839                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1840                 / IPv6(src=src, dst=remote_host_ip6)
1841                 / TCP(sport=12345, dport=80)
1842             )
1843             pkts.append(p)
1844             p = (
1845                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1846                 / IPv6(src=src, dst=remote_host_ip6)
1847                 / TCP(sport=12345, dport=22)
1848             )
1849             pkts.append(p)
1850         self.pg0.add_stream(pkts)
1851         self.pg_enable_capture(self.pg_interfaces)
1852         self.pg_start()
1853         self.pg1.get_capture(max_sessions)
1854
1855         self.vapi.set_ipfix_exporter(
1856             collector_address=self.pg3.remote_ip4,
1857             src_address=self.pg3.local_ip4,
1858             path_mtu=512,
1859             template_interval=10,
1860         )
1861         self.vapi.nat_ipfix_enable_disable(
1862             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1863         )
1864
1865         p = (
1866             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1867             / IPv6(src=src, dst=remote_host_ip6)
1868             / TCP(sport=12345, dport=25)
1869         ) * 3
1870         self.pg0.add_stream(p)
1871         self.pg_enable_capture(self.pg_interfaces)
1872         self.pg_start()
1873         self.pg1.assert_nothing_captured()
1874         self.vapi.ipfix_flush()
1875         capture = self.pg3.get_capture(7)
1876         ipfix = IPFIXDecoder()
1877         # first load template
1878         for p in capture:
1879             self.assertTrue(p.haslayer(IPFIX))
1880             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1881             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1882             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1883             self.assertEqual(p[UDP].dport, 4739)
1884             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1885             if p.haslayer(Template):
1886                 ipfix.add_template(p.getlayer(Template))
1887         # verify events in data set
1888         event_count = 0
1889         for p in capture:
1890             if p.haslayer(Data):
1891                 data = ipfix.decode_data_set(p.getlayer(Set))
1892                 event_count += self.verify_ipfix_max_sessions(data, max_sessions)
1893         self.assertEqual(event_count, 1)
1894
1895         p = (
1896             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1897             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1898             / TCP(sport=12345, dport=80)
1899         ) * 3
1900         self.pg0.add_stream(p)
1901         self.pg_enable_capture(self.pg_interfaces)
1902         self.pg_start()
1903         self.pg1.assert_nothing_captured()
1904         self.vapi.ipfix_flush()
1905         capture = self.pg3.get_capture(1)
1906         # verify events in data set
1907         event_count = 0
1908         for p in capture:
1909             self.assertTrue(p.haslayer(IPFIX))
1910             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1911             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1912             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1913             self.assertEqual(p[UDP].dport, 4739)
1914             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1915             if p.haslayer(Data):
1916                 data = ipfix.decode_data_set(p.getlayer(Set))
1917                 event_count += self.verify_ipfix_max_bibs(data, max_bibs)
1918         self.assertEqual(event_count, 1)
1919
1920     def test_ipfix_bib_ses(self):
1921         """IPFIX logging NAT64 BIB/session create and delete events"""
1922         self.tcp_port_in = random.randint(1025, 65535)
1923         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1924
1925         self.vapi.nat64_add_del_pool_addr_range(
1926             start_addr=self.nat_addr,
1927             end_addr=self.nat_addr,
1928             vrf_id=0xFFFFFFFF,
1929             is_add=1,
1930         )
1931         flags = self.config_flags.NAT_IS_INSIDE
1932         self.vapi.nat64_add_del_interface(
1933             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1934         )
1935         self.vapi.nat64_add_del_interface(
1936             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1937         )
1938         self.vapi.set_ipfix_exporter(
1939             collector_address=self.pg3.remote_ip4,
1940             src_address=self.pg3.local_ip4,
1941             path_mtu=512,
1942             template_interval=10,
1943         )
1944         self.vapi.nat_ipfix_enable_disable(
1945             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1946         )
1947
1948         # Create
1949         p = (
1950             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1951             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1952             / TCP(sport=self.tcp_port_in, dport=25)
1953         )
1954         self.pg0.add_stream(p)
1955         self.pg_enable_capture(self.pg_interfaces)
1956         self.pg_start()
1957         p = self.pg1.get_capture(1)
1958         self.tcp_port_out = p[0][TCP].sport
1959         self.vapi.ipfix_flush()
1960         capture = self.pg3.get_capture(8)
1961         ipfix = IPFIXDecoder()
1962         # first load template
1963         for p in capture:
1964             self.assertTrue(p.haslayer(IPFIX))
1965             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1966             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1967             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1968             self.assertEqual(p[UDP].dport, 4739)
1969             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1970             if p.haslayer(Template):
1971                 ipfix.add_template(p.getlayer(Template))
1972         # verify events in data set
1973         for p in capture:
1974             if p.haslayer(Data):
1975                 data = ipfix.decode_data_set(p.getlayer(Set))
1976                 if scapy.compat.orb(data[0][230]) == 10:
1977                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1978                 elif scapy.compat.orb(data[0][230]) == 6:
1979                     self.verify_ipfix_nat64_ses(
1980                         data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1981                     )
1982                 else:
1983                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1984
1985         # Delete
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.vapi.nat64_add_del_pool_addr_range(
1988             start_addr=self.nat_addr,
1989             end_addr=self.nat_addr,
1990             vrf_id=0xFFFFFFFF,
1991             is_add=0,
1992         )
1993         self.vapi.ipfix_flush()
1994         capture = self.pg3.get_capture(2)
1995         # verify events in data set
1996         for p in capture:
1997             self.assertTrue(p.haslayer(IPFIX))
1998             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1999             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2000             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2001             self.assertEqual(p[UDP].dport, 4739)
2002             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2003             if p.haslayer(Data):
2004                 data = ipfix.decode_data_set(p.getlayer(Set))
2005                 if scapy.compat.orb(data[0][230]) == 11:
2006                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
2007                 elif scapy.compat.orb(data[0][230]) == 7:
2008                     self.verify_ipfix_nat64_ses(
2009                         data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
2010                     )
2011                 else:
2012                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
2013
2014     def test_syslog_sess(self):
2015         """Test syslog session creation and deletion"""
2016         self.tcp_port_in = random.randint(1025, 65535)
2017         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2018
2019         self.vapi.nat64_add_del_pool_addr_range(
2020             start_addr=self.nat_addr,
2021             end_addr=self.nat_addr,
2022             vrf_id=0xFFFFFFFF,
2023             is_add=1,
2024         )
2025         flags = self.config_flags.NAT_IS_INSIDE
2026         self.vapi.nat64_add_del_interface(
2027             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2028         )
2029         self.vapi.nat64_add_del_interface(
2030             is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2031         )
2032         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2033         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2034
2035         p = (
2036             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2037             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2038             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2039         )
2040         self.pg0.add_stream(p)
2041         self.pg_enable_capture(self.pg_interfaces)
2042         self.pg_start()
2043         p = self.pg1.get_capture(1)
2044         self.tcp_port_out = p[0][TCP].sport
2045         capture = self.pg3.get_capture(1)
2046         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2047
2048         self.pg_enable_capture(self.pg_interfaces)
2049         self.pg_start()
2050         self.vapi.nat64_add_del_pool_addr_range(
2051             start_addr=self.nat_addr,
2052             end_addr=self.nat_addr,
2053             vrf_id=0xFFFFFFFF,
2054             is_add=0,
2055         )
2056         capture = self.pg3.get_capture(1)
2057         self.verify_syslog_sess(capture[0][Raw].load, False, True)
2058
2059     def nat64_get_ses_num(self):
2060         """
2061         Return number of active NAT64 sessions.
2062         """
2063         st = self.vapi.nat64_st_dump(proto=255)
2064         return len(st)
2065
2066     def clear_nat64(self):
2067         """
2068         Clear NAT64 configuration.
2069         """
2070         self.vapi.nat_ipfix_enable_disable(
2071             domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2072         )
2073         self.ipfix_src_port = 4739
2074         self.ipfix_domain_id = 1
2075
2076         self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2077
2078         self.vapi.nat64_set_timeouts(
2079             udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2080         )
2081
2082         interfaces = self.vapi.nat64_interface_dump()
2083         for intf in interfaces:
2084             self.vapi.nat64_add_del_interface(
2085                 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2086             )
2087
2088         bib = self.vapi.nat64_bib_dump(proto=255)
2089         for bibe in bib:
2090             if bibe.flags & self.config_flags.NAT_IS_STATIC:
2091                 self.vapi.nat64_add_del_static_bib(
2092                     i_addr=bibe.i_addr,
2093                     o_addr=bibe.o_addr,
2094                     i_port=bibe.i_port,
2095                     o_port=bibe.o_port,
2096                     proto=bibe.proto,
2097                     vrf_id=bibe.vrf_id,
2098                     is_add=0,
2099                 )
2100
2101         adresses = self.vapi.nat64_pool_addr_dump()
2102         for addr in adresses:
2103             self.vapi.nat64_add_del_pool_addr_range(
2104                 start_addr=addr.address,
2105                 end_addr=addr.address,
2106                 vrf_id=addr.vrf_id,
2107                 is_add=0,
2108             )
2109
2110         prefixes = self.vapi.nat64_prefix_dump()
2111         for prefix in prefixes:
2112             self.vapi.nat64_add_del_prefix(
2113                 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2114             )
2115
2116         bibs = self.statistics.get_counter("/nat64/total-bibs")
2117         self.assertEqual(bibs[0][0], 0)
2118         sessions = self.statistics.get_counter("/nat64/total-sessions")
2119         self.assertEqual(sessions[0][0], 0)
2120
2121
# When executed directly as a script, run this module's tests through
# unittest using the project's VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)