vlib: add virtual time support
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
26
27
28 @tag_fixme_vpp_workers
29 class TestNAT64(VppTestCase):
30     """ NAT64 Test Cases """
31
32     @property
33     def SYSLOG_SEVERITY(self):
34         return VppEnum.vl_api_syslog_severity_t
35
36     @property
37     def config_flags(self):
38         return VppEnum.vl_api_nat_config_flags_t
39
40     @classmethod
41     def setUpClass(cls):
42         super(TestNAT64, cls).setUpClass()
43
44         cls.tcp_port_in = 6303
45         cls.tcp_port_out = 6303
46         cls.udp_port_in = 6304
47         cls.udp_port_out = 6304
48         cls.icmp_id_in = 6305
49         cls.icmp_id_out = 6305
50         cls.tcp_external_port = 80
51         cls.nat_addr = '10.0.0.3'
52         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
53         cls.vrf1_id = 10
54         cls.vrf1_nat_addr = '10.0.10.3'
55         cls.ipfix_src_port = 4739
56         cls.ipfix_domain_id = 1
57
58         cls.create_pg_interfaces(range(6))
59         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
60         cls.ip6_interfaces.append(cls.pg_interfaces[2])
61         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
62
63         cls.vapi.ip_table_add_del(is_add=1,
64                                   table={'table_id': cls.vrf1_id,
65                                          'is_ip6': 1})
66
67         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
68
69         cls.pg0.generate_remote_hosts(2)
70
71         for i in cls.ip6_interfaces:
72             i.admin_up()
73             i.config_ip6()
74             i.configure_ipv6_neighbors()
75
76         for i in cls.ip4_interfaces:
77             i.admin_up()
78             i.config_ip4()
79             i.resolve_arp()
80
81         cls.pg3.admin_up()
82         cls.pg3.config_ip4()
83         cls.pg3.resolve_arp()
84         cls.pg3.config_ip6()
85         cls.pg3.configure_ipv6_neighbors()
86
87         cls.pg5.admin_up()
88         cls.pg5.config_ip6()
89
90     @classmethod
91     def tearDownClass(cls):
92         super(TestNAT64, cls).tearDownClass()
93
94     def setUp(self):
95         super(TestNAT64, self).setUp()
96         self.vapi.nat64_plugin_enable_disable(enable=1,
97                                               bib_buckets=128, st_buckets=256)
98
99     def tearDown(self):
100         super(TestNAT64, self).tearDown()
101         if not self.vpp_dead:
102             self.vapi.nat64_plugin_enable_disable(enable=0)
103
104     def show_commands_at_teardown(self):
105         self.logger.info(self.vapi.cli("show nat64 pool"))
106         self.logger.info(self.vapi.cli("show nat64 interfaces"))
107         self.logger.info(self.vapi.cli("show nat64 prefix"))
108         self.logger.info(self.vapi.cli("show nat64 bib all"))
109         self.logger.info(self.vapi.cli("show nat64 session table all"))
110
111     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
112         """
113         Create IPv6 packet stream for inside network
114
115         :param in_if: Inside interface
116         :param out_if: Outside interface
117         :param ttl: Hop Limit of generated packets
118         :param pref: NAT64 prefix
119         :param plen: NAT64 prefix length
120         """
121         pkts = []
122         if pref is None:
123             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
124         else:
125             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
126
127         # TCP
128         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
129              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
130              TCP(sport=self.tcp_port_in, dport=20))
131         pkts.append(p)
132
133         # UDP
134         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
135              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
136              UDP(sport=self.udp_port_in, dport=20))
137         pkts.append(p)
138
139         # ICMP
140         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
141              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
142              ICMPv6EchoRequest(id=self.icmp_id_in))
143         pkts.append(p)
144
145         return pkts
146
147     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
148                           use_inside_ports=False):
149         """
150         Create packet stream for outside network
151
152         :param out_if: Outside interface
153         :param dst_ip: Destination IP address (Default use global NAT address)
154         :param ttl: TTL of generated packets
155         :param use_inside_ports: Use inside NAT ports as destination ports
156                instead of outside ports
157         """
158         if dst_ip is None:
159             dst_ip = self.nat_addr
160         if not use_inside_ports:
161             tcp_port = self.tcp_port_out
162             udp_port = self.udp_port_out
163             icmp_id = self.icmp_id_out
164         else:
165             tcp_port = self.tcp_port_in
166             udp_port = self.udp_port_in
167             icmp_id = self.icmp_id_in
168         pkts = []
169         # TCP
170         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
171              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
172              TCP(dport=tcp_port, sport=20))
173         pkts.extend([p, p])
174
175         # UDP
176         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
177              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
178              UDP(dport=udp_port, sport=20))
179         pkts.append(p)
180
181         # ICMP
182         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
183              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
184              ICMP(id=icmp_id, type='echo-reply'))
185         pkts.append(p)
186
187         return pkts
188
189     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
190                            dst_ip=None, is_ip6=False, ignore_port=False):
191         """
192         Verify captured packets on outside network
193
194         :param capture: Captured packets
195         :param nat_ip: Translated IP address (Default use global NAT address)
196         :param same_port: Source port number is not translated (Default False)
197         :param dst_ip: Destination IP address (Default do not verify)
198         :param is_ip6: If L3 protocol is IPv6 (Default False)
199         """
200         if is_ip6:
201             IP46 = IPv6
202             ICMP46 = ICMPv6EchoRequest
203         else:
204             IP46 = IP
205             ICMP46 = ICMP
206         if nat_ip is None:
207             nat_ip = self.nat_addr
208         for packet in capture:
209             try:
210                 if not is_ip6:
211                     self.assert_packet_checksums_valid(packet)
212                 self.assertEqual(packet[IP46].src, nat_ip)
213                 if dst_ip is not None:
214                     self.assertEqual(packet[IP46].dst, dst_ip)
215                 if packet.haslayer(TCP):
216                     if not ignore_port:
217                         if same_port:
218                             self.assertEqual(
219                                 packet[TCP].sport, self.tcp_port_in)
220                         else:
221                             self.assertNotEqual(
222                                 packet[TCP].sport, self.tcp_port_in)
223                     self.tcp_port_out = packet[TCP].sport
224                     self.assert_packet_checksums_valid(packet)
225                 elif packet.haslayer(UDP):
226                     if not ignore_port:
227                         if same_port:
228                             self.assertEqual(
229                                 packet[UDP].sport, self.udp_port_in)
230                         else:
231                             self.assertNotEqual(
232                                 packet[UDP].sport, self.udp_port_in)
233                     self.udp_port_out = packet[UDP].sport
234                 else:
235                     if not ignore_port:
236                         if same_port:
237                             self.assertEqual(
238                                 packet[ICMP46].id, self.icmp_id_in)
239                         else:
240                             self.assertNotEqual(
241                                 packet[ICMP46].id, self.icmp_id_in)
242                     self.icmp_id_out = packet[ICMP46].id
243                     self.assert_packet_checksums_valid(packet)
244             except:
245                 self.logger.error(ppp("Unexpected or invalid packet "
246                                       "(outside network):", packet))
247                 raise
248
249     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
250         """
251         Verify captured IPv6 packets on inside network
252
253         :param capture: Captured packets
254         :param src_ip: Source IP
255         :param dst_ip: Destination IP address
256         """
257         for packet in capture:
258             try:
259                 self.assertEqual(packet[IPv6].src, src_ip)
260                 self.assertEqual(packet[IPv6].dst, dst_ip)
261                 self.assert_packet_checksums_valid(packet)
262                 if packet.haslayer(TCP):
263                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
264                 elif packet.haslayer(UDP):
265                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
266                 else:
267                     self.assertEqual(packet[ICMPv6EchoReply].id,
268                                      self.icmp_id_in)
269             except:
270                 self.logger.error(ppp("Unexpected or invalid packet "
271                                       "(inside network):", packet))
272                 raise
273
274     def create_stream_frag(self, src_if, dst, sport, dport, data,
275                            proto=IP_PROTOS.tcp, echo_reply=False):
276         """
277         Create fragmented packet stream
278
279         :param src_if: Source interface
280         :param dst: Destination IPv4 address
281         :param sport: Source port
282         :param dport: Destination port
283         :param data: Payload data
284         :param proto: protocol (TCP, UDP, ICMP)
285         :param echo_reply: use echo_reply if protocol is ICMP
286         :returns: Fragments
287         """
288         if proto == IP_PROTOS.tcp:
289             p = (IP(src=src_if.remote_ip4, dst=dst) /
290                  TCP(sport=sport, dport=dport) /
291                  Raw(data))
292             p = p.__class__(scapy.compat.raw(p))
293             chksum = p[TCP].chksum
294             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
295         elif proto == IP_PROTOS.udp:
296             proto_header = UDP(sport=sport, dport=dport)
297         elif proto == IP_PROTOS.icmp:
298             if not echo_reply:
299                 proto_header = ICMP(id=sport, type='echo-request')
300             else:
301                 proto_header = ICMP(id=sport, type='echo-reply')
302         else:
303             raise Exception("Unsupported protocol")
304         id = random.randint(0, 65535)
305         pkts = []
306         if proto == IP_PROTOS.tcp:
307             raw = Raw(data[0:4])
308         else:
309             raw = Raw(data[0:16])
310         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
311              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
312              proto_header /
313              raw)
314         pkts.append(p)
315         if proto == IP_PROTOS.tcp:
316             raw = Raw(data[4:20])
317         else:
318             raw = Raw(data[16:32])
319         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
320              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
321                 proto=proto) /
322              raw)
323         pkts.append(p)
324         if proto == IP_PROTOS.tcp:
325             raw = Raw(data[20:])
326         else:
327             raw = Raw(data[32:])
328         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
329              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
330                 id=id) /
331              raw)
332         pkts.append(p)
333         return pkts
334
335     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
336                                pref=None, plen=0, frag_size=128):
337         """
338         Create fragmented packet stream
339
340         :param src_if: Source interface
341         :param dst: Destination IPv4 address
342         :param sport: Source TCP port
343         :param dport: Destination TCP port
344         :param data: Payload data
345         :param pref: NAT64 prefix
346         :param plen: NAT64 prefix length
347         :param fragsize: size of fragments
348         :returns: Fragments
349         """
350         if pref is None:
351             dst_ip6 = ''.join(['64:ff9b::', dst])
352         else:
353             dst_ip6 = self.compose_ip6(dst, pref, plen)
354
355         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
356              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
357              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
358              TCP(sport=sport, dport=dport) /
359              Raw(data))
360
361         return fragment6(p, frag_size)
362
363     def reass_frags_and_verify(self, frags, src, dst):
364         """
365         Reassemble and verify fragmented packet
366
367         :param frags: Captured fragments
368         :param src: Source IPv4 address to verify
369         :param dst: Destination IPv4 address to verify
370
371         :returns: Reassembled IPv4 packet
372         """
373         buffer = BytesIO()
374         for p in frags:
375             self.assertEqual(p[IP].src, src)
376             self.assertEqual(p[IP].dst, dst)
377             self.assert_ip_checksum_valid(p)
378             buffer.seek(p[IP].frag * 8)
379             buffer.write(bytes(p[IP].payload))
380         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
381                 proto=frags[0][IP].proto)
382         if ip.proto == IP_PROTOS.tcp:
383             p = (ip / TCP(buffer.getvalue()))
384             self.logger.debug(ppp("Reassembled:", p))
385             self.assert_tcp_checksum_valid(p)
386         elif ip.proto == IP_PROTOS.udp:
387             p = (ip / UDP(buffer.getvalue()[:8]) /
388                  Raw(buffer.getvalue()[8:]))
389         elif ip.proto == IP_PROTOS.icmp:
390             p = (ip / ICMP(buffer.getvalue()))
391         return p
392
393     def reass_frags_and_verify_ip6(self, frags, src, dst):
394         """
395         Reassemble and verify fragmented packet
396
397         :param frags: Captured fragments
398         :param src: Source IPv6 address to verify
399         :param dst: Destination IPv6 address to verify
400
401         :returns: Reassembled IPv6 packet
402         """
403         buffer = BytesIO()
404         for p in frags:
405             self.assertEqual(p[IPv6].src, src)
406             self.assertEqual(p[IPv6].dst, dst)
407             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
408             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
409         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
410                   nh=frags[0][IPv6ExtHdrFragment].nh)
411         if ip.nh == IP_PROTOS.tcp:
412             p = (ip / TCP(buffer.getvalue()))
413         elif ip.nh == IP_PROTOS.udp:
414             p = (ip / UDP(buffer.getvalue()))
415         self.logger.debug(ppp("Reassembled:", p))
416         self.assert_packet_checksums_valid(p)
417         return p
418
419     def verify_ipfix_max_bibs(self, data, limit):
420         """
421         Verify IPFIX maximum BIB entries exceeded event
422
423         :param data: Decoded IPFIX data records
424         :param limit: Number of maximum BIB entries that can be created.
425         """
426         self.assertEqual(1, len(data))
427         record = data[0]
428         # natEvent
429         self.assertEqual(scapy.compat.orb(record[230]), 13)
430         # natQuotaExceededEvent
431         self.assertEqual(struct.pack("I", 2), record[466])
432         # maxBIBEntries
433         self.assertEqual(struct.pack("I", limit), record[472])
434
435     def verify_ipfix_bib(self, data, is_create, src_addr):
436         """
437         Verify IPFIX NAT64 BIB create and delete events
438
439         :param data: Decoded IPFIX data records
440         :param is_create: Create event if nonzero value otherwise delete event
441         :param src_addr: IPv6 source address
442         """
443         self.assertEqual(1, len(data))
444         record = data[0]
445         # natEvent
446         if is_create:
447             self.assertEqual(scapy.compat.orb(record[230]), 10)
448         else:
449             self.assertEqual(scapy.compat.orb(record[230]), 11)
450         # sourceIPv6Address
451         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
452         # postNATSourceIPv4Address
453         self.assertEqual(self.nat_addr_n, record[225])
454         # protocolIdentifier
455         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
456         # ingressVRFID
457         self.assertEqual(struct.pack("!I", 0), record[234])
458         # sourceTransportPort
459         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
460         # postNAPTSourceTransportPort
461         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
462
463     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
464                                dst_port):
465         """
466         Verify IPFIX NAT64 session create and delete events
467
468         :param data: Decoded IPFIX data records
469         :param is_create: Create event if nonzero value otherwise delete event
470         :param src_addr: IPv6 source address
471         :param dst_addr: IPv4 destination address
472         :param dst_port: destination TCP port
473         """
474         self.assertEqual(1, len(data))
475         record = data[0]
476         # natEvent
477         if is_create:
478             self.assertEqual(scapy.compat.orb(record[230]), 6)
479         else:
480             self.assertEqual(scapy.compat.orb(record[230]), 7)
481         # sourceIPv6Address
482         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
483         # destinationIPv6Address
484         self.assertEqual(socket.inet_pton(socket.AF_INET6,
485                                           self.compose_ip6(dst_addr,
486                                                            '64:ff9b::',
487                                                            96)),
488                          record[28])
489         # postNATSourceIPv4Address
490         self.assertEqual(self.nat_addr_n, record[225])
491         # postNATDestinationIPv4Address
492         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
493                          record[226])
494         # protocolIdentifier
495         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
496         # ingressVRFID
497         self.assertEqual(struct.pack("!I", 0), record[234])
498         # sourceTransportPort
499         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
500         # postNAPTSourceTransportPort
501         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
502         # destinationTransportPort
503         self.assertEqual(struct.pack("!H", dst_port), record[11])
504         # postNAPTDestinationTransportPort
505         self.assertEqual(struct.pack("!H", dst_port), record[228])
506
507     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
508         message = data.decode('utf-8')
509         try:
510             message = SyslogMessage.parse(message)
511         except ParseError as e:
512             self.logger.error(e)
513             raise
514         else:
515             self.assertEqual(message.severity, SyslogSeverity.info)
516             self.assertEqual(message.appname, 'NAT')
517             self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
518             sd_params = message.sd.get('nsess')
519             self.assertTrue(sd_params is not None)
520             if is_ip6:
521                 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
522                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
523             else:
524                 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
525                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
526                 self.assertTrue(sd_params.get('SSUBIX') is not None)
527             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
528             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
529             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
530             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
531             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
532             self.assertEqual(sd_params.get('SVLAN'), '0')
533             self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
534             self.assertEqual(sd_params.get('XDPORT'),
535                              "%d" % self.tcp_external_port)
536
537     def compose_ip6(self, ip4, pref, plen):
538         """
539         Compose IPv4-embedded IPv6 addresses
540
541         :param ip4: IPv4 address
542         :param pref: IPv6 prefix
543         :param plen: IPv6 prefix length
544         :returns: IPv4-embedded IPv6 addresses
545         """
546         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
547         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
548         if plen == 32:
549             pref_n[4] = ip4_n[0]
550             pref_n[5] = ip4_n[1]
551             pref_n[6] = ip4_n[2]
552             pref_n[7] = ip4_n[3]
553         elif plen == 40:
554             pref_n[5] = ip4_n[0]
555             pref_n[6] = ip4_n[1]
556             pref_n[7] = ip4_n[2]
557             pref_n[9] = ip4_n[3]
558         elif plen == 48:
559             pref_n[6] = ip4_n[0]
560             pref_n[7] = ip4_n[1]
561             pref_n[9] = ip4_n[2]
562             pref_n[10] = ip4_n[3]
563         elif plen == 56:
564             pref_n[7] = ip4_n[0]
565             pref_n[9] = ip4_n[1]
566             pref_n[10] = ip4_n[2]
567             pref_n[11] = ip4_n[3]
568         elif plen == 64:
569             pref_n[9] = ip4_n[0]
570             pref_n[10] = ip4_n[1]
571             pref_n[11] = ip4_n[2]
572             pref_n[12] = ip4_n[3]
573         elif plen == 96:
574             pref_n[12] = ip4_n[0]
575             pref_n[13] = ip4_n[1]
576             pref_n[14] = ip4_n[2]
577             pref_n[15] = ip4_n[3]
578         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
579         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
580
581     def verify_ipfix_max_sessions(self, data, limit):
582         """
583         Verify IPFIX maximum session entries exceeded event
584
585         :param data: Decoded IPFIX data records
586         :param limit: Number of maximum session entries that can be created.
587         """
588         self.assertEqual(1, len(data))
589         record = data[0]
590         # natEvent
591         self.assertEqual(scapy.compat.orb(record[230]), 13)
592         # natQuotaExceededEvent
593         self.assertEqual(struct.pack("I", 1), record[466])
594         # maxSessionEntries
595         self.assertEqual(struct.pack("I", limit), record[471])
596
597     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
598         """ NAT64 inside interface handles Neighbor Advertisement """
599
600         flags = self.config_flags.NAT_IS_INSIDE
601         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
602                                           sw_if_index=self.pg5.sw_if_index)
603
604         # Try to send ping
605         ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
606                 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
607                 ICMPv6EchoRequest())
608         pkts = [ping]
609         self.pg5.add_stream(pkts)
610         self.pg_enable_capture(self.pg_interfaces)
611         self.pg_start()
612
613         # Wait for Neighbor Solicitation
614         capture = self.pg5.get_capture(len(pkts))
615         packet = capture[0]
616         try:
617             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
618             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
619             tgt = packet[ICMPv6ND_NS].tgt
620         except:
621             self.logger.error(ppp("Unexpected or invalid packet:", packet))
622             raise
623
624         # Send Neighbor Advertisement
625         p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
626              IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
627              ICMPv6ND_NA(tgt=tgt) /
628              ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
629         pkts = [p]
630         self.pg5.add_stream(pkts)
631         self.pg_enable_capture(self.pg_interfaces)
632         self.pg_start()
633
634         # Try to send ping again
635         pkts = [ping]
636         self.pg5.add_stream(pkts)
637         self.pg_enable_capture(self.pg_interfaces)
638         self.pg_start()
639
640         # Wait for ping reply
641         capture = self.pg5.get_capture(len(pkts))
642         packet = capture[0]
643         try:
644             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
645             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
646             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
647         except:
648             self.logger.error(ppp("Unexpected or invalid packet:", packet))
649             raise
650
651     def test_pool(self):
652         """ Add/delete address to NAT64 pool """
653         nat_addr = '1.2.3.4'
654
655         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
656                                                 end_addr=nat_addr,
657                                                 vrf_id=0xFFFFFFFF, is_add=1)
658
659         addresses = self.vapi.nat64_pool_addr_dump()
660         self.assertEqual(len(addresses), 1)
661         self.assertEqual(str(addresses[0].address), nat_addr)
662
663         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
664                                                 end_addr=nat_addr,
665                                                 vrf_id=0xFFFFFFFF, is_add=0)
666
667         addresses = self.vapi.nat64_pool_addr_dump()
668         self.assertEqual(len(addresses), 0)
669
670     def test_interface(self):
671         """ Enable/disable NAT64 feature on the interface """
672         flags = self.config_flags.NAT_IS_INSIDE
673         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
674                                           sw_if_index=self.pg0.sw_if_index)
675         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
676                                           sw_if_index=self.pg1.sw_if_index)
677
678         interfaces = self.vapi.nat64_interface_dump()
679         self.assertEqual(len(interfaces), 2)
680         pg0_found = False
681         pg1_found = False
682         for intf in interfaces:
683             if intf.sw_if_index == self.pg0.sw_if_index:
684                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
685                 pg0_found = True
686             elif intf.sw_if_index == self.pg1.sw_if_index:
687                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
688                 pg1_found = True
689         self.assertTrue(pg0_found)
690         self.assertTrue(pg1_found)
691
692         features = self.vapi.cli("show interface features pg0")
693         self.assertIn('nat64-in2out', features)
694         features = self.vapi.cli("show interface features pg1")
695         self.assertIn('nat64-out2in', features)
696
697         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
698                                           sw_if_index=self.pg0.sw_if_index)
699         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
700                                           sw_if_index=self.pg1.sw_if_index)
701
702         interfaces = self.vapi.nat64_interface_dump()
703         self.assertEqual(len(interfaces), 0)
704
705     def test_static_bib(self):
706         """ Add/delete static BIB entry """
707         in_addr = '2001:db8:85a3::8a2e:370:7334'
708         out_addr = '10.1.1.3'
709         in_port = 1234
710         out_port = 5678
711         proto = IP_PROTOS.tcp
712
713         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
714                                            i_port=in_port, o_port=out_port,
715                                            proto=proto, vrf_id=0, is_add=1)
716         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
717         static_bib_num = 0
718         for bibe in bib:
719             if bibe.flags & self.config_flags.NAT_IS_STATIC:
720                 static_bib_num += 1
721                 self.assertEqual(str(bibe.i_addr), in_addr)
722                 self.assertEqual(str(bibe.o_addr), out_addr)
723                 self.assertEqual(bibe.i_port, in_port)
724                 self.assertEqual(bibe.o_port, out_port)
725         self.assertEqual(static_bib_num, 1)
726         bibs = self.statistics.get_counter('/nat64/total-bibs')
727         self.assertEqual(bibs[0][0], 1)
728
729         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
730                                            i_port=in_port, o_port=out_port,
731                                            proto=proto, vrf_id=0, is_add=0)
732         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
733         static_bib_num = 0
734         for bibe in bib:
735             if bibe.flags & self.config_flags.NAT_IS_STATIC:
736                 static_bib_num += 1
737         self.assertEqual(static_bib_num, 0)
738         bibs = self.statistics.get_counter('/nat64/total-bibs')
739         self.assertEqual(bibs[0][0], 0)
740
741     def test_set_timeouts(self):
742         """ Set NAT64 timeouts """
743         # verify default values
744         timeouts = self.vapi.nat64_get_timeouts()
745         self.assertEqual(timeouts.udp, 300)
746         self.assertEqual(timeouts.icmp, 60)
747         self.assertEqual(timeouts.tcp_transitory, 240)
748         self.assertEqual(timeouts.tcp_established, 7440)
749
750         # set and verify custom values
751         self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
752                                      tcp_transitory=250, icmp=30)
753         timeouts = self.vapi.nat64_get_timeouts()
754         self.assertEqual(timeouts.udp, 200)
755         self.assertEqual(timeouts.icmp, 30)
756         self.assertEqual(timeouts.tcp_transitory, 250)
757         self.assertEqual(timeouts.tcp_established, 7450)
758
759     def test_dynamic(self):
760         """ NAT64 dynamic translation test """
761         self.tcp_port_in = 6303
762         self.udp_port_in = 6304
763         self.icmp_id_in = 6305
764
765         ses_num_start = self.nat64_get_ses_num()
766
767         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
768                                                 end_addr=self.nat_addr,
769                                                 vrf_id=0xFFFFFFFF,
770                                                 is_add=1)
771         flags = self.config_flags.NAT_IS_INSIDE
772         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
773                                           sw_if_index=self.pg0.sw_if_index)
774         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
775                                           sw_if_index=self.pg1.sw_if_index)
776
777         # in2out
778         tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
779         udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
780         icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
781         drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
782
783         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
784         self.pg0.add_stream(pkts)
785         self.pg_enable_capture(self.pg_interfaces)
786         self.pg_start()
787         capture = self.pg1.get_capture(len(pkts))
788         self.verify_capture_out(capture, nat_ip=self.nat_addr,
789                                 dst_ip=self.pg1.remote_ip4)
790
791         if_idx = self.pg0.sw_if_index
792         cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
793         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
794         cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
795         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
796         cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
797         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
798         cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
799         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
800
801         # out2in
802         tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
803         udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
804         icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
805         drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
806
807         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
808         self.pg1.add_stream(pkts)
809         self.pg_enable_capture(self.pg_interfaces)
810         self.pg_start()
811         capture = self.pg0.get_capture(len(pkts))
812         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
813         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
814
815         if_idx = self.pg1.sw_if_index
816         cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
817         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
818         cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
819         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
820         cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
821         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
822         cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
823         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
824
825         bibs = self.statistics.get_counter('/nat64/total-bibs')
826         self.assertEqual(bibs[0][0], 3)
827         sessions = self.statistics.get_counter('/nat64/total-sessions')
828         self.assertEqual(sessions[0][0], 3)
829
830         # in2out
831         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
832         self.pg0.add_stream(pkts)
833         self.pg_enable_capture(self.pg_interfaces)
834         self.pg_start()
835         capture = self.pg1.get_capture(len(pkts))
836         self.verify_capture_out(capture, nat_ip=self.nat_addr,
837                                 dst_ip=self.pg1.remote_ip4)
838
839         # out2in
840         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
841         self.pg1.add_stream(pkts)
842         self.pg_enable_capture(self.pg_interfaces)
843         self.pg_start()
844         capture = self.pg0.get_capture(len(pkts))
845         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
846
847         ses_num_end = self.nat64_get_ses_num()
848
849         self.assertEqual(ses_num_end - ses_num_start, 3)
850
851         # tenant with specific VRF
852         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
853                                                 end_addr=self.vrf1_nat_addr,
854                                                 vrf_id=self.vrf1_id, is_add=1)
855         flags = self.config_flags.NAT_IS_INSIDE
856         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
857                                           sw_if_index=self.pg2.sw_if_index)
858
859         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
860         self.pg2.add_stream(pkts)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg1.get_capture(len(pkts))
864         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
865                                 dst_ip=self.pg1.remote_ip4)
866
867         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
868         self.pg1.add_stream(pkts)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         capture = self.pg2.get_capture(len(pkts))
872         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
873
874     def test_static(self):
875         """ NAT64 static translation test """
876         self.tcp_port_in = 60303
877         self.udp_port_in = 60304
878         self.icmp_id_in = 60305
879         self.tcp_port_out = 60303
880         self.udp_port_out = 60304
881         self.icmp_id_out = 60305
882
883         ses_num_start = self.nat64_get_ses_num()
884
885         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
886                                                 end_addr=self.nat_addr,
887                                                 vrf_id=0xFFFFFFFF,
888                                                 is_add=1)
889         flags = self.config_flags.NAT_IS_INSIDE
890         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
891                                           sw_if_index=self.pg0.sw_if_index)
892         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
893                                           sw_if_index=self.pg1.sw_if_index)
894
895         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
896                                            o_addr=self.nat_addr,
897                                            i_port=self.tcp_port_in,
898                                            o_port=self.tcp_port_out,
899                                            proto=IP_PROTOS.tcp, vrf_id=0,
900                                            is_add=1)
901         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
902                                            o_addr=self.nat_addr,
903                                            i_port=self.udp_port_in,
904                                            o_port=self.udp_port_out,
905                                            proto=IP_PROTOS.udp, vrf_id=0,
906                                            is_add=1)
907         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
908                                            o_addr=self.nat_addr,
909                                            i_port=self.icmp_id_in,
910                                            o_port=self.icmp_id_out,
911                                            proto=IP_PROTOS.icmp, vrf_id=0,
912                                            is_add=1)
913
914         # in2out
915         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
916         self.pg0.add_stream(pkts)
917         self.pg_enable_capture(self.pg_interfaces)
918         self.pg_start()
919         capture = self.pg1.get_capture(len(pkts))
920         self.verify_capture_out(capture, nat_ip=self.nat_addr,
921                                 dst_ip=self.pg1.remote_ip4, same_port=True)
922
923         # out2in
924         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
925         self.pg1.add_stream(pkts)
926         self.pg_enable_capture(self.pg_interfaces)
927         self.pg_start()
928         capture = self.pg0.get_capture(len(pkts))
929         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
930         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
931
932         ses_num_end = self.nat64_get_ses_num()
933
934         self.assertEqual(ses_num_end - ses_num_start, 3)
935
936     def test_session_timeout(self):
937         """ NAT64 session timeout """
938         self.icmp_id_in = 1234
939         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
940                                                 end_addr=self.nat_addr,
941                                                 vrf_id=0xFFFFFFFF,
942                                                 is_add=1)
943         flags = self.config_flags.NAT_IS_INSIDE
944         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
945                                           sw_if_index=self.pg0.sw_if_index)
946         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
947                                           sw_if_index=self.pg1.sw_if_index)
948         self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
949                                      tcp_transitory=5,
950                                      icmp=5)
951
952         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
953         self.pg0.add_stream(pkts)
954         self.pg_enable_capture(self.pg_interfaces)
955         self.pg_start()
956         capture = self.pg1.get_capture(len(pkts))
957
958         ses_num_before_timeout = self.nat64_get_ses_num()
959
960         self.virtual_sleep(15)
961
962         # ICMP and TCP session after timeout
963         ses_num_after_timeout = self.nat64_get_ses_num()
964         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
965
966     def test_icmp_error(self):
967         """ NAT64 ICMP Error message translation """
968         self.tcp_port_in = 6303
969         self.udp_port_in = 6304
970         self.icmp_id_in = 6305
971
972         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
973                                                 end_addr=self.nat_addr,
974                                                 vrf_id=0xFFFFFFFF,
975                                                 is_add=1)
976         flags = self.config_flags.NAT_IS_INSIDE
977         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
978                                           sw_if_index=self.pg0.sw_if_index)
979         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
980                                           sw_if_index=self.pg1.sw_if_index)
981
982         # send some packets to create sessions
983         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
984         self.pg0.add_stream(pkts)
985         self.pg_enable_capture(self.pg_interfaces)
986         self.pg_start()
987         capture_ip4 = self.pg1.get_capture(len(pkts))
988         self.verify_capture_out(capture_ip4,
989                                 nat_ip=self.nat_addr,
990                                 dst_ip=self.pg1.remote_ip4)
991
992         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
993         self.pg1.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture_ip6 = self.pg0.get_capture(len(pkts))
997         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
998         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
999                                    self.pg0.remote_ip6)
1000
1001         # in2out
1002         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1003                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1004                 ICMPv6DestUnreach(code=1) /
1005                 packet[IPv6] for packet in capture_ip6]
1006         self.pg0.add_stream(pkts)
1007         self.pg_enable_capture(self.pg_interfaces)
1008         self.pg_start()
1009         capture = self.pg1.get_capture(len(pkts))
1010         for packet in capture:
1011             try:
1012                 self.assertEqual(packet[IP].src, self.nat_addr)
1013                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1014                 self.assertEqual(packet[ICMP].type, 3)
1015                 self.assertEqual(packet[ICMP].code, 13)
1016                 inner = packet[IPerror]
1017                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1018                 self.assertEqual(inner.dst, self.nat_addr)
1019                 self.assert_packet_checksums_valid(packet)
1020                 if inner.haslayer(TCPerror):
1021                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1022                 elif inner.haslayer(UDPerror):
1023                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1024                 else:
1025                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1026             except:
1027                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1028                 raise
1029
1030         # out2in
1031         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1032                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1033                 ICMP(type=3, code=13) /
1034                 packet[IP] for packet in capture_ip4]
1035         self.pg1.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg0.get_capture(len(pkts))
1039         for packet in capture:
1040             try:
1041                 self.assertEqual(packet[IPv6].src, ip.src)
1042                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1043                 icmp = packet[ICMPv6DestUnreach]
1044                 self.assertEqual(icmp.code, 1)
1045                 inner = icmp[IPerror6]
1046                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1047                 self.assertEqual(inner.dst, ip.src)
1048                 self.assert_icmpv6_checksum_valid(packet)
1049                 if inner.haslayer(TCPerror):
1050                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1051                 elif inner.haslayer(UDPerror):
1052                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1053                 else:
1054                     self.assertEqual(inner[ICMPv6EchoRequest].id,
1055                                      self.icmp_id_in)
1056             except:
1057                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1058                 raise
1059
1060     def test_hairpinning(self):
1061         """ NAT64 hairpinning """
1062
1063         client = self.pg0.remote_hosts[0]
1064         server = self.pg0.remote_hosts[1]
1065         server_tcp_in_port = 22
1066         server_tcp_out_port = 4022
1067         server_udp_in_port = 23
1068         server_udp_out_port = 4023
1069         client_tcp_in_port = 1234
1070         client_udp_in_port = 1235
1071         client_tcp_out_port = 0
1072         client_udp_out_port = 0
1073         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1074         nat_addr_ip6 = ip.src
1075
1076         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1077                                                 end_addr=self.nat_addr,
1078                                                 vrf_id=0xFFFFFFFF,
1079                                                 is_add=1)
1080         flags = self.config_flags.NAT_IS_INSIDE
1081         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1082                                           sw_if_index=self.pg0.sw_if_index)
1083         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1084                                           sw_if_index=self.pg1.sw_if_index)
1085
1086         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1087                                            o_addr=self.nat_addr,
1088                                            i_port=server_tcp_in_port,
1089                                            o_port=server_tcp_out_port,
1090                                            proto=IP_PROTOS.tcp, vrf_id=0,
1091                                            is_add=1)
1092         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1093                                            o_addr=self.nat_addr,
1094                                            i_port=server_udp_in_port,
1095                                            o_port=server_udp_out_port,
1096                                            proto=IP_PROTOS.udp, vrf_id=0,
1097                                            is_add=1)
1098
1099         # client to server
1100         pkts = []
1101         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1102              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1103              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1104         pkts.append(p)
1105         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1106              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1107              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1108         pkts.append(p)
1109         self.pg0.add_stream(pkts)
1110         self.pg_enable_capture(self.pg_interfaces)
1111         self.pg_start()
1112         capture = self.pg0.get_capture(len(pkts))
1113         for packet in capture:
1114             try:
1115                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1116                 self.assertEqual(packet[IPv6].dst, server.ip6)
1117                 self.assert_packet_checksums_valid(packet)
1118                 if packet.haslayer(TCP):
1119                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1120                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1121                     client_tcp_out_port = packet[TCP].sport
1122                 else:
1123                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1124                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1125                     client_udp_out_port = packet[UDP].sport
1126             except:
1127                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1128                 raise
1129
1130         # server to client
1131         pkts = []
1132         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1133              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1134              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1135         pkts.append(p)
1136         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1137              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1138              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1139         pkts.append(p)
1140         self.pg0.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         for packet in capture:
1145             try:
1146                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1147                 self.assertEqual(packet[IPv6].dst, client.ip6)
1148                 self.assert_packet_checksums_valid(packet)
1149                 if packet.haslayer(TCP):
1150                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1151                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1152                 else:
1153                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1154                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1155             except:
1156                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1157                 raise
1158
1159         # ICMP error
1160         pkts = []
1161         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1163                 ICMPv6DestUnreach(code=1) /
1164                 packet[IPv6] for packet in capture]
1165         self.pg0.add_stream(pkts)
1166         self.pg_enable_capture(self.pg_interfaces)
1167         self.pg_start()
1168         capture = self.pg0.get_capture(len(pkts))
1169         for packet in capture:
1170             try:
1171                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1172                 self.assertEqual(packet[IPv6].dst, server.ip6)
1173                 icmp = packet[ICMPv6DestUnreach]
1174                 self.assertEqual(icmp.code, 1)
1175                 inner = icmp[IPerror6]
1176                 self.assertEqual(inner.src, server.ip6)
1177                 self.assertEqual(inner.dst, nat_addr_ip6)
1178                 self.assert_packet_checksums_valid(packet)
1179                 if inner.haslayer(TCPerror):
1180                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1181                     self.assertEqual(inner[TCPerror].dport,
1182                                      client_tcp_out_port)
1183                 else:
1184                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1185                     self.assertEqual(inner[UDPerror].dport,
1186                                      client_udp_out_port)
1187             except:
1188                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1189                 raise
1190
1191     def test_prefix(self):
1192         """ NAT64 Network-Specific Prefix """
1193
1194         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1195                                                 end_addr=self.nat_addr,
1196                                                 vrf_id=0xFFFFFFFF,
1197                                                 is_add=1)
1198         flags = self.config_flags.NAT_IS_INSIDE
1199         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1200                                           sw_if_index=self.pg0.sw_if_index)
1201         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1202                                           sw_if_index=self.pg1.sw_if_index)
1203         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1204                                                 end_addr=self.vrf1_nat_addr,
1205                                                 vrf_id=self.vrf1_id, is_add=1)
1206         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1207                                           sw_if_index=self.pg2.sw_if_index)
1208
1209         # Add global prefix
1210         global_pref64 = "2001:db8::"
1211         global_pref64_len = 32
1212         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1213         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1214                                        is_add=1)
1215
1216         prefix = self.vapi.nat64_prefix_dump()
1217         self.assertEqual(len(prefix), 1)
1218         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1219         self.assertEqual(prefix[0].vrf_id, 0)
1220
1221         # Add tenant specific prefix
1222         vrf1_pref64 = "2001:db8:122:300::"
1223         vrf1_pref64_len = 56
1224         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1225         self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1226                                        vrf_id=self.vrf1_id, is_add=1)
1227
1228         prefix = self.vapi.nat64_prefix_dump()
1229         self.assertEqual(len(prefix), 2)
1230
1231         # Global prefix
1232         pkts = self.create_stream_in_ip6(self.pg0,
1233                                          self.pg1,
1234                                          pref=global_pref64,
1235                                          plen=global_pref64_len)
1236         self.pg0.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239         capture = self.pg1.get_capture(len(pkts))
1240         self.verify_capture_out(capture, nat_ip=self.nat_addr,
1241                                 dst_ip=self.pg1.remote_ip4)
1242
1243         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1244         self.pg1.add_stream(pkts)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg0.get_capture(len(pkts))
1248         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1249                                   global_pref64,
1250                                   global_pref64_len)
1251         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1252
1253         # Tenant specific prefix
1254         pkts = self.create_stream_in_ip6(self.pg2,
1255                                          self.pg1,
1256                                          pref=vrf1_pref64,
1257                                          plen=vrf1_pref64_len)
1258         self.pg2.add_stream(pkts)
1259         self.pg_enable_capture(self.pg_interfaces)
1260         self.pg_start()
1261         capture = self.pg1.get_capture(len(pkts))
1262         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1263                                 dst_ip=self.pg1.remote_ip4)
1264
1265         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1266         self.pg1.add_stream(pkts)
1267         self.pg_enable_capture(self.pg_interfaces)
1268         self.pg_start()
1269         capture = self.pg2.get_capture(len(pkts))
1270         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1271                                   vrf1_pref64,
1272                                   vrf1_pref64_len)
1273         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1274
1275     def test_unknown_proto(self):
1276         """ NAT64 translate packet with unknown protocol """
1277
1278         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1279                                                 end_addr=self.nat_addr,
1280                                                 vrf_id=0xFFFFFFFF,
1281                                                 is_add=1)
1282         flags = self.config_flags.NAT_IS_INSIDE
1283         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1284                                           sw_if_index=self.pg0.sw_if_index)
1285         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1286                                           sw_if_index=self.pg1.sw_if_index)
1287         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1288
1289         # in2out
1290         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1291              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1292              TCP(sport=self.tcp_port_in, dport=20))
1293         self.pg0.add_stream(p)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         p = self.pg1.get_capture(1)
1297
1298         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1300              GRE() /
1301              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1302              TCP(sport=1234, dport=1234))
1303         self.pg0.add_stream(p)
1304         self.pg_enable_capture(self.pg_interfaces)
1305         self.pg_start()
1306         p = self.pg1.get_capture(1)
1307         packet = p[0]
1308         try:
1309             self.assertEqual(packet[IP].src, self.nat_addr)
1310             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1311             self.assertEqual(packet.haslayer(GRE), 1)
1312             self.assert_packet_checksums_valid(packet)
1313         except:
1314             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1315             raise
1316
1317         # out2in
1318         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1319              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1320              GRE() /
1321              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1322              TCP(sport=1234, dport=1234))
1323         self.pg1.add_stream(p)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         p = self.pg0.get_capture(1)
1327         packet = p[0]
1328         try:
1329             self.assertEqual(packet[IPv6].src, remote_ip6)
1330             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1331             self.assertEqual(packet[IPv6].nh, 47)
1332         except:
1333             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1334             raise
1335
1336     def test_hairpinning_unknown_proto(self):
1337         """ NAT64 translate packet with unknown protocol - hairpinning """
1338
1339         client = self.pg0.remote_hosts[0]
1340         server = self.pg0.remote_hosts[1]
1341         server_tcp_in_port = 22
1342         server_tcp_out_port = 4022
1343         client_tcp_in_port = 1234
1344         client_tcp_out_port = 1235
1345         server_nat_ip = "10.0.0.100"
1346         client_nat_ip = "10.0.0.110"
1347         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1348         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1349
1350         self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1351                                                 end_addr=client_nat_ip,
1352                                                 vrf_id=0xFFFFFFFF,
1353                                                 is_add=1)
1354         flags = self.config_flags.NAT_IS_INSIDE
1355         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1356                                           sw_if_index=self.pg0.sw_if_index)
1357         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1358                                           sw_if_index=self.pg1.sw_if_index)
1359
1360         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1361                                            o_addr=server_nat_ip,
1362                                            i_port=server_tcp_in_port,
1363                                            o_port=server_tcp_out_port,
1364                                            proto=IP_PROTOS.tcp, vrf_id=0,
1365                                            is_add=1)
1366
1367         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1368                                            o_addr=server_nat_ip, i_port=0,
1369                                            o_port=0,
1370                                            proto=IP_PROTOS.gre, vrf_id=0,
1371                                            is_add=1)
1372
1373         self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1374                                            o_addr=client_nat_ip,
1375                                            i_port=client_tcp_in_port,
1376                                            o_port=client_tcp_out_port,
1377                                            proto=IP_PROTOS.tcp, vrf_id=0,
1378                                            is_add=1)
1379
1380         # client to server
1381         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1382              IPv6(src=client.ip6, dst=server_nat_ip6) /
1383              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1384         self.pg0.add_stream(p)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         p = self.pg0.get_capture(1)
1388
1389         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1390              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1391              GRE() /
1392              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1393              TCP(sport=1234, dport=1234))
1394         self.pg0.add_stream(p)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         p = self.pg0.get_capture(1)
1398         packet = p[0]
1399         try:
1400             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1401             self.assertEqual(packet[IPv6].dst, server.ip6)
1402             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1403         except:
1404             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1405             raise
1406
1407         # server to client
1408         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1409              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1410              GRE() /
1411              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1412              TCP(sport=1234, dport=1234))
1413         self.pg0.add_stream(p)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         p = self.pg0.get_capture(1)
1417         packet = p[0]
1418         try:
1419             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1420             self.assertEqual(packet[IPv6].dst, client.ip6)
1421             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1422         except:
1423             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1424             raise
1425
1426     def test_one_armed_nat64(self):
1427         """ One armed NAT64 """
1428         external_port = 0
1429         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1430                                            '64:ff9b::',
1431                                            96)
1432
1433         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1434                                                 end_addr=self.nat_addr,
1435                                                 vrf_id=0xFFFFFFFF,
1436                                                 is_add=1)
1437         flags = self.config_flags.NAT_IS_INSIDE
1438         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1439                                           sw_if_index=self.pg3.sw_if_index)
1440         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1441                                           sw_if_index=self.pg3.sw_if_index)
1442
1443         # in2out
1444         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1445              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1446              TCP(sport=12345, dport=80))
1447         self.pg3.add_stream(p)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450         capture = self.pg3.get_capture(1)
1451         p = capture[0]
1452         try:
1453             ip = p[IP]
1454             tcp = p[TCP]
1455             self.assertEqual(ip.src, self.nat_addr)
1456             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1457             self.assertNotEqual(tcp.sport, 12345)
1458             external_port = tcp.sport
1459             self.assertEqual(tcp.dport, 80)
1460             self.assert_packet_checksums_valid(p)
1461         except:
1462             self.logger.error(ppp("Unexpected or invalid packet:", p))
1463             raise
1464
1465         # out2in
1466         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1467              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1468              TCP(sport=80, dport=external_port))
1469         self.pg3.add_stream(p)
1470         self.pg_enable_capture(self.pg_interfaces)
1471         self.pg_start()
1472         capture = self.pg3.get_capture(1)
1473         p = capture[0]
1474         try:
1475             ip = p[IPv6]
1476             tcp = p[TCP]
1477             self.assertEqual(ip.src, remote_host_ip6)
1478             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1479             self.assertEqual(tcp.sport, 80)
1480             self.assertEqual(tcp.dport, 12345)
1481             self.assert_packet_checksums_valid(p)
1482         except:
1483             self.logger.error(ppp("Unexpected or invalid packet:", p))
1484             raise
1485
1486     def test_frag_in_order(self):
1487         """ NAT64 translate fragments arriving in order """
1488         self.tcp_port_in = random.randint(1025, 65535)
1489
1490         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1491                                                 end_addr=self.nat_addr,
1492                                                 vrf_id=0xFFFFFFFF,
1493                                                 is_add=1)
1494         flags = self.config_flags.NAT_IS_INSIDE
1495         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1496                                           sw_if_index=self.pg0.sw_if_index)
1497         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1498                                           sw_if_index=self.pg1.sw_if_index)
1499
1500         # in2out
1501         data = b'a' * 200
1502         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1503                                            self.tcp_port_in, 20, data)
1504         self.pg0.add_stream(pkts)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         frags = self.pg1.get_capture(len(pkts))
1508         p = self.reass_frags_and_verify(frags,
1509                                         self.nat_addr,
1510                                         self.pg1.remote_ip4)
1511         self.assertEqual(p[TCP].dport, 20)
1512         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1513         self.tcp_port_out = p[TCP].sport
1514         self.assertEqual(data, p[Raw].load)
1515
1516         # out2in
1517         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1518         pkts = self.create_stream_frag(self.pg1,
1519                                        self.nat_addr,
1520                                        20,
1521                                        self.tcp_port_out,
1522                                        data)
1523         self.pg1.add_stream(pkts)
1524         self.pg_enable_capture(self.pg_interfaces)
1525         self.pg_start()
1526         frags = self.pg0.get_capture(len(pkts))
1527         self.logger.debug(ppc("Captured:", frags))
1528         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1529         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1530         self.assertEqual(p[TCP].sport, 20)
1531         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1532         self.assertEqual(data, p[Raw].load)
1533
1534     def test_reass_hairpinning(self):
1535         """ NAT64 fragments hairpinning """
1536         data = b'a' * 200
1537         server = self.pg0.remote_hosts[1]
1538         server_in_port = random.randint(1025, 65535)
1539         server_out_port = random.randint(1025, 65535)
1540         client_in_port = random.randint(1025, 65535)
1541         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1542         nat_addr_ip6 = ip.src
1543
1544         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1545                                                 end_addr=self.nat_addr,
1546                                                 vrf_id=0xFFFFFFFF,
1547                                                 is_add=1)
1548         flags = self.config_flags.NAT_IS_INSIDE
1549         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1550                                           sw_if_index=self.pg0.sw_if_index)
1551         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1552                                           sw_if_index=self.pg1.sw_if_index)
1553
1554         # add static BIB entry for server
1555         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1556                                            o_addr=self.nat_addr,
1557                                            i_port=server_in_port,
1558                                            o_port=server_out_port,
1559                                            proto=IP_PROTOS.tcp, vrf_id=0,
1560                                            is_add=1)
1561
1562         # send packet from host to server
1563         pkts = self.create_stream_frag_ip6(self.pg0,
1564                                            self.nat_addr,
1565                                            client_in_port,
1566                                            server_out_port,
1567                                            data)
1568         self.pg0.add_stream(pkts)
1569         self.pg_enable_capture(self.pg_interfaces)
1570         self.pg_start()
1571         frags = self.pg0.get_capture(len(pkts))
1572         self.logger.debug(ppc("Captured:", frags))
1573         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1574         self.assertNotEqual(p[TCP].sport, client_in_port)
1575         self.assertEqual(p[TCP].dport, server_in_port)
1576         self.assertEqual(data, p[Raw].load)
1577
1578     def test_frag_out_of_order(self):
1579         """ NAT64 translate fragments arriving out of order """
1580         self.tcp_port_in = random.randint(1025, 65535)
1581
1582         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1583                                                 end_addr=self.nat_addr,
1584                                                 vrf_id=0xFFFFFFFF,
1585                                                 is_add=1)
1586         flags = self.config_flags.NAT_IS_INSIDE
1587         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1588                                           sw_if_index=self.pg0.sw_if_index)
1589         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1590                                           sw_if_index=self.pg1.sw_if_index)
1591
1592         # in2out
1593         data = b'a' * 200
1594         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1595                                            self.tcp_port_in, 20, data)
1596         pkts.reverse()
1597         self.pg0.add_stream(pkts)
1598         self.pg_enable_capture(self.pg_interfaces)
1599         self.pg_start()
1600         frags = self.pg1.get_capture(len(pkts))
1601         p = self.reass_frags_and_verify(frags,
1602                                         self.nat_addr,
1603                                         self.pg1.remote_ip4)
1604         self.assertEqual(p[TCP].dport, 20)
1605         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1606         self.tcp_port_out = p[TCP].sport
1607         self.assertEqual(data, p[Raw].load)
1608
1609         # out2in
1610         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1611         pkts = self.create_stream_frag(self.pg1,
1612                                        self.nat_addr,
1613                                        20,
1614                                        self.tcp_port_out,
1615                                        data)
1616         pkts.reverse()
1617         self.pg1.add_stream(pkts)
1618         self.pg_enable_capture(self.pg_interfaces)
1619         self.pg_start()
1620         frags = self.pg0.get_capture(len(pkts))
1621         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1622         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1623         self.assertEqual(p[TCP].sport, 20)
1624         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1625         self.assertEqual(data, p[Raw].load)
1626
1627     def test_interface_addr(self):
1628         """ Acquire NAT64 pool addresses from interface """
1629         self.vapi.nat64_add_del_interface_addr(
1630             is_add=1,
1631             sw_if_index=self.pg4.sw_if_index)
1632
1633         # no address in NAT64 pool
1634         addresses = self.vapi.nat44_address_dump()
1635         self.assertEqual(0, len(addresses))
1636
1637         # configure interface address and check NAT64 address pool
1638         self.pg4.config_ip4()
1639         addresses = self.vapi.nat64_pool_addr_dump()
1640         self.assertEqual(len(addresses), 1)
1641
1642         self.assertEqual(str(addresses[0].address),
1643                          self.pg4.local_ip4)
1644
1645         # remove interface address and check NAT64 address pool
1646         self.pg4.unconfig_ip4()
1647         addresses = self.vapi.nat64_pool_addr_dump()
1648         self.assertEqual(0, len(addresses))
1649
1650     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1651     def test_ipfix_max_bibs_sessions(self):
1652         """ IPFIX logging maximum session and BIB entries exceeded """
1653         max_bibs = 1280
1654         max_sessions = 2560
1655         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1656                                            '64:ff9b::',
1657                                            96)
1658
1659         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1660                                                 end_addr=self.nat_addr,
1661                                                 vrf_id=0xFFFFFFFF,
1662                                                 is_add=1)
1663         flags = self.config_flags.NAT_IS_INSIDE
1664         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1665                                           sw_if_index=self.pg0.sw_if_index)
1666         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1667                                           sw_if_index=self.pg1.sw_if_index)
1668
1669         pkts = []
1670         src = ""
1671         for i in range(0, max_bibs):
1672             src = "fd01:aa::%x" % (i)
1673             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1674                  IPv6(src=src, dst=remote_host_ip6) /
1675                  TCP(sport=12345, dport=80))
1676             pkts.append(p)
1677             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1678                  IPv6(src=src, dst=remote_host_ip6) /
1679                  TCP(sport=12345, dport=22))
1680             pkts.append(p)
1681         self.pg0.add_stream(pkts)
1682         self.pg_enable_capture(self.pg_interfaces)
1683         self.pg_start()
1684         self.pg1.get_capture(max_sessions)
1685
1686         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1687                                      src_address=self.pg3.local_ip4,
1688                                      path_mtu=512,
1689                                      template_interval=10)
1690         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1691                                            src_port=self.ipfix_src_port,
1692                                            enable=1)
1693
1694         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1695              IPv6(src=src, dst=remote_host_ip6) /
1696              TCP(sport=12345, dport=25))
1697         self.pg0.add_stream(p)
1698         self.pg_enable_capture(self.pg_interfaces)
1699         self.pg_start()
1700         self.pg1.assert_nothing_captured()
1701         self.vapi.ipfix_flush()
1702         capture = self.pg3.get_capture(7)
1703         ipfix = IPFIXDecoder()
1704         # first load template
1705         for p in capture:
1706             self.assertTrue(p.haslayer(IPFIX))
1707             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1708             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1709             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1710             self.assertEqual(p[UDP].dport, 4739)
1711             self.assertEqual(p[IPFIX].observationDomainID,
1712                              self.ipfix_domain_id)
1713             if p.haslayer(Template):
1714                 ipfix.add_template(p.getlayer(Template))
1715         # verify events in data set
1716         for p in capture:
1717             if p.haslayer(Data):
1718                 data = ipfix.decode_data_set(p.getlayer(Set))
1719                 self.verify_ipfix_max_sessions(data, max_sessions)
1720
1721         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1722              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1723              TCP(sport=12345, dport=80))
1724         self.pg0.add_stream(p)
1725         self.pg_enable_capture(self.pg_interfaces)
1726         self.pg_start()
1727         self.pg1.assert_nothing_captured()
1728         self.vapi.ipfix_flush()
1729         capture = self.pg3.get_capture(1)
1730         # verify events in data set
1731         for p in capture:
1732             self.assertTrue(p.haslayer(IPFIX))
1733             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1734             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1735             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1736             self.assertEqual(p[UDP].dport, 4739)
1737             self.assertEqual(p[IPFIX].observationDomainID,
1738                              self.ipfix_domain_id)
1739             if p.haslayer(Data):
1740                 data = ipfix.decode_data_set(p.getlayer(Set))
1741                 self.verify_ipfix_max_bibs(data, max_bibs)
1742
1743     def test_ipfix_bib_ses(self):
1744         """ IPFIX logging NAT64 BIB/session create and delete events """
1745         self.tcp_port_in = random.randint(1025, 65535)
1746         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1747                                            '64:ff9b::',
1748                                            96)
1749
1750         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1751                                                 end_addr=self.nat_addr,
1752                                                 vrf_id=0xFFFFFFFF,
1753                                                 is_add=1)
1754         flags = self.config_flags.NAT_IS_INSIDE
1755         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1756                                           sw_if_index=self.pg0.sw_if_index)
1757         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1758                                           sw_if_index=self.pg1.sw_if_index)
1759         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1760                                      src_address=self.pg3.local_ip4,
1761                                      path_mtu=512,
1762                                      template_interval=10)
1763         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1764                                            src_port=self.ipfix_src_port,
1765                                            enable=1)
1766
1767         # Create
1768         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1769              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1770              TCP(sport=self.tcp_port_in, dport=25))
1771         self.pg0.add_stream(p)
1772         self.pg_enable_capture(self.pg_interfaces)
1773         self.pg_start()
1774         p = self.pg1.get_capture(1)
1775         self.tcp_port_out = p[0][TCP].sport
1776         self.vapi.ipfix_flush()
1777         capture = self.pg3.get_capture(8)
1778         ipfix = IPFIXDecoder()
1779         # first load template
1780         for p in capture:
1781             self.assertTrue(p.haslayer(IPFIX))
1782             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1783             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1784             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1785             self.assertEqual(p[UDP].dport, 4739)
1786             self.assertEqual(p[IPFIX].observationDomainID,
1787                              self.ipfix_domain_id)
1788             if p.haslayer(Template):
1789                 ipfix.add_template(p.getlayer(Template))
1790         # verify events in data set
1791         for p in capture:
1792             if p.haslayer(Data):
1793                 data = ipfix.decode_data_set(p.getlayer(Set))
1794                 if scapy.compat.orb(data[0][230]) == 10:
1795                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1796                 elif scapy.compat.orb(data[0][230]) == 6:
1797                     self.verify_ipfix_nat64_ses(data,
1798                                                 1,
1799                                                 self.pg0.remote_ip6,
1800                                                 self.pg1.remote_ip4,
1801                                                 25)
1802                 else:
1803                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1804
1805         # Delete
1806         self.pg_enable_capture(self.pg_interfaces)
1807         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1808                                                 end_addr=self.nat_addr,
1809                                                 vrf_id=0xFFFFFFFF,
1810                                                 is_add=0)
1811         self.vapi.ipfix_flush()
1812         capture = self.pg3.get_capture(2)
1813         # verify events in data set
1814         for p in capture:
1815             self.assertTrue(p.haslayer(IPFIX))
1816             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1817             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1818             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1819             self.assertEqual(p[UDP].dport, 4739)
1820             self.assertEqual(p[IPFIX].observationDomainID,
1821                              self.ipfix_domain_id)
1822             if p.haslayer(Data):
1823                 data = ipfix.decode_data_set(p.getlayer(Set))
1824                 if scapy.compat.orb(data[0][230]) == 11:
1825                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1826                 elif scapy.compat.orb(data[0][230]) == 7:
1827                     self.verify_ipfix_nat64_ses(data,
1828                                                 0,
1829                                                 self.pg0.remote_ip6,
1830                                                 self.pg1.remote_ip4,
1831                                                 25)
1832                 else:
1833                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1834
1835     def test_syslog_sess(self):
1836         """ Test syslog session creation and deletion """
1837         self.tcp_port_in = random.randint(1025, 65535)
1838         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1839                                            '64:ff9b::',
1840                                            96)
1841
1842         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1843                                                 end_addr=self.nat_addr,
1844                                                 vrf_id=0xFFFFFFFF,
1845                                                 is_add=1)
1846         flags = self.config_flags.NAT_IS_INSIDE
1847         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1848                                           sw_if_index=self.pg0.sw_if_index)
1849         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1850                                           sw_if_index=self.pg1.sw_if_index)
1851         self.vapi.syslog_set_filter(
1852             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1853         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1854
1855         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1856              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1857              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1858         self.pg0.add_stream(p)
1859         self.pg_enable_capture(self.pg_interfaces)
1860         self.pg_start()
1861         p = self.pg1.get_capture(1)
1862         self.tcp_port_out = p[0][TCP].sport
1863         capture = self.pg3.get_capture(1)
1864         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1865
1866         self.pg_enable_capture(self.pg_interfaces)
1867         self.pg_start()
1868         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1869                                                 end_addr=self.nat_addr,
1870                                                 vrf_id=0xFFFFFFFF,
1871                                                 is_add=0)
1872         capture = self.pg3.get_capture(1)
1873         self.verify_syslog_sess(capture[0][Raw].load, False, True)
1874
1875     def nat64_get_ses_num(self):
1876         """
1877         Return number of active NAT64 sessions.
1878         """
1879         st = self.vapi.nat64_st_dump(proto=255)
1880         return len(st)
1881
1882     def clear_nat64(self):
1883         """
1884         Clear NAT64 configuration.
1885         """
1886         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1887                                            src_port=self.ipfix_src_port,
1888                                            enable=0)
1889         self.ipfix_src_port = 4739
1890         self.ipfix_domain_id = 1
1891
1892         self.vapi.syslog_set_filter(
1893             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1894
1895         self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1896                                      tcp_transitory=240, icmp=60)
1897
1898         interfaces = self.vapi.nat64_interface_dump()
1899         for intf in interfaces:
1900             self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1901                                               sw_if_index=intf.sw_if_index)
1902
1903         bib = self.vapi.nat64_bib_dump(proto=255)
1904         for bibe in bib:
1905             if bibe.flags & self.config_flags.NAT_IS_STATIC:
1906                 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1907                                                    o_addr=bibe.o_addr,
1908                                                    i_port=bibe.i_port,
1909                                                    o_port=bibe.o_port,
1910                                                    proto=bibe.proto,
1911                                                    vrf_id=bibe.vrf_id,
1912                                                    is_add=0)
1913
1914         adresses = self.vapi.nat64_pool_addr_dump()
1915         for addr in adresses:
1916             self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1917                                                     end_addr=addr.address,
1918                                                     vrf_id=addr.vrf_id,
1919                                                     is_add=0)
1920
1921         prefixes = self.vapi.nat64_prefix_dump()
1922         for prefix in prefixes:
1923             self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1924                                            vrf_id=prefix.vrf_id, is_add=0)
1925
1926         bibs = self.statistics.get_counter('/nat64/total-bibs')
1927         self.assertEqual(bibs[0][0], 0)
1928         sessions = self.statistics.get_counter('/nat64/total-sessions')
1929         self.assertEqual(sessions[0][0], 0)
1930
1931
1932 if __name__ == '__main__':
1933     unittest.main(testRunner=VppTestRunner)