tests: tag the tests that do not work with multi-worker configuration
[vpp.git] / src / plugins / nat / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner, running_extended_tests
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
20     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
21 from scapy.layers.l2 import Ether, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppc, ppp
26 from vpp_papi import VppEnum
27
28
29 @tag_fixme_vpp_workers
30 class TestNAT64(VppTestCase):
31     """ NAT64 Test Cases """
32
33     @property
34     def SYSLOG_SEVERITY(self):
35         return VppEnum.vl_api_syslog_severity_t
36
37     @property
38     def config_flags(self):
39         return VppEnum.vl_api_nat_config_flags_t
40
41     @classmethod
42     def setUpClass(cls):
43         super(TestNAT64, cls).setUpClass()
44
45         cls.tcp_port_in = 6303
46         cls.tcp_port_out = 6303
47         cls.udp_port_in = 6304
48         cls.udp_port_out = 6304
49         cls.icmp_id_in = 6305
50         cls.icmp_id_out = 6305
51         cls.tcp_external_port = 80
52         cls.nat_addr = '10.0.0.3'
53         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
54         cls.vrf1_id = 10
55         cls.vrf1_nat_addr = '10.0.10.3'
56         cls.ipfix_src_port = 4739
57         cls.ipfix_domain_id = 1
58
59         cls.create_pg_interfaces(range(6))
60         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
61         cls.ip6_interfaces.append(cls.pg_interfaces[2])
62         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
63
64         cls.vapi.ip_table_add_del(is_add=1,
65                                   table={'table_id': cls.vrf1_id,
66                                          'is_ip6': 1})
67
68         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
69
70         cls.pg0.generate_remote_hosts(2)
71
72         for i in cls.ip6_interfaces:
73             i.admin_up()
74             i.config_ip6()
75             i.configure_ipv6_neighbors()
76
77         for i in cls.ip4_interfaces:
78             i.admin_up()
79             i.config_ip4()
80             i.resolve_arp()
81
82         cls.pg3.admin_up()
83         cls.pg3.config_ip4()
84         cls.pg3.resolve_arp()
85         cls.pg3.config_ip6()
86         cls.pg3.configure_ipv6_neighbors()
87
88         cls.pg5.admin_up()
89         cls.pg5.config_ip6()
90
91     @classmethod
92     def tearDownClass(cls):
93         super(TestNAT64, cls).tearDownClass()
94
95     def setUp(self):
96         super(TestNAT64, self).setUp()
97         self.vapi.nat64_plugin_enable_disable(enable=1,
98                                               bib_buckets=128, st_buckets=256)
99
100     def tearDown(self):
101         super(TestNAT64, self).tearDown()
102         if not self.vpp_dead:
103             self.vapi.nat64_plugin_enable_disable(enable=0)
104
105     def show_commands_at_teardown(self):
106         self.logger.info(self.vapi.cli("show nat64 pool"))
107         self.logger.info(self.vapi.cli("show nat64 interfaces"))
108         self.logger.info(self.vapi.cli("show nat64 prefix"))
109         self.logger.info(self.vapi.cli("show nat64 bib all"))
110         self.logger.info(self.vapi.cli("show nat64 session table all"))
111
112     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
113         """
114         Create IPv6 packet stream for inside network
115
116         :param in_if: Inside interface
117         :param out_if: Outside interface
118         :param ttl: Hop Limit of generated packets
119         :param pref: NAT64 prefix
120         :param plen: NAT64 prefix length
121         """
122         pkts = []
123         if pref is None:
124             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
125         else:
126             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
127
128         # TCP
129         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
130              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
131              TCP(sport=self.tcp_port_in, dport=20))
132         pkts.append(p)
133
134         # UDP
135         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
136              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
137              UDP(sport=self.udp_port_in, dport=20))
138         pkts.append(p)
139
140         # ICMP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
143              ICMPv6EchoRequest(id=self.icmp_id_in))
144         pkts.append(p)
145
146         return pkts
147
148     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
149                           use_inside_ports=False):
150         """
151         Create packet stream for outside network
152
153         :param out_if: Outside interface
154         :param dst_ip: Destination IP address (Default use global NAT address)
155         :param ttl: TTL of generated packets
156         :param use_inside_ports: Use inside NAT ports as destination ports
157                instead of outside ports
158         """
159         if dst_ip is None:
160             dst_ip = self.nat_addr
161         if not use_inside_ports:
162             tcp_port = self.tcp_port_out
163             udp_port = self.udp_port_out
164             icmp_id = self.icmp_id_out
165         else:
166             tcp_port = self.tcp_port_in
167             udp_port = self.udp_port_in
168             icmp_id = self.icmp_id_in
169         pkts = []
170         # TCP
171         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
172              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
173              TCP(dport=tcp_port, sport=20))
174         pkts.extend([p, p])
175
176         # UDP
177         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
178              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
179              UDP(dport=udp_port, sport=20))
180         pkts.append(p)
181
182         # ICMP
183         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
184              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
185              ICMP(id=icmp_id, type='echo-reply'))
186         pkts.append(p)
187
188         return pkts
189
190     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
191                            dst_ip=None, is_ip6=False, ignore_port=False):
192         """
193         Verify captured packets on outside network
194
195         :param capture: Captured packets
196         :param nat_ip: Translated IP address (Default use global NAT address)
197         :param same_port: Source port number is not translated (Default False)
198         :param dst_ip: Destination IP address (Default do not verify)
199         :param is_ip6: If L3 protocol is IPv6 (Default False)
200         """
201         if is_ip6:
202             IP46 = IPv6
203             ICMP46 = ICMPv6EchoRequest
204         else:
205             IP46 = IP
206             ICMP46 = ICMP
207         if nat_ip is None:
208             nat_ip = self.nat_addr
209         for packet in capture:
210             try:
211                 if not is_ip6:
212                     self.assert_packet_checksums_valid(packet)
213                 self.assertEqual(packet[IP46].src, nat_ip)
214                 if dst_ip is not None:
215                     self.assertEqual(packet[IP46].dst, dst_ip)
216                 if packet.haslayer(TCP):
217                     if not ignore_port:
218                         if same_port:
219                             self.assertEqual(
220                                 packet[TCP].sport, self.tcp_port_in)
221                         else:
222                             self.assertNotEqual(
223                                 packet[TCP].sport, self.tcp_port_in)
224                     self.tcp_port_out = packet[TCP].sport
225                     self.assert_packet_checksums_valid(packet)
226                 elif packet.haslayer(UDP):
227                     if not ignore_port:
228                         if same_port:
229                             self.assertEqual(
230                                 packet[UDP].sport, self.udp_port_in)
231                         else:
232                             self.assertNotEqual(
233                                 packet[UDP].sport, self.udp_port_in)
234                     self.udp_port_out = packet[UDP].sport
235                 else:
236                     if not ignore_port:
237                         if same_port:
238                             self.assertEqual(
239                                 packet[ICMP46].id, self.icmp_id_in)
240                         else:
241                             self.assertNotEqual(
242                                 packet[ICMP46].id, self.icmp_id_in)
243                     self.icmp_id_out = packet[ICMP46].id
244                     self.assert_packet_checksums_valid(packet)
245             except:
246                 self.logger.error(ppp("Unexpected or invalid packet "
247                                       "(outside network):", packet))
248                 raise
249
250     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
251         """
252         Verify captured IPv6 packets on inside network
253
254         :param capture: Captured packets
255         :param src_ip: Source IP
256         :param dst_ip: Destination IP address
257         """
258         for packet in capture:
259             try:
260                 self.assertEqual(packet[IPv6].src, src_ip)
261                 self.assertEqual(packet[IPv6].dst, dst_ip)
262                 self.assert_packet_checksums_valid(packet)
263                 if packet.haslayer(TCP):
264                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
265                 elif packet.haslayer(UDP):
266                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
267                 else:
268                     self.assertEqual(packet[ICMPv6EchoReply].id,
269                                      self.icmp_id_in)
270             except:
271                 self.logger.error(ppp("Unexpected or invalid packet "
272                                       "(inside network):", packet))
273                 raise
274
275     def create_stream_frag(self, src_if, dst, sport, dport, data,
276                            proto=IP_PROTOS.tcp, echo_reply=False):
277         """
278         Create fragmented packet stream
279
280         :param src_if: Source interface
281         :param dst: Destination IPv4 address
282         :param sport: Source port
283         :param dport: Destination port
284         :param data: Payload data
285         :param proto: protocol (TCP, UDP, ICMP)
286         :param echo_reply: use echo_reply if protocol is ICMP
287         :returns: Fragments
288         """
289         if proto == IP_PROTOS.tcp:
290             p = (IP(src=src_if.remote_ip4, dst=dst) /
291                  TCP(sport=sport, dport=dport) /
292                  Raw(data))
293             p = p.__class__(scapy.compat.raw(p))
294             chksum = p[TCP].chksum
295             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
296         elif proto == IP_PROTOS.udp:
297             proto_header = UDP(sport=sport, dport=dport)
298         elif proto == IP_PROTOS.icmp:
299             if not echo_reply:
300                 proto_header = ICMP(id=sport, type='echo-request')
301             else:
302                 proto_header = ICMP(id=sport, type='echo-reply')
303         else:
304             raise Exception("Unsupported protocol")
305         id = random.randint(0, 65535)
306         pkts = []
307         if proto == IP_PROTOS.tcp:
308             raw = Raw(data[0:4])
309         else:
310             raw = Raw(data[0:16])
311         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
312              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
313              proto_header /
314              raw)
315         pkts.append(p)
316         if proto == IP_PROTOS.tcp:
317             raw = Raw(data[4:20])
318         else:
319             raw = Raw(data[16:32])
320         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
321              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
322                 proto=proto) /
323              raw)
324         pkts.append(p)
325         if proto == IP_PROTOS.tcp:
326             raw = Raw(data[20:])
327         else:
328             raw = Raw(data[32:])
329         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
330              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
331                 id=id) /
332              raw)
333         pkts.append(p)
334         return pkts
335
336     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
337                                pref=None, plen=0, frag_size=128):
338         """
339         Create fragmented packet stream
340
341         :param src_if: Source interface
342         :param dst: Destination IPv4 address
343         :param sport: Source TCP port
344         :param dport: Destination TCP port
345         :param data: Payload data
346         :param pref: NAT64 prefix
347         :param plen: NAT64 prefix length
348         :param fragsize: size of fragments
349         :returns: Fragments
350         """
351         if pref is None:
352             dst_ip6 = ''.join(['64:ff9b::', dst])
353         else:
354             dst_ip6 = self.compose_ip6(dst, pref, plen)
355
356         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
357              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
358              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
359              TCP(sport=sport, dport=dport) /
360              Raw(data))
361
362         return fragment6(p, frag_size)
363
364     def reass_frags_and_verify(self, frags, src, dst):
365         """
366         Reassemble and verify fragmented packet
367
368         :param frags: Captured fragments
369         :param src: Source IPv4 address to verify
370         :param dst: Destination IPv4 address to verify
371
372         :returns: Reassembled IPv4 packet
373         """
374         buffer = BytesIO()
375         for p in frags:
376             self.assertEqual(p[IP].src, src)
377             self.assertEqual(p[IP].dst, dst)
378             self.assert_ip_checksum_valid(p)
379             buffer.seek(p[IP].frag * 8)
380             buffer.write(bytes(p[IP].payload))
381         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
382                 proto=frags[0][IP].proto)
383         if ip.proto == IP_PROTOS.tcp:
384             p = (ip / TCP(buffer.getvalue()))
385             self.logger.debug(ppp("Reassembled:", p))
386             self.assert_tcp_checksum_valid(p)
387         elif ip.proto == IP_PROTOS.udp:
388             p = (ip / UDP(buffer.getvalue()[:8]) /
389                  Raw(buffer.getvalue()[8:]))
390         elif ip.proto == IP_PROTOS.icmp:
391             p = (ip / ICMP(buffer.getvalue()))
392         return p
393
394     def reass_frags_and_verify_ip6(self, frags, src, dst):
395         """
396         Reassemble and verify fragmented packet
397
398         :param frags: Captured fragments
399         :param src: Source IPv6 address to verify
400         :param dst: Destination IPv6 address to verify
401
402         :returns: Reassembled IPv6 packet
403         """
404         buffer = BytesIO()
405         for p in frags:
406             self.assertEqual(p[IPv6].src, src)
407             self.assertEqual(p[IPv6].dst, dst)
408             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
409             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
410         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
411                   nh=frags[0][IPv6ExtHdrFragment].nh)
412         if ip.nh == IP_PROTOS.tcp:
413             p = (ip / TCP(buffer.getvalue()))
414         elif ip.nh == IP_PROTOS.udp:
415             p = (ip / UDP(buffer.getvalue()))
416         self.logger.debug(ppp("Reassembled:", p))
417         self.assert_packet_checksums_valid(p)
418         return p
419
420     def verify_ipfix_max_bibs(self, data, limit):
421         """
422         Verify IPFIX maximum BIB entries exceeded event
423
424         :param data: Decoded IPFIX data records
425         :param limit: Number of maximum BIB entries that can be created.
426         """
427         self.assertEqual(1, len(data))
428         record = data[0]
429         # natEvent
430         self.assertEqual(scapy.compat.orb(record[230]), 13)
431         # natQuotaExceededEvent
432         self.assertEqual(struct.pack("I", 2), record[466])
433         # maxBIBEntries
434         self.assertEqual(struct.pack("I", limit), record[472])
435
436     def verify_ipfix_bib(self, data, is_create, src_addr):
437         """
438         Verify IPFIX NAT64 BIB create and delete events
439
440         :param data: Decoded IPFIX data records
441         :param is_create: Create event if nonzero value otherwise delete event
442         :param src_addr: IPv6 source address
443         """
444         self.assertEqual(1, len(data))
445         record = data[0]
446         # natEvent
447         if is_create:
448             self.assertEqual(scapy.compat.orb(record[230]), 10)
449         else:
450             self.assertEqual(scapy.compat.orb(record[230]), 11)
451         # sourceIPv6Address
452         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
453         # postNATSourceIPv4Address
454         self.assertEqual(self.nat_addr_n, record[225])
455         # protocolIdentifier
456         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
457         # ingressVRFID
458         self.assertEqual(struct.pack("!I", 0), record[234])
459         # sourceTransportPort
460         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
461         # postNAPTSourceTransportPort
462         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
463
464     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
465                                dst_port):
466         """
467         Verify IPFIX NAT64 session create and delete events
468
469         :param data: Decoded IPFIX data records
470         :param is_create: Create event if nonzero value otherwise delete event
471         :param src_addr: IPv6 source address
472         :param dst_addr: IPv4 destination address
473         :param dst_port: destination TCP port
474         """
475         self.assertEqual(1, len(data))
476         record = data[0]
477         # natEvent
478         if is_create:
479             self.assertEqual(scapy.compat.orb(record[230]), 6)
480         else:
481             self.assertEqual(scapy.compat.orb(record[230]), 7)
482         # sourceIPv6Address
483         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
484         # destinationIPv6Address
485         self.assertEqual(socket.inet_pton(socket.AF_INET6,
486                                           self.compose_ip6(dst_addr,
487                                                            '64:ff9b::',
488                                                            96)),
489                          record[28])
490         # postNATSourceIPv4Address
491         self.assertEqual(self.nat_addr_n, record[225])
492         # postNATDestinationIPv4Address
493         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
494                          record[226])
495         # protocolIdentifier
496         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
497         # ingressVRFID
498         self.assertEqual(struct.pack("!I", 0), record[234])
499         # sourceTransportPort
500         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
501         # postNAPTSourceTransportPort
502         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
503         # destinationTransportPort
504         self.assertEqual(struct.pack("!H", dst_port), record[11])
505         # postNAPTDestinationTransportPort
506         self.assertEqual(struct.pack("!H", dst_port), record[228])
507
508     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
509         message = data.decode('utf-8')
510         try:
511             message = SyslogMessage.parse(message)
512         except ParseError as e:
513             self.logger.error(e)
514             raise
515         else:
516             self.assertEqual(message.severity, SyslogSeverity.info)
517             self.assertEqual(message.appname, 'NAT')
518             self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
519             sd_params = message.sd.get('nsess')
520             self.assertTrue(sd_params is not None)
521             if is_ip6:
522                 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
523                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
524             else:
525                 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
526                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
527                 self.assertTrue(sd_params.get('SSUBIX') is not None)
528             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
529             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
530             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
531             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
532             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
533             self.assertEqual(sd_params.get('SVLAN'), '0')
534             self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
535             self.assertEqual(sd_params.get('XDPORT'),
536                              "%d" % self.tcp_external_port)
537
538     def compose_ip6(self, ip4, pref, plen):
539         """
540         Compose IPv4-embedded IPv6 addresses
541
542         :param ip4: IPv4 address
543         :param pref: IPv6 prefix
544         :param plen: IPv6 prefix length
545         :returns: IPv4-embedded IPv6 addresses
546         """
547         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
548         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
549         if plen == 32:
550             pref_n[4] = ip4_n[0]
551             pref_n[5] = ip4_n[1]
552             pref_n[6] = ip4_n[2]
553             pref_n[7] = ip4_n[3]
554         elif plen == 40:
555             pref_n[5] = ip4_n[0]
556             pref_n[6] = ip4_n[1]
557             pref_n[7] = ip4_n[2]
558             pref_n[9] = ip4_n[3]
559         elif plen == 48:
560             pref_n[6] = ip4_n[0]
561             pref_n[7] = ip4_n[1]
562             pref_n[9] = ip4_n[2]
563             pref_n[10] = ip4_n[3]
564         elif plen == 56:
565             pref_n[7] = ip4_n[0]
566             pref_n[9] = ip4_n[1]
567             pref_n[10] = ip4_n[2]
568             pref_n[11] = ip4_n[3]
569         elif plen == 64:
570             pref_n[9] = ip4_n[0]
571             pref_n[10] = ip4_n[1]
572             pref_n[11] = ip4_n[2]
573             pref_n[12] = ip4_n[3]
574         elif plen == 96:
575             pref_n[12] = ip4_n[0]
576             pref_n[13] = ip4_n[1]
577             pref_n[14] = ip4_n[2]
578             pref_n[15] = ip4_n[3]
579         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
580         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
581
582     def verify_ipfix_max_sessions(self, data, limit):
583         """
584         Verify IPFIX maximum session entries exceeded event
585
586         :param data: Decoded IPFIX data records
587         :param limit: Number of maximum session entries that can be created.
588         """
589         self.assertEqual(1, len(data))
590         record = data[0]
591         # natEvent
592         self.assertEqual(scapy.compat.orb(record[230]), 13)
593         # natQuotaExceededEvent
594         self.assertEqual(struct.pack("I", 1), record[466])
595         # maxSessionEntries
596         self.assertEqual(struct.pack("I", limit), record[471])
597
598     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
599         """ NAT64 inside interface handles Neighbor Advertisement """
600
601         flags = self.config_flags.NAT_IS_INSIDE
602         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
603                                           sw_if_index=self.pg5.sw_if_index)
604
605         # Try to send ping
606         ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
607                 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
608                 ICMPv6EchoRequest())
609         pkts = [ping]
610         self.pg5.add_stream(pkts)
611         self.pg_enable_capture(self.pg_interfaces)
612         self.pg_start()
613
614         # Wait for Neighbor Solicitation
615         capture = self.pg5.get_capture(len(pkts))
616         packet = capture[0]
617         try:
618             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
619             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
620             tgt = packet[ICMPv6ND_NS].tgt
621         except:
622             self.logger.error(ppp("Unexpected or invalid packet:", packet))
623             raise
624
625         # Send Neighbor Advertisement
626         p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
627              IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
628              ICMPv6ND_NA(tgt=tgt) /
629              ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
630         pkts = [p]
631         self.pg5.add_stream(pkts)
632         self.pg_enable_capture(self.pg_interfaces)
633         self.pg_start()
634
635         # Try to send ping again
636         pkts = [ping]
637         self.pg5.add_stream(pkts)
638         self.pg_enable_capture(self.pg_interfaces)
639         self.pg_start()
640
641         # Wait for ping reply
642         capture = self.pg5.get_capture(len(pkts))
643         packet = capture[0]
644         try:
645             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
646             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
647             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
648         except:
649             self.logger.error(ppp("Unexpected or invalid packet:", packet))
650             raise
651
652     def test_pool(self):
653         """ Add/delete address to NAT64 pool """
654         nat_addr = '1.2.3.4'
655
656         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
657                                                 end_addr=nat_addr,
658                                                 vrf_id=0xFFFFFFFF, is_add=1)
659
660         addresses = self.vapi.nat64_pool_addr_dump()
661         self.assertEqual(len(addresses), 1)
662         self.assertEqual(str(addresses[0].address), nat_addr)
663
664         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
665                                                 end_addr=nat_addr,
666                                                 vrf_id=0xFFFFFFFF, is_add=0)
667
668         addresses = self.vapi.nat64_pool_addr_dump()
669         self.assertEqual(len(addresses), 0)
670
671     def test_interface(self):
672         """ Enable/disable NAT64 feature on the interface """
673         flags = self.config_flags.NAT_IS_INSIDE
674         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
675                                           sw_if_index=self.pg0.sw_if_index)
676         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
677                                           sw_if_index=self.pg1.sw_if_index)
678
679         interfaces = self.vapi.nat64_interface_dump()
680         self.assertEqual(len(interfaces), 2)
681         pg0_found = False
682         pg1_found = False
683         for intf in interfaces:
684             if intf.sw_if_index == self.pg0.sw_if_index:
685                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
686                 pg0_found = True
687             elif intf.sw_if_index == self.pg1.sw_if_index:
688                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
689                 pg1_found = True
690         self.assertTrue(pg0_found)
691         self.assertTrue(pg1_found)
692
693         features = self.vapi.cli("show interface features pg0")
694         self.assertIn('nat64-in2out', features)
695         features = self.vapi.cli("show interface features pg1")
696         self.assertIn('nat64-out2in', features)
697
698         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699                                           sw_if_index=self.pg0.sw_if_index)
700         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
701                                           sw_if_index=self.pg1.sw_if_index)
702
703         interfaces = self.vapi.nat64_interface_dump()
704         self.assertEqual(len(interfaces), 0)
705
706     def test_static_bib(self):
707         """ Add/delete static BIB entry """
708         in_addr = '2001:db8:85a3::8a2e:370:7334'
709         out_addr = '10.1.1.3'
710         in_port = 1234
711         out_port = 5678
712         proto = IP_PROTOS.tcp
713
714         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
715                                            i_port=in_port, o_port=out_port,
716                                            proto=proto, vrf_id=0, is_add=1)
717         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
718         static_bib_num = 0
719         for bibe in bib:
720             if bibe.flags & self.config_flags.NAT_IS_STATIC:
721                 static_bib_num += 1
722                 self.assertEqual(str(bibe.i_addr), in_addr)
723                 self.assertEqual(str(bibe.o_addr), out_addr)
724                 self.assertEqual(bibe.i_port, in_port)
725                 self.assertEqual(bibe.o_port, out_port)
726         self.assertEqual(static_bib_num, 1)
727         bibs = self.statistics.get_counter('/nat64/total-bibs')
728         self.assertEqual(bibs[0][0], 1)
729
730         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
731                                            i_port=in_port, o_port=out_port,
732                                            proto=proto, vrf_id=0, is_add=0)
733         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
734         static_bib_num = 0
735         for bibe in bib:
736             if bibe.flags & self.config_flags.NAT_IS_STATIC:
737                 static_bib_num += 1
738         self.assertEqual(static_bib_num, 0)
739         bibs = self.statistics.get_counter('/nat64/total-bibs')
740         self.assertEqual(bibs[0][0], 0)
741
742     def test_set_timeouts(self):
743         """ Set NAT64 timeouts """
744         # verify default values
745         timeouts = self.vapi.nat64_get_timeouts()
746         self.assertEqual(timeouts.udp, 300)
747         self.assertEqual(timeouts.icmp, 60)
748         self.assertEqual(timeouts.tcp_transitory, 240)
749         self.assertEqual(timeouts.tcp_established, 7440)
750
751         # set and verify custom values
752         self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
753                                      tcp_transitory=250, icmp=30)
754         timeouts = self.vapi.nat64_get_timeouts()
755         self.assertEqual(timeouts.udp, 200)
756         self.assertEqual(timeouts.icmp, 30)
757         self.assertEqual(timeouts.tcp_transitory, 250)
758         self.assertEqual(timeouts.tcp_established, 7450)
759
760     def test_dynamic(self):
761         """ NAT64 dynamic translation test """
762         self.tcp_port_in = 6303
763         self.udp_port_in = 6304
764         self.icmp_id_in = 6305
765
766         ses_num_start = self.nat64_get_ses_num()
767
768         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
769                                                 end_addr=self.nat_addr,
770                                                 vrf_id=0xFFFFFFFF,
771                                                 is_add=1)
772         flags = self.config_flags.NAT_IS_INSIDE
773         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
774                                           sw_if_index=self.pg0.sw_if_index)
775         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
776                                           sw_if_index=self.pg1.sw_if_index)
777
778         # in2out
779         tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
780         udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
781         icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
782         drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
783
784         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
785         self.pg0.add_stream(pkts)
786         self.pg_enable_capture(self.pg_interfaces)
787         self.pg_start()
788         capture = self.pg1.get_capture(len(pkts))
789         self.verify_capture_out(capture, nat_ip=self.nat_addr,
790                                 dst_ip=self.pg1.remote_ip4)
791
792         if_idx = self.pg0.sw_if_index
793         cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
794         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
795         cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
796         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
797         cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
798         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
799         cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
800         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
801
802         # out2in
803         tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
804         udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
805         icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
806         drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
807
808         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
809         self.pg1.add_stream(pkts)
810         self.pg_enable_capture(self.pg_interfaces)
811         self.pg_start()
812         capture = self.pg0.get_capture(len(pkts))
813         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
814         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
815
816         if_idx = self.pg1.sw_if_index
817         cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
818         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
819         cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
820         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
821         cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
822         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
823         cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
824         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
825
826         bibs = self.statistics.get_counter('/nat64/total-bibs')
827         self.assertEqual(bibs[0][0], 3)
828         sessions = self.statistics.get_counter('/nat64/total-sessions')
829         self.assertEqual(sessions[0][0], 3)
830
831         # in2out
832         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture, nat_ip=self.nat_addr,
838                                 dst_ip=self.pg1.remote_ip4)
839
840         # out2in
841         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
842         self.pg1.add_stream(pkts)
843         self.pg_enable_capture(self.pg_interfaces)
844         self.pg_start()
845         capture = self.pg0.get_capture(len(pkts))
846         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
847
848         ses_num_end = self.nat64_get_ses_num()
849
850         self.assertEqual(ses_num_end - ses_num_start, 3)
851
852         # tenant with specific VRF
853         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
854                                                 end_addr=self.vrf1_nat_addr,
855                                                 vrf_id=self.vrf1_id, is_add=1)
856         flags = self.config_flags.NAT_IS_INSIDE
857         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
858                                           sw_if_index=self.pg2.sw_if_index)
859
860         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
861         self.pg2.add_stream(pkts)
862         self.pg_enable_capture(self.pg_interfaces)
863         self.pg_start()
864         capture = self.pg1.get_capture(len(pkts))
865         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
866                                 dst_ip=self.pg1.remote_ip4)
867
868         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
869         self.pg1.add_stream(pkts)
870         self.pg_enable_capture(self.pg_interfaces)
871         self.pg_start()
872         capture = self.pg2.get_capture(len(pkts))
873         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
874
875     def test_static(self):
876         """ NAT64 static translation test """
877         self.tcp_port_in = 60303
878         self.udp_port_in = 60304
879         self.icmp_id_in = 60305
880         self.tcp_port_out = 60303
881         self.udp_port_out = 60304
882         self.icmp_id_out = 60305
883
884         ses_num_start = self.nat64_get_ses_num()
885
886         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
887                                                 end_addr=self.nat_addr,
888                                                 vrf_id=0xFFFFFFFF,
889                                                 is_add=1)
890         flags = self.config_flags.NAT_IS_INSIDE
891         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
892                                           sw_if_index=self.pg0.sw_if_index)
893         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
894                                           sw_if_index=self.pg1.sw_if_index)
895
896         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
897                                            o_addr=self.nat_addr,
898                                            i_port=self.tcp_port_in,
899                                            o_port=self.tcp_port_out,
900                                            proto=IP_PROTOS.tcp, vrf_id=0,
901                                            is_add=1)
902         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903                                            o_addr=self.nat_addr,
904                                            i_port=self.udp_port_in,
905                                            o_port=self.udp_port_out,
906                                            proto=IP_PROTOS.udp, vrf_id=0,
907                                            is_add=1)
908         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909                                            o_addr=self.nat_addr,
910                                            i_port=self.icmp_id_in,
911                                            o_port=self.icmp_id_out,
912                                            proto=IP_PROTOS.icmp, vrf_id=0,
913                                            is_add=1)
914
915         # in2out
916         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
917         self.pg0.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg1.get_capture(len(pkts))
921         self.verify_capture_out(capture, nat_ip=self.nat_addr,
922                                 dst_ip=self.pg1.remote_ip4, same_port=True)
923
924         # out2in
925         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
926         self.pg1.add_stream(pkts)
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929         capture = self.pg0.get_capture(len(pkts))
930         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
931         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
932
933         ses_num_end = self.nat64_get_ses_num()
934
935         self.assertEqual(ses_num_end - ses_num_start, 3)
936
937     @unittest.skipUnless(running_extended_tests, "part of extended tests")
938     def test_session_timeout(self):
939         """ NAT64 session timeout """
940         self.icmp_id_in = 1234
941         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
942                                                 end_addr=self.nat_addr,
943                                                 vrf_id=0xFFFFFFFF,
944                                                 is_add=1)
945         flags = self.config_flags.NAT_IS_INSIDE
946         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
947                                           sw_if_index=self.pg0.sw_if_index)
948         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
949                                           sw_if_index=self.pg1.sw_if_index)
950         self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
951                                      tcp_transitory=5,
952                                      icmp=5)
953
954         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
955         self.pg0.add_stream(pkts)
956         self.pg_enable_capture(self.pg_interfaces)
957         self.pg_start()
958         capture = self.pg1.get_capture(len(pkts))
959
960         ses_num_before_timeout = self.nat64_get_ses_num()
961
962         sleep(15)
963
964         # ICMP and TCP session after timeout
965         ses_num_after_timeout = self.nat64_get_ses_num()
966         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
967
968     def test_icmp_error(self):
969         """ NAT64 ICMP Error message translation """
970         self.tcp_port_in = 6303
971         self.udp_port_in = 6304
972         self.icmp_id_in = 6305
973
974         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
975                                                 end_addr=self.nat_addr,
976                                                 vrf_id=0xFFFFFFFF,
977                                                 is_add=1)
978         flags = self.config_flags.NAT_IS_INSIDE
979         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
980                                           sw_if_index=self.pg0.sw_if_index)
981         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
982                                           sw_if_index=self.pg1.sw_if_index)
983
984         # send some packets to create sessions
985         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
986         self.pg0.add_stream(pkts)
987         self.pg_enable_capture(self.pg_interfaces)
988         self.pg_start()
989         capture_ip4 = self.pg1.get_capture(len(pkts))
990         self.verify_capture_out(capture_ip4,
991                                 nat_ip=self.nat_addr,
992                                 dst_ip=self.pg1.remote_ip4)
993
994         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
995         self.pg1.add_stream(pkts)
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pg_start()
998         capture_ip6 = self.pg0.get_capture(len(pkts))
999         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
1000         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1001                                    self.pg0.remote_ip6)
1002
1003         # in2out
1004         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1005                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1006                 ICMPv6DestUnreach(code=1) /
1007                 packet[IPv6] for packet in capture_ip6]
1008         self.pg0.add_stream(pkts)
1009         self.pg_enable_capture(self.pg_interfaces)
1010         self.pg_start()
1011         capture = self.pg1.get_capture(len(pkts))
1012         for packet in capture:
1013             try:
1014                 self.assertEqual(packet[IP].src, self.nat_addr)
1015                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1016                 self.assertEqual(packet[ICMP].type, 3)
1017                 self.assertEqual(packet[ICMP].code, 13)
1018                 inner = packet[IPerror]
1019                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1020                 self.assertEqual(inner.dst, self.nat_addr)
1021                 self.assert_packet_checksums_valid(packet)
1022                 if inner.haslayer(TCPerror):
1023                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1024                 elif inner.haslayer(UDPerror):
1025                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1026                 else:
1027                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1028             except:
1029                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1030                 raise
1031
1032         # out2in
1033         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1034                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1035                 ICMP(type=3, code=13) /
1036                 packet[IP] for packet in capture_ip4]
1037         self.pg1.add_stream(pkts)
1038         self.pg_enable_capture(self.pg_interfaces)
1039         self.pg_start()
1040         capture = self.pg0.get_capture(len(pkts))
1041         for packet in capture:
1042             try:
1043                 self.assertEqual(packet[IPv6].src, ip.src)
1044                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1045                 icmp = packet[ICMPv6DestUnreach]
1046                 self.assertEqual(icmp.code, 1)
1047                 inner = icmp[IPerror6]
1048                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1049                 self.assertEqual(inner.dst, ip.src)
1050                 self.assert_icmpv6_checksum_valid(packet)
1051                 if inner.haslayer(TCPerror):
1052                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1053                 elif inner.haslayer(UDPerror):
1054                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1055                 else:
1056                     self.assertEqual(inner[ICMPv6EchoRequest].id,
1057                                      self.icmp_id_in)
1058             except:
1059                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1060                 raise
1061
1062     def test_hairpinning(self):
1063         """ NAT64 hairpinning """
1064
1065         client = self.pg0.remote_hosts[0]
1066         server = self.pg0.remote_hosts[1]
1067         server_tcp_in_port = 22
1068         server_tcp_out_port = 4022
1069         server_udp_in_port = 23
1070         server_udp_out_port = 4023
1071         client_tcp_in_port = 1234
1072         client_udp_in_port = 1235
1073         client_tcp_out_port = 0
1074         client_udp_out_port = 0
1075         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1076         nat_addr_ip6 = ip.src
1077
1078         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1079                                                 end_addr=self.nat_addr,
1080                                                 vrf_id=0xFFFFFFFF,
1081                                                 is_add=1)
1082         flags = self.config_flags.NAT_IS_INSIDE
1083         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1084                                           sw_if_index=self.pg0.sw_if_index)
1085         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1086                                           sw_if_index=self.pg1.sw_if_index)
1087
1088         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1089                                            o_addr=self.nat_addr,
1090                                            i_port=server_tcp_in_port,
1091                                            o_port=server_tcp_out_port,
1092                                            proto=IP_PROTOS.tcp, vrf_id=0,
1093                                            is_add=1)
1094         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1095                                            o_addr=self.nat_addr,
1096                                            i_port=server_udp_in_port,
1097                                            o_port=server_udp_out_port,
1098                                            proto=IP_PROTOS.udp, vrf_id=0,
1099                                            is_add=1)
1100
1101         # client to server
1102         pkts = []
1103         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1104              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1105              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1106         pkts.append(p)
1107         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1108              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1109              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1110         pkts.append(p)
1111         self.pg0.add_stream(pkts)
1112         self.pg_enable_capture(self.pg_interfaces)
1113         self.pg_start()
1114         capture = self.pg0.get_capture(len(pkts))
1115         for packet in capture:
1116             try:
1117                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1118                 self.assertEqual(packet[IPv6].dst, server.ip6)
1119                 self.assert_packet_checksums_valid(packet)
1120                 if packet.haslayer(TCP):
1121                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1122                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1123                     client_tcp_out_port = packet[TCP].sport
1124                 else:
1125                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1126                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1127                     client_udp_out_port = packet[UDP].sport
1128             except:
1129                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1130                 raise
1131
1132         # server to client
1133         pkts = []
1134         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1135              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1136              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1137         pkts.append(p)
1138         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1139              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1140              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1141         pkts.append(p)
1142         self.pg0.add_stream(pkts)
1143         self.pg_enable_capture(self.pg_interfaces)
1144         self.pg_start()
1145         capture = self.pg0.get_capture(len(pkts))
1146         for packet in capture:
1147             try:
1148                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1149                 self.assertEqual(packet[IPv6].dst, client.ip6)
1150                 self.assert_packet_checksums_valid(packet)
1151                 if packet.haslayer(TCP):
1152                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1153                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1154                 else:
1155                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1156                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1157             except:
1158                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1159                 raise
1160
1161         # ICMP error
1162         pkts = []
1163         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1164                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1165                 ICMPv6DestUnreach(code=1) /
1166                 packet[IPv6] for packet in capture]
1167         self.pg0.add_stream(pkts)
1168         self.pg_enable_capture(self.pg_interfaces)
1169         self.pg_start()
1170         capture = self.pg0.get_capture(len(pkts))
1171         for packet in capture:
1172             try:
1173                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1174                 self.assertEqual(packet[IPv6].dst, server.ip6)
1175                 icmp = packet[ICMPv6DestUnreach]
1176                 self.assertEqual(icmp.code, 1)
1177                 inner = icmp[IPerror6]
1178                 self.assertEqual(inner.src, server.ip6)
1179                 self.assertEqual(inner.dst, nat_addr_ip6)
1180                 self.assert_packet_checksums_valid(packet)
1181                 if inner.haslayer(TCPerror):
1182                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1183                     self.assertEqual(inner[TCPerror].dport,
1184                                      client_tcp_out_port)
1185                 else:
1186                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1187                     self.assertEqual(inner[UDPerror].dport,
1188                                      client_udp_out_port)
1189             except:
1190                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1191                 raise
1192
1193     def test_prefix(self):
1194         """ NAT64 Network-Specific Prefix """
1195
1196         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1197                                                 end_addr=self.nat_addr,
1198                                                 vrf_id=0xFFFFFFFF,
1199                                                 is_add=1)
1200         flags = self.config_flags.NAT_IS_INSIDE
1201         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1202                                           sw_if_index=self.pg0.sw_if_index)
1203         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1204                                           sw_if_index=self.pg1.sw_if_index)
1205         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1206                                                 end_addr=self.vrf1_nat_addr,
1207                                                 vrf_id=self.vrf1_id, is_add=1)
1208         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1209                                           sw_if_index=self.pg2.sw_if_index)
1210
1211         # Add global prefix
1212         global_pref64 = "2001:db8::"
1213         global_pref64_len = 32
1214         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1215         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1216                                        is_add=1)
1217
1218         prefix = self.vapi.nat64_prefix_dump()
1219         self.assertEqual(len(prefix), 1)
1220         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1221         self.assertEqual(prefix[0].vrf_id, 0)
1222
1223         # Add tenant specific prefix
1224         vrf1_pref64 = "2001:db8:122:300::"
1225         vrf1_pref64_len = 56
1226         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1227         self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1228                                        vrf_id=self.vrf1_id, is_add=1)
1229
1230         prefix = self.vapi.nat64_prefix_dump()
1231         self.assertEqual(len(prefix), 2)
1232
1233         # Global prefix
1234         pkts = self.create_stream_in_ip6(self.pg0,
1235                                          self.pg1,
1236                                          pref=global_pref64,
1237                                          plen=global_pref64_len)
1238         self.pg0.add_stream(pkts)
1239         self.pg_enable_capture(self.pg_interfaces)
1240         self.pg_start()
1241         capture = self.pg1.get_capture(len(pkts))
1242         self.verify_capture_out(capture, nat_ip=self.nat_addr,
1243                                 dst_ip=self.pg1.remote_ip4)
1244
1245         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1246         self.pg1.add_stream(pkts)
1247         self.pg_enable_capture(self.pg_interfaces)
1248         self.pg_start()
1249         capture = self.pg0.get_capture(len(pkts))
1250         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1251                                   global_pref64,
1252                                   global_pref64_len)
1253         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1254
1255         # Tenant specific prefix
1256         pkts = self.create_stream_in_ip6(self.pg2,
1257                                          self.pg1,
1258                                          pref=vrf1_pref64,
1259                                          plen=vrf1_pref64_len)
1260         self.pg2.add_stream(pkts)
1261         self.pg_enable_capture(self.pg_interfaces)
1262         self.pg_start()
1263         capture = self.pg1.get_capture(len(pkts))
1264         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1265                                 dst_ip=self.pg1.remote_ip4)
1266
1267         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1268         self.pg1.add_stream(pkts)
1269         self.pg_enable_capture(self.pg_interfaces)
1270         self.pg_start()
1271         capture = self.pg2.get_capture(len(pkts))
1272         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1273                                   vrf1_pref64,
1274                                   vrf1_pref64_len)
1275         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1276
1277     def test_unknown_proto(self):
1278         """ NAT64 translate packet with unknown protocol """
1279
1280         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1281                                                 end_addr=self.nat_addr,
1282                                                 vrf_id=0xFFFFFFFF,
1283                                                 is_add=1)
1284         flags = self.config_flags.NAT_IS_INSIDE
1285         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1286                                           sw_if_index=self.pg0.sw_if_index)
1287         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1288                                           sw_if_index=self.pg1.sw_if_index)
1289         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1290
1291         # in2out
1292         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1293              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1294              TCP(sport=self.tcp_port_in, dport=20))
1295         self.pg0.add_stream(p)
1296         self.pg_enable_capture(self.pg_interfaces)
1297         self.pg_start()
1298         p = self.pg1.get_capture(1)
1299
1300         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1301              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1302              GRE() /
1303              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1304              TCP(sport=1234, dport=1234))
1305         self.pg0.add_stream(p)
1306         self.pg_enable_capture(self.pg_interfaces)
1307         self.pg_start()
1308         p = self.pg1.get_capture(1)
1309         packet = p[0]
1310         try:
1311             self.assertEqual(packet[IP].src, self.nat_addr)
1312             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1313             self.assertEqual(packet.haslayer(GRE), 1)
1314             self.assert_packet_checksums_valid(packet)
1315         except:
1316             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1317             raise
1318
1319         # out2in
1320         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1321              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1322              GRE() /
1323              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1324              TCP(sport=1234, dport=1234))
1325         self.pg1.add_stream(p)
1326         self.pg_enable_capture(self.pg_interfaces)
1327         self.pg_start()
1328         p = self.pg0.get_capture(1)
1329         packet = p[0]
1330         try:
1331             self.assertEqual(packet[IPv6].src, remote_ip6)
1332             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1333             self.assertEqual(packet[IPv6].nh, 47)
1334         except:
1335             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1336             raise
1337
1338     def test_hairpinning_unknown_proto(self):
1339         """ NAT64 translate packet with unknown protocol - hairpinning """
1340
1341         client = self.pg0.remote_hosts[0]
1342         server = self.pg0.remote_hosts[1]
1343         server_tcp_in_port = 22
1344         server_tcp_out_port = 4022
1345         client_tcp_in_port = 1234
1346         client_tcp_out_port = 1235
1347         server_nat_ip = "10.0.0.100"
1348         client_nat_ip = "10.0.0.110"
1349         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1350         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1351
1352         self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1353                                                 end_addr=client_nat_ip,
1354                                                 vrf_id=0xFFFFFFFF,
1355                                                 is_add=1)
1356         flags = self.config_flags.NAT_IS_INSIDE
1357         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1358                                           sw_if_index=self.pg0.sw_if_index)
1359         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1360                                           sw_if_index=self.pg1.sw_if_index)
1361
1362         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1363                                            o_addr=server_nat_ip,
1364                                            i_port=server_tcp_in_port,
1365                                            o_port=server_tcp_out_port,
1366                                            proto=IP_PROTOS.tcp, vrf_id=0,
1367                                            is_add=1)
1368
1369         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1370                                            o_addr=server_nat_ip, i_port=0,
1371                                            o_port=0,
1372                                            proto=IP_PROTOS.gre, vrf_id=0,
1373                                            is_add=1)
1374
1375         self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1376                                            o_addr=client_nat_ip,
1377                                            i_port=client_tcp_in_port,
1378                                            o_port=client_tcp_out_port,
1379                                            proto=IP_PROTOS.tcp, vrf_id=0,
1380                                            is_add=1)
1381
1382         # client to server
1383         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1384              IPv6(src=client.ip6, dst=server_nat_ip6) /
1385              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1386         self.pg0.add_stream(p)
1387         self.pg_enable_capture(self.pg_interfaces)
1388         self.pg_start()
1389         p = self.pg0.get_capture(1)
1390
1391         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1392              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1393              GRE() /
1394              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1395              TCP(sport=1234, dport=1234))
1396         self.pg0.add_stream(p)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         p = self.pg0.get_capture(1)
1400         packet = p[0]
1401         try:
1402             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1403             self.assertEqual(packet[IPv6].dst, server.ip6)
1404             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1405         except:
1406             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1407             raise
1408
1409         # server to client
1410         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1411              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1412              GRE() /
1413              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1414              TCP(sport=1234, dport=1234))
1415         self.pg0.add_stream(p)
1416         self.pg_enable_capture(self.pg_interfaces)
1417         self.pg_start()
1418         p = self.pg0.get_capture(1)
1419         packet = p[0]
1420         try:
1421             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1422             self.assertEqual(packet[IPv6].dst, client.ip6)
1423             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1424         except:
1425             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1426             raise
1427
1428     def test_one_armed_nat64(self):
1429         """ One armed NAT64 """
1430         external_port = 0
1431         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1432                                            '64:ff9b::',
1433                                            96)
1434
1435         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1436                                                 end_addr=self.nat_addr,
1437                                                 vrf_id=0xFFFFFFFF,
1438                                                 is_add=1)
1439         flags = self.config_flags.NAT_IS_INSIDE
1440         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1441                                           sw_if_index=self.pg3.sw_if_index)
1442         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1443                                           sw_if_index=self.pg3.sw_if_index)
1444
1445         # in2out
1446         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1447              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1448              TCP(sport=12345, dport=80))
1449         self.pg3.add_stream(p)
1450         self.pg_enable_capture(self.pg_interfaces)
1451         self.pg_start()
1452         capture = self.pg3.get_capture(1)
1453         p = capture[0]
1454         try:
1455             ip = p[IP]
1456             tcp = p[TCP]
1457             self.assertEqual(ip.src, self.nat_addr)
1458             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1459             self.assertNotEqual(tcp.sport, 12345)
1460             external_port = tcp.sport
1461             self.assertEqual(tcp.dport, 80)
1462             self.assert_packet_checksums_valid(p)
1463         except:
1464             self.logger.error(ppp("Unexpected or invalid packet:", p))
1465             raise
1466
1467         # out2in
1468         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1469              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1470              TCP(sport=80, dport=external_port))
1471         self.pg3.add_stream(p)
1472         self.pg_enable_capture(self.pg_interfaces)
1473         self.pg_start()
1474         capture = self.pg3.get_capture(1)
1475         p = capture[0]
1476         try:
1477             ip = p[IPv6]
1478             tcp = p[TCP]
1479             self.assertEqual(ip.src, remote_host_ip6)
1480             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1481             self.assertEqual(tcp.sport, 80)
1482             self.assertEqual(tcp.dport, 12345)
1483             self.assert_packet_checksums_valid(p)
1484         except:
1485             self.logger.error(ppp("Unexpected or invalid packet:", p))
1486             raise
1487
1488     def test_frag_in_order(self):
1489         """ NAT64 translate fragments arriving in order """
1490         self.tcp_port_in = random.randint(1025, 65535)
1491
1492         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1493                                                 end_addr=self.nat_addr,
1494                                                 vrf_id=0xFFFFFFFF,
1495                                                 is_add=1)
1496         flags = self.config_flags.NAT_IS_INSIDE
1497         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1498                                           sw_if_index=self.pg0.sw_if_index)
1499         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1500                                           sw_if_index=self.pg1.sw_if_index)
1501
1502         # in2out
1503         data = b'a' * 200
1504         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1505                                            self.tcp_port_in, 20, data)
1506         self.pg0.add_stream(pkts)
1507         self.pg_enable_capture(self.pg_interfaces)
1508         self.pg_start()
1509         frags = self.pg1.get_capture(len(pkts))
1510         p = self.reass_frags_and_verify(frags,
1511                                         self.nat_addr,
1512                                         self.pg1.remote_ip4)
1513         self.assertEqual(p[TCP].dport, 20)
1514         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1515         self.tcp_port_out = p[TCP].sport
1516         self.assertEqual(data, p[Raw].load)
1517
1518         # out2in
1519         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1520         pkts = self.create_stream_frag(self.pg1,
1521                                        self.nat_addr,
1522                                        20,
1523                                        self.tcp_port_out,
1524                                        data)
1525         self.pg1.add_stream(pkts)
1526         self.pg_enable_capture(self.pg_interfaces)
1527         self.pg_start()
1528         frags = self.pg0.get_capture(len(pkts))
1529         self.logger.debug(ppc("Captured:", frags))
1530         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1531         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1532         self.assertEqual(p[TCP].sport, 20)
1533         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1534         self.assertEqual(data, p[Raw].load)
1535
1536     def test_reass_hairpinning(self):
1537         """ NAT64 fragments hairpinning """
1538         data = b'a' * 200
1539         server = self.pg0.remote_hosts[1]
1540         server_in_port = random.randint(1025, 65535)
1541         server_out_port = random.randint(1025, 65535)
1542         client_in_port = random.randint(1025, 65535)
1543         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1544         nat_addr_ip6 = ip.src
1545
1546         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1547                                                 end_addr=self.nat_addr,
1548                                                 vrf_id=0xFFFFFFFF,
1549                                                 is_add=1)
1550         flags = self.config_flags.NAT_IS_INSIDE
1551         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1552                                           sw_if_index=self.pg0.sw_if_index)
1553         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1554                                           sw_if_index=self.pg1.sw_if_index)
1555
1556         # add static BIB entry for server
1557         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1558                                            o_addr=self.nat_addr,
1559                                            i_port=server_in_port,
1560                                            o_port=server_out_port,
1561                                            proto=IP_PROTOS.tcp, vrf_id=0,
1562                                            is_add=1)
1563
1564         # send packet from host to server
1565         pkts = self.create_stream_frag_ip6(self.pg0,
1566                                            self.nat_addr,
1567                                            client_in_port,
1568                                            server_out_port,
1569                                            data)
1570         self.pg0.add_stream(pkts)
1571         self.pg_enable_capture(self.pg_interfaces)
1572         self.pg_start()
1573         frags = self.pg0.get_capture(len(pkts))
1574         self.logger.debug(ppc("Captured:", frags))
1575         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1576         self.assertNotEqual(p[TCP].sport, client_in_port)
1577         self.assertEqual(p[TCP].dport, server_in_port)
1578         self.assertEqual(data, p[Raw].load)
1579
1580     def test_frag_out_of_order(self):
1581         """ NAT64 translate fragments arriving out of order """
1582         self.tcp_port_in = random.randint(1025, 65535)
1583
1584         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1585                                                 end_addr=self.nat_addr,
1586                                                 vrf_id=0xFFFFFFFF,
1587                                                 is_add=1)
1588         flags = self.config_flags.NAT_IS_INSIDE
1589         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1590                                           sw_if_index=self.pg0.sw_if_index)
1591         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1592                                           sw_if_index=self.pg1.sw_if_index)
1593
1594         # in2out
1595         data = b'a' * 200
1596         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1597                                            self.tcp_port_in, 20, data)
1598         pkts.reverse()
1599         self.pg0.add_stream(pkts)
1600         self.pg_enable_capture(self.pg_interfaces)
1601         self.pg_start()
1602         frags = self.pg1.get_capture(len(pkts))
1603         p = self.reass_frags_and_verify(frags,
1604                                         self.nat_addr,
1605                                         self.pg1.remote_ip4)
1606         self.assertEqual(p[TCP].dport, 20)
1607         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1608         self.tcp_port_out = p[TCP].sport
1609         self.assertEqual(data, p[Raw].load)
1610
1611         # out2in
1612         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1613         pkts = self.create_stream_frag(self.pg1,
1614                                        self.nat_addr,
1615                                        20,
1616                                        self.tcp_port_out,
1617                                        data)
1618         pkts.reverse()
1619         self.pg1.add_stream(pkts)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         frags = self.pg0.get_capture(len(pkts))
1623         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1624         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1625         self.assertEqual(p[TCP].sport, 20)
1626         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1627         self.assertEqual(data, p[Raw].load)
1628
1629     def test_interface_addr(self):
1630         """ Acquire NAT64 pool addresses from interface """
1631         self.vapi.nat64_add_del_interface_addr(
1632             is_add=1,
1633             sw_if_index=self.pg4.sw_if_index)
1634
1635         # no address in NAT64 pool
1636         addresses = self.vapi.nat44_address_dump()
1637         self.assertEqual(0, len(addresses))
1638
1639         # configure interface address and check NAT64 address pool
1640         self.pg4.config_ip4()
1641         addresses = self.vapi.nat64_pool_addr_dump()
1642         self.assertEqual(len(addresses), 1)
1643
1644         self.assertEqual(str(addresses[0].address),
1645                          self.pg4.local_ip4)
1646
1647         # remove interface address and check NAT64 address pool
1648         self.pg4.unconfig_ip4()
1649         addresses = self.vapi.nat64_pool_addr_dump()
1650         self.assertEqual(0, len(addresses))
1651
1652     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1653     def test_ipfix_max_bibs_sessions(self):
1654         """ IPFIX logging maximum session and BIB entries exceeded """
1655         max_bibs = 1280
1656         max_sessions = 2560
1657         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1658                                            '64:ff9b::',
1659                                            96)
1660
1661         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1662                                                 end_addr=self.nat_addr,
1663                                                 vrf_id=0xFFFFFFFF,
1664                                                 is_add=1)
1665         flags = self.config_flags.NAT_IS_INSIDE
1666         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1667                                           sw_if_index=self.pg0.sw_if_index)
1668         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1669                                           sw_if_index=self.pg1.sw_if_index)
1670
1671         pkts = []
1672         src = ""
1673         for i in range(0, max_bibs):
1674             src = "fd01:aa::%x" % (i)
1675             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1676                  IPv6(src=src, dst=remote_host_ip6) /
1677                  TCP(sport=12345, dport=80))
1678             pkts.append(p)
1679             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1680                  IPv6(src=src, dst=remote_host_ip6) /
1681                  TCP(sport=12345, dport=22))
1682             pkts.append(p)
1683         self.pg0.add_stream(pkts)
1684         self.pg_enable_capture(self.pg_interfaces)
1685         self.pg_start()
1686         self.pg1.get_capture(max_sessions)
1687
1688         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1689                                      src_address=self.pg3.local_ip4,
1690                                      path_mtu=512,
1691                                      template_interval=10)
1692         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1693                                            src_port=self.ipfix_src_port,
1694                                            enable=1)
1695
1696         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1697              IPv6(src=src, dst=remote_host_ip6) /
1698              TCP(sport=12345, dport=25))
1699         self.pg0.add_stream(p)
1700         self.pg_enable_capture(self.pg_interfaces)
1701         self.pg_start()
1702         self.pg1.assert_nothing_captured()
1703         sleep(1)
1704         self.vapi.ipfix_flush()
1705         capture = self.pg3.get_capture(7)
1706         ipfix = IPFIXDecoder()
1707         # first load template
1708         for p in capture:
1709             self.assertTrue(p.haslayer(IPFIX))
1710             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1711             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1712             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1713             self.assertEqual(p[UDP].dport, 4739)
1714             self.assertEqual(p[IPFIX].observationDomainID,
1715                              self.ipfix_domain_id)
1716             if p.haslayer(Template):
1717                 ipfix.add_template(p.getlayer(Template))
1718         # verify events in data set
1719         for p in capture:
1720             if p.haslayer(Data):
1721                 data = ipfix.decode_data_set(p.getlayer(Set))
1722                 self.verify_ipfix_max_sessions(data, max_sessions)
1723
1724         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1725              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1726              TCP(sport=12345, dport=80))
1727         self.pg0.add_stream(p)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         self.pg1.assert_nothing_captured()
1731         sleep(1)
1732         self.vapi.ipfix_flush()
1733         capture = self.pg3.get_capture(1)
1734         # verify events in data set
1735         for p in capture:
1736             self.assertTrue(p.haslayer(IPFIX))
1737             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1738             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1739             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1740             self.assertEqual(p[UDP].dport, 4739)
1741             self.assertEqual(p[IPFIX].observationDomainID,
1742                              self.ipfix_domain_id)
1743             if p.haslayer(Data):
1744                 data = ipfix.decode_data_set(p.getlayer(Set))
1745                 self.verify_ipfix_max_bibs(data, max_bibs)
1746
1747     def test_ipfix_bib_ses(self):
1748         """ IPFIX logging NAT64 BIB/session create and delete events """
1749         self.tcp_port_in = random.randint(1025, 65535)
1750         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1751                                            '64:ff9b::',
1752                                            96)
1753
1754         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1755                                                 end_addr=self.nat_addr,
1756                                                 vrf_id=0xFFFFFFFF,
1757                                                 is_add=1)
1758         flags = self.config_flags.NAT_IS_INSIDE
1759         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1760                                           sw_if_index=self.pg0.sw_if_index)
1761         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1762                                           sw_if_index=self.pg1.sw_if_index)
1763         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1764                                      src_address=self.pg3.local_ip4,
1765                                      path_mtu=512,
1766                                      template_interval=10)
1767         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1768                                            src_port=self.ipfix_src_port,
1769                                            enable=1)
1770
1771         # Create
1772         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1773              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1774              TCP(sport=self.tcp_port_in, dport=25))
1775         self.pg0.add_stream(p)
1776         self.pg_enable_capture(self.pg_interfaces)
1777         self.pg_start()
1778         p = self.pg1.get_capture(1)
1779         self.tcp_port_out = p[0][TCP].sport
1780         self.vapi.ipfix_flush()
1781         capture = self.pg3.get_capture(8)
1782         ipfix = IPFIXDecoder()
1783         # first load template
1784         for p in capture:
1785             self.assertTrue(p.haslayer(IPFIX))
1786             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1787             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1788             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1789             self.assertEqual(p[UDP].dport, 4739)
1790             self.assertEqual(p[IPFIX].observationDomainID,
1791                              self.ipfix_domain_id)
1792             if p.haslayer(Template):
1793                 ipfix.add_template(p.getlayer(Template))
1794         # verify events in data set
1795         for p in capture:
1796             if p.haslayer(Data):
1797                 data = ipfix.decode_data_set(p.getlayer(Set))
1798                 if scapy.compat.orb(data[0][230]) == 10:
1799                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1800                 elif scapy.compat.orb(data[0][230]) == 6:
1801                     self.verify_ipfix_nat64_ses(data,
1802                                                 1,
1803                                                 self.pg0.remote_ip6,
1804                                                 self.pg1.remote_ip4,
1805                                                 25)
1806                 else:
1807                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1808
1809         # Delete
1810         self.pg_enable_capture(self.pg_interfaces)
1811         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1812                                                 end_addr=self.nat_addr,
1813                                                 vrf_id=0xFFFFFFFF,
1814                                                 is_add=0)
1815         self.vapi.ipfix_flush()
1816         capture = self.pg3.get_capture(2)
1817         # verify events in data set
1818         for p in capture:
1819             self.assertTrue(p.haslayer(IPFIX))
1820             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1821             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1822             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1823             self.assertEqual(p[UDP].dport, 4739)
1824             self.assertEqual(p[IPFIX].observationDomainID,
1825                              self.ipfix_domain_id)
1826             if p.haslayer(Data):
1827                 data = ipfix.decode_data_set(p.getlayer(Set))
1828                 if scapy.compat.orb(data[0][230]) == 11:
1829                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1830                 elif scapy.compat.orb(data[0][230]) == 7:
1831                     self.verify_ipfix_nat64_ses(data,
1832                                                 0,
1833                                                 self.pg0.remote_ip6,
1834                                                 self.pg1.remote_ip4,
1835                                                 25)
1836                 else:
1837                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
1838
1839     def test_syslog_sess(self):
1840         """ Test syslog session creation and deletion """
1841         self.tcp_port_in = random.randint(1025, 65535)
1842         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1843                                            '64:ff9b::',
1844                                            96)
1845
1846         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1847                                                 end_addr=self.nat_addr,
1848                                                 vrf_id=0xFFFFFFFF,
1849                                                 is_add=1)
1850         flags = self.config_flags.NAT_IS_INSIDE
1851         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1852                                           sw_if_index=self.pg0.sw_if_index)
1853         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1854                                           sw_if_index=self.pg1.sw_if_index)
1855         self.vapi.syslog_set_filter(
1856             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1857         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1858
1859         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1860              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1861              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1862         self.pg0.add_stream(p)
1863         self.pg_enable_capture(self.pg_interfaces)
1864         self.pg_start()
1865         p = self.pg1.get_capture(1)
1866         self.tcp_port_out = p[0][TCP].sport
1867         capture = self.pg3.get_capture(1)
1868         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1869
1870         self.pg_enable_capture(self.pg_interfaces)
1871         self.pg_start()
1872         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1873                                                 end_addr=self.nat_addr,
1874                                                 vrf_id=0xFFFFFFFF,
1875                                                 is_add=0)
1876         capture = self.pg3.get_capture(1)
1877         self.verify_syslog_sess(capture[0][Raw].load, False, True)
1878
1879     def nat64_get_ses_num(self):
1880         """
1881         Return number of active NAT64 sessions.
1882         """
1883         st = self.vapi.nat64_st_dump(proto=255)
1884         return len(st)
1885
1886     def clear_nat64(self):
1887         """
1888         Clear NAT64 configuration.
1889         """
1890         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1891                                            src_port=self.ipfix_src_port,
1892                                            enable=0)
1893         self.ipfix_src_port = 4739
1894         self.ipfix_domain_id = 1
1895
1896         self.vapi.syslog_set_filter(
1897             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1898
1899         self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1900                                      tcp_transitory=240, icmp=60)
1901
1902         interfaces = self.vapi.nat64_interface_dump()
1903         for intf in interfaces:
1904             self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1905                                               sw_if_index=intf.sw_if_index)
1906
1907         bib = self.vapi.nat64_bib_dump(proto=255)
1908         for bibe in bib:
1909             if bibe.flags & self.config_flags.NAT_IS_STATIC:
1910                 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1911                                                    o_addr=bibe.o_addr,
1912                                                    i_port=bibe.i_port,
1913                                                    o_port=bibe.o_port,
1914                                                    proto=bibe.proto,
1915                                                    vrf_id=bibe.vrf_id,
1916                                                    is_add=0)
1917
1918         adresses = self.vapi.nat64_pool_addr_dump()
1919         for addr in adresses:
1920             self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1921                                                     end_addr=addr.address,
1922                                                     vrf_id=addr.vrf_id,
1923                                                     is_add=0)
1924
1925         prefixes = self.vapi.nat64_prefix_dump()
1926         for prefix in prefixes:
1927             self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1928                                            vrf_id=prefix.vrf_id, is_add=0)
1929
1930         bibs = self.statistics.get_counter('/nat64/total-bibs')
1931         self.assertEqual(bibs[0][0], 0)
1932         sessions = self.statistics.get_counter('/nat64/total-sessions')
1933         self.assertEqual(sessions[0][0], 0)
1934
1935
# Run the tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)