nat: ipfix logging separation & refactor
[vpp.git] / src / plugins / nat / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
26
27
28 class TestNAT64(VppTestCase):
29     """ NAT64 Test Cases """
30
31     @property
32     def SYSLOG_SEVERITY(self):
33         return VppEnum.vl_api_syslog_severity_t
34
35     @property
36     def config_flags(self):
37         return VppEnum.vl_api_nat_config_flags_t
38
39     @classmethod
40     def setUpClass(cls):
41         super(TestNAT64, cls).setUpClass()
42
43         cls.tcp_port_in = 6303
44         cls.tcp_port_out = 6303
45         cls.udp_port_in = 6304
46         cls.udp_port_out = 6304
47         cls.icmp_id_in = 6305
48         cls.icmp_id_out = 6305
49         cls.tcp_external_port = 80
50         cls.nat_addr = '10.0.0.3'
51         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
52         cls.vrf1_id = 10
53         cls.vrf1_nat_addr = '10.0.10.3'
54         cls.ipfix_src_port = 4739
55         cls.ipfix_domain_id = 1
56
57         cls.create_pg_interfaces(range(6))
58         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
59         cls.ip6_interfaces.append(cls.pg_interfaces[2])
60         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
61
62         cls.vapi.ip_table_add_del(is_add=1,
63                                   table={'table_id': cls.vrf1_id,
64                                          'is_ip6': 1})
65
66         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
67
68         cls.pg0.generate_remote_hosts(2)
69
70         for i in cls.ip6_interfaces:
71             i.admin_up()
72             i.config_ip6()
73             i.configure_ipv6_neighbors()
74
75         for i in cls.ip4_interfaces:
76             i.admin_up()
77             i.config_ip4()
78             i.resolve_arp()
79
80         cls.pg3.admin_up()
81         cls.pg3.config_ip4()
82         cls.pg3.resolve_arp()
83         cls.pg3.config_ip6()
84         cls.pg3.configure_ipv6_neighbors()
85
86         cls.pg5.admin_up()
87         cls.pg5.config_ip6()
88
89     @classmethod
90     def tearDownClass(cls):
91         super(TestNAT64, cls).tearDownClass()
92
93     def setUp(self):
94         super(TestNAT64, self).setUp()
95         self.vapi.nat64_plugin_enable_disable(enable=1,
96                                               bib_buckets=128, st_buckets=256)
97
98     def tearDown(self):
99         super(TestNAT64, self).tearDown()
100         if not self.vpp_dead:
101             self.vapi.nat64_plugin_enable_disable(enable=0)
102
103     def show_commands_at_teardown(self):
104         self.logger.info(self.vapi.cli("show nat64 pool"))
105         self.logger.info(self.vapi.cli("show nat64 interfaces"))
106         self.logger.info(self.vapi.cli("show nat64 prefix"))
107         self.logger.info(self.vapi.cli("show nat64 bib all"))
108         self.logger.info(self.vapi.cli("show nat64 session table all"))
109
110     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
111         """
112         Create IPv6 packet stream for inside network
113
114         :param in_if: Inside interface
115         :param out_if: Outside interface
116         :param ttl: Hop Limit of generated packets
117         :param pref: NAT64 prefix
118         :param plen: NAT64 prefix length
119         """
120         pkts = []
121         if pref is None:
122             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
123         else:
124             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
125
126         # TCP
127         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
128              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
129              TCP(sport=self.tcp_port_in, dport=20))
130         pkts.append(p)
131
132         # UDP
133         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
134              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
135              UDP(sport=self.udp_port_in, dport=20))
136         pkts.append(p)
137
138         # ICMP
139         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
140              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
141              ICMPv6EchoRequest(id=self.icmp_id_in))
142         pkts.append(p)
143
144         return pkts
145
146     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
147                           use_inside_ports=False):
148         """
149         Create packet stream for outside network
150
151         :param out_if: Outside interface
152         :param dst_ip: Destination IP address (Default use global NAT address)
153         :param ttl: TTL of generated packets
154         :param use_inside_ports: Use inside NAT ports as destination ports
155                instead of outside ports
156         """
157         if dst_ip is None:
158             dst_ip = self.nat_addr
159         if not use_inside_ports:
160             tcp_port = self.tcp_port_out
161             udp_port = self.udp_port_out
162             icmp_id = self.icmp_id_out
163         else:
164             tcp_port = self.tcp_port_in
165             udp_port = self.udp_port_in
166             icmp_id = self.icmp_id_in
167         pkts = []
168         # TCP
169         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
170              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
171              TCP(dport=tcp_port, sport=20))
172         pkts.extend([p, p])
173
174         # UDP
175         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
176              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
177              UDP(dport=udp_port, sport=20))
178         pkts.append(p)
179
180         # ICMP
181         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
182              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
183              ICMP(id=icmp_id, type='echo-reply'))
184         pkts.append(p)
185
186         return pkts
187
188     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
189                            dst_ip=None, is_ip6=False, ignore_port=False):
190         """
191         Verify captured packets on outside network
192
193         :param capture: Captured packets
194         :param nat_ip: Translated IP address (Default use global NAT address)
195         :param same_port: Source port number is not translated (Default False)
196         :param dst_ip: Destination IP address (Default do not verify)
197         :param is_ip6: If L3 protocol is IPv6 (Default False)
198         """
199         if is_ip6:
200             IP46 = IPv6
201             ICMP46 = ICMPv6EchoRequest
202         else:
203             IP46 = IP
204             ICMP46 = ICMP
205         if nat_ip is None:
206             nat_ip = self.nat_addr
207         for packet in capture:
208             try:
209                 if not is_ip6:
210                     self.assert_packet_checksums_valid(packet)
211                 self.assertEqual(packet[IP46].src, nat_ip)
212                 if dst_ip is not None:
213                     self.assertEqual(packet[IP46].dst, dst_ip)
214                 if packet.haslayer(TCP):
215                     if not ignore_port:
216                         if same_port:
217                             self.assertEqual(
218                                 packet[TCP].sport, self.tcp_port_in)
219                         else:
220                             self.assertNotEqual(
221                                 packet[TCP].sport, self.tcp_port_in)
222                     self.tcp_port_out = packet[TCP].sport
223                     self.assert_packet_checksums_valid(packet)
224                 elif packet.haslayer(UDP):
225                     if not ignore_port:
226                         if same_port:
227                             self.assertEqual(
228                                 packet[UDP].sport, self.udp_port_in)
229                         else:
230                             self.assertNotEqual(
231                                 packet[UDP].sport, self.udp_port_in)
232                     self.udp_port_out = packet[UDP].sport
233                 else:
234                     if not ignore_port:
235                         if same_port:
236                             self.assertEqual(
237                                 packet[ICMP46].id, self.icmp_id_in)
238                         else:
239                             self.assertNotEqual(
240                                 packet[ICMP46].id, self.icmp_id_in)
241                     self.icmp_id_out = packet[ICMP46].id
242                     self.assert_packet_checksums_valid(packet)
243             except:
244                 self.logger.error(ppp("Unexpected or invalid packet "
245                                       "(outside network):", packet))
246                 raise
247
248     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
249         """
250         Verify captured IPv6 packets on inside network
251
252         :param capture: Captured packets
253         :param src_ip: Source IP
254         :param dst_ip: Destination IP address
255         """
256         for packet in capture:
257             try:
258                 self.assertEqual(packet[IPv6].src, src_ip)
259                 self.assertEqual(packet[IPv6].dst, dst_ip)
260                 self.assert_packet_checksums_valid(packet)
261                 if packet.haslayer(TCP):
262                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
263                 elif packet.haslayer(UDP):
264                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
265                 else:
266                     self.assertEqual(packet[ICMPv6EchoReply].id,
267                                      self.icmp_id_in)
268             except:
269                 self.logger.error(ppp("Unexpected or invalid packet "
270                                       "(inside network):", packet))
271                 raise
272
273     def create_stream_frag(self, src_if, dst, sport, dport, data,
274                            proto=IP_PROTOS.tcp, echo_reply=False):
275         """
276         Create fragmented packet stream
277
278         :param src_if: Source interface
279         :param dst: Destination IPv4 address
280         :param sport: Source port
281         :param dport: Destination port
282         :param data: Payload data
283         :param proto: protocol (TCP, UDP, ICMP)
284         :param echo_reply: use echo_reply if protocol is ICMP
285         :returns: Fragments
286         """
287         if proto == IP_PROTOS.tcp:
288             p = (IP(src=src_if.remote_ip4, dst=dst) /
289                  TCP(sport=sport, dport=dport) /
290                  Raw(data))
291             p = p.__class__(scapy.compat.raw(p))
292             chksum = p[TCP].chksum
293             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
294         elif proto == IP_PROTOS.udp:
295             proto_header = UDP(sport=sport, dport=dport)
296         elif proto == IP_PROTOS.icmp:
297             if not echo_reply:
298                 proto_header = ICMP(id=sport, type='echo-request')
299             else:
300                 proto_header = ICMP(id=sport, type='echo-reply')
301         else:
302             raise Exception("Unsupported protocol")
303         id = random.randint(0, 65535)
304         pkts = []
305         if proto == IP_PROTOS.tcp:
306             raw = Raw(data[0:4])
307         else:
308             raw = Raw(data[0:16])
309         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
310              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
311              proto_header /
312              raw)
313         pkts.append(p)
314         if proto == IP_PROTOS.tcp:
315             raw = Raw(data[4:20])
316         else:
317             raw = Raw(data[16:32])
318         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
319              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
320                 proto=proto) /
321              raw)
322         pkts.append(p)
323         if proto == IP_PROTOS.tcp:
324             raw = Raw(data[20:])
325         else:
326             raw = Raw(data[32:])
327         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
328              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
329                 id=id) /
330              raw)
331         pkts.append(p)
332         return pkts
333
334     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
335                                pref=None, plen=0, frag_size=128):
336         """
337         Create fragmented packet stream
338
339         :param src_if: Source interface
340         :param dst: Destination IPv4 address
341         :param sport: Source TCP port
342         :param dport: Destination TCP port
343         :param data: Payload data
344         :param pref: NAT64 prefix
345         :param plen: NAT64 prefix length
346         :param fragsize: size of fragments
347         :returns: Fragments
348         """
349         if pref is None:
350             dst_ip6 = ''.join(['64:ff9b::', dst])
351         else:
352             dst_ip6 = self.compose_ip6(dst, pref, plen)
353
354         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
355              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
356              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
357              TCP(sport=sport, dport=dport) /
358              Raw(data))
359
360         return fragment6(p, frag_size)
361
362     def reass_frags_and_verify(self, frags, src, dst):
363         """
364         Reassemble and verify fragmented packet
365
366         :param frags: Captured fragments
367         :param src: Source IPv4 address to verify
368         :param dst: Destination IPv4 address to verify
369
370         :returns: Reassembled IPv4 packet
371         """
372         buffer = BytesIO()
373         for p in frags:
374             self.assertEqual(p[IP].src, src)
375             self.assertEqual(p[IP].dst, dst)
376             self.assert_ip_checksum_valid(p)
377             buffer.seek(p[IP].frag * 8)
378             buffer.write(bytes(p[IP].payload))
379         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
380                 proto=frags[0][IP].proto)
381         if ip.proto == IP_PROTOS.tcp:
382             p = (ip / TCP(buffer.getvalue()))
383             self.logger.debug(ppp("Reassembled:", p))
384             self.assert_tcp_checksum_valid(p)
385         elif ip.proto == IP_PROTOS.udp:
386             p = (ip / UDP(buffer.getvalue()[:8]) /
387                  Raw(buffer.getvalue()[8:]))
388         elif ip.proto == IP_PROTOS.icmp:
389             p = (ip / ICMP(buffer.getvalue()))
390         return p
391
392     def reass_frags_and_verify_ip6(self, frags, src, dst):
393         """
394         Reassemble and verify fragmented packet
395
396         :param frags: Captured fragments
397         :param src: Source IPv6 address to verify
398         :param dst: Destination IPv6 address to verify
399
400         :returns: Reassembled IPv6 packet
401         """
402         buffer = BytesIO()
403         for p in frags:
404             self.assertEqual(p[IPv6].src, src)
405             self.assertEqual(p[IPv6].dst, dst)
406             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
407             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
408         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
409                   nh=frags[0][IPv6ExtHdrFragment].nh)
410         if ip.nh == IP_PROTOS.tcp:
411             p = (ip / TCP(buffer.getvalue()))
412         elif ip.nh == IP_PROTOS.udp:
413             p = (ip / UDP(buffer.getvalue()))
414         self.logger.debug(ppp("Reassembled:", p))
415         self.assert_packet_checksums_valid(p)
416         return p
417
418     def verify_ipfix_max_bibs(self, data, limit):
419         """
420         Verify IPFIX maximum BIB entries exceeded event
421
422         :param data: Decoded IPFIX data records
423         :param limit: Number of maximum BIB entries that can be created.
424         """
425         self.assertEqual(1, len(data))
426         record = data[0]
427         # natEvent
428         self.assertEqual(scapy.compat.orb(record[230]), 13)
429         # natQuotaExceededEvent
430         self.assertEqual(struct.pack("I", 2), record[466])
431         # maxBIBEntries
432         self.assertEqual(struct.pack("I", limit), record[472])
433
434     def verify_ipfix_bib(self, data, is_create, src_addr):
435         """
436         Verify IPFIX NAT64 BIB create and delete events
437
438         :param data: Decoded IPFIX data records
439         :param is_create: Create event if nonzero value otherwise delete event
440         :param src_addr: IPv6 source address
441         """
442         self.assertEqual(1, len(data))
443         record = data[0]
444         # natEvent
445         if is_create:
446             self.assertEqual(scapy.compat.orb(record[230]), 10)
447         else:
448             self.assertEqual(scapy.compat.orb(record[230]), 11)
449         # sourceIPv6Address
450         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
451         # postNATSourceIPv4Address
452         self.assertEqual(self.nat_addr_n, record[225])
453         # protocolIdentifier
454         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
455         # ingressVRFID
456         self.assertEqual(struct.pack("!I", 0), record[234])
457         # sourceTransportPort
458         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
459         # postNAPTSourceTransportPort
460         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
461
462     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
463                                dst_port):
464         """
465         Verify IPFIX NAT64 session create and delete events
466
467         :param data: Decoded IPFIX data records
468         :param is_create: Create event if nonzero value otherwise delete event
469         :param src_addr: IPv6 source address
470         :param dst_addr: IPv4 destination address
471         :param dst_port: destination TCP port
472         """
473         self.assertEqual(1, len(data))
474         record = data[0]
475         # natEvent
476         if is_create:
477             self.assertEqual(scapy.compat.orb(record[230]), 6)
478         else:
479             self.assertEqual(scapy.compat.orb(record[230]), 7)
480         # sourceIPv6Address
481         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
482         # destinationIPv6Address
483         self.assertEqual(socket.inet_pton(socket.AF_INET6,
484                                           self.compose_ip6(dst_addr,
485                                                            '64:ff9b::',
486                                                            96)),
487                          record[28])
488         # postNATSourceIPv4Address
489         self.assertEqual(self.nat_addr_n, record[225])
490         # postNATDestinationIPv4Address
491         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
492                          record[226])
493         # protocolIdentifier
494         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
495         # ingressVRFID
496         self.assertEqual(struct.pack("!I", 0), record[234])
497         # sourceTransportPort
498         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
499         # postNAPTSourceTransportPort
500         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
501         # destinationTransportPort
502         self.assertEqual(struct.pack("!H", dst_port), record[11])
503         # postNAPTDestinationTransportPort
504         self.assertEqual(struct.pack("!H", dst_port), record[228])
505
506     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
507         message = data.decode('utf-8')
508         try:
509             message = SyslogMessage.parse(message)
510         except ParseError as e:
511             self.logger.error(e)
512             raise
513         else:
514             self.assertEqual(message.severity, SyslogSeverity.info)
515             self.assertEqual(message.appname, 'NAT')
516             self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
517             sd_params = message.sd.get('nsess')
518             self.assertTrue(sd_params is not None)
519             if is_ip6:
520                 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
521                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
522             else:
523                 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
524                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
525                 self.assertTrue(sd_params.get('SSUBIX') is not None)
526             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
527             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
528             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
529             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
530             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
531             self.assertEqual(sd_params.get('SVLAN'), '0')
532             self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
533             self.assertEqual(sd_params.get('XDPORT'),
534                              "%d" % self.tcp_external_port)
535
536     def compose_ip6(self, ip4, pref, plen):
537         """
538         Compose IPv4-embedded IPv6 addresses
539
540         :param ip4: IPv4 address
541         :param pref: IPv6 prefix
542         :param plen: IPv6 prefix length
543         :returns: IPv4-embedded IPv6 addresses
544         """
545         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
546         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
547         if plen == 32:
548             pref_n[4] = ip4_n[0]
549             pref_n[5] = ip4_n[1]
550             pref_n[6] = ip4_n[2]
551             pref_n[7] = ip4_n[3]
552         elif plen == 40:
553             pref_n[5] = ip4_n[0]
554             pref_n[6] = ip4_n[1]
555             pref_n[7] = ip4_n[2]
556             pref_n[9] = ip4_n[3]
557         elif plen == 48:
558             pref_n[6] = ip4_n[0]
559             pref_n[7] = ip4_n[1]
560             pref_n[9] = ip4_n[2]
561             pref_n[10] = ip4_n[3]
562         elif plen == 56:
563             pref_n[7] = ip4_n[0]
564             pref_n[9] = ip4_n[1]
565             pref_n[10] = ip4_n[2]
566             pref_n[11] = ip4_n[3]
567         elif plen == 64:
568             pref_n[9] = ip4_n[0]
569             pref_n[10] = ip4_n[1]
570             pref_n[11] = ip4_n[2]
571             pref_n[12] = ip4_n[3]
572         elif plen == 96:
573             pref_n[12] = ip4_n[0]
574             pref_n[13] = ip4_n[1]
575             pref_n[14] = ip4_n[2]
576             pref_n[15] = ip4_n[3]
577         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
578         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
579
580     def verify_ipfix_max_sessions(self, data, limit):
581         """
582         Verify IPFIX maximum session entries exceeded event
583
584         :param data: Decoded IPFIX data records
585         :param limit: Number of maximum session entries that can be created.
586         """
587         self.assertEqual(1, len(data))
588         record = data[0]
589         # natEvent
590         self.assertEqual(scapy.compat.orb(record[230]), 13)
591         # natQuotaExceededEvent
592         self.assertEqual(struct.pack("I", 1), record[466])
593         # maxSessionEntries
594         self.assertEqual(struct.pack("I", limit), record[471])
595
596     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
597         """ NAT64 inside interface handles Neighbor Advertisement """
598
599         flags = self.config_flags.NAT_IS_INSIDE
600         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
601                                           sw_if_index=self.pg5.sw_if_index)
602
603         # Try to send ping
604         ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
605                 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
606                 ICMPv6EchoRequest())
607         pkts = [ping]
608         self.pg5.add_stream(pkts)
609         self.pg_enable_capture(self.pg_interfaces)
610         self.pg_start()
611
612         # Wait for Neighbor Solicitation
613         capture = self.pg5.get_capture(len(pkts))
614         packet = capture[0]
615         try:
616             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
617             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
618             tgt = packet[ICMPv6ND_NS].tgt
619         except:
620             self.logger.error(ppp("Unexpected or invalid packet:", packet))
621             raise
622
623         # Send Neighbor Advertisement
624         p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
625              IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
626              ICMPv6ND_NA(tgt=tgt) /
627              ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
628         pkts = [p]
629         self.pg5.add_stream(pkts)
630         self.pg_enable_capture(self.pg_interfaces)
631         self.pg_start()
632
633         # Try to send ping again
634         pkts = [ping]
635         self.pg5.add_stream(pkts)
636         self.pg_enable_capture(self.pg_interfaces)
637         self.pg_start()
638
639         # Wait for ping reply
640         capture = self.pg5.get_capture(len(pkts))
641         packet = capture[0]
642         try:
643             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
644             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
645             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
646         except:
647             self.logger.error(ppp("Unexpected or invalid packet:", packet))
648             raise
649
650     def test_pool(self):
651         """ Add/delete address to NAT64 pool """
652         nat_addr = '1.2.3.4'
653
654         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
655                                                 end_addr=nat_addr,
656                                                 vrf_id=0xFFFFFFFF, is_add=1)
657
658         addresses = self.vapi.nat64_pool_addr_dump()
659         self.assertEqual(len(addresses), 1)
660         self.assertEqual(str(addresses[0].address), nat_addr)
661
662         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
663                                                 end_addr=nat_addr,
664                                                 vrf_id=0xFFFFFFFF, is_add=0)
665
666         addresses = self.vapi.nat64_pool_addr_dump()
667         self.assertEqual(len(addresses), 0)
668
669     def test_interface(self):
670         """ Enable/disable NAT64 feature on the interface """
671         flags = self.config_flags.NAT_IS_INSIDE
672         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
673                                           sw_if_index=self.pg0.sw_if_index)
674         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
675                                           sw_if_index=self.pg1.sw_if_index)
676
677         interfaces = self.vapi.nat64_interface_dump()
678         self.assertEqual(len(interfaces), 2)
679         pg0_found = False
680         pg1_found = False
681         for intf in interfaces:
682             if intf.sw_if_index == self.pg0.sw_if_index:
683                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
684                 pg0_found = True
685             elif intf.sw_if_index == self.pg1.sw_if_index:
686                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
687                 pg1_found = True
688         self.assertTrue(pg0_found)
689         self.assertTrue(pg1_found)
690
691         features = self.vapi.cli("show interface features pg0")
692         self.assertIn('nat64-in2out', features)
693         features = self.vapi.cli("show interface features pg1")
694         self.assertIn('nat64-out2in', features)
695
696         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
697                                           sw_if_index=self.pg0.sw_if_index)
698         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699                                           sw_if_index=self.pg1.sw_if_index)
700
701         interfaces = self.vapi.nat64_interface_dump()
702         self.assertEqual(len(interfaces), 0)
703
704     def test_static_bib(self):
705         """ Add/delete static BIB entry """
706         in_addr = '2001:db8:85a3::8a2e:370:7334'
707         out_addr = '10.1.1.3'
708         in_port = 1234
709         out_port = 5678
710         proto = IP_PROTOS.tcp
711
712         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
713                                            i_port=in_port, o_port=out_port,
714                                            proto=proto, vrf_id=0, is_add=1)
715         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
716         static_bib_num = 0
717         for bibe in bib:
718             if bibe.flags & self.config_flags.NAT_IS_STATIC:
719                 static_bib_num += 1
720                 self.assertEqual(str(bibe.i_addr), in_addr)
721                 self.assertEqual(str(bibe.o_addr), out_addr)
722                 self.assertEqual(bibe.i_port, in_port)
723                 self.assertEqual(bibe.o_port, out_port)
724         self.assertEqual(static_bib_num, 1)
725         bibs = self.statistics.get_counter('/nat64/total-bibs')
726         self.assertEqual(bibs[0][0], 1)
727
728         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
729                                            i_port=in_port, o_port=out_port,
730                                            proto=proto, vrf_id=0, is_add=0)
731         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
732         static_bib_num = 0
733         for bibe in bib:
734             if bibe.flags & self.config_flags.NAT_IS_STATIC:
735                 static_bib_num += 1
736         self.assertEqual(static_bib_num, 0)
737         bibs = self.statistics.get_counter('/nat64/total-bibs')
738         self.assertEqual(bibs[0][0], 0)
739
740     def test_set_timeouts(self):
741         """ Set NAT64 timeouts """
742         # verify default values
743         timeouts = self.vapi.nat64_get_timeouts()
744         self.assertEqual(timeouts.udp, 300)
745         self.assertEqual(timeouts.icmp, 60)
746         self.assertEqual(timeouts.tcp_transitory, 240)
747         self.assertEqual(timeouts.tcp_established, 7440)
748
749         # set and verify custom values
750         self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
751                                      tcp_transitory=250, icmp=30)
752         timeouts = self.vapi.nat64_get_timeouts()
753         self.assertEqual(timeouts.udp, 200)
754         self.assertEqual(timeouts.icmp, 30)
755         self.assertEqual(timeouts.tcp_transitory, 250)
756         self.assertEqual(timeouts.tcp_established, 7450)
757
758     def test_dynamic(self):
759         """ NAT64 dynamic translation test """
760         self.tcp_port_in = 6303
761         self.udp_port_in = 6304
762         self.icmp_id_in = 6305
763
764         ses_num_start = self.nat64_get_ses_num()
765
766         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
767                                                 end_addr=self.nat_addr,
768                                                 vrf_id=0xFFFFFFFF,
769                                                 is_add=1)
770         flags = self.config_flags.NAT_IS_INSIDE
771         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
772                                           sw_if_index=self.pg0.sw_if_index)
773         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
774                                           sw_if_index=self.pg1.sw_if_index)
775
776         # in2out
777         tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
778         udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
779         icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
780         drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
781
782         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
783         self.pg0.add_stream(pkts)
784         self.pg_enable_capture(self.pg_interfaces)
785         self.pg_start()
786         capture = self.pg1.get_capture(len(pkts))
787         self.verify_capture_out(capture, nat_ip=self.nat_addr,
788                                 dst_ip=self.pg1.remote_ip4)
789
790         if_idx = self.pg0.sw_if_index
791         cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
792         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
793         cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
794         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
795         cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
796         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
797         cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
798         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
799
800         # out2in
801         tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
802         udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
803         icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
804         drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
805
806         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
807         self.pg1.add_stream(pkts)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         capture = self.pg0.get_capture(len(pkts))
811         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
812         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
813
814         if_idx = self.pg1.sw_if_index
815         cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
816         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
817         cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
818         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
819         cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
820         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
821         cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
822         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
823
824         bibs = self.statistics.get_counter('/nat64/total-bibs')
825         self.assertEqual(bibs[0][0], 3)
826         sessions = self.statistics.get_counter('/nat64/total-sessions')
827         self.assertEqual(sessions[0][0], 3)
828
829         # in2out
830         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
831         self.pg0.add_stream(pkts)
832         self.pg_enable_capture(self.pg_interfaces)
833         self.pg_start()
834         capture = self.pg1.get_capture(len(pkts))
835         self.verify_capture_out(capture, nat_ip=self.nat_addr,
836                                 dst_ip=self.pg1.remote_ip4)
837
838         # out2in
839         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
840         self.pg1.add_stream(pkts)
841         self.pg_enable_capture(self.pg_interfaces)
842         self.pg_start()
843         capture = self.pg0.get_capture(len(pkts))
844         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
845
846         ses_num_end = self.nat64_get_ses_num()
847
848         self.assertEqual(ses_num_end - ses_num_start, 3)
849
850         # tenant with specific VRF
851         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
852                                                 end_addr=self.vrf1_nat_addr,
853                                                 vrf_id=self.vrf1_id, is_add=1)
854         flags = self.config_flags.NAT_IS_INSIDE
855         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
856                                           sw_if_index=self.pg2.sw_if_index)
857
858         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
859         self.pg2.add_stream(pkts)
860         self.pg_enable_capture(self.pg_interfaces)
861         self.pg_start()
862         capture = self.pg1.get_capture(len(pkts))
863         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
864                                 dst_ip=self.pg1.remote_ip4)
865
866         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
867         self.pg1.add_stream(pkts)
868         self.pg_enable_capture(self.pg_interfaces)
869         self.pg_start()
870         capture = self.pg2.get_capture(len(pkts))
871         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
872
873     def test_static(self):
874         """ NAT64 static translation test """
875         self.tcp_port_in = 60303
876         self.udp_port_in = 60304
877         self.icmp_id_in = 60305
878         self.tcp_port_out = 60303
879         self.udp_port_out = 60304
880         self.icmp_id_out = 60305
881
882         ses_num_start = self.nat64_get_ses_num()
883
884         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
885                                                 end_addr=self.nat_addr,
886                                                 vrf_id=0xFFFFFFFF,
887                                                 is_add=1)
888         flags = self.config_flags.NAT_IS_INSIDE
889         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
890                                           sw_if_index=self.pg0.sw_if_index)
891         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
892                                           sw_if_index=self.pg1.sw_if_index)
893
894         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
895                                            o_addr=self.nat_addr,
896                                            i_port=self.tcp_port_in,
897                                            o_port=self.tcp_port_out,
898                                            proto=IP_PROTOS.tcp, vrf_id=0,
899                                            is_add=1)
900         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
901                                            o_addr=self.nat_addr,
902                                            i_port=self.udp_port_in,
903                                            o_port=self.udp_port_out,
904                                            proto=IP_PROTOS.udp, vrf_id=0,
905                                            is_add=1)
906         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
907                                            o_addr=self.nat_addr,
908                                            i_port=self.icmp_id_in,
909                                            o_port=self.icmp_id_out,
910                                            proto=IP_PROTOS.icmp, vrf_id=0,
911                                            is_add=1)
912
913         # in2out
914         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
915         self.pg0.add_stream(pkts)
916         self.pg_enable_capture(self.pg_interfaces)
917         self.pg_start()
918         capture = self.pg1.get_capture(len(pkts))
919         self.verify_capture_out(capture, nat_ip=self.nat_addr,
920                                 dst_ip=self.pg1.remote_ip4, same_port=True)
921
922         # out2in
923         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
924         self.pg1.add_stream(pkts)
925         self.pg_enable_capture(self.pg_interfaces)
926         self.pg_start()
927         capture = self.pg0.get_capture(len(pkts))
928         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
929         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
930
931         ses_num_end = self.nat64_get_ses_num()
932
933         self.assertEqual(ses_num_end - ses_num_start, 3)
934
935     @unittest.skipUnless(running_extended_tests, "part of extended tests")
936     def test_session_timeout(self):
937         """ NAT64 session timeout """
938         self.icmp_id_in = 1234
939         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
940                                                 end_addr=self.nat_addr,
941                                                 vrf_id=0xFFFFFFFF,
942                                                 is_add=1)
943         flags = self.config_flags.NAT_IS_INSIDE
944         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
945                                           sw_if_index=self.pg0.sw_if_index)
946         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
947                                           sw_if_index=self.pg1.sw_if_index)
948         self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
949                                      tcp_transitory=5,
950                                      icmp=5)
951
952         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
953         self.pg0.add_stream(pkts)
954         self.pg_enable_capture(self.pg_interfaces)
955         self.pg_start()
956         capture = self.pg1.get_capture(len(pkts))
957
958         ses_num_before_timeout = self.nat64_get_ses_num()
959
960         sleep(15)
961
962         # ICMP and TCP session after timeout
963         ses_num_after_timeout = self.nat64_get_ses_num()
964         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
965
966     def test_icmp_error(self):
967         """ NAT64 ICMP Error message translation """
968         self.tcp_port_in = 6303
969         self.udp_port_in = 6304
970         self.icmp_id_in = 6305
971
972         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
973                                                 end_addr=self.nat_addr,
974                                                 vrf_id=0xFFFFFFFF,
975                                                 is_add=1)
976         flags = self.config_flags.NAT_IS_INSIDE
977         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
978                                           sw_if_index=self.pg0.sw_if_index)
979         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
980                                           sw_if_index=self.pg1.sw_if_index)
981
982         # send some packets to create sessions
983         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
984         self.pg0.add_stream(pkts)
985         self.pg_enable_capture(self.pg_interfaces)
986         self.pg_start()
987         capture_ip4 = self.pg1.get_capture(len(pkts))
988         self.verify_capture_out(capture_ip4,
989                                 nat_ip=self.nat_addr,
990                                 dst_ip=self.pg1.remote_ip4)
991
992         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
993         self.pg1.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture_ip6 = self.pg0.get_capture(len(pkts))
997         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
998         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
999                                    self.pg0.remote_ip6)
1000
1001         # in2out
1002         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1003                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1004                 ICMPv6DestUnreach(code=1) /
1005                 packet[IPv6] for packet in capture_ip6]
1006         self.pg0.add_stream(pkts)
1007         self.pg_enable_capture(self.pg_interfaces)
1008         self.pg_start()
1009         capture = self.pg1.get_capture(len(pkts))
1010         for packet in capture:
1011             try:
1012                 self.assertEqual(packet[IP].src, self.nat_addr)
1013                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1014                 self.assertEqual(packet[ICMP].type, 3)
1015                 self.assertEqual(packet[ICMP].code, 13)
1016                 inner = packet[IPerror]
1017                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1018                 self.assertEqual(inner.dst, self.nat_addr)
1019                 self.assert_packet_checksums_valid(packet)
1020                 if inner.haslayer(TCPerror):
1021                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1022                 elif inner.haslayer(UDPerror):
1023                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1024                 else:
1025                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1026             except:
1027                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1028                 raise
1029
1030         # out2in
1031         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1032                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1033                 ICMP(type=3, code=13) /
1034                 packet[IP] for packet in capture_ip4]
1035         self.pg1.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg0.get_capture(len(pkts))
1039         for packet in capture:
1040             try:
1041                 self.assertEqual(packet[IPv6].src, ip.src)
1042                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1043                 icmp = packet[ICMPv6DestUnreach]
1044                 self.assertEqual(icmp.code, 1)
1045                 inner = icmp[IPerror6]
1046                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1047                 self.assertEqual(inner.dst, ip.src)
1048                 self.assert_icmpv6_checksum_valid(packet)
1049                 if inner.haslayer(TCPerror):
1050                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1051                 elif inner.haslayer(UDPerror):
1052                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1053                 else:
1054                     self.assertEqual(inner[ICMPv6EchoRequest].id,
1055                                      self.icmp_id_in)
1056             except:
1057                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1058                 raise
1059
1060     def test_hairpinning(self):
1061         """ NAT64 hairpinning """
1062
1063         client = self.pg0.remote_hosts[0]
1064         server = self.pg0.remote_hosts[1]
1065         server_tcp_in_port = 22
1066         server_tcp_out_port = 4022
1067         server_udp_in_port = 23
1068         server_udp_out_port = 4023
1069         client_tcp_in_port = 1234
1070         client_udp_in_port = 1235
1071         client_tcp_out_port = 0
1072         client_udp_out_port = 0
1073         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1074         nat_addr_ip6 = ip.src
1075
1076         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1077                                                 end_addr=self.nat_addr,
1078                                                 vrf_id=0xFFFFFFFF,
1079                                                 is_add=1)
1080         flags = self.config_flags.NAT_IS_INSIDE
1081         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1082                                           sw_if_index=self.pg0.sw_if_index)
1083         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1084                                           sw_if_index=self.pg1.sw_if_index)
1085
1086         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1087                                            o_addr=self.nat_addr,
1088                                            i_port=server_tcp_in_port,
1089                                            o_port=server_tcp_out_port,
1090                                            proto=IP_PROTOS.tcp, vrf_id=0,
1091                                            is_add=1)
1092         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1093                                            o_addr=self.nat_addr,
1094                                            i_port=server_udp_in_port,
1095                                            o_port=server_udp_out_port,
1096                                            proto=IP_PROTOS.udp, vrf_id=0,
1097                                            is_add=1)
1098
1099         # client to server
1100         pkts = []
1101         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1102              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1103              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1104         pkts.append(p)
1105         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1106              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1107              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1108         pkts.append(p)
1109         self.pg0.add_stream(pkts)
1110         self.pg_enable_capture(self.pg_interfaces)
1111         self.pg_start()
1112         capture = self.pg0.get_capture(len(pkts))
1113         for packet in capture:
1114             try:
1115                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1116                 self.assertEqual(packet[IPv6].dst, server.ip6)
1117                 self.assert_packet_checksums_valid(packet)
1118                 if packet.haslayer(TCP):
1119                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1120                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1121                     client_tcp_out_port = packet[TCP].sport
1122                 else:
1123                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1124                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1125                     client_udp_out_port = packet[UDP].sport
1126             except:
1127                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1128                 raise
1129
1130         # server to client
1131         pkts = []
1132         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1133              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1134              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1135         pkts.append(p)
1136         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1137              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1138              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1139         pkts.append(p)
1140         self.pg0.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         for packet in capture:
1145             try:
1146                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1147                 self.assertEqual(packet[IPv6].dst, client.ip6)
1148                 self.assert_packet_checksums_valid(packet)
1149                 if packet.haslayer(TCP):
1150                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1151                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1152                 else:
1153                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1154                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1155             except:
1156                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1157                 raise
1158
1159         # ICMP error
1160         pkts = []
1161         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1163                 ICMPv6DestUnreach(code=1) /
1164                 packet[IPv6] for packet in capture]
1165         self.pg0.add_stream(pkts)
1166         self.pg_enable_capture(self.pg_interfaces)
1167         self.pg_start()
1168         capture = self.pg0.get_capture(len(pkts))
1169         for packet in capture:
1170             try:
1171                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1172                 self.assertEqual(packet[IPv6].dst, server.ip6)
1173                 icmp = packet[ICMPv6DestUnreach]
1174                 self.assertEqual(icmp.code, 1)
1175                 inner = icmp[IPerror6]
1176                 self.assertEqual(inner.src, server.ip6)
1177                 self.assertEqual(inner.dst, nat_addr_ip6)
1178                 self.assert_packet_checksums_valid(packet)
1179                 if inner.haslayer(TCPerror):
1180                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1181                     self.assertEqual(inner[TCPerror].dport,
1182                                      client_tcp_out_port)
1183                 else:
1184                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1185                     self.assertEqual(inner[UDPerror].dport,
1186                                      client_udp_out_port)
1187             except:
1188                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1189                 raise
1190
1191     def test_prefix(self):
1192         """ NAT64 Network-Specific Prefix """
1193
1194         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1195                                                 end_addr=self.nat_addr,
1196                                                 vrf_id=0xFFFFFFFF,
1197                                                 is_add=1)
1198         flags = self.config_flags.NAT_IS_INSIDE
1199         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1200                                           sw_if_index=self.pg0.sw_if_index)
1201         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1202                                           sw_if_index=self.pg1.sw_if_index)
1203         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1204                                                 end_addr=self.vrf1_nat_addr,
1205                                                 vrf_id=self.vrf1_id, is_add=1)
1206         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1207                                           sw_if_index=self.pg2.sw_if_index)
1208
1209         # Add global prefix
1210         global_pref64 = "2001:db8::"
1211         global_pref64_len = 32
1212         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1213         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1214                                        is_add=1)
1215
1216         prefix = self.vapi.nat64_prefix_dump()
1217         self.assertEqual(len(prefix), 1)
1218         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1219         self.assertEqual(prefix[0].vrf_id, 0)
1220
1221         # Add tenant specific prefix
1222         vrf1_pref64 = "2001:db8:122:300::"
1223         vrf1_pref64_len = 56
1224         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1225         self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1226                                        vrf_id=self.vrf1_id, is_add=1)
1227
1228         prefix = self.vapi.nat64_prefix_dump()
1229         self.assertEqual(len(prefix), 2)
1230
1231         # Global prefix
1232         pkts = self.create_stream_in_ip6(self.pg0,
1233                                          self.pg1,
1234                                          pref=global_pref64,
1235                                          plen=global_pref64_len)
1236         self.pg0.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239         capture = self.pg1.get_capture(len(pkts))
1240         self.verify_capture_out(capture, nat_ip=self.nat_addr,
1241                                 dst_ip=self.pg1.remote_ip4)
1242
1243         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1244         self.pg1.add_stream(pkts)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg0.get_capture(len(pkts))
1248         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1249                                   global_pref64,
1250                                   global_pref64_len)
1251         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1252
1253         # Tenant specific prefix
1254         pkts = self.create_stream_in_ip6(self.pg2,
1255                                          self.pg1,
1256                                          pref=vrf1_pref64,
1257                                          plen=vrf1_pref64_len)
1258         self.pg2.add_stream(pkts)
1259         self.pg_enable_capture(self.pg_interfaces)
1260         self.pg_start()
1261         capture = self.pg1.get_capture(len(pkts))
1262         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1263                                 dst_ip=self.pg1.remote_ip4)
1264
1265         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1266         self.pg1.add_stream(pkts)
1267         self.pg_enable_capture(self.pg_interfaces)
1268         self.pg_start()
1269         capture = self.pg2.get_capture(len(pkts))
1270         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1271                                   vrf1_pref64,
1272                                   vrf1_pref64_len)
1273         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1274
1275     def test_unknown_proto(self):
1276         """ NAT64 translate packet with unknown protocol """
1277
1278         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1279                                                 end_addr=self.nat_addr,
1280                                                 vrf_id=0xFFFFFFFF,
1281                                                 is_add=1)
1282         flags = self.config_flags.NAT_IS_INSIDE
1283         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1284                                           sw_if_index=self.pg0.sw_if_index)
1285         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1286                                           sw_if_index=self.pg1.sw_if_index)
1287         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1288
1289         # in2out
1290         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1291              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1292              TCP(sport=self.tcp_port_in, dport=20))
1293         self.pg0.add_stream(p)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         p = self.pg1.get_capture(1)
1297
1298         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1300              GRE() /
1301              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1302              TCP(sport=1234, dport=1234))
1303         self.pg0.add_stream(p)
1304         self.pg_enable_capture(self.pg_interfaces)
1305         self.pg_start()
1306         p = self.pg1.get_capture(1)
1307         packet = p[0]
1308         try:
1309             self.assertEqual(packet[IP].src, self.nat_addr)
1310             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1311             self.assertEqual(packet.haslayer(GRE), 1)
1312             self.assert_packet_checksums_valid(packet)
1313         except:
1314             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1315             raise
1316
1317         # out2in
1318         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1319              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1320              GRE() /
1321              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1322              TCP(sport=1234, dport=1234))
1323         self.pg1.add_stream(p)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         p = self.pg0.get_capture(1)
1327         packet = p[0]
1328         try:
1329             self.assertEqual(packet[IPv6].src, remote_ip6)
1330             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1331             self.assertEqual(packet[IPv6].nh, 47)
1332         except:
1333             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1334             raise
1335
1336     def test_hairpinning_unknown_proto(self):
1337         """ NAT64 translate packet with unknown protocol - hairpinning """
1338
1339         client = self.pg0.remote_hosts[0]
1340         server = self.pg0.remote_hosts[1]
1341         server_tcp_in_port = 22
1342         server_tcp_out_port = 4022
1343         client_tcp_in_port = 1234
1344         client_tcp_out_port = 1235
1345         server_nat_ip = "10.0.0.100"
1346         client_nat_ip = "10.0.0.110"
1347         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1348         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1349
1350         self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1351                                                 end_addr=client_nat_ip,
1352                                                 vrf_id=0xFFFFFFFF,
1353                                                 is_add=1)
1354         flags = self.config_flags.NAT_IS_INSIDE
1355         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1356                                           sw_if_index=self.pg0.sw_if_index)
1357         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1358                                           sw_if_index=self.pg1.sw_if_index)
1359
1360         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1361                                            o_addr=server_nat_ip,
1362                                            i_port=server_tcp_in_port,
1363                                            o_port=server_tcp_out_port,
1364                                            proto=IP_PROTOS.tcp, vrf_id=0,
1365                                            is_add=1)
1366
1367         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1368                                            o_addr=server_nat_ip, i_port=0,
1369                                            o_port=0,
1370                                            proto=IP_PROTOS.gre, vrf_id=0,
1371                                            is_add=1)
1372
1373         self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1374                                            o_addr=client_nat_ip,
1375                                            i_port=client_tcp_in_port,
1376                                            o_port=client_tcp_out_port,
1377                                            proto=IP_PROTOS.tcp, vrf_id=0,
1378                                            is_add=1)
1379
1380         # client to server
1381         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1382              IPv6(src=client.ip6, dst=server_nat_ip6) /
1383              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1384         self.pg0.add_stream(p)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         p = self.pg0.get_capture(1)
1388
1389         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1390              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1391              GRE() /
1392              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1393              TCP(sport=1234, dport=1234))
1394         self.pg0.add_stream(p)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         p = self.pg0.get_capture(1)
1398         packet = p[0]
1399         try:
1400             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1401             self.assertEqual(packet[IPv6].dst, server.ip6)
1402             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1403         except:
1404             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1405             raise
1406
1407         # server to client
1408         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1409              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1410              GRE() /
1411              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1412              TCP(sport=1234, dport=1234))
1413         self.pg0.add_stream(p)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         p = self.pg0.get_capture(1)
1417         packet = p[0]
1418         try:
1419             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1420             self.assertEqual(packet[IPv6].dst, client.ip6)
1421             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1422         except:
1423             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1424             raise
1425
1426     def test_one_armed_nat64(self):
1427         """ One armed NAT64 """
1428         external_port = 0
1429         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1430                                            '64:ff9b::',
1431                                            96)
1432
1433         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1434                                                 end_addr=self.nat_addr,
1435                                                 vrf_id=0xFFFFFFFF,
1436                                                 is_add=1)
1437         flags = self.config_flags.NAT_IS_INSIDE
1438         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1439                                           sw_if_index=self.pg3.sw_if_index)
1440         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1441                                           sw_if_index=self.pg3.sw_if_index)
1442
1443         # in2out
1444         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1445              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1446              TCP(sport=12345, dport=80))
1447         self.pg3.add_stream(p)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450         capture = self.pg3.get_capture(1)
1451         p = capture[0]
1452         try:
1453             ip = p[IP]
1454             tcp = p[TCP]
1455             self.assertEqual(ip.src, self.nat_addr)
1456             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1457             self.assertNotEqual(tcp.sport, 12345)
1458             external_port = tcp.sport
1459             self.assertEqual(tcp.dport, 80)
1460             self.assert_packet_checksums_valid(p)
1461         except:
1462             self.logger.error(ppp("Unexpected or invalid packet:", p))
1463             raise
1464
1465         # out2in
1466         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1467              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1468              TCP(sport=80, dport=external_port))
1469         self.pg3.add_stream(p)
1470         self.pg_enable_capture(self.pg_interfaces)
1471         self.pg_start()
1472         capture = self.pg3.get_capture(1)
1473         p = capture[0]
1474         try:
1475             ip = p[IPv6]
1476             tcp = p[TCP]
1477             self.assertEqual(ip.src, remote_host_ip6)
1478             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1479             self.assertEqual(tcp.sport, 80)
1480             self.assertEqual(tcp.dport, 12345)
1481             self.assert_packet_checksums_valid(p)
1482         except:
1483             self.logger.error(ppp("Unexpected or invalid packet:", p))
1484             raise
1485
1486     def test_frag_in_order(self):
1487         """ NAT64 translate fragments arriving in order """
1488         self.tcp_port_in = random.randint(1025, 65535)
1489
1490         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1491                                                 end_addr=self.nat_addr,
1492                                                 vrf_id=0xFFFFFFFF,
1493                                                 is_add=1)
1494         flags = self.config_flags.NAT_IS_INSIDE
1495         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1496                                           sw_if_index=self.pg0.sw_if_index)
1497         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1498                                           sw_if_index=self.pg1.sw_if_index)
1499
1500         # in2out
1501         data = b'a' * 200
1502         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1503                                            self.tcp_port_in, 20, data)
1504         self.pg0.add_stream(pkts)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         frags = self.pg1.get_capture(len(pkts))
1508         p = self.reass_frags_and_verify(frags,
1509                                         self.nat_addr,
1510                                         self.pg1.remote_ip4)
1511         self.assertEqual(p[TCP].dport, 20)
1512         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1513         self.tcp_port_out = p[TCP].sport
1514         self.assertEqual(data, p[Raw].load)
1515
1516         # out2in
1517         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1518         pkts = self.create_stream_frag(self.pg1,
1519                                        self.nat_addr,
1520                                        20,
1521                                        self.tcp_port_out,
1522                                        data)
1523         self.pg1.add_stream(pkts)
1524         self.pg_enable_capture(self.pg_interfaces)
1525         self.pg_start()
1526         frags = self.pg0.get_capture(len(pkts))
1527         self.logger.debug(ppc("Captured:", frags))
1528         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1529         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1530         self.assertEqual(p[TCP].sport, 20)
1531         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1532         self.assertEqual(data, p[Raw].load)
1533
1534     def test_reass_hairpinning(self):
1535         """ NAT64 fragments hairpinning """
1536         data = b'a' * 200
1537         server = self.pg0.remote_hosts[1]
1538         server_in_port = random.randint(1025, 65535)
1539         server_out_port = random.randint(1025, 65535)
1540         client_in_port = random.randint(1025, 65535)
1541         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1542         nat_addr_ip6 = ip.src
1543
1544         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1545                                                 end_addr=self.nat_addr,
1546                                                 vrf_id=0xFFFFFFFF,
1547                                                 is_add=1)
1548         flags = self.config_flags.NAT_IS_INSIDE
1549         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1550                                           sw_if_index=self.pg0.sw_if_index)
1551         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1552                                           sw_if_index=self.pg1.sw_if_index)
1553
1554         # add static BIB entry for server
1555         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1556                                            o_addr=self.nat_addr,
1557                                            i_port=server_in_port,
1558                                            o_port=server_out_port,
1559                                            proto=IP_PROTOS.tcp, vrf_id=0,
1560                                            is_add=1)
1561
1562         # send packet from host to server
1563         pkts = self.create_stream_frag_ip6(self.pg0,
1564                                            self.nat_addr,
1565                                            client_in_port,
1566                                            server_out_port,
1567                                            data)
1568         self.pg0.add_stream(pkts)
1569         self.pg_enable_capture(self.pg_interfaces)
1570         self.pg_start()
1571         frags = self.pg0.get_capture(len(pkts))
1572         self.logger.debug(ppc("Captured:", frags))
1573         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1574         self.assertNotEqual(p[TCP].sport, client_in_port)
1575         self.assertEqual(p[TCP].dport, server_in_port)
1576         self.assertEqual(data, p[Raw].load)
1577
1578     def test_frag_out_of_order(self):
1579         """ NAT64 translate fragments arriving out of order """
1580         self.tcp_port_in = random.randint(1025, 65535)
1581
1582         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1583                                                 end_addr=self.nat_addr,
1584                                                 vrf_id=0xFFFFFFFF,
1585                                                 is_add=1)
1586         flags = self.config_flags.NAT_IS_INSIDE
1587         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1588                                           sw_if_index=self.pg0.sw_if_index)
1589         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1590                                           sw_if_index=self.pg1.sw_if_index)
1591
1592         # in2out
1593         data = b'a' * 200
1594         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1595                                            self.tcp_port_in, 20, data)
1596         pkts.reverse()
1597         self.pg0.add_stream(pkts)
1598         self.pg_enable_capture(self.pg_interfaces)
1599         self.pg_start()
1600         frags = self.pg1.get_capture(len(pkts))
1601         p = self.reass_frags_and_verify(frags,
1602                                         self.nat_addr,
1603                                         self.pg1.remote_ip4)
1604         self.assertEqual(p[TCP].dport, 20)
1605         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1606         self.tcp_port_out = p[TCP].sport
1607         self.assertEqual(data, p[Raw].load)
1608
1609         # out2in
1610         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1611         pkts = self.create_stream_frag(self.pg1,
1612                                        self.nat_addr,
1613                                        20,
1614                                        self.tcp_port_out,
1615                                        data)
1616         pkts.reverse()
1617         self.pg1.add_stream(pkts)
1618         self.pg_enable_capture(self.pg_interfaces)
1619         self.pg_start()
1620         frags = self.pg0.get_capture(len(pkts))
1621         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1622         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1623         self.assertEqual(p[TCP].sport, 20)
1624         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1625         self.assertEqual(data, p[Raw].load)
1626
1627     def test_interface_addr(self):
1628         """ Acquire NAT64 pool addresses from interface """
1629         self.vapi.nat64_add_del_interface_addr(
1630             is_add=1,
1631             sw_if_index=self.pg4.sw_if_index)
1632
1633         # no address in NAT64 pool
1634         addresses = self.vapi.nat44_address_dump()
1635         self.assertEqual(0, len(addresses))
1636
1637         # configure interface address and check NAT64 address pool
1638         self.pg4.config_ip4()
1639         addresses = self.vapi.nat64_pool_addr_dump()
1640         self.assertEqual(len(addresses), 1)
1641
1642         self.assertEqual(str(addresses[0].address),
1643                          self.pg4.local_ip4)
1644
1645         # remove interface address and check NAT64 address pool
1646         self.pg4.unconfig_ip4()
1647         addresses = self.vapi.nat64_pool_addr_dump()
1648         self.assertEqual(0, len(addresses))
1649
1650     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1651     def test_ipfix_max_bibs_sessions(self):
1652         """ IPFIX logging maximum session and BIB entries exceeded """
1653         max_bibs = 1280
1654         max_sessions = 2560
1655         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1656                                            '64:ff9b::',
1657                                            96)
1658
1659         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1660                                                 end_addr=self.nat_addr,
1661                                                 vrf_id=0xFFFFFFFF,
1662                                                 is_add=1)
1663         flags = self.config_flags.NAT_IS_INSIDE
1664         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1665                                           sw_if_index=self.pg0.sw_if_index)
1666         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1667                                           sw_if_index=self.pg1.sw_if_index)
1668
1669         pkts = []
1670         src = ""
1671         for i in range(0, max_bibs):
1672             src = "fd01:aa::%x" % (i)
1673             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1674                  IPv6(src=src, dst=remote_host_ip6) /
1675                  TCP(sport=12345, dport=80))
1676             pkts.append(p)
1677             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1678                  IPv6(src=src, dst=remote_host_ip6) /
1679                  TCP(sport=12345, dport=22))
1680             pkts.append(p)
1681         self.pg0.add_stream(pkts)
1682         self.pg_enable_capture(self.pg_interfaces)
1683         self.pg_start()
1684         self.pg1.get_capture(max_sessions)
1685
1686         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1687                                      src_address=self.pg3.local_ip4,
1688                                      path_mtu=512,
1689                                      template_interval=10)
1690         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1691                                            src_port=self.ipfix_src_port,
1692                                            enable=1)
1693
1694         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1695              IPv6(src=src, dst=remote_host_ip6) /
1696              TCP(sport=12345, dport=25))
1697         self.pg0.add_stream(p)
1698         self.pg_enable_capture(self.pg_interfaces)
1699         self.pg_start()
1700         self.pg1.assert_nothing_captured()
1701         sleep(1)
1702         self.vapi.ipfix_flush()
1703         capture = self.pg3.get_capture(7)
1704         ipfix = IPFIXDecoder()
1705         # first load template
1706         for p in capture:
1707             self.assertTrue(p.haslayer(IPFIX))
1708             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1709             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1710             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1711             self.assertEqual(p[UDP].dport, 4739)
1712             self.assertEqual(p[IPFIX].observationDomainID,
1713                              self.ipfix_domain_id)
1714             if p.haslayer(Template):
1715                 ipfix.add_template(p.getlayer(Template))
1716         # verify events in data set
1717         for p in capture:
1718             if p.haslayer(Data):
1719                 data = ipfix.decode_data_set(p.getlayer(Set))
1720                 self.verify_ipfix_max_sessions(data, max_sessions)
1721
1722         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1723              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1724              TCP(sport=12345, dport=80))
1725         self.pg0.add_stream(p)
1726         self.pg_enable_capture(self.pg_interfaces)
1727         self.pg_start()
1728         self.pg1.assert_nothing_captured()
1729         sleep(1)
1730         self.vapi.ipfix_flush()
1731         capture = self.pg3.get_capture(1)
1732         # verify events in data set
1733         for p in capture:
1734             self.assertTrue(p.haslayer(IPFIX))
1735             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1736             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1737             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1738             self.assertEqual(p[UDP].dport, 4739)
1739             self.assertEqual(p[IPFIX].observationDomainID,
1740                              self.ipfix_domain_id)
1741             if p.haslayer(Data):
1742                 data = ipfix.decode_data_set(p.getlayer(Set))
1743                 self.verify_ipfix_max_bibs(data, max_bibs)
1744
1745     def test_ipfix_bib_ses(self):
1746         """ IPFIX logging NAT64 BIB/session create and delete events """
1747         self.tcp_port_in = random.randint(1025, 65535)
1748         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1749                                            '64:ff9b::',
1750                                            96)
1751
1752         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1753                                                 end_addr=self.nat_addr,
1754                                                 vrf_id=0xFFFFFFFF,
1755                                                 is_add=1)
1756         flags = self.config_flags.NAT_IS_INSIDE
1757         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1758                                           sw_if_index=self.pg0.sw_if_index)
1759         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1760                                           sw_if_index=self.pg1.sw_if_index)
1761         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1762                                      src_address=self.pg3.local_ip4,
1763                                      path_mtu=512,
1764                                      template_interval=10)
1765         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1766                                            src_port=self.ipfix_src_port,
1767                                            enable=1)
1768
1769         # Create
1770         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1771              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1772              TCP(sport=self.tcp_port_in, dport=25))
1773         self.pg0.add_stream(p)
1774         self.pg_enable_capture(self.pg_interfaces)
1775         self.pg_start()
1776         p = self.pg1.get_capture(1)
1777         self.tcp_port_out = p[0][TCP].sport
1778         self.vapi.ipfix_flush()
1779         capture = self.pg3.get_capture(8)
1780         ipfix = IPFIXDecoder()
1781         # first load template
1782         for p in capture:
1783             self.assertTrue(p.haslayer(IPFIX))
1784             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1785             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1786             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1787             self.assertEqual(p[UDP].dport, 4739)
1788             self.assertEqual(p[IPFIX].observationDomainID,
1789                              self.ipfix_domain_id)
1790             if p.haslayer(Template):
1791                 ipfix.add_template(p.getlayer(Template))
1792         # verify events in data set
1793         for p in capture:
1794             if p.haslayer(Data):
1795                 data = ipfix.decode_data_set(p.getlayer(Set))
1796                 if scapy.compat.orb(data[0][230]) == 10:
1797                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1798                 elif scapy.compat.orb(data[0][230]) == 6:
1799                     self.verify_ipfix_nat64_ses(data,
1800                                                 1,
1801                                                 self.pg0.remote_ip6,
1802                                                 self.pg1.remote_ip4,
1803                                                 25)
1804                 else:
1805                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1806
1807         # Delete
1808         self.pg_enable_capture(self.pg_interfaces)
1809         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1810                                                 end_addr=self.nat_addr,
1811                                                 vrf_id=0xFFFFFFFF,
1812                                                 is_add=0)
1813         self.vapi.ipfix_flush()
1814         capture = self.pg3.get_capture(2)
1815         # verify events in data set
1816         for p in capture:
1817             self.assertTrue(p.haslayer(IPFIX))
1818             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1819             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1820             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1821             self.assertEqual(p[UDP].dport, 4739)
1822             self.assertEqual(p[IPFIX].observationDomainID,
1823                              self.ipfix_domain_id)
1824             if p.haslayer(Data):
1825                 data = ipfix.decode_data_set(p.getlayer(Set))
1826                 if scapy.compat.orb(data[0][230]) == 11:
1827                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1828                 elif scapy.compat.orb(data[0][230]) == 7:
1829                     self.verify_ipfix_nat64_ses(data,
1830                                                 0,
1831                                                 self.pg0.remote_ip6,
1832                                                 self.pg1.remote_ip4,
1833                                                 25)
1834                 else:
1835                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1836
1837     def test_syslog_sess(self):
1838         """ Test syslog session creation and deletion """
1839         self.tcp_port_in = random.randint(1025, 65535)
1840         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1841                                            '64:ff9b::',
1842                                            96)
1843
1844         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1845                                                 end_addr=self.nat_addr,
1846                                                 vrf_id=0xFFFFFFFF,
1847                                                 is_add=1)
1848         flags = self.config_flags.NAT_IS_INSIDE
1849         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1850                                           sw_if_index=self.pg0.sw_if_index)
1851         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1852                                           sw_if_index=self.pg1.sw_if_index)
1853         self.vapi.syslog_set_filter(
1854             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1855         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1856
1857         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1858              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1859              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1860         self.pg0.add_stream(p)
1861         self.pg_enable_capture(self.pg_interfaces)
1862         self.pg_start()
1863         p = self.pg1.get_capture(1)
1864         self.tcp_port_out = p[0][TCP].sport
1865         capture = self.pg3.get_capture(1)
1866         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1867
1868         self.pg_enable_capture(self.pg_interfaces)
1869         self.pg_start()
1870         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1871                                                 end_addr=self.nat_addr,
1872                                                 vrf_id=0xFFFFFFFF,
1873                                                 is_add=0)
1874         capture = self.pg3.get_capture(1)
1875         self.verify_syslog_sess(capture[0][Raw].load, False, True)
1876
1877     def nat64_get_ses_num(self):
1878         """
1879         Return number of active NAT64 sessions.
1880         """
1881         st = self.vapi.nat64_st_dump(proto=255)
1882         return len(st)
1883
1884     def clear_nat64(self):
1885         """
1886         Clear NAT64 configuration.
1887         """
1888         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1889                                            src_port=self.ipfix_src_port,
1890                                            enable=0)
1891         self.ipfix_src_port = 4739
1892         self.ipfix_domain_id = 1
1893
1894         self.vapi.syslog_set_filter(
1895             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1896
1897         self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1898                                      tcp_transitory=240, icmp=60)
1899
1900         interfaces = self.vapi.nat64_interface_dump()
1901         for intf in interfaces:
1902             self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1903                                               sw_if_index=intf.sw_if_index)
1904
1905         bib = self.vapi.nat64_bib_dump(proto=255)
1906         for bibe in bib:
1907             if bibe.flags & self.config_flags.NAT_IS_STATIC:
1908                 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1909                                                    o_addr=bibe.o_addr,
1910                                                    i_port=bibe.i_port,
1911                                                    o_port=bibe.o_port,
1912                                                    proto=bibe.proto,
1913                                                    vrf_id=bibe.vrf_id,
1914                                                    is_add=0)
1915
1916         adresses = self.vapi.nat64_pool_addr_dump()
1917         for addr in adresses:
1918             self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1919                                                     end_addr=addr.address,
1920                                                     vrf_id=addr.vrf_id,
1921                                                     is_add=0)
1922
1923         prefixes = self.vapi.nat64_prefix_dump()
1924         for prefix in prefixes:
1925             self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1926                                            vrf_id=prefix.vrf_id, is_add=0)
1927
1928         bibs = self.statistics.get_counter('/nat64/total-bibs')
1929         self.assertEqual(bibs[0][0], 0)
1930         sessions = self.statistics.get_counter('/nat64/total-sessions')
1931         self.assertEqual(sessions[0][0], 0)
1932
1933
# When executed as a script, run the tests in this module with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)