tests: make tests less make dependent
[vpp.git] / test / test_nat64.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from config import config
12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
20     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
21 from scapy.layers.l2 import Ether, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppc, ppp
26 from vpp_papi import VppEnum
27
28
29 @tag_fixme_vpp_workers
30 class TestNAT64(VppTestCase):
31     """ NAT64 Test Cases """
32
    @property
    def SYSLOG_SEVERITY(self):
        """Return the ``vl_api_syslog_severity_t`` enum taken from VppEnum."""
        return VppEnum.vl_api_syslog_severity_t
36
    @property
    def config_flags(self):
        """Return the ``vl_api_nat_config_flags_t`` enum taken from VppEnum."""
        return VppEnum.vl_api_nat_config_flags_t
40
41     @classmethod
42     def setUpClass(cls):
43         super(TestNAT64, cls).setUpClass()
44
45         cls.tcp_port_in = 6303
46         cls.tcp_port_out = 6303
47         cls.udp_port_in = 6304
48         cls.udp_port_out = 6304
49         cls.icmp_id_in = 6305
50         cls.icmp_id_out = 6305
51         cls.tcp_external_port = 80
52         cls.nat_addr = '10.0.0.3'
53         cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
54         cls.vrf1_id = 10
55         cls.vrf1_nat_addr = '10.0.10.3'
56         cls.ipfix_src_port = 4739
57         cls.ipfix_domain_id = 1
58
59         cls.create_pg_interfaces(range(6))
60         cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
61         cls.ip6_interfaces.append(cls.pg_interfaces[2])
62         cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
63
64         cls.vapi.ip_table_add_del(is_add=1,
65                                   table={'table_id': cls.vrf1_id,
66                                          'is_ip6': 1})
67
68         cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
69
70         cls.pg0.generate_remote_hosts(2)
71
72         for i in cls.ip6_interfaces:
73             i.admin_up()
74             i.config_ip6()
75             i.configure_ipv6_neighbors()
76
77         for i in cls.ip4_interfaces:
78             i.admin_up()
79             i.config_ip4()
80             i.resolve_arp()
81
82         cls.pg3.admin_up()
83         cls.pg3.config_ip4()
84         cls.pg3.resolve_arp()
85         cls.pg3.config_ip6()
86         cls.pg3.configure_ipv6_neighbors()
87
88         cls.pg5.admin_up()
89         cls.pg5.config_ip6()
90
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(TestNAT64, cls).tearDownClass()
94
    def setUp(self):
        """Run the parent setUp, then enable the NAT64 plugin."""
        super(TestNAT64, self).setUp()
        # BIB and session-table bucket counts are passed to the enable call
        self.vapi.nat64_plugin_enable_disable(enable=1,
                                              bib_buckets=128, st_buckets=256)
99
    def tearDown(self):
        """Run the parent tearDown, then disable NAT64 unless VPP is dead."""
        super(TestNAT64, self).tearDown()
        # no API call is issued once VPP has been flagged as dead
        if not self.vpp_dead:
            self.vapi.nat64_plugin_enable_disable(enable=0)
104
105     def show_commands_at_teardown(self):
106         self.logger.info(self.vapi.cli("show nat64 pool"))
107         self.logger.info(self.vapi.cli("show nat64 interfaces"))
108         self.logger.info(self.vapi.cli("show nat64 prefix"))
109         self.logger.info(self.vapi.cli("show nat64 bib all"))
110         self.logger.info(self.vapi.cli("show nat64 session table all"))
111
112     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
113         """
114         Create IPv6 packet stream for inside network
115
116         :param in_if: Inside interface
117         :param out_if: Outside interface
118         :param ttl: Hop Limit of generated packets
119         :param pref: NAT64 prefix
120         :param plen: NAT64 prefix length
121         """
122         pkts = []
123         if pref is None:
124             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
125         else:
126             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
127
128         # TCP
129         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
130              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
131              TCP(sport=self.tcp_port_in, dport=20))
132         pkts.append(p)
133
134         # UDP
135         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
136              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
137              UDP(sport=self.udp_port_in, dport=20))
138         pkts.append(p)
139
140         # ICMP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
143              ICMPv6EchoRequest(id=self.icmp_id_in))
144         pkts.append(p)
145
146         return pkts
147
148     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
149                           use_inside_ports=False):
150         """
151         Create packet stream for outside network
152
153         :param out_if: Outside interface
154         :param dst_ip: Destination IP address (Default use global NAT address)
155         :param ttl: TTL of generated packets
156         :param use_inside_ports: Use inside NAT ports as destination ports
157                instead of outside ports
158         """
159         if dst_ip is None:
160             dst_ip = self.nat_addr
161         if not use_inside_ports:
162             tcp_port = self.tcp_port_out
163             udp_port = self.udp_port_out
164             icmp_id = self.icmp_id_out
165         else:
166             tcp_port = self.tcp_port_in
167             udp_port = self.udp_port_in
168             icmp_id = self.icmp_id_in
169         pkts = []
170         # TCP
171         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
172              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
173              TCP(dport=tcp_port, sport=20))
174         pkts.extend([p, p])
175
176         # UDP
177         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
178              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
179              UDP(dport=udp_port, sport=20))
180         pkts.append(p)
181
182         # ICMP
183         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
184              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
185              ICMP(id=icmp_id, type='echo-reply'))
186         pkts.append(p)
187
188         return pkts
189
190     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
191                            dst_ip=None, is_ip6=False, ignore_port=False):
192         """
193         Verify captured packets on outside network
194
195         :param capture: Captured packets
196         :param nat_ip: Translated IP address (Default use global NAT address)
197         :param same_port: Source port number is not translated (Default False)
198         :param dst_ip: Destination IP address (Default do not verify)
199         :param is_ip6: If L3 protocol is IPv6 (Default False)
200         """
201         if is_ip6:
202             IP46 = IPv6
203             ICMP46 = ICMPv6EchoRequest
204         else:
205             IP46 = IP
206             ICMP46 = ICMP
207         if nat_ip is None:
208             nat_ip = self.nat_addr
209         for packet in capture:
210             try:
211                 if not is_ip6:
212                     self.assert_packet_checksums_valid(packet)
213                 self.assertEqual(packet[IP46].src, nat_ip)
214                 if dst_ip is not None:
215                     self.assertEqual(packet[IP46].dst, dst_ip)
216                 if packet.haslayer(TCP):
217                     if not ignore_port:
218                         if same_port:
219                             self.assertEqual(
220                                 packet[TCP].sport, self.tcp_port_in)
221                         else:
222                             self.assertNotEqual(
223                                 packet[TCP].sport, self.tcp_port_in)
224                     self.tcp_port_out = packet[TCP].sport
225                     self.assert_packet_checksums_valid(packet)
226                 elif packet.haslayer(UDP):
227                     if not ignore_port:
228                         if same_port:
229                             self.assertEqual(
230                                 packet[UDP].sport, self.udp_port_in)
231                         else:
232                             self.assertNotEqual(
233                                 packet[UDP].sport, self.udp_port_in)
234                     self.udp_port_out = packet[UDP].sport
235                 else:
236                     if not ignore_port:
237                         if same_port:
238                             self.assertEqual(
239                                 packet[ICMP46].id, self.icmp_id_in)
240                         else:
241                             self.assertNotEqual(
242                                 packet[ICMP46].id, self.icmp_id_in)
243                     self.icmp_id_out = packet[ICMP46].id
244                     self.assert_packet_checksums_valid(packet)
245             except:
246                 self.logger.error(ppp("Unexpected or invalid packet "
247                                       "(outside network):", packet))
248                 raise
249
250     def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
251         """
252         Verify captured IPv6 packets on inside network
253
254         :param capture: Captured packets
255         :param src_ip: Source IP
256         :param dst_ip: Destination IP address
257         """
258         for packet in capture:
259             try:
260                 self.assertEqual(packet[IPv6].src, src_ip)
261                 self.assertEqual(packet[IPv6].dst, dst_ip)
262                 self.assert_packet_checksums_valid(packet)
263                 if packet.haslayer(TCP):
264                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
265                 elif packet.haslayer(UDP):
266                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
267                 else:
268                     self.assertEqual(packet[ICMPv6EchoReply].id,
269                                      self.icmp_id_in)
270             except:
271                 self.logger.error(ppp("Unexpected or invalid packet "
272                                       "(inside network):", packet))
273                 raise
274
275     def create_stream_frag(self, src_if, dst, sport, dport, data,
276                            proto=IP_PROTOS.tcp, echo_reply=False):
277         """
278         Create fragmented packet stream
279
280         :param src_if: Source interface
281         :param dst: Destination IPv4 address
282         :param sport: Source port
283         :param dport: Destination port
284         :param data: Payload data
285         :param proto: protocol (TCP, UDP, ICMP)
286         :param echo_reply: use echo_reply if protocol is ICMP
287         :returns: Fragments
288         """
289         if proto == IP_PROTOS.tcp:
290             p = (IP(src=src_if.remote_ip4, dst=dst) /
291                  TCP(sport=sport, dport=dport) /
292                  Raw(data))
293             p = p.__class__(scapy.compat.raw(p))
294             chksum = p[TCP].chksum
295             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
296         elif proto == IP_PROTOS.udp:
297             proto_header = UDP(sport=sport, dport=dport)
298         elif proto == IP_PROTOS.icmp:
299             if not echo_reply:
300                 proto_header = ICMP(id=sport, type='echo-request')
301             else:
302                 proto_header = ICMP(id=sport, type='echo-reply')
303         else:
304             raise Exception("Unsupported protocol")
305         id = random.randint(0, 65535)
306         pkts = []
307         if proto == IP_PROTOS.tcp:
308             raw = Raw(data[0:4])
309         else:
310             raw = Raw(data[0:16])
311         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
312              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
313              proto_header /
314              raw)
315         pkts.append(p)
316         if proto == IP_PROTOS.tcp:
317             raw = Raw(data[4:20])
318         else:
319             raw = Raw(data[16:32])
320         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
321              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
322                 proto=proto) /
323              raw)
324         pkts.append(p)
325         if proto == IP_PROTOS.tcp:
326             raw = Raw(data[20:])
327         else:
328             raw = Raw(data[32:])
329         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
330              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
331                 id=id) /
332              raw)
333         pkts.append(p)
334         return pkts
335
336     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
337                                pref=None, plen=0, frag_size=128):
338         """
339         Create fragmented packet stream
340
341         :param src_if: Source interface
342         :param dst: Destination IPv4 address
343         :param sport: Source TCP port
344         :param dport: Destination TCP port
345         :param data: Payload data
346         :param pref: NAT64 prefix
347         :param plen: NAT64 prefix length
348         :param fragsize: size of fragments
349         :returns: Fragments
350         """
351         if pref is None:
352             dst_ip6 = ''.join(['64:ff9b::', dst])
353         else:
354             dst_ip6 = self.compose_ip6(dst, pref, plen)
355
356         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
357              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
358              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
359              TCP(sport=sport, dport=dport) /
360              Raw(data))
361
362         return fragment6(p, frag_size)
363
364     def reass_frags_and_verify(self, frags, src, dst):
365         """
366         Reassemble and verify fragmented packet
367
368         :param frags: Captured fragments
369         :param src: Source IPv4 address to verify
370         :param dst: Destination IPv4 address to verify
371
372         :returns: Reassembled IPv4 packet
373         """
374         buffer = BytesIO()
375         for p in frags:
376             self.assertEqual(p[IP].src, src)
377             self.assertEqual(p[IP].dst, dst)
378             self.assert_ip_checksum_valid(p)
379             buffer.seek(p[IP].frag * 8)
380             buffer.write(bytes(p[IP].payload))
381         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
382                 proto=frags[0][IP].proto)
383         if ip.proto == IP_PROTOS.tcp:
384             p = (ip / TCP(buffer.getvalue()))
385             self.logger.debug(ppp("Reassembled:", p))
386             self.assert_tcp_checksum_valid(p)
387         elif ip.proto == IP_PROTOS.udp:
388             p = (ip / UDP(buffer.getvalue()[:8]) /
389                  Raw(buffer.getvalue()[8:]))
390         elif ip.proto == IP_PROTOS.icmp:
391             p = (ip / ICMP(buffer.getvalue()))
392         return p
393
394     def reass_frags_and_verify_ip6(self, frags, src, dst):
395         """
396         Reassemble and verify fragmented packet
397
398         :param frags: Captured fragments
399         :param src: Source IPv6 address to verify
400         :param dst: Destination IPv6 address to verify
401
402         :returns: Reassembled IPv6 packet
403         """
404         buffer = BytesIO()
405         for p in frags:
406             self.assertEqual(p[IPv6].src, src)
407             self.assertEqual(p[IPv6].dst, dst)
408             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
409             buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
410         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
411                   nh=frags[0][IPv6ExtHdrFragment].nh)
412         if ip.nh == IP_PROTOS.tcp:
413             p = (ip / TCP(buffer.getvalue()))
414         elif ip.nh == IP_PROTOS.udp:
415             p = (ip / UDP(buffer.getvalue()))
416         self.logger.debug(ppp("Reassembled:", p))
417         self.assert_packet_checksums_valid(p)
418         return p
419
420     def verify_ipfix_max_bibs(self, data, limit):
421         """
422         Verify IPFIX maximum BIB entries exceeded event
423
424         :param data: Decoded IPFIX data records
425         :param limit: Number of maximum BIB entries that can be created.
426         """
427         self.assertEqual(1, len(data))
428         record = data[0]
429         # natEvent
430         self.assertEqual(scapy.compat.orb(record[230]), 13)
431         # natQuotaExceededEvent
432         self.assertEqual(struct.pack("I", 2), record[466])
433         # maxBIBEntries
434         self.assertEqual(struct.pack("I", limit), record[472])
435
436     def verify_ipfix_bib(self, data, is_create, src_addr):
437         """
438         Verify IPFIX NAT64 BIB create and delete events
439
440         :param data: Decoded IPFIX data records
441         :param is_create: Create event if nonzero value otherwise delete event
442         :param src_addr: IPv6 source address
443         """
444         self.assertEqual(1, len(data))
445         record = data[0]
446         # natEvent
447         if is_create:
448             self.assertEqual(scapy.compat.orb(record[230]), 10)
449         else:
450             self.assertEqual(scapy.compat.orb(record[230]), 11)
451         # sourceIPv6Address
452         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
453         # postNATSourceIPv4Address
454         self.assertEqual(self.nat_addr_n, record[225])
455         # protocolIdentifier
456         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
457         # ingressVRFID
458         self.assertEqual(struct.pack("!I", 0), record[234])
459         # sourceTransportPort
460         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
461         # postNAPTSourceTransportPort
462         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
463
464     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
465                                dst_port):
466         """
467         Verify IPFIX NAT64 session create and delete events
468
469         :param data: Decoded IPFIX data records
470         :param is_create: Create event if nonzero value otherwise delete event
471         :param src_addr: IPv6 source address
472         :param dst_addr: IPv4 destination address
473         :param dst_port: destination TCP port
474         """
475         self.assertEqual(1, len(data))
476         record = data[0]
477         # natEvent
478         if is_create:
479             self.assertEqual(scapy.compat.orb(record[230]), 6)
480         else:
481             self.assertEqual(scapy.compat.orb(record[230]), 7)
482         # sourceIPv6Address
483         self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
484         # destinationIPv6Address
485         self.assertEqual(socket.inet_pton(socket.AF_INET6,
486                                           self.compose_ip6(dst_addr,
487                                                            '64:ff9b::',
488                                                            96)),
489                          record[28])
490         # postNATSourceIPv4Address
491         self.assertEqual(self.nat_addr_n, record[225])
492         # postNATDestinationIPv4Address
493         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
494                          record[226])
495         # protocolIdentifier
496         self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
497         # ingressVRFID
498         self.assertEqual(struct.pack("!I", 0), record[234])
499         # sourceTransportPort
500         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
501         # postNAPTSourceTransportPort
502         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
503         # destinationTransportPort
504         self.assertEqual(struct.pack("!H", dst_port), record[11])
505         # postNAPTDestinationTransportPort
506         self.assertEqual(struct.pack("!H", dst_port), record[228])
507
508     def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
509         message = data.decode('utf-8')
510         try:
511             message = SyslogMessage.parse(message)
512         except ParseError as e:
513             self.logger.error(e)
514             raise
515         else:
516             self.assertEqual(message.severity, SyslogSeverity.info)
517             self.assertEqual(message.appname, 'NAT')
518             self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
519             sd_params = message.sd.get('nsess')
520             self.assertTrue(sd_params is not None)
521             if is_ip6:
522                 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
523                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
524             else:
525                 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
526                 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
527                 self.assertTrue(sd_params.get('SSUBIX') is not None)
528             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
529             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
530             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
531             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
532             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
533             self.assertEqual(sd_params.get('SVLAN'), '0')
534             self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
535             self.assertEqual(sd_params.get('XDPORT'),
536                              "%d" % self.tcp_external_port)
537
538     def compose_ip6(self, ip4, pref, plen):
539         """
540         Compose IPv4-embedded IPv6 addresses
541
542         :param ip4: IPv4 address
543         :param pref: IPv6 prefix
544         :param plen: IPv6 prefix length
545         :returns: IPv4-embedded IPv6 addresses
546         """
547         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
548         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
549         if plen == 32:
550             pref_n[4] = ip4_n[0]
551             pref_n[5] = ip4_n[1]
552             pref_n[6] = ip4_n[2]
553             pref_n[7] = ip4_n[3]
554         elif plen == 40:
555             pref_n[5] = ip4_n[0]
556             pref_n[6] = ip4_n[1]
557             pref_n[7] = ip4_n[2]
558             pref_n[9] = ip4_n[3]
559         elif plen == 48:
560             pref_n[6] = ip4_n[0]
561             pref_n[7] = ip4_n[1]
562             pref_n[9] = ip4_n[2]
563             pref_n[10] = ip4_n[3]
564         elif plen == 56:
565             pref_n[7] = ip4_n[0]
566             pref_n[9] = ip4_n[1]
567             pref_n[10] = ip4_n[2]
568             pref_n[11] = ip4_n[3]
569         elif plen == 64:
570             pref_n[9] = ip4_n[0]
571             pref_n[10] = ip4_n[1]
572             pref_n[11] = ip4_n[2]
573             pref_n[12] = ip4_n[3]
574         elif plen == 96:
575             pref_n[12] = ip4_n[0]
576             pref_n[13] = ip4_n[1]
577             pref_n[14] = ip4_n[2]
578             pref_n[15] = ip4_n[3]
579         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
580         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
581
582     def verify_ipfix_max_sessions(self, data, limit):
583         """
584         Verify IPFIX maximum session entries exceeded event
585
586         :param data: Decoded IPFIX data records
587         :param limit: Number of maximum session entries that can be created.
588         """
589         self.assertEqual(1, len(data))
590         record = data[0]
591         # natEvent
592         self.assertEqual(scapy.compat.orb(record[230]), 13)
593         # natQuotaExceededEvent
594         self.assertEqual(struct.pack("I", 1), record[466])
595         # maxSessionEntries
596         self.assertEqual(struct.pack("I", limit), record[471])
597
598     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
599         """ NAT64 inside interface handles Neighbor Advertisement """
600
601         flags = self.config_flags.NAT_IS_INSIDE
602         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
603                                           sw_if_index=self.pg5.sw_if_index)
604
605         # Try to send ping
606         ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
607                 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
608                 ICMPv6EchoRequest())
609         pkts = [ping]
610         self.pg5.add_stream(pkts)
611         self.pg_enable_capture(self.pg_interfaces)
612         self.pg_start()
613
614         # Wait for Neighbor Solicitation
615         capture = self.pg5.get_capture(len(pkts))
616         packet = capture[0]
617         try:
618             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
619             self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
620             tgt = packet[ICMPv6ND_NS].tgt
621         except:
622             self.logger.error(ppp("Unexpected or invalid packet:", packet))
623             raise
624
625         # Send Neighbor Advertisement
626         p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
627              IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
628              ICMPv6ND_NA(tgt=tgt) /
629              ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
630         pkts = [p]
631         self.pg5.add_stream(pkts)
632         self.pg_enable_capture(self.pg_interfaces)
633         self.pg_start()
634
635         # Try to send ping again
636         pkts = [ping]
637         self.pg5.add_stream(pkts)
638         self.pg_enable_capture(self.pg_interfaces)
639         self.pg_start()
640
641         # Wait for ping reply
642         capture = self.pg5.get_capture(len(pkts))
643         packet = capture[0]
644         try:
645             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
646             self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
647             self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
648         except:
649             self.logger.error(ppp("Unexpected or invalid packet:", packet))
650             raise
651
652     def test_pool(self):
653         """ Add/delete address to NAT64 pool """
654         nat_addr = '1.2.3.4'
655
656         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
657                                                 end_addr=nat_addr,
658                                                 vrf_id=0xFFFFFFFF, is_add=1)
659
660         addresses = self.vapi.nat64_pool_addr_dump()
661         self.assertEqual(len(addresses), 1)
662         self.assertEqual(str(addresses[0].address), nat_addr)
663
664         self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
665                                                 end_addr=nat_addr,
666                                                 vrf_id=0xFFFFFFFF, is_add=0)
667
668         addresses = self.vapi.nat64_pool_addr_dump()
669         self.assertEqual(len(addresses), 0)
670
671     def test_interface(self):
672         """ Enable/disable NAT64 feature on the interface """
673         flags = self.config_flags.NAT_IS_INSIDE
674         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
675                                           sw_if_index=self.pg0.sw_if_index)
676         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
677                                           sw_if_index=self.pg1.sw_if_index)
678
679         interfaces = self.vapi.nat64_interface_dump()
680         self.assertEqual(len(interfaces), 2)
681         pg0_found = False
682         pg1_found = False
683         for intf in interfaces:
684             if intf.sw_if_index == self.pg0.sw_if_index:
685                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
686                 pg0_found = True
687             elif intf.sw_if_index == self.pg1.sw_if_index:
688                 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
689                 pg1_found = True
690         self.assertTrue(pg0_found)
691         self.assertTrue(pg1_found)
692
693         features = self.vapi.cli("show interface features pg0")
694         self.assertIn('nat64-in2out', features)
695         features = self.vapi.cli("show interface features pg1")
696         self.assertIn('nat64-out2in', features)
697
698         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699                                           sw_if_index=self.pg0.sw_if_index)
700         self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
701                                           sw_if_index=self.pg1.sw_if_index)
702
703         interfaces = self.vapi.nat64_interface_dump()
704         self.assertEqual(len(interfaces), 0)
705
706     def test_static_bib(self):
707         """ Add/delete static BIB entry """
708         in_addr = '2001:db8:85a3::8a2e:370:7334'
709         out_addr = '10.1.1.3'
710         in_port = 1234
711         out_port = 5678
712         proto = IP_PROTOS.tcp
713
714         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
715                                            i_port=in_port, o_port=out_port,
716                                            proto=proto, vrf_id=0, is_add=1)
717         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
718         static_bib_num = 0
719         for bibe in bib:
720             if bibe.flags & self.config_flags.NAT_IS_STATIC:
721                 static_bib_num += 1
722                 self.assertEqual(str(bibe.i_addr), in_addr)
723                 self.assertEqual(str(bibe.o_addr), out_addr)
724                 self.assertEqual(bibe.i_port, in_port)
725                 self.assertEqual(bibe.o_port, out_port)
726         self.assertEqual(static_bib_num, 1)
727         bibs = self.statistics.get_counter('/nat64/total-bibs')
728         self.assertEqual(bibs[0][0], 1)
729
730         self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
731                                            i_port=in_port, o_port=out_port,
732                                            proto=proto, vrf_id=0, is_add=0)
733         bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
734         static_bib_num = 0
735         for bibe in bib:
736             if bibe.flags & self.config_flags.NAT_IS_STATIC:
737                 static_bib_num += 1
738         self.assertEqual(static_bib_num, 0)
739         bibs = self.statistics.get_counter('/nat64/total-bibs')
740         self.assertEqual(bibs[0][0], 0)
741
742     def test_set_timeouts(self):
743         """ Set NAT64 timeouts """
744         # verify default values
745         timeouts = self.vapi.nat64_get_timeouts()
746         self.assertEqual(timeouts.udp, 300)
747         self.assertEqual(timeouts.icmp, 60)
748         self.assertEqual(timeouts.tcp_transitory, 240)
749         self.assertEqual(timeouts.tcp_established, 7440)
750
751         # set and verify custom values
752         self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
753                                      tcp_transitory=250, icmp=30)
754         timeouts = self.vapi.nat64_get_timeouts()
755         self.assertEqual(timeouts.udp, 200)
756         self.assertEqual(timeouts.icmp, 30)
757         self.assertEqual(timeouts.tcp_transitory, 250)
758         self.assertEqual(timeouts.tcp_established, 7450)
759
760     def test_dynamic(self):
761         """ NAT64 dynamic translation test """
762         self.tcp_port_in = 6303
763         self.udp_port_in = 6304
764         self.icmp_id_in = 6305
765
766         ses_num_start = self.nat64_get_ses_num()
767
768         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
769                                                 end_addr=self.nat_addr,
770                                                 vrf_id=0xFFFFFFFF,
771                                                 is_add=1)
772         flags = self.config_flags.NAT_IS_INSIDE
773         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
774                                           sw_if_index=self.pg0.sw_if_index)
775         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
776                                           sw_if_index=self.pg1.sw_if_index)
777
778         # in2out
779         tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
780         udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
781         icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
782         drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
783
784         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
785         self.pg0.add_stream(pkts)
786         self.pg_enable_capture(self.pg_interfaces)
787         self.pg_start()
788         capture = self.pg1.get_capture(len(pkts))
789         self.verify_capture_out(capture, nat_ip=self.nat_addr,
790                                 dst_ip=self.pg1.remote_ip4)
791
792         if_idx = self.pg0.sw_if_index
793         cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
794         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
795         cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
796         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
797         cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
798         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
799         cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
800         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
801
802         # out2in
803         tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
804         udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
805         icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
806         drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
807
808         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
809         self.pg1.add_stream(pkts)
810         self.pg_enable_capture(self.pg_interfaces)
811         self.pg_start()
812         capture = self.pg0.get_capture(len(pkts))
813         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
814         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
815
816         if_idx = self.pg1.sw_if_index
817         cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
818         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
819         cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
820         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
821         cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
822         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
823         cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
824         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
825
826         bibs = self.statistics.get_counter('/nat64/total-bibs')
827         self.assertEqual(bibs[0][0], 3)
828         sessions = self.statistics.get_counter('/nat64/total-sessions')
829         self.assertEqual(sessions[0][0], 3)
830
831         # in2out
832         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture, nat_ip=self.nat_addr,
838                                 dst_ip=self.pg1.remote_ip4)
839
840         # out2in
841         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
842         self.pg1.add_stream(pkts)
843         self.pg_enable_capture(self.pg_interfaces)
844         self.pg_start()
845         capture = self.pg0.get_capture(len(pkts))
846         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
847
848         ses_num_end = self.nat64_get_ses_num()
849
850         self.assertEqual(ses_num_end - ses_num_start, 3)
851
852         # tenant with specific VRF
853         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
854                                                 end_addr=self.vrf1_nat_addr,
855                                                 vrf_id=self.vrf1_id, is_add=1)
856         flags = self.config_flags.NAT_IS_INSIDE
857         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
858                                           sw_if_index=self.pg2.sw_if_index)
859
860         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
861         self.pg2.add_stream(pkts)
862         self.pg_enable_capture(self.pg_interfaces)
863         self.pg_start()
864         capture = self.pg1.get_capture(len(pkts))
865         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
866                                 dst_ip=self.pg1.remote_ip4)
867
868         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
869         self.pg1.add_stream(pkts)
870         self.pg_enable_capture(self.pg_interfaces)
871         self.pg_start()
872         capture = self.pg2.get_capture(len(pkts))
873         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
874
875     def test_static(self):
876         """ NAT64 static translation test """
877         self.tcp_port_in = 60303
878         self.udp_port_in = 60304
879         self.icmp_id_in = 60305
880         self.tcp_port_out = 60303
881         self.udp_port_out = 60304
882         self.icmp_id_out = 60305
883
884         ses_num_start = self.nat64_get_ses_num()
885
886         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
887                                                 end_addr=self.nat_addr,
888                                                 vrf_id=0xFFFFFFFF,
889                                                 is_add=1)
890         flags = self.config_flags.NAT_IS_INSIDE
891         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
892                                           sw_if_index=self.pg0.sw_if_index)
893         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
894                                           sw_if_index=self.pg1.sw_if_index)
895
896         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
897                                            o_addr=self.nat_addr,
898                                            i_port=self.tcp_port_in,
899                                            o_port=self.tcp_port_out,
900                                            proto=IP_PROTOS.tcp, vrf_id=0,
901                                            is_add=1)
902         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903                                            o_addr=self.nat_addr,
904                                            i_port=self.udp_port_in,
905                                            o_port=self.udp_port_out,
906                                            proto=IP_PROTOS.udp, vrf_id=0,
907                                            is_add=1)
908         self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909                                            o_addr=self.nat_addr,
910                                            i_port=self.icmp_id_in,
911                                            o_port=self.icmp_id_out,
912                                            proto=IP_PROTOS.icmp, vrf_id=0,
913                                            is_add=1)
914
915         # in2out
916         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
917         self.pg0.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg1.get_capture(len(pkts))
921         self.verify_capture_out(capture, nat_ip=self.nat_addr,
922                                 dst_ip=self.pg1.remote_ip4, same_port=True)
923
924         # out2in
925         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
926         self.pg1.add_stream(pkts)
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929         capture = self.pg0.get_capture(len(pkts))
930         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
931         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
932
933         ses_num_end = self.nat64_get_ses_num()
934
935         self.assertEqual(ses_num_end - ses_num_start, 3)
936
937     def test_session_timeout(self):
938         """ NAT64 session timeout """
939         self.icmp_id_in = 1234
940         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
941                                                 end_addr=self.nat_addr,
942                                                 vrf_id=0xFFFFFFFF,
943                                                 is_add=1)
944         flags = self.config_flags.NAT_IS_INSIDE
945         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
946                                           sw_if_index=self.pg0.sw_if_index)
947         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
948                                           sw_if_index=self.pg1.sw_if_index)
949         self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
950                                      tcp_transitory=5,
951                                      icmp=5)
952
953         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
954         self.pg0.add_stream(pkts)
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pg_start()
957         capture = self.pg1.get_capture(len(pkts))
958
959         ses_num_before_timeout = self.nat64_get_ses_num()
960
961         self.virtual_sleep(15)
962
963         # ICMP and TCP session after timeout
964         ses_num_after_timeout = self.nat64_get_ses_num()
965         self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
966
967     def test_icmp_error(self):
968         """ NAT64 ICMP Error message translation """
969         self.tcp_port_in = 6303
970         self.udp_port_in = 6304
971         self.icmp_id_in = 6305
972
973         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
974                                                 end_addr=self.nat_addr,
975                                                 vrf_id=0xFFFFFFFF,
976                                                 is_add=1)
977         flags = self.config_flags.NAT_IS_INSIDE
978         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
979                                           sw_if_index=self.pg0.sw_if_index)
980         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
981                                           sw_if_index=self.pg1.sw_if_index)
982
983         # send some packets to create sessions
984         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
985         self.pg0.add_stream(pkts)
986         self.pg_enable_capture(self.pg_interfaces)
987         self.pg_start()
988         capture_ip4 = self.pg1.get_capture(len(pkts))
989         self.verify_capture_out(capture_ip4,
990                                 nat_ip=self.nat_addr,
991                                 dst_ip=self.pg1.remote_ip4)
992
993         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
994         self.pg1.add_stream(pkts)
995         self.pg_enable_capture(self.pg_interfaces)
996         self.pg_start()
997         capture_ip6 = self.pg0.get_capture(len(pkts))
998         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
999         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1000                                    self.pg0.remote_ip6)
1001
1002         # in2out
1003         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1004                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1005                 ICMPv6DestUnreach(code=1) /
1006                 packet[IPv6] for packet in capture_ip6]
1007         self.pg0.add_stream(pkts)
1008         self.pg_enable_capture(self.pg_interfaces)
1009         self.pg_start()
1010         capture = self.pg1.get_capture(len(pkts))
1011         for packet in capture:
1012             try:
1013                 self.assertEqual(packet[IP].src, self.nat_addr)
1014                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1015                 self.assertEqual(packet[ICMP].type, 3)
1016                 self.assertEqual(packet[ICMP].code, 13)
1017                 inner = packet[IPerror]
1018                 self.assertEqual(inner.src, self.pg1.remote_ip4)
1019                 self.assertEqual(inner.dst, self.nat_addr)
1020                 self.assert_packet_checksums_valid(packet)
1021                 if inner.haslayer(TCPerror):
1022                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1023                 elif inner.haslayer(UDPerror):
1024                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1025                 else:
1026                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1027             except:
1028                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1029                 raise
1030
1031         # out2in
1032         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1033                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1034                 ICMP(type=3, code=13) /
1035                 packet[IP] for packet in capture_ip4]
1036         self.pg1.add_stream(pkts)
1037         self.pg_enable_capture(self.pg_interfaces)
1038         self.pg_start()
1039         capture = self.pg0.get_capture(len(pkts))
1040         for packet in capture:
1041             try:
1042                 self.assertEqual(packet[IPv6].src, ip.src)
1043                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1044                 icmp = packet[ICMPv6DestUnreach]
1045                 self.assertEqual(icmp.code, 1)
1046                 inner = icmp[IPerror6]
1047                 self.assertEqual(inner.src, self.pg0.remote_ip6)
1048                 self.assertEqual(inner.dst, ip.src)
1049                 self.assert_icmpv6_checksum_valid(packet)
1050                 if inner.haslayer(TCPerror):
1051                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1052                 elif inner.haslayer(UDPerror):
1053                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1054                 else:
1055                     self.assertEqual(inner[ICMPv6EchoRequest].id,
1056                                      self.icmp_id_in)
1057             except:
1058                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1059                 raise
1060
1061     def test_hairpinning(self):
1062         """ NAT64 hairpinning """
1063
1064         client = self.pg0.remote_hosts[0]
1065         server = self.pg0.remote_hosts[1]
1066         server_tcp_in_port = 22
1067         server_tcp_out_port = 4022
1068         server_udp_in_port = 23
1069         server_udp_out_port = 4023
1070         client_tcp_in_port = 1234
1071         client_udp_in_port = 1235
1072         client_tcp_out_port = 0
1073         client_udp_out_port = 0
1074         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1075         nat_addr_ip6 = ip.src
1076
1077         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1078                                                 end_addr=self.nat_addr,
1079                                                 vrf_id=0xFFFFFFFF,
1080                                                 is_add=1)
1081         flags = self.config_flags.NAT_IS_INSIDE
1082         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1083                                           sw_if_index=self.pg0.sw_if_index)
1084         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1085                                           sw_if_index=self.pg1.sw_if_index)
1086
1087         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1088                                            o_addr=self.nat_addr,
1089                                            i_port=server_tcp_in_port,
1090                                            o_port=server_tcp_out_port,
1091                                            proto=IP_PROTOS.tcp, vrf_id=0,
1092                                            is_add=1)
1093         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1094                                            o_addr=self.nat_addr,
1095                                            i_port=server_udp_in_port,
1096                                            o_port=server_udp_out_port,
1097                                            proto=IP_PROTOS.udp, vrf_id=0,
1098                                            is_add=1)
1099
1100         # client to server
1101         pkts = []
1102         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1103              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1104              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1105         pkts.append(p)
1106         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1107              IPv6(src=client.ip6, dst=nat_addr_ip6) /
1108              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1109         pkts.append(p)
1110         self.pg0.add_stream(pkts)
1111         self.pg_enable_capture(self.pg_interfaces)
1112         self.pg_start()
1113         capture = self.pg0.get_capture(len(pkts))
1114         for packet in capture:
1115             try:
1116                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1117                 self.assertEqual(packet[IPv6].dst, server.ip6)
1118                 self.assert_packet_checksums_valid(packet)
1119                 if packet.haslayer(TCP):
1120                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1121                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1122                     client_tcp_out_port = packet[TCP].sport
1123                 else:
1124                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1125                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
1126                     client_udp_out_port = packet[UDP].sport
1127             except:
1128                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1129                 raise
1130
1131         # server to client
1132         pkts = []
1133         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1134              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1135              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1136         pkts.append(p)
1137         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1138              IPv6(src=server.ip6, dst=nat_addr_ip6) /
1139              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1140         pkts.append(p)
1141         self.pg0.add_stream(pkts)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144         capture = self.pg0.get_capture(len(pkts))
1145         for packet in capture:
1146             try:
1147                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1148                 self.assertEqual(packet[IPv6].dst, client.ip6)
1149                 self.assert_packet_checksums_valid(packet)
1150                 if packet.haslayer(TCP):
1151                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1152                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1153                 else:
1154                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
1155                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
1156             except:
1157                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1158                 raise
1159
1160         # ICMP error
1161         pkts = []
1162         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1163                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1164                 ICMPv6DestUnreach(code=1) /
1165                 packet[IPv6] for packet in capture]
1166         self.pg0.add_stream(pkts)
1167         self.pg_enable_capture(self.pg_interfaces)
1168         self.pg_start()
1169         capture = self.pg0.get_capture(len(pkts))
1170         for packet in capture:
1171             try:
1172                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1173                 self.assertEqual(packet[IPv6].dst, server.ip6)
1174                 icmp = packet[ICMPv6DestUnreach]
1175                 self.assertEqual(icmp.code, 1)
1176                 inner = icmp[IPerror6]
1177                 self.assertEqual(inner.src, server.ip6)
1178                 self.assertEqual(inner.dst, nat_addr_ip6)
1179                 self.assert_packet_checksums_valid(packet)
1180                 if inner.haslayer(TCPerror):
1181                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1182                     self.assertEqual(inner[TCPerror].dport,
1183                                      client_tcp_out_port)
1184                 else:
1185                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1186                     self.assertEqual(inner[UDPerror].dport,
1187                                      client_udp_out_port)
1188             except:
1189                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1190                 raise
1191
1192     def test_prefix(self):
1193         """ NAT64 Network-Specific Prefix """
1194
1195         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1196                                                 end_addr=self.nat_addr,
1197                                                 vrf_id=0xFFFFFFFF,
1198                                                 is_add=1)
1199         flags = self.config_flags.NAT_IS_INSIDE
1200         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1201                                           sw_if_index=self.pg0.sw_if_index)
1202         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1203                                           sw_if_index=self.pg1.sw_if_index)
1204         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1205                                                 end_addr=self.vrf1_nat_addr,
1206                                                 vrf_id=self.vrf1_id, is_add=1)
1207         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1208                                           sw_if_index=self.pg2.sw_if_index)
1209
1210         # Add global prefix
1211         global_pref64 = "2001:db8::"
1212         global_pref64_len = 32
1213         global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1214         self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1215                                        is_add=1)
1216
1217         prefix = self.vapi.nat64_prefix_dump()
1218         self.assertEqual(len(prefix), 1)
1219         self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1220         self.assertEqual(prefix[0].vrf_id, 0)
1221
1222         # Add tenant specific prefix
1223         vrf1_pref64 = "2001:db8:122:300::"
1224         vrf1_pref64_len = 56
1225         vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1226         self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1227                                        vrf_id=self.vrf1_id, is_add=1)
1228
1229         prefix = self.vapi.nat64_prefix_dump()
1230         self.assertEqual(len(prefix), 2)
1231
1232         # Global prefix
1233         pkts = self.create_stream_in_ip6(self.pg0,
1234                                          self.pg1,
1235                                          pref=global_pref64,
1236                                          plen=global_pref64_len)
1237         self.pg0.add_stream(pkts)
1238         self.pg_enable_capture(self.pg_interfaces)
1239         self.pg_start()
1240         capture = self.pg1.get_capture(len(pkts))
1241         self.verify_capture_out(capture, nat_ip=self.nat_addr,
1242                                 dst_ip=self.pg1.remote_ip4)
1243
1244         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1245         self.pg1.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248         capture = self.pg0.get_capture(len(pkts))
1249         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1250                                   global_pref64,
1251                                   global_pref64_len)
1252         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1253
1254         # Tenant specific prefix
1255         pkts = self.create_stream_in_ip6(self.pg2,
1256                                          self.pg1,
1257                                          pref=vrf1_pref64,
1258                                          plen=vrf1_pref64_len)
1259         self.pg2.add_stream(pkts)
1260         self.pg_enable_capture(self.pg_interfaces)
1261         self.pg_start()
1262         capture = self.pg1.get_capture(len(pkts))
1263         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1264                                 dst_ip=self.pg1.remote_ip4)
1265
1266         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1267         self.pg1.add_stream(pkts)
1268         self.pg_enable_capture(self.pg_interfaces)
1269         self.pg_start()
1270         capture = self.pg2.get_capture(len(pkts))
1271         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1272                                   vrf1_pref64,
1273                                   vrf1_pref64_len)
1274         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1275
1276     def test_unknown_proto(self):
1277         """ NAT64 translate packet with unknown protocol """
1278
1279         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1280                                                 end_addr=self.nat_addr,
1281                                                 vrf_id=0xFFFFFFFF,
1282                                                 is_add=1)
1283         flags = self.config_flags.NAT_IS_INSIDE
1284         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1285                                           sw_if_index=self.pg0.sw_if_index)
1286         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1287                                           sw_if_index=self.pg1.sw_if_index)
1288         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1289
1290         # in2out
1291         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1292              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1293              TCP(sport=self.tcp_port_in, dport=20))
1294         self.pg0.add_stream(p)
1295         self.pg_enable_capture(self.pg_interfaces)
1296         self.pg_start()
1297         p = self.pg1.get_capture(1)
1298
1299         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1300              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1301              GRE() /
1302              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1303              TCP(sport=1234, dport=1234))
1304         self.pg0.add_stream(p)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         p = self.pg1.get_capture(1)
1308         packet = p[0]
1309         try:
1310             self.assertEqual(packet[IP].src, self.nat_addr)
1311             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1312             self.assertEqual(packet.haslayer(GRE), 1)
1313             self.assert_packet_checksums_valid(packet)
1314         except:
1315             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1316             raise
1317
1318         # out2in
1319         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1320              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1321              GRE() /
1322              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1323              TCP(sport=1234, dport=1234))
1324         self.pg1.add_stream(p)
1325         self.pg_enable_capture(self.pg_interfaces)
1326         self.pg_start()
1327         p = self.pg0.get_capture(1)
1328         packet = p[0]
1329         try:
1330             self.assertEqual(packet[IPv6].src, remote_ip6)
1331             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1332             self.assertEqual(packet[IPv6].nh, 47)
1333         except:
1334             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1335             raise
1336
1337     def test_hairpinning_unknown_proto(self):
1338         """ NAT64 translate packet with unknown protocol - hairpinning """
1339
1340         client = self.pg0.remote_hosts[0]
1341         server = self.pg0.remote_hosts[1]
1342         server_tcp_in_port = 22
1343         server_tcp_out_port = 4022
1344         client_tcp_in_port = 1234
1345         client_tcp_out_port = 1235
1346         server_nat_ip = "10.0.0.100"
1347         client_nat_ip = "10.0.0.110"
1348         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1349         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1350
1351         self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1352                                                 end_addr=client_nat_ip,
1353                                                 vrf_id=0xFFFFFFFF,
1354                                                 is_add=1)
1355         flags = self.config_flags.NAT_IS_INSIDE
1356         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1357                                           sw_if_index=self.pg0.sw_if_index)
1358         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1359                                           sw_if_index=self.pg1.sw_if_index)
1360
1361         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1362                                            o_addr=server_nat_ip,
1363                                            i_port=server_tcp_in_port,
1364                                            o_port=server_tcp_out_port,
1365                                            proto=IP_PROTOS.tcp, vrf_id=0,
1366                                            is_add=1)
1367
1368         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1369                                            o_addr=server_nat_ip, i_port=0,
1370                                            o_port=0,
1371                                            proto=IP_PROTOS.gre, vrf_id=0,
1372                                            is_add=1)
1373
1374         self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1375                                            o_addr=client_nat_ip,
1376                                            i_port=client_tcp_in_port,
1377                                            o_port=client_tcp_out_port,
1378                                            proto=IP_PROTOS.tcp, vrf_id=0,
1379                                            is_add=1)
1380
1381         # client to server
1382         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1383              IPv6(src=client.ip6, dst=server_nat_ip6) /
1384              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1385         self.pg0.add_stream(p)
1386         self.pg_enable_capture(self.pg_interfaces)
1387         self.pg_start()
1388         p = self.pg0.get_capture(1)
1389
1390         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1391              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1392              GRE() /
1393              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1394              TCP(sport=1234, dport=1234))
1395         self.pg0.add_stream(p)
1396         self.pg_enable_capture(self.pg_interfaces)
1397         self.pg_start()
1398         p = self.pg0.get_capture(1)
1399         packet = p[0]
1400         try:
1401             self.assertEqual(packet[IPv6].src, client_nat_ip6)
1402             self.assertEqual(packet[IPv6].dst, server.ip6)
1403             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1404         except:
1405             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1406             raise
1407
1408         # server to client
1409         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1410              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1411              GRE() /
1412              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1413              TCP(sport=1234, dport=1234))
1414         self.pg0.add_stream(p)
1415         self.pg_enable_capture(self.pg_interfaces)
1416         self.pg_start()
1417         p = self.pg0.get_capture(1)
1418         packet = p[0]
1419         try:
1420             self.assertEqual(packet[IPv6].src, server_nat_ip6)
1421             self.assertEqual(packet[IPv6].dst, client.ip6)
1422             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1423         except:
1424             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1425             raise
1426
1427     def test_one_armed_nat64(self):
1428         """ One armed NAT64 """
1429         external_port = 0
1430         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1431                                            '64:ff9b::',
1432                                            96)
1433
1434         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1435                                                 end_addr=self.nat_addr,
1436                                                 vrf_id=0xFFFFFFFF,
1437                                                 is_add=1)
1438         flags = self.config_flags.NAT_IS_INSIDE
1439         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1440                                           sw_if_index=self.pg3.sw_if_index)
1441         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1442                                           sw_if_index=self.pg3.sw_if_index)
1443
1444         # in2out
1445         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1446              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1447              TCP(sport=12345, dport=80))
1448         self.pg3.add_stream(p)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg3.get_capture(1)
1452         p = capture[0]
1453         try:
1454             ip = p[IP]
1455             tcp = p[TCP]
1456             self.assertEqual(ip.src, self.nat_addr)
1457             self.assertEqual(ip.dst, self.pg3.remote_ip4)
1458             self.assertNotEqual(tcp.sport, 12345)
1459             external_port = tcp.sport
1460             self.assertEqual(tcp.dport, 80)
1461             self.assert_packet_checksums_valid(p)
1462         except:
1463             self.logger.error(ppp("Unexpected or invalid packet:", p))
1464             raise
1465
1466         # out2in
1467         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1468              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1469              TCP(sport=80, dport=external_port))
1470         self.pg3.add_stream(p)
1471         self.pg_enable_capture(self.pg_interfaces)
1472         self.pg_start()
1473         capture = self.pg3.get_capture(1)
1474         p = capture[0]
1475         try:
1476             ip = p[IPv6]
1477             tcp = p[TCP]
1478             self.assertEqual(ip.src, remote_host_ip6)
1479             self.assertEqual(ip.dst, self.pg3.remote_ip6)
1480             self.assertEqual(tcp.sport, 80)
1481             self.assertEqual(tcp.dport, 12345)
1482             self.assert_packet_checksums_valid(p)
1483         except:
1484             self.logger.error(ppp("Unexpected or invalid packet:", p))
1485             raise
1486
1487     def test_frag_in_order(self):
1488         """ NAT64 translate fragments arriving in order """
1489         self.tcp_port_in = random.randint(1025, 65535)
1490
1491         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1492                                                 end_addr=self.nat_addr,
1493                                                 vrf_id=0xFFFFFFFF,
1494                                                 is_add=1)
1495         flags = self.config_flags.NAT_IS_INSIDE
1496         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1497                                           sw_if_index=self.pg0.sw_if_index)
1498         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1499                                           sw_if_index=self.pg1.sw_if_index)
1500
1501         # in2out
1502         data = b'a' * 200
1503         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1504                                            self.tcp_port_in, 20, data)
1505         self.pg0.add_stream(pkts)
1506         self.pg_enable_capture(self.pg_interfaces)
1507         self.pg_start()
1508         frags = self.pg1.get_capture(len(pkts))
1509         p = self.reass_frags_and_verify(frags,
1510                                         self.nat_addr,
1511                                         self.pg1.remote_ip4)
1512         self.assertEqual(p[TCP].dport, 20)
1513         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1514         self.tcp_port_out = p[TCP].sport
1515         self.assertEqual(data, p[Raw].load)
1516
1517         # out2in
1518         data = b"A" * 4 + b"b" * 16 + b"C" * 3
1519         pkts = self.create_stream_frag(self.pg1,
1520                                        self.nat_addr,
1521                                        20,
1522                                        self.tcp_port_out,
1523                                        data)
1524         self.pg1.add_stream(pkts)
1525         self.pg_enable_capture(self.pg_interfaces)
1526         self.pg_start()
1527         frags = self.pg0.get_capture(len(pkts))
1528         self.logger.debug(ppc("Captured:", frags))
1529         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1530         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1531         self.assertEqual(p[TCP].sport, 20)
1532         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1533         self.assertEqual(data, p[Raw].load)
1534
1535     def test_reass_hairpinning(self):
1536         """ NAT64 fragments hairpinning """
1537         data = b'a' * 200
1538         server = self.pg0.remote_hosts[1]
1539         server_in_port = random.randint(1025, 65535)
1540         server_out_port = random.randint(1025, 65535)
1541         client_in_port = random.randint(1025, 65535)
1542         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1543         nat_addr_ip6 = ip.src
1544
1545         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1546                                                 end_addr=self.nat_addr,
1547                                                 vrf_id=0xFFFFFFFF,
1548                                                 is_add=1)
1549         flags = self.config_flags.NAT_IS_INSIDE
1550         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1551                                           sw_if_index=self.pg0.sw_if_index)
1552         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1553                                           sw_if_index=self.pg1.sw_if_index)
1554
1555         # add static BIB entry for server
1556         self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1557                                            o_addr=self.nat_addr,
1558                                            i_port=server_in_port,
1559                                            o_port=server_out_port,
1560                                            proto=IP_PROTOS.tcp, vrf_id=0,
1561                                            is_add=1)
1562
1563         # send packet from host to server
1564         pkts = self.create_stream_frag_ip6(self.pg0,
1565                                            self.nat_addr,
1566                                            client_in_port,
1567                                            server_out_port,
1568                                            data)
1569         self.pg0.add_stream(pkts)
1570         self.pg_enable_capture(self.pg_interfaces)
1571         self.pg_start()
1572         frags = self.pg0.get_capture(len(pkts))
1573         self.logger.debug(ppc("Captured:", frags))
1574         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1575         self.assertNotEqual(p[TCP].sport, client_in_port)
1576         self.assertEqual(p[TCP].dport, server_in_port)
1577         self.assertEqual(data, p[Raw].load)
1578
1579     def test_frag_out_of_order(self):
1580         """ NAT64 translate fragments arriving out of order """
1581         self.tcp_port_in = random.randint(1025, 65535)
1582
1583         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1584                                                 end_addr=self.nat_addr,
1585                                                 vrf_id=0xFFFFFFFF,
1586                                                 is_add=1)
1587         flags = self.config_flags.NAT_IS_INSIDE
1588         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1589                                           sw_if_index=self.pg0.sw_if_index)
1590         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1591                                           sw_if_index=self.pg1.sw_if_index)
1592
1593         # in2out
1594         data = b'a' * 200
1595         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1596                                            self.tcp_port_in, 20, data)
1597         pkts.reverse()
1598         self.pg0.add_stream(pkts)
1599         self.pg_enable_capture(self.pg_interfaces)
1600         self.pg_start()
1601         frags = self.pg1.get_capture(len(pkts))
1602         p = self.reass_frags_and_verify(frags,
1603                                         self.nat_addr,
1604                                         self.pg1.remote_ip4)
1605         self.assertEqual(p[TCP].dport, 20)
1606         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1607         self.tcp_port_out = p[TCP].sport
1608         self.assertEqual(data, p[Raw].load)
1609
1610         # out2in
1611         data = b"A" * 4 + b"B" * 16 + b"C" * 3
1612         pkts = self.create_stream_frag(self.pg1,
1613                                        self.nat_addr,
1614                                        20,
1615                                        self.tcp_port_out,
1616                                        data)
1617         pkts.reverse()
1618         self.pg1.add_stream(pkts)
1619         self.pg_enable_capture(self.pg_interfaces)
1620         self.pg_start()
1621         frags = self.pg0.get_capture(len(pkts))
1622         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1623         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1624         self.assertEqual(p[TCP].sport, 20)
1625         self.assertEqual(p[TCP].dport, self.tcp_port_in)
1626         self.assertEqual(data, p[Raw].load)
1627
1628     def test_interface_addr(self):
1629         """ Acquire NAT64 pool addresses from interface """
1630         self.vapi.nat64_add_del_interface_addr(
1631             is_add=1,
1632             sw_if_index=self.pg4.sw_if_index)
1633
1634         # no address in NAT64 pool
1635         addresses = self.vapi.nat44_address_dump()
1636         self.assertEqual(0, len(addresses))
1637
1638         # configure interface address and check NAT64 address pool
1639         self.pg4.config_ip4()
1640         addresses = self.vapi.nat64_pool_addr_dump()
1641         self.assertEqual(len(addresses), 1)
1642
1643         self.assertEqual(str(addresses[0].address),
1644                          self.pg4.local_ip4)
1645
1646         # remove interface address and check NAT64 address pool
1647         self.pg4.unconfig_ip4()
1648         addresses = self.vapi.nat64_pool_addr_dump()
1649         self.assertEqual(0, len(addresses))
1650
    @unittest.skipUnless(config.extended, "part of extended tests")
    def test_ipfix_max_bibs_sessions(self):
        """ IPFIX logging maximum session and BIB entries exceeded

        First fills NAT64 with max_bibs inside sources, two TCP flows each
        (so max_sessions == 2 * max_bibs packets are forwarded to pg1).
        With IPFIX export to pg3 enabled, one more flow from an already
        used source and then one flow from a new source are sent; neither
        may be forwarded, and the exported IPFIX data sets are handed to
        verify_ipfix_max_sessions / verify_ipfix_max_bibs respectively.
        """
        # NOTE(review): presumably these equal the limits VPP is started
        # with for this test class; that configuration is not visible here.
        max_bibs = 1280
        max_sessions = 2560
        remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                           '64:ff9b::',
                                           96)

        self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                end_addr=self.nat_addr,
                                                vrf_id=0xFFFFFFFF,
                                                is_add=1)
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                          sw_if_index=self.pg1.sw_if_index)

        # One distinct IPv6 source per iteration, two destination ports
        # (80 and 22) per source.
        pkts = []
        src = ""
        for i in range(0, max_bibs):
            src = "fd01:aa::%x" % (i)
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=src, dst=remote_host_ip6) /
                 TCP(sport=12345, dport=80))
            pkts.append(p)
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=src, dst=remote_host_ip6) /
                 TCP(sport=12345, dport=22))
            pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # All of the fill packets are expected to be forwarded.
        self.pg1.get_capture(max_sessions)

        # IPFIX export is only switched on after the tables were filled.
        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                     src_address=self.pg3.local_ip4,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                           src_port=self.ipfix_src_port,
                                           enable=1)

        # `src` still holds the last source generated by the loop above,
        # so this is a third flow (dport 25) from an already used source.
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=25))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.assert_nothing_captured()
        self.vapi.ipfix_flush()
        # NOTE(review): the expected count of 7 export packets cannot be
        # derived from this method; confirm against the exporter.
        capture = self.pg3.get_capture(7)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, 4739)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_sessions(data, max_sessions)

        # A flow from a source that was not used while filling the tables.
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
             TCP(sport=12345, dport=80))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.assert_nothing_captured()
        self.vapi.ipfix_flush()
        capture = self.pg3.get_capture(1)
        # verify events in data set
        # The decoder created above is reused, with the templates it has
        # already loaded.
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, 4739)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_bibs(data, max_bibs)
1743
    def test_ipfix_bib_ses(self):
        """ IPFIX logging NAT64 BIB/session create and delete events

        With IPFIX export to pg3 enabled, one TCP packet is translated
        pg0 -> pg1 and the exported data sets are checked; then the NAT64
        pool address is removed and the data sets exported after that are
        checked as well.
        """
        self.tcp_port_in = random.randint(1025, 65535)
        remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                           '64:ff9b::',
                                           96)

        self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                end_addr=self.nat_addr,
                                                vrf_id=0xFFFFFFFF,
                                                is_add=1)
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                          sw_if_index=self.pg1.sw_if_index)
        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                     src_address=self.pg3.local_ip4,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                           src_port=self.ipfix_src_port,
                                           enable=1)

        # Create
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
             TCP(sport=self.tcp_port_in, dport=25))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = self.pg1.get_capture(1)
        # Stored on the instance; NOTE(review): presumably read by the
        # verify_ipfix_* helpers, which are not visible in this chunk.
        self.tcp_port_out = p[0][TCP].sport
        self.vapi.ipfix_flush()
        # NOTE(review): the expected count of 8 export packets cannot be
        # derived from this method; confirm against the exporter.
        capture = self.pg3.get_capture(8)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, 4739)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        # Field 230 of the first record selects the verifier: 10 goes to
        # the BIB check, 6 to the session check (presumably the IPFIX
        # natEvent element - confirm against the ipfix module).
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                if scapy.compat.orb(data[0][230]) == 10:
                    self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
                elif scapy.compat.orb(data[0][230]) == 6:
                    self.verify_ipfix_nat64_ses(data,
                                                1,
                                                self.pg0.remote_ip6,
                                                self.pg1.remote_ip4,
                                                25)
                else:
                    # Any other event value is only logged; it does not
                    # fail the test.
                    self.logger.error(ppp("Unexpected or invalid packet: ", p))

        # Delete
        # No packet is sent here: the export is triggered by removing the
        # pool address and flushing IPFIX.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                end_addr=self.nat_addr,
                                                vrf_id=0xFFFFFFFF,
                                                is_add=0)
        self.vapi.ipfix_flush()
        capture = self.pg3.get_capture(2)
        # verify events in data set
        # Same dispatch as above with values 11 (BIB check) and 7
        # (session check); the verifiers get 0 instead of 1 as their
        # second argument.
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, 4739)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                if scapy.compat.orb(data[0][230]) == 11:
                    self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
                elif scapy.compat.orb(data[0][230]) == 7:
                    self.verify_ipfix_nat64_ses(data,
                                                0,
                                                self.pg0.remote_ip6,
                                                self.pg1.remote_ip4,
                                                25)
                else:
                    self.logger.error(ppp("Unexpected or invalid packet: ", p))
1835
1836     def test_syslog_sess(self):
1837         """ Test syslog session creation and deletion """
1838         self.tcp_port_in = random.randint(1025, 65535)
1839         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1840                                            '64:ff9b::',
1841                                            96)
1842
1843         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1844                                                 end_addr=self.nat_addr,
1845                                                 vrf_id=0xFFFFFFFF,
1846                                                 is_add=1)
1847         flags = self.config_flags.NAT_IS_INSIDE
1848         self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1849                                           sw_if_index=self.pg0.sw_if_index)
1850         self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1851                                           sw_if_index=self.pg1.sw_if_index)
1852         self.vapi.syslog_set_filter(
1853             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1854         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1855
1856         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1857              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1858              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1859         self.pg0.add_stream(p)
1860         self.pg_enable_capture(self.pg_interfaces)
1861         self.pg_start()
1862         p = self.pg1.get_capture(1)
1863         self.tcp_port_out = p[0][TCP].sport
1864         capture = self.pg3.get_capture(1)
1865         self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1866
1867         self.pg_enable_capture(self.pg_interfaces)
1868         self.pg_start()
1869         self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1870                                                 end_addr=self.nat_addr,
1871                                                 vrf_id=0xFFFFFFFF,
1872                                                 is_add=0)
1873         capture = self.pg3.get_capture(1)
1874         self.verify_syslog_sess(capture[0][Raw].load, False, True)
1875
1876     def nat64_get_ses_num(self):
1877         """
1878         Return number of active NAT64 sessions.
1879         """
1880         st = self.vapi.nat64_st_dump(proto=255)
1881         return len(st)
1882
1883     def clear_nat64(self):
1884         """
1885         Clear NAT64 configuration.
1886         """
1887         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1888                                            src_port=self.ipfix_src_port,
1889                                            enable=0)
1890         self.ipfix_src_port = 4739
1891         self.ipfix_domain_id = 1
1892
1893         self.vapi.syslog_set_filter(
1894             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1895
1896         self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1897                                      tcp_transitory=240, icmp=60)
1898
1899         interfaces = self.vapi.nat64_interface_dump()
1900         for intf in interfaces:
1901             self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1902                                               sw_if_index=intf.sw_if_index)
1903
1904         bib = self.vapi.nat64_bib_dump(proto=255)
1905         for bibe in bib:
1906             if bibe.flags & self.config_flags.NAT_IS_STATIC:
1907                 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1908                                                    o_addr=bibe.o_addr,
1909                                                    i_port=bibe.i_port,
1910                                                    o_port=bibe.o_port,
1911                                                    proto=bibe.proto,
1912                                                    vrf_id=bibe.vrf_id,
1913                                                    is_add=0)
1914
1915         adresses = self.vapi.nat64_pool_addr_dump()
1916         for addr in adresses:
1917             self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1918                                                     end_addr=addr.address,
1919                                                     vrf_id=addr.vrf_id,
1920                                                     is_add=0)
1921
1922         prefixes = self.vapi.nat64_prefix_dump()
1923         for prefix in prefixes:
1924             self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1925                                            vrf_id=prefix.vrf_id, is_add=0)
1926
1927         bibs = self.statistics.get_counter('/nat64/total-bibs')
1928         self.assertEqual(bibs[0][0], 0)
1929         sessions = self.statistics.get_counter('/nat64/total-sessions')
1930         self.assertEqual(sessions[0][0], 0)
1931
1932
# When executed as a script, run this module's tests through unittest
# using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)