NAT44: asymmetrical load balancing static mapping rule (VPP-1132)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
    @classmethod
    def setUpClass(cls):
        """ Delegate to the parent class set-up; nothing is added here """
        super(MethodHolder, cls).setUpClass()
32
    def tearDown(self):
        """ Delegate to the parent class tear-down; nothing is added here """
        super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 is_add=0)
1033
1034         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1035         for lb_sm in lb_static_mappings:
1036             self.vapi.nat44_add_del_lb_static_mapping(
1037                 lb_sm.external_addr,
1038                 lb_sm.external_port,
1039                 lb_sm.protocol,
1040                 vrf_id=lb_sm.vrf_id,
1041                 twice_nat=lb_sm.twice_nat,
1042                 out2in_only=lb_sm.out2in_only,
1043                 is_add=0,
1044                 local_num=0,
1045                 locals=[])
1046
1047         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1048         for id_m in identity_mappings:
1049             self.vapi.nat44_add_del_identity_mapping(
1050                 addr_only=id_m.addr_only,
1051                 ip=id_m.ip_address,
1052                 port=id_m.port,
1053                 sw_if_index=id_m.sw_if_index,
1054                 vrf_id=id_m.vrf_id,
1055                 protocol=id_m.protocol,
1056                 is_add=0)
1057
1058         adresses = self.vapi.nat44_address_dump()
1059         for addr in adresses:
1060             self.vapi.nat44_add_del_address_range(addr.ip_address,
1061                                                   addr.ip_address,
1062                                                   twice_nat=addr.twice_nat,
1063                                                   is_add=0)
1064
1065         self.vapi.nat_set_reass()
1066         self.vapi.nat_set_reass(is_ip6=1)
1067
1068     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1069                                  local_port=0, external_port=0, vrf_id=0,
1070                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1071                                  proto=0, twice_nat=0):
1072         """
1073         Add/delete NAT44 static mapping
1074
1075         :param local_ip: Local IP address
1076         :param external_ip: External IP address
1077         :param local_port: Local port number (Optional)
1078         :param external_port: External port number (Optional)
1079         :param vrf_id: VRF ID (Default 0)
1080         :param is_add: 1 if add, 0 if delete (Default add)
1081         :param external_sw_if_index: External interface instead of IP address
1082         :param proto: IP protocol (Mandatory if port specified)
1083         :param twice_nat: 1 if translate external host address and port
1084         """
1085         addr_only = 1
1086         if local_port and external_port:
1087             addr_only = 0
1088         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1089         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1090         self.vapi.nat44_add_del_static_mapping(
1091             l_ip,
1092             e_ip,
1093             external_sw_if_index,
1094             local_port,
1095             external_port,
1096             addr_only,
1097             vrf_id,
1098             proto,
1099             twice_nat,
1100             is_add)
1101
1102     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1103         """
1104         Add/delete NAT44 address
1105
1106         :param ip: IP address
1107         :param is_add: 1 if add, 0 if delete (Default add)
1108         :param twice_nat: twice NAT address for extenal hosts
1109         """
1110         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1111         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1112                                               vrf_id=vrf_id,
1113                                               twice_nat=twice_nat)
1114
1115     def test_dynamic(self):
1116         """ NAT44 dynamic translation test """
1117
1118         self.nat44_add_address(self.nat_addr)
1119         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1120         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1121                                                   is_inside=0)
1122
1123         # in2out
1124         pkts = self.create_stream_in(self.pg0, self.pg1)
1125         self.pg0.add_stream(pkts)
1126         self.pg_enable_capture(self.pg_interfaces)
1127         self.pg_start()
1128         capture = self.pg1.get_capture(len(pkts))
1129         self.verify_capture_out(capture)
1130
1131         # out2in
1132         pkts = self.create_stream_out(self.pg1)
1133         self.pg1.add_stream(pkts)
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136         capture = self.pg0.get_capture(len(pkts))
1137         self.verify_capture_in(capture, self.pg0)
1138
1139     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1140         """ NAT44 handling of client packets with TTL=1 """
1141
1142         self.nat44_add_address(self.nat_addr)
1143         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1144         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1145                                                   is_inside=0)
1146
1147         # Client side - generate traffic
1148         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1149         self.pg0.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152
1153         # Client side - verify ICMP type 11 packets
1154         capture = self.pg0.get_capture(len(pkts))
1155         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1156
1157     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1158         """ NAT44 handling of server packets with TTL=1 """
1159
1160         self.nat44_add_address(self.nat_addr)
1161         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1162         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1163                                                   is_inside=0)
1164
1165         # Client side - create sessions
1166         pkts = self.create_stream_in(self.pg0, self.pg1)
1167         self.pg0.add_stream(pkts)
1168         self.pg_enable_capture(self.pg_interfaces)
1169         self.pg_start()
1170
1171         # Server side - generate traffic
1172         capture = self.pg1.get_capture(len(pkts))
1173         self.verify_capture_out(capture)
1174         pkts = self.create_stream_out(self.pg1, ttl=1)
1175         self.pg1.add_stream(pkts)
1176         self.pg_enable_capture(self.pg_interfaces)
1177         self.pg_start()
1178
1179         # Server side - verify ICMP type 11 packets
1180         capture = self.pg1.get_capture(len(pkts))
1181         self.verify_capture_out_with_icmp_errors(capture,
1182                                                  src_ip=self.pg1.local_ip4)
1183
1184     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1185         """ NAT44 handling of error responses to client packets with TTL=2 """
1186
1187         self.nat44_add_address(self.nat_addr)
1188         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1189         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1190                                                   is_inside=0)
1191
1192         # Client side - generate traffic
1193         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1194         self.pg0.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197
1198         # Server side - simulate ICMP type 11 response
1199         capture = self.pg1.get_capture(len(pkts))
1200         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1201                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1202                 ICMP(type=11) / packet[IP] for packet in capture]
1203         self.pg1.add_stream(pkts)
1204         self.pg_enable_capture(self.pg_interfaces)
1205         self.pg_start()
1206
1207         # Client side - verify ICMP type 11 packets
1208         capture = self.pg0.get_capture(len(pkts))
1209         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1210
1211     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1212         """ NAT44 handling of error responses to server packets with TTL=2 """
1213
1214         self.nat44_add_address(self.nat_addr)
1215         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1216         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1217                                                   is_inside=0)
1218
1219         # Client side - create sessions
1220         pkts = self.create_stream_in(self.pg0, self.pg1)
1221         self.pg0.add_stream(pkts)
1222         self.pg_enable_capture(self.pg_interfaces)
1223         self.pg_start()
1224
1225         # Server side - generate traffic
1226         capture = self.pg1.get_capture(len(pkts))
1227         self.verify_capture_out(capture)
1228         pkts = self.create_stream_out(self.pg1, ttl=2)
1229         self.pg1.add_stream(pkts)
1230         self.pg_enable_capture(self.pg_interfaces)
1231         self.pg_start()
1232
1233         # Client side - simulate ICMP type 11 response
1234         capture = self.pg0.get_capture(len(pkts))
1235         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1236                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1237                 ICMP(type=11) / packet[IP] for packet in capture]
1238         self.pg0.add_stream(pkts)
1239         self.pg_enable_capture(self.pg_interfaces)
1240         self.pg_start()
1241
1242         # Server side - verify ICMP type 11 packets
1243         capture = self.pg1.get_capture(len(pkts))
1244         self.verify_capture_out_with_icmp_errors(capture)
1245
1246     def test_ping_out_interface_from_outside(self):
1247         """ Ping NAT44 out interface from outside network """
1248
1249         self.nat44_add_address(self.nat_addr)
1250         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1251         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1252                                                   is_inside=0)
1253
1254         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1255              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1256              ICMP(id=self.icmp_id_out, type='echo-request'))
1257         pkts = [p]
1258         self.pg1.add_stream(pkts)
1259         self.pg_enable_capture(self.pg_interfaces)
1260         self.pg_start()
1261         capture = self.pg1.get_capture(len(pkts))
1262         self.assertEqual(1, len(capture))
1263         packet = capture[0]
1264         try:
1265             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1266             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1267             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1268             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1269         except:
1270             self.logger.error(ppp("Unexpected or invalid packet "
1271                                   "(outside network):", packet))
1272             raise
1273
1274     def test_ping_internal_host_from_outside(self):
1275         """ Ping internal host from outside network """
1276
1277         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1278         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1279         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1280                                                   is_inside=0)
1281
1282         # out2in
1283         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1284                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1285                ICMP(id=self.icmp_id_out, type='echo-request'))
1286         self.pg1.add_stream(pkt)
1287         self.pg_enable_capture(self.pg_interfaces)
1288         self.pg_start()
1289         capture = self.pg0.get_capture(1)
1290         self.verify_capture_in(capture, self.pg0, packet_num=1)
1291         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1292
1293         # in2out
1294         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1295                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1296                ICMP(id=self.icmp_id_in, type='echo-reply'))
1297         self.pg0.add_stream(pkt)
1298         self.pg_enable_capture(self.pg_interfaces)
1299         self.pg_start()
1300         capture = self.pg1.get_capture(1)
1301         self.verify_capture_out(capture, same_port=True, packet_num=1)
1302         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1303
1304     def test_forwarding(self):
1305         """ NAT44 forwarding test """
1306
1307         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1308         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1309                                                   is_inside=0)
1310         self.vapi.nat44_forwarding_enable_disable(1)
1311
1312         real_ip = self.pg0.remote_ip4n
1313         alias_ip = self.nat_addr_n
1314         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1315                                                external_ip=alias_ip)
1316
1317         try:
1318             # in2out - static mapping match
1319
1320             pkts = self.create_stream_out(self.pg1)
1321             self.pg1.add_stream(pkts)
1322             self.pg_enable_capture(self.pg_interfaces)
1323             self.pg_start()
1324             capture = self.pg0.get_capture(len(pkts))
1325             self.verify_capture_in(capture, self.pg0)
1326
1327             pkts = self.create_stream_in(self.pg0, self.pg1)
1328             self.pg0.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg1.get_capture(len(pkts))
1332             self.verify_capture_out(capture, same_port=True)
1333
1334             # in2out - no static mapping match
1335
1336             host0 = self.pg0.remote_hosts[0]
1337             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1338             try:
1339                 pkts = self.create_stream_out(self.pg1,
1340                                               dst_ip=self.pg0.remote_ip4,
1341                                               use_inside_ports=True)
1342                 self.pg1.add_stream(pkts)
1343                 self.pg_enable_capture(self.pg_interfaces)
1344                 self.pg_start()
1345                 capture = self.pg0.get_capture(len(pkts))
1346                 self.verify_capture_in(capture, self.pg0)
1347
1348                 pkts = self.create_stream_in(self.pg0, self.pg1)
1349                 self.pg0.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg1.get_capture(len(pkts))
1353                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1354                                         same_port=True)
1355             finally:
1356                 self.pg0.remote_hosts[0] = host0
1357
1358         finally:
1359             self.vapi.nat44_forwarding_enable_disable(0)
1360             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1361                                                    external_ip=alias_ip,
1362                                                    is_add=0)
1363
1364     def test_static_in(self):
1365         """ 1:1 NAT initialized from inside network """
1366
1367         nat_ip = "10.0.0.10"
1368         self.tcp_port_out = 6303
1369         self.udp_port_out = 6304
1370         self.icmp_id_out = 6305
1371
1372         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1373         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1374         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1375                                                   is_inside=0)
1376
1377         # in2out
1378         pkts = self.create_stream_in(self.pg0, self.pg1)
1379         self.pg0.add_stream(pkts)
1380         self.pg_enable_capture(self.pg_interfaces)
1381         self.pg_start()
1382         capture = self.pg1.get_capture(len(pkts))
1383         self.verify_capture_out(capture, nat_ip, True)
1384
1385         # out2in
1386         pkts = self.create_stream_out(self.pg1, nat_ip)
1387         self.pg1.add_stream(pkts)
1388         self.pg_enable_capture(self.pg_interfaces)
1389         self.pg_start()
1390         capture = self.pg0.get_capture(len(pkts))
1391         self.verify_capture_in(capture, self.pg0)
1392
1393     def test_static_out(self):
1394         """ 1:1 NAT initialized from outside network """
1395
1396         nat_ip = "10.0.0.20"
1397         self.tcp_port_out = 6303
1398         self.udp_port_out = 6304
1399         self.icmp_id_out = 6305
1400
1401         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1402         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1403         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1404                                                   is_inside=0)
1405
1406         # out2in
1407         pkts = self.create_stream_out(self.pg1, nat_ip)
1408         self.pg1.add_stream(pkts)
1409         self.pg_enable_capture(self.pg_interfaces)
1410         self.pg_start()
1411         capture = self.pg0.get_capture(len(pkts))
1412         self.verify_capture_in(capture, self.pg0)
1413
1414         # in2out
1415         pkts = self.create_stream_in(self.pg0, self.pg1)
1416         self.pg0.add_stream(pkts)
1417         self.pg_enable_capture(self.pg_interfaces)
1418         self.pg_start()
1419         capture = self.pg1.get_capture(len(pkts))
1420         self.verify_capture_out(capture, nat_ip, True)
1421
1422     def test_static_with_port_in(self):
1423         """ 1:1 NAPT initialized from inside network """
1424
1425         self.tcp_port_out = 3606
1426         self.udp_port_out = 3607
1427         self.icmp_id_out = 3608
1428
1429         self.nat44_add_address(self.nat_addr)
1430         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431                                       self.tcp_port_in, self.tcp_port_out,
1432                                       proto=IP_PROTOS.tcp)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.udp_port_in, self.udp_port_out,
1435                                       proto=IP_PROTOS.udp)
1436         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437                                       self.icmp_id_in, self.icmp_id_out,
1438                                       proto=IP_PROTOS.icmp)
1439         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1440         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1441                                                   is_inside=0)
1442
1443         # in2out
1444         pkts = self.create_stream_in(self.pg0, self.pg1)
1445         self.pg0.add_stream(pkts)
1446         self.pg_enable_capture(self.pg_interfaces)
1447         self.pg_start()
1448         capture = self.pg1.get_capture(len(pkts))
1449         self.verify_capture_out(capture)
1450
1451         # out2in
1452         pkts = self.create_stream_out(self.pg1)
1453         self.pg1.add_stream(pkts)
1454         self.pg_enable_capture(self.pg_interfaces)
1455         self.pg_start()
1456         capture = self.pg0.get_capture(len(pkts))
1457         self.verify_capture_in(capture, self.pg0)
1458
1459     def test_static_with_port_out(self):
1460         """ 1:1 NAPT initialized from outside network """
1461
1462         self.tcp_port_out = 30606
1463         self.udp_port_out = 30607
1464         self.icmp_id_out = 30608
1465
1466         self.nat44_add_address(self.nat_addr)
1467         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1468                                       self.tcp_port_in, self.tcp_port_out,
1469                                       proto=IP_PROTOS.tcp)
1470         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471                                       self.udp_port_in, self.udp_port_out,
1472                                       proto=IP_PROTOS.udp)
1473         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474                                       self.icmp_id_in, self.icmp_id_out,
1475                                       proto=IP_PROTOS.icmp)
1476         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1477         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1478                                                   is_inside=0)
1479
1480         # out2in
1481         pkts = self.create_stream_out(self.pg1)
1482         self.pg1.add_stream(pkts)
1483         self.pg_enable_capture(self.pg_interfaces)
1484         self.pg_start()
1485         capture = self.pg0.get_capture(len(pkts))
1486         self.verify_capture_in(capture, self.pg0)
1487
1488         # in2out
1489         pkts = self.create_stream_in(self.pg0, self.pg1)
1490         self.pg0.add_stream(pkts)
1491         self.pg_enable_capture(self.pg_interfaces)
1492         self.pg_start()
1493         capture = self.pg1.get_capture(len(pkts))
1494         self.verify_capture_out(capture)
1495
1496     def test_static_vrf_aware(self):
1497         """ 1:1 NAT VRF awareness """
1498
1499         nat_ip1 = "10.0.0.30"
1500         nat_ip2 = "10.0.0.40"
1501         self.tcp_port_out = 6303
1502         self.udp_port_out = 6304
1503         self.icmp_id_out = 6305
1504
1505         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1506                                       vrf_id=10)
1507         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1508                                       vrf_id=10)
1509         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1510                                                   is_inside=0)
1511         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1512         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1513
1514         # inside interface VRF match NAT44 static mapping VRF
1515         pkts = self.create_stream_in(self.pg4, self.pg3)
1516         self.pg4.add_stream(pkts)
1517         self.pg_enable_capture(self.pg_interfaces)
1518         self.pg_start()
1519         capture = self.pg3.get_capture(len(pkts))
1520         self.verify_capture_out(capture, nat_ip1, True)
1521
1522         # inside interface VRF don't match NAT44 static mapping VRF (packets
1523         # are dropped)
1524         pkts = self.create_stream_in(self.pg0, self.pg3)
1525         self.pg0.add_stream(pkts)
1526         self.pg_enable_capture(self.pg_interfaces)
1527         self.pg_start()
1528         self.pg3.assert_nothing_captured()
1529
1530     def test_identity_nat(self):
1531         """ Identity NAT """
1532
1533         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1534         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1535         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1536                                                   is_inside=0)
1537
1538         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1539              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1540              TCP(sport=12345, dport=56789))
1541         self.pg1.add_stream(p)
1542         self.pg_enable_capture(self.pg_interfaces)
1543         self.pg_start()
1544         capture = self.pg0.get_capture(1)
1545         p = capture[0]
1546         try:
1547             ip = p[IP]
1548             tcp = p[TCP]
1549             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1550             self.assertEqual(ip.src, self.pg1.remote_ip4)
1551             self.assertEqual(tcp.dport, 56789)
1552             self.assertEqual(tcp.sport, 12345)
1553             self.check_tcp_checksum(p)
1554             self.check_ip_checksum(p)
1555         except:
1556             self.logger.error(ppp("Unexpected or invalid packet:", p))
1557             raise
1558
1559     def test_static_lb(self):
1560         """ NAT44 local service load balancing """
1561         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1562         external_port = 80
1563         local_port = 8080
1564         server1 = self.pg0.remote_hosts[0]
1565         server2 = self.pg0.remote_hosts[1]
1566
1567         locals = [{'addr': server1.ip4n,
1568                    'port': local_port,
1569                    'probability': 70},
1570                   {'addr': server2.ip4n,
1571                    'port': local_port,
1572                    'probability': 30}]
1573
1574         self.nat44_add_address(self.nat_addr)
1575         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1576                                                   external_port,
1577                                                   IP_PROTOS.tcp,
1578                                                   local_num=len(locals),
1579                                                   locals=locals)
1580         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1582                                                   is_inside=0)
1583
1584         # from client to service
1585         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1586              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1587              TCP(sport=12345, dport=external_port))
1588         self.pg1.add_stream(p)
1589         self.pg_enable_capture(self.pg_interfaces)
1590         self.pg_start()
1591         capture = self.pg0.get_capture(1)
1592         p = capture[0]
1593         server = None
1594         try:
1595             ip = p[IP]
1596             tcp = p[TCP]
1597             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1598             if ip.dst == server1.ip4:
1599                 server = server1
1600             else:
1601                 server = server2
1602             self.assertEqual(tcp.dport, local_port)
1603             self.check_tcp_checksum(p)
1604             self.check_ip_checksum(p)
1605         except:
1606             self.logger.error(ppp("Unexpected or invalid packet:", p))
1607             raise
1608
1609         # from service back to client
1610         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1611              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1612              TCP(sport=local_port, dport=12345))
1613         self.pg0.add_stream(p)
1614         self.pg_enable_capture(self.pg_interfaces)
1615         self.pg_start()
1616         capture = self.pg1.get_capture(1)
1617         p = capture[0]
1618         try:
1619             ip = p[IP]
1620             tcp = p[TCP]
1621             self.assertEqual(ip.src, self.nat_addr)
1622             self.assertEqual(tcp.sport, external_port)
1623             self.check_tcp_checksum(p)
1624             self.check_ip_checksum(p)
1625         except:
1626             self.logger.error(ppp("Unexpected or invalid packet:", p))
1627             raise
1628
1629         # multiple clients
1630         server1_n = 0
1631         server2_n = 0
1632         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1633         pkts = []
1634         for client in clients:
1635             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1636                  IP(src=client, dst=self.nat_addr) /
1637                  TCP(sport=12345, dport=external_port))
1638             pkts.append(p)
1639         self.pg1.add_stream(pkts)
1640         self.pg_enable_capture(self.pg_interfaces)
1641         self.pg_start()
1642         capture = self.pg0.get_capture(len(pkts))
1643         for p in capture:
1644             if p[IP].dst == server1.ip4:
1645                 server1_n += 1
1646             else:
1647                 server2_n += 1
1648         self.assertTrue(server1_n > server2_n)
1649
1650     def test_static_lb_2(self):
1651         """ NAT44 local service load balancing (asymmetrical rule) """
1652         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1653         external_port = 80
1654         local_port = 8080
1655         server1 = self.pg0.remote_hosts[0]
1656         server2 = self.pg0.remote_hosts[1]
1657
1658         locals = [{'addr': server1.ip4n,
1659                    'port': local_port,
1660                    'probability': 70},
1661                   {'addr': server2.ip4n,
1662                    'port': local_port,
1663                    'probability': 30}]
1664
1665         self.vapi.nat44_forwarding_enable_disable(1)
1666         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1667                                                   external_port,
1668                                                   IP_PROTOS.tcp,
1669                                                   out2in_only=1,
1670                                                   local_num=len(locals),
1671                                                   locals=locals)
1672         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1673         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1674                                                   is_inside=0)
1675
1676         # from client to service
1677         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1678              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1679              TCP(sport=12345, dport=external_port))
1680         self.pg1.add_stream(p)
1681         self.pg_enable_capture(self.pg_interfaces)
1682         self.pg_start()
1683         capture = self.pg0.get_capture(1)
1684         p = capture[0]
1685         server = None
1686         try:
1687             ip = p[IP]
1688             tcp = p[TCP]
1689             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1690             if ip.dst == server1.ip4:
1691                 server = server1
1692             else:
1693                 server = server2
1694             self.assertEqual(tcp.dport, local_port)
1695             self.check_tcp_checksum(p)
1696             self.check_ip_checksum(p)
1697         except:
1698             self.logger.error(ppp("Unexpected or invalid packet:", p))
1699             raise
1700
1701         # from service back to client
1702         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1703              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1704              TCP(sport=local_port, dport=12345))
1705         self.pg0.add_stream(p)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg1.get_capture(1)
1709         p = capture[0]
1710         try:
1711             ip = p[IP]
1712             tcp = p[TCP]
1713             self.assertEqual(ip.src, self.nat_addr)
1714             self.assertEqual(tcp.sport, external_port)
1715             self.check_tcp_checksum(p)
1716             self.check_ip_checksum(p)
1717         except:
1718             self.logger.error(ppp("Unexpected or invalid packet:", p))
1719             raise
1720
1721         # from client to server (no translation)
1722         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1723              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1724              TCP(sport=12346, dport=local_port))
1725         self.pg1.add_stream(p)
1726         self.pg_enable_capture(self.pg_interfaces)
1727         self.pg_start()
1728         capture = self.pg0.get_capture(1)
1729         p = capture[0]
1730         server = None
1731         try:
1732             ip = p[IP]
1733             tcp = p[TCP]
1734             self.assertEqual(ip.dst, server1.ip4)
1735             self.assertEqual(tcp.dport, local_port)
1736             self.check_tcp_checksum(p)
1737             self.check_ip_checksum(p)
1738         except:
1739             self.logger.error(ppp("Unexpected or invalid packet:", p))
1740             raise
1741
1742         # from service back to client (no translation)
1743         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1744              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1745              TCP(sport=local_port, dport=12346))
1746         self.pg0.add_stream(p)
1747         self.pg_enable_capture(self.pg_interfaces)
1748         self.pg_start()
1749         capture = self.pg1.get_capture(1)
1750         p = capture[0]
1751         try:
1752             ip = p[IP]
1753             tcp = p[TCP]
1754             self.assertEqual(ip.src, server1.ip4)
1755             self.assertEqual(tcp.sport, local_port)
1756             self.check_tcp_checksum(p)
1757             self.check_ip_checksum(p)
1758         except:
1759             self.logger.error(ppp("Unexpected or invalid packet:", p))
1760             raise
1761
1762     def test_multiple_inside_interfaces(self):
1763         """ NAT44 multiple non-overlapping address space inside interfaces """
1764
1765         self.nat44_add_address(self.nat_addr)
1766         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1767         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1768         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1769                                                   is_inside=0)
1770
1771         # between two NAT44 inside interfaces (no translation)
1772         pkts = self.create_stream_in(self.pg0, self.pg1)
1773         self.pg0.add_stream(pkts)
1774         self.pg_enable_capture(self.pg_interfaces)
1775         self.pg_start()
1776         capture = self.pg1.get_capture(len(pkts))
1777         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1778
1779         # from NAT44 inside to interface without NAT44 feature (no translation)
1780         pkts = self.create_stream_in(self.pg0, self.pg2)
1781         self.pg0.add_stream(pkts)
1782         self.pg_enable_capture(self.pg_interfaces)
1783         self.pg_start()
1784         capture = self.pg2.get_capture(len(pkts))
1785         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1786
1787         # in2out 1st interface
1788         pkts = self.create_stream_in(self.pg0, self.pg3)
1789         self.pg0.add_stream(pkts)
1790         self.pg_enable_capture(self.pg_interfaces)
1791         self.pg_start()
1792         capture = self.pg3.get_capture(len(pkts))
1793         self.verify_capture_out(capture)
1794
1795         # out2in 1st interface
1796         pkts = self.create_stream_out(self.pg3)
1797         self.pg3.add_stream(pkts)
1798         self.pg_enable_capture(self.pg_interfaces)
1799         self.pg_start()
1800         capture = self.pg0.get_capture(len(pkts))
1801         self.verify_capture_in(capture, self.pg0)
1802
1803         # in2out 2nd interface
1804         pkts = self.create_stream_in(self.pg1, self.pg3)
1805         self.pg1.add_stream(pkts)
1806         self.pg_enable_capture(self.pg_interfaces)
1807         self.pg_start()
1808         capture = self.pg3.get_capture(len(pkts))
1809         self.verify_capture_out(capture)
1810
1811         # out2in 2nd interface
1812         pkts = self.create_stream_out(self.pg3)
1813         self.pg3.add_stream(pkts)
1814         self.pg_enable_capture(self.pg_interfaces)
1815         self.pg_start()
1816         capture = self.pg1.get_capture(len(pkts))
1817         self.verify_capture_in(capture, self.pg1)
1818
1819     def test_inside_overlapping_interfaces(self):
1820         """ NAT44 multiple inside interfaces with overlapping address space """
1821
1822         static_nat_ip = "10.0.0.10"
1823         self.nat44_add_address(self.nat_addr)
1824         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1825                                                   is_inside=0)
1826         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1827         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1828         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1829         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1830                                       vrf_id=20)
1831
1832         # between NAT44 inside interfaces with same VRF (no translation)
1833         pkts = self.create_stream_in(self.pg4, self.pg5)
1834         self.pg4.add_stream(pkts)
1835         self.pg_enable_capture(self.pg_interfaces)
1836         self.pg_start()
1837         capture = self.pg5.get_capture(len(pkts))
1838         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1839
1840         # between NAT44 inside interfaces with different VRF (hairpinning)
1841         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1842              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1843              TCP(sport=1234, dport=5678))
1844         self.pg4.add_stream(p)
1845         self.pg_enable_capture(self.pg_interfaces)
1846         self.pg_start()
1847         capture = self.pg6.get_capture(1)
1848         p = capture[0]
1849         try:
1850             ip = p[IP]
1851             tcp = p[TCP]
1852             self.assertEqual(ip.src, self.nat_addr)
1853             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1854             self.assertNotEqual(tcp.sport, 1234)
1855             self.assertEqual(tcp.dport, 5678)
1856         except:
1857             self.logger.error(ppp("Unexpected or invalid packet:", p))
1858             raise
1859
1860         # in2out 1st interface
1861         pkts = self.create_stream_in(self.pg4, self.pg3)
1862         self.pg4.add_stream(pkts)
1863         self.pg_enable_capture(self.pg_interfaces)
1864         self.pg_start()
1865         capture = self.pg3.get_capture(len(pkts))
1866         self.verify_capture_out(capture)
1867
1868         # out2in 1st interface
1869         pkts = self.create_stream_out(self.pg3)
1870         self.pg3.add_stream(pkts)
1871         self.pg_enable_capture(self.pg_interfaces)
1872         self.pg_start()
1873         capture = self.pg4.get_capture(len(pkts))
1874         self.verify_capture_in(capture, self.pg4)
1875
1876         # in2out 2nd interface
1877         pkts = self.create_stream_in(self.pg5, self.pg3)
1878         self.pg5.add_stream(pkts)
1879         self.pg_enable_capture(self.pg_interfaces)
1880         self.pg_start()
1881         capture = self.pg3.get_capture(len(pkts))
1882         self.verify_capture_out(capture)
1883
1884         # out2in 2nd interface
1885         pkts = self.create_stream_out(self.pg3)
1886         self.pg3.add_stream(pkts)
1887         self.pg_enable_capture(self.pg_interfaces)
1888         self.pg_start()
1889         capture = self.pg5.get_capture(len(pkts))
1890         self.verify_capture_in(capture, self.pg5)
1891
1892         # pg5 session dump
1893         addresses = self.vapi.nat44_address_dump()
1894         self.assertEqual(len(addresses), 1)
1895         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1896         self.assertEqual(len(sessions), 3)
1897         for session in sessions:
1898             self.assertFalse(session.is_static)
1899             self.assertEqual(session.inside_ip_address[0:4],
1900                              self.pg5.remote_ip4n)
1901             self.assertEqual(session.outside_ip_address,
1902                              addresses[0].ip_address)
1903         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1904         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1905         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1906         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1907         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1908         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1909         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1910         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1911         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1912
1913         # in2out 3rd interface
1914         pkts = self.create_stream_in(self.pg6, self.pg3)
1915         self.pg6.add_stream(pkts)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg3.get_capture(len(pkts))
1919         self.verify_capture_out(capture, static_nat_ip, True)
1920
1921         # out2in 3rd interface
1922         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1923         self.pg3.add_stream(pkts)
1924         self.pg_enable_capture(self.pg_interfaces)
1925         self.pg_start()
1926         capture = self.pg6.get_capture(len(pkts))
1927         self.verify_capture_in(capture, self.pg6)
1928
1929         # general user and session dump verifications
1930         users = self.vapi.nat44_user_dump()
1931         self.assertTrue(len(users) >= 3)
1932         addresses = self.vapi.nat44_address_dump()
1933         self.assertEqual(len(addresses), 1)
1934         for user in users:
1935             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1936                                                          user.vrf_id)
1937             for session in sessions:
1938                 self.assertEqual(user.ip_address, session.inside_ip_address)
1939                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1940                 self.assertTrue(session.protocol in
1941                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1942                                  IP_PROTOS.icmp])
1943
1944         # pg4 session dump
1945         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1946         self.assertTrue(len(sessions) >= 4)
1947         for session in sessions:
1948             self.assertFalse(session.is_static)
1949             self.assertEqual(session.inside_ip_address[0:4],
1950                              self.pg4.remote_ip4n)
1951             self.assertEqual(session.outside_ip_address,
1952                              addresses[0].ip_address)
1953
1954         # pg6 session dump
1955         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1956         self.assertTrue(len(sessions) >= 3)
1957         for session in sessions:
1958             self.assertTrue(session.is_static)
1959             self.assertEqual(session.inside_ip_address[0:4],
1960                              self.pg6.remote_ip4n)
1961             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1962                              map(int, static_nat_ip.split('.')))
1963             self.assertTrue(session.inside_port in
1964                             [self.tcp_port_in, self.udp_port_in,
1965                              self.icmp_id_in])
1966
1967     def test_hairpinning(self):
1968         """ NAT44 hairpinning - 1:1 NAPT """
1969
1970         host = self.pg0.remote_hosts[0]
1971         server = self.pg0.remote_hosts[1]
1972         host_in_port = 1234
1973         host_out_port = 0
1974         server_in_port = 5678
1975         server_out_port = 8765
1976
1977         self.nat44_add_address(self.nat_addr)
1978         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1979         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1980                                                   is_inside=0)
1981         # add static mapping for server
1982         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1983                                       server_in_port, server_out_port,
1984                                       proto=IP_PROTOS.tcp)
1985
1986         # send packet from host to server
1987         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1988              IP(src=host.ip4, dst=self.nat_addr) /
1989              TCP(sport=host_in_port, dport=server_out_port))
1990         self.pg0.add_stream(p)
1991         self.pg_enable_capture(self.pg_interfaces)
1992         self.pg_start()
1993         capture = self.pg0.get_capture(1)
1994         p = capture[0]
1995         try:
1996             ip = p[IP]
1997             tcp = p[TCP]
1998             self.assertEqual(ip.src, self.nat_addr)
1999             self.assertEqual(ip.dst, server.ip4)
2000             self.assertNotEqual(tcp.sport, host_in_port)
2001             self.assertEqual(tcp.dport, server_in_port)
2002             self.check_tcp_checksum(p)
2003             host_out_port = tcp.sport
2004         except:
2005             self.logger.error(ppp("Unexpected or invalid packet:", p))
2006             raise
2007
2008         # send reply from server to host
2009         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2010              IP(src=server.ip4, dst=self.nat_addr) /
2011              TCP(sport=server_in_port, dport=host_out_port))
2012         self.pg0.add_stream(p)
2013         self.pg_enable_capture(self.pg_interfaces)
2014         self.pg_start()
2015         capture = self.pg0.get_capture(1)
2016         p = capture[0]
2017         try:
2018             ip = p[IP]
2019             tcp = p[TCP]
2020             self.assertEqual(ip.src, self.nat_addr)
2021             self.assertEqual(ip.dst, host.ip4)
2022             self.assertEqual(tcp.sport, server_out_port)
2023             self.assertEqual(tcp.dport, host_in_port)
2024             self.check_tcp_checksum(p)
2025         except:
2026             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2027             raise
2028
2029     def test_hairpinning2(self):
2030         """ NAT44 hairpinning - 1:1 NAT"""
2031
2032         server1_nat_ip = "10.0.0.10"
2033         server2_nat_ip = "10.0.0.11"
2034         host = self.pg0.remote_hosts[0]
2035         server1 = self.pg0.remote_hosts[1]
2036         server2 = self.pg0.remote_hosts[2]
2037         server_tcp_port = 22
2038         server_udp_port = 20
2039
2040         self.nat44_add_address(self.nat_addr)
2041         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2042         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2043                                                   is_inside=0)
2044
2045         # add static mapping for servers
2046         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2047         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2048
2049         # host to server1
2050         pkts = []
2051         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2052              IP(src=host.ip4, dst=server1_nat_ip) /
2053              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2054         pkts.append(p)
2055         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2056              IP(src=host.ip4, dst=server1_nat_ip) /
2057              UDP(sport=self.udp_port_in, dport=server_udp_port))
2058         pkts.append(p)
2059         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2060              IP(src=host.ip4, dst=server1_nat_ip) /
2061              ICMP(id=self.icmp_id_in, type='echo-request'))
2062         pkts.append(p)
2063         self.pg0.add_stream(pkts)
2064         self.pg_enable_capture(self.pg_interfaces)
2065         self.pg_start()
2066         capture = self.pg0.get_capture(len(pkts))
2067         for packet in capture:
2068             try:
2069                 self.assertEqual(packet[IP].src, self.nat_addr)
2070                 self.assertEqual(packet[IP].dst, server1.ip4)
2071                 if packet.haslayer(TCP):
2072                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2073                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2074                     self.tcp_port_out = packet[TCP].sport
2075                     self.check_tcp_checksum(packet)
2076                 elif packet.haslayer(UDP):
2077                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2078                     self.assertEqual(packet[UDP].dport, server_udp_port)
2079                     self.udp_port_out = packet[UDP].sport
2080                 else:
2081                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2082                     self.icmp_id_out = packet[ICMP].id
2083             except:
2084                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2085                 raise
2086
2087         # server1 to host
2088         pkts = []
2089         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2090              IP(src=server1.ip4, dst=self.nat_addr) /
2091              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2092         pkts.append(p)
2093         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2094              IP(src=server1.ip4, dst=self.nat_addr) /
2095              UDP(sport=server_udp_port, dport=self.udp_port_out))
2096         pkts.append(p)
2097         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2098              IP(src=server1.ip4, dst=self.nat_addr) /
2099              ICMP(id=self.icmp_id_out, type='echo-reply'))
2100         pkts.append(p)
2101         self.pg0.add_stream(pkts)
2102         self.pg_enable_capture(self.pg_interfaces)
2103         self.pg_start()
2104         capture = self.pg0.get_capture(len(pkts))
2105         for packet in capture:
2106             try:
2107                 self.assertEqual(packet[IP].src, server1_nat_ip)
2108                 self.assertEqual(packet[IP].dst, host.ip4)
2109                 if packet.haslayer(TCP):
2110                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2111                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2112                     self.check_tcp_checksum(packet)
2113                 elif packet.haslayer(UDP):
2114                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2115                     self.assertEqual(packet[UDP].sport, server_udp_port)
2116                 else:
2117                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2118             except:
2119                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2120                 raise
2121
2122         # server2 to server1
2123         pkts = []
2124         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2125              IP(src=server2.ip4, dst=server1_nat_ip) /
2126              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2127         pkts.append(p)
2128         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2129              IP(src=server2.ip4, dst=server1_nat_ip) /
2130              UDP(sport=self.udp_port_in, dport=server_udp_port))
2131         pkts.append(p)
2132         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2133              IP(src=server2.ip4, dst=server1_nat_ip) /
2134              ICMP(id=self.icmp_id_in, type='echo-request'))
2135         pkts.append(p)
2136         self.pg0.add_stream(pkts)
2137         self.pg_enable_capture(self.pg_interfaces)
2138         self.pg_start()
2139         capture = self.pg0.get_capture(len(pkts))
2140         for packet in capture:
2141             try:
2142                 self.assertEqual(packet[IP].src, server2_nat_ip)
2143                 self.assertEqual(packet[IP].dst, server1.ip4)
2144                 if packet.haslayer(TCP):
2145                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2146                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2147                     self.tcp_port_out = packet[TCP].sport
2148                     self.check_tcp_checksum(packet)
2149                 elif packet.haslayer(UDP):
2150                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2151                     self.assertEqual(packet[UDP].dport, server_udp_port)
2152                     self.udp_port_out = packet[UDP].sport
2153                 else:
2154                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2155                     self.icmp_id_out = packet[ICMP].id
2156             except:
2157                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2158                 raise
2159
2160         # server1 to server2
2161         pkts = []
2162         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2163              IP(src=server1.ip4, dst=server2_nat_ip) /
2164              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2165         pkts.append(p)
2166         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2167              IP(src=server1.ip4, dst=server2_nat_ip) /
2168              UDP(sport=server_udp_port, dport=self.udp_port_out))
2169         pkts.append(p)
2170         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2171              IP(src=server1.ip4, dst=server2_nat_ip) /
2172              ICMP(id=self.icmp_id_out, type='echo-reply'))
2173         pkts.append(p)
2174         self.pg0.add_stream(pkts)
2175         self.pg_enable_capture(self.pg_interfaces)
2176         self.pg_start()
2177         capture = self.pg0.get_capture(len(pkts))
2178         for packet in capture:
2179             try:
2180                 self.assertEqual(packet[IP].src, server1_nat_ip)
2181                 self.assertEqual(packet[IP].dst, server2.ip4)
2182                 if packet.haslayer(TCP):
2183                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2184                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2185                     self.check_tcp_checksum(packet)
2186                 elif packet.haslayer(UDP):
2187                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2188                     self.assertEqual(packet[UDP].sport, server_udp_port)
2189                 else:
2190                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2191             except:
2192                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2193                 raise
2194
2195     def test_max_translations_per_user(self):
2196         """ MAX translations per user - recycle the least recently used """
2197
2198         self.nat44_add_address(self.nat_addr)
2199         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2200         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2201                                                   is_inside=0)
2202
2203         # get maximum number of translations per user
2204         nat44_config = self.vapi.nat_show_config()
2205
2206         # send more than maximum number of translations per user packets
2207         pkts_num = nat44_config.max_translations_per_user + 5
2208         pkts = []
2209         for port in range(0, pkts_num):
2210             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2211                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2212                  TCP(sport=1025 + port))
2213             pkts.append(p)
2214         self.pg0.add_stream(pkts)
2215         self.pg_enable_capture(self.pg_interfaces)
2216         self.pg_start()
2217
2218         # verify number of translated packet
2219         self.pg1.get_capture(pkts_num)
2220
2221     def test_interface_addr(self):
2222         """ Acquire NAT44 addresses from interface """
2223         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2224
2225         # no address in NAT pool
2226         adresses = self.vapi.nat44_address_dump()
2227         self.assertEqual(0, len(adresses))
2228
2229         # configure interface address and check NAT address pool
2230         self.pg7.config_ip4()
2231         adresses = self.vapi.nat44_address_dump()
2232         self.assertEqual(1, len(adresses))
2233         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2234
2235         # remove interface address and check NAT address pool
2236         self.pg7.unconfig_ip4()
2237         adresses = self.vapi.nat44_address_dump()
2238         self.assertEqual(0, len(adresses))
2239
2240     def test_interface_addr_static_mapping(self):
2241         """ Static mapping with addresses from interface """
2242         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2243         self.nat44_add_static_mapping(
2244             '1.2.3.4',
2245             external_sw_if_index=self.pg7.sw_if_index)
2246
2247         # static mappings with external interface
2248         static_mappings = self.vapi.nat44_static_mapping_dump()
2249         self.assertEqual(1, len(static_mappings))
2250         self.assertEqual(self.pg7.sw_if_index,
2251                          static_mappings[0].external_sw_if_index)
2252
2253         # configure interface address and check static mappings
2254         self.pg7.config_ip4()
2255         static_mappings = self.vapi.nat44_static_mapping_dump()
2256         self.assertEqual(1, len(static_mappings))
2257         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2258                          self.pg7.local_ip4n)
2259         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2260
2261         # remove interface address and check static mappings
2262         self.pg7.unconfig_ip4()
2263         static_mappings = self.vapi.nat44_static_mapping_dump()
2264         self.assertEqual(0, len(static_mappings))
2265
2266     def test_interface_addr_identity_nat(self):
2267         """ Identity NAT with addresses from interface """
2268
2269         port = 53053
2270         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2271         self.vapi.nat44_add_del_identity_mapping(
2272             sw_if_index=self.pg7.sw_if_index,
2273             port=port,
2274             protocol=IP_PROTOS.tcp,
2275             addr_only=0)
2276
2277         # identity mappings with external interface
2278         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2279         self.assertEqual(1, len(identity_mappings))
2280         self.assertEqual(self.pg7.sw_if_index,
2281                          identity_mappings[0].sw_if_index)
2282
2283         # configure interface address and check identity mappings
2284         self.pg7.config_ip4()
2285         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2286         self.assertEqual(1, len(identity_mappings))
2287         self.assertEqual(identity_mappings[0].ip_address,
2288                          self.pg7.local_ip4n)
2289         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2290         self.assertEqual(port, identity_mappings[0].port)
2291         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2292
2293         # remove interface address and check identity mappings
2294         self.pg7.unconfig_ip4()
2295         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2296         self.assertEqual(0, len(identity_mappings))
2297
2298     def test_ipfix_nat44_sess(self):
2299         """ IPFIX logging NAT44 session created/delted """
2300         self.ipfix_domain_id = 10
2301         self.ipfix_src_port = 20202
2302         colector_port = 30303
2303         bind_layers(UDP, IPFIX, dport=30303)
2304         self.nat44_add_address(self.nat_addr)
2305         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2307                                                   is_inside=0)
2308         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2309                                      src_address=self.pg3.local_ip4n,
2310                                      path_mtu=512,
2311                                      template_interval=10,
2312                                      collector_port=colector_port)
2313         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2314                             src_port=self.ipfix_src_port)
2315
2316         pkts = self.create_stream_in(self.pg0, self.pg1)
2317         self.pg0.add_stream(pkts)
2318         self.pg_enable_capture(self.pg_interfaces)
2319         self.pg_start()
2320         capture = self.pg1.get_capture(len(pkts))
2321         self.verify_capture_out(capture)
2322         self.nat44_add_address(self.nat_addr, is_add=0)
2323         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2324         capture = self.pg3.get_capture(9)
2325         ipfix = IPFIXDecoder()
2326         # first load template
2327         for p in capture:
2328             self.assertTrue(p.haslayer(IPFIX))
2329             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2330             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2331             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2332             self.assertEqual(p[UDP].dport, colector_port)
2333             self.assertEqual(p[IPFIX].observationDomainID,
2334                              self.ipfix_domain_id)
2335             if p.haslayer(Template):
2336                 ipfix.add_template(p.getlayer(Template))
2337         # verify events in data set
2338         for p in capture:
2339             if p.haslayer(Data):
2340                 data = ipfix.decode_data_set(p.getlayer(Set))
2341                 self.verify_ipfix_nat44_ses(data)
2342
2343     def test_ipfix_addr_exhausted(self):
2344         """ IPFIX logging NAT addresses exhausted """
2345         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2346         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2347                                                   is_inside=0)
2348         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2349                                      src_address=self.pg3.local_ip4n,
2350                                      path_mtu=512,
2351                                      template_interval=10)
2352         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2353                             src_port=self.ipfix_src_port)
2354
2355         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2356              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2357              TCP(sport=3025))
2358         self.pg0.add_stream(p)
2359         self.pg_enable_capture(self.pg_interfaces)
2360         self.pg_start()
2361         capture = self.pg1.get_capture(0)
2362         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2363         capture = self.pg3.get_capture(9)
2364         ipfix = IPFIXDecoder()
2365         # first load template
2366         for p in capture:
2367             self.assertTrue(p.haslayer(IPFIX))
2368             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2369             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2370             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2371             self.assertEqual(p[UDP].dport, 4739)
2372             self.assertEqual(p[IPFIX].observationDomainID,
2373                              self.ipfix_domain_id)
2374             if p.haslayer(Template):
2375                 ipfix.add_template(p.getlayer(Template))
2376         # verify events in data set
2377         for p in capture:
2378             if p.haslayer(Data):
2379                 data = ipfix.decode_data_set(p.getlayer(Set))
2380                 self.verify_ipfix_addr_exhausted(data)
2381
2382     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2383     def test_ipfix_max_sessions(self):
2384         """ IPFIX logging maximum session entries exceeded """
2385         self.nat44_add_address(self.nat_addr)
2386         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2387         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2388                                                   is_inside=0)
2389
2390         nat44_config = self.vapi.nat_show_config()
2391         max_sessions = 10 * nat44_config.translation_buckets
2392
2393         pkts = []
2394         for i in range(0, max_sessions):
2395             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2396             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2397                  IP(src=src, dst=self.pg1.remote_ip4) /
2398                  TCP(sport=1025))
2399             pkts.append(p)
2400         self.pg0.add_stream(pkts)
2401         self.pg_enable_capture(self.pg_interfaces)
2402         self.pg_start()
2403
2404         self.pg1.get_capture(max_sessions)
2405         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2406                                      src_address=self.pg3.local_ip4n,
2407                                      path_mtu=512,
2408                                      template_interval=10)
2409         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2410                             src_port=self.ipfix_src_port)
2411
2412         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2413              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2414              TCP(sport=1025))
2415         self.pg0.add_stream(p)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         self.pg1.get_capture(0)
2419         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2420         capture = self.pg3.get_capture(9)
2421         ipfix = IPFIXDecoder()
2422         # first load template
2423         for p in capture:
2424             self.assertTrue(p.haslayer(IPFIX))
2425             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2426             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2427             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2428             self.assertEqual(p[UDP].dport, 4739)
2429             self.assertEqual(p[IPFIX].observationDomainID,
2430                              self.ipfix_domain_id)
2431             if p.haslayer(Template):
2432                 ipfix.add_template(p.getlayer(Template))
2433         # verify events in data set
2434         for p in capture:
2435             if p.haslayer(Data):
2436                 data = ipfix.decode_data_set(p.getlayer(Set))
2437                 self.verify_ipfix_max_sessions(data, max_sessions)
2438
2439     def test_pool_addr_fib(self):
2440         """ NAT44 add pool addresses to FIB """
2441         static_addr = '10.0.0.10'
2442         self.nat44_add_address(self.nat_addr)
2443         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2444         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2445                                                   is_inside=0)
2446         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2447
2448         # NAT44 address
2449         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2450              ARP(op=ARP.who_has, pdst=self.nat_addr,
2451                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2452         self.pg1.add_stream(p)
2453         self.pg_enable_capture(self.pg_interfaces)
2454         self.pg_start()
2455         capture = self.pg1.get_capture(1)
2456         self.assertTrue(capture[0].haslayer(ARP))
2457         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2458
2459         # 1:1 NAT address
2460         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2461              ARP(op=ARP.who_has, pdst=static_addr,
2462                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2463         self.pg1.add_stream(p)
2464         self.pg_enable_capture(self.pg_interfaces)
2465         self.pg_start()
2466         capture = self.pg1.get_capture(1)
2467         self.assertTrue(capture[0].haslayer(ARP))
2468         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2469
2470         # send ARP to non-NAT44 interface
2471         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2472              ARP(op=ARP.who_has, pdst=self.nat_addr,
2473                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2474         self.pg2.add_stream(p)
2475         self.pg_enable_capture(self.pg_interfaces)
2476         self.pg_start()
2477         capture = self.pg1.get_capture(0)
2478
2479         # remove addresses and verify
2480         self.nat44_add_address(self.nat_addr, is_add=0)
2481         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2482                                       is_add=0)
2483
2484         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2485              ARP(op=ARP.who_has, pdst=self.nat_addr,
2486                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2487         self.pg1.add_stream(p)
2488         self.pg_enable_capture(self.pg_interfaces)
2489         self.pg_start()
2490         capture = self.pg1.get_capture(0)
2491
2492         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2493              ARP(op=ARP.who_has, pdst=static_addr,
2494                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2495         self.pg1.add_stream(p)
2496         self.pg_enable_capture(self.pg_interfaces)
2497         self.pg_start()
2498         capture = self.pg1.get_capture(0)
2499
2500     def test_vrf_mode(self):
2501         """ NAT44 tenant VRF aware address pool mode """
2502
2503         vrf_id1 = 1
2504         vrf_id2 = 2
2505         nat_ip1 = "10.0.0.10"
2506         nat_ip2 = "10.0.0.11"
2507
2508         self.pg0.unconfig_ip4()
2509         self.pg1.unconfig_ip4()
2510         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2511         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2512         self.pg0.set_table_ip4(vrf_id1)
2513         self.pg1.set_table_ip4(vrf_id2)
2514         self.pg0.config_ip4()
2515         self.pg1.config_ip4()
2516
2517         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2518         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2519         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2520         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2521         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2522                                                   is_inside=0)
2523
2524         # first VRF
2525         pkts = self.create_stream_in(self.pg0, self.pg2)
2526         self.pg0.add_stream(pkts)
2527         self.pg_enable_capture(self.pg_interfaces)
2528         self.pg_start()
2529         capture = self.pg2.get_capture(len(pkts))
2530         self.verify_capture_out(capture, nat_ip1)
2531
2532         # second VRF
2533         pkts = self.create_stream_in(self.pg1, self.pg2)
2534         self.pg1.add_stream(pkts)
2535         self.pg_enable_capture(self.pg_interfaces)
2536         self.pg_start()
2537         capture = self.pg2.get_capture(len(pkts))
2538         self.verify_capture_out(capture, nat_ip2)
2539
2540         self.pg0.unconfig_ip4()
2541         self.pg1.unconfig_ip4()
2542         self.pg0.set_table_ip4(0)
2543         self.pg1.set_table_ip4(0)
2544         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2545         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2546
2547     def test_vrf_feature_independent(self):
2548         """ NAT44 tenant VRF independent address pool mode """
2549
2550         nat_ip1 = "10.0.0.10"
2551         nat_ip2 = "10.0.0.11"
2552
2553         self.nat44_add_address(nat_ip1)
2554         self.nat44_add_address(nat_ip2, vrf_id=99)
2555         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2556         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2557         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2558                                                   is_inside=0)
2559
2560         # first VRF
2561         pkts = self.create_stream_in(self.pg0, self.pg2)
2562         self.pg0.add_stream(pkts)
2563         self.pg_enable_capture(self.pg_interfaces)
2564         self.pg_start()
2565         capture = self.pg2.get_capture(len(pkts))
2566         self.verify_capture_out(capture, nat_ip1)
2567
2568         # second VRF
2569         pkts = self.create_stream_in(self.pg1, self.pg2)
2570         self.pg1.add_stream(pkts)
2571         self.pg_enable_capture(self.pg_interfaces)
2572         self.pg_start()
2573         capture = self.pg2.get_capture(len(pkts))
2574         self.verify_capture_out(capture, nat_ip1)
2575
2576     def test_dynamic_ipless_interfaces(self):
2577         """ NAT44 interfaces without configured IP address """
2578
2579         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2580                                       mactobinary(self.pg7.remote_mac),
2581                                       self.pg7.remote_ip4n,
2582                                       is_static=1)
2583         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2584                                       mactobinary(self.pg8.remote_mac),
2585                                       self.pg8.remote_ip4n,
2586                                       is_static=1)
2587
2588         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2589                                    dst_address_length=32,
2590                                    next_hop_address=self.pg7.remote_ip4n,
2591                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2592         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2593                                    dst_address_length=32,
2594                                    next_hop_address=self.pg8.remote_ip4n,
2595                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2596
2597         self.nat44_add_address(self.nat_addr)
2598         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2599         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2600                                                   is_inside=0)
2601
2602         # in2out
2603         pkts = self.create_stream_in(self.pg7, self.pg8)
2604         self.pg7.add_stream(pkts)
2605         self.pg_enable_capture(self.pg_interfaces)
2606         self.pg_start()
2607         capture = self.pg8.get_capture(len(pkts))
2608         self.verify_capture_out(capture)
2609
2610         # out2in
2611         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2612         self.pg8.add_stream(pkts)
2613         self.pg_enable_capture(self.pg_interfaces)
2614         self.pg_start()
2615         capture = self.pg7.get_capture(len(pkts))
2616         self.verify_capture_in(capture, self.pg7)
2617
2618     def test_static_ipless_interfaces(self):
2619         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2620
2621         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2622                                       mactobinary(self.pg7.remote_mac),
2623                                       self.pg7.remote_ip4n,
2624                                       is_static=1)
2625         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2626                                       mactobinary(self.pg8.remote_mac),
2627                                       self.pg8.remote_ip4n,
2628                                       is_static=1)
2629
2630         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2631                                    dst_address_length=32,
2632                                    next_hop_address=self.pg7.remote_ip4n,
2633                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2634         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2635                                    dst_address_length=32,
2636                                    next_hop_address=self.pg8.remote_ip4n,
2637                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2638
2639         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2640         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2641         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2642                                                   is_inside=0)
2643
2644         # out2in
2645         pkts = self.create_stream_out(self.pg8)
2646         self.pg8.add_stream(pkts)
2647         self.pg_enable_capture(self.pg_interfaces)
2648         self.pg_start()
2649         capture = self.pg7.get_capture(len(pkts))
2650         self.verify_capture_in(capture, self.pg7)
2651
2652         # in2out
2653         pkts = self.create_stream_in(self.pg7, self.pg8)
2654         self.pg7.add_stream(pkts)
2655         self.pg_enable_capture(self.pg_interfaces)
2656         self.pg_start()
2657         capture = self.pg8.get_capture(len(pkts))
2658         self.verify_capture_out(capture, self.nat_addr, True)
2659
2660     def test_static_with_port_ipless_interfaces(self):
2661         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2662
2663         self.tcp_port_out = 30606
2664         self.udp_port_out = 30607
2665         self.icmp_id_out = 30608
2666
2667         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2668                                       mactobinary(self.pg7.remote_mac),
2669                                       self.pg7.remote_ip4n,
2670                                       is_static=1)
2671         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2672                                       mactobinary(self.pg8.remote_mac),
2673                                       self.pg8.remote_ip4n,
2674                                       is_static=1)
2675
2676         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2677                                    dst_address_length=32,
2678                                    next_hop_address=self.pg7.remote_ip4n,
2679                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2680         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2681                                    dst_address_length=32,
2682                                    next_hop_address=self.pg8.remote_ip4n,
2683                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2684
2685         self.nat44_add_address(self.nat_addr)
2686         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2687                                       self.tcp_port_in, self.tcp_port_out,
2688                                       proto=IP_PROTOS.tcp)
2689         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2690                                       self.udp_port_in, self.udp_port_out,
2691                                       proto=IP_PROTOS.udp)
2692         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2693                                       self.icmp_id_in, self.icmp_id_out,
2694                                       proto=IP_PROTOS.icmp)
2695         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2696         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2697                                                   is_inside=0)
2698
2699         # out2in
2700         pkts = self.create_stream_out(self.pg8)
2701         self.pg8.add_stream(pkts)
2702         self.pg_enable_capture(self.pg_interfaces)
2703         self.pg_start()
2704         capture = self.pg7.get_capture(len(pkts))
2705         self.verify_capture_in(capture, self.pg7)
2706
2707         # in2out
2708         pkts = self.create_stream_in(self.pg7, self.pg8)
2709         self.pg7.add_stream(pkts)
2710         self.pg_enable_capture(self.pg_interfaces)
2711         self.pg_start()
2712         capture = self.pg8.get_capture(len(pkts))
2713         self.verify_capture_out(capture)
2714
2715     def test_static_unknown_proto(self):
2716         """ 1:1 NAT translate packet with unknown protocol """
2717         nat_ip = "10.0.0.10"
2718         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2719         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2720         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2721                                                   is_inside=0)
2722
2723         # in2out
2724         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2725              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2726              GRE() /
2727              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2728              TCP(sport=1234, dport=1234))
2729         self.pg0.add_stream(p)
2730         self.pg_enable_capture(self.pg_interfaces)
2731         self.pg_start()
2732         p = self.pg1.get_capture(1)
2733         packet = p[0]
2734         try:
2735             self.assertEqual(packet[IP].src, nat_ip)
2736             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2737             self.assertTrue(packet.haslayer(GRE))
2738             self.check_ip_checksum(packet)
2739         except:
2740             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2741             raise
2742
2743         # out2in
2744         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2745              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2746              GRE() /
2747              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2748              TCP(sport=1234, dport=1234))
2749         self.pg1.add_stream(p)
2750         self.pg_enable_capture(self.pg_interfaces)
2751         self.pg_start()
2752         p = self.pg0.get_capture(1)
2753         packet = p[0]
2754         try:
2755             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2756             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2757             self.assertTrue(packet.haslayer(GRE))
2758             self.check_ip_checksum(packet)
2759         except:
2760             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2761             raise
2762
2763     def test_hairpinning_static_unknown_proto(self):
2764         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2765
2766         host = self.pg0.remote_hosts[0]
2767         server = self.pg0.remote_hosts[1]
2768
2769         host_nat_ip = "10.0.0.10"
2770         server_nat_ip = "10.0.0.11"
2771
2772         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2773         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2774         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2775         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2776                                                   is_inside=0)
2777
2778         # host to server
2779         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2780              IP(src=host.ip4, dst=server_nat_ip) /
2781              GRE() /
2782              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2783              TCP(sport=1234, dport=1234))
2784         self.pg0.add_stream(p)
2785         self.pg_enable_capture(self.pg_interfaces)
2786         self.pg_start()
2787         p = self.pg0.get_capture(1)
2788         packet = p[0]
2789         try:
2790             self.assertEqual(packet[IP].src, host_nat_ip)
2791             self.assertEqual(packet[IP].dst, server.ip4)
2792             self.assertTrue(packet.haslayer(GRE))
2793             self.check_ip_checksum(packet)
2794         except:
2795             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2796             raise
2797
2798         # server to host
2799         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2800              IP(src=server.ip4, dst=host_nat_ip) /
2801              GRE() /
2802              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2803              TCP(sport=1234, dport=1234))
2804         self.pg0.add_stream(p)
2805         self.pg_enable_capture(self.pg_interfaces)
2806         self.pg_start()
2807         p = self.pg0.get_capture(1)
2808         packet = p[0]
2809         try:
2810             self.assertEqual(packet[IP].src, server_nat_ip)
2811             self.assertEqual(packet[IP].dst, host.ip4)
2812             self.assertTrue(packet.haslayer(GRE))
2813             self.check_ip_checksum(packet)
2814         except:
2815             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2816             raise
2817
2818     def test_unknown_proto(self):
2819         """ NAT44 translate packet with unknown protocol """
2820         self.nat44_add_address(self.nat_addr)
2821         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2822         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2823                                                   is_inside=0)
2824
2825         # in2out
2826         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2827              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2828              TCP(sport=self.tcp_port_in, dport=20))
2829         self.pg0.add_stream(p)
2830         self.pg_enable_capture(self.pg_interfaces)
2831         self.pg_start()
2832         p = self.pg1.get_capture(1)
2833
2834         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2835              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2836              GRE() /
2837              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2838              TCP(sport=1234, dport=1234))
2839         self.pg0.add_stream(p)
2840         self.pg_enable_capture(self.pg_interfaces)
2841         self.pg_start()
2842         p = self.pg1.get_capture(1)
2843         packet = p[0]
2844         try:
2845             self.assertEqual(packet[IP].src, self.nat_addr)
2846             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2847             self.assertTrue(packet.haslayer(GRE))
2848             self.check_ip_checksum(packet)
2849         except:
2850             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2851             raise
2852
2853         # out2in
2854         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2855              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2856              GRE() /
2857              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2858              TCP(sport=1234, dport=1234))
2859         self.pg1.add_stream(p)
2860         self.pg_enable_capture(self.pg_interfaces)
2861         self.pg_start()
2862         p = self.pg0.get_capture(1)
2863         packet = p[0]
2864         try:
2865             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2866             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2867             self.assertTrue(packet.haslayer(GRE))
2868             self.check_ip_checksum(packet)
2869         except:
2870             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2871             raise
2872
2873     def test_hairpinning_unknown_proto(self):
2874         """ NAT44 translate packet with unknown protocol - hairpinning """
2875         host = self.pg0.remote_hosts[0]
2876         server = self.pg0.remote_hosts[1]
2877         host_in_port = 1234
2878         host_out_port = 0
2879         server_in_port = 5678
2880         server_out_port = 8765
2881         server_nat_ip = "10.0.0.11"
2882
2883         self.nat44_add_address(self.nat_addr)
2884         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2885         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2886                                                   is_inside=0)
2887
2888         # add static mapping for server
2889         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2890
2891         # host to server
2892         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2893              IP(src=host.ip4, dst=server_nat_ip) /
2894              TCP(sport=host_in_port, dport=server_out_port))
2895         self.pg0.add_stream(p)
2896         self.pg_enable_capture(self.pg_interfaces)
2897         self.pg_start()
2898         capture = self.pg0.get_capture(1)
2899
2900         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2901              IP(src=host.ip4, dst=server_nat_ip) /
2902              GRE() /
2903              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2904              TCP(sport=1234, dport=1234))
2905         self.pg0.add_stream(p)
2906         self.pg_enable_capture(self.pg_interfaces)
2907         self.pg_start()
2908         p = self.pg0.get_capture(1)
2909         packet = p[0]
2910         try:
2911             self.assertEqual(packet[IP].src, self.nat_addr)
2912             self.assertEqual(packet[IP].dst, server.ip4)
2913             self.assertTrue(packet.haslayer(GRE))
2914             self.check_ip_checksum(packet)
2915         except:
2916             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2917             raise
2918
2919         # server to host
2920         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2921              IP(src=server.ip4, dst=self.nat_addr) /
2922              GRE() /
2923              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2924              TCP(sport=1234, dport=1234))
2925         self.pg0.add_stream(p)
2926         self.pg_enable_capture(self.pg_interfaces)
2927         self.pg_start()
2928         p = self.pg0.get_capture(1)
2929         packet = p[0]
2930         try:
2931             self.assertEqual(packet[IP].src, server_nat_ip)
2932             self.assertEqual(packet[IP].dst, host.ip4)
2933             self.assertTrue(packet.haslayer(GRE))
2934             self.check_ip_checksum(packet)
2935         except:
2936             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2937             raise
2938
2939     def test_output_feature(self):
2940         """ NAT44 interface output feature (in2out postrouting) """
2941         self.nat44_add_address(self.nat_addr)
2942         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2943         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2944         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2945                                                          is_inside=0)
2946
2947         # in2out
2948         pkts = self.create_stream_in(self.pg0, self.pg3)
2949         self.pg0.add_stream(pkts)
2950         self.pg_enable_capture(self.pg_interfaces)
2951         self.pg_start()
2952         capture = self.pg3.get_capture(len(pkts))
2953         self.verify_capture_out(capture)
2954
2955         # out2in
2956         pkts = self.create_stream_out(self.pg3)
2957         self.pg3.add_stream(pkts)
2958         self.pg_enable_capture(self.pg_interfaces)
2959         self.pg_start()
2960         capture = self.pg0.get_capture(len(pkts))
2961         self.verify_capture_in(capture, self.pg0)
2962
2963         # from non-NAT interface to NAT inside interface
2964         pkts = self.create_stream_in(self.pg2, self.pg0)
2965         self.pg2.add_stream(pkts)
2966         self.pg_enable_capture(self.pg_interfaces)
2967         self.pg_start()
2968         capture = self.pg0.get_capture(len(pkts))
2969         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2970
2971     def test_output_feature_vrf_aware(self):
2972         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2973         nat_ip_vrf10 = "10.0.0.10"
2974         nat_ip_vrf20 = "10.0.0.20"
2975
2976         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2977                                    dst_address_length=32,
2978                                    next_hop_address=self.pg3.remote_ip4n,
2979                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2980                                    table_id=10)
2981         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2982                                    dst_address_length=32,
2983                                    next_hop_address=self.pg3.remote_ip4n,
2984                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2985                                    table_id=20)
2986
2987         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2988         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2989         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2990         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2991         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2992                                                          is_inside=0)
2993
2994         # in2out VRF 10
2995         pkts = self.create_stream_in(self.pg4, self.pg3)
2996         self.pg4.add_stream(pkts)
2997         self.pg_enable_capture(self.pg_interfaces)
2998         self.pg_start()
2999         capture = self.pg3.get_capture(len(pkts))
3000         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3001
3002         # out2in VRF 10
3003         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3004         self.pg3.add_stream(pkts)
3005         self.pg_enable_capture(self.pg_interfaces)
3006         self.pg_start()
3007         capture = self.pg4.get_capture(len(pkts))
3008         self.verify_capture_in(capture, self.pg4)
3009
3010         # in2out VRF 20
3011         pkts = self.create_stream_in(self.pg6, self.pg3)
3012         self.pg6.add_stream(pkts)
3013         self.pg_enable_capture(self.pg_interfaces)
3014         self.pg_start()
3015         capture = self.pg3.get_capture(len(pkts))
3016         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3017
3018         # out2in VRF 20
3019         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3020         self.pg3.add_stream(pkts)
3021         self.pg_enable_capture(self.pg_interfaces)
3022         self.pg_start()
3023         capture = self.pg6.get_capture(len(pkts))
3024         self.verify_capture_in(capture, self.pg6)
3025
3026     def test_output_feature_hairpinning(self):
3027         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3028         host = self.pg0.remote_hosts[0]
3029         server = self.pg0.remote_hosts[1]
3030         host_in_port = 1234
3031         host_out_port = 0
3032         server_in_port = 5678
3033         server_out_port = 8765
3034
3035         self.nat44_add_address(self.nat_addr)
3036         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3037         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3038                                                          is_inside=0)
3039
3040         # add static mapping for server
3041         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3042                                       server_in_port, server_out_port,
3043                                       proto=IP_PROTOS.tcp)
3044
3045         # send packet from host to server
3046         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3047              IP(src=host.ip4, dst=self.nat_addr) /
3048              TCP(sport=host_in_port, dport=server_out_port))
3049         self.pg0.add_stream(p)
3050         self.pg_enable_capture(self.pg_interfaces)
3051         self.pg_start()
3052         capture = self.pg0.get_capture(1)
3053         p = capture[0]
3054         try:
3055             ip = p[IP]
3056             tcp = p[TCP]
3057             self.assertEqual(ip.src, self.nat_addr)
3058             self.assertEqual(ip.dst, server.ip4)
3059             self.assertNotEqual(tcp.sport, host_in_port)
3060             self.assertEqual(tcp.dport, server_in_port)
3061             self.check_tcp_checksum(p)
3062             host_out_port = tcp.sport
3063         except:
3064             self.logger.error(ppp("Unexpected or invalid packet:", p))
3065             raise
3066
3067         # send reply from server to host
3068         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3069              IP(src=server.ip4, dst=self.nat_addr) /
3070              TCP(sport=server_in_port, dport=host_out_port))
3071         self.pg0.add_stream(p)
3072         self.pg_enable_capture(self.pg_interfaces)
3073         self.pg_start()
3074         capture = self.pg0.get_capture(1)
3075         p = capture[0]
3076         try:
3077             ip = p[IP]
3078             tcp = p[TCP]
3079             self.assertEqual(ip.src, self.nat_addr)
3080             self.assertEqual(ip.dst, host.ip4)
3081             self.assertEqual(tcp.sport, server_out_port)
3082             self.assertEqual(tcp.dport, host_in_port)
3083             self.check_tcp_checksum(p)
3084         except:
3085             self.logger.error(ppp("Unexpected or invalid packet:"), p)
3086             raise
3087
3088     def test_one_armed_nat44(self):
3089         """ One armed NAT44 """
3090         remote_host = self.pg9.remote_hosts[0]
3091         local_host = self.pg9.remote_hosts[1]
3092         external_port = 0
3093
3094         self.nat44_add_address(self.nat_addr)
3095         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3096         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3097                                                   is_inside=0)
3098
3099         # in2out
3100         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3101              IP(src=local_host.ip4, dst=remote_host.ip4) /
3102              TCP(sport=12345, dport=80))
3103         self.pg9.add_stream(p)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg9.get_capture(1)
3107         p = capture[0]
3108         try:
3109             ip = p[IP]
3110             tcp = p[TCP]
3111             self.assertEqual(ip.src, self.nat_addr)
3112             self.assertEqual(ip.dst, remote_host.ip4)
3113             self.assertNotEqual(tcp.sport, 12345)
3114             external_port = tcp.sport
3115             self.assertEqual(tcp.dport, 80)
3116             self.check_tcp_checksum(p)
3117             self.check_ip_checksum(p)
3118         except:
3119             self.logger.error(ppp("Unexpected or invalid packet:", p))
3120             raise
3121
3122         # out2in
3123         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3124              IP(src=remote_host.ip4, dst=self.nat_addr) /
3125              TCP(sport=80, dport=external_port))
3126         self.pg9.add_stream(p)
3127         self.pg_enable_capture(self.pg_interfaces)
3128         self.pg_start()
3129         capture = self.pg9.get_capture(1)
3130         p = capture[0]
3131         try:
3132             ip = p[IP]
3133             tcp = p[TCP]
3134             self.assertEqual(ip.src, remote_host.ip4)
3135             self.assertEqual(ip.dst, local_host.ip4)
3136             self.assertEqual(tcp.sport, 80)
3137             self.assertEqual(tcp.dport, 12345)
3138             self.check_tcp_checksum(p)
3139             self.check_ip_checksum(p)
3140         except:
3141             self.logger.error(ppp("Unexpected or invalid packet:", p))
3142             raise
3143
3144     def test_del_session(self):
3145         """ Delete NAT44 session """
3146         self.nat44_add_address(self.nat_addr)
3147         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3148         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3149                                                   is_inside=0)
3150
3151         pkts = self.create_stream_in(self.pg0, self.pg1)
3152         self.pg0.add_stream(pkts)
3153         self.pg_enable_capture(self.pg_interfaces)
3154         self.pg_start()
3155         capture = self.pg1.get_capture(len(pkts))
3156
3157         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3158         nsessions = len(sessions)
3159
3160         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3161                                     sessions[0].inside_port,
3162                                     sessions[0].protocol)
3163         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3164                                     sessions[1].outside_port,
3165                                     sessions[1].protocol,
3166                                     is_in=0)
3167
3168         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3169         self.assertEqual(nsessions - len(sessions), 2)
3170
3171     def test_set_get_reass(self):
3172         """ NAT44 set/get virtual fragmentation reassembly """
3173         reas_cfg1 = self.vapi.nat_get_reass()
3174
3175         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3176                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3177                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3178
3179         reas_cfg2 = self.vapi.nat_get_reass()
3180
3181         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3182         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3183         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3184
3185         self.vapi.nat_set_reass(drop_frag=1)
3186         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3187
3188     def test_frag_in_order(self):
3189         """ NAT44 translate fragments arriving in order """
3190         self.nat44_add_address(self.nat_addr)
3191         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3192         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3193                                                   is_inside=0)
3194
3195         data = "A" * 4 + "B" * 16 + "C" * 3
3196         self.tcp_port_in = random.randint(1025, 65535)
3197
3198         reass = self.vapi.nat_reass_dump()
3199         reass_n_start = len(reass)
3200
3201         # in2out
3202         pkts = self.create_stream_frag(self.pg0,
3203                                        self.pg1.remote_ip4,
3204                                        self.tcp_port_in,
3205                                        20,
3206                                        data)
3207         self.pg0.add_stream(pkts)
3208         self.pg_enable_capture(self.pg_interfaces)
3209         self.pg_start()
3210         frags = self.pg1.get_capture(len(pkts))
3211         p = self.reass_frags_and_verify(frags,
3212                                         self.nat_addr,
3213                                         self.pg1.remote_ip4)
3214         self.assertEqual(p[TCP].dport, 20)
3215         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3216         self.tcp_port_out = p[TCP].sport
3217         self.assertEqual(data, p[Raw].load)
3218
3219         # out2in
3220         pkts = self.create_stream_frag(self.pg1,
3221                                        self.nat_addr,
3222                                        20,
3223                                        self.tcp_port_out,
3224                                        data)
3225         self.pg1.add_stream(pkts)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         frags = self.pg0.get_capture(len(pkts))
3229         p = self.reass_frags_and_verify(frags,
3230                                         self.pg1.remote_ip4,
3231                                         self.pg0.remote_ip4)
3232         self.assertEqual(p[TCP].sport, 20)
3233         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3234         self.assertEqual(data, p[Raw].load)
3235
3236         reass = self.vapi.nat_reass_dump()
3237         reass_n_end = len(reass)
3238
3239         self.assertEqual(reass_n_end - reass_n_start, 2)
3240
3241     def test_reass_hairpinning(self):
3242         """ NAT44 fragments hairpinning """
3243         host = self.pg0.remote_hosts[0]
3244         server = self.pg0.remote_hosts[1]
3245         host_in_port = random.randint(1025, 65535)
3246         host_out_port = 0
3247         server_in_port = random.randint(1025, 65535)
3248         server_out_port = random.randint(1025, 65535)
3249         data = "A" * 4 + "B" * 16 + "C" * 3
3250
3251         self.nat44_add_address(self.nat_addr)
3252         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3253         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3254                                                   is_inside=0)
3255         # add static mapping for server
3256         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3257                                       server_in_port, server_out_port,
3258                                       proto=IP_PROTOS.tcp)
3259
3260         # send packet from host to server
3261         pkts = self.create_stream_frag(self.pg0,
3262                                        self.nat_addr,
3263                                        host_in_port,
3264                                        server_out_port,
3265                                        data)
3266         self.pg0.add_stream(pkts)
3267         self.pg_enable_capture(self.pg_interfaces)
3268         self.pg_start()
3269         frags = self.pg0.get_capture(len(pkts))
3270         p = self.reass_frags_and_verify(frags,
3271                                         self.nat_addr,
3272                                         server.ip4)
3273         self.assertNotEqual(p[TCP].sport, host_in_port)
3274         self.assertEqual(p[TCP].dport, server_in_port)
3275         self.assertEqual(data, p[Raw].load)
3276
3277     def test_frag_out_of_order(self):
3278         """ NAT44 translate fragments arriving out of order """
3279         self.nat44_add_address(self.nat_addr)
3280         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3281         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3282                                                   is_inside=0)
3283
3284         data = "A" * 4 + "B" * 16 + "C" * 3
3285         random.randint(1025, 65535)
3286
3287         # in2out
3288         pkts = self.create_stream_frag(self.pg0,
3289                                        self.pg1.remote_ip4,
3290                                        self.tcp_port_in,
3291                                        20,
3292                                        data)
3293         pkts.reverse()
3294         self.pg0.add_stream(pkts)
3295         self.pg_enable_capture(self.pg_interfaces)
3296         self.pg_start()
3297         frags = self.pg1.get_capture(len(pkts))
3298         p = self.reass_frags_and_verify(frags,
3299                                         self.nat_addr,
3300                                         self.pg1.remote_ip4)
3301         self.assertEqual(p[TCP].dport, 20)
3302         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3303         self.tcp_port_out = p[TCP].sport
3304         self.assertEqual(data, p[Raw].load)
3305
3306         # out2in
3307         pkts = self.create_stream_frag(self.pg1,
3308                                        self.nat_addr,
3309                                        20,
3310                                        self.tcp_port_out,
3311                                        data)
3312         pkts.reverse()
3313         self.pg1.add_stream(pkts)
3314         self.pg_enable_capture(self.pg_interfaces)
3315         self.pg_start()
3316         frags = self.pg0.get_capture(len(pkts))
3317         p = self.reass_frags_and_verify(frags,
3318                                         self.pg1.remote_ip4,
3319                                         self.pg0.remote_ip4)
3320         self.assertEqual(p[TCP].sport, 20)
3321         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3322         self.assertEqual(data, p[Raw].load)
3323
3324     def test_port_restricted(self):
3325         """ Port restricted NAT44 (MAP-E CE) """
3326         self.nat44_add_address(self.nat_addr)
3327         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3328         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3329                                                   is_inside=0)
3330         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3331                       "psid-offset 6 psid-len 6")
3332
3333         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3334              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3335              TCP(sport=4567, dport=22))
3336         self.pg0.add_stream(p)
3337         self.pg_enable_capture(self.pg_interfaces)
3338         self.pg_start()
3339         capture = self.pg1.get_capture(1)
3340         p = capture[0]
3341         try:
3342             ip = p[IP]
3343             tcp = p[TCP]
3344             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3345             self.assertEqual(ip.src, self.nat_addr)
3346             self.assertEqual(tcp.dport, 22)
3347             self.assertNotEqual(tcp.sport, 4567)
3348             self.assertEqual((tcp.sport >> 6) & 63, 10)
3349             self.check_tcp_checksum(p)
3350             self.check_ip_checksum(p)
3351         except:
3352             self.logger.error(ppp("Unexpected or invalid packet:", p))
3353             raise
3354
3355     def test_twice_nat(self):
3356         """ Twice NAT44 """
3357         twice_nat_addr = '10.0.1.3'
3358         port_in = 8080
3359         port_out = 80
3360         eh_port_out = 4567
3361         eh_port_in = 0
3362         self.nat44_add_address(self.nat_addr)
3363         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3364         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3365                                       port_in, port_out, proto=IP_PROTOS.tcp,
3366                                       twice_nat=1)
3367         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3368         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3369                                                   is_inside=0)
3370
3371         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3372              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3373              TCP(sport=eh_port_out, dport=port_out))
3374         self.pg1.add_stream(p)
3375         self.pg_enable_capture(self.pg_interfaces)
3376         self.pg_start()
3377         capture = self.pg0.get_capture(1)
3378         p = capture[0]
3379         try:
3380             ip = p[IP]
3381             tcp = p[TCP]
3382             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3383             self.assertEqual(ip.src, twice_nat_addr)
3384             self.assertEqual(tcp.dport, port_in)
3385             self.assertNotEqual(tcp.sport, eh_port_out)
3386             eh_port_in = tcp.sport
3387             self.check_tcp_checksum(p)
3388             self.check_ip_checksum(p)
3389         except:
3390             self.logger.error(ppp("Unexpected or invalid packet:", p))
3391             raise
3392
3393         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3394              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3395              TCP(sport=port_in, dport=eh_port_in))
3396         self.pg0.add_stream(p)
3397         self.pg_enable_capture(self.pg_interfaces)
3398         self.pg_start()
3399         capture = self.pg1.get_capture(1)
3400         p = capture[0]
3401         try:
3402             ip = p[IP]
3403             tcp = p[TCP]
3404             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3405             self.assertEqual(ip.src, self.nat_addr)
3406             self.assertEqual(tcp.dport, eh_port_out)
3407             self.assertEqual(tcp.sport, port_out)
3408             self.check_tcp_checksum(p)
3409             self.check_ip_checksum(p)
3410         except:
3411             self.logger.error(ppp("Unexpected or invalid packet:", p))
3412             raise
3413
3414     def test_twice_nat_lb(self):
3415         """ Twice NAT44 local service load balancing """
3416         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3417         twice_nat_addr = '10.0.1.3'
3418         local_port = 8080
3419         external_port = 80
3420         eh_port_out = 4567
3421         eh_port_in = 0
3422         server1 = self.pg0.remote_hosts[0]
3423         server2 = self.pg0.remote_hosts[1]
3424
3425         locals = [{'addr': server1.ip4n,
3426                    'port': local_port,
3427                    'probability': 50},
3428                   {'addr': server2.ip4n,
3429                    'port': local_port,
3430                    'probability': 50}]
3431
3432         self.nat44_add_address(self.nat_addr)
3433         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3434
3435         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3436                                                   external_port,
3437                                                   IP_PROTOS.tcp,
3438                                                   twice_nat=1,
3439                                                   local_num=len(locals),
3440                                                   locals=locals)
3441         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3442         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3443                                                   is_inside=0)
3444
3445         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3446              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3447              TCP(sport=eh_port_out, dport=external_port))
3448         self.pg1.add_stream(p)
3449         self.pg_enable_capture(self.pg_interfaces)
3450         self.pg_start()
3451         capture = self.pg0.get_capture(1)
3452         p = capture[0]
3453         server = None
3454         try:
3455             ip = p[IP]
3456             tcp = p[TCP]
3457             self.assertEqual(ip.src, twice_nat_addr)
3458             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3459             if ip.dst == server1.ip4:
3460                 server = server1
3461             else:
3462                 server = server2
3463             self.assertNotEqual(tcp.sport, eh_port_out)
3464             eh_port_in = tcp.sport
3465             self.assertEqual(tcp.dport, local_port)
3466             self.check_tcp_checksum(p)
3467             self.check_ip_checksum(p)
3468         except:
3469             self.logger.error(ppp("Unexpected or invalid packet:", p))
3470             raise
3471
3472         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3473              IP(src=server.ip4, dst=twice_nat_addr) /
3474              TCP(sport=local_port, dport=eh_port_in))
3475         self.pg0.add_stream(p)
3476         self.pg_enable_capture(self.pg_interfaces)
3477         self.pg_start()
3478         capture = self.pg1.get_capture(1)
3479         p = capture[0]
3480         try:
3481             ip = p[IP]
3482             tcp = p[TCP]
3483             self.assertEqual(ip.src, self.nat_addr)
3484             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3485             self.assertEqual(tcp.sport, external_port)
3486             self.assertEqual(tcp.dport, eh_port_out)
3487             self.check_tcp_checksum(p)
3488             self.check_ip_checksum(p)
3489         except:
3490             self.logger.error(ppp("Unexpected or invalid packet:", p))
3491             raise
3492
3493     def test_twice_nat_interface_addr(self):
3494         """ Acquire twice NAT44 addresses from interface """
3495         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3496
3497         # no address in NAT pool
3498         adresses = self.vapi.nat44_address_dump()
3499         self.assertEqual(0, len(adresses))
3500
3501         # configure interface address and check NAT address pool
3502         self.pg7.config_ip4()
3503         adresses = self.vapi.nat44_address_dump()
3504         self.assertEqual(1, len(adresses))
3505         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3506         self.assertEqual(adresses[0].twice_nat, 1)
3507
3508         # remove interface address and check NAT address pool
3509         self.pg7.unconfig_ip4()
3510         adresses = self.vapi.nat44_address_dump()
3511         self.assertEqual(0, len(adresses))
3512
3513     def test_ipfix_max_frags(self):
3514         """ IPFIX logging maximum fragments pending reassembly exceeded """
3515         self.nat44_add_address(self.nat_addr)
3516         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3517         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3518                                                   is_inside=0)
3519         self.vapi.nat_set_reass(max_frag=0)
3520         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3521                                      src_address=self.pg3.local_ip4n,
3522                                      path_mtu=512,
3523                                      template_interval=10)
3524         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3525                             src_port=self.ipfix_src_port)
3526
3527         data = "A" * 4 + "B" * 16 + "C" * 3
3528         self.tcp_port_in = random.randint(1025, 65535)
3529         pkts = self.create_stream_frag(self.pg0,
3530                                        self.pg1.remote_ip4,
3531                                        self.tcp_port_in,
3532                                        20,
3533                                        data)
3534         self.pg0.add_stream(pkts[-1])
3535         self.pg_enable_capture(self.pg_interfaces)
3536         self.pg_start()
3537         frags = self.pg1.get_capture(0)
3538         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3539         capture = self.pg3.get_capture(9)
3540         ipfix = IPFIXDecoder()
3541         # first load template
3542         for p in capture:
3543             self.assertTrue(p.haslayer(IPFIX))
3544             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3545             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3546             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3547             self.assertEqual(p[UDP].dport, 4739)
3548             self.assertEqual(p[IPFIX].observationDomainID,
3549                              self.ipfix_domain_id)
3550             if p.haslayer(Template):
3551                 ipfix.add_template(p.getlayer(Template))
3552         # verify events in data set
3553         for p in capture:
3554             if p.haslayer(Data):
3555                 data = ipfix.decode_data_set(p.getlayer(Set))
3556                 self.verify_ipfix_max_fragments_ip4(data, 0,
3557                                                     self.pg0.remote_ip4n)
3558
3559     def tearDown(self):
3560         super(TestNAT44, self).tearDown()
3561         if not self.vpp_dead:
3562             self.logger.info(self.vapi.cli("show nat44 verbose"))
3563             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3564             self.vapi.cli("nat addr-port-assignment-alg default")
3565             self.clear_nat44()
3566
3567
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        """ Add the 'nat { out2in dpo }' section to the VPP command line """
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        """
        Create pg0 (IPv4) and pg1 (IPv6) and install an IPv6 default route

        On any failure the parent tearDownClass is run before re-raising.
        """
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))
            ip4_if, ip6_if = cls.pg0, cls.pg1

            # pg0 is the IPv4 interface
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            # pg1 is the IPv6 interface
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.resolve_ndp()

            # all-zero prefix of length 0, next hop is the pg1 remote host
            cls.vapi.ip_add_del_route(
                is_ipv6=True,
                dst_address='\x00' * 16,
                dst_address_length=0,
                next_hop_address=ip6_if.remote_ip6n,
                next_hop_sw_if_index=ip6_if.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the two /96 IPv6 prefixes on self and add a MAP domain

        The domain is added with is_translation=1 and is_rfc6052=1.
        """
        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96

        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96

        self.vapi.map_add_domain(self.dst_ip6_pfx_n,
                                 self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n,
                                 self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00',
                                 0,
                                 is_translation=1,
                                 is_rfc6052=1)

    def _xlat_send(self, tx_if, pkts, rx_if):
        """
        Send packets on one interface and capture them on another

        :param tx_if: Interface the stream is injected on
        :param pkts: Packets to send
        :param rx_if: Interface expected to receive len(pkts) packets
        :returns: Capture taken from rx_if
        """
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        # IPv6 form of the IPv4 destination / of the NAT address
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        translated_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                          self.src_ip6_pfx_len)

        try:
            # IPv4 in on pg0 -> IPv6 out on pg1
            pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            capture = self._xlat_send(self.pg0, pkts, self.pg1)
            self.verify_capture_out_ip6(capture, nat_ip=translated_ip6,
                                        dst_ip=remote_ip6)

            # IPv6 in on pg1 -> IPv4 out on pg0
            pkts = self.create_stream_out_ip6(self.pg1, remote_ip6,
                                              translated_ip6)
            capture = self._xlat_send(self.pg1, pkts, self.pg0)
            self.verify_capture_in(capture, self.pg0)
        finally:
            # undo the NAT44 configuration made at the top of this test
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """

        self.configure_xlat()

        # without NAT44 the source is the pg0 remote host address itself
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        host_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                    self.src_ip6_pfx_len)

        # IPv4 in on pg0 -> IPv6 out on pg1
        pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        capture = self._xlat_send(self.pg0, pkts, self.pg1)
        self.verify_capture_out_ip6(capture, dst_ip=remote_ip6,
                                    nat_ip=host_ip6, same_port=True)

        # IPv6 in on pg1 -> IPv4 out on pg0
        pkts = self.create_stream_out_ip6(self.pg1, remote_ip6, host_ip6)
        capture = self._xlat_send(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)
3683
3684
3685 class TestDeterministicNAT(MethodHolder):
3686     """ Deterministic NAT Test Cases """
3687
3688     @classmethod
3689     def setUpConstants(cls):
3690         super(TestDeterministicNAT, cls).setUpConstants()
3691         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3692
3693     @classmethod
3694     def setUpClass(cls):
3695         super(TestDeterministicNAT, cls).setUpClass()
3696
3697         try:
3698             cls.tcp_port_in = 6303
3699             cls.tcp_external_port = 6303
3700             cls.udp_port_in = 6304
3701             cls.udp_external_port = 6304
3702             cls.icmp_id_in = 6305
3703             cls.nat_addr = '10.0.0.3'
3704
3705             cls.create_pg_interfaces(range(3))
3706             cls.interfaces = list(cls.pg_interfaces)
3707
3708             for i in cls.interfaces:
3709                 i.admin_up()
3710                 i.config_ip4()
3711                 i.resolve_arp()
3712
3713             cls.pg0.generate_remote_hosts(2)
3714             cls.pg0.configure_ipv4_neighbors()
3715
3716         except Exception:
3717             super(TestDeterministicNAT, cls).tearDownClass()
3718             raise
3719
3720     def create_stream_in(self, in_if, out_if, ttl=64):
3721         """
3722         Create packet stream for inside network
3723
3724         :param in_if: Inside interface
3725         :param out_if: Outside interface
3726         :param ttl: TTL of generated packets
3727         """
3728         pkts = []
3729         # TCP
3730         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3731              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3732              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3733         pkts.append(p)
3734
3735         # UDP
3736         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3737              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3738              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3739         pkts.append(p)
3740
3741         # ICMP
3742         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3743              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3744              ICMP(id=self.icmp_id_in, type='echo-request'))
3745         pkts.append(p)
3746
3747         return pkts
3748
3749     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3750         """
3751         Create packet stream for outside network
3752
3753         :param out_if: Outside interface
3754         :param dst_ip: Destination IP address (Default use global NAT address)
3755         :param ttl: TTL of generated packets
3756         """
3757         if dst_ip is None:
3758             dst_ip = self.nat_addr
3759         pkts = []
3760         # TCP
3761         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3762              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3763              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3764         pkts.append(p)
3765
3766         # UDP
3767         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3768              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3769              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3770         pkts.append(p)
3771
3772         # ICMP
3773         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3774              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3775              ICMP(id=self.icmp_external_id, type='echo-reply'))
3776         pkts.append(p)
3777
3778         return pkts
3779
3780     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3781         """
3782         Verify captured packets on outside network
3783
3784         :param capture: Captured packets
3785         :param nat_ip: Translated IP address (Default use global NAT address)
3786         :param same_port: Sorce port number is not translated (Default False)
3787         :param packet_num: Expected number of packets (Default 3)
3788         """
3789         if nat_ip is None:
3790             nat_ip = self.nat_addr
3791         self.assertEqual(packet_num, len(capture))
3792         for packet in capture:
3793             try:
3794                 self.assertEqual(packet[IP].src, nat_ip)
3795                 if packet.haslayer(TCP):
3796                     self.tcp_port_out = packet[TCP].sport
3797                 elif packet.haslayer(UDP):
3798                     self.udp_port_out = packet[UDP].sport
3799                 else:
3800                     self.icmp_external_id = packet[ICMP].id
3801             except:
3802                 self.logger.error(ppp("Unexpected or invalid packet "
3803                                       "(outside network):", packet))
3804                 raise
3805
3806     def initiate_tcp_session(self, in_if, out_if):
3807         """
3808         Initiates TCP session
3809
3810         :param in_if: Inside interface
3811         :param out_if: Outside interface
3812         """
3813         try:
3814             # SYN packet in->out
3815             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3816                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3817                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3818                      flags="S"))
3819             in_if.add_stream(p)
3820             self.pg_enable_capture(self.pg_interfaces)
3821             self.pg_start()
3822             capture = out_if.get_capture(1)
3823             p = capture[0]
3824             self.tcp_port_out = p[TCP].sport
3825
3826             # SYN + ACK packet out->in
3827             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3828                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3829                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3830                      flags="SA"))
3831             out_if.add_stream(p)
3832             self.pg_enable_capture(self.pg_interfaces)
3833             self.pg_start()
3834             in_if.get_capture(1)
3835
3836             # ACK packet in->out
3837             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3838                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3839                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3840                      flags="A"))
3841             in_if.add_stream(p)
3842             self.pg_enable_capture(self.pg_interfaces)
3843             self.pg_start()
3844             out_if.get_capture(1)
3845
3846         except:
3847             self.logger.error("TCP 3 way handshake failed")
3848             raise
3849
3850     def verify_ipfix_max_entries_per_user(self, data):
3851         """
3852         Verify IPFIX maximum entries per user exceeded event
3853
3854         :param data: Decoded IPFIX data records
3855         """
3856         self.assertEqual(1, len(data))
3857         record = data[0]
3858         # natEvent
3859         self.assertEqual(ord(record[230]), 13)
3860         # natQuotaExceededEvent
3861         self.assertEqual('\x03\x00\x00\x00', record[466])
3862         # maxEntriesPerUser
3863         self.assertEqual('\xe8\x03\x00\x00', record[473])
3864         # sourceIPv4Address
3865         self.assertEqual(self.pg0.remote_ip4n, record[8])
3866
3867     def test_deterministic_mode(self):
3868         """ NAT plugin run deterministic mode """
3869         in_addr = '172.16.255.0'
3870         out_addr = '172.17.255.50'
3871         in_addr_t = '172.16.255.20'
3872         in_addr_n = socket.inet_aton(in_addr)
3873         out_addr_n = socket.inet_aton(out_addr)
3874         in_addr_t_n = socket.inet_aton(in_addr_t)
3875         in_plen = 24
3876         out_plen = 32
3877
3878         nat_config = self.vapi.nat_show_config()
3879         self.assertEqual(1, nat_config.deterministic)
3880
3881         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3882
3883         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3884         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3885         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3886         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3887
3888         deterministic_mappings = self.vapi.nat_det_map_dump()
3889         self.assertEqual(len(deterministic_mappings), 1)
3890         dsm = deterministic_mappings[0]
3891         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3892         self.assertEqual(in_plen, dsm.in_plen)
3893         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3894         self.assertEqual(out_plen, dsm.out_plen)
3895
3896         self.clear_nat_det()
3897         deterministic_mappings = self.vapi.nat_det_map_dump()
3898         self.assertEqual(len(deterministic_mappings), 0)
3899
3900     def test_set_timeouts(self):
3901         """ Set deterministic NAT timeouts """
3902         timeouts_before = self.vapi.nat_det_get_timeouts()
3903
3904         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3905                                        timeouts_before.tcp_established + 10,
3906                                        timeouts_before.tcp_transitory + 10,
3907                                        timeouts_before.icmp + 10)
3908
3909         timeouts_after = self.vapi.nat_det_get_timeouts()
3910
3911         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3912         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3913         self.assertNotEqual(timeouts_before.tcp_established,
3914                             timeouts_after.tcp_established)
3915         self.assertNotEqual(timeouts_before.tcp_transitory,
3916                             timeouts_after.tcp_transitory)
3917
3918     def test_det_in(self):
3919         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3920
3921         nat_ip = "10.0.0.10"
3922
3923         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3924                                       32,
3925                                       socket.inet_aton(nat_ip),
3926                                       32)
3927         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3928         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3929                                                   is_inside=0)
3930
3931         # in2out
3932         pkts = self.create_stream_in(self.pg0, self.pg1)
3933         self.pg0.add_stream(pkts)
3934         self.pg_enable_capture(self.pg_interfaces)
3935         self.pg_start()
3936         capture = self.pg1.get_capture(len(pkts))
3937         self.verify_capture_out(capture, nat_ip)
3938
3939         # out2in
3940         pkts = self.create_stream_out(self.pg1, nat_ip)
3941         self.pg1.add_stream(pkts)
3942         self.pg_enable_capture(self.pg_interfaces)
3943         self.pg_start()
3944         capture = self.pg0.get_capture(len(pkts))
3945         self.verify_capture_in(capture, self.pg0)
3946
3947         # session dump test
3948         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3949         self.assertEqual(len(sessions), 3)
3950
3951         # TCP session
3952         s = sessions[0]
3953         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3954         self.assertEqual(s.in_port, self.tcp_port_in)
3955         self.assertEqual(s.out_port, self.tcp_port_out)
3956         self.assertEqual(s.ext_port, self.tcp_external_port)
3957
3958         # UDP session
3959         s = sessions[1]
3960         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3961         self.assertEqual(s.in_port, self.udp_port_in)
3962         self.assertEqual(s.out_port, self.udp_port_out)
3963         self.assertEqual(s.ext_port, self.udp_external_port)
3964
3965         # ICMP session
3966         s = sessions[2]
3967         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3968         self.assertEqual(s.in_port, self.icmp_id_in)
3969         self.assertEqual(s.out_port, self.icmp_external_id)
3970
3971     def test_multiple_users(self):
3972         """ Deterministic NAT multiple users """
3973
3974         nat_ip = "10.0.0.10"
3975         port_in = 80
3976         external_port = 6303
3977
3978         host0 = self.pg0.remote_hosts[0]
3979         host1 = self.pg0.remote_hosts[1]
3980
3981         self.vapi.nat_det_add_del_map(host0.ip4n,
3982                                       24,
3983                                       socket.inet_aton(nat_ip),
3984                                       32)
3985         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3986         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3987                                                   is_inside=0)
3988
3989         # host0 to out
3990         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3991              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3992              TCP(sport=port_in, dport=external_port))
3993         self.pg0.add_stream(p)
3994         self.pg_enable_capture(self.pg_interfaces)
3995         self.pg_start()
3996         capture = self.pg1.get_capture(1)
3997         p = capture[0]
3998         try:
3999             ip = p[IP]
4000             tcp = p[TCP]
4001             self.assertEqual(ip.src, nat_ip)
4002             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4003             self.assertEqual(tcp.dport, external_port)
4004             port_out0 = tcp.sport
4005         except:
4006             self.logger.error(ppp("Unexpected or invalid packet:", p))
4007             raise
4008
4009         # host1 to out
4010         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4011              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4012              TCP(sport=port_in, dport=external_port))
4013         self.pg0.add_stream(p)
4014         self.pg_enable_capture(self.pg_interfaces)
4015         self.pg_start()
4016         capture = self.pg1.get_capture(1)
4017         p = capture[0]
4018         try:
4019             ip = p[IP]
4020             tcp = p[TCP]
4021             self.assertEqual(ip.src, nat_ip)
4022             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4023             self.assertEqual(tcp.dport, external_port)
4024             port_out1 = tcp.sport
4025         except:
4026             self.logger.error(ppp("Unexpected or invalid packet:", p))
4027             raise
4028
4029         dms = self.vapi.nat_det_map_dump()
4030         self.assertEqual(1, len(dms))
4031         self.assertEqual(2, dms[0].ses_num)
4032
4033         # out to host0
4034         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4035              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4036              TCP(sport=external_port, dport=port_out0))
4037         self.pg1.add_stream(p)
4038         self.pg_enable_capture(self.pg_interfaces)
4039         self.pg_start()
4040         capture = self.pg0.get_capture(1)
4041         p = capture[0]
4042         try:
4043             ip = p[IP]
4044             tcp = p[TCP]
4045             self.assertEqual(ip.src, self.pg1.remote_ip4)
4046             self.assertEqual(ip.dst, host0.ip4)
4047             self.assertEqual(tcp.dport, port_in)
4048             self.assertEqual(tcp.sport, external_port)
4049         except:
4050             self.logger.error(ppp("Unexpected or invalid packet:", p))
4051             raise
4052
4053         # out to host1
4054         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4055              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4056              TCP(sport=external_port, dport=port_out1))
4057         self.pg1.add_stream(p)
4058         self.pg_enable_capture(self.pg_interfaces)
4059         self.pg_start()
4060         capture = self.pg0.get_capture(1)
4061         p = capture[0]
4062         try:
4063             ip = p[IP]
4064             tcp = p[TCP]
4065             self.assertEqual(ip.src, self.pg1.remote_ip4)
4066             self.assertEqual(ip.dst, host1.ip4)
4067             self.assertEqual(tcp.dport, port_in)
4068             self.assertEqual(tcp.sport, external_port)
4069         except:
4070             self.logger.error(ppp("Unexpected or invalid packet", p))
4071             raise
4072
4073         # session close api test
4074         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4075                                             port_out1,
4076                                             self.pg1.remote_ip4n,
4077                                             external_port)
4078         dms = self.vapi.nat_det_map_dump()
4079         self.assertEqual(dms[0].ses_num, 1)
4080
4081         self.vapi.nat_det_close_session_in(host0.ip4n,
4082                                            port_in,
4083                                            self.pg1.remote_ip4n,
4084                                            external_port)
4085         dms = self.vapi.nat_det_map_dump()
4086         self.assertEqual(dms[0].ses_num, 0)
4087
4088     def test_tcp_session_close_detection_in(self):
4089         """ Deterministic NAT TCP session close from inside network """
4090         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4091                                       32,
4092                                       socket.inet_aton(self.nat_addr),
4093                                       32)
4094         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4095         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4096                                                   is_inside=0)
4097
4098         self.initiate_tcp_session(self.pg0, self.pg1)
4099
4100         # close the session from inside
4101         try:
4102             # FIN packet in -> out
4103             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4104                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4105                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4106                      flags="F"))
4107             self.pg0.add_stream(p)
4108             self.pg_enable_capture(self.pg_interfaces)
4109             self.pg_start()
4110             self.pg1.get_capture(1)
4111
4112             pkts = []
4113
4114             # ACK packet out -> in
4115             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4116                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4117                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4118                      flags="A"))
4119             pkts.append(p)
4120
4121             # FIN packet out -> in
4122             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4123                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4124                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4125                      flags="F"))
4126             pkts.append(p)
4127
4128             self.pg1.add_stream(pkts)
4129             self.pg_enable_capture(self.pg_interfaces)
4130             self.pg_start()
4131             self.pg0.get_capture(2)
4132
4133             # ACK packet in -> out
4134             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4135                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4136                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4137                      flags="A"))
4138             self.pg0.add_stream(p)
4139             self.pg_enable_capture(self.pg_interfaces)
4140             self.pg_start()
4141             self.pg1.get_capture(1)
4142
4143             # Check if deterministic NAT44 closed the session
4144             dms = self.vapi.nat_det_map_dump()
4145             self.assertEqual(0, dms[0].ses_num)
4146         except:
4147             self.logger.error("TCP session termination failed")
4148             raise
4149
4150     def test_tcp_session_close_detection_out(self):
4151         """ Deterministic NAT TCP session close from outside network """
4152         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4153                                       32,
4154                                       socket.inet_aton(self.nat_addr),
4155                                       32)
4156         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4157         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4158                                                   is_inside=0)
4159
4160         self.initiate_tcp_session(self.pg0, self.pg1)
4161
4162         # close the session from outside
4163         try:
4164             # FIN packet out -> in
4165             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4166                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4167                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4168                      flags="F"))
4169             self.pg1.add_stream(p)
4170             self.pg_enable_capture(self.pg_interfaces)
4171             self.pg_start()
4172             self.pg0.get_capture(1)
4173
4174             pkts = []
4175
4176             # ACK packet in -> out
4177             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4178                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4179                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4180                      flags="A"))
4181             pkts.append(p)
4182
4183             # ACK packet in -> out
4184             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4185                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4186                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4187                      flags="F"))
4188             pkts.append(p)
4189
4190             self.pg0.add_stream(pkts)
4191             self.pg_enable_capture(self.pg_interfaces)
4192             self.pg_start()
4193             self.pg1.get_capture(2)
4194
4195             # ACK packet out -> in
4196             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4197                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4198                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4199                      flags="A"))
4200             self.pg1.add_stream(p)
4201             self.pg_enable_capture(self.pg_interfaces)
4202             self.pg_start()
4203             self.pg0.get_capture(1)
4204
4205             # Check if deterministic NAT44 closed the session
4206             dms = self.vapi.nat_det_map_dump()
4207             self.assertEqual(0, dms[0].ses_num)
4208         except:
4209             self.logger.error("TCP session termination failed")
4210             raise
4211
4212     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4213     def test_session_timeout(self):
4214         """ Deterministic NAT session timeouts """
4215         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4216                                       32,
4217                                       socket.inet_aton(self.nat_addr),
4218                                       32)
4219         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4220         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4221                                                   is_inside=0)
4222
4223         self.initiate_tcp_session(self.pg0, self.pg1)
4224         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4225         pkts = self.create_stream_in(self.pg0, self.pg1)
4226         self.pg0.add_stream(pkts)
4227         self.pg_enable_capture(self.pg_interfaces)
4228         self.pg_start()
4229         capture = self.pg1.get_capture(len(pkts))
4230         sleep(15)
4231
4232         dms = self.vapi.nat_det_map_dump()
4233         self.assertEqual(0, dms[0].ses_num)
4234
4235     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4236     def test_session_limit_per_user(self):
4237         """ Deterministic NAT maximum sessions per user limit """
4238         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4239                                       32,
4240                                       socket.inet_aton(self.nat_addr),
4241                                       32)
4242         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4243         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4244                                                   is_inside=0)
4245         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4246                                      src_address=self.pg2.local_ip4n,
4247                                      path_mtu=512,
4248                                      template_interval=10)
4249         self.vapi.nat_ipfix()
4250
4251         pkts = []
4252         for port in range(1025, 2025):
4253             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4254                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4255                  UDP(sport=port, dport=port))
4256             pkts.append(p)
4257
4258         self.pg0.add_stream(pkts)
4259         self.pg_enable_capture(self.pg_interfaces)
4260         self.pg_start()
4261         capture = self.pg1.get_capture(len(pkts))
4262
4263         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4264              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4265              UDP(sport=3001, dport=3002))
4266         self.pg0.add_stream(p)
4267         self.pg_enable_capture(self.pg_interfaces)
4268         self.pg_start()
4269         capture = self.pg1.assert_nothing_captured()
4270
4271         # verify ICMP error packet
4272         capture = self.pg0.get_capture(1)
4273         p = capture[0]
4274         self.assertTrue(p.haslayer(ICMP))
4275         icmp = p[ICMP]
4276         self.assertEqual(icmp.type, 3)
4277         self.assertEqual(icmp.code, 1)
4278         self.assertTrue(icmp.haslayer(IPerror))
4279         inner_ip = icmp[IPerror]
4280         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4281         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4282
4283         dms = self.vapi.nat_det_map_dump()
4284
4285         self.assertEqual(1000, dms[0].ses_num)
4286
4287         # verify IPFIX logging
4288         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4289         sleep(1)
4290         capture = self.pg2.get_capture(2)
4291         ipfix = IPFIXDecoder()
4292         # first load template
4293         for p in capture:
4294             self.assertTrue(p.haslayer(IPFIX))
4295             if p.haslayer(Template):
4296                 ipfix.add_template(p.getlayer(Template))
4297         # verify events in data set
4298         for p in capture:
4299             if p.haslayer(Data):
4300                 data = ipfix.decode_data_set(p.getlayer(Set))
4301                 self.verify_ipfix_max_entries_per_user(data)
4302
4303     def clear_nat_det(self):
4304         """
4305         Clear deterministic NAT configuration.
4306         """
4307         self.vapi.nat_ipfix(enable=0)
4308         self.vapi.nat_det_set_timeouts()
4309         deterministic_mappings = self.vapi.nat_det_map_dump()
4310         for dsm in deterministic_mappings:
4311             self.vapi.nat_det_add_del_map(dsm.in_addr,
4312                                           dsm.in_plen,
4313                                           dsm.out_addr,
4314                                           dsm.out_plen,
4315                                           is_add=0)
4316
4317         interfaces = self.vapi.nat44_interface_dump()
4318         for intf in interfaces:
4319             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4320                                                       intf.is_inside,
4321                                                       is_add=0)
4322
4323     def tearDown(self):
4324         super(TestDeterministicNAT, self).tearDown()
4325         if not self.vpp_dead:
4326             self.logger.info(self.vapi.cli("show nat44 detail"))
4327             self.clear_nat_det()
4328
4329
4330 class TestNAT64(MethodHolder):
4331     """ NAT64 Test Cases """
4332
4333     @classmethod
4334     def setUpConstants(cls):
4335         super(TestNAT64, cls).setUpConstants()
4336         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4337                                 "nat64 st hash buckets 256", "}"])
4338
4339     @classmethod
4340     def setUpClass(cls):
4341         super(TestNAT64, cls).setUpClass()
4342
4343         try:
4344             cls.tcp_port_in = 6303
4345             cls.tcp_port_out = 6303
4346             cls.udp_port_in = 6304
4347             cls.udp_port_out = 6304
4348             cls.icmp_id_in = 6305
4349             cls.icmp_id_out = 6305
4350             cls.nat_addr = '10.0.0.3'
4351             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4352             cls.vrf1_id = 10
4353             cls.vrf1_nat_addr = '10.0.10.3'
4354             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4355                                                    cls.vrf1_nat_addr)
4356             cls.ipfix_src_port = 4739
4357             cls.ipfix_domain_id = 1
4358
4359             cls.create_pg_interfaces(range(5))
4360             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4361             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4362             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4363
4364             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4365
4366             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4367
4368             cls.pg0.generate_remote_hosts(2)
4369
4370             for i in cls.ip6_interfaces:
4371                 i.admin_up()
4372                 i.config_ip6()
4373                 i.configure_ipv6_neighbors()
4374
4375             for i in cls.ip4_interfaces:
4376                 i.admin_up()
4377                 i.config_ip4()
4378                 i.resolve_arp()
4379
4380             cls.pg3.admin_up()
4381             cls.pg3.config_ip4()
4382             cls.pg3.resolve_arp()
4383             cls.pg3.config_ip6()
4384             cls.pg3.configure_ipv6_neighbors()
4385
4386         except Exception:
4387             super(TestNAT64, cls).tearDownClass()
4388             raise
4389
4390     def test_pool(self):
4391         """ Add/delete address to NAT64 pool """
4392         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4393
4394         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4395
4396         addresses = self.vapi.nat64_pool_addr_dump()
4397         self.assertEqual(len(addresses), 1)
4398         self.assertEqual(addresses[0].address, nat_addr)
4399
4400         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4401
4402         addresses = self.vapi.nat64_pool_addr_dump()
4403         self.assertEqual(len(addresses), 0)
4404
4405     def test_interface(self):
4406         """ Enable/disable NAT64 feature on the interface """
4407         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4408         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4409
4410         interfaces = self.vapi.nat64_interface_dump()
4411         self.assertEqual(len(interfaces), 2)
4412         pg0_found = False
4413         pg1_found = False
4414         for intf in interfaces:
4415             if intf.sw_if_index == self.pg0.sw_if_index:
4416                 self.assertEqual(intf.is_inside, 1)
4417                 pg0_found = True
4418             elif intf.sw_if_index == self.pg1.sw_if_index:
4419                 self.assertEqual(intf.is_inside, 0)
4420                 pg1_found = True
4421         self.assertTrue(pg0_found)
4422         self.assertTrue(pg1_found)
4423
4424         features = self.vapi.cli("show interface features pg0")
4425         self.assertNotEqual(features.find('nat64-in2out'), -1)
4426         features = self.vapi.cli("show interface features pg1")
4427         self.assertNotEqual(features.find('nat64-out2in'), -1)
4428
4429         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4430         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4431
4432         interfaces = self.vapi.nat64_interface_dump()
4433         self.assertEqual(len(interfaces), 0)
4434
4435     def test_static_bib(self):
4436         """ Add/delete static BIB entry """
4437         in_addr = socket.inet_pton(socket.AF_INET6,
4438                                    '2001:db8:85a3::8a2e:370:7334')
4439         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4440         in_port = 1234
4441         out_port = 5678
4442         proto = IP_PROTOS.tcp
4443
4444         self.vapi.nat64_add_del_static_bib(in_addr,
4445                                            out_addr,
4446                                            in_port,
4447                                            out_port,
4448                                            proto)
4449         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4450         static_bib_num = 0
4451         for bibe in bib:
4452             if bibe.is_static:
4453                 static_bib_num += 1
4454                 self.assertEqual(bibe.i_addr, in_addr)
4455                 self.assertEqual(bibe.o_addr, out_addr)
4456                 self.assertEqual(bibe.i_port, in_port)
4457                 self.assertEqual(bibe.o_port, out_port)
4458         self.assertEqual(static_bib_num, 1)
4459
4460         self.vapi.nat64_add_del_static_bib(in_addr,
4461                                            out_addr,
4462                                            in_port,
4463                                            out_port,
4464                                            proto,
4465                                            is_add=0)
4466         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4467         static_bib_num = 0
4468         for bibe in bib:
4469             if bibe.is_static:
4470                 static_bib_num += 1
4471         self.assertEqual(static_bib_num, 0)
4472
4473     def test_set_timeouts(self):
4474         """ Set NAT64 timeouts """
4475         # verify default values
4476         timeouts = self.vapi.nat64_get_timeouts()
4477         self.assertEqual(timeouts.udp, 300)
4478         self.assertEqual(timeouts.icmp, 60)
4479         self.assertEqual(timeouts.tcp_trans, 240)
4480         self.assertEqual(timeouts.tcp_est, 7440)
4481         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4482
4483         # set and verify custom values
4484         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4485                                      tcp_est=7450, tcp_incoming_syn=10)
4486         timeouts = self.vapi.nat64_get_timeouts()
4487         self.assertEqual(timeouts.udp, 200)
4488         self.assertEqual(timeouts.icmp, 30)
4489         self.assertEqual(timeouts.tcp_trans, 250)
4490         self.assertEqual(timeouts.tcp_est, 7450)
4491         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4492
4493     def test_dynamic(self):
4494         """ NAT64 dynamic translation test """
4495         self.tcp_port_in = 6303
4496         self.udp_port_in = 6304
4497         self.icmp_id_in = 6305
4498
4499         ses_num_start = self.nat64_get_ses_num()
4500
4501         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4502                                                 self.nat_addr_n)
4503         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4504         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4505
4506         # in2out
4507         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4508         self.pg0.add_stream(pkts)
4509         self.pg_enable_capture(self.pg_interfaces)
4510         self.pg_start()
4511         capture = self.pg1.get_capture(len(pkts))
4512         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4513                                 dst_ip=self.pg1.remote_ip4)
4514
4515         # out2in
4516         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4517         self.pg1.add_stream(pkts)
4518         self.pg_enable_capture(self.pg_interfaces)
4519         self.pg_start()
4520         capture = self.pg0.get_capture(len(pkts))
4521         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4522         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4523
4524         # in2out
4525         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4526         self.pg0.add_stream(pkts)
4527         self.pg_enable_capture(self.pg_interfaces)
4528         self.pg_start()
4529         capture = self.pg1.get_capture(len(pkts))
4530         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4531                                 dst_ip=self.pg1.remote_ip4)
4532
4533         # out2in
4534         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4535         self.pg1.add_stream(pkts)
4536         self.pg_enable_capture(self.pg_interfaces)
4537         self.pg_start()
4538         capture = self.pg0.get_capture(len(pkts))
4539         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4540
4541         ses_num_end = self.nat64_get_ses_num()
4542
4543         self.assertEqual(ses_num_end - ses_num_start, 3)
4544
4545         # tenant with specific VRF
4546         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4547                                                 self.vrf1_nat_addr_n,
4548                                                 vrf_id=self.vrf1_id)
4549         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4550
4551         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4552         self.pg2.add_stream(pkts)
4553         self.pg_enable_capture(self.pg_interfaces)
4554         self.pg_start()
4555         capture = self.pg1.get_capture(len(pkts))
4556         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4557                                 dst_ip=self.pg1.remote_ip4)
4558
4559         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4560         self.pg1.add_stream(pkts)
4561         self.pg_enable_capture(self.pg_interfaces)
4562         self.pg_start()
4563         capture = self.pg2.get_capture(len(pkts))
4564         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4565
4566     def test_static(self):
4567         """ NAT64 static translation test """
4568         self.tcp_port_in = 60303
4569         self.udp_port_in = 60304
4570         self.icmp_id_in = 60305
4571         self.tcp_port_out = 60303
4572         self.udp_port_out = 60304
4573         self.icmp_id_out = 60305
4574
4575         ses_num_start = self.nat64_get_ses_num()
4576
4577         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4578                                                 self.nat_addr_n)
4579         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4580         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4581
4582         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4583                                            self.nat_addr_n,
4584                                            self.tcp_port_in,
4585                                            self.tcp_port_out,
4586                                            IP_PROTOS.tcp)
4587         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4588                                            self.nat_addr_n,
4589                                            self.udp_port_in,
4590                                            self.udp_port_out,
4591                                            IP_PROTOS.udp)
4592         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4593                                            self.nat_addr_n,
4594                                            self.icmp_id_in,
4595                                            self.icmp_id_out,
4596                                            IP_PROTOS.icmp)
4597
4598         # in2out
4599         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4600         self.pg0.add_stream(pkts)
4601         self.pg_enable_capture(self.pg_interfaces)
4602         self.pg_start()
4603         capture = self.pg1.get_capture(len(pkts))
4604         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4605                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4606
4607         # out2in
4608         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4609         self.pg1.add_stream(pkts)
4610         self.pg_enable_capture(self.pg_interfaces)
4611         self.pg_start()
4612         capture = self.pg0.get_capture(len(pkts))
4613         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4614         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4615
4616         ses_num_end = self.nat64_get_ses_num()
4617
4618         self.assertEqual(ses_num_end - ses_num_start, 3)
4619
4620     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4621     def test_session_timeout(self):
4622         """ NAT64 session timeout """
4623         self.icmp_id_in = 1234
4624         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4625                                                 self.nat_addr_n)
4626         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4627         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4628         self.vapi.nat64_set_timeouts(icmp=5)
4629
4630         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4631         self.pg0.add_stream(pkts)
4632         self.pg_enable_capture(self.pg_interfaces)
4633         self.pg_start()
4634         capture = self.pg1.get_capture(len(pkts))
4635
4636         ses_num_before_timeout = self.nat64_get_ses_num()
4637
4638         sleep(15)
4639
4640         # ICMP session after timeout
4641         ses_num_after_timeout = self.nat64_get_ses_num()
4642         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4643
4644     def test_icmp_error(self):
4645         """ NAT64 ICMP Error message translation """
4646         self.tcp_port_in = 6303
4647         self.udp_port_in = 6304
4648         self.icmp_id_in = 6305
4649
4650         ses_num_start = self.nat64_get_ses_num()
4651
4652         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4653                                                 self.nat_addr_n)
4654         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4655         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4656
4657         # send some packets to create sessions
4658         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4659         self.pg0.add_stream(pkts)
4660         self.pg_enable_capture(self.pg_interfaces)
4661         self.pg_start()
4662         capture_ip4 = self.pg1.get_capture(len(pkts))
4663         self.verify_capture_out(capture_ip4,
4664                                 nat_ip=self.nat_addr,
4665                                 dst_ip=self.pg1.remote_ip4)
4666
4667         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4668         self.pg1.add_stream(pkts)
4669         self.pg_enable_capture(self.pg_interfaces)
4670         self.pg_start()
4671         capture_ip6 = self.pg0.get_capture(len(pkts))
4672         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4673         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4674                                    self.pg0.remote_ip6)
4675
4676         # in2out
4677         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4678                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4679                 ICMPv6DestUnreach(code=1) /
4680                 packet[IPv6] for packet in capture_ip6]
4681         self.pg0.add_stream(pkts)
4682         self.pg_enable_capture(self.pg_interfaces)
4683         self.pg_start()
4684         capture = self.pg1.get_capture(len(pkts))
4685         for packet in capture:
4686             try:
4687                 self.assertEqual(packet[IP].src, self.nat_addr)
4688                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4689                 self.assertEqual(packet[ICMP].type, 3)
4690                 self.assertEqual(packet[ICMP].code, 13)
4691                 inner = packet[IPerror]
4692                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4693                 self.assertEqual(inner.dst, self.nat_addr)
4694                 self.check_icmp_checksum(packet)
4695                 if inner.haslayer(TCPerror):
4696                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4697                 elif inner.haslayer(UDPerror):
4698                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4699                 else:
4700                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4701             except:
4702                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4703                 raise
4704
4705         # out2in
4706         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4707                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4708                 ICMP(type=3, code=13) /
4709                 packet[IP] for packet in capture_ip4]
4710         self.pg1.add_stream(pkts)
4711         self.pg_enable_capture(self.pg_interfaces)
4712         self.pg_start()
4713         capture = self.pg0.get_capture(len(pkts))
4714         for packet in capture:
4715             try:
4716                 self.assertEqual(packet[IPv6].src, ip.src)
4717                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4718                 icmp = packet[ICMPv6DestUnreach]
4719                 self.assertEqual(icmp.code, 1)
4720                 inner = icmp[IPerror6]
4721                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4722                 self.assertEqual(inner.dst, ip.src)
4723                 self.check_icmpv6_checksum(packet)
4724                 if inner.haslayer(TCPerror):
4725                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4726                 elif inner.haslayer(UDPerror):
4727                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4728                 else:
4729                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4730                                      self.icmp_id_in)
4731             except:
4732                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4733                 raise
4734
4735     def test_hairpinning(self):
4736         """ NAT64 hairpinning """
4737
4738         client = self.pg0.remote_hosts[0]
4739         server = self.pg0.remote_hosts[1]
4740         server_tcp_in_port = 22
4741         server_tcp_out_port = 4022
4742         server_udp_in_port = 23
4743         server_udp_out_port = 4023
4744         client_tcp_in_port = 1234
4745         client_udp_in_port = 1235
4746         client_tcp_out_port = 0
4747         client_udp_out_port = 0
4748         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4749         nat_addr_ip6 = ip.src
4750
4751         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4752                                                 self.nat_addr_n)
4753         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4754         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4755
4756         self.vapi.nat64_add_del_static_bib(server.ip6n,
4757                                            self.nat_addr_n,
4758                                            server_tcp_in_port,
4759                                            server_tcp_out_port,
4760                                            IP_PROTOS.tcp)
4761         self.vapi.nat64_add_del_static_bib(server.ip6n,
4762                                            self.nat_addr_n,
4763                                            server_udp_in_port,
4764                                            server_udp_out_port,
4765                                            IP_PROTOS.udp)
4766
4767         # client to server
4768         pkts = []
4769         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4770              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4771              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4772         pkts.append(p)
4773         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4774              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4775              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4776         pkts.append(p)
4777         self.pg0.add_stream(pkts)
4778         self.pg_enable_capture(self.pg_interfaces)
4779         self.pg_start()
4780         capture = self.pg0.get_capture(len(pkts))
4781         for packet in capture:
4782             try:
4783                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4784                 self.assertEqual(packet[IPv6].dst, server.ip6)
4785                 if packet.haslayer(TCP):
4786                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4787                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4788                     self.check_tcp_checksum(packet)
4789                     client_tcp_out_port = packet[TCP].sport
4790                 else:
4791                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4792                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4793                     self.check_udp_checksum(packet)
4794                     client_udp_out_port = packet[UDP].sport
4795             except:
4796                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4797                 raise
4798
4799         # server to client
4800         pkts = []
4801         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4802              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4803              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4804         pkts.append(p)
4805         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4806              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4807              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4808         pkts.append(p)
4809         self.pg0.add_stream(pkts)
4810         self.pg_enable_capture(self.pg_interfaces)
4811         self.pg_start()
4812         capture = self.pg0.get_capture(len(pkts))
4813         for packet in capture:
4814             try:
4815                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4816                 self.assertEqual(packet[IPv6].dst, client.ip6)
4817                 if packet.haslayer(TCP):
4818                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4819                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4820                     self.check_tcp_checksum(packet)
4821                 else:
4822                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4823                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4824                     self.check_udp_checksum(packet)
4825             except:
4826                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4827                 raise
4828
4829         # ICMP error
4830         pkts = []
4831         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4832                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4833                 ICMPv6DestUnreach(code=1) /
4834                 packet[IPv6] for packet in capture]
4835         self.pg0.add_stream(pkts)
4836         self.pg_enable_capture(self.pg_interfaces)
4837         self.pg_start()
4838         capture = self.pg0.get_capture(len(pkts))
4839         for packet in capture:
4840             try:
4841                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4842                 self.assertEqual(packet[IPv6].dst, server.ip6)
4843                 icmp = packet[ICMPv6DestUnreach]
4844                 self.assertEqual(icmp.code, 1)
4845                 inner = icmp[IPerror6]
4846                 self.assertEqual(inner.src, server.ip6)
4847                 self.assertEqual(inner.dst, nat_addr_ip6)
4848                 self.check_icmpv6_checksum(packet)
4849                 if inner.haslayer(TCPerror):
4850                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4851                     self.assertEqual(inner[TCPerror].dport,
4852                                      client_tcp_out_port)
4853                 else:
4854                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4855                     self.assertEqual(inner[UDPerror].dport,
4856                                      client_udp_out_port)
4857             except:
4858                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4859                 raise
4860
    def test_prefix(self):
        """ NAT64 Network-Specific Prefix

        Configures a global NAT64 prefix (2001:db8::/32) and a prefix for
        vrf1_id (2001:db8:122:300::/56), checks the prefix dump, and then
        runs an in2out/out2in exchange through each prefix: pg0 with the
        global one and pg2 with the tenant one, both towards pg1.
        """

        # pool address and inside interface for the default table (pg0) and
        # for the tenant table (pg2); pg1 is the shared outside interface
        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                                self.vrf1_nat_addr_n,
                                                vrf_id=self.vrf1_id)
        self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

        # Add global prefix
        global_pref64 = "2001:db8::"
        global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
        global_pref64_len = 32
        self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)

        # the dump must contain exactly the prefix just added, reported
        # with vrf_id 0
        prefix = self.vapi.nat64_prefix_dump()
        self.assertEqual(len(prefix), 1)
        self.assertEqual(prefix[0].prefix, global_pref64_n)
        self.assertEqual(prefix[0].prefix_len, global_pref64_len)
        self.assertEqual(prefix[0].vrf_id, 0)

        # Add tenant specific prefix
        vrf1_pref64 = "2001:db8:122:300::"
        vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
        vrf1_pref64_len = 56
        self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
                                       vrf1_pref64_len,
                                       vrf_id=self.vrf1_id)
        # only the number of entries is checked for the second prefix
        prefix = self.vapi.nat64_prefix_dump()
        self.assertEqual(len(prefix), 2)

        # Global prefix
        # in2out: pg0 -> pg1, stream built with the global prefix
        pkts = self.create_stream_in_ip6(self.pg0,
                                         self.pg1,
                                         pref=global_pref64,
                                         plen=global_pref64_len)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in: pg1 -> pg0
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        # NOTE(review): compose_ip6 is defined outside this view; presumably
        # it embeds the IPv4 address into the given prefix - confirm
        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                                  global_pref64,
                                  global_pref64_len)
        self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)

        # Tenant specific prefix
        # in2out: pg2 -> pg1, stream built with the tenant prefix
        pkts = self.create_stream_in_ip6(self.pg2,
                                         self.pg1,
                                         pref=vrf1_pref64,
                                         plen=vrf1_pref64_len)
        self.pg2.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in: pg1 -> pg2
        pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                                  vrf1_pref64,
                                  vrf1_pref64_len)
        self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4938
4939     def test_unknown_proto(self):
4940         """ NAT64 translate packet with unknown protocol """
4941
4942         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4943                                                 self.nat_addr_n)
4944         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4945         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4946         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4947
4948         # in2out
4949         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4950              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4951              TCP(sport=self.tcp_port_in, dport=20))
4952         self.pg0.add_stream(p)
4953         self.pg_enable_capture(self.pg_interfaces)
4954         self.pg_start()
4955         p = self.pg1.get_capture(1)
4956
4957         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4958              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4959              GRE() /
4960              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4961              TCP(sport=1234, dport=1234))
4962         self.pg0.add_stream(p)
4963         self.pg_enable_capture(self.pg_interfaces)
4964         self.pg_start()
4965         p = self.pg1.get_capture(1)
4966         packet = p[0]
4967         try:
4968             self.assertEqual(packet[IP].src, self.nat_addr)
4969             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4970             self.assertTrue(packet.haslayer(GRE))
4971             self.check_ip_checksum(packet)
4972         except:
4973             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4974             raise
4975
4976         # out2in
4977         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4978              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4979              GRE() /
4980              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4981              TCP(sport=1234, dport=1234))
4982         self.pg1.add_stream(p)
4983         self.pg_enable_capture(self.pg_interfaces)
4984         self.pg_start()
4985         p = self.pg0.get_capture(1)
4986         packet = p[0]
4987         try:
4988             self.assertEqual(packet[IPv6].src, remote_ip6)
4989             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4990             self.assertEqual(packet[IPv6].nh, 47)
4991         except:
4992             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4993             raise
4994
4995     def test_hairpinning_unknown_proto(self):
4996         """ NAT64 translate packet with unknown protocol - hairpinning """
4997
4998         client = self.pg0.remote_hosts[0]
4999         server = self.pg0.remote_hosts[1]
5000         server_tcp_in_port = 22
5001         server_tcp_out_port = 4022
5002         client_tcp_in_port = 1234
5003         client_tcp_out_port = 1235
5004         server_nat_ip = "10.0.0.100"
5005         client_nat_ip = "10.0.0.110"
5006         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5007         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5008         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5009         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5010
5011         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5012                                                 client_nat_ip_n)
5013         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5014         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5015
5016         self.vapi.nat64_add_del_static_bib(server.ip6n,
5017                                            server_nat_ip_n,
5018                                            server_tcp_in_port,
5019                                            server_tcp_out_port,
5020                                            IP_PROTOS.tcp)
5021
5022         self.vapi.nat64_add_del_static_bib(server.ip6n,
5023                                            server_nat_ip_n,
5024                                            0,
5025                                            0,
5026                                            IP_PROTOS.gre)
5027
5028         self.vapi.nat64_add_del_static_bib(client.ip6n,
5029                                            client_nat_ip_n,
5030                                            client_tcp_in_port,
5031                                            client_tcp_out_port,
5032                                            IP_PROTOS.tcp)
5033
5034         # client to server
5035         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5036              IPv6(src=client.ip6, dst=server_nat_ip6) /
5037              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5038         self.pg0.add_stream(p)
5039         self.pg_enable_capture(self.pg_interfaces)
5040         self.pg_start()
5041         p = self.pg0.get_capture(1)
5042
5043         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5044              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5045              GRE() /
5046              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5047              TCP(sport=1234, dport=1234))
5048         self.pg0.add_stream(p)
5049         self.pg_enable_capture(self.pg_interfaces)
5050         self.pg_start()
5051         p = self.pg0.get_capture(1)
5052         packet = p[0]
5053         try:
5054             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5055             self.assertEqual(packet[IPv6].dst, server.ip6)
5056             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5057         except:
5058             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5059             raise
5060
5061         # server to client
5062         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5063              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5064              GRE() /
5065              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5066              TCP(sport=1234, dport=1234))
5067         self.pg0.add_stream(p)
5068         self.pg_enable_capture(self.pg_interfaces)
5069         self.pg_start()
5070         p = self.pg0.get_capture(1)
5071         packet = p[0]
5072         try:
5073             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5074             self.assertEqual(packet[IPv6].dst, client.ip6)
5075             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5076         except:
5077             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5078             raise
5079
5080     def test_one_armed_nat64(self):
5081         """ One armed NAT64 """
5082         external_port = 0
5083         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5084                                            '64:ff9b::',
5085                                            96)
5086
5087         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5088                                                 self.nat_addr_n)
5089         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5090         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5091
5092         # in2out
5093         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5094              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5095              TCP(sport=12345, dport=80))
5096         self.pg3.add_stream(p)
5097         self.pg_enable_capture(self.pg_interfaces)
5098         self.pg_start()
5099         capture = self.pg3.get_capture(1)
5100         p = capture[0]
5101         try:
5102             ip = p[IP]
5103             tcp = p[TCP]
5104             self.assertEqual(ip.src, self.nat_addr)
5105             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5106             self.assertNotEqual(tcp.sport, 12345)
5107             external_port = tcp.sport
5108             self.assertEqual(tcp.dport, 80)
5109             self.check_tcp_checksum(p)
5110             self.check_ip_checksum(p)
5111         except:
5112             self.logger.error(ppp("Unexpected or invalid packet:", p))
5113             raise
5114
5115         # out2in
5116         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5117              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5118              TCP(sport=80, dport=external_port))
5119         self.pg3.add_stream(p)
5120         self.pg_enable_capture(self.pg_interfaces)
5121         self.pg_start()
5122         capture = self.pg3.get_capture(1)
5123         p = capture[0]
5124         try:
5125             ip = p[IPv6]
5126             tcp = p[TCP]
5127             self.assertEqual(ip.src, remote_host_ip6)
5128             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5129             self.assertEqual(tcp.sport, 80)
5130             self.assertEqual(tcp.dport, 12345)
5131             self.check_tcp_checksum(p)
5132         except:
5133             self.logger.error(ppp("Unexpected or invalid packet:", p))
5134             raise
5135
5136     def test_frag_in_order(self):
5137         """ NAT64 translate fragments arriving in order """
5138         self.tcp_port_in = random.randint(1025, 65535)
5139
5140         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5141                                                 self.nat_addr_n)
5142         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5143         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5144
5145         reass = self.vapi.nat_reass_dump()
5146         reass_n_start = len(reass)
5147
5148         # in2out
5149         data = 'a' * 200
5150         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5151                                            self.tcp_port_in, 20, data)
5152         self.pg0.add_stream(pkts)
5153         self.pg_enable_capture(self.pg_interfaces)
5154         self.pg_start()
5155         frags = self.pg1.get_capture(len(pkts))
5156         p = self.reass_frags_and_verify(frags,
5157                                         self.nat_addr,
5158                                         self.pg1.remote_ip4)
5159         self.assertEqual(p[TCP].dport, 20)
5160         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5161         self.tcp_port_out = p[TCP].sport
5162         self.assertEqual(data, p[Raw].load)
5163
5164         # out2in
5165         data = "A" * 4 + "b" * 16 + "C" * 3
5166         pkts = self.create_stream_frag(self.pg1,
5167                                        self.nat_addr,
5168                                        20,
5169                                        self.tcp_port_out,
5170                                        data)
5171         self.pg1.add_stream(pkts)
5172         self.pg_enable_capture(self.pg_interfaces)
5173         self.pg_start()
5174         frags = self.pg0.get_capture(len(pkts))
5175         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5176         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5177         self.assertEqual(p[TCP].sport, 20)
5178         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5179         self.assertEqual(data, p[Raw].load)
5180
5181         reass = self.vapi.nat_reass_dump()
5182         reass_n_end = len(reass)
5183
5184         self.assertEqual(reass_n_end - reass_n_start, 2)
5185
5186     def test_reass_hairpinning(self):
5187         """ NAT64 fragments hairpinning """
5188         data = 'a' * 200
5189         client = self.pg0.remote_hosts[0]
5190         server = self.pg0.remote_hosts[1]
5191         server_in_port = random.randint(1025, 65535)
5192         server_out_port = random.randint(1025, 65535)
5193         client_in_port = random.randint(1025, 65535)
5194         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5195         nat_addr_ip6 = ip.src
5196
5197         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5198                                                 self.nat_addr_n)
5199         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5200         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5201
5202         # add static BIB entry for server
5203         self.vapi.nat64_add_del_static_bib(server.ip6n,
5204                                            self.nat_addr_n,
5205                                            server_in_port,
5206                                            server_out_port,
5207                                            IP_PROTOS.tcp)
5208
5209         # send packet from host to server
5210         pkts = self.create_stream_frag_ip6(self.pg0,
5211                                            self.nat_addr,
5212                                            client_in_port,
5213                                            server_out_port,
5214                                            data)
5215         self.pg0.add_stream(pkts)
5216         self.pg_enable_capture(self.pg_interfaces)
5217         self.pg_start()
5218         frags = self.pg0.get_capture(len(pkts))
5219         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5220         self.assertNotEqual(p[TCP].sport, client_in_port)
5221         self.assertEqual(p[TCP].dport, server_in_port)
5222         self.assertEqual(data, p[Raw].load)
5223
5224     def test_frag_out_of_order(self):
5225         """ NAT64 translate fragments arriving out of order """
5226         self.tcp_port_in = random.randint(1025, 65535)
5227
5228         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5229                                                 self.nat_addr_n)
5230         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5231         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5232
5233         # in2out
5234         data = 'a' * 200
5235         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5236                                            self.tcp_port_in, 20, data)
5237         pkts.reverse()
5238         self.pg0.add_stream(pkts)
5239         self.pg_enable_capture(self.pg_interfaces)
5240         self.pg_start()
5241         frags = self.pg1.get_capture(len(pkts))
5242         p = self.reass_frags_and_verify(frags,
5243                                         self.nat_addr,
5244                                         self.pg1.remote_ip4)
5245         self.assertEqual(p[TCP].dport, 20)
5246         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5247         self.tcp_port_out = p[TCP].sport
5248         self.assertEqual(data, p[Raw].load)
5249
5250         # out2in
5251         data = "A" * 4 + "B" * 16 + "C" * 3
5252         pkts = self.create_stream_frag(self.pg1,
5253                                        self.nat_addr,
5254                                        20,
5255                                        self.tcp_port_out,
5256                                        data)
5257         pkts.reverse()
5258         self.pg1.add_stream(pkts)
5259         self.pg_enable_capture(self.pg_interfaces)
5260         self.pg_start()
5261         frags = self.pg0.get_capture(len(pkts))
5262         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5263         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5264         self.assertEqual(p[TCP].sport, 20)
5265         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5266         self.assertEqual(data, p[Raw].load)
5267
5268     def test_interface_addr(self):
5269         """ Acquire NAT64 pool addresses from interface """
5270         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5271
5272         # no address in NAT64 pool
5273         adresses = self.vapi.nat44_address_dump()
5274         self.assertEqual(0, len(adresses))
5275
5276         # configure interface address and check NAT64 address pool
5277         self.pg4.config_ip4()
5278         addresses = self.vapi.nat64_pool_addr_dump()
5279         self.assertEqual(len(addresses), 1)
5280         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5281
5282         # remove interface address and check NAT64 address pool
5283         self.pg4.unconfig_ip4()
5284         addresses = self.vapi.nat64_pool_addr_dump()
5285         self.assertEqual(0, len(adresses))
5286
5287     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5288     def test_ipfix_max_bibs_sessions(self):
5289         """ IPFIX logging maximum session and BIB entries exceeded """
5290         max_bibs = 1280
5291         max_sessions = 2560
5292         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5293                                            '64:ff9b::',
5294                                            96)
5295
5296         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5297                                                 self.nat_addr_n)
5298         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5299         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5300
5301         pkts = []
5302         src = ""
5303         for i in range(0, max_bibs):
5304             src = "fd01:aa::%x" % (i)
5305             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5306                  IPv6(src=src, dst=remote_host_ip6) /
5307                  TCP(sport=12345, dport=80))
5308             pkts.append(p)
5309             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5310                  IPv6(src=src, dst=remote_host_ip6) /
5311                  TCP(sport=12345, dport=22))
5312             pkts.append(p)
5313         self.pg0.add_stream(pkts)
5314         self.pg_enable_capture(self.pg_interfaces)
5315         self.pg_start()
5316         self.pg1.get_capture(max_sessions)
5317
5318         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5319                                      src_address=self.pg3.local_ip4n,
5320                                      path_mtu=512,
5321                                      template_interval=10)
5322         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5323                             src_port=self.ipfix_src_port)
5324
5325         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5326              IPv6(src=src, dst=remote_host_ip6) /
5327              TCP(sport=12345, dport=25))
5328         self.pg0.add_stream(p)
5329         self.pg_enable_capture(self.pg_interfaces)
5330         self.pg_start()
5331         self.pg1.get_capture(0)
5332         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5333         capture = self.pg3.get_capture(9)
5334         ipfix = IPFIXDecoder()
5335         # first load template
5336         for p in capture:
5337             self.assertTrue(p.haslayer(IPFIX))
5338             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5339             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5340             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5341             self.assertEqual(p[UDP].dport, 4739)
5342             self.assertEqual(p[IPFIX].observationDomainID,
5343                              self.ipfix_domain_id)
5344             if p.haslayer(Template):
5345                 ipfix.add_template(p.getlayer(Template))
5346         # verify events in data set
5347         for p in capture:
5348             if p.haslayer(Data):
5349                 data = ipfix.decode_data_set(p.getlayer(Set))
5350                 self.verify_ipfix_max_sessions(data, max_sessions)
5351
5352         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5353              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5354              TCP(sport=12345, dport=80))
5355         self.pg0.add_stream(p)
5356         self.pg_enable_capture(self.pg_interfaces)
5357         self.pg_start()
5358         self.pg1.get_capture(0)
5359         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5360         capture = self.pg3.get_capture(1)
5361         # verify events in data set
5362         for p in capture:
5363             self.assertTrue(p.haslayer(IPFIX))
5364             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5365             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5366             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5367             self.assertEqual(p[UDP].dport, 4739)
5368             self.assertEqual(p[IPFIX].observationDomainID,
5369                              self.ipfix_domain_id)
5370             if p.haslayer(Data):
5371                 data = ipfix.decode_data_set(p.getlayer(Set))
5372                 self.verify_ipfix_max_bibs(data, max_bibs)
5373
5374     def test_ipfix_max_frags(self):
5375         """ IPFIX logging maximum fragments pending reassembly exceeded """
5376         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5377                                                 self.nat_addr_n)
5378         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5379         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5380         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5381         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5382                                      src_address=self.pg3.local_ip4n,
5383                                      path_mtu=512,
5384                                      template_interval=10)
5385         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5386                             src_port=self.ipfix_src_port)
5387
5388         data = 'a' * 200
5389         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5390                                            self.tcp_port_in, 20, data)
5391         self.pg0.add_stream(pkts[-1])
5392         self.pg_enable_capture(self.pg_interfaces)
5393         self.pg_start()
5394         self.pg1.get_capture(0)
5395         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5396         capture = self.pg3.get_capture(9)
5397         ipfix = IPFIXDecoder()
5398         # first load template
5399         for p in capture:
5400             self.assertTrue(p.haslayer(IPFIX))
5401             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5402             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5403             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5404             self.assertEqual(p[UDP].dport, 4739)
5405             self.assertEqual(p[IPFIX].observationDomainID,
5406                              self.ipfix_domain_id)
5407             if p.haslayer(Template):
5408                 ipfix.add_template(p.getlayer(Template))
5409         # verify events in data set
5410         for p in capture:
5411             if p.haslayer(Data):
5412                 data = ipfix.decode_data_set(p.getlayer(Set))
5413                 self.verify_ipfix_max_fragments_ip6(data, 0,
5414                                                     self.pg0.remote_ip6n)
5415
5416     def test_ipfix_bib_ses(self):
5417         """ IPFIX logging NAT64 BIB/session create and delete events """
5418         self.tcp_port_in = random.randint(1025, 65535)
5419         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5420                                            '64:ff9b::',
5421                                            96)
5422
5423         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5424                                                 self.nat_addr_n)
5425         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5426         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5427         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5428                                      src_address=self.pg3.local_ip4n,
5429                                      path_mtu=512,
5430                                      template_interval=10)
5431         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5432                             src_port=self.ipfix_src_port)
5433
5434         # Create
5435         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5436              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5437              TCP(sport=self.tcp_port_in, dport=25))
5438         self.pg0.add_stream(p)
5439         self.pg_enable_capture(self.pg_interfaces)
5440         self.pg_start()
5441         p = self.pg1.get_capture(1)
5442         self.tcp_port_out = p[0][TCP].sport
5443         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5444         capture = self.pg3.get_capture(10)
5445         ipfix = IPFIXDecoder()
5446         # first load template
5447         for p in capture:
5448             self.assertTrue(p.haslayer(IPFIX))
5449             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5450             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5451             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5452             self.assertEqual(p[UDP].dport, 4739)
5453             self.assertEqual(p[IPFIX].observationDomainID,
5454                              self.ipfix_domain_id)
5455             if p.haslayer(Template):
5456                 ipfix.add_template(p.getlayer(Template))
5457         # verify events in data set
5458         for p in capture:
5459             if p.haslayer(Data):
5460                 data = ipfix.decode_data_set(p.getlayer(Set))
5461                 if ord(data[0][230]) == 10:
5462                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5463                 elif ord(data[0][230]) == 6:
5464                     self.verify_ipfix_nat64_ses(data,
5465                                                 1,
5466                                                 self.pg0.remote_ip6n,
5467                                                 self.pg1.remote_ip4,
5468                                                 25)
5469                 else:
5470                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5471
5472         # Delete
5473         self.pg_enable_capture(self.pg_interfaces)
5474         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5475                                                 self.nat_addr_n,
5476                                                 is_add=0)
5477         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5478         capture = self.pg3.get_capture(2)
5479         # verify events in data set
5480         for p in capture:
5481             self.assertTrue(p.haslayer(IPFIX))
5482             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5483             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5484             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5485             self.assertEqual(p[UDP].dport, 4739)
5486             self.assertEqual(p[IPFIX].observationDomainID,
5487                              self.ipfix_domain_id)
5488             if p.haslayer(Data):
5489                 data = ipfix.decode_data_set(p.getlayer(Set))
5490                 if ord(data[0][230]) == 11:
5491                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5492                 elif ord(data[0][230]) == 7:
5493                     self.verify_ipfix_nat64_ses(data,
5494                                                 0,
5495                                                 self.pg0.remote_ip6n,
5496                                                 self.pg1.remote_ip4,
5497                                                 25)
5498                 else:
5499                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5500
5501     def nat64_get_ses_num(self):
5502         """
5503         Return number of active NAT64 sessions.
5504         """
5505         st = self.vapi.nat64_st_dump()
5506         return len(st)
5507
5508     def clear_nat64(self):
5509         """
5510         Clear NAT64 configuration.
5511         """
5512         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5513                             domain_id=self.ipfix_domain_id)
5514         self.ipfix_src_port = 4739
5515         self.ipfix_domain_id = 1
5516
5517         self.vapi.nat64_set_timeouts()
5518
5519         interfaces = self.vapi.nat64_interface_dump()
5520         for intf in interfaces:
5521             if intf.is_inside > 1:
5522                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5523                                                   0,
5524                                                   is_add=0)
5525             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5526                                               intf.is_inside,
5527                                               is_add=0)
5528
5529         bib = self.vapi.nat64_bib_dump(255)
5530         for bibe in bib:
5531             if bibe.is_static:
5532                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5533                                                    bibe.o_addr,
5534                                                    bibe.i_port,
5535                                                    bibe.o_port,
5536                                                    bibe.proto,
5537                                                    bibe.vrf_id,
5538                                                    is_add=0)
5539
5540         adresses = self.vapi.nat64_pool_addr_dump()
5541         for addr in adresses:
5542             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5543                                                     addr.address,
5544                                                     vrf_id=addr.vrf_id,
5545                                                     is_add=0)
5546
5547         prefixes = self.vapi.nat64_prefix_dump()
5548         for prefix in prefixes:
5549             self.vapi.nat64_add_del_prefix(prefix.prefix,
5550                                            prefix.prefix_len,
5551                                            vrf_id=prefix.vrf_id,
5552                                            is_add=0)
5553
5554     def tearDown(self):
5555         super(TestNAT64, self).tearDown()
5556         if not self.vpp_dead:
5557             self.logger.info(self.vapi.cli("show nat64 pool"))
5558             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5559             self.logger.info(self.vapi.cli("show nat64 prefix"))
5560             self.logger.info(self.vapi.cli("show nat64 bib all"))
5561             self.logger.info(self.vapi.cli("show nat64 session table all"))
5562             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5563             self.clear_nat64()
5564
5565
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Prepare the class-wide fixture.

        Stores the NAT pool address (text and packed form) on the class and
        creates two pg interfaces: pg0 with IPv4 and resolved ARP, pg1 with
        IPv6, two generated remote hosts and configured IPv6 neighbors.
        If anything fails the parent tearDownClass is called and the
        exception is re-raised.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))

            # IPv4 side
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # IPv6 side
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    def _send_and_capture(self, tx_if, pkt, rx_if):
        """
        Inject one packet and fetch the capture from another interface.

        :param tx_if: pg interface the packet is added to as a stream
        :param pkt: Packet to send
        :param rx_if: pg interface the capture is read from
        :returns: Whatever rx_if.get_capture(1) returns
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite(self):
        """ Test DS-Lite """
        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(
            socket.inet_pton(socket.AF_INET6, aftr_ip6),
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        client_ip4 = '192.168.1.1'
        server_ip4 = self.pg0.remote_ip4
        first_b4_ip6 = self.pg1.remote_hosts[0].ip6
        second_b4_ip6 = self.pg1.remote_hosts[1].ip6

        # UDP, tunneled packet from the first pg1 host: expect a plain IPv4
        # packet on pg0 with translated source address and source port
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=first_b4_ip6) /
                    IP(dst=server_ip4, src=client_ip4) /
                    UDP(sport=20000, dport=10000))
        rx = self._send_and_capture(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        nat_port = rx[UDP].sport

        # UDP reply to the translated port: expect it back on pg1,
        # IPv6-encapsulated and with the original port restored
        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 UDP(sport=10000, dport=nat_port))
        rx = self._send_and_capture(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, first_b4_ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, client_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP, same exchange from the second pg1 host
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=second_b4_ip6) /
                    IP(dst=server_ip4, src=client_ip4) /
                    TCP(sport=20001, dport=10001))
        rx = self._send_and_capture(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        nat_port = rx[TCP].sport

        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 TCP(sport=10001, dport=nat_port))
        rx = self._send_and_capture(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, second_b4_ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, client_ip4)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP echo, the identifier takes the role of the port
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=second_b4_ip6) /
                    IP(dst=server_ip4, src=client_ip4) /
                    ICMP(id=4000, type='echo-request'))
        rx = self._send_and_capture(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        nat_icmp_id = rx[ICMP].id

        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 ICMP(id=nat_icmp_id, type='echo-reply'))
        rx = self._send_and_capture(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, second_b4_ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, client_ip4)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        echo = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=second_b4_ip6, dst=aftr_ip6) /
                ICMPv6EchoRequest())
        captured = self._send_and_capture(self.pg1, echo, self.pg1)
        self.assertEqual(1, len(captured))
        rx = captured[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, second_b4_ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """
        Run the parent tearDown and, unless VPP is dead, log the DS-Lite
        "show" command outputs.
        """
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return
        for command in ("show dslite pool",
                        "show dslite aftr-tunnel-endpoint-address",
                        "show dslite sessions"):
            self.logger.info(self.vapi.cli(command))
5725
5726
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        """
        Append the "nat { dslite ce }" stanza to the VPP command line built
        by the parent class.
        """
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        """
        Prepare the class-wide fixture.

        Creates two pg interfaces: pg0 with IPv4 and resolved ARP, pg1 with
        IPv6, one generated remote host and configured IPv6 neighbors.
        If anything fails the parent tearDownClass is called and the
        exception is re-raised.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            # IPv4 side
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # IPv6 side
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def _send_and_capture(self, tx_if, pkt, rx_if):
        """
        Inject one packet and fetch the capture from another interface.

        :param tx_if: pg interface the packet is added to as a stream
        :param pkt: Packet to send
        :param rx_if: pg interface the capture is read from
        :returns: Whatever rx_if.get_capture(1) returns
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(
            socket.inet_pton(socket.AF_INET6, b4_ip6),
            socket.inet_pton(socket.AF_INET, b4_ip4))

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(
            aftr_ip6_n, socket.inet_pton(socket.AF_INET, aftr_ip4))

        # Host route to the AFTR address via pg1
        self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
                                   dst_address_length=128,
                                   next_hop_address=self.pg1.remote_ip6n,
                                   next_hop_sw_if_index=self.pg1.sw_if_index,
                                   is_ipv6=1)

        lan_ip4 = self.pg0.remote_ip4
        wan_ip4 = self.pg1.remote_ip4
        neighbor_ip6 = self.pg1.remote_hosts[0].ip6

        # UDP encapsulation: plain IPv4 in on pg0, B4 -> AFTR tunnel out on
        # pg1 with the inner packet unchanged
        plain = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=wan_ip4, src=lan_ip4) /
                 UDP(sport=10000, dport=20000))
        rx = self._send_and_capture(self.pg0, plain, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, lan_ip4)
        self.assertEqual(rx[IP].dst, wan_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation: AFTR -> B4 tunnel in on pg1, plain IPv4 out on
        # pg0
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=b4_ip6, src=aftr_ip6) /
                    IP(dst=lan_ip4, src=wan_ip4) /
                    UDP(sport=20000, dport=10000))
        rx = self._send_and_capture(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, wan_ip4)
        self.assertEqual(rx[IP].dst, lan_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        echo = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=neighbor_ip6, dst=b4_ip6) /
                ICMPv6EchoRequest())
        captured = self._send_and_capture(self.pg1, echo, self.pg1)
        self.assertEqual(1, len(captured))
        rx = captured[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, neighbor_ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """
        Run the parent tearDown and, unless VPP is dead, log the configured
        AFTR and B4 tunnel endpoint addresses.
        """
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return
        for command in ("show dslite aftr-tunnel-endpoint-address",
                        "show dslite b4-tunnel-endpoint-address"):
            self.logger.info(self.vapi.cli(command))
5829
# When executed as a script, run the test cases of this module with
# VppTestRunner as the unittest runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)