Add support for 464XLAT NAT44 mode (VPP-1045)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752
753 class TestNAT44(MethodHolder):
754     """ NAT44 Test Cases """
755
756     @classmethod
757     def setUpClass(cls):
758         super(TestNAT44, cls).setUpClass()
759
760         try:
761             cls.tcp_port_in = 6303
762             cls.tcp_port_out = 6303
763             cls.udp_port_in = 6304
764             cls.udp_port_out = 6304
765             cls.icmp_id_in = 6305
766             cls.icmp_id_out = 6305
767             cls.nat_addr = '10.0.0.3'
768             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
769             cls.ipfix_src_port = 4739
770             cls.ipfix_domain_id = 1
771
772             cls.create_pg_interfaces(range(10))
773             cls.interfaces = list(cls.pg_interfaces[0:4])
774
775             for i in cls.interfaces:
776                 i.admin_up()
777                 i.config_ip4()
778                 i.resolve_arp()
779
780             cls.pg0.generate_remote_hosts(3)
781             cls.pg0.configure_ipv4_neighbors()
782
783             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
784             cls.vapi.ip_table_add_del(10, is_add=1)
785             cls.vapi.ip_table_add_del(20, is_add=1)
786
787             cls.pg4._local_ip4 = "172.16.255.1"
788             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
789             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
790             cls.pg4.set_table_ip4(10)
791             cls.pg5._local_ip4 = "172.17.255.3"
792             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
793             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
794             cls.pg5.set_table_ip4(10)
795             cls.pg6._local_ip4 = "172.16.255.1"
796             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
797             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
798             cls.pg6.set_table_ip4(20)
799             for i in cls.overlapping_interfaces:
800                 i.config_ip4()
801                 i.admin_up()
802                 i.resolve_arp()
803
804             cls.pg7.admin_up()
805             cls.pg8.admin_up()
806
807             cls.pg9.generate_remote_hosts(2)
808             cls.pg9.config_ip4()
809             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
810             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
811                                                   ip_addr_n,
812                                                   24)
813             cls.pg9.admin_up()
814             cls.pg9.resolve_arp()
815             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
816             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
817             cls.pg9.resolve_arp()
818
819         except Exception:
820             super(TestNAT44, cls).tearDownClass()
821             raise
822
823     def clear_nat44(self):
824         """
825         Clear NAT44 configuration.
826         """
827         # I found no elegant way to do this
828         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
829                                    dst_address_length=32,
830                                    next_hop_address=self.pg7.remote_ip4n,
831                                    next_hop_sw_if_index=self.pg7.sw_if_index,
832                                    is_add=0)
833         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
834                                    dst_address_length=32,
835                                    next_hop_address=self.pg8.remote_ip4n,
836                                    next_hop_sw_if_index=self.pg8.sw_if_index,
837                                    is_add=0)
838
839         for intf in [self.pg7, self.pg8]:
840             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
841             for n in neighbors:
842                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
843                                               n.mac_address,
844                                               n.ip_address,
845                                               is_add=0)
846
847         if self.pg7.has_ip4_config:
848             self.pg7.unconfig_ip4()
849
850         interfaces = self.vapi.nat44_interface_addr_dump()
851         for intf in interfaces:
852             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
853                                                twice_nat=intf.twice_nat,
854                                                is_add=0)
855
856         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
857                             domain_id=self.ipfix_domain_id)
858         self.ipfix_src_port = 4739
859         self.ipfix_domain_id = 1
860
861         interfaces = self.vapi.nat44_interface_dump()
862         for intf in interfaces:
863             if intf.is_inside > 1:
864                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
865                                                           0,
866                                                           is_add=0)
867             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
868                                                       intf.is_inside,
869                                                       is_add=0)
870
871         interfaces = self.vapi.nat44_interface_output_feature_dump()
872         for intf in interfaces:
873             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
874                                                              intf.is_inside,
875                                                              is_add=0)
876
877         static_mappings = self.vapi.nat44_static_mapping_dump()
878         for sm in static_mappings:
879             self.vapi.nat44_add_del_static_mapping(
880                 sm.local_ip_address,
881                 sm.external_ip_address,
882                 local_port=sm.local_port,
883                 external_port=sm.external_port,
884                 addr_only=sm.addr_only,
885                 vrf_id=sm.vrf_id,
886                 protocol=sm.protocol,
887                 twice_nat=sm.twice_nat,
888                 is_add=0)
889
890         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
891         for lb_sm in lb_static_mappings:
892             self.vapi.nat44_add_del_lb_static_mapping(
893                 lb_sm.external_addr,
894                 lb_sm.external_port,
895                 lb_sm.protocol,
896                 vrf_id=lb_sm.vrf_id,
897                 twice_nat=lb_sm.twice_nat,
898                 is_add=0,
899                 local_num=0,
900                 locals=[])
901
902         identity_mappings = self.vapi.nat44_identity_mapping_dump()
903         for id_m in identity_mappings:
904             self.vapi.nat44_add_del_identity_mapping(
905                 addr_only=id_m.addr_only,
906                 ip=id_m.ip_address,
907                 port=id_m.port,
908                 sw_if_index=id_m.sw_if_index,
909                 vrf_id=id_m.vrf_id,
910                 protocol=id_m.protocol,
911                 is_add=0)
912
913         adresses = self.vapi.nat44_address_dump()
914         for addr in adresses:
915             self.vapi.nat44_add_del_address_range(addr.ip_address,
916                                                   addr.ip_address,
917                                                   twice_nat=addr.twice_nat,
918                                                   is_add=0)
919
920         self.vapi.nat_set_reass()
921         self.vapi.nat_set_reass(is_ip6=1)
922
923     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
924                                  local_port=0, external_port=0, vrf_id=0,
925                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
926                                  proto=0, twice_nat=0):
927         """
928         Add/delete NAT44 static mapping
929
930         :param local_ip: Local IP address
931         :param external_ip: External IP address
932         :param local_port: Local port number (Optional)
933         :param external_port: External port number (Optional)
934         :param vrf_id: VRF ID (Default 0)
935         :param is_add: 1 if add, 0 if delete (Default add)
936         :param external_sw_if_index: External interface instead of IP address
937         :param proto: IP protocol (Mandatory if port specified)
938         :param twice_nat: 1 if translate external host address and port
939         """
940         addr_only = 1
941         if local_port and external_port:
942             addr_only = 0
943         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
944         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
945         self.vapi.nat44_add_del_static_mapping(
946             l_ip,
947             e_ip,
948             external_sw_if_index,
949             local_port,
950             external_port,
951             addr_only,
952             vrf_id,
953             proto,
954             twice_nat,
955             is_add)
956
957     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
958         """
959         Add/delete NAT44 address
960
961         :param ip: IP address
962         :param is_add: 1 if add, 0 if delete (Default add)
963         :param twice_nat: twice NAT address for extenal hosts
964         """
965         nat_addr = socket.inet_pton(socket.AF_INET, ip)
966         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
967                                               vrf_id=vrf_id,
968                                               twice_nat=twice_nat)
969
970     def test_dynamic(self):
971         """ NAT44 dynamic translation test """
972
973         self.nat44_add_address(self.nat_addr)
974         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
975         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
976                                                   is_inside=0)
977
978         # in2out
979         pkts = self.create_stream_in(self.pg0, self.pg1)
980         self.pg0.add_stream(pkts)
981         self.pg_enable_capture(self.pg_interfaces)
982         self.pg_start()
983         capture = self.pg1.get_capture(len(pkts))
984         self.verify_capture_out(capture)
985
986         # out2in
987         pkts = self.create_stream_out(self.pg1)
988         self.pg1.add_stream(pkts)
989         self.pg_enable_capture(self.pg_interfaces)
990         self.pg_start()
991         capture = self.pg0.get_capture(len(pkts))
992         self.verify_capture_in(capture, self.pg0)
993
994     def test_dynamic_icmp_errors_in2out_ttl_1(self):
995         """ NAT44 handling of client packets with TTL=1 """
996
997         self.nat44_add_address(self.nat_addr)
998         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
999         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1000                                                   is_inside=0)
1001
1002         # Client side - generate traffic
1003         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1004         self.pg0.add_stream(pkts)
1005         self.pg_enable_capture(self.pg_interfaces)
1006         self.pg_start()
1007
1008         # Client side - verify ICMP type 11 packets
1009         capture = self.pg0.get_capture(len(pkts))
1010         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1011
1012     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1013         """ NAT44 handling of server packets with TTL=1 """
1014
1015         self.nat44_add_address(self.nat_addr)
1016         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1017         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1018                                                   is_inside=0)
1019
1020         # Client side - create sessions
1021         pkts = self.create_stream_in(self.pg0, self.pg1)
1022         self.pg0.add_stream(pkts)
1023         self.pg_enable_capture(self.pg_interfaces)
1024         self.pg_start()
1025
1026         # Server side - generate traffic
1027         capture = self.pg1.get_capture(len(pkts))
1028         self.verify_capture_out(capture)
1029         pkts = self.create_stream_out(self.pg1, ttl=1)
1030         self.pg1.add_stream(pkts)
1031         self.pg_enable_capture(self.pg_interfaces)
1032         self.pg_start()
1033
1034         # Server side - verify ICMP type 11 packets
1035         capture = self.pg1.get_capture(len(pkts))
1036         self.verify_capture_out_with_icmp_errors(capture,
1037                                                  src_ip=self.pg1.local_ip4)
1038
1039     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1040         """ NAT44 handling of error responses to client packets with TTL=2 """
1041
1042         self.nat44_add_address(self.nat_addr)
1043         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1044         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1045                                                   is_inside=0)
1046
1047         # Client side - generate traffic
1048         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1049         self.pg0.add_stream(pkts)
1050         self.pg_enable_capture(self.pg_interfaces)
1051         self.pg_start()
1052
1053         # Server side - simulate ICMP type 11 response
1054         capture = self.pg1.get_capture(len(pkts))
1055         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1056                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1057                 ICMP(type=11) / packet[IP] for packet in capture]
1058         self.pg1.add_stream(pkts)
1059         self.pg_enable_capture(self.pg_interfaces)
1060         self.pg_start()
1061
1062         # Client side - verify ICMP type 11 packets
1063         capture = self.pg0.get_capture(len(pkts))
1064         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1065
1066     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1067         """ NAT44 handling of error responses to server packets with TTL=2 """
1068
1069         self.nat44_add_address(self.nat_addr)
1070         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1071         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1072                                                   is_inside=0)
1073
1074         # Client side - create sessions
1075         pkts = self.create_stream_in(self.pg0, self.pg1)
1076         self.pg0.add_stream(pkts)
1077         self.pg_enable_capture(self.pg_interfaces)
1078         self.pg_start()
1079
1080         # Server side - generate traffic
1081         capture = self.pg1.get_capture(len(pkts))
1082         self.verify_capture_out(capture)
1083         pkts = self.create_stream_out(self.pg1, ttl=2)
1084         self.pg1.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087
1088         # Client side - simulate ICMP type 11 response
1089         capture = self.pg0.get_capture(len(pkts))
1090         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1091                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1092                 ICMP(type=11) / packet[IP] for packet in capture]
1093         self.pg0.add_stream(pkts)
1094         self.pg_enable_capture(self.pg_interfaces)
1095         self.pg_start()
1096
1097         # Server side - verify ICMP type 11 packets
1098         capture = self.pg1.get_capture(len(pkts))
1099         self.verify_capture_out_with_icmp_errors(capture)
1100
1101     def test_ping_out_interface_from_outside(self):
1102         """ Ping NAT44 out interface from outside network """
1103
1104         self.nat44_add_address(self.nat_addr)
1105         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1106         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1107                                                   is_inside=0)
1108
1109         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1110              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1111              ICMP(id=self.icmp_id_out, type='echo-request'))
1112         pkts = [p]
1113         self.pg1.add_stream(pkts)
1114         self.pg_enable_capture(self.pg_interfaces)
1115         self.pg_start()
1116         capture = self.pg1.get_capture(len(pkts))
1117         self.assertEqual(1, len(capture))
1118         packet = capture[0]
1119         try:
1120             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1121             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1122             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1123             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1124         except:
1125             self.logger.error(ppp("Unexpected or invalid packet "
1126                                   "(outside network):", packet))
1127             raise
1128
1129     def test_ping_internal_host_from_outside(self):
1130         """ Ping internal host from outside network """
1131
1132         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1133         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1134         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1135                                                   is_inside=0)
1136
1137         # out2in
1138         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1139                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1140                ICMP(id=self.icmp_id_out, type='echo-request'))
1141         self.pg1.add_stream(pkt)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144         capture = self.pg0.get_capture(1)
1145         self.verify_capture_in(capture, self.pg0, packet_num=1)
1146         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1147
1148         # in2out
1149         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1150                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1151                ICMP(id=self.icmp_id_in, type='echo-reply'))
1152         self.pg0.add_stream(pkt)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155         capture = self.pg1.get_capture(1)
1156         self.verify_capture_out(capture, same_port=True, packet_num=1)
1157         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1158
1159     def test_forwarding(self):
1160         """ NAT44 forwarding test """
1161
1162         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1163         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1164                                                   is_inside=0)
1165         self.vapi.nat44_forwarding_enable_disable(1)
1166
1167         real_ip = self.pg0.remote_ip4n
1168         alias_ip = self.nat_addr_n
1169         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1170                                                external_ip=alias_ip)
1171
1172         try:
1173             # in2out - static mapping match
1174
1175             pkts = self.create_stream_out(self.pg1)
1176             self.pg1.add_stream(pkts)
1177             self.pg_enable_capture(self.pg_interfaces)
1178             self.pg_start()
1179             capture = self.pg0.get_capture(len(pkts))
1180             self.verify_capture_in(capture, self.pg0)
1181
1182             pkts = self.create_stream_in(self.pg0, self.pg1)
1183             self.pg0.add_stream(pkts)
1184             self.pg_enable_capture(self.pg_interfaces)
1185             self.pg_start()
1186             capture = self.pg1.get_capture(len(pkts))
1187             self.verify_capture_out(capture, same_port=True)
1188
1189             # in2out - no static mapping match
1190
1191             host0 = self.pg0.remote_hosts[0]
1192             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1193             try:
1194                 pkts = self.create_stream_out(self.pg1,
1195                                               dst_ip=self.pg0.remote_ip4,
1196                                               use_inside_ports=True)
1197                 self.pg1.add_stream(pkts)
1198                 self.pg_enable_capture(self.pg_interfaces)
1199                 self.pg_start()
1200                 capture = self.pg0.get_capture(len(pkts))
1201                 self.verify_capture_in(capture, self.pg0)
1202
1203                 pkts = self.create_stream_in(self.pg0, self.pg1)
1204                 self.pg0.add_stream(pkts)
1205                 self.pg_enable_capture(self.pg_interfaces)
1206                 self.pg_start()
1207                 capture = self.pg1.get_capture(len(pkts))
1208                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1209                                         same_port=True)
1210             finally:
1211                 self.pg0.remote_hosts[0] = host0
1212
1213         finally:
1214             self.vapi.nat44_forwarding_enable_disable(0)
1215             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1216                                                    external_ip=alias_ip,
1217                                                    is_add=0)
1218
1219     def test_static_in(self):
1220         """ 1:1 NAT initialized from inside network """
1221
1222         nat_ip = "10.0.0.10"
1223         self.tcp_port_out = 6303
1224         self.udp_port_out = 6304
1225         self.icmp_id_out = 6305
1226
1227         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1228         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1229         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1230                                                   is_inside=0)
1231
1232         # in2out
1233         pkts = self.create_stream_in(self.pg0, self.pg1)
1234         self.pg0.add_stream(pkts)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg1.get_capture(len(pkts))
1238         self.verify_capture_out(capture, nat_ip, True)
1239
1240         # out2in
1241         pkts = self.create_stream_out(self.pg1, nat_ip)
1242         self.pg1.add_stream(pkts)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245         capture = self.pg0.get_capture(len(pkts))
1246         self.verify_capture_in(capture, self.pg0)
1247
1248     def test_static_out(self):
1249         """ 1:1 NAT initialized from outside network """
1250
1251         nat_ip = "10.0.0.20"
1252         self.tcp_port_out = 6303
1253         self.udp_port_out = 6304
1254         self.icmp_id_out = 6305
1255
1256         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259                                                   is_inside=0)
1260
1261         # out2in
1262         pkts = self.create_stream_out(self.pg1, nat_ip)
1263         self.pg1.add_stream(pkts)
1264         self.pg_enable_capture(self.pg_interfaces)
1265         self.pg_start()
1266         capture = self.pg0.get_capture(len(pkts))
1267         self.verify_capture_in(capture, self.pg0)
1268
1269         # in2out
1270         pkts = self.create_stream_in(self.pg0, self.pg1)
1271         self.pg0.add_stream(pkts)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg1.get_capture(len(pkts))
1275         self.verify_capture_out(capture, nat_ip, True)
1276
1277     def test_static_with_port_in(self):
1278         """ 1:1 NAPT initialized from inside network """
1279
1280         self.tcp_port_out = 3606
1281         self.udp_port_out = 3607
1282         self.icmp_id_out = 3608
1283
1284         self.nat44_add_address(self.nat_addr)
1285         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1286                                       self.tcp_port_in, self.tcp_port_out,
1287                                       proto=IP_PROTOS.tcp)
1288         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1289                                       self.udp_port_in, self.udp_port_out,
1290                                       proto=IP_PROTOS.udp)
1291         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1292                                       self.icmp_id_in, self.icmp_id_out,
1293                                       proto=IP_PROTOS.icmp)
1294         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1295         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1296                                                   is_inside=0)
1297
1298         # in2out
1299         pkts = self.create_stream_in(self.pg0, self.pg1)
1300         self.pg0.add_stream(pkts)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg1.get_capture(len(pkts))
1304         self.verify_capture_out(capture)
1305
1306         # out2in
1307         pkts = self.create_stream_out(self.pg1)
1308         self.pg1.add_stream(pkts)
1309         self.pg_enable_capture(self.pg_interfaces)
1310         self.pg_start()
1311         capture = self.pg0.get_capture(len(pkts))
1312         self.verify_capture_in(capture, self.pg0)
1313
1314     def test_static_with_port_out(self):
1315         """ 1:1 NAPT initialized from outside network """
1316
1317         self.tcp_port_out = 30606
1318         self.udp_port_out = 30607
1319         self.icmp_id_out = 30608
1320
1321         self.nat44_add_address(self.nat_addr)
1322         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1323                                       self.tcp_port_in, self.tcp_port_out,
1324                                       proto=IP_PROTOS.tcp)
1325         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1326                                       self.udp_port_in, self.udp_port_out,
1327                                       proto=IP_PROTOS.udp)
1328         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1329                                       self.icmp_id_in, self.icmp_id_out,
1330                                       proto=IP_PROTOS.icmp)
1331         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1332         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1333                                                   is_inside=0)
1334
1335         # out2in
1336         pkts = self.create_stream_out(self.pg1)
1337         self.pg1.add_stream(pkts)
1338         self.pg_enable_capture(self.pg_interfaces)
1339         self.pg_start()
1340         capture = self.pg0.get_capture(len(pkts))
1341         self.verify_capture_in(capture, self.pg0)
1342
1343         # in2out
1344         pkts = self.create_stream_in(self.pg0, self.pg1)
1345         self.pg0.add_stream(pkts)
1346         self.pg_enable_capture(self.pg_interfaces)
1347         self.pg_start()
1348         capture = self.pg1.get_capture(len(pkts))
1349         self.verify_capture_out(capture)
1350
1351     def test_static_vrf_aware(self):
1352         """ 1:1 NAT VRF awareness """
1353
1354         nat_ip1 = "10.0.0.30"
1355         nat_ip2 = "10.0.0.40"
1356         self.tcp_port_out = 6303
1357         self.udp_port_out = 6304
1358         self.icmp_id_out = 6305
1359
1360         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1361                                       vrf_id=10)
1362         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1363                                       vrf_id=10)
1364         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1365                                                   is_inside=0)
1366         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1367         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1368
1369         # inside interface VRF match NAT44 static mapping VRF
1370         pkts = self.create_stream_in(self.pg4, self.pg3)
1371         self.pg4.add_stream(pkts)
1372         self.pg_enable_capture(self.pg_interfaces)
1373         self.pg_start()
1374         capture = self.pg3.get_capture(len(pkts))
1375         self.verify_capture_out(capture, nat_ip1, True)
1376
1377         # inside interface VRF don't match NAT44 static mapping VRF (packets
1378         # are dropped)
1379         pkts = self.create_stream_in(self.pg0, self.pg3)
1380         self.pg0.add_stream(pkts)
1381         self.pg_enable_capture(self.pg_interfaces)
1382         self.pg_start()
1383         self.pg3.assert_nothing_captured()
1384
1385     def test_identity_nat(self):
1386         """ Identity NAT """
1387
1388         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1389         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1390         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1391                                                   is_inside=0)
1392
1393         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1394              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1395              TCP(sport=12345, dport=56789))
1396         self.pg1.add_stream(p)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg0.get_capture(1)
1400         p = capture[0]
1401         try:
1402             ip = p[IP]
1403             tcp = p[TCP]
1404             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1405             self.assertEqual(ip.src, self.pg1.remote_ip4)
1406             self.assertEqual(tcp.dport, 56789)
1407             self.assertEqual(tcp.sport, 12345)
1408             self.check_tcp_checksum(p)
1409             self.check_ip_checksum(p)
1410         except:
1411             self.logger.error(ppp("Unexpected or invalid packet:", p))
1412             raise
1413
1414     def test_static_lb(self):
1415         """ NAT44 local service load balancing """
1416         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1417         external_port = 80
1418         local_port = 8080
1419         server1 = self.pg0.remote_hosts[0]
1420         server2 = self.pg0.remote_hosts[1]
1421
1422         locals = [{'addr': server1.ip4n,
1423                    'port': local_port,
1424                    'probability': 70},
1425                   {'addr': server2.ip4n,
1426                    'port': local_port,
1427                    'probability': 30}]
1428
1429         self.nat44_add_address(self.nat_addr)
1430         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1431                                                   external_port,
1432                                                   IP_PROTOS.tcp,
1433                                                   local_num=len(locals),
1434                                                   locals=locals)
1435         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1436         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1437                                                   is_inside=0)
1438
1439         # from client to service
1440         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1441              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1442              TCP(sport=12345, dport=external_port))
1443         self.pg1.add_stream(p)
1444         self.pg_enable_capture(self.pg_interfaces)
1445         self.pg_start()
1446         capture = self.pg0.get_capture(1)
1447         p = capture[0]
1448         server = None
1449         try:
1450             ip = p[IP]
1451             tcp = p[TCP]
1452             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1453             if ip.dst == server1.ip4:
1454                 server = server1
1455             else:
1456                 server = server2
1457             self.assertEqual(tcp.dport, local_port)
1458             self.check_tcp_checksum(p)
1459             self.check_ip_checksum(p)
1460         except:
1461             self.logger.error(ppp("Unexpected or invalid packet:", p))
1462             raise
1463
1464         # from service back to client
1465         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1466              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1467              TCP(sport=local_port, dport=12345))
1468         self.pg0.add_stream(p)
1469         self.pg_enable_capture(self.pg_interfaces)
1470         self.pg_start()
1471         capture = self.pg1.get_capture(1)
1472         p = capture[0]
1473         try:
1474             ip = p[IP]
1475             tcp = p[TCP]
1476             self.assertEqual(ip.src, self.nat_addr)
1477             self.assertEqual(tcp.sport, external_port)
1478             self.check_tcp_checksum(p)
1479             self.check_ip_checksum(p)
1480         except:
1481             self.logger.error(ppp("Unexpected or invalid packet:", p))
1482             raise
1483
1484         # multiple clients
1485         server1_n = 0
1486         server2_n = 0
1487         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1488         pkts = []
1489         for client in clients:
1490             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1491                  IP(src=client, dst=self.nat_addr) /
1492                  TCP(sport=12345, dport=external_port))
1493             pkts.append(p)
1494         self.pg1.add_stream(pkts)
1495         self.pg_enable_capture(self.pg_interfaces)
1496         self.pg_start()
1497         capture = self.pg0.get_capture(len(pkts))
1498         for p in capture:
1499             if p[IP].dst == server1.ip4:
1500                 server1_n += 1
1501             else:
1502                 server2_n += 1
1503         self.assertTrue(server1_n > server2_n)
1504
1505     def test_multiple_inside_interfaces(self):
1506         """ NAT44 multiple non-overlapping address space inside interfaces """
1507
1508         self.nat44_add_address(self.nat_addr)
1509         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1511         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1512                                                   is_inside=0)
1513
1514         # between two NAT44 inside interfaces (no translation)
1515         pkts = self.create_stream_in(self.pg0, self.pg1)
1516         self.pg0.add_stream(pkts)
1517         self.pg_enable_capture(self.pg_interfaces)
1518         self.pg_start()
1519         capture = self.pg1.get_capture(len(pkts))
1520         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1521
1522         # from NAT44 inside to interface without NAT44 feature (no translation)
1523         pkts = self.create_stream_in(self.pg0, self.pg2)
1524         self.pg0.add_stream(pkts)
1525         self.pg_enable_capture(self.pg_interfaces)
1526         self.pg_start()
1527         capture = self.pg2.get_capture(len(pkts))
1528         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1529
1530         # in2out 1st interface
1531         pkts = self.create_stream_in(self.pg0, self.pg3)
1532         self.pg0.add_stream(pkts)
1533         self.pg_enable_capture(self.pg_interfaces)
1534         self.pg_start()
1535         capture = self.pg3.get_capture(len(pkts))
1536         self.verify_capture_out(capture)
1537
1538         # out2in 1st interface
1539         pkts = self.create_stream_out(self.pg3)
1540         self.pg3.add_stream(pkts)
1541         self.pg_enable_capture(self.pg_interfaces)
1542         self.pg_start()
1543         capture = self.pg0.get_capture(len(pkts))
1544         self.verify_capture_in(capture, self.pg0)
1545
1546         # in2out 2nd interface
1547         pkts = self.create_stream_in(self.pg1, self.pg3)
1548         self.pg1.add_stream(pkts)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg3.get_capture(len(pkts))
1552         self.verify_capture_out(capture)
1553
1554         # out2in 2nd interface
1555         pkts = self.create_stream_out(self.pg3)
1556         self.pg3.add_stream(pkts)
1557         self.pg_enable_capture(self.pg_interfaces)
1558         self.pg_start()
1559         capture = self.pg1.get_capture(len(pkts))
1560         self.verify_capture_in(capture, self.pg1)
1561
1562     def test_inside_overlapping_interfaces(self):
1563         """ NAT44 multiple inside interfaces with overlapping address space """
1564
1565         static_nat_ip = "10.0.0.10"
1566         self.nat44_add_address(self.nat_addr)
1567         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1568                                                   is_inside=0)
1569         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1570         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1571         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1572         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1573                                       vrf_id=20)
1574
1575         # between NAT44 inside interfaces with same VRF (no translation)
1576         pkts = self.create_stream_in(self.pg4, self.pg5)
1577         self.pg4.add_stream(pkts)
1578         self.pg_enable_capture(self.pg_interfaces)
1579         self.pg_start()
1580         capture = self.pg5.get_capture(len(pkts))
1581         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1582
1583         # between NAT44 inside interfaces with different VRF (hairpinning)
1584         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1585              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1586              TCP(sport=1234, dport=5678))
1587         self.pg4.add_stream(p)
1588         self.pg_enable_capture(self.pg_interfaces)
1589         self.pg_start()
1590         capture = self.pg6.get_capture(1)
1591         p = capture[0]
1592         try:
1593             ip = p[IP]
1594             tcp = p[TCP]
1595             self.assertEqual(ip.src, self.nat_addr)
1596             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1597             self.assertNotEqual(tcp.sport, 1234)
1598             self.assertEqual(tcp.dport, 5678)
1599         except:
1600             self.logger.error(ppp("Unexpected or invalid packet:", p))
1601             raise
1602
1603         # in2out 1st interface
1604         pkts = self.create_stream_in(self.pg4, self.pg3)
1605         self.pg4.add_stream(pkts)
1606         self.pg_enable_capture(self.pg_interfaces)
1607         self.pg_start()
1608         capture = self.pg3.get_capture(len(pkts))
1609         self.verify_capture_out(capture)
1610
1611         # out2in 1st interface
1612         pkts = self.create_stream_out(self.pg3)
1613         self.pg3.add_stream(pkts)
1614         self.pg_enable_capture(self.pg_interfaces)
1615         self.pg_start()
1616         capture = self.pg4.get_capture(len(pkts))
1617         self.verify_capture_in(capture, self.pg4)
1618
1619         # in2out 2nd interface
1620         pkts = self.create_stream_in(self.pg5, self.pg3)
1621         self.pg5.add_stream(pkts)
1622         self.pg_enable_capture(self.pg_interfaces)
1623         self.pg_start()
1624         capture = self.pg3.get_capture(len(pkts))
1625         self.verify_capture_out(capture)
1626
1627         # out2in 2nd interface
1628         pkts = self.create_stream_out(self.pg3)
1629         self.pg3.add_stream(pkts)
1630         self.pg_enable_capture(self.pg_interfaces)
1631         self.pg_start()
1632         capture = self.pg5.get_capture(len(pkts))
1633         self.verify_capture_in(capture, self.pg5)
1634
1635         # pg5 session dump
1636         addresses = self.vapi.nat44_address_dump()
1637         self.assertEqual(len(addresses), 1)
1638         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1639         self.assertEqual(len(sessions), 3)
1640         for session in sessions:
1641             self.assertFalse(session.is_static)
1642             self.assertEqual(session.inside_ip_address[0:4],
1643                              self.pg5.remote_ip4n)
1644             self.assertEqual(session.outside_ip_address,
1645                              addresses[0].ip_address)
1646         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1647         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1648         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1649         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1650         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1651         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1652         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1653         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1654         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1655
1656         # in2out 3rd interface
1657         pkts = self.create_stream_in(self.pg6, self.pg3)
1658         self.pg6.add_stream(pkts)
1659         self.pg_enable_capture(self.pg_interfaces)
1660         self.pg_start()
1661         capture = self.pg3.get_capture(len(pkts))
1662         self.verify_capture_out(capture, static_nat_ip, True)
1663
1664         # out2in 3rd interface
1665         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1666         self.pg3.add_stream(pkts)
1667         self.pg_enable_capture(self.pg_interfaces)
1668         self.pg_start()
1669         capture = self.pg6.get_capture(len(pkts))
1670         self.verify_capture_in(capture, self.pg6)
1671
1672         # general user and session dump verifications
1673         users = self.vapi.nat44_user_dump()
1674         self.assertTrue(len(users) >= 3)
1675         addresses = self.vapi.nat44_address_dump()
1676         self.assertEqual(len(addresses), 1)
1677         for user in users:
1678             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1679                                                          user.vrf_id)
1680             for session in sessions:
1681                 self.assertEqual(user.ip_address, session.inside_ip_address)
1682                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1683                 self.assertTrue(session.protocol in
1684                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1685                                  IP_PROTOS.icmp])
1686
1687         # pg4 session dump
1688         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1689         self.assertTrue(len(sessions) >= 4)
1690         for session in sessions:
1691             self.assertFalse(session.is_static)
1692             self.assertEqual(session.inside_ip_address[0:4],
1693                              self.pg4.remote_ip4n)
1694             self.assertEqual(session.outside_ip_address,
1695                              addresses[0].ip_address)
1696
1697         # pg6 session dump
1698         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1699         self.assertTrue(len(sessions) >= 3)
1700         for session in sessions:
1701             self.assertTrue(session.is_static)
1702             self.assertEqual(session.inside_ip_address[0:4],
1703                              self.pg6.remote_ip4n)
1704             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1705                              map(int, static_nat_ip.split('.')))
1706             self.assertTrue(session.inside_port in
1707                             [self.tcp_port_in, self.udp_port_in,
1708                              self.icmp_id_in])
1709
1710     def test_hairpinning(self):
1711         """ NAT44 hairpinning - 1:1 NAPT """
1712
1713         host = self.pg0.remote_hosts[0]
1714         server = self.pg0.remote_hosts[1]
1715         host_in_port = 1234
1716         host_out_port = 0
1717         server_in_port = 5678
1718         server_out_port = 8765
1719
1720         self.nat44_add_address(self.nat_addr)
1721         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1722         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1723                                                   is_inside=0)
1724         # add static mapping for server
1725         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1726                                       server_in_port, server_out_port,
1727                                       proto=IP_PROTOS.tcp)
1728
1729         # send packet from host to server
1730         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1731              IP(src=host.ip4, dst=self.nat_addr) /
1732              TCP(sport=host_in_port, dport=server_out_port))
1733         self.pg0.add_stream(p)
1734         self.pg_enable_capture(self.pg_interfaces)
1735         self.pg_start()
1736         capture = self.pg0.get_capture(1)
1737         p = capture[0]
1738         try:
1739             ip = p[IP]
1740             tcp = p[TCP]
1741             self.assertEqual(ip.src, self.nat_addr)
1742             self.assertEqual(ip.dst, server.ip4)
1743             self.assertNotEqual(tcp.sport, host_in_port)
1744             self.assertEqual(tcp.dport, server_in_port)
1745             self.check_tcp_checksum(p)
1746             host_out_port = tcp.sport
1747         except:
1748             self.logger.error(ppp("Unexpected or invalid packet:", p))
1749             raise
1750
1751         # send reply from server to host
1752         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1753              IP(src=server.ip4, dst=self.nat_addr) /
1754              TCP(sport=server_in_port, dport=host_out_port))
1755         self.pg0.add_stream(p)
1756         self.pg_enable_capture(self.pg_interfaces)
1757         self.pg_start()
1758         capture = self.pg0.get_capture(1)
1759         p = capture[0]
1760         try:
1761             ip = p[IP]
1762             tcp = p[TCP]
1763             self.assertEqual(ip.src, self.nat_addr)
1764             self.assertEqual(ip.dst, host.ip4)
1765             self.assertEqual(tcp.sport, server_out_port)
1766             self.assertEqual(tcp.dport, host_in_port)
1767             self.check_tcp_checksum(p)
1768         except:
1769             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1770             raise
1771
1772     def test_hairpinning2(self):
1773         """ NAT44 hairpinning - 1:1 NAT"""
1774
1775         server1_nat_ip = "10.0.0.10"
1776         server2_nat_ip = "10.0.0.11"
1777         host = self.pg0.remote_hosts[0]
1778         server1 = self.pg0.remote_hosts[1]
1779         server2 = self.pg0.remote_hosts[2]
1780         server_tcp_port = 22
1781         server_udp_port = 20
1782
1783         self.nat44_add_address(self.nat_addr)
1784         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1785         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1786                                                   is_inside=0)
1787
1788         # add static mapping for servers
1789         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1790         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1791
1792         # host to server1
1793         pkts = []
1794         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1795              IP(src=host.ip4, dst=server1_nat_ip) /
1796              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1797         pkts.append(p)
1798         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1799              IP(src=host.ip4, dst=server1_nat_ip) /
1800              UDP(sport=self.udp_port_in, dport=server_udp_port))
1801         pkts.append(p)
1802         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1803              IP(src=host.ip4, dst=server1_nat_ip) /
1804              ICMP(id=self.icmp_id_in, type='echo-request'))
1805         pkts.append(p)
1806         self.pg0.add_stream(pkts)
1807         self.pg_enable_capture(self.pg_interfaces)
1808         self.pg_start()
1809         capture = self.pg0.get_capture(len(pkts))
1810         for packet in capture:
1811             try:
1812                 self.assertEqual(packet[IP].src, self.nat_addr)
1813                 self.assertEqual(packet[IP].dst, server1.ip4)
1814                 if packet.haslayer(TCP):
1815                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1816                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1817                     self.tcp_port_out = packet[TCP].sport
1818                     self.check_tcp_checksum(packet)
1819                 elif packet.haslayer(UDP):
1820                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1821                     self.assertEqual(packet[UDP].dport, server_udp_port)
1822                     self.udp_port_out = packet[UDP].sport
1823                 else:
1824                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1825                     self.icmp_id_out = packet[ICMP].id
1826             except:
1827                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1828                 raise
1829
1830         # server1 to host
1831         pkts = []
1832         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1833              IP(src=server1.ip4, dst=self.nat_addr) /
1834              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1835         pkts.append(p)
1836         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1837              IP(src=server1.ip4, dst=self.nat_addr) /
1838              UDP(sport=server_udp_port, dport=self.udp_port_out))
1839         pkts.append(p)
1840         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1841              IP(src=server1.ip4, dst=self.nat_addr) /
1842              ICMP(id=self.icmp_id_out, type='echo-reply'))
1843         pkts.append(p)
1844         self.pg0.add_stream(pkts)
1845         self.pg_enable_capture(self.pg_interfaces)
1846         self.pg_start()
1847         capture = self.pg0.get_capture(len(pkts))
1848         for packet in capture:
1849             try:
1850                 self.assertEqual(packet[IP].src, server1_nat_ip)
1851                 self.assertEqual(packet[IP].dst, host.ip4)
1852                 if packet.haslayer(TCP):
1853                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1854                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1855                     self.check_tcp_checksum(packet)
1856                 elif packet.haslayer(UDP):
1857                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1858                     self.assertEqual(packet[UDP].sport, server_udp_port)
1859                 else:
1860                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1861             except:
1862                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1863                 raise
1864
1865         # server2 to server1
1866         pkts = []
1867         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1868              IP(src=server2.ip4, dst=server1_nat_ip) /
1869              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1870         pkts.append(p)
1871         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1872              IP(src=server2.ip4, dst=server1_nat_ip) /
1873              UDP(sport=self.udp_port_in, dport=server_udp_port))
1874         pkts.append(p)
1875         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1876              IP(src=server2.ip4, dst=server1_nat_ip) /
1877              ICMP(id=self.icmp_id_in, type='echo-request'))
1878         pkts.append(p)
1879         self.pg0.add_stream(pkts)
1880         self.pg_enable_capture(self.pg_interfaces)
1881         self.pg_start()
1882         capture = self.pg0.get_capture(len(pkts))
1883         for packet in capture:
1884             try:
1885                 self.assertEqual(packet[IP].src, server2_nat_ip)
1886                 self.assertEqual(packet[IP].dst, server1.ip4)
1887                 if packet.haslayer(TCP):
1888                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1889                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1890                     self.tcp_port_out = packet[TCP].sport
1891                     self.check_tcp_checksum(packet)
1892                 elif packet.haslayer(UDP):
1893                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1894                     self.assertEqual(packet[UDP].dport, server_udp_port)
1895                     self.udp_port_out = packet[UDP].sport
1896                 else:
1897                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1898                     self.icmp_id_out = packet[ICMP].id
1899             except:
1900                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1901                 raise
1902
1903         # server1 to server2
1904         pkts = []
1905         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1906              IP(src=server1.ip4, dst=server2_nat_ip) /
1907              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1908         pkts.append(p)
1909         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1910              IP(src=server1.ip4, dst=server2_nat_ip) /
1911              UDP(sport=server_udp_port, dport=self.udp_port_out))
1912         pkts.append(p)
1913         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1914              IP(src=server1.ip4, dst=server2_nat_ip) /
1915              ICMP(id=self.icmp_id_out, type='echo-reply'))
1916         pkts.append(p)
1917         self.pg0.add_stream(pkts)
1918         self.pg_enable_capture(self.pg_interfaces)
1919         self.pg_start()
1920         capture = self.pg0.get_capture(len(pkts))
1921         for packet in capture:
1922             try:
1923                 self.assertEqual(packet[IP].src, server1_nat_ip)
1924                 self.assertEqual(packet[IP].dst, server2.ip4)
1925                 if packet.haslayer(TCP):
1926                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1927                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1928                     self.check_tcp_checksum(packet)
1929                 elif packet.haslayer(UDP):
1930                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1931                     self.assertEqual(packet[UDP].sport, server_udp_port)
1932                 else:
1933                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1934             except:
1935                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1936                 raise
1937
1938     def test_max_translations_per_user(self):
1939         """ MAX translations per user - recycle the least recently used """
1940
1941         self.nat44_add_address(self.nat_addr)
1942         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1943         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1944                                                   is_inside=0)
1945
1946         # get maximum number of translations per user
1947         nat44_config = self.vapi.nat_show_config()
1948
1949         # send more than maximum number of translations per user packets
1950         pkts_num = nat44_config.max_translations_per_user + 5
1951         pkts = []
1952         for port in range(0, pkts_num):
1953             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1954                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1955                  TCP(sport=1025 + port))
1956             pkts.append(p)
1957         self.pg0.add_stream(pkts)
1958         self.pg_enable_capture(self.pg_interfaces)
1959         self.pg_start()
1960
1961         # verify number of translated packet
1962         self.pg1.get_capture(pkts_num)
1963
1964     def test_interface_addr(self):
1965         """ Acquire NAT44 addresses from interface """
1966         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1967
1968         # no address in NAT pool
1969         adresses = self.vapi.nat44_address_dump()
1970         self.assertEqual(0, len(adresses))
1971
1972         # configure interface address and check NAT address pool
1973         self.pg7.config_ip4()
1974         adresses = self.vapi.nat44_address_dump()
1975         self.assertEqual(1, len(adresses))
1976         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1977
1978         # remove interface address and check NAT address pool
1979         self.pg7.unconfig_ip4()
1980         adresses = self.vapi.nat44_address_dump()
1981         self.assertEqual(0, len(adresses))
1982
1983     def test_interface_addr_static_mapping(self):
1984         """ Static mapping with addresses from interface """
1985         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1986         self.nat44_add_static_mapping(
1987             '1.2.3.4',
1988             external_sw_if_index=self.pg7.sw_if_index)
1989
1990         # static mappings with external interface
1991         static_mappings = self.vapi.nat44_static_mapping_dump()
1992         self.assertEqual(1, len(static_mappings))
1993         self.assertEqual(self.pg7.sw_if_index,
1994                          static_mappings[0].external_sw_if_index)
1995
1996         # configure interface address and check static mappings
1997         self.pg7.config_ip4()
1998         static_mappings = self.vapi.nat44_static_mapping_dump()
1999         self.assertEqual(1, len(static_mappings))
2000         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2001                          self.pg7.local_ip4n)
2002         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2003
2004         # remove interface address and check static mappings
2005         self.pg7.unconfig_ip4()
2006         static_mappings = self.vapi.nat44_static_mapping_dump()
2007         self.assertEqual(0, len(static_mappings))
2008
2009     def test_interface_addr_identity_nat(self):
2010         """ Identity NAT with addresses from interface """
2011
2012         port = 53053
2013         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2014         self.vapi.nat44_add_del_identity_mapping(
2015             sw_if_index=self.pg7.sw_if_index,
2016             port=port,
2017             protocol=IP_PROTOS.tcp,
2018             addr_only=0)
2019
2020         # identity mappings with external interface
2021         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2022         self.assertEqual(1, len(identity_mappings))
2023         self.assertEqual(self.pg7.sw_if_index,
2024                          identity_mappings[0].sw_if_index)
2025
2026         # configure interface address and check identity mappings
2027         self.pg7.config_ip4()
2028         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2029         self.assertEqual(1, len(identity_mappings))
2030         self.assertEqual(identity_mappings[0].ip_address,
2031                          self.pg7.local_ip4n)
2032         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2033         self.assertEqual(port, identity_mappings[0].port)
2034         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2035
2036         # remove interface address and check identity mappings
2037         self.pg7.unconfig_ip4()
2038         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2039         self.assertEqual(0, len(identity_mappings))
2040
2041     def test_ipfix_nat44_sess(self):
2042         """ IPFIX logging NAT44 session created/delted """
2043         self.ipfix_domain_id = 10
2044         self.ipfix_src_port = 20202
2045         colector_port = 30303
2046         bind_layers(UDP, IPFIX, dport=30303)
2047         self.nat44_add_address(self.nat_addr)
2048         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2049         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2050                                                   is_inside=0)
2051         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2052                                      src_address=self.pg3.local_ip4n,
2053                                      path_mtu=512,
2054                                      template_interval=10,
2055                                      collector_port=colector_port)
2056         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2057                             src_port=self.ipfix_src_port)
2058
2059         pkts = self.create_stream_in(self.pg0, self.pg1)
2060         self.pg0.add_stream(pkts)
2061         self.pg_enable_capture(self.pg_interfaces)
2062         self.pg_start()
2063         capture = self.pg1.get_capture(len(pkts))
2064         self.verify_capture_out(capture)
2065         self.nat44_add_address(self.nat_addr, is_add=0)
2066         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2067         capture = self.pg3.get_capture(3)
2068         ipfix = IPFIXDecoder()
2069         # first load template
2070         for p in capture:
2071             self.assertTrue(p.haslayer(IPFIX))
2072             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2073             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2074             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2075             self.assertEqual(p[UDP].dport, colector_port)
2076             self.assertEqual(p[IPFIX].observationDomainID,
2077                              self.ipfix_domain_id)
2078             if p.haslayer(Template):
2079                 ipfix.add_template(p.getlayer(Template))
2080         # verify events in data set
2081         for p in capture:
2082             if p.haslayer(Data):
2083                 data = ipfix.decode_data_set(p.getlayer(Set))
2084                 self.verify_ipfix_nat44_ses(data)
2085
2086     def test_ipfix_addr_exhausted(self):
2087         """ IPFIX logging NAT addresses exhausted """
2088         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2089         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2090                                                   is_inside=0)
2091         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2092                                      src_address=self.pg3.local_ip4n,
2093                                      path_mtu=512,
2094                                      template_interval=10)
2095         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2096                             src_port=self.ipfix_src_port)
2097
2098         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2099              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2100              TCP(sport=3025))
2101         self.pg0.add_stream(p)
2102         self.pg_enable_capture(self.pg_interfaces)
2103         self.pg_start()
2104         capture = self.pg1.get_capture(0)
2105         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2106         capture = self.pg3.get_capture(3)
2107         ipfix = IPFIXDecoder()
2108         # first load template
2109         for p in capture:
2110             self.assertTrue(p.haslayer(IPFIX))
2111             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2112             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2113             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2114             self.assertEqual(p[UDP].dport, 4739)
2115             self.assertEqual(p[IPFIX].observationDomainID,
2116                              self.ipfix_domain_id)
2117             if p.haslayer(Template):
2118                 ipfix.add_template(p.getlayer(Template))
2119         # verify events in data set
2120         for p in capture:
2121             if p.haslayer(Data):
2122                 data = ipfix.decode_data_set(p.getlayer(Set))
2123                 self.verify_ipfix_addr_exhausted(data)
2124
2125     def test_pool_addr_fib(self):
2126         """ NAT44 add pool addresses to FIB """
2127         static_addr = '10.0.0.10'
2128         self.nat44_add_address(self.nat_addr)
2129         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2130         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2131                                                   is_inside=0)
2132         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2133
2134         # NAT44 address
2135         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2136              ARP(op=ARP.who_has, pdst=self.nat_addr,
2137                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2138         self.pg1.add_stream(p)
2139         self.pg_enable_capture(self.pg_interfaces)
2140         self.pg_start()
2141         capture = self.pg1.get_capture(1)
2142         self.assertTrue(capture[0].haslayer(ARP))
2143         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2144
2145         # 1:1 NAT address
2146         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2147              ARP(op=ARP.who_has, pdst=static_addr,
2148                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2149         self.pg1.add_stream(p)
2150         self.pg_enable_capture(self.pg_interfaces)
2151         self.pg_start()
2152         capture = self.pg1.get_capture(1)
2153         self.assertTrue(capture[0].haslayer(ARP))
2154         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2155
2156         # send ARP to non-NAT44 interface
2157         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2158              ARP(op=ARP.who_has, pdst=self.nat_addr,
2159                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2160         self.pg2.add_stream(p)
2161         self.pg_enable_capture(self.pg_interfaces)
2162         self.pg_start()
2163         capture = self.pg1.get_capture(0)
2164
2165         # remove addresses and verify
2166         self.nat44_add_address(self.nat_addr, is_add=0)
2167         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2168                                       is_add=0)
2169
2170         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2171              ARP(op=ARP.who_has, pdst=self.nat_addr,
2172                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2173         self.pg1.add_stream(p)
2174         self.pg_enable_capture(self.pg_interfaces)
2175         self.pg_start()
2176         capture = self.pg1.get_capture(0)
2177
2178         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2179              ARP(op=ARP.who_has, pdst=static_addr,
2180                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2181         self.pg1.add_stream(p)
2182         self.pg_enable_capture(self.pg_interfaces)
2183         self.pg_start()
2184         capture = self.pg1.get_capture(0)
2185
2186     def test_vrf_mode(self):
2187         """ NAT44 tenant VRF aware address pool mode """
2188
2189         vrf_id1 = 1
2190         vrf_id2 = 2
2191         nat_ip1 = "10.0.0.10"
2192         nat_ip2 = "10.0.0.11"
2193
2194         self.pg0.unconfig_ip4()
2195         self.pg1.unconfig_ip4()
2196         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2197         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2198         self.pg0.set_table_ip4(vrf_id1)
2199         self.pg1.set_table_ip4(vrf_id2)
2200         self.pg0.config_ip4()
2201         self.pg1.config_ip4()
2202
2203         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2204         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2205         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2206         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2207         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2208                                                   is_inside=0)
2209
2210         # first VRF
2211         pkts = self.create_stream_in(self.pg0, self.pg2)
2212         self.pg0.add_stream(pkts)
2213         self.pg_enable_capture(self.pg_interfaces)
2214         self.pg_start()
2215         capture = self.pg2.get_capture(len(pkts))
2216         self.verify_capture_out(capture, nat_ip1)
2217
2218         # second VRF
2219         pkts = self.create_stream_in(self.pg1, self.pg2)
2220         self.pg1.add_stream(pkts)
2221         self.pg_enable_capture(self.pg_interfaces)
2222         self.pg_start()
2223         capture = self.pg2.get_capture(len(pkts))
2224         self.verify_capture_out(capture, nat_ip2)
2225
2226         self.pg0.unconfig_ip4()
2227         self.pg1.unconfig_ip4()
2228         self.pg0.set_table_ip4(0)
2229         self.pg1.set_table_ip4(0)
2230         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2231         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2232
2233     def test_vrf_feature_independent(self):
2234         """ NAT44 tenant VRF independent address pool mode """
2235
2236         nat_ip1 = "10.0.0.10"
2237         nat_ip2 = "10.0.0.11"
2238
2239         self.nat44_add_address(nat_ip1)
2240         self.nat44_add_address(nat_ip2, vrf_id=99)
2241         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2242         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2243         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2244                                                   is_inside=0)
2245
2246         # first VRF
2247         pkts = self.create_stream_in(self.pg0, self.pg2)
2248         self.pg0.add_stream(pkts)
2249         self.pg_enable_capture(self.pg_interfaces)
2250         self.pg_start()
2251         capture = self.pg2.get_capture(len(pkts))
2252         self.verify_capture_out(capture, nat_ip1)
2253
2254         # second VRF
2255         pkts = self.create_stream_in(self.pg1, self.pg2)
2256         self.pg1.add_stream(pkts)
2257         self.pg_enable_capture(self.pg_interfaces)
2258         self.pg_start()
2259         capture = self.pg2.get_capture(len(pkts))
2260         self.verify_capture_out(capture, nat_ip1)
2261
2262     def test_dynamic_ipless_interfaces(self):
2263         """ NAT44 interfaces without configured IP address """
2264
2265         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2266                                       mactobinary(self.pg7.remote_mac),
2267                                       self.pg7.remote_ip4n,
2268                                       is_static=1)
2269         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2270                                       mactobinary(self.pg8.remote_mac),
2271                                       self.pg8.remote_ip4n,
2272                                       is_static=1)
2273
2274         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2275                                    dst_address_length=32,
2276                                    next_hop_address=self.pg7.remote_ip4n,
2277                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2278         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2279                                    dst_address_length=32,
2280                                    next_hop_address=self.pg8.remote_ip4n,
2281                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2282
2283         self.nat44_add_address(self.nat_addr)
2284         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2285         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2286                                                   is_inside=0)
2287
2288         # in2out
2289         pkts = self.create_stream_in(self.pg7, self.pg8)
2290         self.pg7.add_stream(pkts)
2291         self.pg_enable_capture(self.pg_interfaces)
2292         self.pg_start()
2293         capture = self.pg8.get_capture(len(pkts))
2294         self.verify_capture_out(capture)
2295
2296         # out2in
2297         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2298         self.pg8.add_stream(pkts)
2299         self.pg_enable_capture(self.pg_interfaces)
2300         self.pg_start()
2301         capture = self.pg7.get_capture(len(pkts))
2302         self.verify_capture_in(capture, self.pg7)
2303
2304     def test_static_ipless_interfaces(self):
2305         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2306
2307         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2308                                       mactobinary(self.pg7.remote_mac),
2309                                       self.pg7.remote_ip4n,
2310                                       is_static=1)
2311         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2312                                       mactobinary(self.pg8.remote_mac),
2313                                       self.pg8.remote_ip4n,
2314                                       is_static=1)
2315
2316         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2317                                    dst_address_length=32,
2318                                    next_hop_address=self.pg7.remote_ip4n,
2319                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2320         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2321                                    dst_address_length=32,
2322                                    next_hop_address=self.pg8.remote_ip4n,
2323                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2324
2325         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2326         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2327         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2328                                                   is_inside=0)
2329
2330         # out2in
2331         pkts = self.create_stream_out(self.pg8)
2332         self.pg8.add_stream(pkts)
2333         self.pg_enable_capture(self.pg_interfaces)
2334         self.pg_start()
2335         capture = self.pg7.get_capture(len(pkts))
2336         self.verify_capture_in(capture, self.pg7)
2337
2338         # in2out
2339         pkts = self.create_stream_in(self.pg7, self.pg8)
2340         self.pg7.add_stream(pkts)
2341         self.pg_enable_capture(self.pg_interfaces)
2342         self.pg_start()
2343         capture = self.pg8.get_capture(len(pkts))
2344         self.verify_capture_out(capture, self.nat_addr, True)
2345
2346     def test_static_with_port_ipless_interfaces(self):
2347         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2348
2349         self.tcp_port_out = 30606
2350         self.udp_port_out = 30607
2351         self.icmp_id_out = 30608
2352
2353         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2354                                       mactobinary(self.pg7.remote_mac),
2355                                       self.pg7.remote_ip4n,
2356                                       is_static=1)
2357         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2358                                       mactobinary(self.pg8.remote_mac),
2359                                       self.pg8.remote_ip4n,
2360                                       is_static=1)
2361
2362         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2363                                    dst_address_length=32,
2364                                    next_hop_address=self.pg7.remote_ip4n,
2365                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2366         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2367                                    dst_address_length=32,
2368                                    next_hop_address=self.pg8.remote_ip4n,
2369                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2370
2371         self.nat44_add_address(self.nat_addr)
2372         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2373                                       self.tcp_port_in, self.tcp_port_out,
2374                                       proto=IP_PROTOS.tcp)
2375         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2376                                       self.udp_port_in, self.udp_port_out,
2377                                       proto=IP_PROTOS.udp)
2378         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2379                                       self.icmp_id_in, self.icmp_id_out,
2380                                       proto=IP_PROTOS.icmp)
2381         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2382         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2383                                                   is_inside=0)
2384
2385         # out2in
2386         pkts = self.create_stream_out(self.pg8)
2387         self.pg8.add_stream(pkts)
2388         self.pg_enable_capture(self.pg_interfaces)
2389         self.pg_start()
2390         capture = self.pg7.get_capture(len(pkts))
2391         self.verify_capture_in(capture, self.pg7)
2392
2393         # in2out
2394         pkts = self.create_stream_in(self.pg7, self.pg8)
2395         self.pg7.add_stream(pkts)
2396         self.pg_enable_capture(self.pg_interfaces)
2397         self.pg_start()
2398         capture = self.pg8.get_capture(len(pkts))
2399         self.verify_capture_out(capture)
2400
2401     def test_static_unknown_proto(self):
2402         """ 1:1 NAT translate packet with unknown protocol """
2403         nat_ip = "10.0.0.10"
2404         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2405         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2406         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2407                                                   is_inside=0)
2408
2409         # in2out
2410         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2411              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2412              GRE() /
2413              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2414              TCP(sport=1234, dport=1234))
2415         self.pg0.add_stream(p)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         p = self.pg1.get_capture(1)
2419         packet = p[0]
2420         try:
2421             self.assertEqual(packet[IP].src, nat_ip)
2422             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2423             self.assertTrue(packet.haslayer(GRE))
2424             self.check_ip_checksum(packet)
2425         except:
2426             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2427             raise
2428
2429         # out2in
2430         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2431              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2432              GRE() /
2433              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2434              TCP(sport=1234, dport=1234))
2435         self.pg1.add_stream(p)
2436         self.pg_enable_capture(self.pg_interfaces)
2437         self.pg_start()
2438         p = self.pg0.get_capture(1)
2439         packet = p[0]
2440         try:
2441             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2442             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2443             self.assertTrue(packet.haslayer(GRE))
2444             self.check_ip_checksum(packet)
2445         except:
2446             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2447             raise
2448
2449     def test_hairpinning_static_unknown_proto(self):
2450         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2451
2452         host = self.pg0.remote_hosts[0]
2453         server = self.pg0.remote_hosts[1]
2454
2455         host_nat_ip = "10.0.0.10"
2456         server_nat_ip = "10.0.0.11"
2457
2458         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2459         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2460         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2461         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2462                                                   is_inside=0)
2463
2464         # host to server
2465         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2466              IP(src=host.ip4, dst=server_nat_ip) /
2467              GRE() /
2468              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2469              TCP(sport=1234, dport=1234))
2470         self.pg0.add_stream(p)
2471         self.pg_enable_capture(self.pg_interfaces)
2472         self.pg_start()
2473         p = self.pg0.get_capture(1)
2474         packet = p[0]
2475         try:
2476             self.assertEqual(packet[IP].src, host_nat_ip)
2477             self.assertEqual(packet[IP].dst, server.ip4)
2478             self.assertTrue(packet.haslayer(GRE))
2479             self.check_ip_checksum(packet)
2480         except:
2481             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2482             raise
2483
2484         # server to host
2485         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2486              IP(src=server.ip4, dst=host_nat_ip) /
2487              GRE() /
2488              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2489              TCP(sport=1234, dport=1234))
2490         self.pg0.add_stream(p)
2491         self.pg_enable_capture(self.pg_interfaces)
2492         self.pg_start()
2493         p = self.pg0.get_capture(1)
2494         packet = p[0]
2495         try:
2496             self.assertEqual(packet[IP].src, server_nat_ip)
2497             self.assertEqual(packet[IP].dst, host.ip4)
2498             self.assertTrue(packet.haslayer(GRE))
2499             self.check_ip_checksum(packet)
2500         except:
2501             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2502             raise
2503
2504     def test_unknown_proto(self):
2505         """ NAT44 translate packet with unknown protocol """
2506         self.nat44_add_address(self.nat_addr)
2507         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2508         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2509                                                   is_inside=0)
2510
2511         # in2out
2512         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2513              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2514              TCP(sport=self.tcp_port_in, dport=20))
2515         self.pg0.add_stream(p)
2516         self.pg_enable_capture(self.pg_interfaces)
2517         self.pg_start()
2518         p = self.pg1.get_capture(1)
2519
2520         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2521              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2522              GRE() /
2523              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2524              TCP(sport=1234, dport=1234))
2525         self.pg0.add_stream(p)
2526         self.pg_enable_capture(self.pg_interfaces)
2527         self.pg_start()
2528         p = self.pg1.get_capture(1)
2529         packet = p[0]
2530         try:
2531             self.assertEqual(packet[IP].src, self.nat_addr)
2532             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2533             self.assertTrue(packet.haslayer(GRE))
2534             self.check_ip_checksum(packet)
2535         except:
2536             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2537             raise
2538
2539         # out2in
2540         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2541              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2542              GRE() /
2543              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2544              TCP(sport=1234, dport=1234))
2545         self.pg1.add_stream(p)
2546         self.pg_enable_capture(self.pg_interfaces)
2547         self.pg_start()
2548         p = self.pg0.get_capture(1)
2549         packet = p[0]
2550         try:
2551             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2552             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2553             self.assertTrue(packet.haslayer(GRE))
2554             self.check_ip_checksum(packet)
2555         except:
2556             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2557             raise
2558
2559     def test_hairpinning_unknown_proto(self):
2560         """ NAT44 translate packet with unknown protocol - hairpinning """
2561         host = self.pg0.remote_hosts[0]
2562         server = self.pg0.remote_hosts[1]
2563         host_in_port = 1234
2564         host_out_port = 0
2565         server_in_port = 5678
2566         server_out_port = 8765
2567         server_nat_ip = "10.0.0.11"
2568
2569         self.nat44_add_address(self.nat_addr)
2570         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2571         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2572                                                   is_inside=0)
2573
2574         # add static mapping for server
2575         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2576
2577         # host to server
2578         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2579              IP(src=host.ip4, dst=server_nat_ip) /
2580              TCP(sport=host_in_port, dport=server_out_port))
2581         self.pg0.add_stream(p)
2582         self.pg_enable_capture(self.pg_interfaces)
2583         self.pg_start()
2584         capture = self.pg0.get_capture(1)
2585
2586         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2587              IP(src=host.ip4, dst=server_nat_ip) /
2588              GRE() /
2589              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2590              TCP(sport=1234, dport=1234))
2591         self.pg0.add_stream(p)
2592         self.pg_enable_capture(self.pg_interfaces)
2593         self.pg_start()
2594         p = self.pg0.get_capture(1)
2595         packet = p[0]
2596         try:
2597             self.assertEqual(packet[IP].src, self.nat_addr)
2598             self.assertEqual(packet[IP].dst, server.ip4)
2599             self.assertTrue(packet.haslayer(GRE))
2600             self.check_ip_checksum(packet)
2601         except:
2602             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2603             raise
2604
2605         # server to host
2606         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2607              IP(src=server.ip4, dst=self.nat_addr) /
2608              GRE() /
2609              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2610              TCP(sport=1234, dport=1234))
2611         self.pg0.add_stream(p)
2612         self.pg_enable_capture(self.pg_interfaces)
2613         self.pg_start()
2614         p = self.pg0.get_capture(1)
2615         packet = p[0]
2616         try:
2617             self.assertEqual(packet[IP].src, server_nat_ip)
2618             self.assertEqual(packet[IP].dst, host.ip4)
2619             self.assertTrue(packet.haslayer(GRE))
2620             self.check_ip_checksum(packet)
2621         except:
2622             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2623             raise
2624
2625     def test_output_feature(self):
2626         """ NAT44 interface output feature (in2out postrouting) """
2627         self.nat44_add_address(self.nat_addr)
2628         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2629         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2630         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2631                                                          is_inside=0)
2632
2633         # in2out
2634         pkts = self.create_stream_in(self.pg0, self.pg3)
2635         self.pg0.add_stream(pkts)
2636         self.pg_enable_capture(self.pg_interfaces)
2637         self.pg_start()
2638         capture = self.pg3.get_capture(len(pkts))
2639         self.verify_capture_out(capture)
2640
2641         # out2in
2642         pkts = self.create_stream_out(self.pg3)
2643         self.pg3.add_stream(pkts)
2644         self.pg_enable_capture(self.pg_interfaces)
2645         self.pg_start()
2646         capture = self.pg0.get_capture(len(pkts))
2647         self.verify_capture_in(capture, self.pg0)
2648
2649         # from non-NAT interface to NAT inside interface
2650         pkts = self.create_stream_in(self.pg2, self.pg0)
2651         self.pg2.add_stream(pkts)
2652         self.pg_enable_capture(self.pg_interfaces)
2653         self.pg_start()
2654         capture = self.pg0.get_capture(len(pkts))
2655         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2656
2657     def test_output_feature_vrf_aware(self):
2658         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2659         nat_ip_vrf10 = "10.0.0.10"
2660         nat_ip_vrf20 = "10.0.0.20"
2661
2662         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2663                                    dst_address_length=32,
2664                                    next_hop_address=self.pg3.remote_ip4n,
2665                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2666                                    table_id=10)
2667         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2668                                    dst_address_length=32,
2669                                    next_hop_address=self.pg3.remote_ip4n,
2670                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2671                                    table_id=20)
2672
2673         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2674         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2675         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2676         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2677         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2678                                                          is_inside=0)
2679
2680         # in2out VRF 10
2681         pkts = self.create_stream_in(self.pg4, self.pg3)
2682         self.pg4.add_stream(pkts)
2683         self.pg_enable_capture(self.pg_interfaces)
2684         self.pg_start()
2685         capture = self.pg3.get_capture(len(pkts))
2686         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2687
2688         # out2in VRF 10
2689         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2690         self.pg3.add_stream(pkts)
2691         self.pg_enable_capture(self.pg_interfaces)
2692         self.pg_start()
2693         capture = self.pg4.get_capture(len(pkts))
2694         self.verify_capture_in(capture, self.pg4)
2695
2696         # in2out VRF 20
2697         pkts = self.create_stream_in(self.pg6, self.pg3)
2698         self.pg6.add_stream(pkts)
2699         self.pg_enable_capture(self.pg_interfaces)
2700         self.pg_start()
2701         capture = self.pg3.get_capture(len(pkts))
2702         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2703
2704         # out2in VRF 20
2705         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2706         self.pg3.add_stream(pkts)
2707         self.pg_enable_capture(self.pg_interfaces)
2708         self.pg_start()
2709         capture = self.pg6.get_capture(len(pkts))
2710         self.verify_capture_in(capture, self.pg6)
2711
2712     def test_output_feature_hairpinning(self):
2713         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2714         host = self.pg0.remote_hosts[0]
2715         server = self.pg0.remote_hosts[1]
2716         host_in_port = 1234
2717         host_out_port = 0
2718         server_in_port = 5678
2719         server_out_port = 8765
2720
2721         self.nat44_add_address(self.nat_addr)
2722         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2723         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2724                                                          is_inside=0)
2725
2726         # add static mapping for server
2727         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2728                                       server_in_port, server_out_port,
2729                                       proto=IP_PROTOS.tcp)
2730
2731         # send packet from host to server
2732         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2733              IP(src=host.ip4, dst=self.nat_addr) /
2734              TCP(sport=host_in_port, dport=server_out_port))
2735         self.pg0.add_stream(p)
2736         self.pg_enable_capture(self.pg_interfaces)
2737         self.pg_start()
2738         capture = self.pg0.get_capture(1)
2739         p = capture[0]
2740         try:
2741             ip = p[IP]
2742             tcp = p[TCP]
2743             self.assertEqual(ip.src, self.nat_addr)
2744             self.assertEqual(ip.dst, server.ip4)
2745             self.assertNotEqual(tcp.sport, host_in_port)
2746             self.assertEqual(tcp.dport, server_in_port)
2747             self.check_tcp_checksum(p)
2748             host_out_port = tcp.sport
2749         except:
2750             self.logger.error(ppp("Unexpected or invalid packet:", p))
2751             raise
2752
2753         # send reply from server to host
2754         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2755              IP(src=server.ip4, dst=self.nat_addr) /
2756              TCP(sport=server_in_port, dport=host_out_port))
2757         self.pg0.add_stream(p)
2758         self.pg_enable_capture(self.pg_interfaces)
2759         self.pg_start()
2760         capture = self.pg0.get_capture(1)
2761         p = capture[0]
2762         try:
2763             ip = p[IP]
2764             tcp = p[TCP]
2765             self.assertEqual(ip.src, self.nat_addr)
2766             self.assertEqual(ip.dst, host.ip4)
2767             self.assertEqual(tcp.sport, server_out_port)
2768             self.assertEqual(tcp.dport, host_in_port)
2769             self.check_tcp_checksum(p)
2770         except:
2771             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2772             raise
2773
2774     def test_one_armed_nat44(self):
2775         """ One armed NAT44 """
2776         remote_host = self.pg9.remote_hosts[0]
2777         local_host = self.pg9.remote_hosts[1]
2778         external_port = 0
2779
2780         self.nat44_add_address(self.nat_addr)
2781         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2782         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2783                                                   is_inside=0)
2784
2785         # in2out
2786         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2787              IP(src=local_host.ip4, dst=remote_host.ip4) /
2788              TCP(sport=12345, dport=80))
2789         self.pg9.add_stream(p)
2790         self.pg_enable_capture(self.pg_interfaces)
2791         self.pg_start()
2792         capture = self.pg9.get_capture(1)
2793         p = capture[0]
2794         try:
2795             ip = p[IP]
2796             tcp = p[TCP]
2797             self.assertEqual(ip.src, self.nat_addr)
2798             self.assertEqual(ip.dst, remote_host.ip4)
2799             self.assertNotEqual(tcp.sport, 12345)
2800             external_port = tcp.sport
2801             self.assertEqual(tcp.dport, 80)
2802             self.check_tcp_checksum(p)
2803             self.check_ip_checksum(p)
2804         except:
2805             self.logger.error(ppp("Unexpected or invalid packet:", p))
2806             raise
2807
2808         # out2in
2809         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2810              IP(src=remote_host.ip4, dst=self.nat_addr) /
2811              TCP(sport=80, dport=external_port))
2812         self.pg9.add_stream(p)
2813         self.pg_enable_capture(self.pg_interfaces)
2814         self.pg_start()
2815         capture = self.pg9.get_capture(1)
2816         p = capture[0]
2817         try:
2818             ip = p[IP]
2819             tcp = p[TCP]
2820             self.assertEqual(ip.src, remote_host.ip4)
2821             self.assertEqual(ip.dst, local_host.ip4)
2822             self.assertEqual(tcp.sport, 80)
2823             self.assertEqual(tcp.dport, 12345)
2824             self.check_tcp_checksum(p)
2825             self.check_ip_checksum(p)
2826         except:
2827             self.logger.error(ppp("Unexpected or invalid packet:", p))
2828             raise
2829
2830     def test_del_session(self):
2831         """ Delete NAT44 session """
2832         self.nat44_add_address(self.nat_addr)
2833         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2834         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2835                                                   is_inside=0)
2836
2837         pkts = self.create_stream_in(self.pg0, self.pg1)
2838         self.pg0.add_stream(pkts)
2839         self.pg_enable_capture(self.pg_interfaces)
2840         self.pg_start()
2841         capture = self.pg1.get_capture(len(pkts))
2842
2843         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2844         nsessions = len(sessions)
2845
2846         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2847                                     sessions[0].inside_port,
2848                                     sessions[0].protocol)
2849         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2850                                     sessions[1].outside_port,
2851                                     sessions[1].protocol,
2852                                     is_in=0)
2853
2854         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2855         self.assertEqual(nsessions - len(sessions), 2)
2856
2857     def test_set_get_reass(self):
2858         """ NAT44 set/get virtual fragmentation reassembly """
2859         reas_cfg1 = self.vapi.nat_get_reass()
2860
2861         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2862                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2863                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2864
2865         reas_cfg2 = self.vapi.nat_get_reass()
2866
2867         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2868         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2869         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2870
2871         self.vapi.nat_set_reass(drop_frag=1)
2872         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2873
2874     def test_frag_in_order(self):
2875         """ NAT44 translate fragments arriving in order """
2876         self.nat44_add_address(self.nat_addr)
2877         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2878         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2879                                                   is_inside=0)
2880
2881         data = "A" * 4 + "B" * 16 + "C" * 3
2882         self.tcp_port_in = random.randint(1025, 65535)
2883
2884         reass = self.vapi.nat_reass_dump()
2885         reass_n_start = len(reass)
2886
2887         # in2out
2888         pkts = self.create_stream_frag(self.pg0,
2889                                        self.pg1.remote_ip4,
2890                                        self.tcp_port_in,
2891                                        20,
2892                                        data)
2893         self.pg0.add_stream(pkts)
2894         self.pg_enable_capture(self.pg_interfaces)
2895         self.pg_start()
2896         frags = self.pg1.get_capture(len(pkts))
2897         p = self.reass_frags_and_verify(frags,
2898                                         self.nat_addr,
2899                                         self.pg1.remote_ip4)
2900         self.assertEqual(p[TCP].dport, 20)
2901         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2902         self.tcp_port_out = p[TCP].sport
2903         self.assertEqual(data, p[Raw].load)
2904
2905         # out2in
2906         pkts = self.create_stream_frag(self.pg1,
2907                                        self.nat_addr,
2908                                        20,
2909                                        self.tcp_port_out,
2910                                        data)
2911         self.pg1.add_stream(pkts)
2912         self.pg_enable_capture(self.pg_interfaces)
2913         self.pg_start()
2914         frags = self.pg0.get_capture(len(pkts))
2915         p = self.reass_frags_and_verify(frags,
2916                                         self.pg1.remote_ip4,
2917                                         self.pg0.remote_ip4)
2918         self.assertEqual(p[TCP].sport, 20)
2919         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2920         self.assertEqual(data, p[Raw].load)
2921
2922         reass = self.vapi.nat_reass_dump()
2923         reass_n_end = len(reass)
2924
2925         self.assertEqual(reass_n_end - reass_n_start, 2)
2926
2927     def test_reass_hairpinning(self):
2928         """ NAT44 fragments hairpinning """
2929         host = self.pg0.remote_hosts[0]
2930         server = self.pg0.remote_hosts[1]
2931         host_in_port = random.randint(1025, 65535)
2932         host_out_port = 0
2933         server_in_port = random.randint(1025, 65535)
2934         server_out_port = random.randint(1025, 65535)
2935         data = "A" * 4 + "B" * 16 + "C" * 3
2936
2937         self.nat44_add_address(self.nat_addr)
2938         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2939         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2940                                                   is_inside=0)
2941         # add static mapping for server
2942         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2943                                       server_in_port, server_out_port,
2944                                       proto=IP_PROTOS.tcp)
2945
2946         # send packet from host to server
2947         pkts = self.create_stream_frag(self.pg0,
2948                                        self.nat_addr,
2949                                        host_in_port,
2950                                        server_out_port,
2951                                        data)
2952         self.pg0.add_stream(pkts)
2953         self.pg_enable_capture(self.pg_interfaces)
2954         self.pg_start()
2955         frags = self.pg0.get_capture(len(pkts))
2956         p = self.reass_frags_and_verify(frags,
2957                                         self.nat_addr,
2958                                         server.ip4)
2959         self.assertNotEqual(p[TCP].sport, host_in_port)
2960         self.assertEqual(p[TCP].dport, server_in_port)
2961         self.assertEqual(data, p[Raw].load)
2962
2963     def test_frag_out_of_order(self):
2964         """ NAT44 translate fragments arriving out of order """
2965         self.nat44_add_address(self.nat_addr)
2966         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2967         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2968                                                   is_inside=0)
2969
2970         data = "A" * 4 + "B" * 16 + "C" * 3
2971         random.randint(1025, 65535)
2972
2973         # in2out
2974         pkts = self.create_stream_frag(self.pg0,
2975                                        self.pg1.remote_ip4,
2976                                        self.tcp_port_in,
2977                                        20,
2978                                        data)
2979         pkts.reverse()
2980         self.pg0.add_stream(pkts)
2981         self.pg_enable_capture(self.pg_interfaces)
2982         self.pg_start()
2983         frags = self.pg1.get_capture(len(pkts))
2984         p = self.reass_frags_and_verify(frags,
2985                                         self.nat_addr,
2986                                         self.pg1.remote_ip4)
2987         self.assertEqual(p[TCP].dport, 20)
2988         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2989         self.tcp_port_out = p[TCP].sport
2990         self.assertEqual(data, p[Raw].load)
2991
2992         # out2in
2993         pkts = self.create_stream_frag(self.pg1,
2994                                        self.nat_addr,
2995                                        20,
2996                                        self.tcp_port_out,
2997                                        data)
2998         pkts.reverse()
2999         self.pg1.add_stream(pkts)
3000         self.pg_enable_capture(self.pg_interfaces)
3001         self.pg_start()
3002         frags = self.pg0.get_capture(len(pkts))
3003         p = self.reass_frags_and_verify(frags,
3004                                         self.pg1.remote_ip4,
3005                                         self.pg0.remote_ip4)
3006         self.assertEqual(p[TCP].sport, 20)
3007         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3008         self.assertEqual(data, p[Raw].load)
3009
3010     def test_port_restricted(self):
3011         """ Port restricted NAT44 (MAP-E CE) """
3012         self.nat44_add_address(self.nat_addr)
3013         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3014         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3015                                                   is_inside=0)
3016         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3017                       "psid-offset 6 psid-len 6")
3018
3019         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3020              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3021              TCP(sport=4567, dport=22))
3022         self.pg0.add_stream(p)
3023         self.pg_enable_capture(self.pg_interfaces)
3024         self.pg_start()
3025         capture = self.pg1.get_capture(1)
3026         p = capture[0]
3027         try:
3028             ip = p[IP]
3029             tcp = p[TCP]
3030             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3031             self.assertEqual(ip.src, self.nat_addr)
3032             self.assertEqual(tcp.dport, 22)
3033             self.assertNotEqual(tcp.sport, 4567)
3034             self.assertEqual((tcp.sport >> 6) & 63, 10)
3035             self.check_tcp_checksum(p)
3036             self.check_ip_checksum(p)
3037         except:
3038             self.logger.error(ppp("Unexpected or invalid packet:", p))
3039             raise
3040
3041     def test_twice_nat(self):
3042         """ Twice NAT44 """
3043         twice_nat_addr = '10.0.1.3'
3044         port_in = 8080
3045         port_out = 80
3046         eh_port_out = 4567
3047         eh_port_in = 0
3048         self.nat44_add_address(self.nat_addr)
3049         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3050         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3051                                       port_in, port_out, proto=IP_PROTOS.tcp,
3052                                       twice_nat=1)
3053         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3054         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3055                                                   is_inside=0)
3056
3057         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3058              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3059              TCP(sport=eh_port_out, dport=port_out))
3060         self.pg1.add_stream(p)
3061         self.pg_enable_capture(self.pg_interfaces)
3062         self.pg_start()
3063         capture = self.pg0.get_capture(1)
3064         p = capture[0]
3065         try:
3066             ip = p[IP]
3067             tcp = p[TCP]
3068             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3069             self.assertEqual(ip.src, twice_nat_addr)
3070             self.assertEqual(tcp.dport, port_in)
3071             self.assertNotEqual(tcp.sport, eh_port_out)
3072             eh_port_in = tcp.sport
3073             self.check_tcp_checksum(p)
3074             self.check_ip_checksum(p)
3075         except:
3076             self.logger.error(ppp("Unexpected or invalid packet:", p))
3077             raise
3078
3079         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3080              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3081              TCP(sport=port_in, dport=eh_port_in))
3082         self.pg0.add_stream(p)
3083         self.pg_enable_capture(self.pg_interfaces)
3084         self.pg_start()
3085         capture = self.pg1.get_capture(1)
3086         p = capture[0]
3087         try:
3088             ip = p[IP]
3089             tcp = p[TCP]
3090             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3091             self.assertEqual(ip.src, self.nat_addr)
3092             self.assertEqual(tcp.dport, eh_port_out)
3093             self.assertEqual(tcp.sport, port_out)
3094             self.check_tcp_checksum(p)
3095             self.check_ip_checksum(p)
3096         except:
3097             self.logger.error(ppp("Unexpected or invalid packet:", p))
3098             raise
3099
3100     def test_twice_nat_lb(self):
3101         """ Twice NAT44 local service load balancing """
3102         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3103         twice_nat_addr = '10.0.1.3'
3104         local_port = 8080
3105         external_port = 80
3106         eh_port_out = 4567
3107         eh_port_in = 0
3108         server1 = self.pg0.remote_hosts[0]
3109         server2 = self.pg0.remote_hosts[1]
3110
3111         locals = [{'addr': server1.ip4n,
3112                    'port': local_port,
3113                    'probability': 50},
3114                   {'addr': server2.ip4n,
3115                    'port': local_port,
3116                    'probability': 50}]
3117
3118         self.nat44_add_address(self.nat_addr)
3119         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3120
3121         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3122                                                   external_port,
3123                                                   IP_PROTOS.tcp,
3124                                                   twice_nat=1,
3125                                                   local_num=len(locals),
3126                                                   locals=locals)
3127         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3128         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3129                                                   is_inside=0)
3130
3131         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3132              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3133              TCP(sport=eh_port_out, dport=external_port))
3134         self.pg1.add_stream(p)
3135         self.pg_enable_capture(self.pg_interfaces)
3136         self.pg_start()
3137         capture = self.pg0.get_capture(1)
3138         p = capture[0]
3139         server = None
3140         try:
3141             ip = p[IP]
3142             tcp = p[TCP]
3143             self.assertEqual(ip.src, twice_nat_addr)
3144             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3145             if ip.dst == server1.ip4:
3146                 server = server1
3147             else:
3148                 server = server2
3149             self.assertNotEqual(tcp.sport, eh_port_out)
3150             eh_port_in = tcp.sport
3151             self.assertEqual(tcp.dport, local_port)
3152             self.check_tcp_checksum(p)
3153             self.check_ip_checksum(p)
3154         except:
3155             self.logger.error(ppp("Unexpected or invalid packet:", p))
3156             raise
3157
3158         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3159              IP(src=server.ip4, dst=twice_nat_addr) /
3160              TCP(sport=local_port, dport=eh_port_in))
3161         self.pg0.add_stream(p)
3162         self.pg_enable_capture(self.pg_interfaces)
3163         self.pg_start()
3164         capture = self.pg1.get_capture(1)
3165         p = capture[0]
3166         try:
3167             ip = p[IP]
3168             tcp = p[TCP]
3169             self.assertEqual(ip.src, self.nat_addr)
3170             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3171             self.assertEqual(tcp.sport, external_port)
3172             self.assertEqual(tcp.dport, eh_port_out)
3173             self.check_tcp_checksum(p)
3174             self.check_ip_checksum(p)
3175         except:
3176             self.logger.error(ppp("Unexpected or invalid packet:", p))
3177             raise
3178
3179     def test_twice_nat_interface_addr(self):
3180         """ Acquire twice NAT44 addresses from interface """
3181         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3182
3183         # no address in NAT pool
3184         adresses = self.vapi.nat44_address_dump()
3185         self.assertEqual(0, len(adresses))
3186
3187         # configure interface address and check NAT address pool
3188         self.pg7.config_ip4()
3189         adresses = self.vapi.nat44_address_dump()
3190         self.assertEqual(1, len(adresses))
3191         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3192         self.assertEqual(adresses[0].twice_nat, 1)
3193
3194         # remove interface address and check NAT address pool
3195         self.pg7.unconfig_ip4()
3196         adresses = self.vapi.nat44_address_dump()
3197         self.assertEqual(0, len(adresses))
3198
3199     def tearDown(self):
3200         super(TestNAT44, self).tearDown()
3201         if not self.vpp_dead:
3202             self.logger.info(self.vapi.cli("show nat44 verbose"))
3203             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3204             self.vapi.cli("nat addr-port-assignment-alg default")
3205             self.clear_nat44()
3206
3207
3208 class TestNAT44Out2InDPO(MethodHolder):
3209     """ NAT44 Test Cases using out2in DPO """
3210
3211     @classmethod
3212     def setUpConstants(cls):
3213         super(TestNAT44Out2InDPO, cls).setUpConstants()
3214         cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3215
3216     @classmethod
3217     def setUpClass(cls):
3218         super(TestNAT44Out2InDPO, cls).setUpClass()
3219
3220         try:
3221             cls.tcp_port_in = 6303
3222             cls.tcp_port_out = 6303
3223             cls.udp_port_in = 6304
3224             cls.udp_port_out = 6304
3225             cls.icmp_id_in = 6305
3226             cls.icmp_id_out = 6305
3227             cls.nat_addr = '10.0.0.3'
3228             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3229             cls.dst_ip4 = '192.168.70.1'
3230
3231             cls.create_pg_interfaces(range(2))
3232
3233             cls.pg0.admin_up()
3234             cls.pg0.config_ip4()
3235             cls.pg0.resolve_arp()
3236
3237             cls.pg1.admin_up()
3238             cls.pg1.config_ip6()
3239             cls.pg1.resolve_ndp()
3240
3241             cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3242                                       dst_address_length=0,
3243                                       next_hop_address=cls.pg1.remote_ip6n,
3244                                       next_hop_sw_if_index=cls.pg1.sw_if_index)
3245
3246         except Exception:
3247             super(TestNAT44Out2InDPO, cls).tearDownClass()
3248             raise
3249
3250     def configure_xlat(self):
3251         self.dst_ip6_pfx = '1:2:3::'
3252         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3253                                               self.dst_ip6_pfx)
3254         self.dst_ip6_pfx_len = 96
3255         self.src_ip6_pfx = '4:5:6::'
3256         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3257                                               self.src_ip6_pfx)
3258         self.src_ip6_pfx_len = 96
3259         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3260                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3261                                  '\x00\x00\x00\x00', 0, is_translation=1,
3262                                  is_rfc6052=1)
3263
3264     def test_464xlat_ce(self):
3265         """ Test 464XLAT CE with NAT44 """
3266
3267         self.configure_xlat()
3268
3269         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3270         self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3271
3272         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3273                                        self.dst_ip6_pfx_len)
3274         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3275                                        self.src_ip6_pfx_len)
3276
3277         try:
3278             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3279             self.pg0.add_stream(pkts)
3280             self.pg_enable_capture(self.pg_interfaces)
3281             self.pg_start()
3282             capture = self.pg1.get_capture(len(pkts))
3283             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3284                                         dst_ip=out_src_ip6)
3285
3286             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3287                                               out_dst_ip6)
3288             self.pg1.add_stream(pkts)
3289             self.pg_enable_capture(self.pg_interfaces)
3290             self.pg_start()
3291             capture = self.pg0.get_capture(len(pkts))
3292             self.verify_capture_in(capture, self.pg0)
3293         finally:
3294             self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3295                                                       is_add=0)
3296             self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3297                                                   self.nat_addr_n, is_add=0)
3298
3299     def test_464xlat_ce_no_nat(self):
3300         """ Test 464XLAT CE without NAT44 """
3301
3302         self.configure_xlat()
3303
3304         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3305                                        self.dst_ip6_pfx_len)
3306         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3307                                        self.src_ip6_pfx_len)
3308
3309         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3310         self.pg0.add_stream(pkts)
3311         self.pg_enable_capture(self.pg_interfaces)
3312         self.pg_start()
3313         capture = self.pg1.get_capture(len(pkts))
3314         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3315                                     nat_ip=out_dst_ip6, same_port=True)
3316
3317         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3318         self.pg1.add_stream(pkts)
3319         self.pg_enable_capture(self.pg_interfaces)
3320         self.pg_start()
3321         capture = self.pg0.get_capture(len(pkts))
3322         self.verify_capture_in(capture, self.pg0)
3323
3324
3325 class TestDeterministicNAT(MethodHolder):
3326     """ Deterministic NAT Test Cases """
3327
3328     @classmethod
3329     def setUpConstants(cls):
3330         super(TestDeterministicNAT, cls).setUpConstants()
3331         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3332
3333     @classmethod
3334     def setUpClass(cls):
3335         super(TestDeterministicNAT, cls).setUpClass()
3336
3337         try:
3338             cls.tcp_port_in = 6303
3339             cls.tcp_external_port = 6303
3340             cls.udp_port_in = 6304
3341             cls.udp_external_port = 6304
3342             cls.icmp_id_in = 6305
3343             cls.nat_addr = '10.0.0.3'
3344
3345             cls.create_pg_interfaces(range(3))
3346             cls.interfaces = list(cls.pg_interfaces)
3347
3348             for i in cls.interfaces:
3349                 i.admin_up()
3350                 i.config_ip4()
3351                 i.resolve_arp()
3352
3353             cls.pg0.generate_remote_hosts(2)
3354             cls.pg0.configure_ipv4_neighbors()
3355
3356         except Exception:
3357             super(TestDeterministicNAT, cls).tearDownClass()
3358             raise
3359
3360     def create_stream_in(self, in_if, out_if, ttl=64):
3361         """
3362         Create packet stream for inside network
3363
3364         :param in_if: Inside interface
3365         :param out_if: Outside interface
3366         :param ttl: TTL of generated packets
3367         """
3368         pkts = []
3369         # TCP
3370         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3371              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3372              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3373         pkts.append(p)
3374
3375         # UDP
3376         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3377              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3378              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3379         pkts.append(p)
3380
3381         # ICMP
3382         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3383              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3384              ICMP(id=self.icmp_id_in, type='echo-request'))
3385         pkts.append(p)
3386
3387         return pkts
3388
3389     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3390         """
3391         Create packet stream for outside network
3392
3393         :param out_if: Outside interface
3394         :param dst_ip: Destination IP address (Default use global NAT address)
3395         :param ttl: TTL of generated packets
3396         """
3397         if dst_ip is None:
3398             dst_ip = self.nat_addr
3399         pkts = []
3400         # TCP
3401         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3402              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3403              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3404         pkts.append(p)
3405
3406         # UDP
3407         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3408              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3409              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3410         pkts.append(p)
3411
3412         # ICMP
3413         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3414              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3415              ICMP(id=self.icmp_external_id, type='echo-reply'))
3416         pkts.append(p)
3417
3418         return pkts
3419
3420     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3421         """
3422         Verify captured packets on outside network
3423
3424         :param capture: Captured packets
3425         :param nat_ip: Translated IP address (Default use global NAT address)
3426         :param same_port: Sorce port number is not translated (Default False)
3427         :param packet_num: Expected number of packets (Default 3)
3428         """
3429         if nat_ip is None:
3430             nat_ip = self.nat_addr
3431         self.assertEqual(packet_num, len(capture))
3432         for packet in capture:
3433             try:
3434                 self.assertEqual(packet[IP].src, nat_ip)
3435                 if packet.haslayer(TCP):
3436                     self.tcp_port_out = packet[TCP].sport
3437                 elif packet.haslayer(UDP):
3438                     self.udp_port_out = packet[UDP].sport
3439                 else:
3440                     self.icmp_external_id = packet[ICMP].id
3441             except:
3442                 self.logger.error(ppp("Unexpected or invalid packet "
3443                                       "(outside network):", packet))
3444                 raise
3445
3446     def initiate_tcp_session(self, in_if, out_if):
3447         """
3448         Initiates TCP session
3449
3450         :param in_if: Inside interface
3451         :param out_if: Outside interface
3452         """
3453         try:
3454             # SYN packet in->out
3455             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3456                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3457                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3458                      flags="S"))
3459             in_if.add_stream(p)
3460             self.pg_enable_capture(self.pg_interfaces)
3461             self.pg_start()
3462             capture = out_if.get_capture(1)
3463             p = capture[0]
3464             self.tcp_port_out = p[TCP].sport
3465
3466             # SYN + ACK packet out->in
3467             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3468                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3469                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3470                      flags="SA"))
3471             out_if.add_stream(p)
3472             self.pg_enable_capture(self.pg_interfaces)
3473             self.pg_start()
3474             in_if.get_capture(1)
3475
3476             # ACK packet in->out
3477             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3478                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3479                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3480                      flags="A"))
3481             in_if.add_stream(p)
3482             self.pg_enable_capture(self.pg_interfaces)
3483             self.pg_start()
3484             out_if.get_capture(1)
3485
3486         except:
3487             self.logger.error("TCP 3 way handshake failed")
3488             raise
3489
3490     def verify_ipfix_max_entries_per_user(self, data):
3491         """
3492         Verify IPFIX maximum entries per user exceeded event
3493
3494         :param data: Decoded IPFIX data records
3495         """
3496         self.assertEqual(1, len(data))
3497         record = data[0]
3498         # natEvent
3499         self.assertEqual(ord(record[230]), 13)
3500         # natQuotaExceededEvent
3501         self.assertEqual('\x03\x00\x00\x00', record[466])
3502         # sourceIPv4Address
3503         self.assertEqual(self.pg0.remote_ip4n, record[8])
3504
3505     def test_deterministic_mode(self):
3506         """ NAT plugin run deterministic mode """
3507         in_addr = '172.16.255.0'
3508         out_addr = '172.17.255.50'
3509         in_addr_t = '172.16.255.20'
3510         in_addr_n = socket.inet_aton(in_addr)
3511         out_addr_n = socket.inet_aton(out_addr)
3512         in_addr_t_n = socket.inet_aton(in_addr_t)
3513         in_plen = 24
3514         out_plen = 32
3515
3516         nat_config = self.vapi.nat_show_config()
3517         self.assertEqual(1, nat_config.deterministic)
3518
3519         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3520
3521         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3522         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3523         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3524         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3525
3526         deterministic_mappings = self.vapi.nat_det_map_dump()
3527         self.assertEqual(len(deterministic_mappings), 1)
3528         dsm = deterministic_mappings[0]
3529         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3530         self.assertEqual(in_plen, dsm.in_plen)
3531         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3532         self.assertEqual(out_plen, dsm.out_plen)
3533
3534         self.clear_nat_det()
3535         deterministic_mappings = self.vapi.nat_det_map_dump()
3536         self.assertEqual(len(deterministic_mappings), 0)
3537
3538     def test_set_timeouts(self):
3539         """ Set deterministic NAT timeouts """
3540         timeouts_before = self.vapi.nat_det_get_timeouts()
3541
3542         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3543                                        timeouts_before.tcp_established + 10,
3544                                        timeouts_before.tcp_transitory + 10,
3545                                        timeouts_before.icmp + 10)
3546
3547         timeouts_after = self.vapi.nat_det_get_timeouts()
3548
3549         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3550         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3551         self.assertNotEqual(timeouts_before.tcp_established,
3552                             timeouts_after.tcp_established)
3553         self.assertNotEqual(timeouts_before.tcp_transitory,
3554                             timeouts_after.tcp_transitory)
3555
3556     def test_det_in(self):
3557         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3558
3559         nat_ip = "10.0.0.10"
3560
3561         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3562                                       32,
3563                                       socket.inet_aton(nat_ip),
3564                                       32)
3565         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3566         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3567                                                   is_inside=0)
3568
3569         # in2out
3570         pkts = self.create_stream_in(self.pg0, self.pg1)
3571         self.pg0.add_stream(pkts)
3572         self.pg_enable_capture(self.pg_interfaces)
3573         self.pg_start()
3574         capture = self.pg1.get_capture(len(pkts))
3575         self.verify_capture_out(capture, nat_ip)
3576
3577         # out2in
3578         pkts = self.create_stream_out(self.pg1, nat_ip)
3579         self.pg1.add_stream(pkts)
3580         self.pg_enable_capture(self.pg_interfaces)
3581         self.pg_start()
3582         capture = self.pg0.get_capture(len(pkts))
3583         self.verify_capture_in(capture, self.pg0)
3584
3585         # session dump test
3586         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3587         self.assertEqual(len(sessions), 3)
3588
3589         # TCP session
3590         s = sessions[0]
3591         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3592         self.assertEqual(s.in_port, self.tcp_port_in)
3593         self.assertEqual(s.out_port, self.tcp_port_out)
3594         self.assertEqual(s.ext_port, self.tcp_external_port)
3595
3596         # UDP session
3597         s = sessions[1]
3598         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3599         self.assertEqual(s.in_port, self.udp_port_in)
3600         self.assertEqual(s.out_port, self.udp_port_out)
3601         self.assertEqual(s.ext_port, self.udp_external_port)
3602
3603         # ICMP session
3604         s = sessions[2]
3605         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3606         self.assertEqual(s.in_port, self.icmp_id_in)
3607         self.assertEqual(s.out_port, self.icmp_external_id)
3608
3609     def test_multiple_users(self):
3610         """ Deterministic NAT multiple users """
3611
3612         nat_ip = "10.0.0.10"
3613         port_in = 80
3614         external_port = 6303
3615
3616         host0 = self.pg0.remote_hosts[0]
3617         host1 = self.pg0.remote_hosts[1]
3618
3619         self.vapi.nat_det_add_del_map(host0.ip4n,
3620                                       24,
3621                                       socket.inet_aton(nat_ip),
3622                                       32)
3623         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3624         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3625                                                   is_inside=0)
3626
3627         # host0 to out
3628         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3629              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3630              TCP(sport=port_in, dport=external_port))
3631         self.pg0.add_stream(p)
3632         self.pg_enable_capture(self.pg_interfaces)
3633         self.pg_start()
3634         capture = self.pg1.get_capture(1)
3635         p = capture[0]
3636         try:
3637             ip = p[IP]
3638             tcp = p[TCP]
3639             self.assertEqual(ip.src, nat_ip)
3640             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3641             self.assertEqual(tcp.dport, external_port)
3642             port_out0 = tcp.sport
3643         except:
3644             self.logger.error(ppp("Unexpected or invalid packet:", p))
3645             raise
3646
3647         # host1 to out
3648         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3649              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3650              TCP(sport=port_in, dport=external_port))
3651         self.pg0.add_stream(p)
3652         self.pg_enable_capture(self.pg_interfaces)
3653         self.pg_start()
3654         capture = self.pg1.get_capture(1)
3655         p = capture[0]
3656         try:
3657             ip = p[IP]
3658             tcp = p[TCP]
3659             self.assertEqual(ip.src, nat_ip)
3660             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3661             self.assertEqual(tcp.dport, external_port)
3662             port_out1 = tcp.sport
3663         except:
3664             self.logger.error(ppp("Unexpected or invalid packet:", p))
3665             raise
3666
3667         dms = self.vapi.nat_det_map_dump()
3668         self.assertEqual(1, len(dms))
3669         self.assertEqual(2, dms[0].ses_num)
3670
3671         # out to host0
3672         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3673              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3674              TCP(sport=external_port, dport=port_out0))
3675         self.pg1.add_stream(p)
3676         self.pg_enable_capture(self.pg_interfaces)
3677         self.pg_start()
3678         capture = self.pg0.get_capture(1)
3679         p = capture[0]
3680         try:
3681             ip = p[IP]
3682             tcp = p[TCP]
3683             self.assertEqual(ip.src, self.pg1.remote_ip4)
3684             self.assertEqual(ip.dst, host0.ip4)
3685             self.assertEqual(tcp.dport, port_in)
3686             self.assertEqual(tcp.sport, external_port)
3687         except:
3688             self.logger.error(ppp("Unexpected or invalid packet:", p))
3689             raise
3690
3691         # out to host1
3692         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3693              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3694              TCP(sport=external_port, dport=port_out1))
3695         self.pg1.add_stream(p)
3696         self.pg_enable_capture(self.pg_interfaces)
3697         self.pg_start()
3698         capture = self.pg0.get_capture(1)
3699         p = capture[0]
3700         try:
3701             ip = p[IP]
3702             tcp = p[TCP]
3703             self.assertEqual(ip.src, self.pg1.remote_ip4)
3704             self.assertEqual(ip.dst, host1.ip4)
3705             self.assertEqual(tcp.dport, port_in)
3706             self.assertEqual(tcp.sport, external_port)
3707         except:
3708             self.logger.error(ppp("Unexpected or invalid packet", p))
3709             raise
3710
3711         # session close api test
3712         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3713                                             port_out1,
3714                                             self.pg1.remote_ip4n,
3715                                             external_port)
3716         dms = self.vapi.nat_det_map_dump()
3717         self.assertEqual(dms[0].ses_num, 1)
3718
3719         self.vapi.nat_det_close_session_in(host0.ip4n,
3720                                            port_in,
3721                                            self.pg1.remote_ip4n,
3722                                            external_port)
3723         dms = self.vapi.nat_det_map_dump()
3724         self.assertEqual(dms[0].ses_num, 0)
3725
3726     def test_tcp_session_close_detection_in(self):
3727         """ Deterministic NAT TCP session close from inside network """
3728         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3729                                       32,
3730                                       socket.inet_aton(self.nat_addr),
3731                                       32)
3732         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3733         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3734                                                   is_inside=0)
3735
3736         self.initiate_tcp_session(self.pg0, self.pg1)
3737
3738         # close the session from inside
3739         try:
3740             # FIN packet in -> out
3741             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3742                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3743                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3744                      flags="F"))
3745             self.pg0.add_stream(p)
3746             self.pg_enable_capture(self.pg_interfaces)
3747             self.pg_start()
3748             self.pg1.get_capture(1)
3749
3750             pkts = []
3751
3752             # ACK packet out -> in
3753             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3754                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3755                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3756                      flags="A"))
3757             pkts.append(p)
3758
3759             # FIN packet out -> in
3760             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3761                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3762                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3763                      flags="F"))
3764             pkts.append(p)
3765
3766             self.pg1.add_stream(pkts)
3767             self.pg_enable_capture(self.pg_interfaces)
3768             self.pg_start()
3769             self.pg0.get_capture(2)
3770
3771             # ACK packet in -> out
3772             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3773                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3774                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3775                      flags="A"))
3776             self.pg0.add_stream(p)
3777             self.pg_enable_capture(self.pg_interfaces)
3778             self.pg_start()
3779             self.pg1.get_capture(1)
3780
3781             # Check if deterministic NAT44 closed the session
3782             dms = self.vapi.nat_det_map_dump()
3783             self.assertEqual(0, dms[0].ses_num)
3784         except:
3785             self.logger.error("TCP session termination failed")
3786             raise
3787
3788     def test_tcp_session_close_detection_out(self):
3789         """ Deterministic NAT TCP session close from outside network """
3790         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3791                                       32,
3792                                       socket.inet_aton(self.nat_addr),
3793                                       32)
3794         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3795         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3796                                                   is_inside=0)
3797
3798         self.initiate_tcp_session(self.pg0, self.pg1)
3799
3800         # close the session from outside
3801         try:
3802             # FIN packet out -> in
3803             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3804                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3805                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3806                      flags="F"))
3807             self.pg1.add_stream(p)
3808             self.pg_enable_capture(self.pg_interfaces)
3809             self.pg_start()
3810             self.pg0.get_capture(1)
3811
3812             pkts = []
3813
3814             # ACK packet in -> out
3815             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3816                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3817                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3818                      flags="A"))
3819             pkts.append(p)
3820
3821             # ACK packet in -> out
3822             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3823                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3824                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3825                      flags="F"))
3826             pkts.append(p)
3827
3828             self.pg0.add_stream(pkts)
3829             self.pg_enable_capture(self.pg_interfaces)
3830             self.pg_start()
3831             self.pg1.get_capture(2)
3832
3833             # ACK packet out -> in
3834             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3835                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3836                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3837                      flags="A"))
3838             self.pg1.add_stream(p)
3839             self.pg_enable_capture(self.pg_interfaces)
3840             self.pg_start()
3841             self.pg0.get_capture(1)
3842
3843             # Check if deterministic NAT44 closed the session
3844             dms = self.vapi.nat_det_map_dump()
3845             self.assertEqual(0, dms[0].ses_num)
3846         except:
3847             self.logger.error("TCP session termination failed")
3848             raise
3849
3850     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3851     def test_session_timeout(self):
3852         """ Deterministic NAT session timeouts """
3853         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3854                                       32,
3855                                       socket.inet_aton(self.nat_addr),
3856                                       32)
3857         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3858         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3859                                                   is_inside=0)
3860
3861         self.initiate_tcp_session(self.pg0, self.pg1)
3862         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3863         pkts = self.create_stream_in(self.pg0, self.pg1)
3864         self.pg0.add_stream(pkts)
3865         self.pg_enable_capture(self.pg_interfaces)
3866         self.pg_start()
3867         capture = self.pg1.get_capture(len(pkts))
3868         sleep(15)
3869
3870         dms = self.vapi.nat_det_map_dump()
3871         self.assertEqual(0, dms[0].ses_num)
3872
3873     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3874     def test_session_limit_per_user(self):
3875         """ Deterministic NAT maximum sessions per user limit """
3876         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3877                                       32,
3878                                       socket.inet_aton(self.nat_addr),
3879                                       32)
3880         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3881         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3882                                                   is_inside=0)
3883         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3884                                      src_address=self.pg2.local_ip4n,
3885                                      path_mtu=512,
3886                                      template_interval=10)
3887         self.vapi.nat_ipfix()
3888
3889         pkts = []
3890         for port in range(1025, 2025):
3891             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3892                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3893                  UDP(sport=port, dport=port))
3894             pkts.append(p)
3895
3896         self.pg0.add_stream(pkts)
3897         self.pg_enable_capture(self.pg_interfaces)
3898         self.pg_start()
3899         capture = self.pg1.get_capture(len(pkts))
3900
3901         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3902              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3903              UDP(sport=3001, dport=3002))
3904         self.pg0.add_stream(p)
3905         self.pg_enable_capture(self.pg_interfaces)
3906         self.pg_start()
3907         capture = self.pg1.assert_nothing_captured()
3908
3909         # verify ICMP error packet
3910         capture = self.pg0.get_capture(1)
3911         p = capture[0]
3912         self.assertTrue(p.haslayer(ICMP))
3913         icmp = p[ICMP]
3914         self.assertEqual(icmp.type, 3)
3915         self.assertEqual(icmp.code, 1)
3916         self.assertTrue(icmp.haslayer(IPerror))
3917         inner_ip = icmp[IPerror]
3918         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3919         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3920
3921         dms = self.vapi.nat_det_map_dump()
3922
3923         self.assertEqual(1000, dms[0].ses_num)
3924
3925         # verify IPFIX logging
3926         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3927         sleep(1)
3928         capture = self.pg2.get_capture(2)
3929         ipfix = IPFIXDecoder()
3930         # first load template
3931         for p in capture:
3932             self.assertTrue(p.haslayer(IPFIX))
3933             if p.haslayer(Template):
3934                 ipfix.add_template(p.getlayer(Template))
3935         # verify events in data set
3936         for p in capture:
3937             if p.haslayer(Data):
3938                 data = ipfix.decode_data_set(p.getlayer(Set))
3939                 self.verify_ipfix_max_entries_per_user(data)
3940
3941     def clear_nat_det(self):
3942         """
3943         Clear deterministic NAT configuration.
3944         """
3945         self.vapi.nat_ipfix(enable=0)
3946         self.vapi.nat_det_set_timeouts()
3947         deterministic_mappings = self.vapi.nat_det_map_dump()
3948         for dsm in deterministic_mappings:
3949             self.vapi.nat_det_add_del_map(dsm.in_addr,
3950                                           dsm.in_plen,
3951                                           dsm.out_addr,
3952                                           dsm.out_plen,
3953                                           is_add=0)
3954
3955         interfaces = self.vapi.nat44_interface_dump()
3956         for intf in interfaces:
3957             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3958                                                       intf.is_inside,
3959                                                       is_add=0)
3960
3961     def tearDown(self):
3962         super(TestDeterministicNAT, self).tearDown()
3963         if not self.vpp_dead:
3964             self.logger.info(self.vapi.cli("show nat44 detail"))
3965             self.clear_nat_det()
3966
3967
3968 class TestNAT64(MethodHolder):
3969     """ NAT64 Test Cases """
3970
3971     @classmethod
3972     def setUpClass(cls):
3973         super(TestNAT64, cls).setUpClass()
3974
3975         try:
3976             cls.tcp_port_in = 6303
3977             cls.tcp_port_out = 6303
3978             cls.udp_port_in = 6304
3979             cls.udp_port_out = 6304
3980             cls.icmp_id_in = 6305
3981             cls.icmp_id_out = 6305
3982             cls.nat_addr = '10.0.0.3'
3983             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3984             cls.vrf1_id = 10
3985             cls.vrf1_nat_addr = '10.0.10.3'
3986             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3987                                                    cls.vrf1_nat_addr)
3988
3989             cls.create_pg_interfaces(range(5))
3990             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3991             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3992             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3993
3994             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3995
3996             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3997
3998             cls.pg0.generate_remote_hosts(2)
3999
4000             for i in cls.ip6_interfaces:
4001                 i.admin_up()
4002                 i.config_ip6()
4003                 i.configure_ipv6_neighbors()
4004
4005             for i in cls.ip4_interfaces:
4006                 i.admin_up()
4007                 i.config_ip4()
4008                 i.resolve_arp()
4009
4010             cls.pg3.admin_up()
4011             cls.pg3.config_ip4()
4012             cls.pg3.resolve_arp()
4013             cls.pg3.config_ip6()
4014             cls.pg3.configure_ipv6_neighbors()
4015
4016         except Exception:
4017             super(TestNAT64, cls).tearDownClass()
4018             raise
4019
4020     def test_pool(self):
4021         """ Add/delete address to NAT64 pool """
4022         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4023
4024         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4025
4026         addresses = self.vapi.nat64_pool_addr_dump()
4027         self.assertEqual(len(addresses), 1)
4028         self.assertEqual(addresses[0].address, nat_addr)
4029
4030         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4031
4032         addresses = self.vapi.nat64_pool_addr_dump()
4033         self.assertEqual(len(addresses), 0)
4034
4035     def test_interface(self):
4036         """ Enable/disable NAT64 feature on the interface """
4037         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4038         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4039
4040         interfaces = self.vapi.nat64_interface_dump()
4041         self.assertEqual(len(interfaces), 2)
4042         pg0_found = False
4043         pg1_found = False
4044         for intf in interfaces:
4045             if intf.sw_if_index == self.pg0.sw_if_index:
4046                 self.assertEqual(intf.is_inside, 1)
4047                 pg0_found = True
4048             elif intf.sw_if_index == self.pg1.sw_if_index:
4049                 self.assertEqual(intf.is_inside, 0)
4050                 pg1_found = True
4051         self.assertTrue(pg0_found)
4052         self.assertTrue(pg1_found)
4053
4054         features = self.vapi.cli("show interface features pg0")
4055         self.assertNotEqual(features.find('nat64-in2out'), -1)
4056         features = self.vapi.cli("show interface features pg1")
4057         self.assertNotEqual(features.find('nat64-out2in'), -1)
4058
4059         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4060         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4061
4062         interfaces = self.vapi.nat64_interface_dump()
4063         self.assertEqual(len(interfaces), 0)
4064
4065     def test_static_bib(self):
4066         """ Add/delete static BIB entry """
4067         in_addr = socket.inet_pton(socket.AF_INET6,
4068                                    '2001:db8:85a3::8a2e:370:7334')
4069         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4070         in_port = 1234
4071         out_port = 5678
4072         proto = IP_PROTOS.tcp
4073
4074         self.vapi.nat64_add_del_static_bib(in_addr,
4075                                            out_addr,
4076                                            in_port,
4077                                            out_port,
4078                                            proto)
4079         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4080         static_bib_num = 0
4081         for bibe in bib:
4082             if bibe.is_static:
4083                 static_bib_num += 1
4084                 self.assertEqual(bibe.i_addr, in_addr)
4085                 self.assertEqual(bibe.o_addr, out_addr)
4086                 self.assertEqual(bibe.i_port, in_port)
4087                 self.assertEqual(bibe.o_port, out_port)
4088         self.assertEqual(static_bib_num, 1)
4089
4090         self.vapi.nat64_add_del_static_bib(in_addr,
4091                                            out_addr,
4092                                            in_port,
4093                                            out_port,
4094                                            proto,
4095                                            is_add=0)
4096         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4097         static_bib_num = 0
4098         for bibe in bib:
4099             if bibe.is_static:
4100                 static_bib_num += 1
4101         self.assertEqual(static_bib_num, 0)
4102
4103     def test_set_timeouts(self):
4104         """ Set NAT64 timeouts """
4105         # verify default values
4106         timeouts = self.vapi.nat64_get_timeouts()
4107         self.assertEqual(timeouts.udp, 300)
4108         self.assertEqual(timeouts.icmp, 60)
4109         self.assertEqual(timeouts.tcp_trans, 240)
4110         self.assertEqual(timeouts.tcp_est, 7440)
4111         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4112
4113         # set and verify custom values
4114         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4115                                      tcp_est=7450, tcp_incoming_syn=10)
4116         timeouts = self.vapi.nat64_get_timeouts()
4117         self.assertEqual(timeouts.udp, 200)
4118         self.assertEqual(timeouts.icmp, 30)
4119         self.assertEqual(timeouts.tcp_trans, 250)
4120         self.assertEqual(timeouts.tcp_est, 7450)
4121         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4122
4123     def test_dynamic(self):
4124         """ NAT64 dynamic translation test """
4125         self.tcp_port_in = 6303
4126         self.udp_port_in = 6304
4127         self.icmp_id_in = 6305
4128
4129         ses_num_start = self.nat64_get_ses_num()
4130
4131         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4132                                                 self.nat_addr_n)
4133         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4134         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4135
4136         # in2out
4137         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4138         self.pg0.add_stream(pkts)
4139         self.pg_enable_capture(self.pg_interfaces)
4140         self.pg_start()
4141         capture = self.pg1.get_capture(len(pkts))
4142         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4143                                 dst_ip=self.pg1.remote_ip4)
4144
4145         # out2in
4146         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4147         self.pg1.add_stream(pkts)
4148         self.pg_enable_capture(self.pg_interfaces)
4149         self.pg_start()
4150         capture = self.pg0.get_capture(len(pkts))
4151         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4152         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4153
4154         # in2out
4155         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4156         self.pg0.add_stream(pkts)
4157         self.pg_enable_capture(self.pg_interfaces)
4158         self.pg_start()
4159         capture = self.pg1.get_capture(len(pkts))
4160         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4161                                 dst_ip=self.pg1.remote_ip4)
4162
4163         # out2in
4164         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4165         self.pg1.add_stream(pkts)
4166         self.pg_enable_capture(self.pg_interfaces)
4167         self.pg_start()
4168         capture = self.pg0.get_capture(len(pkts))
4169         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4170
4171         ses_num_end = self.nat64_get_ses_num()
4172
4173         self.assertEqual(ses_num_end - ses_num_start, 3)
4174
4175         # tenant with specific VRF
4176         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4177                                                 self.vrf1_nat_addr_n,
4178                                                 vrf_id=self.vrf1_id)
4179         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4180
4181         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4182         self.pg2.add_stream(pkts)
4183         self.pg_enable_capture(self.pg_interfaces)
4184         self.pg_start()
4185         capture = self.pg1.get_capture(len(pkts))
4186         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4187                                 dst_ip=self.pg1.remote_ip4)
4188
4189         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4190         self.pg1.add_stream(pkts)
4191         self.pg_enable_capture(self.pg_interfaces)
4192         self.pg_start()
4193         capture = self.pg2.get_capture(len(pkts))
4194         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4195
4196     def test_static(self):
4197         """ NAT64 static translation test """
4198         self.tcp_port_in = 60303
4199         self.udp_port_in = 60304
4200         self.icmp_id_in = 60305
4201         self.tcp_port_out = 60303
4202         self.udp_port_out = 60304
4203         self.icmp_id_out = 60305
4204
4205         ses_num_start = self.nat64_get_ses_num()
4206
4207         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4208                                                 self.nat_addr_n)
4209         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4210         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4211
4212         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4213                                            self.nat_addr_n,
4214                                            self.tcp_port_in,
4215                                            self.tcp_port_out,
4216                                            IP_PROTOS.tcp)
4217         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4218                                            self.nat_addr_n,
4219                                            self.udp_port_in,
4220                                            self.udp_port_out,
4221                                            IP_PROTOS.udp)
4222         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4223                                            self.nat_addr_n,
4224                                            self.icmp_id_in,
4225                                            self.icmp_id_out,
4226                                            IP_PROTOS.icmp)
4227
4228         # in2out
4229         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4230         self.pg0.add_stream(pkts)
4231         self.pg_enable_capture(self.pg_interfaces)
4232         self.pg_start()
4233         capture = self.pg1.get_capture(len(pkts))
4234         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4235                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4236
4237         # out2in
4238         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4239         self.pg1.add_stream(pkts)
4240         self.pg_enable_capture(self.pg_interfaces)
4241         self.pg_start()
4242         capture = self.pg0.get_capture(len(pkts))
4243         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4244         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4245
4246         ses_num_end = self.nat64_get_ses_num()
4247
4248         self.assertEqual(ses_num_end - ses_num_start, 3)
4249
4250     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4251     def test_session_timeout(self):
4252         """ NAT64 session timeout """
4253         self.icmp_id_in = 1234
4254         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4255                                                 self.nat_addr_n)
4256         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4257         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4258         self.vapi.nat64_set_timeouts(icmp=5)
4259
4260         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4261         self.pg0.add_stream(pkts)
4262         self.pg_enable_capture(self.pg_interfaces)
4263         self.pg_start()
4264         capture = self.pg1.get_capture(len(pkts))
4265
4266         ses_num_before_timeout = self.nat64_get_ses_num()
4267
4268         sleep(15)
4269
4270         # ICMP session after timeout
4271         ses_num_after_timeout = self.nat64_get_ses_num()
4272         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4273
4274     def test_icmp_error(self):
4275         """ NAT64 ICMP Error message translation """
4276         self.tcp_port_in = 6303
4277         self.udp_port_in = 6304
4278         self.icmp_id_in = 6305
4279
4280         ses_num_start = self.nat64_get_ses_num()
4281
4282         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4283                                                 self.nat_addr_n)
4284         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4285         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4286
4287         # send some packets to create sessions
4288         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4289         self.pg0.add_stream(pkts)
4290         self.pg_enable_capture(self.pg_interfaces)
4291         self.pg_start()
4292         capture_ip4 = self.pg1.get_capture(len(pkts))
4293         self.verify_capture_out(capture_ip4,
4294                                 nat_ip=self.nat_addr,
4295                                 dst_ip=self.pg1.remote_ip4)
4296
4297         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4298         self.pg1.add_stream(pkts)
4299         self.pg_enable_capture(self.pg_interfaces)
4300         self.pg_start()
4301         capture_ip6 = self.pg0.get_capture(len(pkts))
4302         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4303         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4304                                    self.pg0.remote_ip6)
4305
4306         # in2out
4307         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4308                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4309                 ICMPv6DestUnreach(code=1) /
4310                 packet[IPv6] for packet in capture_ip6]
4311         self.pg0.add_stream(pkts)
4312         self.pg_enable_capture(self.pg_interfaces)
4313         self.pg_start()
4314         capture = self.pg1.get_capture(len(pkts))
4315         for packet in capture:
4316             try:
4317                 self.assertEqual(packet[IP].src, self.nat_addr)
4318                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4319                 self.assertEqual(packet[ICMP].type, 3)
4320                 self.assertEqual(packet[ICMP].code, 13)
4321                 inner = packet[IPerror]
4322                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4323                 self.assertEqual(inner.dst, self.nat_addr)
4324                 self.check_icmp_checksum(packet)
4325                 if inner.haslayer(TCPerror):
4326                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4327                 elif inner.haslayer(UDPerror):
4328                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4329                 else:
4330                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4331             except:
4332                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4333                 raise
4334
4335         # out2in
4336         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4337                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4338                 ICMP(type=3, code=13) /
4339                 packet[IP] for packet in capture_ip4]
4340         self.pg1.add_stream(pkts)
4341         self.pg_enable_capture(self.pg_interfaces)
4342         self.pg_start()
4343         capture = self.pg0.get_capture(len(pkts))
4344         for packet in capture:
4345             try:
4346                 self.assertEqual(packet[IPv6].src, ip.src)
4347                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4348                 icmp = packet[ICMPv6DestUnreach]
4349                 self.assertEqual(icmp.code, 1)
4350                 inner = icmp[IPerror6]
4351                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4352                 self.assertEqual(inner.dst, ip.src)
4353                 self.check_icmpv6_checksum(packet)
4354                 if inner.haslayer(TCPerror):
4355                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4356                 elif inner.haslayer(UDPerror):
4357                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4358                 else:
4359                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4360                                      self.icmp_id_in)
4361             except:
4362                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4363                 raise
4364
4365     def test_hairpinning(self):
4366         """ NAT64 hairpinning """
4367
4368         client = self.pg0.remote_hosts[0]
4369         server = self.pg0.remote_hosts[1]
4370         server_tcp_in_port = 22
4371         server_tcp_out_port = 4022
4372         server_udp_in_port = 23
4373         server_udp_out_port = 4023
4374         client_tcp_in_port = 1234
4375         client_udp_in_port = 1235
4376         client_tcp_out_port = 0
4377         client_udp_out_port = 0
4378         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4379         nat_addr_ip6 = ip.src
4380
4381         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4382                                                 self.nat_addr_n)
4383         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4384         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4385
4386         self.vapi.nat64_add_del_static_bib(server.ip6n,
4387                                            self.nat_addr_n,
4388                                            server_tcp_in_port,
4389                                            server_tcp_out_port,
4390                                            IP_PROTOS.tcp)
4391         self.vapi.nat64_add_del_static_bib(server.ip6n,
4392                                            self.nat_addr_n,
4393                                            server_udp_in_port,
4394                                            server_udp_out_port,
4395                                            IP_PROTOS.udp)
4396
4397         # client to server
4398         pkts = []
4399         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4400              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4401              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4402         pkts.append(p)
4403         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4404              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4405              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4406         pkts.append(p)
4407         self.pg0.add_stream(pkts)
4408         self.pg_enable_capture(self.pg_interfaces)
4409         self.pg_start()
4410         capture = self.pg0.get_capture(len(pkts))
4411         for packet in capture:
4412             try:
4413                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4414                 self.assertEqual(packet[IPv6].dst, server.ip6)
4415                 if packet.haslayer(TCP):
4416                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4417                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4418                     self.check_tcp_checksum(packet)
4419                     client_tcp_out_port = packet[TCP].sport
4420                 else:
4421                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4422                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4423                     self.check_udp_checksum(packet)
4424                     client_udp_out_port = packet[UDP].sport
4425             except:
4426                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4427                 raise
4428
4429         # server to client
4430         pkts = []
4431         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4432              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4433              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4434         pkts.append(p)
4435         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4436              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4437              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4438         pkts.append(p)
4439         self.pg0.add_stream(pkts)
4440         self.pg_enable_capture(self.pg_interfaces)
4441         self.pg_start()
4442         capture = self.pg0.get_capture(len(pkts))
4443         for packet in capture:
4444             try:
4445                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4446                 self.assertEqual(packet[IPv6].dst, client.ip6)
4447                 if packet.haslayer(TCP):
4448                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4449                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4450                     self.check_tcp_checksum(packet)
4451                 else:
4452                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4453                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4454                     self.check_udp_checksum(packet)
4455             except:
4456                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4457                 raise
4458
4459         # ICMP error
4460         pkts = []
4461         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4462                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4463                 ICMPv6DestUnreach(code=1) /
4464                 packet[IPv6] for packet in capture]
4465         self.pg0.add_stream(pkts)
4466         self.pg_enable_capture(self.pg_interfaces)
4467         self.pg_start()
4468         capture = self.pg0.get_capture(len(pkts))
4469         for packet in capture:
4470             try:
4471                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4472                 self.assertEqual(packet[IPv6].dst, server.ip6)
4473                 icmp = packet[ICMPv6DestUnreach]
4474                 self.assertEqual(icmp.code, 1)
4475                 inner = icmp[IPerror6]
4476                 self.assertEqual(inner.src, server.ip6)
4477                 self.assertEqual(inner.dst, nat_addr_ip6)
4478                 self.check_icmpv6_checksum(packet)
4479                 if inner.haslayer(TCPerror):
4480                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4481                     self.assertEqual(inner[TCPerror].dport,
4482                                      client_tcp_out_port)
4483                 else:
4484                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4485                     self.assertEqual(inner[UDPerror].dport,
4486                                      client_udp_out_port)
4487             except:
4488                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4489                 raise
4490
4491     def test_prefix(self):
4492         """ NAT64 Network-Specific Prefix """
4493
4494         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4495                                                 self.nat_addr_n)
4496         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4497         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4498         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4499                                                 self.vrf1_nat_addr_n,
4500                                                 vrf_id=self.vrf1_id)
4501         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4502
4503         # Add global prefix
4504         global_pref64 = "2001:db8::"
4505         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4506         global_pref64_len = 32
4507         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4508
4509         prefix = self.vapi.nat64_prefix_dump()
4510         self.assertEqual(len(prefix), 1)
4511         self.assertEqual(prefix[0].prefix, global_pref64_n)
4512         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4513         self.assertEqual(prefix[0].vrf_id, 0)
4514
4515         # Add tenant specific prefix
4516         vrf1_pref64 = "2001:db8:122:300::"
4517         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4518         vrf1_pref64_len = 56
4519         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4520                                        vrf1_pref64_len,
4521                                        vrf_id=self.vrf1_id)
4522         prefix = self.vapi.nat64_prefix_dump()
4523         self.assertEqual(len(prefix), 2)
4524
4525         # Global prefix
4526         pkts = self.create_stream_in_ip6(self.pg0,
4527                                          self.pg1,
4528                                          pref=global_pref64,
4529                                          plen=global_pref64_len)
4530         self.pg0.add_stream(pkts)
4531         self.pg_enable_capture(self.pg_interfaces)
4532         self.pg_start()
4533         capture = self.pg1.get_capture(len(pkts))
4534         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4535                                 dst_ip=self.pg1.remote_ip4)
4536
4537         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4538         self.pg1.add_stream(pkts)
4539         self.pg_enable_capture(self.pg_interfaces)
4540         self.pg_start()
4541         capture = self.pg0.get_capture(len(pkts))
4542         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4543                                   global_pref64,
4544                                   global_pref64_len)
4545         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4546
4547         # Tenant specific prefix
4548         pkts = self.create_stream_in_ip6(self.pg2,
4549                                          self.pg1,
4550                                          pref=vrf1_pref64,
4551                                          plen=vrf1_pref64_len)
4552         self.pg2.add_stream(pkts)
4553         self.pg_enable_capture(self.pg_interfaces)
4554         self.pg_start()
4555         capture = self.pg1.get_capture(len(pkts))
4556         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4557                                 dst_ip=self.pg1.remote_ip4)
4558
4559         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4560         self.pg1.add_stream(pkts)
4561         self.pg_enable_capture(self.pg_interfaces)
4562         self.pg_start()
4563         capture = self.pg2.get_capture(len(pkts))
4564         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4565                                   vrf1_pref64,
4566                                   vrf1_pref64_len)
4567         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4568
4569     def test_unknown_proto(self):
4570         """ NAT64 translate packet with unknown protocol """
4571
4572         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4573                                                 self.nat_addr_n)
4574         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4575         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4576         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4577
4578         # in2out
4579         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4580              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4581              TCP(sport=self.tcp_port_in, dport=20))
4582         self.pg0.add_stream(p)
4583         self.pg_enable_capture(self.pg_interfaces)
4584         self.pg_start()
4585         p = self.pg1.get_capture(1)
4586
4587         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4588              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4589              GRE() /
4590              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4591              TCP(sport=1234, dport=1234))
4592         self.pg0.add_stream(p)
4593         self.pg_enable_capture(self.pg_interfaces)
4594         self.pg_start()
4595         p = self.pg1.get_capture(1)
4596         packet = p[0]
4597         try:
4598             self.assertEqual(packet[IP].src, self.nat_addr)
4599             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4600             self.assertTrue(packet.haslayer(GRE))
4601             self.check_ip_checksum(packet)
4602         except:
4603             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4604             raise
4605
4606         # out2in
4607         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4608              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4609              GRE() /
4610              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4611              TCP(sport=1234, dport=1234))
4612         self.pg1.add_stream(p)
4613         self.pg_enable_capture(self.pg_interfaces)
4614         self.pg_start()
4615         p = self.pg0.get_capture(1)
4616         packet = p[0]
4617         try:
4618             self.assertEqual(packet[IPv6].src, remote_ip6)
4619             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4620             self.assertEqual(packet[IPv6].nh, 47)
4621         except:
4622             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4623             raise
4624
4625     def test_hairpinning_unknown_proto(self):
4626         """ NAT64 translate packet with unknown protocol - hairpinning """
4627
4628         client = self.pg0.remote_hosts[0]
4629         server = self.pg0.remote_hosts[1]
4630         server_tcp_in_port = 22
4631         server_tcp_out_port = 4022
4632         client_tcp_in_port = 1234
4633         client_tcp_out_port = 1235
4634         server_nat_ip = "10.0.0.100"
4635         client_nat_ip = "10.0.0.110"
4636         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4637         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4638         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4639         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4640
4641         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4642                                                 client_nat_ip_n)
4643         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4644         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4645
4646         self.vapi.nat64_add_del_static_bib(server.ip6n,
4647                                            server_nat_ip_n,
4648                                            server_tcp_in_port,
4649                                            server_tcp_out_port,
4650                                            IP_PROTOS.tcp)
4651
4652         self.vapi.nat64_add_del_static_bib(server.ip6n,
4653                                            server_nat_ip_n,
4654                                            0,
4655                                            0,
4656                                            IP_PROTOS.gre)
4657
4658         self.vapi.nat64_add_del_static_bib(client.ip6n,
4659                                            client_nat_ip_n,
4660                                            client_tcp_in_port,
4661                                            client_tcp_out_port,
4662                                            IP_PROTOS.tcp)
4663
4664         # client to server
4665         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4666              IPv6(src=client.ip6, dst=server_nat_ip6) /
4667              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4668         self.pg0.add_stream(p)
4669         self.pg_enable_capture(self.pg_interfaces)
4670         self.pg_start()
4671         p = self.pg0.get_capture(1)
4672
4673         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4674              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4675              GRE() /
4676              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4677              TCP(sport=1234, dport=1234))
4678         self.pg0.add_stream(p)
4679         self.pg_enable_capture(self.pg_interfaces)
4680         self.pg_start()
4681         p = self.pg0.get_capture(1)
4682         packet = p[0]
4683         try:
4684             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4685             self.assertEqual(packet[IPv6].dst, server.ip6)
4686             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4687         except:
4688             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4689             raise
4690
4691         # server to client
4692         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4693              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4694              GRE() /
4695              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4696              TCP(sport=1234, dport=1234))
4697         self.pg0.add_stream(p)
4698         self.pg_enable_capture(self.pg_interfaces)
4699         self.pg_start()
4700         p = self.pg0.get_capture(1)
4701         packet = p[0]
4702         try:
4703             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4704             self.assertEqual(packet[IPv6].dst, client.ip6)
4705             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4706         except:
4707             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4708             raise
4709
4710     def test_one_armed_nat64(self):
4711         """ One armed NAT64 """
4712         external_port = 0
4713         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4714                                            '64:ff9b::',
4715                                            96)
4716
4717         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4718                                                 self.nat_addr_n)
4719         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4720         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4721
4722         # in2out
4723         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4724              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4725              TCP(sport=12345, dport=80))
4726         self.pg3.add_stream(p)
4727         self.pg_enable_capture(self.pg_interfaces)
4728         self.pg_start()
4729         capture = self.pg3.get_capture(1)
4730         p = capture[0]
4731         try:
4732             ip = p[IP]
4733             tcp = p[TCP]
4734             self.assertEqual(ip.src, self.nat_addr)
4735             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4736             self.assertNotEqual(tcp.sport, 12345)
4737             external_port = tcp.sport
4738             self.assertEqual(tcp.dport, 80)
4739             self.check_tcp_checksum(p)
4740             self.check_ip_checksum(p)
4741         except:
4742             self.logger.error(ppp("Unexpected or invalid packet:", p))
4743             raise
4744
4745         # out2in
4746         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4747              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4748              TCP(sport=80, dport=external_port))
4749         self.pg3.add_stream(p)
4750         self.pg_enable_capture(self.pg_interfaces)
4751         self.pg_start()
4752         capture = self.pg3.get_capture(1)
4753         p = capture[0]
4754         try:
4755             ip = p[IPv6]
4756             tcp = p[TCP]
4757             self.assertEqual(ip.src, remote_host_ip6)
4758             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4759             self.assertEqual(tcp.sport, 80)
4760             self.assertEqual(tcp.dport, 12345)
4761             self.check_tcp_checksum(p)
4762         except:
4763             self.logger.error(ppp("Unexpected or invalid packet:", p))
4764             raise
4765
4766     def test_frag_in_order(self):
4767         """ NAT64 translate fragments arriving in order """
4768         self.tcp_port_in = random.randint(1025, 65535)
4769
4770         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4771                                                 self.nat_addr_n)
4772         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4773         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4774
4775         reass = self.vapi.nat_reass_dump()
4776         reass_n_start = len(reass)
4777
4778         # in2out
4779         data = 'a' * 200
4780         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4781                                            self.tcp_port_in, 20, data)
4782         self.pg0.add_stream(pkts)
4783         self.pg_enable_capture(self.pg_interfaces)
4784         self.pg_start()
4785         frags = self.pg1.get_capture(len(pkts))
4786         p = self.reass_frags_and_verify(frags,
4787                                         self.nat_addr,
4788                                         self.pg1.remote_ip4)
4789         self.assertEqual(p[TCP].dport, 20)
4790         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4791         self.tcp_port_out = p[TCP].sport
4792         self.assertEqual(data, p[Raw].load)
4793
4794         # out2in
4795         data = "A" * 4 + "b" * 16 + "C" * 3
4796         pkts = self.create_stream_frag(self.pg1,
4797                                        self.nat_addr,
4798                                        20,
4799                                        self.tcp_port_out,
4800                                        data)
4801         self.pg1.add_stream(pkts)
4802         self.pg_enable_capture(self.pg_interfaces)
4803         self.pg_start()
4804         frags = self.pg0.get_capture(len(pkts))
4805         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4806         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4807         self.assertEqual(p[TCP].sport, 20)
4808         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4809         self.assertEqual(data, p[Raw].load)
4810
4811         reass = self.vapi.nat_reass_dump()
4812         reass_n_end = len(reass)
4813
4814         self.assertEqual(reass_n_end - reass_n_start, 2)
4815
4816     def test_reass_hairpinning(self):
4817         """ NAT64 fragments hairpinning """
4818         data = 'a' * 200
4819         client = self.pg0.remote_hosts[0]
4820         server = self.pg0.remote_hosts[1]
4821         server_in_port = random.randint(1025, 65535)
4822         server_out_port = random.randint(1025, 65535)
4823         client_in_port = random.randint(1025, 65535)
4824         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4825         nat_addr_ip6 = ip.src
4826
4827         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4828                                                 self.nat_addr_n)
4829         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4830         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4831
4832         # add static BIB entry for server
4833         self.vapi.nat64_add_del_static_bib(server.ip6n,
4834                                            self.nat_addr_n,
4835                                            server_in_port,
4836                                            server_out_port,
4837                                            IP_PROTOS.tcp)
4838
4839         # send packet from host to server
4840         pkts = self.create_stream_frag_ip6(self.pg0,
4841                                            self.nat_addr,
4842                                            client_in_port,
4843                                            server_out_port,
4844                                            data)
4845         self.pg0.add_stream(pkts)
4846         self.pg_enable_capture(self.pg_interfaces)
4847         self.pg_start()
4848         frags = self.pg0.get_capture(len(pkts))
4849         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4850         self.assertNotEqual(p[TCP].sport, client_in_port)
4851         self.assertEqual(p[TCP].dport, server_in_port)
4852         self.assertEqual(data, p[Raw].load)
4853
4854     def test_frag_out_of_order(self):
4855         """ NAT64 translate fragments arriving out of order """
4856         self.tcp_port_in = random.randint(1025, 65535)
4857
4858         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4859                                                 self.nat_addr_n)
4860         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4861         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4862
4863         # in2out
4864         data = 'a' * 200
4865         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4866                                            self.tcp_port_in, 20, data)
4867         pkts.reverse()
4868         self.pg0.add_stream(pkts)
4869         self.pg_enable_capture(self.pg_interfaces)
4870         self.pg_start()
4871         frags = self.pg1.get_capture(len(pkts))
4872         p = self.reass_frags_and_verify(frags,
4873                                         self.nat_addr,
4874                                         self.pg1.remote_ip4)
4875         self.assertEqual(p[TCP].dport, 20)
4876         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4877         self.tcp_port_out = p[TCP].sport
4878         self.assertEqual(data, p[Raw].load)
4879
4880         # out2in
4881         data = "A" * 4 + "B" * 16 + "C" * 3
4882         pkts = self.create_stream_frag(self.pg1,
4883                                        self.nat_addr,
4884                                        20,
4885                                        self.tcp_port_out,
4886                                        data)
4887         pkts.reverse()
4888         self.pg1.add_stream(pkts)
4889         self.pg_enable_capture(self.pg_interfaces)
4890         self.pg_start()
4891         frags = self.pg0.get_capture(len(pkts))
4892         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4893         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4894         self.assertEqual(p[TCP].sport, 20)
4895         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4896         self.assertEqual(data, p[Raw].load)
4897
4898     def test_interface_addr(self):
4899         """ Acquire NAT64 pool addresses from interface """
4900         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4901
4902         # no address in NAT64 pool
4903         adresses = self.vapi.nat44_address_dump()
4904         self.assertEqual(0, len(adresses))
4905
4906         # configure interface address and check NAT64 address pool
4907         self.pg4.config_ip4()
4908         addresses = self.vapi.nat64_pool_addr_dump()
4909         self.assertEqual(len(addresses), 1)
4910         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4911
4912         # remove interface address and check NAT64 address pool
4913         self.pg4.unconfig_ip4()
4914         addresses = self.vapi.nat64_pool_addr_dump()
4915         self.assertEqual(0, len(adresses))
4916
4917     def nat64_get_ses_num(self):
4918         """
4919         Return number of active NAT64 sessions.
4920         """
4921         st = self.vapi.nat64_st_dump()
4922         return len(st)
4923
4924     def clear_nat64(self):
4925         """
4926         Clear NAT64 configuration.
4927         """
4928         self.vapi.nat64_set_timeouts()
4929
4930         interfaces = self.vapi.nat64_interface_dump()
4931         for intf in interfaces:
4932             if intf.is_inside > 1:
4933                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4934                                                   0,
4935                                                   is_add=0)
4936             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4937                                               intf.is_inside,
4938                                               is_add=0)
4939
4940         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4941         for bibe in bib:
4942             if bibe.is_static:
4943                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4944                                                    bibe.o_addr,
4945                                                    bibe.i_port,
4946                                                    bibe.o_port,
4947                                                    bibe.proto,
4948                                                    bibe.vrf_id,
4949                                                    is_add=0)
4950
4951         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4952         for bibe in bib:
4953             if bibe.is_static:
4954                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4955                                                    bibe.o_addr,
4956                                                    bibe.i_port,
4957                                                    bibe.o_port,
4958                                                    bibe.proto,
4959                                                    bibe.vrf_id,
4960                                                    is_add=0)
4961
4962         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4963         for bibe in bib:
4964             if bibe.is_static:
4965                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4966                                                    bibe.o_addr,
4967                                                    bibe.i_port,
4968                                                    bibe.o_port,
4969                                                    bibe.proto,
4970                                                    bibe.vrf_id,
4971                                                    is_add=0)
4972
4973         adresses = self.vapi.nat64_pool_addr_dump()
4974         for addr in adresses:
4975             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4976                                                     addr.address,
4977                                                     vrf_id=addr.vrf_id,
4978                                                     is_add=0)
4979
4980         prefixes = self.vapi.nat64_prefix_dump()
4981         for prefix in prefixes:
4982             self.vapi.nat64_add_del_prefix(prefix.prefix,
4983                                            prefix.prefix_len,
4984                                            vrf_id=prefix.vrf_id,
4985                                            is_add=0)
4986
4987     def tearDown(self):
4988         super(TestNAT64, self).tearDown()
4989         if not self.vpp_dead:
4990             self.logger.info(self.vapi.cli("show nat64 pool"))
4991             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4992             self.logger.info(self.vapi.cli("show nat64 prefix"))
4993             self.logger.info(self.vapi.cli("show nat64 bib all"))
4994             self.logger.info(self.vapi.cli("show nat64 session table all"))
4995             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4996             self.clear_nat64()
4997
4998
4999 class TestDSlite(MethodHolder):
5000     """ DS-Lite Test Cases """
5001
5002     @classmethod
5003     def setUpClass(cls):
5004         super(TestDSlite, cls).setUpClass()
5005
5006         try:
5007             cls.nat_addr = '10.0.0.3'
5008             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5009
5010             cls.create_pg_interfaces(range(2))
5011             cls.pg0.admin_up()
5012             cls.pg0.config_ip4()
5013             cls.pg0.resolve_arp()
5014             cls.pg1.admin_up()
5015             cls.pg1.config_ip6()
5016             cls.pg1.generate_remote_hosts(2)
5017             cls.pg1.configure_ipv6_neighbors()
5018
5019         except Exception:
5020             super(TestDSlite, cls).tearDownClass()
5021             raise
5022
5023     def test_dslite(self):
5024         """ Test DS-Lite """
5025         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5026                                                  self.nat_addr_n)
5027         aftr_ip4 = '192.0.0.1'
5028         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5029         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5030         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5031         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5032
5033         # UDP
5034         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5035              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5036              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5037              UDP(sport=20000, dport=10000))
5038         self.pg1.add_stream(p)
5039         self.pg_enable_capture(self.pg_interfaces)
5040         self.pg_start()
5041         capture = self.pg0.get_capture(1)
5042         capture = capture[0]
5043         self.assertFalse(capture.haslayer(IPv6))
5044         self.assertEqual(capture[IP].src, self.nat_addr)
5045         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5046         self.assertNotEqual(capture[UDP].sport, 20000)
5047         self.assertEqual(capture[UDP].dport, 10000)
5048         self.check_ip_checksum(capture)
5049         out_port = capture[UDP].sport
5050
5051         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5052              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5053              UDP(sport=10000, dport=out_port))
5054         self.pg0.add_stream(p)
5055         self.pg_enable_capture(self.pg_interfaces)
5056         self.pg_start()
5057         capture = self.pg1.get_capture(1)
5058         capture = capture[0]
5059         self.assertEqual(capture[IPv6].src, aftr_ip6)
5060         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5061         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5062         self.assertEqual(capture[IP].dst, '192.168.1.1')
5063         self.assertEqual(capture[UDP].sport, 10000)
5064         self.assertEqual(capture[UDP].dport, 20000)
5065         self.check_ip_checksum(capture)
5066
5067         # TCP
5068         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5069              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5070              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5071              TCP(sport=20001, dport=10001))
5072         self.pg1.add_stream(p)
5073         self.pg_enable_capture(self.pg_interfaces)
5074         self.pg_start()
5075         capture = self.pg0.get_capture(1)
5076         capture = capture[0]
5077         self.assertFalse(capture.haslayer(IPv6))
5078         self.assertEqual(capture[IP].src, self.nat_addr)
5079         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5080         self.assertNotEqual(capture[TCP].sport, 20001)
5081         self.assertEqual(capture[TCP].dport, 10001)
5082         self.check_ip_checksum(capture)
5083         self.check_tcp_checksum(capture)
5084         out_port = capture[TCP].sport
5085
5086         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5087              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5088              TCP(sport=10001, dport=out_port))
5089         self.pg0.add_stream(p)
5090         self.pg_enable_capture(self.pg_interfaces)
5091         self.pg_start()
5092         capture = self.pg1.get_capture(1)
5093         capture = capture[0]
5094         self.assertEqual(capture[IPv6].src, aftr_ip6)
5095         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5096         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5097         self.assertEqual(capture[IP].dst, '192.168.1.1')
5098         self.assertEqual(capture[TCP].sport, 10001)
5099         self.assertEqual(capture[TCP].dport, 20001)
5100         self.check_ip_checksum(capture)
5101         self.check_tcp_checksum(capture)
5102
5103         # ICMP
5104         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5105              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5106              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5107              ICMP(id=4000, type='echo-request'))
5108         self.pg1.add_stream(p)
5109         self.pg_enable_capture(self.pg_interfaces)
5110         self.pg_start()
5111         capture = self.pg0.get_capture(1)
5112         capture = capture[0]
5113         self.assertFalse(capture.haslayer(IPv6))
5114         self.assertEqual(capture[IP].src, self.nat_addr)
5115         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5116         self.assertNotEqual(capture[ICMP].id, 4000)
5117         self.check_ip_checksum(capture)
5118         self.check_icmp_checksum(capture)
5119         out_id = capture[ICMP].id
5120
5121         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5122              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5123              ICMP(id=out_id, type='echo-reply'))
5124         self.pg0.add_stream(p)
5125         self.pg_enable_capture(self.pg_interfaces)
5126         self.pg_start()
5127         capture = self.pg1.get_capture(1)
5128         capture = capture[0]
5129         self.assertEqual(capture[IPv6].src, aftr_ip6)
5130         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5131         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5132         self.assertEqual(capture[IP].dst, '192.168.1.1')
5133         self.assertEqual(capture[ICMP].id, 4000)
5134         self.check_ip_checksum(capture)
5135         self.check_icmp_checksum(capture)
5136
5137         # ping DS-Lite AFTR tunnel endpoint address
5138         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5139              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5140              ICMPv6EchoRequest())
5141         self.pg1.add_stream(p)
5142         self.pg_enable_capture(self.pg_interfaces)
5143         self.pg_start()
5144         capture = self.pg1.get_capture(1)
5145         self.assertEqual(1, len(capture))
5146         capture = capture[0]
5147         self.assertEqual(capture[IPv6].src, aftr_ip6)
5148         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5149         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5150
5151     def tearDown(self):
5152         super(TestDSlite, self).tearDown()
5153         if not self.vpp_dead:
5154             self.logger.info(self.vapi.cli("show dslite pool"))
5155             self.logger.info(
5156                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5157             self.logger.info(self.vapi.cli("show dslite sessions"))
5158
5159 if __name__ == '__main__':
5160     unittest.main(testRunner=VppTestRunner)