6f88702d16a550e4dd54d0c392c11c95d233b53f
[vpp.git] / src / plugins / nat / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
26
27
28 class TestNAT64(VppTestCase):
29     """ NAT64 Test Cases """
30
31     @property
32     def SYSLOG_SEVERITY(self):
33         return VppEnum.vl_api_syslog_severity_t
34
35     @property
36     def config_flags(self):
37         return VppEnum.vl_api_nat_config_flags_t
38
39     @classmethod
40     def setUpClass(cls):
41         super(TestNAT64, cls).setUpClass()
42
43         cls.tcp_port_in = 6303
44         cls.tcp_port_out = 6303
45         cls.udp_port_in = 6304
46         cls.udp_port_out = 6304
47         cls.icmp_id_in = 6305
48         cls.icmp_id_out = 6305
49         cls.tcp_external_port = 80
50         cls.nat_addr = '10.0.0.3'
51         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
52         cls.vrf1_id = 10
53         cls.vrf1_nat_addr = '10.0.10.3'
54         cls.ipfix_src_port = 4739
55         cls.ipfix_domain_id = 1
56
57         cls.create_pg_interfaces(range(6))
58         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
59         cls.ip6_interfaces.append(cls.pg_interfaces[2])
60         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
61
62         cls.vapi.ip_table_add_del(is_add=1,
63                                   table={'table_id': cls.vrf1_id,
64                                          'is_ip6': 1})
65
66         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
67
68         cls.pg0.generate_remote_hosts(2)
69
70         for i in cls.ip6_interfaces:
71             i.admin_up()
72             i.config_ip6()
73             i.configure_ipv6_neighbors()
74
75         for i in cls.ip4_interfaces:
76             i.admin_up()
77             i.config_ip4()
78             i.resolve_arp()
79
80         cls.pg3.admin_up()
81         cls.pg3.config_ip4()
82         cls.pg3.resolve_arp()
83         cls.pg3.config_ip6()
84         cls.pg3.configure_ipv6_neighbors()
85
86         cls.pg5.admin_up()
87         cls.pg5.config_ip6()
88
89     @classmethod
90     def tearDownClass(cls):
91         super(TestNAT64, cls).tearDownClass()
92
93     def setUp(self):
94         super(TestNAT64, self).setUp()
95         self.vapi.nat64_plugin_enable_disable(enable=1,
96                                               bib_buckets=128, st_buckets=256)
97
98     def tearDown(self):
99         super(TestNAT64, self).tearDown()
100         if not self.vpp_dead:
101             self.vapi.nat64_plugin_enable_disable(enable=0)
102
103     def show_commands_at_teardown(self):
104         self.logger.info(self.vapi.cli("show nat64 pool"))
105         self.logger.info(self.vapi.cli("show nat64 interfaces"))
106         self.logger.info(self.vapi.cli("show nat64 prefix"))
107         self.logger.info(self.vapi.cli("show nat64 bib all"))
108         self.logger.info(self.vapi.cli("show nat64 session table all"))
109
110     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
111         """
112         Create IPv6 packet stream for inside network
113
114         :param in_if: Inside interface
115         :param out_if: Outside interface
116         :param ttl: Hop Limit of generated packets
117         :param pref: NAT64 prefix
118         :param plen: NAT64 prefix length
119         """
120         pkts = []
121         if pref is None:
122             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
123         else:
124             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
125
126         # TCP
127         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
128              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
129              TCP(sport=self.tcp_port_in, dport=20))
130         pkts.append(p)
131
132         # UDP
133         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
134              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
135              UDP(sport=self.udp_port_in, dport=20))
136         pkts.append(p)
137
138         # ICMP
139         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
140              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
141              ICMPv6EchoRequest(id=self.icmp_id_in))
142         pkts.append(p)
143
144         return pkts
145
146     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
147                           use_inside_ports=False):
148         """
149         Create packet stream for outside network
150
151         :param out_if: Outside interface
152         :param dst_ip: Destination IP address (Default use global NAT address)
153         :param ttl: TTL of generated packets
154         :param use_inside_ports: Use inside NAT ports as destination ports
155                instead of outside ports
156         """
157         if dst_ip is None:
158             dst_ip = self.nat_addr
159         if not use_inside_ports:
160             tcp_port = self.tcp_port_out
161             udp_port = self.udp_port_out
162             icmp_id = self.icmp_id_out
163         else:
164             tcp_port = self.tcp_port_in
165             udp_port = self.udp_port_in
166             icmp_id = self.icmp_id_in
167         pkts = []
168         # TCP
169         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
170              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
171              TCP(dport=tcp_port, sport=20))
172         pkts.extend([p, p])
173
174         # UDP
175         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
176              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
177              UDP(dport=udp_port, sport=20))
178         pkts.append(p)
179
180         # ICMP
181         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
182              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
183              ICMP(id=icmp_id, type='echo-reply'))
184         pkts.append(p)
185
186         return pkts
187
188     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
189                            dst_ip=None, is_ip6=False, ignore_port=False):
190         """
191         Verify captured packets on outside network
192
193         :param capture: Captured packets
194         :param nat_ip: Translated IP address (Default use global NAT address)
195         :param same_port: Source port number is not translated (Default False)
196         :param dst_ip: Destination IP address (Default do not verify)
197         :param is_ip6: If L3 protocol is IPv6 (Default False)
198         """
199         if is_ip6:
200             IP46 = IPv6
201             ICMP46 = ICMPv6EchoRequest
202         else:
203             IP46 = IP
204             ICMP46 = ICMP
205         if nat_ip is None:
206             nat_ip = self.nat_addr
207         for packet in capture:
208             try:
209                 if not is_ip6:
210                     self.assert_packet_checksums_valid(packet)
211                 self.assertEqual(packet[IP46].src, nat_ip)
212                 if dst_ip is not None:
213                     self.assertEqual(packet[IP46].dst, dst_ip)
214                 if packet.haslayer(TCP):
215                     if not ignore_port:
216                         if same_port:
217                             self.assertEqual(
218                                 packet[TCP].sport, self.tcp_port_in)
219                         else:
220                             self.assertNotEqual(
221                                 packet[TCP].sport, self.tcp_port_in)
222                     self.tcp_port_out = packet[TCP].sport
223                     self.assert_packet_checksums_valid(packet)
224                 elif packet.haslayer(UDP):
225                     if not ignore_port:
226                         if same_port:
227                             self.assertEqual(
228                                 packet[UDP].sport, self.udp_port_in)
229                         else:
230                             self.assertNotEqual(
231                                 packet[UDP].sport, self.udp_port_in)
232                     self.udp_port_out = packet[UDP].sport
233                 else:
234                     if not ignore_port:
235                         if same_port:
236                             self.assertEqual(
237                                 packet[ICMP46].id, self.icmp_id_in)
238                         else:
239                             self.assertNotEqual(
240                                 packet[ICMP46].id, self.icmp_id_in)
241                     self.icmp_id_out = packet[ICMP46].id
242                     self.assert_packet_checksums_valid(packet)
243             except:
244                 self.logger.error(ppp("Unexpected or invalid packet "
245                                       "(outside network):", packet))
246                 raise
247
248     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
249         """
250         Verify captured IPv6 packets on inside network
251
252         :param capture: Captured packets
253         :param src_ip: Source IP
254         :param dst_ip: Destination IP address
255         """
256         for packet in capture:
257             try:
258                 self.assertEqual(packet[IPv6].src, src_ip)
259                 self.assertEqual(packet[IPv6].dst, dst_ip)
260                 self.assert_packet_checksums_valid(packet)
261                 if packet.haslayer(TCP):
262                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
263                 elif packet.haslayer(UDP):
264                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
265                 else:
266                     self.assertEqual(packet[ICMPv6EchoReply].id,
267                                      self.icmp_id_in)
268             except:
269                 self.logger.error(ppp("Unexpected or invalid packet "
270                                       "(inside network):", packet))
271                 raise
272
273     def create_stream_frag(self, src_if, dst, sport, dport, data,
274                            proto=IP_PROTOS.tcp, echo_reply=False):
275         """
276         Create fragmented packet stream
277
278         :param src_if: Source interface
279         :param dst: Destination IPv4 address
280         :param sport: Source port
281         :param dport: Destination port
282         :param data: Payload data
283         :param proto: protocol (TCP, UDP, ICMP)
284         :param echo_reply: use echo_reply if protocol is ICMP
285         :returns: Fragments
286         """
287         if proto == IP_PROTOS.tcp:
288             p = (IP(src=src_if.remote_ip4, dst=dst) /
289                  TCP(sport=sport, dport=dport) /
290                  Raw(data))
291             p = p.__class__(scapy.compat.raw(p))
292             chksum = p[TCP].chksum
293             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
294         elif proto == IP_PROTOS.udp:
295             proto_header = UDP(sport=sport, dport=dport)
296         elif proto == IP_PROTOS.icmp:
297             if not echo_reply:
298                 proto_header = ICMP(id=sport, type='echo-request')
299             else:
300                 proto_header = ICMP(id=sport, type='echo-reply')
301         else:
302             raise Exception("Unsupported protocol")
303         id = random.randint(0, 65535)
304         pkts = []
305         if proto == IP_PROTOS.tcp:
306             raw = Raw(data[0:4])
307         else:
308             raw = Raw(data[0:16])
309         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
310              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
311              proto_header /
312              raw)
313         pkts.append(p)
314         if proto == IP_PROTOS.tcp:
315             raw = Raw(data[4:20])
316         else:
317             raw = Raw(data[16:32])
318         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
319              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
320                 proto=proto) /
321              raw)
322         pkts.append(p)
323         if proto == IP_PROTOS.tcp:
324             raw = Raw(data[20:])
325         else:
326             raw = Raw(data[32:])
327         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
328              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
329                 id=id) /
330              raw)
331         pkts.append(p)
332         return pkts
333
334     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
335                                pref=None, plen=0, frag_size=128):
336         """
337         Create fragmented packet stream
338
339         :param src_if: Source interface
340         :param dst: Destination IPv4 address
341         :param sport: Source TCP port
342         :param dport: Destination TCP port
343         :param data: Payload data
344         :param pref: NAT64 prefix
345         :param plen: NAT64 prefix length
346         :param fragsize: size of fragments
347         :returns: Fragments
348         """
349         if pref is None:
350             dst_ip6 = ''.join(['64:ff9b::', dst])
351         else:
352             dst_ip6 = self.compose_ip6(dst, pref, plen)
353
354         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
355              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
356              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
357              TCP(sport=sport, dport=dport) /
358              Raw(data))
359
360         return fragment6(p, frag_size)
361
362     def reass_frags_and_verify(self, frags, src, dst):
363         """
364         Reassemble and verify fragmented packet
365
366         :param frags: Captured fragments
367         :param src: Source IPv4 address to verify
368         :param dst: Destination IPv4 address to verify
369
370         :returns: Reassembled IPv4 packet
371         """
372         buffer = BytesIO()
373         for p in frags:
374             self.assertEqual(p[IP].src, src)
375             self.assertEqual(p[IP].dst, dst)
376             self.assert_ip_checksum_valid(p)
377             buffer.seek(p[IP].frag * 8)
378             buffer.write(bytes(p[IP].payload))
379         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
380                 proto=frags[0][IP].proto)
381         if ip.proto == IP_PROTOS.tcp:
382             p = (ip / TCP(buffer.getvalue()))
383             self.logger.debug(ppp("Reassembled:", p))
384             self.assert_tcp_checksum_valid(p)
385         elif ip.proto == IP_PROTOS.udp:
386             p = (ip / UDP(buffer.getvalue()[:8]) /
387                  Raw(buffer.getvalue()[8:]))
388         elif ip.proto == IP_PROTOS.icmp:
389             p = (ip / ICMP(buffer.getvalue()))
390         return p
391
392     def reass_frags_and_verify_ip6(self, frags, src, dst):
393         """
394         Reassemble and verify fragmented packet
395
396         :param frags: Captured fragments
397         :param src: Source IPv6 address to verify
398         :param dst: Destination IPv6 address to verify
399
400         :returns: Reassembled IPv6 packet
401         """
402         buffer = BytesIO()
403         for p in frags:
404             self.assertEqual(p[IPv6].src, src)
405             self.assertEqual(p[IPv6].dst, dst)
406             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
407             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
408         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
409                   nh=frags[0][IPv6ExtHdrFragment].nh)
410         if ip.nh == IP_PROTOS.tcp:
411             p = (ip / TCP(buffer.getvalue()))
412         elif ip.nh == IP_PROTOS.udp:
413             p = (ip / UDP(buffer.getvalue()))
414         self.logger.debug(ppp("Reassembled:", p))
415         self.assert_packet_checksums_valid(p)
416         return p
417
418     # TODO: ipfix needs to be separated from NAT base plugin
419     @unittest.skipUnless(running_extended_tests, "part of extended tests")
420     def verify_ipfix_max_bibs(self, data, limit):
421         """
422         Verify IPFIX maximum BIB entries exceeded event
423
424         :param data: Decoded IPFIX data records
425         :param limit: Number of maximum BIB entries that can be created.
426         """
427         self.assertEqual(1, len(data))
428         record = data[0]
429         # natEvent
430         self.assertEqual(scapy.compat.orb(record[230]), 13)
431         # natQuotaExceededEvent
432         self.assertEqual(struct.pack("I", 2), record[466])
433         # maxBIBEntries
434         self.assertEqual(struct.pack("I", limit), record[472])
435
436     # TODO: ipfix needs to be separated from NAT base plugin
437     @unittest.skipUnless(running_extended_tests, "part of extended tests")
438     def verify_ipfix_bib(self, data, is_create, src_addr):
439         """
440         Verify IPFIX NAT64 BIB create and delete events
441
442         :param data: Decoded IPFIX data records
443         :param is_create: Create event if nonzero value otherwise delete event
444         :param src_addr: IPv6 source address
445         """
446         self.assertEqual(1, len(data))
447         record = data[0]
448         # natEvent
449         if is_create:
450             self.assertEqual(scapy.compat.orb(record[230]), 10)
451         else:
452             self.assertEqual(scapy.compat.orb(record[230]), 11)
453         # sourceIPv6Address
454         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
455         # postNATSourceIPv4Address
456         self.assertEqual(self.nat_addr_n, record[225])
457         # protocolIdentifier
458         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
459         # ingressVRFID
460         self.assertEqual(struct.pack("!I", 0), record[234])
461         # sourceTransportPort
462         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
463         # postNAPTSourceTransportPort
464         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
465
466     # TODO: ipfix needs to be separated from NAT base plugin
467     @unittest.skipUnless(running_extended_tests, "part of extended tests")
468     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
469                                dst_port):
470         """
471         Verify IPFIX NAT64 session create and delete events
472
473         :param data: Decoded IPFIX data records
474         :param is_create: Create event if nonzero value otherwise delete event
475         :param src_addr: IPv6 source address
476         :param dst_addr: IPv4 destination address
477         :param dst_port: destination TCP port
478         """
479         self.assertEqual(1, len(data))
480         record = data[0]
481         # natEvent
482         if is_create:
483             self.assertEqual(scapy.compat.orb(record[230]), 6)
484         else:
485             self.assertEqual(scapy.compat.orb(record[230]), 7)
486         # sourceIPv6Address
487         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
488         # destinationIPv6Address
489         self.assertEqual(socket.inet_pton(socket.AF_INET6,
490                                           self.compose_ip6(dst_addr,
491                                                            '64:ff9b::',
492                                                            96)),
493                          record[28])
494         # postNATSourceIPv4Address
495         self.assertEqual(self.nat_addr_n, record[225])
496         # postNATDestinationIPv4Address
497         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
498                          record[226])
499         # protocolIdentifier
500         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
501         # ingressVRFID
502         self.assertEqual(struct.pack("!I", 0), record[234])
503         # sourceTransportPort
504         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
505         # postNAPTSourceTransportPort
506         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
507         # destinationTransportPort
508         self.assertEqual(struct.pack("!H", dst_port), record[11])
509         # postNAPTDestinationTransportPort
510         self.assertEqual(struct.pack("!H", dst_port), record[228])
511
512     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
513         message = data.decode('utf-8')
514         try:
515             message = SyslogMessage.parse(message)
516         except ParseError as e:
517             self.logger.error(e)
518             raise
519         else:
520             self.assertEqual(message.severity, SyslogSeverity.info)
521             self.assertEqual(message.appname, 'NAT')
522             self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
523             sd_params = message.sd.get('nsess')
524             self.assertTrue(sd_params is not None)
525             if is_ip6:
526                 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
527                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
528             else:
529                 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
530                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
531                 self.assertTrue(sd_params.get('SSUBIX') is not None)
532             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
533             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
534             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
535             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
536             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
537             self.assertEqual(sd_params.get('SVLAN'), '0')
538             self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
539             self.assertEqual(sd_params.get('XDPORT'),
540                              "%d" % self.tcp_external_port)
541
542     def compose_ip6(self, ip4, pref, plen):
543         """
544         Compose IPv4-embedded IPv6 addresses
545
546         :param ip4: IPv4 address
547         :param pref: IPv6 prefix
548         :param plen: IPv6 prefix length
549         :returns: IPv4-embedded IPv6 addresses
550         """
551         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
552         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
553         if plen == 32:
554             pref_n[4] = ip4_n[0]
555             pref_n[5] = ip4_n[1]
556             pref_n[6] = ip4_n[2]
557             pref_n[7] = ip4_n[3]
558         elif plen == 40:
559             pref_n[5] = ip4_n[0]
560             pref_n[6] = ip4_n[1]
561             pref_n[7] = ip4_n[2]
562             pref_n[9] = ip4_n[3]
563         elif plen == 48:
564             pref_n[6] = ip4_n[0]
565             pref_n[7] = ip4_n[1]
566             pref_n[9] = ip4_n[2]
567             pref_n[10] = ip4_n[3]
568         elif plen == 56:
569             pref_n[7] = ip4_n[0]
570             pref_n[9] = ip4_n[1]
571             pref_n[10] = ip4_n[2]
572             pref_n[11] = ip4_n[3]
573         elif plen == 64:
574             pref_n[9] = ip4_n[0]
575             pref_n[10] = ip4_n[1]
576             pref_n[11] = ip4_n[2]
577             pref_n[12] = ip4_n[3]
578         elif plen == 96:
579             pref_n[12] = ip4_n[0]
580             pref_n[13] = ip4_n[1]
581             pref_n[14] = ip4_n[2]
582             pref_n[15] = ip4_n[3]
583         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
584         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
585
586     # TODO: ipfix needs to be separated from NAT base plugin
587     @unittest.skipUnless(running_extended_tests, "part of extended tests")
588     def verify_ipfix_max_sessions(self, data, limit):
589         """
590         Verify IPFIX maximum session entries exceeded event
591
592         :param data: Decoded IPFIX data records
593         :param limit: Number of maximum session entries that can be created.
594         """
595         self.assertEqual(1, len(data))
596         record = data[0]
597         # natEvent
598         self.assertEqual(scapy.compat.orb(record[230]), 13)
599         # natQuotaExceededEvent
600         self.assertEqual(struct.pack("I", 1), record[466])
601         # maxSessionEntries
602         self.assertEqual(struct.pack("I", limit), record[471])
603
604     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
605         """ NAT64 inside interface handles Neighbor Advertisement """
606
607         flags = self.config_flags.NAT_IS_INSIDE
608         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
609                                           sw_if_index=self.pg5.sw_if_index)
610
611         # Try to send ping
612         ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
613                 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
614                 ICMPv6EchoRequest())
615         pkts = [ping]
616         self.pg5.add_stream(pkts)
617         self.pg_enable_capture(self.pg_interfaces)
618         self.pg_start()
619
620         # Wait for Neighbor Solicitation
621         capture = self.pg5.get_capture(len(pkts))
622         packet = capture[0]
623         try:
624             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
625             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
626             tgt = packet[ICMPv6ND_NS].tgt
627         except:
628             self.logger.error(ppp("Unexpected or invalid packet:", packet))
629             raise
630
631         # Send Neighbor Advertisement
632         p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
633              IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
634              ICMPv6ND_NA(tgt=tgt) /
635              ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
636         pkts = [p]
637         self.pg5.add_stream(pkts)
638         self.pg_enable_capture(self.pg_interfaces)
639         self.pg_start()
640
641         # Try to send ping again
642         pkts = [ping]
643         self.pg5.add_stream(pkts)
644         self.pg_enable_capture(self.pg_interfaces)
645         self.pg_start()
646
647         # Wait for ping reply
648         capture = self.pg5.get_capture(len(pkts))
649         packet = capture[0]
650         try:
651             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
652             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
653             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
654         except:
655             self.logger.error(ppp("Unexpected or invalid packet:", packet))
656             raise
657
658     def test_pool(self):
659         """ Add/delete address to NAT64 pool """
660         nat_addr = '1.2.3.4'
661
662         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
663                                                 end_addr=nat_addr,
664                                                 vrf_id=0xFFFFFFFF, is_add=1)
665
666         addresses = self.vapi.nat64_pool_addr_dump()
667         self.assertEqual(len(addresses), 1)
668         self.assertEqual(str(addresses[0].address), nat_addr)
669
670         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
671                                                 end_addr=nat_addr,
672                                                 vrf_id=0xFFFFFFFF, is_add=0)
673
674         addresses = self.vapi.nat64_pool_addr_dump()
675         self.assertEqual(len(addresses), 0)
676
677     def test_interface(self):
678         """ Enable/disable NAT64 feature on the interface """
679         flags = self.config_flags.NAT_IS_INSIDE
680         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
681                                           sw_if_index=self.pg0.sw_if_index)
682         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
683                                           sw_if_index=self.pg1.sw_if_index)
684
685         interfaces = self.vapi.nat64_interface_dump()
686         self.assertEqual(len(interfaces), 2)
687         pg0_found = False
688         pg1_found = False
689         for intf in interfaces:
690             if intf.sw_if_index == self.pg0.sw_if_index:
691                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
692                 pg0_found = True
693             elif intf.sw_if_index == self.pg1.sw_if_index:
694                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
695                 pg1_found = True
696         self.assertTrue(pg0_found)
697         self.assertTrue(pg1_found)
698
699         features = self.vapi.cli("show interface features pg0")
700         self.assertIn('nat64-in2out', features)
701         features = self.vapi.cli("show interface features pg1")
702         self.assertIn('nat64-out2in', features)
703
704         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
705                                           sw_if_index=self.pg0.sw_if_index)
706         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
707                                           sw_if_index=self.pg1.sw_if_index)
708
709         interfaces = self.vapi.nat64_interface_dump()
710         self.assertEqual(len(interfaces), 0)
711
712     def test_static_bib(self):
713         """ Add/delete static BIB entry """
714         in_addr = '2001:db8:85a3::8a2e:370:7334'
715         out_addr = '10.1.1.3'
716         in_port = 1234
717         out_port = 5678
718         proto = IP_PROTOS.tcp
719
720         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
721                                            i_port=in_port, o_port=out_port,
722                                            proto=proto, vrf_id=0, is_add=1)
723         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
724         static_bib_num = 0
725         for bibe in bib:
726             if bibe.flags & self.config_flags.NAT_IS_STATIC:
727                 static_bib_num += 1
728                 self.assertEqual(str(bibe.i_addr), in_addr)
729                 self.assertEqual(str(bibe.o_addr), out_addr)
730                 self.assertEqual(bibe.i_port, in_port)
731                 self.assertEqual(bibe.o_port, out_port)
732         self.assertEqual(static_bib_num, 1)
733         bibs = self.statistics.get_counter('/nat64/total-bibs')
734         self.assertEqual(bibs[0][0], 1)
735
736         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
737                                            i_port=in_port, o_port=out_port,
738                                            proto=proto, vrf_id=0, is_add=0)
739         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
740         static_bib_num = 0
741         for bibe in bib:
742             if bibe.flags & self.config_flags.NAT_IS_STATIC:
743                 static_bib_num += 1
744         self.assertEqual(static_bib_num, 0)
745         bibs = self.statistics.get_counter('/nat64/total-bibs')
746         self.assertEqual(bibs[0][0], 0)
747
748     def test_set_timeouts(self):
749         """ Set NAT64 timeouts """
750         # verify default values
751         timeouts = self.vapi.nat64_get_timeouts()
752         self.assertEqual(timeouts.udp, 300)
753         self.assertEqual(timeouts.icmp, 60)
754         self.assertEqual(timeouts.tcp_transitory, 240)
755         self.assertEqual(timeouts.tcp_established, 7440)
756
757         # set and verify custom values
758         self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
759                                      tcp_transitory=250, icmp=30)
760         timeouts = self.vapi.nat64_get_timeouts()
761         self.assertEqual(timeouts.udp, 200)
762         self.assertEqual(timeouts.icmp, 30)
763         self.assertEqual(timeouts.tcp_transitory, 250)
764         self.assertEqual(timeouts.tcp_established, 7450)
765
766     def test_dynamic(self):
767         """ NAT64 dynamic translation test """
768         self.tcp_port_in = 6303
769         self.udp_port_in = 6304
770         self.icmp_id_in = 6305
771
772         ses_num_start = self.nat64_get_ses_num()
773
774         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
775                                                 end_addr=self.nat_addr,
776                                                 vrf_id=0xFFFFFFFF,
777                                                 is_add=1)
778         flags = self.config_flags.NAT_IS_INSIDE
779         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
780                                           sw_if_index=self.pg0.sw_if_index)
781         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
782                                           sw_if_index=self.pg1.sw_if_index)
783
784         # in2out
785         tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
786         udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
787         icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
788         drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
789
790         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
791         self.pg0.add_stream(pkts)
792         self.pg_enable_capture(self.pg_interfaces)
793         self.pg_start()
794         capture = self.pg1.get_capture(len(pkts))
795         self.verify_capture_out(capture, nat_ip=self.nat_addr,
796                                 dst_ip=self.pg1.remote_ip4)
797
798         if_idx = self.pg0.sw_if_index
799         cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
800         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
801         cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
802         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
803         cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
804         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
805         cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
806         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
807
808         # out2in
809         tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
810         udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
811         icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
812         drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
813
814         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
815         self.pg1.add_stream(pkts)
816         self.pg_enable_capture(self.pg_interfaces)
817         self.pg_start()
818         capture = self.pg0.get_capture(len(pkts))
819         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
820         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
821
822         if_idx = self.pg1.sw_if_index
823         cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
824         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
825         cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
826         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
827         cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
828         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
829         cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
830         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
831
832         bibs = self.statistics.get_counter('/nat64/total-bibs')
833         self.assertEqual(bibs[0][0], 3)
834         sessions = self.statistics.get_counter('/nat64/total-sessions')
835         self.assertEqual(sessions[0][0], 3)
836
837         # in2out
838         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
839         self.pg0.add_stream(pkts)
840         self.pg_enable_capture(self.pg_interfaces)
841         self.pg_start()
842         capture = self.pg1.get_capture(len(pkts))
843         self.verify_capture_out(capture, nat_ip=self.nat_addr,
844                                 dst_ip=self.pg1.remote_ip4)
845
846         # out2in
847         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
848         self.pg1.add_stream(pkts)
849         self.pg_enable_capture(self.pg_interfaces)
850         self.pg_start()
851         capture = self.pg0.get_capture(len(pkts))
852         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
853
854         ses_num_end = self.nat64_get_ses_num()
855
856         self.assertEqual(ses_num_end - ses_num_start, 3)
857
858         # tenant with specific VRF
859         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
860                                                 end_addr=self.vrf1_nat_addr,
861                                                 vrf_id=self.vrf1_id, is_add=1)
862         flags = self.config_flags.NAT_IS_INSIDE
863         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
864                                           sw_if_index=self.pg2.sw_if_index)
865
866         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
867         self.pg2.add_stream(pkts)
868         self.pg_enable_capture(self.pg_interfaces)
869         self.pg_start()
870         capture = self.pg1.get_capture(len(pkts))
871         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
872                                 dst_ip=self.pg1.remote_ip4)
873
874         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
875         self.pg1.add_stream(pkts)
876         self.pg_enable_capture(self.pg_interfaces)
877         self.pg_start()
878         capture = self.pg2.get_capture(len(pkts))
879         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
880
881     def test_static(self):
882         """ NAT64 static translation test """
883         self.tcp_port_in = 60303
884         self.udp_port_in = 60304
885         self.icmp_id_in = 60305
886         self.tcp_port_out = 60303
887         self.udp_port_out = 60304
888         self.icmp_id_out = 60305
889
890         ses_num_start = self.nat64_get_ses_num()
891
892         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
893                                                 end_addr=self.nat_addr,
894                                                 vrf_id=0xFFFFFFFF,
895                                                 is_add=1)
896         flags = self.config_flags.NAT_IS_INSIDE
897         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
898                                           sw_if_index=self.pg0.sw_if_index)
899         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
900                                           sw_if_index=self.pg1.sw_if_index)
901
902         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903                                            o_addr=self.nat_addr,
904                                            i_port=self.tcp_port_in,
905                                            o_port=self.tcp_port_out,
906                                            proto=IP_PROTOS.tcp, vrf_id=0,
907                                            is_add=1)
908         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909                                            o_addr=self.nat_addr,
910                                            i_port=self.udp_port_in,
911                                            o_port=self.udp_port_out,
912                                            proto=IP_PROTOS.udp, vrf_id=0,
913                                            is_add=1)
914         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
915                                            o_addr=self.nat_addr,
916                                            i_port=self.icmp_id_in,
917                                            o_port=self.icmp_id_out,
918                                            proto=IP_PROTOS.icmp, vrf_id=0,
919                                            is_add=1)
920
921         # in2out
922         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
923         self.pg0.add_stream(pkts)
924         self.pg_enable_capture(self.pg_interfaces)
925         self.pg_start()
926         capture = self.pg1.get_capture(len(pkts))
927         self.verify_capture_out(capture, nat_ip=self.nat_addr,
928                                 dst_ip=self.pg1.remote_ip4, same_port=True)
929
930         # out2in
931         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
932         self.pg1.add_stream(pkts)
933         self.pg_enable_capture(self.pg_interfaces)
934         self.pg_start()
935         capture = self.pg0.get_capture(len(pkts))
936         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
937         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
938
939         ses_num_end = self.nat64_get_ses_num()
940
941         self.assertEqual(ses_num_end - ses_num_start, 3)
942
943     @unittest.skipUnless(running_extended_tests, "part of extended tests")
944     def test_session_timeout(self):
945         """ NAT64 session timeout """
946         self.icmp_id_in = 1234
947         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
948                                                 end_addr=self.nat_addr,
949                                                 vrf_id=0xFFFFFFFF,
950                                                 is_add=1)
951         flags = self.config_flags.NAT_IS_INSIDE
952         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
953                                           sw_if_index=self.pg0.sw_if_index)
954         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
955                                           sw_if_index=self.pg1.sw_if_index)
956         self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
957                                      tcp_transitory=5,
958                                      icmp=5)
959
960         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
961         self.pg0.add_stream(pkts)
962         self.pg_enable_capture(self.pg_interfaces)
963         self.pg_start()
964         capture = self.pg1.get_capture(len(pkts))
965
966         ses_num_before_timeout = self.nat64_get_ses_num()
967
968         sleep(15)
969
970         # ICMP and TCP session after timeout
971         ses_num_after_timeout = self.nat64_get_ses_num()
972         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
973
974     def test_icmp_error(self):
975         """ NAT64 ICMP Error message translation """
976         self.tcp_port_in = 6303
977         self.udp_port_in = 6304
978         self.icmp_id_in = 6305
979
980         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
981                                                 end_addr=self.nat_addr,
982                                                 vrf_id=0xFFFFFFFF,
983                                                 is_add=1)
984         flags = self.config_flags.NAT_IS_INSIDE
985         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
986                                           sw_if_index=self.pg0.sw_if_index)
987         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
988                                           sw_if_index=self.pg1.sw_if_index)
989
990         # send some packets to create sessions
991         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
992         self.pg0.add_stream(pkts)
993         self.pg_enable_capture(self.pg_interfaces)
994         self.pg_start()
995         capture_ip4 = self.pg1.get_capture(len(pkts))
996         self.verify_capture_out(capture_ip4,
997                                 nat_ip=self.nat_addr,
998                                 dst_ip=self.pg1.remote_ip4)
999
1000         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1001         self.pg1.add_stream(pkts)
1002         self.pg_enable_capture(self.pg_interfaces)
1003         self.pg_start()
1004         capture_ip6 = self.pg0.get_capture(len(pkts))
1005         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
1006         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1007                                    self.pg0.remote_ip6)
1008
1009         # in2out
1010         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1011                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1012                 ICMPv6DestUnreach(code=1) /
1013                 packet[IPv6] for packet in capture_ip6]
1014         self.pg0.add_stream(pkts)
1015         self.pg_enable_capture(self.pg_interfaces)
1016         self.pg_start()
1017         capture = self.pg1.get_capture(len(pkts))
1018         for packet in capture:
1019             try:
1020                 self.assertEqual(packet[IP].src, self.nat_addr)
1021                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1022                 self.assertEqual(packet[ICMP].type, 3)
1023                 self.assertEqual(packet[ICMP].code, 13)
1024                 inner = packet[IPerror]
1025                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1026                 self.assertEqual(inner.dst, self.nat_addr)
1027                 self.assert_packet_checksums_valid(packet)
1028                 if inner.haslayer(TCPerror):
1029                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1030                 elif inner.haslayer(UDPerror):
1031                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1032                 else:
1033                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1034             except:
1035                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1036                 raise
1037
1038         # out2in
1039         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1040                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1041                 ICMP(type=3, code=13) /
1042                 packet[IP] for packet in capture_ip4]
1043         self.pg1.add_stream(pkts)
1044         self.pg_enable_capture(self.pg_interfaces)
1045         self.pg_start()
1046         capture = self.pg0.get_capture(len(pkts))
1047         for packet in capture:
1048             try:
1049                 self.assertEqual(packet[IPv6].src, ip.src)
1050                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1051                 icmp = packet[ICMPv6DestUnreach]
1052                 self.assertEqual(icmp.code, 1)
1053                 inner = icmp[IPerror6]
1054                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1055                 self.assertEqual(inner.dst, ip.src)
1056                 self.assert_icmpv6_checksum_valid(packet)
1057                 if inner.haslayer(TCPerror):
1058                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1059                 elif inner.haslayer(UDPerror):
1060                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1061                 else:
1062                     self.assertEqual(inner[ICMPv6EchoRequest].id,
1063                                      self.icmp_id_in)
1064             except:
1065                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1066                 raise
1067
1068     def test_hairpinning(self):
1069         """ NAT64 hairpinning """
1070
1071         client = self.pg0.remote_hosts[0]
1072         server = self.pg0.remote_hosts[1]
1073         server_tcp_in_port = 22
1074         server_tcp_out_port = 4022
1075         server_udp_in_port = 23
1076         server_udp_out_port = 4023
1077         client_tcp_in_port = 1234
1078         client_udp_in_port = 1235
1079         client_tcp_out_port = 0
1080         client_udp_out_port = 0
1081         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1082         nat_addr_ip6 = ip.src
1083
1084         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1085                                                 end_addr=self.nat_addr,
1086                                                 vrf_id=0xFFFFFFFF,
1087                                                 is_add=1)
1088         flags = self.config_flags.NAT_IS_INSIDE
1089         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1090                                           sw_if_index=self.pg0.sw_if_index)
1091         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1092                                           sw_if_index=self.pg1.sw_if_index)
1093
1094         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1095                                            o_addr=self.nat_addr,
1096                                            i_port=server_tcp_in_port,
1097                                            o_port=server_tcp_out_port,
1098                                            proto=IP_PROTOS.tcp, vrf_id=0,
1099                                            is_add=1)
1100         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1101                                            o_addr=self.nat_addr,
1102                                            i_port=server_udp_in_port,
1103                                            o_port=server_udp_out_port,
1104                                            proto=IP_PROTOS.udp, vrf_id=0,
1105                                            is_add=1)
1106
1107         # client to server
1108         pkts = []
1109         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1110              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1111              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1112         pkts.append(p)
1113         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1114              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1115              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1116         pkts.append(p)
1117         self.pg0.add_stream(pkts)
1118         self.pg_enable_capture(self.pg_interfaces)
1119         self.pg_start()
1120         capture = self.pg0.get_capture(len(pkts))
1121         for packet in capture:
1122             try:
1123                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1124                 self.assertEqual(packet[IPv6].dst, server.ip6)
1125                 self.assert_packet_checksums_valid(packet)
1126                 if packet.haslayer(TCP):
1127                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1128                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1129                     client_tcp_out_port = packet[TCP].sport
1130                 else:
1131                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1132                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1133                     client_udp_out_port = packet[UDP].sport
1134             except:
1135                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1136                 raise
1137
1138         # server to client
1139         pkts = []
1140         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1141              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1142              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1143         pkts.append(p)
1144         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1145              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1146              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1147         pkts.append(p)
1148         self.pg0.add_stream(pkts)
1149         self.pg_enable_capture(self.pg_interfaces)
1150         self.pg_start()
1151         capture = self.pg0.get_capture(len(pkts))
1152         for packet in capture:
1153             try:
1154                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1155                 self.assertEqual(packet[IPv6].dst, client.ip6)
1156                 self.assert_packet_checksums_valid(packet)
1157                 if packet.haslayer(TCP):
1158                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1159                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1160                 else:
1161                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1162                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1163             except:
1164                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1165                 raise
1166
1167         # ICMP error
1168         pkts = []
1169         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1170                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1171                 ICMPv6DestUnreach(code=1) /
1172                 packet[IPv6] for packet in capture]
1173         self.pg0.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176         capture = self.pg0.get_capture(len(pkts))
1177         for packet in capture:
1178             try:
1179                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1180                 self.assertEqual(packet[IPv6].dst, server.ip6)
1181                 icmp = packet[ICMPv6DestUnreach]
1182                 self.assertEqual(icmp.code, 1)
1183                 inner = icmp[IPerror6]
1184                 self.assertEqual(inner.src, server.ip6)
1185                 self.assertEqual(inner.dst, nat_addr_ip6)
1186                 self.assert_packet_checksums_valid(packet)
1187                 if inner.haslayer(TCPerror):
1188                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1189                     self.assertEqual(inner[TCPerror].dport,
1190                                      client_tcp_out_port)
1191                 else:
1192                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1193                     self.assertEqual(inner[UDPerror].dport,
1194                                      client_udp_out_port)
1195             except:
1196                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1197                 raise
1198
1199     def test_prefix(self):
1200         """ NAT64 Network-Specific Prefix """
1201
1202         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1203                                                 end_addr=self.nat_addr,
1204                                                 vrf_id=0xFFFFFFFF,
1205                                                 is_add=1)
1206         flags = self.config_flags.NAT_IS_INSIDE
1207         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1208                                           sw_if_index=self.pg0.sw_if_index)
1209         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1210                                           sw_if_index=self.pg1.sw_if_index)
1211         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1212                                                 end_addr=self.vrf1_nat_addr,
1213                                                 vrf_id=self.vrf1_id, is_add=1)
1214         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1215                                           sw_if_index=self.pg2.sw_if_index)
1216
1217         # Add global prefix
1218         global_pref64 = "2001:db8::"
1219         global_pref64_len = 32
1220         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1221         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1222                                        is_add=1)
1223
1224         prefix = self.vapi.nat64_prefix_dump()
1225         self.assertEqual(len(prefix), 1)
1226         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1227         self.assertEqual(prefix[0].vrf_id, 0)
1228
1229         # Add tenant specific prefix
1230         vrf1_pref64 = "2001:db8:122:300::"
1231         vrf1_pref64_len = 56
1232         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1233         self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1234                                        vrf_id=self.vrf1_id, is_add=1)
1235
1236         prefix = self.vapi.nat64_prefix_dump()
1237         self.assertEqual(len(prefix), 2)
1238
1239         # Global prefix
1240         pkts = self.create_stream_in_ip6(self.pg0,
1241                                          self.pg1,
1242                                          pref=global_pref64,
1243                                          plen=global_pref64_len)
1244         self.pg0.add_stream(pkts)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg1.get_capture(len(pkts))
1248         self.verify_capture_out(capture, nat_ip=self.nat_addr,
1249                                 dst_ip=self.pg1.remote_ip4)
1250
1251         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1252         self.pg1.add_stream(pkts)
1253         self.pg_enable_capture(self.pg_interfaces)
1254         self.pg_start()
1255         capture = self.pg0.get_capture(len(pkts))
1256         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1257                                   global_pref64,
1258                                   global_pref64_len)
1259         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1260
1261         # Tenant specific prefix
1262         pkts = self.create_stream_in_ip6(self.pg2,
1263                                          self.pg1,
1264                                          pref=vrf1_pref64,
1265                                          plen=vrf1_pref64_len)
1266         self.pg2.add_stream(pkts)
1267         self.pg_enable_capture(self.pg_interfaces)
1268         self.pg_start()
1269         capture = self.pg1.get_capture(len(pkts))
1270         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1271                                 dst_ip=self.pg1.remote_ip4)
1272
1273         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1274         self.pg1.add_stream(pkts)
1275         self.pg_enable_capture(self.pg_interfaces)
1276         self.pg_start()
1277         capture = self.pg2.get_capture(len(pkts))
1278         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1279                                   vrf1_pref64,
1280                                   vrf1_pref64_len)
1281         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1282
1283     def test_unknown_proto(self):
1284         """ NAT64 translate packet with unknown protocol """
1285
1286         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1287                                                 end_addr=self.nat_addr,
1288                                                 vrf_id=0xFFFFFFFF,
1289                                                 is_add=1)
1290         flags = self.config_flags.NAT_IS_INSIDE
1291         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1292                                           sw_if_index=self.pg0.sw_if_index)
1293         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1294                                           sw_if_index=self.pg1.sw_if_index)
1295         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1296
1297         # in2out
1298         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1300              TCP(sport=self.tcp_port_in, dport=20))
1301         self.pg0.add_stream(p)
1302         self.pg_enable_capture(self.pg_interfaces)
1303         self.pg_start()
1304         p = self.pg1.get_capture(1)
1305
1306         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1307              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1308              GRE() /
1309              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1310              TCP(sport=1234, dport=1234))
1311         self.pg0.add_stream(p)
1312         self.pg_enable_capture(self.pg_interfaces)
1313         self.pg_start()
1314         p = self.pg1.get_capture(1)
1315         packet = p[0]
1316         try:
1317             self.assertEqual(packet[IP].src, self.nat_addr)
1318             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1319             self.assertEqual(packet.haslayer(GRE), 1)
1320             self.assert_packet_checksums_valid(packet)
1321         except:
1322             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1323             raise
1324
1325         # out2in
1326         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1327              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1328              GRE() /
1329              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1330              TCP(sport=1234, dport=1234))
1331         self.pg1.add_stream(p)
1332         self.pg_enable_capture(self.pg_interfaces)
1333         self.pg_start()
1334         p = self.pg0.get_capture(1)
1335         packet = p[0]
1336         try:
1337             self.assertEqual(packet[IPv6].src, remote_ip6)
1338             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1339             self.assertEqual(packet[IPv6].nh, 47)
1340         except:
1341             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1342             raise
1343
1344     def test_hairpinning_unknown_proto(self):
1345         """ NAT64 translate packet with unknown protocol - hairpinning """
1346
1347         client = self.pg0.remote_hosts[0]
1348         server = self.pg0.remote_hosts[1]
1349         server_tcp_in_port = 22
1350         server_tcp_out_port = 4022
1351         client_tcp_in_port = 1234
1352         client_tcp_out_port = 1235
1353         server_nat_ip = "10.0.0.100"
1354         client_nat_ip = "10.0.0.110"
1355         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1356         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1357
1358         self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1359                                                 end_addr=client_nat_ip,
1360                                                 vrf_id=0xFFFFFFFF,
1361                                                 is_add=1)
1362         flags = self.config_flags.NAT_IS_INSIDE
1363         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1364                                           sw_if_index=self.pg0.sw_if_index)
1365         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1366                                           sw_if_index=self.pg1.sw_if_index)
1367
1368         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1369                                            o_addr=server_nat_ip,
1370                                            i_port=server_tcp_in_port,
1371                                            o_port=server_tcp_out_port,
1372                                            proto=IP_PROTOS.tcp, vrf_id=0,
1373                                            is_add=1)
1374
1375         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1376                                            o_addr=server_nat_ip, i_port=0,
1377                                            o_port=0,
1378                                            proto=IP_PROTOS.gre, vrf_id=0,
1379                                            is_add=1)
1380
1381         self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1382                                            o_addr=client_nat_ip,
1383                                            i_port=client_tcp_in_port,
1384                                            o_port=client_tcp_out_port,
1385                                            proto=IP_PROTOS.tcp, vrf_id=0,
1386                                            is_add=1)
1387
1388         # client to server
1389         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1390              IPv6(src=client.ip6, dst=server_nat_ip6) /
1391              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1392         self.pg0.add_stream(p)
1393         self.pg_enable_capture(self.pg_interfaces)
1394         self.pg_start()
1395         p = self.pg0.get_capture(1)
1396
1397         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1398              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1399              GRE() /
1400              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1401              TCP(sport=1234, dport=1234))
1402         self.pg0.add_stream(p)
1403         self.pg_enable_capture(self.pg_interfaces)
1404         self.pg_start()
1405         p = self.pg0.get_capture(1)
1406         packet = p[0]
1407         try:
1408             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1409             self.assertEqual(packet[IPv6].dst, server.ip6)
1410             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1411         except:
1412             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1413             raise
1414
1415         # server to client
1416         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1417              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1418              GRE() /
1419              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1420              TCP(sport=1234, dport=1234))
1421         self.pg0.add_stream(p)
1422         self.pg_enable_capture(self.pg_interfaces)
1423         self.pg_start()
1424         p = self.pg0.get_capture(1)
1425         packet = p[0]
1426         try:
1427             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1428             self.assertEqual(packet[IPv6].dst, client.ip6)
1429             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1430         except:
1431             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1432             raise
1433
1434     def test_one_armed_nat64(self):
1435         """ One armed NAT64 """
1436         external_port = 0
1437         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1438                                            '64:ff9b::',
1439                                            96)
1440
1441         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1442                                                 end_addr=self.nat_addr,
1443                                                 vrf_id=0xFFFFFFFF,
1444                                                 is_add=1)
1445         flags = self.config_flags.NAT_IS_INSIDE
1446         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1447                                           sw_if_index=self.pg3.sw_if_index)
1448         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1449                                           sw_if_index=self.pg3.sw_if_index)
1450
1451         # in2out
1452         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1453              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1454              TCP(sport=12345, dport=80))
1455         self.pg3.add_stream(p)
1456         self.pg_enable_capture(self.pg_interfaces)
1457         self.pg_start()
1458         capture = self.pg3.get_capture(1)
1459         p = capture[0]
1460         try:
1461             ip = p[IP]
1462             tcp = p[TCP]
1463             self.assertEqual(ip.src, self.nat_addr)
1464             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1465             self.assertNotEqual(tcp.sport, 12345)
1466             external_port = tcp.sport
1467             self.assertEqual(tcp.dport, 80)
1468             self.assert_packet_checksums_valid(p)
1469         except:
1470             self.logger.error(ppp("Unexpected or invalid packet:", p))
1471             raise
1472
1473         # out2in
1474         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1475              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1476              TCP(sport=80, dport=external_port))
1477         self.pg3.add_stream(p)
1478         self.pg_enable_capture(self.pg_interfaces)
1479         self.pg_start()
1480         capture = self.pg3.get_capture(1)
1481         p = capture[0]
1482         try:
1483             ip = p[IPv6]
1484             tcp = p[TCP]
1485             self.assertEqual(ip.src, remote_host_ip6)
1486             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1487             self.assertEqual(tcp.sport, 80)
1488             self.assertEqual(tcp.dport, 12345)
1489             self.assert_packet_checksums_valid(p)
1490         except:
1491             self.logger.error(ppp("Unexpected or invalid packet:", p))
1492             raise
1493
1494     def test_frag_in_order(self):
1495         """ NAT64 translate fragments arriving in order """
1496         self.tcp_port_in = random.randint(1025, 65535)
1497
1498         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1499                                                 end_addr=self.nat_addr,
1500                                                 vrf_id=0xFFFFFFFF,
1501                                                 is_add=1)
1502         flags = self.config_flags.NAT_IS_INSIDE
1503         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1504                                           sw_if_index=self.pg0.sw_if_index)
1505         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1506                                           sw_if_index=self.pg1.sw_if_index)
1507
1508         # in2out
1509         data = b'a' * 200
1510         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1511                                            self.tcp_port_in, 20, data)
1512         self.pg0.add_stream(pkts)
1513         self.pg_enable_capture(self.pg_interfaces)
1514         self.pg_start()
1515         frags = self.pg1.get_capture(len(pkts))
1516         p = self.reass_frags_and_verify(frags,
1517                                         self.nat_addr,
1518                                         self.pg1.remote_ip4)
1519         self.assertEqual(p[TCP].dport, 20)
1520         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1521         self.tcp_port_out = p[TCP].sport
1522         self.assertEqual(data, p[Raw].load)
1523
1524         # out2in
1525         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1526         pkts = self.create_stream_frag(self.pg1,
1527                                        self.nat_addr,
1528                                        20,
1529                                        self.tcp_port_out,
1530                                        data)
1531         self.pg1.add_stream(pkts)
1532         self.pg_enable_capture(self.pg_interfaces)
1533         self.pg_start()
1534         frags = self.pg0.get_capture(len(pkts))
1535         self.logger.debug(ppc("Captured:", frags))
1536         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1537         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1538         self.assertEqual(p[TCP].sport, 20)
1539         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1540         self.assertEqual(data, p[Raw].load)
1541
1542     def test_reass_hairpinning(self):
1543         """ NAT64 fragments hairpinning """
1544         data = b'a' * 200
1545         server = self.pg0.remote_hosts[1]
1546         server_in_port = random.randint(1025, 65535)
1547         server_out_port = random.randint(1025, 65535)
1548         client_in_port = random.randint(1025, 65535)
1549         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1550         nat_addr_ip6 = ip.src
1551
1552         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1553                                                 end_addr=self.nat_addr,
1554                                                 vrf_id=0xFFFFFFFF,
1555                                                 is_add=1)
1556         flags = self.config_flags.NAT_IS_INSIDE
1557         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1558                                           sw_if_index=self.pg0.sw_if_index)
1559         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1560                                           sw_if_index=self.pg1.sw_if_index)
1561
1562         # add static BIB entry for server
1563         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1564                                            o_addr=self.nat_addr,
1565                                            i_port=server_in_port,
1566                                            o_port=server_out_port,
1567                                            proto=IP_PROTOS.tcp, vrf_id=0,
1568                                            is_add=1)
1569
1570         # send packet from host to server
1571         pkts = self.create_stream_frag_ip6(self.pg0,
1572                                            self.nat_addr,
1573                                            client_in_port,
1574                                            server_out_port,
1575                                            data)
1576         self.pg0.add_stream(pkts)
1577         self.pg_enable_capture(self.pg_interfaces)
1578         self.pg_start()
1579         frags = self.pg0.get_capture(len(pkts))
1580         self.logger.debug(ppc("Captured:", frags))
1581         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1582         self.assertNotEqual(p[TCP].sport, client_in_port)
1583         self.assertEqual(p[TCP].dport, server_in_port)
1584         self.assertEqual(data, p[Raw].load)
1585
1586     def test_frag_out_of_order(self):
1587         """ NAT64 translate fragments arriving out of order """
1588         self.tcp_port_in = random.randint(1025, 65535)
1589
1590         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1591                                                 end_addr=self.nat_addr,
1592                                                 vrf_id=0xFFFFFFFF,
1593                                                 is_add=1)
1594         flags = self.config_flags.NAT_IS_INSIDE
1595         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1596                                           sw_if_index=self.pg0.sw_if_index)
1597         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1598                                           sw_if_index=self.pg1.sw_if_index)
1599
1600         # in2out
1601         data = b'a' * 200
1602         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1603                                            self.tcp_port_in, 20, data)
1604         pkts.reverse()
1605         self.pg0.add_stream(pkts)
1606         self.pg_enable_capture(self.pg_interfaces)
1607         self.pg_start()
1608         frags = self.pg1.get_capture(len(pkts))
1609         p = self.reass_frags_and_verify(frags,
1610                                         self.nat_addr,
1611                                         self.pg1.remote_ip4)
1612         self.assertEqual(p[TCP].dport, 20)
1613         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1614         self.tcp_port_out = p[TCP].sport
1615         self.assertEqual(data, p[Raw].load)
1616
1617         # out2in
1618         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1619         pkts = self.create_stream_frag(self.pg1,
1620                                        self.nat_addr,
1621                                        20,
1622                                        self.tcp_port_out,
1623                                        data)
1624         pkts.reverse()
1625         self.pg1.add_stream(pkts)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         frags = self.pg0.get_capture(len(pkts))
1629         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1630         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1631         self.assertEqual(p[TCP].sport, 20)
1632         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1633         self.assertEqual(data, p[Raw].load)
1634
1635     def test_interface_addr(self):
1636         """ Acquire NAT64 pool addresses from interface """
1637         self.vapi.nat64_add_del_interface_addr(
1638             is_add=1,
1639             sw_if_index=self.pg4.sw_if_index)
1640
1641         # no address in NAT64 pool
1642         addresses = self.vapi.nat44_address_dump()
1643         self.assertEqual(0, len(addresses))
1644
1645         # configure interface address and check NAT64 address pool
1646         self.pg4.config_ip4()
1647         addresses = self.vapi.nat64_pool_addr_dump()
1648         self.assertEqual(len(addresses), 1)
1649
1650         self.assertEqual(str(addresses[0].address),
1651                          self.pg4.local_ip4)
1652
1653         # remove interface address and check NAT64 address pool
1654         self.pg4.unconfig_ip4()
1655         addresses = self.vapi.nat64_pool_addr_dump()
1656         self.assertEqual(0, len(addresses))
1657
1658     # TODO: ipfix needs to be separated from NAT base plugin
1659     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1660     def test_ipfix_max_bibs_sessions(self):
1661         """ IPFIX logging maximum session and BIB entries exceeded """
1662         max_bibs = 1280
1663         max_sessions = 2560
1664         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1665                                            '64:ff9b::',
1666                                            96)
1667
1668         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1669                                                 end_addr=self.nat_addr,
1670                                                 vrf_id=0xFFFFFFFF,
1671                                                 is_add=1)
1672         flags = self.config_flags.NAT_IS_INSIDE
1673         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1674                                           sw_if_index=self.pg0.sw_if_index)
1675         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1676                                           sw_if_index=self.pg1.sw_if_index)
1677
1678         pkts = []
1679         src = ""
1680         for i in range(0, max_bibs):
1681             src = "fd01:aa::%x" % (i)
1682             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1683                  IPv6(src=src, dst=remote_host_ip6) /
1684                  TCP(sport=12345, dport=80))
1685             pkts.append(p)
1686             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1687                  IPv6(src=src, dst=remote_host_ip6) /
1688                  TCP(sport=12345, dport=22))
1689             pkts.append(p)
1690         self.pg0.add_stream(pkts)
1691         self.pg_enable_capture(self.pg_interfaces)
1692         self.pg_start()
1693         self.pg1.get_capture(max_sessions)
1694
1695         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1696                                      src_address=self.pg3.local_ip4,
1697                                      path_mtu=512,
1698                                      template_interval=10)
1699         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1700                                            src_port=self.ipfix_src_port,
1701                                            enable=1)
1702
1703         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1704              IPv6(src=src, dst=remote_host_ip6) /
1705              TCP(sport=12345, dport=25))
1706         self.pg0.add_stream(p)
1707         self.pg_enable_capture(self.pg_interfaces)
1708         self.pg_start()
1709         self.pg1.assert_nothing_captured()
1710         sleep(1)
1711         self.vapi.ipfix_flush()
1712         capture = self.pg3.get_capture(7)
1713         ipfix = IPFIXDecoder()
1714         # first load template
1715         for p in capture:
1716             self.assertTrue(p.haslayer(IPFIX))
1717             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1718             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1719             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1720             self.assertEqual(p[UDP].dport, 4739)
1721             self.assertEqual(p[IPFIX].observationDomainID,
1722                              self.ipfix_domain_id)
1723             if p.haslayer(Template):
1724                 ipfix.add_template(p.getlayer(Template))
1725         # verify events in data set
1726         for p in capture:
1727             if p.haslayer(Data):
1728                 data = ipfix.decode_data_set(p.getlayer(Set))
1729                 self.verify_ipfix_max_sessions(data, max_sessions)
1730
1731         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1732              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1733              TCP(sport=12345, dport=80))
1734         self.pg0.add_stream(p)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         self.pg1.assert_nothing_captured()
1738         sleep(1)
1739         self.vapi.ipfix_flush()
1740         capture = self.pg3.get_capture(1)
1741         # verify events in data set
1742         for p in capture:
1743             self.assertTrue(p.haslayer(IPFIX))
1744             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1745             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1746             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1747             self.assertEqual(p[UDP].dport, 4739)
1748             self.assertEqual(p[IPFIX].observationDomainID,
1749                              self.ipfix_domain_id)
1750             if p.haslayer(Data):
1751                 data = ipfix.decode_data_set(p.getlayer(Set))
1752                 self.verify_ipfix_max_bibs(data, max_bibs)
1753
1754     # TODO: ipfix needs to be separated from NAT base plugin
1755     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1756     def test_ipfix_bib_ses(self):
1757         """ IPFIX logging NAT64 BIB/session create and delete events """
1758         self.tcp_port_in = random.randint(1025, 65535)
1759         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1760                                            '64:ff9b::',
1761                                            96)
1762
1763         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1764                                                 end_addr=self.nat_addr,
1765                                                 vrf_id=0xFFFFFFFF,
1766                                                 is_add=1)
1767         flags = self.config_flags.NAT_IS_INSIDE
1768         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1769                                           sw_if_index=self.pg0.sw_if_index)
1770         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1771                                           sw_if_index=self.pg1.sw_if_index)
1772         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1773                                      src_address=self.pg3.local_ip4,
1774                                      path_mtu=512,
1775                                      template_interval=10)
1776         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1777                                            src_port=self.ipfix_src_port,
1778                                            enable=1)
1779
1780         # Create
1781         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1782              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1783              TCP(sport=self.tcp_port_in, dport=25))
1784         self.pg0.add_stream(p)
1785         self.pg_enable_capture(self.pg_interfaces)
1786         self.pg_start()
1787         p = self.pg1.get_capture(1)
1788         self.tcp_port_out = p[0][TCP].sport
1789         self.vapi.ipfix_flush()
1790         capture = self.pg3.get_capture(8)
1791         ipfix = IPFIXDecoder()
1792         # first load template
1793         for p in capture:
1794             self.assertTrue(p.haslayer(IPFIX))
1795             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1796             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1797             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1798             self.assertEqual(p[UDP].dport, 4739)
1799             self.assertEqual(p[IPFIX].observationDomainID,
1800                              self.ipfix_domain_id)
1801             if p.haslayer(Template):
1802                 ipfix.add_template(p.getlayer(Template))
1803         # verify events in data set
1804         for p in capture:
1805             if p.haslayer(Data):
1806                 data = ipfix.decode_data_set(p.getlayer(Set))
1807                 if scapy.compat.orb(data[0][230]) == 10:
1808                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1809                 elif scapy.compat.orb(data[0][230]) == 6:
1810                     self.verify_ipfix_nat64_ses(data,
1811                                                 1,
1812                                                 self.pg0.remote_ip6,
1813                                                 self.pg1.remote_ip4,
1814                                                 25)
1815                 else:
1816                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1817
1818         # Delete
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1821                                                 end_addr=self.nat_addr,
1822                                                 vrf_id=0xFFFFFFFF,
1823                                                 is_add=0)
1824         self.vapi.ipfix_flush()
1825         capture = self.pg3.get_capture(2)
1826         # verify events in data set
1827         for p in capture:
1828             self.assertTrue(p.haslayer(IPFIX))
1829             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1830             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1831             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1832             self.assertEqual(p[UDP].dport, 4739)
1833             self.assertEqual(p[IPFIX].observationDomainID,
1834                              self.ipfix_domain_id)
1835             if p.haslayer(Data):
1836                 data = ipfix.decode_data_set(p.getlayer(Set))
1837                 if scapy.compat.orb(data[0][230]) == 11:
1838                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1839                 elif scapy.compat.orb(data[0][230]) == 7:
1840                     self.verify_ipfix_nat64_ses(data,
1841                                                 0,
1842                                                 self.pg0.remote_ip6,
1843                                                 self.pg1.remote_ip4,
1844                                                 25)
1845                 else:
1846                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1847
1848     def test_syslog_sess(self):
1849         """ Test syslog session creation and deletion """
1850         self.tcp_port_in = random.randint(1025, 65535)
1851         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1852                                            '64:ff9b::',
1853                                            96)
1854
1855         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1856                                                 end_addr=self.nat_addr,
1857                                                 vrf_id=0xFFFFFFFF,
1858                                                 is_add=1)
1859         flags = self.config_flags.NAT_IS_INSIDE
1860         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1861                                           sw_if_index=self.pg0.sw_if_index)
1862         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1863                                           sw_if_index=self.pg1.sw_if_index)
1864         self.vapi.syslog_set_filter(
1865             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1866         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1867
1868         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1869              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1870              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1871         self.pg0.add_stream(p)
1872         self.pg_enable_capture(self.pg_interfaces)
1873         self.pg_start()
1874         p = self.pg1.get_capture(1)
1875         self.tcp_port_out = p[0][TCP].sport
1876         capture = self.pg3.get_capture(1)
1877         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1878
1879         self.pg_enable_capture(self.pg_interfaces)
1880         self.pg_start()
1881         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1882                                                 end_addr=self.nat_addr,
1883                                                 vrf_id=0xFFFFFFFF,
1884                                                 is_add=0)
1885         capture = self.pg3.get_capture(1)
1886         self.verify_syslog_sess(capture[0][Raw].load, False, True)
1887
1888     def nat64_get_ses_num(self):
1889         """
1890         Return number of active NAT64 sessions.
1891         """
1892         st = self.vapi.nat64_st_dump(proto=255)
1893         return len(st)
1894
1895     def clear_nat64(self):
1896         """
1897         Clear NAT64 configuration.
1898         """
1899         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1900                                            src_port=self.ipfix_src_port,
1901                                            enable=0)
1902         self.ipfix_src_port = 4739
1903         self.ipfix_domain_id = 1
1904
1905         self.vapi.syslog_set_filter(
1906             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1907
1908         self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1909                                      tcp_transitory=240, icmp=60)
1910
1911         interfaces = self.vapi.nat64_interface_dump()
1912         for intf in interfaces:
1913             self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1914                                               sw_if_index=intf.sw_if_index)
1915
1916         bib = self.vapi.nat64_bib_dump(proto=255)
1917         for bibe in bib:
1918             if bibe.flags & self.config_flags.NAT_IS_STATIC:
1919                 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1920                                                    o_addr=bibe.o_addr,
1921                                                    i_port=bibe.i_port,
1922                                                    o_port=bibe.o_port,
1923                                                    proto=bibe.proto,
1924                                                    vrf_id=bibe.vrf_id,
1925                                                    is_add=0)
1926
1927         adresses = self.vapi.nat64_pool_addr_dump()
1928         for addr in adresses:
1929             self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1930                                                     end_addr=addr.address,
1931                                                     vrf_id=addr.vrf_id,
1932                                                     is_add=0)
1933
1934         prefixes = self.vapi.nat64_prefix_dump()
1935         for prefix in prefixes:
1936             self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1937                                            vrf_id=prefix.vrf_id, is_add=0)
1938
1939         bibs = self.statistics.get_counter('/nat64/total-bibs')
1940         self.assertEqual(bibs[0][0], 0)
1941         sessions = self.statistics.get_counter('/nat64/total-sessions')
1942         self.assertEqual(sessions[0][0], 0)
1943
1944
1945 if __name__ == '__main__':
1946     unittest.main(testRunner=VppTestRunner)